You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

174 lines
7.4 KiB
Python

5 years ago
#!/usr/bin/python3
import datetime
import os
import sys
import time
import toml
from psutil import process_iter
import eigene_wetterdaten
import messwerte_umrechner as mwu
import weewx_db_model as db_weewx
import setup_logging
import db_postgrest
def config_laden():
configfile = os.path.join(SKRIPTPFAD, "wetterconfig.toml")
with open(configfile) as file:
return toml.loads(file.read())
SKRIPTPFAD = os.path.abspath(os.path.dirname(__file__))
CONFIG = config_laden()
LOGGER = setup_logging.create_logger("grafana_export", CONFIG["loglevel"])
SENDELIMIT = 1000
def check_process():
processliste = [p.cmdline() for p in process_iter() if "python" in p.name()]
return sum([p.count(sys.argv[0]) for p in processliste])
def rohdaten_laden(timestamp, stationsname):
aufruf_wiederholen = False
LOGGER.debug("Lese WeeWx Datenbank")
query = db_weewx.Archive.select().where(db_weewx.Archive.date_time > timestamp)\
.order_by(db_weewx.Archive.date_time.asc()).limit(int(SENDELIMIT + 2))
5 years ago
gesamtzahl = (len(query))
if gesamtzahl > SENDELIMIT:
gesamtzahl = SENDELIMIT
LOGGER.info(f"Gesamtanzahl an Datensätze: {gesamtzahl} wird geladen")
basiswetterdaten_liste = []
zusatzwetterdaten_liste = []
for nr, datensatz in enumerate(query):
LOGGER.debug(f"Datensatz {nr + 1} von {gesamtzahl}")
ts = datensatz.date_time
ts_with_tz = ts.replace(tzinfo=datetime.timezone.utc)
rohdaten = {"ts": ts_with_tz}
5 years ago
if datensatz.us_units == 1:
rohdaten["outtemp"] = mwu.temperaturumrechner(datensatz.out_temp)
rohdaten["inTemp"] = mwu.temperaturumrechner(datensatz.in_temp)
rohdaten["luftdruck"] = mwu.druckumrechner(datensatz.barometer)
rohdaten["wind"] = mwu.windumrechner(datensatz.wind_speed)
rohdaten["windboe"] = mwu.windumrechner(datensatz.wind_gust)
rohdaten["regenrate"] = mwu.regen_rate(datensatz.rain_rate)
rohdaten["regenaktuell"] = mwu.regen_menge(datensatz.rain)
rohdaten["taupunkt"] = mwu.temperaturumrechner(datensatz.dewpoint)
rohdaten["heatindex"] = mwu.temperaturumrechner(datensatz.heatindex)
rohdaten["windchill"] = mwu.temperaturumrechner(datensatz.windchill)
else:
rohdaten["outtemp"] = datensatz.out_temp
rohdaten["inTemp"] = datensatz.in_temp
rohdaten["luftdruck"] = datensatz.barometer
rohdaten["wind"] = datensatz.wind_speed
rohdaten["windboe"] = datensatz.wind_gust
rohdaten["regenrate"] = datensatz.rain_rate
rohdaten["regenaktuell"] = datensatz.rain
rohdaten["taupunkt"] = datensatz.dewpoint
rohdaten["heatindex"] = datensatz.heatindex
rohdaten["wincchill"] = datensatz.windchill
if isinstance(datensatz.out_humidity, (int, float)):
rohdaten["outluftfeuchte"] = float(round(datensatz.out_humidity, 0))
else:
rohdaten["outluftfeuchte"] = None
if isinstance(datensatz.in_humidity, (int, float)):
rohdaten["inLuftfeuchte"] = float(round(datensatz.in_humidity, 0))
else:
rohdaten["inLuftfeuchte"] = None
if isinstance(datensatz.wind_dir, (int, float)):
rohdaten["windrichtung_grad"] = float(round(datensatz.wind_dir, 1))
else:
rohdaten["windrichtung_grad"] = None
if isinstance(datensatz.wind_gust_dir, (int, float)):
rohdaten["windboe_richtung"] = float(round(datensatz.wind_gust_dir, 1))
else:
rohdaten["windboe_richtung"] = None
rohdaten["windrichtung"] = mwu.himmelsrichtungwandler(datensatz.wind_dir)
rohdaten["out_abs_luftfeuchte"] = mwu.absolute_luftfeuchtigkeit(rohdaten["outtemp"], rohdaten["outluftfeuchte"])
rohdaten = eigene_wetterdaten.eigene_wetterdaten(rohdaten)
basiswetterdaten_liste.append(
db_postgrest.Basiswetterdaten(stationsname=stationsname,
**{key: value for key, value in rohdaten.items() if
key in db_postgrest.Basiswetterdaten.__dataclass_fields__}))
zusatzdatenliste = [{key: value} for key, value in rohdaten.items() if
key not in db_postgrest.Basiswetterdaten.__dataclass_fields__]
for zusatzdaten in zusatzdatenliste:
for key, value in zusatzdaten.items():
if key is None or value is None:
continue
zusatzwetterdaten_liste.append(
db_postgrest.Zusatzwetterdaten(ts=rohdaten["ts"], stationsname=stationsname, wertname=key,
wert=value, public=False))
if nr >= SENDELIMIT - 1:
aufruf_wiederholen = True
break
return basiswetterdaten_liste, zusatzwetterdaten_liste, aufruf_wiederholen
def freigabe_setzen(zusatzwetterdaten_liste):
for zusatzdaten in zusatzwetterdaten_liste:
if zusatzdaten.wertname in CONFIG["grafana"]["public"]:
zusatzdaten.public = CONFIG["grafana"]["public"][zusatzdaten.wertname]
else:
zusatzdaten.public = False
return zusatzwetterdaten_liste
def main():
if sys.platform == "linux":
laufende_prozesse = check_process()
if laufende_prozesse > 1:
print("EXIT aufgrund laufender Prozesse")
sys.exit()
# Verzögerung aufgrund vom Cronjob, >>alle 5Minute, damit es nicht mit der Erstellung von Weewx kolidiert
time.sleep(CONFIG["weewx"]["sleeptime"])
5 years ago
db_adapter = CONFIG["weewx"]["db"]
db = db_weewx.init_db(CONFIG["weewx"][db_adapter]["database"], db_adapter, CONFIG["weewx"].get(db_adapter))
db_weewx.database.initialize(db)
headers = {f"Authorization": "{user} {token}".format(user=CONFIG["zieldb"]["postgrest"]["user"],
token=CONFIG["zieldb"]["postgrest"]["token"])}
url = CONFIG["zieldb"]["postgrest"]["url"]
if not url.endswith("/"):
url = f"{url}/"
while True:
letzter_ts_server = db_postgrest.hole_letzten_ts(url,
CONFIG["zieldb"]["postgrest"]["tablename_basiswetterdaten"],
headers, CONFIG["grafana"]["grafana_name"])
letzter_ts_server = letzter_ts_server.replace(tzinfo=datetime.timezone.utc)
5 years ago
basiswetterdaten, zusatzwetterdaten, aufruf_wiederholen = rohdaten_laden(letzter_ts_server.timestamp(),
CONFIG["grafana"]["grafana_name"])
zusatzwetterdaten = freigabe_setzen(zusatzwetterdaten)
db_postgrest.sende_daten(url, CONFIG["zieldb"]["postgrest"]["tablename_basiswetterdaten"], headers,
basiswetterdaten, LOGGER)
db_postgrest.sende_daten(url, CONFIG["zieldb"]["postgrest"]["tablename_zusatzwetterdaten"], headers,
zusatzwetterdaten, LOGGER)
if not aufruf_wiederholen:
break
if __name__ == "__main__":
start = datetime.datetime.now()
LOGGER.debug(f"Start: {start}")
try:
main()
except KeyboardInterrupt:
LOGGER.info("Durch Benutzer abgebrochen")
else:
LOGGER.info("Export erfolgreich")
ende = datetime.datetime.now()
LOGGER.debug(f"Ende: {ende}")
LOGGER.debug(f"Dauer: {ende - start}")