# Source code for vodem.simple

"""
Vodem Simple

"""
import datetime
import logging
import time

from . import api, util

# Import-time side effect: install the logging module's default root
# handler (basicConfig does nothing if the root logger already has
# handlers). The functions below only log at debug level.
logging.basicConfig()


def sms_get_flag():
    """
    Sms get flag

    Queries ``api.sms_received_flag()``, logs the raw reply at debug level
    and returns its ``'sms_received_flag'`` field decoded with
    ``util.decode_number``.
    """
    reply = api.sms_received_flag()
    logging.getLogger(__name__).debug(reply)
    raw_flag = reply['sms_received_flag']
    return util.decode_number(raw_flag)
def sms_set_flag():
    """
    Sms set flag

    Calls ``api.sms_received_flag_flag()``, logs the raw reply at debug
    level and returns ``True`` when the reply's ``'sms_received_flag'``
    field, decoded with ``util.decode_number``, equals ``0``.
    """
    reply = api.sms_received_flag_flag()
    logging.getLogger(__name__).debug(reply)
    flag_value = util.decode_number(reply['sms_received_flag'])
    return flag_value == 0
def sms_unread_count():
    """
    Sms unread count

    Queries ``api.sms_unread_num()``, logs the raw reply at debug level
    and returns its ``'sms_dev_unread_count'`` field decoded with
    ``util.decode_number``.
    """
    reply = api.sms_unread_num()
    logging.getLogger(__name__).debug(reply)
    raw_count = reply['sms_dev_unread_count']
    return util.decode_number(raw_count)
def sms_inbox_count():
    """
    Sms inbox count

    Queries ``api.sms_capacity_info()``, logs the raw reply at debug level
    and returns its ``'sms_nv_rev_total'`` field decoded with
    ``util.decode_number``.
    """
    capacity = api.sms_capacity_info()
    logging.getLogger(__name__).debug(capacity)
    raw_total = capacity['sms_nv_rev_total']
    return util.decode_number(raw_total)
def sms_outbox_count():
    """
    Sms outbox count

    Queries ``api.sms_capacity_info()``, logs the raw reply at debug level
    and returns its ``'sms_nv_send_total'`` field decoded with
    ``util.decode_number``.
    """
    capacity = api.sms_capacity_info()
    logging.getLogger(__name__).debug(capacity)
    raw_total = capacity['sms_nv_send_total']
    return util.decode_number(raw_total)
def sms_draftbox_count():
    """
    Sms draftbox count

    Queries ``api.sms_capacity_info()``, logs the raw reply at debug level
    and returns its ``'sms_nv_draftbox_total'`` field decoded with
    ``util.decode_number``.
    """
    capacity = api.sms_capacity_info()
    logging.getLogger(__name__).debug(capacity)
    raw_total = capacity['sms_nv_draftbox_total']
    return util.decode_number(raw_total)
def sms_set_read(index):
    """
    Sms set read

    Sends ``index`` (formatted as ``'<index>;'``) as ``msg_id`` to
    ``api.set_msg_read`` and returns the reply's ``'result'`` field decoded
    with ``util.decode_result``. Request and reply are logged at debug
    level.

    Args
        index (int): message id to pass to the device.
    """
    request = {
        'msg_id': '{0};'.format(index),
    }
    log = logging.getLogger(__name__)
    log.debug(request)
    reply = api.set_msg_read(request)
    log.debug(reply)
    return util.decode_result(reply['result'])
