"""General utilities."""
# from __future__ import print_function
import datetime
import dateutil.parser as du_parser
import json
import os
import pprint
import psutil
import subprocess
import requests
from location_tool import LocationTool
class Utility:
    """General utility class.

    A namespace of stateless helpers. Every public method is a classmethod,
    so no instance is ever required.
    """

    @classmethod
    def dt_delta_in_minutes(cls, dt_1, dt_2):
        """Calculate the delta between two datetime objects, in minutes.

        Timezone info is discarded from both arguments before subtracting,
        so the wall-clock fields are compared as-is (no offset conversion).

        Args:
            dt_1: First datetime object.
            dt_2: Second datetime object.

        Returns:
            Absolute difference between the two, in minutes.
        """
        naive_1 = dt_1.replace(tzinfo=None)
        naive_2 = dt_2.replace(tzinfo=None)
        delta_seconds = abs((naive_1 - naive_2).total_seconds())
        return delta_seconds / 60

    @classmethod
    def generate_base_event(cls):
        """Build a new event skeleton populated from the environment.

        Site name, sensor name and sensor ID are read from the
        ``LOCATION_NAME``, ``RESIN_DEVICE_NAME_AT_INIT`` and ``HOSTNAME``
        environment variables, each with a placeholder default.

        Returns:
            A new dict on every call; callers may mutate it freely.
        """
        return {"site_name": os.getenv('LOCATION_NAME', 'SITCH_SITE'),
                "sensor_name": os.getenv('RESIN_DEVICE_NAME_AT_INIT',
                                         'NOT_RESIN-MANAGED'),
                "sensor_id": os.getenv('HOSTNAME', 'NO_HOSTNAME'),
                "event_timestamp": cls.get_now_string(),
                "event_type": "base_event"}

    @classmethod
    def dt_from_iso(cls, iso_time):
        """Exchange an ISO8601-formatted string for a datetime object."""
        return du_parser.parse(iso_time)

    @classmethod
    def epoch_to_iso8601(cls, unix_time):
        """Transform epoch time to ISO8601 format.

        Args:
            unix_time: Seconds since the Unix epoch; anything ``float()``
                accepts (number or numeric string).

        Returns:
            ISO8601 string for the corresponding naive UTC time.
        """
        cleaned = float(unix_time)
        # Epoch + timedelta yields the same naive UTC value as the
        # deprecated datetime.utcfromtimestamp(), without the deprecation
        # warning or the platform-specific range limits.
        epoch = datetime.datetime(1970, 1, 1)
        return (epoch + datetime.timedelta(seconds=cleaned)).isoformat()

    @classmethod
    def get_now_string(cls):
        """Get ISO8601 timestamp for now.

        The value comes from ``datetime.datetime.now()``, i.e. naive local
        time with no UTC offset in the string.
        """
        return datetime.datetime.now().isoformat()

    @classmethod
    def start_component(cls, runcmd):
        """Start a thing.

        The command is split on whitespace and launched without waiting for
        it to finish.

        Args:
            runcmd: Command line as a single string.

        Returns:
            True if the process was spawned, False if the command was empty
            or could not be started.
        """
        args = runcmd.split()
        if not args:
            print("Utility: Unable to start component, empty command")
            return False
        try:
            subprocess.Popen(args)
        except (OSError, ValueError) as e:
            # Popen reports a missing or non-executable binary as OSError
            # and invalid arguments as ValueError.
            print(e)
            return False
        return True

    @classmethod
    def _ensure_directory(cls, directory):
        """Create a directory tree; return True only if this call made it."""
        try:
            os.makedirs(directory)
        except OSError:
            # Tolerate another process creating it first; re-raise
            # anything else (permissions, a file in the way, ...).
            if not os.path.isdir(directory):
                raise
            return False
        return True

    @classmethod
    def create_path_if_nonexistent(cls, path):
        """Create filesystem directory path.

        Ensures that the *parent* directory of ``path`` exists. Nothing is
        done when ``path`` is already a directory, when its parent already
        exists, or when ``path`` has no directory component at all.

        Args:
            path: File path, or directory path with a trailing separator.
        """
        if os.path.isdir(path):
            return
        parent = os.path.dirname(path)
        # An empty parent means a bare relative name: there is nothing to
        # create, and os.makedirs('') would raise.
        if not parent or os.path.exists(parent):
            return
        if cls._ensure_directory(parent):
            print("Utility: Creating directory: %s" % parent)
        return

    @classmethod
    def create_file_if_nonexistent(cls, path, lfile):
        """Create file and path, if it doesn't already exist.

        Args:
            path: Directory that should contain the file.
            lfile: File name, joined onto ``path``.
        """
        fullpath = os.path.join(path, lfile)
        if os.path.isfile(fullpath):
            return
        print("Utility: Creating log file: %s" % fullpath)
        target_dir = os.path.dirname(fullpath)
        if target_dir and not os.path.isdir(target_dir):
            cls._ensure_directory(target_dir)
        # Append mode creates the file without truncating it if it
        # appeared between the check above and now.
        with open(fullpath, 'a'):
            pass
        return

    @classmethod
    def write_file(cls, location, contents):
        """Write string to file, replacing any existing contents."""
        with open(location, 'w') as fh:
            fh.write(contents)

    @classmethod
    def strip_list(cls, raw_struct):
        """Strip contents from single-item list.

        Returns:
            The sole element when ``raw_struct`` is a list of length one,
            otherwise ``raw_struct`` unchanged.
        """
        if isinstance(raw_struct, list) and len(raw_struct) == 1:
            return raw_struct[0]
        return raw_struct

    @classmethod
    def get_public_ip(cls):
        """Get public IP.

        Queries api.ipify.org. This is best-effort: any network failure,
        timeout or unexpected response falls back to the loopback address.

        Returns:
            The reported IP as a string, or "127.0.0.1" on failure.
        """
        url = 'https://api.ipify.org/?format=json'
        try:
            # Without a timeout an unresponsive server blocks forever.
            result = requests.get(url, timeout=10).json()['ip']
        except (requests.exceptions.RequestException,
                ValueError, KeyError, TypeError):
            print("Utility: Unable to get public IP from ipify.org. "
                  "Set to 127.0.0.1")
            result = "127.0.0.1"
        return result

    @classmethod
    def calculate_distance(cls, lon_1, lat_1, lon_2, lat_2):
        """Wrap the LocationTool.get_distance_between_points() fn.

        Args:
            lon_1: Longitude of the first point.
            lat_1: Latitude of the first point.
            lon_2: Longitude of the second point.
            lat_2: Latitude of the second point.

        Returns:
            The LocationTool result multiplied by 1000 (presumably
            kilometers converted to meters -- confirm against
            LocationTool), or 0 when any coordinate is None.
        """
        if any(coord is None for coord in (lon_1, lat_1, lon_2, lat_2)):
            print("Utility: Geo coordinate is None, not resolving distance.")
            return 0
        # LocationTool takes (lat, lon) pairs, the reverse of our
        # parameter order.
        pos_1 = (lat_1, lon_1)
        pos_2 = (lat_2, lon_2)
        dist_in_km = LocationTool.get_distance_between_points(pos_1, pos_2)
        return dist_in_km * 1000

    @classmethod
    def str_to_float(cls, s):
        """Change string to float.

        Returns:
            The float value, or None (after printing a message) when ``s``
            cannot be converted.
        """
        try:
            return float(s)
        except (TypeError, ValueError, OverflowError):
            print("Utility: Unable to convert %s to float" % str(s))
            return None

    @classmethod
    def heartbeat(cls, service_name):
        """Generate heartbeat message.

        Args:
            service_name: Name of the service reporting the heartbeat.

        Returns:
            Dict with the scan program, service name and current timestamp.
        """
        return {"scan_program": "heartbeat",
                "heartbeat_service_name": service_name,
                "timestamp": cls.get_now_string()}

    @classmethod
    def is_valid_json(cls, in_str):
        """Test string for json validity.

        Returns:
            True if ``json.loads`` accepts ``in_str``, False otherwise.
        """
        try:
            json.loads(in_str)
        except (TypeError, ValueError, RuntimeError):
            # ValueError: malformed JSON; TypeError: not a string;
            # RuntimeError: covers RecursionError on very deep nesting.
            return False
        return True

    @classmethod
    def pretty_string(cls, structure):
        """Pretty-print lines.

        Returns:
            The pprint rendering of ``structure`` with every line prefixed
            by a space and terminated by a newline.
        """
        formatted = pprint.PrettyPrinter().pformat(structure)
        return "".join(" %s\n" % line for line in formatted.splitlines())

    @classmethod
    def hex_to_dec(cls, hx):
        """Change hex to decimal.

        Args:
            hx: Hexadecimal value; converted with ``str()`` before parsing.

        Returns:
            The decimal value as a string, or "0" when parsing fails.
        """
        try:
            integer = int(str(hx), 16)
        except (TypeError, ValueError) as e:
            print("Unable to convert %s to an integer" % str(hx))
            print(e)
            integer = 0
        return str(integer)

    @classmethod
    def construct_feed_file_name(cls, feed_dir, prefix):
        """Construct full path for feed file.

        Returns:
            ``<feed_dir>/<prefix>.csv.gz``.
        """
        return os.path.join(feed_dir, "%s.csv.gz" % prefix)

    @classmethod
    def validate_geojson(cls, geojson):
        """Ensure that geojson contains the right fields.

        Returns:
            True only when ``coordinates`` has exactly two entries and
            ``type`` is "Point". Missing or malformed fields yield False
            rather than an exception.
        """
        try:
            if len(geojson["coordinates"]) != 2:
                return False
        except (KeyError, TypeError):
            # No "coordinates" key, or a value without a length.
            return False
        return "type" in geojson and geojson["type"] == "Point"

    @classmethod
    def create_gmaps_link(cls, lat, lon):
        """Build a Google Maps search URL for the given latitude/longitude."""
        return ("https://www.google.com/maps/search/?api=1&query=%s,%s" % (lat, lon))  # NOQA
# @classmethod
# def hdmi_print(cls, message):
# with open('/dev/tty1', 'w') as ttyfile:
# print(message, file=ttyfile)
# return