Source code for sitchlib.geo_correlator

"""Correlate based on geograpgic information."""

from alert_manager import AlertManager
from utility import Utility


[docs]class GeoCorrelator(object): """Geographic correlator.""" def __init__(self, device_id): """Initialize the Geographic Correlator.""" self.geo_anchor = {} self.threshold = 100 self.time_threshold = 10 self.device_id = device_id
[docs] def correlate(self, scan_bolus): """Correlate one geo event. The first time we get a geo event, we set the state and print a message to stdout to that effect. Every subsequent message is compared against the geo_anchor. Once the anchor is set, it does not change for the life of the instance. Correlation of subsequent events causes the distance beween the anchor and current event to be determined and if the threshold of 10km is exceeded, an alert is returned. Args: scan_bolus (tuple): Two-item tuple. Position 0 contains the scan type, which is not checked. We should only ever have geo events coming through this method. Position 1 is expected to contain geo json. Returns: list: List of alerts. If no alerts are fired, the list returned is zero-length. """ scan_body = scan_bolus[1] if self.geo_anchor == {}: self.geo_anchor = scan_body print("GeoCorrelator: Setting anchor to %s" % str(scan_body)) alerts = [] else: alerts = GeoCorrelator.geo_drift_check(self.geo_anchor, scan_body, self.threshold, self.device_id) for alert in GeoCorrelator.time_drift_check(scan_body, self.time_threshold, self.device_id): alerts.append(alert) for alert in alerts: alert[1]["site_name"] = scan_body["site_name"] alert[1]["sensor_name"] = scan_body["sensor_name"] alert[1]["sensor_id"] = scan_body["sensor_id"] return alerts
[docs] @classmethod def geo_drift_check(cls, geo_anchor, gps_scan, threshold, device_id): """Fire alarm if distance between points exceeds threshold. Args: geo_anchor (dict): Geographic anchor point, usually stored in an instance variable and passed in via the `correlate()` method. gps_scan (dict): Same format as geo_anchor, expects the same format as `geo_anchor`. threshold (int): Alerting threshold in km. Returns: list: list of alerts (usually just one) or an empty list of there are no alerts. """ lat_1 = geo_anchor["location"]["coordinates"][1] lon_1 = geo_anchor["location"]["coordinates"][0] lat_2 = gps_scan["location"]["coordinates"][1] lon_2 = gps_scan["location"]["coordinates"][0] current_distance = Utility.calculate_distance(lon_1, lat_1, lon_2, lat_2) if current_distance < threshold: return [] else: message = "Possible GPS spoofing attack! %d delta from anchor at %s / %s %s !" % (current_distance, gps_scan["site_name"], gps_scan["sensor_name"], Utility.create_gmaps_link(lat_1, lon_1)) # NOQA alert = AlertManager(device_id).build_alert(300, message, gps_scan["location"]) return[alert]
[docs] @classmethod def time_drift_check(cls, gps_scan, threshold_mins, device_id): """Checks drift value, alarms if beyond threshold.""" current_delta = gps_scan["time_drift"] if current_delta < threshold_mins: return [] else: message = "Possible GPS time spoofing attack! %d delta from system at %s / %s" % (current_delta, gps_scan["site_name"], gps_scan["sensor_name"]) # NOQA alert = AlertManager(device_id).build_alert(310, message, gps_scan["location"]) return[alert]