def sms_save(number, message, date=None, async_=False, **kwargs):
    """
    Sms save

    Builds a save request from the arguments, submits it through
    ``api.save_sms`` and, unless asynchronous operation was requested,
    polls for the outcome with ``_wait_for_status('5')``.

    Args
        number (str): recipient number; sent as ``'<number>;'``.
        message (str): message text; encoded with
            ``util.encode_sms_message``.
        date (datetime): timestamp for the message; defaults to
            ``datetime.datetime.now()`` when ``None``.
        async_ (bool): when true, return immediately after submitting
            the request instead of waiting for a status.
        **kwargs: only the legacy keyword ``async`` is accepted (pass it
            as ``**{'async': True}`` on Python 3.7+); it overrides
            ``async_``.

    Returns
        The value of ``_wait_for_status('5')`` when waiting, otherwise
        ``None``.

    Raises
        TypeError: if any keyword other than ``async`` is supplied.
    """
    # ``async`` is a reserved keyword from Python 3.7 on, so it can no
    # longer be a parameter name. The flag keeps its positional slot as
    # ``async_`` and the old spelling is still honoured via **kwargs.
    if 'async' in kwargs:
        async_ = kwargs.pop('async')
    if kwargs:
        raise TypeError(
            "sms_save() got an unexpected keyword argument '{0}'".format(
                sorted(kwargs)[0]))

    if date is None:
        date = datetime.datetime.now()

    encoded_number = '{0};'.format(number)
    encoded_message = util.encode_sms_message(message)
    encoded_time = util.encode_time(date)
    message_encoding = util.encoding_of(message)

    params = {
        'SMSNumber': encoded_number,
        'SMSMessage': encoded_message,
        'sms_time': encoded_time,
        'encode_type': message_encoding,
    }

    logger = logging.getLogger(__name__)
    logger.debug(params)
    response = api.save_sms(params)
    logger.debug(response)

    if not async_:
        return _wait_for_status('5')
    return None
def sms_inbox_unread(per_page=10, safe=False):
    """
    Sms inbox unread

    Generator over inbox messages whose decoded ``'tag'`` equals ``1``.
    Pages are fetched lazily, ``per_page`` messages at a time, until an
    empty page is returned.

    Args
        per_page (int): number of messages requested per page.
        safe (bool): forwarded to the message decoding step.
    """
    for entry in sms_inbox(per_page=per_page, safe=safe):
        if entry['tag'] == 1:
            yield entry
def sms_inbox_read(per_page=10, safe=False):
    """
    Sms inbox read

    Generator over inbox messages whose decoded ``'tag'`` equals ``0``.
    Pages are fetched lazily, ``per_page`` messages at a time, until an
    empty page is returned.

    Args
        per_page (int): number of messages requested per page.
        safe (bool): forwarded to the message decoding step.
    """
    for entry in sms_inbox(per_page=per_page, safe=safe):
        if entry['tag'] == 0:
            yield entry
def sms_inbox(per_page=10, safe=False):
    """
    Sms inbox

    Generator over every inbox message. Calls ``sms_inbox_page`` for page
    1, 2, 3, ... and yields each page's messages in order; iteration ends
    at the first empty page.

    Args
        per_page (int): number of messages requested per page.
        safe (bool): forwarded to ``sms_inbox_page``.
    """
    page_number = 1
    while True:
        batch = sms_inbox_page(
            page=page_number, count=per_page, safe=safe)
        if len(batch) == 0:
            # An empty page marks the end of the mailbox.
            return
        for entry in batch:
            yield entry
        page_number += 1
def sms_inbox_page(page=1, count=10, safe=False):
    """
    Sms inbox page

    Requests one page of messages from ``api.sms_page_data`` with
    ``tags='12'`` and returns the reply's ``'messages'`` entries, each run
    through ``_decode_message``.

    Args
        page (int): 1-based page number; sent to the device as ``page - 1``.
        count (int): number of messages per page.
        safe (bool): forwarded to ``_decode_message``.
    """
    zero_based_page = '{0}'.format(page - 1)
    request = {
        'tags': '12',
        'data_per_page': '{0}'.format(count),
        'page': zero_based_page,
    }
    reply = api.sms_page_data(request)
    return [_decode_message(entry, safe=safe) for entry in reply['messages']]
