Source code for sitchlib.gps_device

"""GPS device wrapper."""

from gps3 import gps3
from utility import Utility
import copy
import time


[docs]class GpsListener(object): """Wrap the GPS device with an iterator.""" def __init__(self, delay=60): """Initialize the GPS listener. Args: delay (int, optional): Delay between polls of gpsd. """ self.delay = delay self.gps_socket = gps3.GPSDSocket() self.data_stream = gps3.DataStream() self.gps_socket.connect() self.gps_socket.watch() def __iter__(self): """Periodically yield GPS coordinates of sensor. Yields: dict: GeoJSON representing GPS coordinates of sensor. """ for new_data in self.gps_socket: if Utility.is_valid_json(new_data): self.data_stream.unpack(new_data) if "lon" in self.data_stream.TPV: if self.data_stream.TPV['lon'] != 'n/a': geojson = {"scan_program": "gpsd", "type": "Feature", "sat_time": self.data_stream.TPV["time"], "sys_time": Utility.get_now_string(), "location": { "type": "Point", "coordinates": [ float(self.data_stream.TPV["lon"]), float(self.data_stream.TPV["lat"])]}} # NOQA geojson["time_drift"] = self.get_time_delta(geojson["sat_time"], # NOQA geojson["sys_time"]) # NOQA geojson["event_timestamp"] = Utility.get_now_string() yield copy.deepcopy(geojson) time.sleep(self.delay)
[docs] @classmethod def get_time_delta(cls, iso_1, iso_2): """Get the drift, in minutes, between two ISO times.""" dt_1 = Utility.dt_from_iso(iso_1) dt_2 = Utility.dt_from_iso(iso_2) return Utility.dt_delta_in_minutes(dt_1, dt_2)