"""ARFCN Correlator."""
import alert_manager
import os
import sqlite3
from utility import Utility
[docs]class ArfcnCorrelator(object):
"""The ArfcnCorrelator compares ARFCN metadata against feeds and threshold.
The feed data is put in place by the FeedManager class, prior to
instantiating the ArfcnCorrelator.
"""
def __init__(self, feed_dir, whitelist, power_threshold, device_id):
"""Initializing the ArfcnCorrelator.
Args:
arfcn_db (str): Full path to the ARFCN database.
whitelist (list): This is a list of ARFCNs that should be
considered trustworthy enough to skip feed comparison.
This does not override comparison against threshold.
power_threshold (str): No matter the type, it will be coerced,
if possible, to float. This is the value that Kalibrate-
reported channel power will be compared against to make a
determination on whether or not to fire an alarm.
"""
self.alerts = alert_manager.AlertManager(device_id)
self.geo_state = {"type": "Point", "coordinates": [0, 0]}
self.feed_dir = feed_dir
self.arfcn_db = os.path.join(feed_dir, "arfcn.db")
self.power_threshold = float(power_threshold)
self.observed_arfcn = whitelist
self.arfcn_threshold = []
self.arfcn_range = []
return
[docs] def correlate(self, scan_bolus):
"""Entrypoint for correlation, wraps individual checks.
Args:
scan_bolus (tuple): Position 0 contains a string defining scan
type. If it's type 'gps', the geo_state instance variable
will be updated with Position 1's contents. If the scan type
is 'kal_channel', we perform feed and threshold comparison.
any other scan type will be compared against the feed only.
Returns:
list: Returns a list of alerts. If no alerts are generated, an
empty list is returned.
"""
retval = []
scan_type = scan_bolus[0]
scan = scan_bolus[1]
if scan_type == "gps":
self.geo_state = scan["location"]
arfcn = ArfcnCorrelator.arfcn_from_scan(scan_type, scan)
if scan_type == "kal_channel":
scan_bolus[1]["location"] = self.geo_state
retval.append(scan_bolus)
if self.arfcn_over_threshold(scan["power"]):
message = "ARFCN %s over threshold at %s / %s. Observed: %s" % ( # NOQA
scan["channel"],
scan["site_name"],
scan["sensor_name"],
scan["power"])
alert = self.alerts.build_alert(200, message, self.geo_state)
alert[1]["site_name"] = scan["site_name"]
alert[1]["sensor_name"] = scan["sensor_name"]
alert[1]["sensor_id"] = scan["sensor_id"]
retval.append(alert)
self.manage_arfcn_lists("in", arfcn, "threshold")
else:
self.manage_arfcn_lists("out", arfcn, "threshold")
feed_alerts = self.compare_arfcn_to_feed(arfcn, scan["site_name"],
scan["sensor_name"])
for feed_alert in feed_alerts:
feed_alert[1]["site_name"] = scan["site_name"]
feed_alert[1]["sensor_name"] = scan["sensor_name"]
feed_alert[1]["sensor_id"] = scan["sensor_id"]
retval.append(feed_alert)
self.observed_arfcn.append(arfcn)
return retval
[docs] def manage_arfcn_lists(self, direction, arfcn, aspect):
"""Manage the instance variable lists of ARFCNs.
This is necessary to maintain an accurate state over time, and reduce
unnecessary noise.
Args:
direction (str): Only will take action if this is "in" or "out"
arfcn (str): This is the ARFCN that will be moved in or our of
the list
aspect (str): This is used to match the ARFCN with the list it
should be moved in or out of. This should be either
"threshold" or "not_in_range".
"""
reference = {"threshold": self.arfcn_threshold,
"not_in_range": self.arfcn_range}
if direction == "in":
if reference[aspect].count(arfcn) > 0:
pass
else:
reference[aspect].append(arfcn)
elif direction == "out":
if reference[aspect].count(arfcn) == 0:
pass
else:
while arfcn in reference[aspect]:
reference[aspect].remove(arfcn)
return
[docs] def arfcn_over_threshold(self, arfcn_power):
"""Compare the ARFCN power against the thresholdset on instantiation.
Args:
arfcn_power (float): If this isn't a float already, it will be
coerced to float.
Returns:
bool: True if arfcn_power is over threshold, False if not.
"""
if float(arfcn_power) > self.power_threshold:
return True
else:
return False
[docs] def compare_arfcn_to_feed(self, arfcn, site_name, sensor_name):
"""Wrap other functions that dig into the FCC license DB.
This relies on the observed_arfcn instance variable for caching, to
skip DB comparison, that way we (probably) won't end up with a
forever-increasing queue size.
Args:
arfcn (str): This is the text representation of the ARFCN we want
to compare against the FCC license database.
Returns:
list: You get back a list of alerts as tuples, where position 0 is
'sitch_alert' and position 1 is the actual alert.
"""
results = []
# If we can't compare geo, have ARFCN 0 or already been found in feed:
if (str(arfcn) in ["0", None] or
arfcn in self.observed_arfcn or
self.geo_state["coordinates"] == [0, 0]):
return results
else:
msg = "ArfcnCorrelator: Cache miss on ARFCN %s" % str(arfcn)
print(msg)
results.extend(self.feed_alert_generator(arfcn, site_name,
sensor_name))
return results
[docs] def feed_alert_generator(self, arfcn, site_name, sensor_name):
"""Wrap the yield_arfcn_from_feed function, and generates alerts.
Args:
arfcn (str): This is the string representation of the ARFCN to be
correlated.
Returns:
list: This returns a list of alert tuples.
"""
results = []
if arfcn is None:
return results
if not self.match_arfcn_against_feed(arfcn, self.geo_state):
msg = "Unable to locate a license for ARFCN %s at %s / %s" % (
str(arfcn), site_name, sensor_name)
self.manage_arfcn_lists("in", arfcn, "not_in_range")
alert = self.alerts.build_alert(400, msg, self.geo_state)
results.append(alert)
return results
[docs] @classmethod
def arfcn_from_scan(cls, scan_type, scan_doc):
"""Pull the ARFCN from different scan types.
Args:
scan_type (str): "kal_channel", "gsm_modem_channel", or "gps".
scan_doc (dict): Scan document
Returns:
str: ARFCN from scan, or None if scan is unrecognized or
unsupported.
"""
if scan_type == "kal_channel":
return scan_doc["arfcn_int"]
elif scan_type == "gsm_modem_channel":
return scan_doc["arfcn"]
elif scan_type == "cell":
return None
elif scan_type == "scan":
return None
elif scan_type == "gps":
return None
else:
print("ArfcnCorrelator: Unknown scan type: %s" % str(scan_type))
return None
[docs] def match_arfcn_against_feed(self, arfcn, state_gps):
"""Get a match for the ARFCN within range of the sensor.
Args:
arfcn (str): Absolute Radio Frequency Channel Number
Returns:
bool: True if there is an ARFCN in range, False if not.
"""
result = False
conn = sqlite3.connect(self.arfcn_db)
c = conn.cursor()
clean_arfcn = str(arfcn)
c.execute("SELECT arfcn, carrier, lon, lat FROM arfcn WHERE arfcn=?", (clean_arfcn, )) # NOQA
for result in c.fetchall():
test_set = {"arfcn": result[0], "carrier": result[1],
"lon": result[2], "lat": result[3]}
if self.is_in_range(test_set, state_gps):
result = True
conn.close()
break
return result
[docs] @classmethod
def is_in_range(cls, item_gps, state_gps):
"""Return True if items are within 40km."""
state_gps_lat = state_gps["coordinates"][1]
state_gps_lon = state_gps["coordinates"][0]
max_range = 40000 # 40km
state_lon = state_gps_lon
state_lat = state_gps_lat
item_lon = item_gps["lon"]
item_lat = item_gps["lat"]
distance = Utility.calculate_distance(state_lon, state_lat,
item_lon, item_lat)
if distance > max_range:
return False
else:
return True