def sms_outbox(per_page=10, safe=False):
    """
    Sms outbox

    Generator over every outbox message. Calls ``sms_outbox_page`` for
    page 1, 2, 3, ... and yields each page's messages in order; iteration
    ends at the first empty page.

    Args
        per_page (int): number of messages requested per page.
        safe (bool): forwarded to ``sms_outbox_page``.
    """
    page_number = 1
    while True:
        batch = sms_outbox_page(
            page=page_number, count=per_page, safe=safe)
        if len(batch) == 0:
            # An empty page marks the end of the mailbox.
            return
        for entry in batch:
            yield entry
        page_number += 1
def sms_outbox_page(page=1, count=10, safe=False):
    """
    Sms outbox page

    Requests one page of messages from ``api.sms_page_data`` with
    ``tags='2'`` and returns the reply's ``'messages'`` entries, each run
    through ``_decode_message``.

    Args
        page (int): 1-based page number; sent to the device as ``page - 1``.
        count (int): number of messages per page.
        safe (bool): forwarded to ``_decode_message``.
    """
    zero_based_page = '{0}'.format(page - 1)
    request = {
        'tags': '2',
        'data_per_page': '{0}'.format(count),
        'page': zero_based_page,
    }
    reply = api.sms_page_data(request)
    return [_decode_message(entry, safe=safe) for entry in reply['messages']]
def sms_draftbox(per_page=10, safe=False):
    """
    Sms draftbox

    Generator over every draftbox message. Calls ``sms_draftbox_page`` for
    page 1, 2, 3, ... and yields each page's messages in order; iteration
    ends at the first empty page.

    Args
        per_page (int): number of messages requested per page.
        safe (bool): forwarded to ``sms_draftbox_page``.
    """
    page_number = 1
    while True:
        batch = sms_draftbox_page(
            page=page_number, count=per_page, safe=safe)
        if len(batch) == 0:
            # An empty page marks the end of the mailbox.
            return
        for entry in batch:
            yield entry
        page_number += 1
def sms_draftbox_page(page=1, count=10, safe=False):
    """
    Sms draftbox page

    Requests one page of messages from ``api.sms_page_data`` with
    ``tags='11'`` and returns the reply's ``'messages'`` entries, each run
    through ``_decode_message``.

    Args
        page (int): 1-based page number; sent to the device as ``page - 1``.
        count (int): number of messages per page.
        safe (bool): forwarded to ``_decode_message``.
    """
    zero_based_page = '{0}'.format(page - 1)
    request = {
        'tags': '11',
        'data_per_page': '{0}'.format(count),
        'page': zero_based_page,
    }
    reply = api.sms_page_data(request)
    return [_decode_message(entry, safe=safe) for entry in reply['messages']]
def sms_send(number, message, date=None, index=-1, async_=False, **kwargs):
    """
    Sms send

    Builds a send request from the arguments, submits it through
    ``api.send_sms`` and, unless asynchronous operation was requested,
    polls for the outcome with ``_wait_for_status('4')``.

    Args
        number (str): recipient number; sent as ``'<number>;'``.
        message (str): message text; encoded with
            ``util.encode_sms_message``.
        date (datetime): timestamp for the message; defaults to
            ``datetime.datetime.now()`` when ``None``.
        index (int): sent as the request's ``ID`` field (``'<index>;'``);
            defaults to ``-1``.
        async_ (bool): when true, return immediately after submitting
            the request instead of waiting for a status.
        **kwargs: only the legacy keyword ``async`` is accepted (pass it
            as ``**{'async': True}`` on Python 3.7+); it overrides
            ``async_``.

    Returns
        The value of ``_wait_for_status('4')`` when waiting, otherwise
        ``None``.

    Raises
        TypeError: if any keyword other than ``async`` is supplied.
    """
    # ``async`` is a reserved keyword from Python 3.7 on, so it can no
    # longer be a parameter name. The flag keeps its positional slot as
    # ``async_`` and the old spelling is still honoured via **kwargs.
    if 'async' in kwargs:
        async_ = kwargs.pop('async')
    if kwargs:
        raise TypeError(
            "sms_send() got an unexpected keyword argument '{0}'".format(
                sorted(kwargs)[0]))

    if date is None:
        date = datetime.datetime.now()

    encoded_number = '{0};'.format(number)
    encoded_message = util.encode_sms_message(message)
    encoded_time = util.encode_time(date)
    encoded_index = '{0};'.format(index)
    message_encoding = util.encoding_of(message)

    params = {
        'Number': encoded_number,
        'MessageBody': encoded_message,
        'sms_time': encoded_time,
        'ID': encoded_index,
        'encode_type': message_encoding,
    }

    logger = logging.getLogger(__name__)
    logger.debug(params)
    response = api.send_sms(params)
    logger.debug(response)

    if not async_:
        return _wait_for_status('4')
    return None
