datalogger/mqttinflux/main.py

89 lines
2.6 KiB
Python

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.exceptions import InfluxDBServerError
import datetime
import logging
import json
import os
import ssl
import traceback
import os
import configparser
class App:
def __init__(self):
with open("mqtt-influx.json", 'r') as f:
self.config = json.load(f)
debug = os.environ.get("DEBUG")
if debug:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(level=level)
self.mqttInit()
self.influxInit()
def mqttInit(self):
self.topics = ["sensors"]
self.client = mqtt.Client()
self.client.username_pw_set(self.config["mqtt"]["user"],
self.config["mqtt"]["password"])
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
try:
self.client.connect(self.config["mqtt"]["host"], self.config["mqtt"]["port"], 60)
except:
logging.critical("Failed to connect to MQTT server")
exit(1)
def influxInit(self):
try:
self.influx_client = InfluxDBClient(self.config["influx"]["host"],
self.config["influx"]["port"],
database=self.config["influx"]["database"])
self.influx_client.create_database("sensors")
except Exception as e:
logging.critical("Failed to connect to database %s", e)
exit(1)
def start(self):
logging.info("Starting")
self.client.loop_forever()
def on_connect(self, client, userdata, flags, rc):
for topic in self.config["mqtt"]["topics"]:
client.subscribe(topic)
def on_message(self, client, userdata, msg):
current_time = datetime.datetime.utcnow().isoformat()
data = json.loads(msg.payload)
json_body = [
{
"measurement": msg.topic,
"tags": {},
"time": current_time,
"fields": data
}
]
logging.info(json_body)
try:
self.influx_client.write_points(json_body)
except InfluxDBClientError as e:
logging.error("Influx write error %s", e)
except Exception as e:
logging.critical("Influx server error %s", e)
traceback.print_exc()
exit(1)
if __name__ == "__main__":
app = App()
app.start()