"""Bridge MQTT messages into InfluxDB.

Subscribes to the MQTT topics named in a JSON configuration file and writes
the JSON payload of every received message to InfluxDB, using the message
topic as the measurement name.
"""

import configparser
import datetime
import json
import logging
import os
import ssl
import sys
import traceback

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.exceptions import InfluxDBServerError


class App:
    """MQTT subscriber that stores each received JSON message in InfluxDB.

    The configuration file is a JSON object with two sections:

    * ``mqtt``: ``host``, ``port``, ``user``, ``password`` and ``topics``
      (the topics to subscribe to).
    * ``influx``: ``host``, ``port`` and ``database``.
    """

    def __init__(self, config_path="mqtt-influx.json"):
        """Load the configuration, set up logging and connect both clients.

        Args:
            config_path: Path of the JSON configuration file. Defaults to
                ``mqtt-influx.json`` in the current working directory.

        Exits the process with status 1 if either connection cannot be set
        up (see ``mqttInit`` and ``influxInit``).
        """
        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        # Any non-empty DEBUG environment value enables INFO-level logging;
        # otherwise only warnings and above are shown.
        level = logging.INFO if os.environ.get("DEBUG") else logging.WARNING
        logging.basicConfig(level=level)

        self.mqttInit()
        self.influxInit()

    def mqttInit(self):
        """Create the TLS-enabled MQTT client and connect to the broker.

        Exits the process with status 1 if the connection attempt fails.
        """
        mqtt_cfg = self.config["mqtt"]

        # Topics subscribed to in on_connect. The configured list wins;
        # ["sensors"] is kept as the fallback when none is configured.
        self.topics = mqtt_cfg.get("topics", ["sensors"])

        self.client = mqtt.Client()
        self.client.username_pw_set(mqtt_cfg["user"], mqtt_cfg["password"])
        self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED,
                            tls_version=ssl.PROTOCOL_TLS)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        try:
            self.client.connect(mqtt_cfg["host"], mqtt_cfg["port"], 60)
        except Exception:
            # Top-level boundary: report the cause (exc_info) and stop.
            # Not a bare ``except`` so Ctrl-C / SystemExit are not swallowed.
            logging.critical("Failed to connect to MQTT server",
                             exc_info=True)
            sys.exit(1)

    def influxInit(self):
        """Create the InfluxDB client and make sure its database exists.

        Exits the process with status 1 if the client cannot be created or
        the database cannot be created.
        """
        influx_cfg = self.config["influx"]
        try:
            self.influx_client = InfluxDBClient(
                influx_cfg["host"],
                influx_cfg["port"],
                database=influx_cfg["database"],
            )
            # Create the database the client actually writes to, rather
            # than a fixed name that may differ from the configured one.
            self.influx_client.create_database(influx_cfg["database"])
        except Exception as e:
            logging.critical("Failed to connect to database %s", e)
            sys.exit(1)

    def start(self):
        """Run the MQTT network loop; blocks until the loop terminates."""
        logging.info("Starting")
        self.client.loop_forever()

    def on_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: subscribe to every configured topic.

        Args:
            client: The MQTT client the callback was invoked for.
            userdata: Unused.
            flags: Unused.
            rc: Connection result code; 0 means the broker accepted the
                connection.
        """
        if rc != 0:
            # Subscribing on a refused connection cannot succeed.
            logging.error("MQTT connection refused, result code %s", rc)
            return

        for topic in self.topics:
            client.subscribe(topic)

    def on_message(self, client, userdata, msg):
        """MQTT message callback: write the JSON payload to InfluxDB.

        The payload must be a non-empty JSON object; its members become the
        fields of a point whose measurement is the message topic and whose
        time is the current UTC time. Messages that do not meet this are
        logged and dropped.

        An ``InfluxDBClientError`` while writing is logged and the message
        is dropped; any other write failure terminates the process with
        status 1.

        Args:
            client: Unused.
            userdata: Unused.
            msg: Received message; ``msg.topic`` and ``msg.payload`` are
                read.
        """
        try:
            data = json.loads(msg.payload)
        except ValueError as e:
            # Covers both invalid JSON and undecodable bytes. One bad
            # message must not take the whole bridge down.
            logging.error("Ignoring malformed payload on %s: %s",
                          msg.topic, e)
            return

        if not isinstance(data, dict) or not data:
            logging.error("Ignoring payload on %s: expected a non-empty "
                          "JSON object", msg.topic)
            return

        # Timezone-aware UTC timestamp (utcnow() is deprecated and naive).
        current_time = datetime.datetime.now(datetime.timezone.utc).isoformat()
        json_body = [
            {
                "measurement": msg.topic,
                "tags": {},
                "time": current_time,
                "fields": data,
            }
        ]
        logging.info(json_body)

        try:
            self.influx_client.write_points(json_body)
        except InfluxDBClientError as e:
            logging.error("Influx write error %s", e)
        except Exception as e:
            logging.critical("Influx server error %s", e)
            traceback.print_exc()
            sys.exit(1)


if __name__ == "__main__":
    app = App()
    app.start()