def sms_delete(index, async_=False, **kwargs):
    """
    Sms delete

    Sends ``index`` (formatted as ``'<index>;'``) as ``msg_id`` to
    ``api.delete_sms`` and, unless asynchronous operation was requested,
    polls for the outcome with ``_wait_for_status('6')``.

    Args
        index (int): message id to pass to the device.
        async_ (bool): when true, return immediately after submitting
            the request instead of waiting for a status.
        **kwargs: only the legacy keyword ``async`` is accepted (pass it
            as ``**{'async': True}`` on Python 3.7+); it overrides
            ``async_``.

    Returns
        The value of ``_wait_for_status('6')`` when waiting, otherwise
        ``None``.

    Raises
        TypeError: if any keyword other than ``async`` is supplied.
    """
    # ``async`` is a reserved keyword from Python 3.7 on, so it can no
    # longer be a parameter name. The flag keeps its positional slot as
    # ``async_`` and the old spelling is still honoured via **kwargs.
    if 'async' in kwargs:
        async_ = kwargs.pop('async')
    if kwargs:
        raise TypeError(
            "sms_delete() got an unexpected keyword argument '{0}'".format(
                sorted(kwargs)[0]))

    params = {
        'msg_id': '{0};'.format(index),
    }

    logger = logging.getLogger(__name__)
    logger.debug(params)
    response = api.delete_sms(params)
    logger.debug(response)

    if not async_:
        return _wait_for_status('6')
    return None
def _decode_message(message, safe):
    """
    Decode the ``content``, ``date``, ``tag`` and ``id`` fields of a raw
    message dict in place and return the same dict.

    Each field is passed through ``util.try_decode`` with its matching
    ``util`` decoder and the given ``safe`` flag. The message is logged at
    debug level before and after decoding.
    """
    log = logging.getLogger(__name__)
    log.debug(message)

    field_decoders = (
        ('content', util.decode_sms_message),
        ('date', util.decode_time),
        ('tag', util.decode_sms_tag),
        ('id', util.decode_sms_id),
    )
    for field, decoder in field_decoders:
        message[field] = util.try_decode(decoder, message[field], safe=safe)

    log.debug(message)
    return message


def _wait_for_status(sms_cmd, wait_max_times=10, wait_interval=2.0):
    """
    Poll ``api.sms_cmd_status_info`` until the device reports an outcome
    for ``sms_cmd``.

    Sleeps ``wait_interval`` seconds before every poll. Returns ``True``
    when the decoded status is ``3``, ``False`` when it is ``2``, and
    ``None`` once ``wait_max_times`` polls have been made without either.
    """
    log = logging.getLogger(__name__)
    query = {
        'sms_cmd': sms_cmd,
    }
    log.debug(query)

    polls_done = 0
    while polls_done != wait_max_times:
        # wait first to allow device time to process request;
        # a float asks for more accurate intervals from time.sleep
        time.sleep(wait_interval)
        polls_done += 1

        reply = api.sms_cmd_status_info(query)
        log.debug(reply)

        # if device has not processed anything for given sms_cmd since
        # reset, response value is garbage, so ignore it and wait again
        if 'sms_cmd_status_result' not in reply.keys():
            continue

        outcome = util.decode_sms_cmd_status_info(
            reply['sms_cmd_status_result'])
        if outcome is None:
            continue
        if outcome == 3:
            return True
        if outcome == 2:
            return False

    # Poll budget used up without a 2/3 status.
    return None