Source code for sitchlib.gsm_decomposer

"""Decompose GSM scans."""

from utility import Utility


[docs]class GsmDecomposer(object): """Decomposes GSM scans."""
[docs] @classmethod def decompose(cls, scan_document): """Turn one scan document into a list of channel scan documents. Args: scan_document (dict): GSM modem scan. Returns: list: List of tuples. First position in tuple identifies scan type. Second position is the actual scan data. """ results_set = [("cell", scan_document)] scan_items = scan_document["scan_results"] for channel in scan_items: try: channel = GsmDecomposer.enrich_channel_with_scan(channel, scan_document) # channel["arfcn_int"] = GsmDecomposer.arfcn_int(channel["arfcn"]) # NOQA if channel["arfcn"] == 0 and channel["cell"] != 0: continue # If the data is incomplete, we don't forward # Now we bring the hex values to decimal... channel = GsmDecomposer.convert_hex_targets(channel) channel = GsmDecomposer.convert_float_targets(channel) # Setting CGI identifiers channel["cgi_str"] = GsmDecomposer.make_bts_friendly(channel) channel["cgi_int"] = GsmDecomposer.get_cgi_int(channel) channel["event_type"] = "gsm_modem_channel" chan_enriched = ('gsm_modem_channel', channel) results_set.append(chan_enriched) except Exception as e: print("Exception caught in GsmDecomposer: %s for channel:\n %s\nin: %s" % (e, str(channel), str(scan_document))) # NOQA raise e return results_set
[docs] @classmethod def enrich_channel_with_scan(cls, channel, scan_document): """Enrich channel with scan document metadata.""" channel["band"] = scan_document["band"] channel["scan_finish"] = scan_document["scan_finish"] channel["site_name"] = scan_document["site_name"] channel["sensor_id"] = scan_document["sensor_id"] channel["sensor_name"] = scan_document["sensor_name"] channel["scanner_public_ip"] = scan_document["scanner_public_ip"] channel["event_timestamp"] = scan_document["scan_finish"] return channel
[docs] @classmethod def get_cgi_int(cls, channel): """Attempt to create an integer representation of CGI.""" try: cgi_int = int(channel["cgi_str"].replace(':', '')) except: print("EnrichGSM: Unable to convert CGI to int") print(channel["cgi_str"]) cgi_int = 0 return cgi_int
[docs] @classmethod def bts_from_channel(cls, channel): """Return clean BTS from channel.""" bts = {"mcc": channel["mcc"], "mnc": channel["mnc"], "lac": channel["lac"], "cellid": channel["cellid"]} return bts
[docs] @classmethod def make_bts_friendly(cls, bts_struct): """Expect a dict with keys for mcc, mnc, lac, cellid.""" retval = "%s:%s:%s:%s" % (str(bts_struct["mcc"]), str(bts_struct["mnc"]), str(bts_struct["lac"]), str(bts_struct["cellid"])) return retval
[docs] @classmethod def convert_hex_targets(cls, channel): """Convert LAC anc CellID from hex to decimal.""" for target in ['lac', 'cellid']: if target in channel: channel[target] = Utility.hex_to_dec(channel[target]) return channel
[docs] @classmethod def convert_float_targets(cls, channel): """Convert rxq and rxl to float.""" for target in ['rxq', 'rxl']: if target in channel: channel[target] = Utility.str_to_float(channel[target]) return channel