From bad182b916577eeab5ede3c40644beede4d58152 Mon Sep 17 00:00:00 2001 From: jimmy Date: Sat, 29 May 2021 03:12:56 +0000 Subject: [PATCH] Copy from https://git.chch.tech/chchtech/mqtt-influx --- mqttinflux/main.py | 113 ++++++++++++++++++++++++++++++--------------- 1 file changed, 76 insertions(+), 37 deletions(-) diff --git a/mqttinflux/main.py b/mqttinflux/main.py index f148ea8..31434be 100644 --- a/mqttinflux/main.py +++ b/mqttinflux/main.py @@ -1,49 +1,88 @@ import paho.mqtt.client as mqtt from influxdb import InfluxDBClient +from influxdb.exceptions import InfluxDBClientError +from influxdb.exceptions import InfluxDBServerError import datetime import logging import json import os - - -def on_connect(client, userdata, flags, rc): - for topic in topics: - client.subscribe(topic) +import ssl +import traceback +import os +import configparser + +class App: + def __init__(self): + with open("mqtt-influx.json", 'r') as f: + self.config = json.load(f) + debug = os.environ.get("DEBUG") + if debug: + level = logging.INFO + else: + level = logging.WARNING + logging.basicConfig(level=level) -def on_message(client, userdata, msg): - current_time = datetime.datetime.utcnow().isoformat() - data = json.loads(msg.payload) - print(data) - json_body = [ - { - "measurement": msg.topic, - "tags": {}, - "time": current_time, - "fields": data - } - ] - logging.info(json_body) - influx_client.write_points(json_body) - -if 'TOPICS' not in os.environ: - print("At least one topic is needed.") - exit(1) -topics = os.environ['TOPICS'].split(' ') -print(topics) + self.mqttInit() + self.influxInit() -logging.basicConfig(level=logging.INFO) -influx_client = InfluxDBClient(os.environ['INFLUX_HOST'], int(os.environ['INFLUX_PORT']), database=os.environ["INFLUX_DB"]) -influx_client.create_database(os.environ["INFLUX_DB"]) + def mqttInit(self): + self.topics = ["sensors"] + self.client = mqtt.Client() + self.client.username_pw_set(self.config["mqtt"]["user"], + self.config["mqtt"]["password"]) + self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED, + tls_version=ssl.PROTOCOL_TLS) + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + try: + self.client.connect(self.config["mqtt"]["host"], self.config["mqtt"]["port"], 60) + except: + logging.critical("Failed to connect to MQTT server") + exit(1) + + def influxInit(self): + try: + self.influx_client = InfluxDBClient(self.config["influx"]["host"], + self.config["influx"]["port"], + database=self.config["influx"]["database"]) + self.influx_client.create_database("sensors") + except Exception as e: + logging.critical("Failed to connect to database %s", e) + exit(1) + + def start(self): + logging.info("Starting") + self.client.loop_forever() + + def on_connect(self, client, userdata, flags, rc): + for topic in self.config["mqtt"]["topics"]: + client.subscribe(topic) + + def on_message(self, client, userdata, msg): + current_time = datetime.datetime.utcnow().isoformat() + data = json.loads(msg.payload) + json_body = [ + { + "measurement": msg.topic, + "tags": {}, + "time": current_time, + "fields": data + } + ] + logging.info(json_body) + try: + self.influx_client.write_points(json_body) + except InfluxDBClientError as e: + logging.error("Influx write error %s", e) + except Exception as e: + logging.critical("Influx server error %s", e) + traceback.print_exc() + exit(1) -client = mqtt.Client() - -client.on_connect = on_connect -client.on_message = on_message - -client.connect(os.environ['MQTT_HOST'], int(os.environ["MQTT_PORT"]), 60) +if __name__ == "__main__": + app = App() + app.start() -print("Starting") - -client.loop_forever() \ No newline at end of file