This commit is contained in:
jimmy 2021-05-29 03:12:56 +00:00
parent 1cba58857b
commit bad182b916
1 changed files with 76 additions and 37 deletions

View File

@ -1,49 +1,88 @@
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
from influxdb.exceptions import InfluxDBServerError
import datetime import datetime
import logging import logging
import json import json
import os import os
import ssl
import traceback
def on_connect(client, userdata, flags, rc): import os
for topic in topics: import configparser
client.subscribe(topic)
class App:
def __init__(self):
with open("mqtt-influx.json", 'r') as f:
self.config = json.load(f)
debug = os.environ.get("DEBUG")
if debug:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(level=level)
def on_message(client, userdata, msg): self.mqttInit()
current_time = datetime.datetime.utcnow().isoformat() self.influxInit()
data = json.loads(msg.payload)
print(data)
json_body = [
{
"measurement": msg.topic,
"tags": {},
"time": current_time,
"fields": data
}
]
logging.info(json_body)
influx_client.write_points(json_body)
if 'TOPICS' not in os.environ:
print("At least one topic is needed.")
exit(1)
topics = os.environ['TOPICS'].split(' ')
print(topics)
logging.basicConfig(level=logging.INFO) def mqttInit(self):
influx_client = InfluxDBClient(os.environ['INFLUX_HOST'], int(os.environ['INFLUX_PORT']), database=os.environ["INFLUX_DB"]) self.topics = ["sensors"]
influx_client.create_database(os.environ["INFLUX_DB"]) self.client = mqtt.Client()
self.client.username_pw_set(self.config["mqtt"]["user"],
self.config["mqtt"]["password"])
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
try:
self.client.connect(self.config["mqtt"]["host"], self.config["mqtt"]["port"], 60)
except:
logging.critical("Failed to connect to MQTT server")
exit(1)
def influxInit(self):
try:
self.influx_client = InfluxDBClient(self.config["influx"]["host"],
self.config["influx"]["port"],
database=self.config["influx"]["database"])
self.influx_client.create_database("sensors")
except Exception as e:
logging.critical("Failed to connect to database %s", e)
exit(1)
def start(self):
logging.info("Starting")
self.client.loop_forever()
def on_connect(self, client, userdata, flags, rc):
for topic in self.config["mqtt"]["topics"]:
client.subscribe(topic)
def on_message(self, client, userdata, msg):
current_time = datetime.datetime.utcnow().isoformat()
data = json.loads(msg.payload)
json_body = [
{
"measurement": msg.topic,
"tags": {},
"time": current_time,
"fields": data
}
]
logging.info(json_body)
try:
self.influx_client.write_points(json_body)
except InfluxDBClientError as e:
logging.error("Influx write error %s", e)
except Exception as e:
logging.critical("Influx server error %s", e)
traceback.print_exc()
exit(1)
client = mqtt.Client() if __name__ == "__main__":
app = App()
client.on_connect = on_connect app.start()
client.on_message = on_message
client.connect(os.environ['MQTT_HOST'], int(os.environ["MQTT_PORT"]), 60)
print("Starting")
client.loop_forever()