Source code for sitchlib.config_helper
"""Config Helper."""
import hvac
import json
import os
import pprint
import sys
import yaml
from device_detector import DeviceDetector as dd
from utility import Utility as utility
[docs]class ConfigHelper:
"""Manage configuration information for entire SITCH Sensor."""
def __init__(self, sitch_var_base_dir="/data/sitch/"):
"""Initialize ConfigHelper.
Args:
sitch_var_base_dir (str): Base directory for feed and log data.
"""
self.base_event = utility.generate_base_event()
self.detector = dd()
self.db_schemas = self.get_db_schemas()
self.db_translate_schemas = self.get_db_schema_translations()
self.print_devices_as_detected()
self.device_id = self.base_event["sensor_id"]
self.feed_radio_targets = self.get_list_from_env("FEED_RADIO_TARGETS")
self.site_name = self.base_event["site_name"]
self.sensor_name = self.base_event["sensor_name"]
self.platform_name = os.getenv('RESIN_DEVICE_TYPE', 'NOT_RESIN-MANAGED') # NOQA
self.log_prefix = os.path.join(sitch_var_base_dir, "log/")
self.log_host = ConfigHelper.get_from_env("LOG_HOST")
self.log_method = "local_file"
self.kal_band = ConfigHelper.get_from_env("KAL_BAND")
self.kal_gain = ConfigHelper.get_from_env("KAL_GAIN")
self.kal_threshold = ConfigHelper.get_from_env("KAL_THRESHOLD")
self.gsm_modem_band = ConfigHelper.get_from_env("GSM_MODEM_BAND")
self.health_check_interval = int(os.getenv("HEALTH_CHECK_INTERVAL",
3600))
self.gsm_modem_port = self.get_gsm_modem_port()
self.gps_device_port = self.get_gps_device_port()
self.ls_ca_path = "/host/run/dbus/crypto/ca.crt"
self.ls_cert_path = "/host/run/dbus/crypto/logstash.crt"
self.ls_key_path = "/host/run/dbus/crypto/logstash.key"
self.ls_crypto_base_path = "/host/run/dbus/crypto/"
self.vault_token = ConfigHelper.get_from_env("VAULT_TOKEN")
self.vault_url = ConfigHelper.get_from_env("VAULT_URL")
self.vault_path = ConfigHelper.get_from_env("VAULT_PATH")
self.mode = os.getenv("MODE", "GOGOGO")
self.public_ip = str(utility.get_public_ip())
self.feed_dir = os.path.join(sitch_var_base_dir, "feed/")
self.feed_url_base = ConfigHelper.get_from_env("FEED_URL_BASE")
self.mcc_list = ConfigHelper.get_list_from_env("MCC_LIST")
self.state_list = ConfigHelper.get_list_from_env("STATE_LIST")
self.vault_secrets = self.get_secret_from_vault()
self.gps_drift_threshold = 1000
self.filebeat_template = self.get_filebeat_template()
self.filebeat_config_file_path = "/etc/filebeat.yml"
self.arfcn_whitelist = ConfigHelper.get_list_from_env("ARFCN_WHITELIST", # NOQA
optional=True)
self.cgi_whitelist = ConfigHelper.get_list_from_env("CGI_WHITELIST",
optional=True)
self.no_feed_update = os.getenv("NO_FEED_UPDATE")
return
[docs] def print_devices_as_detected(self):
"""Print detected GPS and GSM devices."""
pp = pprint.PrettyPrinter()
print("\nConfigurator: Detected GSM modems:")
pp.pprint(self.detector.gsm_radios)
print("Configurator: Detected GPS devices:")
pp.pprint(self.detector.gps_devices)
return
[docs] def get_gsm_modem_port(self):
"""Get GSM modem port from detector, override with env var."""
if os.getenv('GSM_MODEM_PORT') is None:
if self.detector.gsm_radios != []:
target_device = self.detector.gsm_radios[0]["device"]
return target_device
return os.getenv('GSM_MODEM_PORT')
[docs] def get_gps_device_port(self):
"""Get GPS device from detector, override with env var."""
if os.getenv('GPS_DEVICE_PORT') is None:
if self.detector.gps_devices != []:
target_device = self.detector.gps_devices[0]
return target_device
return os.getenv('GPS_DEVICE_PORT')
[docs] def build_logrotate_config(self):
"""Generate logrotate config file contents."""
lr_options = str("{\nrotate 14" +
"\ndaily" +
"\ncompress" +
"\ndelaycompress" +
"\nmissingok" +
"\nnotifempty\n}")
lr_config = "%s/*.log %s" % (self.log_prefix, lr_options)
return lr_config
[docs] @classmethod
def get_filebeat_template(cls, filename="/etc/templates/filebeat.json"):
"""Get the filebeat config from template file."""
with open(filename, 'r') as template_file:
return json.load(template_file)
[docs] @classmethod
def get_db_schemas(cls, filename="/etc/schemas/feed_db_schema.yaml"):
"""Get the feed DB schemas from file."""
with open(filename, 'r') as schema_file:
return yaml.load(schema_file)
[docs] @classmethod
def get_db_schema_translations(cls, filename="/etc/schemas/feed_db_translation.yaml"): # NOQA
"""Get the feed DB schema translations from file."""
with open(filename, 'r') as translate_file:
return yaml.load(translate_file)
[docs] def write_filebeat_config(self):
"""Write out filebeat config to file."""
fb = self.filebeat_template
fb["output.logstash"]["hosts"] = [self.log_host]
fb["output.logstash"]["ssl.key"] = self.ls_key_path
fb["output.logstash"]["ssl.certificate"] = self.ls_cert_path
fb["output.logstash"]["ssl.certificate_authorities"] = [self.ls_ca_path] # NOQA
fb["filebeat.registry_file"] = os.path.join(self.log_prefix, "fb_registry") # NOQA
fb = self.set_filebeat_logfile_paths(self.log_prefix, fb)
with open(self.filebeat_config_file_path, 'w') as out_file:
yaml.safe_dump(fb, out_file)
return
[docs] @classmethod
def set_filebeat_logfile_paths(cls, log_prefix, filebeat_config):
"""Sets all log file paths to align with configured log prefix."""
placeholder = "/var/log/sitch/"
for prospector in filebeat_config["filebeat.prospectors"]:
working_paths = []
for path in prospector["paths"]:
w_path = path.replace(placeholder, "")
working_paths.append(os.path.join(log_prefix, w_path))
prospector["paths"] = working_paths
return filebeat_config
[docs] def get_secret_from_vault(self):
"""Retrieve secrets from Vault."""
client = hvac.Client(url=self.vault_url, token=self.vault_token)
print("Configurator: Get secrets from %s, with path %s" % (self.vault_url, # NOQA
self.vault_path)) # NOQA
try:
response = client.read(self.vault_path)
secrets = response["data"]
except Exception as e:
print("Configurator: Error in getting secret from vault!")
print(e)
secrets = "NONE"
return secrets
[docs] @classmethod
def get_from_env(cls, k):
"""Get configuration items from env vars. Hard exit if not set."""
retval = os.getenv(k)
if retval is None:
print("Configurator: Required config variable not set: %s" % k)
print("Configurator: Unable to continue. Exiting.")
sys.exit(2)
return retval
[docs] @classmethod
def get_list_from_env(cls, k, optional=False):
"""Get a list from environment variables.
If optional=True, the absence of this var will cause a hard exit.
"""
try:
retval = os.getenv(k).split(',')
except AttributeError:
retval = None
if retval is None and optional is False:
print("Configurator: Required config variable not set: %s" % k)
print("Configurator: Unable to continue. Exiting.")
sys.exit(2)
elif retval is None and optional is True:
retval = []
return retval