Source code for sitchlib.gsm_modem

"""GSM Modem device ...driver?..."""

import re
import serial
import sys
import time


[docs]class GsmModem(object): """GSM Modem handler class. Interfaces with device over serial. Calling GsmModem.set_eng_mode() causes the module to go into engineering mode, which will cause it to return cell network information. It has an iterator (generator) built in that cranks out dicts. """ def __init__(self, ser_port): """Initialization opens the port. Args: ser_port (str): Device to open and interact with. """ self.eng_init = 'AT+CENG=2,1 \r\n' self.unset_eng = 'AT+CENG=0 \r\n' self.gps_init = 'AT+CGPSINF=0 \r\n' self.echo_off = 'ATE0 \r\n' self.reg_info = 'AT+COPS? \r\n' self.imsi_info = 'AT+CIMI \r\n' self.config_dump = 'ATV1Q0&V \r\n' print("GSM: opening serial port: %s" % ser_port) time.sleep(10) self.serconn = serial.Serial(ser_port, 4800, timeout=1) ser_open_iter = 0 while not self.serconn.is_open: print("GSM: Attempting to open %s again..." % ser_port) time.sleep(1) ser_open_iter = ser_open_iter + 1 self.serconn.open() if ser_open_iter > 5: print("GSM: Failed to open serial port %s!" % ser_port) sys.exit(2) return def __iter__(self): """Yield scans from GSM modem.""" page = [] while True: line = None line = self.serconn.readline() processed_line = self.process_line(line) if line is None: pass elif processed_line is None: pass elif processed_line == {}: pass elif "cell" in processed_line: if (str(processed_line["cell"]) == str(0) and page != []): yield page page = [] page.append(processed_line) else: page.append(processed_line)
[docs] def eng_mode(self, status): """Set or unset engineering mode on the modem. Args: status (bool): True to enable engineering mode, False to disable. """ self.serconn.flush() if status is False: print("GsmModem: Unsetting engineering mode, flushing") self.serconn.write(self.unset_eng) while True: output = self.serconn.readline() if output == '': break else: print(output) else: print("GsmModem: Setting engineering mode") self.serconn.write(self.eng_init) self.serconn.flush() time.sleep(2) output = self.serconn.readline() print(output) self.serconn.flush() return
[docs] def get_reg_info(self): """Get registration information from the modem.""" self.serconn.write(self.reg_info) self.serconn.flush() time.sleep(2) output = self.serconn.readline() if "AT+" in output: output = GsmModem.clean_operator_string(self.serconn.readline()) print(output) self.serconn.flush() return output
[docs] def dump_config(self): """Dump modem's configuration.""" self.serconn.write(self.config_dump) self.serconn.flush() time.sleep(2) retval = [] while True: output = self.serconn.readline() if output == '': break retval.append(str(output)) self.serconn.flush() return retval
[docs] def get_imsi(self): """Get the IMSI of the SIM installed in the modem.""" rx = r'(?P<imsi>\S+)' self.serconn.write(self.imsi_info) self.serconn.flush() time.sleep(2) retval = [] while True: output = self.serconn.readline() if output == '': break if "AT+CIMI" in output: continue if output == "\r\n": continue if "OK\r\n" in output: continue retval.append(str(output)) self.serconn.flush() try: test_string = "".join(retval).replace('\r\n', '') retval = re.match(rx, test_string).group("imsi") except AttributeError as e: print("GSM: Unable to clean up IMSI") print(e) return retval
[docs] def set_band(self, band): """Set the band the GSM modem should communicate on. If the band does not set correctly, an error will print to stdout and the original setting will persist. Args: band (str): Pick one: `EGSM_MODE`, `PGSM_MODE`, `DCS_MODE`, `GSM850_MODE`, `PCS_MODE`, `EGSM_DCS_MODE`, `GSM850_PCS_MODE`, `EGSM_PCS_MODE`, or `ALL_BAND`. """ if band in ["EGSM_MODE", "PGSM_MODE", "DCS_MODE", "GSM850_MODE", "PCS_MODE", "EGSM_DCS_MODE", "GSM850_PCS_MODE", "EGSM_PCS_MODE", "ALL_BAND"]: term_command = "AT+CBAND=\"%s\" \r\n" % band print("GSM: Setting GSM band with: %s" % term_command) self.serconn.write(term_command) self.serconn.flush() time.sleep(2) output = self.serconn.readline() print(output) self.serconn.flush() else: print("GSM: Not setting band, unrecognized value: %s" % band)
[docs] @classmethod def clean_operator_string(cls, operator_string): """Clean up the operator string.""" rx = r'^[^\"]+\"(?P<operator_name>[^\"]+)\"' try: cleaned = re.match(rx, operator_string).group("operator_name") except AttributeError as e: print("GSM: Unable to clean up operator string") print(e) cleaned = operator_string return cleaned
[docs] @classmethod def process_line(cls, line): """Process line output from GSM modem. We expect to see only lines starting with `+CENG:`. Otherwise, it's an empty dictionary getting returned. Args: line (str): Raw line output from GSM modem. Returns: dict: Structured data parsed from `line`. """ processed = None if line.startswith('+CENG:'): dataz = line.split(':')[1].lstrip().replace('"', '').replace('\r\n', '') # NOQA line_parts = dataz.split(',') if len(line_parts) == 12: processed = GsmModem.process_12(line_parts) elif len(line_parts) == 7: processed = GsmModem.process_7(line_parts) elif len(line_parts) == 8: processed = GsmModem.process_8(line_parts) else: print("GSM: Unrecognized GSM message format:") print(line) elif line.startswith('AT+'): processed = {} elif re.match('^\s*$', line): processed = {} elif re.match('^OK\s*$', line): processed = {} else: print("GSM: Unprocessable line from modem!") print(line) processed = {} return processed
[docs] @classmethod def process_12(cls, parts): """Process a 12-part CENG message. Args: parts (list): Parts parsed from original CENG message. Returns: dict: Structured cell channel metadata. """ retval = {"cell": int(parts[0]), "arfcn": int(parts[1]), "rxl": int(parts[2]), "rxq": int(parts[3]), "mcc": parts[4], "mnc": parts[5], "bsic": parts[6], "cellid": parts[7], "rla": int(parts[8]), "txp": int(parts[9]), "lac": parts[10], "ta": int(parts[11]) } return retval
[docs] @classmethod def process_8(cls, parts): """Process an 8-part CENG message. Args: parts (list): Parts parsed from original CENG message. Returns: dict: Structured cell channel metadata. """ retval = {"cell": int(parts[0]), "arfcn": int(parts[1]), "rxl": int(parts[2]), "bsic": parts[3], "cellid": parts[4], "mcc": parts[5], "mnc": parts[6], "lac": parts[7] } return retval
[docs] @classmethod def process_7(cls, parts): """Process a 7-part CENG message. In a 7-item line, cellid is not provided. We set it to 0 to prevent barfing elsewhere. Args: parts (list): Parts parsed from original CENG message. Returns: dict: Structured cell channel metadata. """ retval = {"cell": int(parts[0]), "arfcn": int(parts[1]), "rxl": int(parts[2]), "bsic": parts[3], "mcc": parts[4], "mnc": parts[5], "lac": parts[6], "cellid": 0 } return retval