datalogger/mqttinflux/main.py

89 lines
2.6 KiB
Python
Raw Permalink Normal View History

2021-02-19 11:06:59 +00:00
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.exceptions import InfluxDBServerError
2021-02-19 11:06:59 +00:00
import datetime
import logging
import json
import os
import ssl
import traceback
import os
import configparser
class App:
def __init__(self):
with open("mqtt-influx.json", 'r') as f:
self.config = json.load(f)
debug = os.environ.get("DEBUG")
if debug:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(level=level)
self.mqttInit()
self.influxInit()
def mqttInit(self):
self.topics = ["sensors"]
self.client = mqtt.Client()
self.client.username_pw_set(self.config["mqtt"]["user"],
self.config["mqtt"]["password"])
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
try:
self.client.connect(self.config["mqtt"]["host"], self.config["mqtt"]["port"], 60)
except:
logging.critical("Failed to connect to MQTT server")
exit(1)
def influxInit(self):
try:
self.influx_client = InfluxDBClient(self.config["influx"]["host"],
self.config["influx"]["port"],
database=self.config["influx"]["database"])
self.influx_client.create_database("sensors")
except Exception as e:
logging.critical("Failed to connect to database %s", e)
exit(1)
def start(self):
logging.info("Starting")
self.client.loop_forever()
def on_connect(self, client, userdata, flags, rc):
for topic in self.config["mqtt"]["topics"]:
client.subscribe(topic)
def on_message(self, client, userdata, msg):
current_time = datetime.datetime.utcnow().isoformat()
data = json.loads(msg.payload)
json_body = [
{
"measurement": msg.topic,
"tags": {},
"time": current_time,
"fields": data
}
]
logging.info(json_body)
try:
self.influx_client.write_points(json_body)
except InfluxDBClientError as e:
logging.error("Influx write error %s", e)
except Exception as e:
logging.critical("Influx server error %s", e)
traceback.print_exc()
exit(1)
if __name__ == "__main__":
app = App()
app.start()