import toml import os import db_model as db from peewee import PostgresqlDatabase import datetime from sshtunnel import SSHTunnelForwarder from corona_statistik import CoronaStatistik def config_laden(): configfile = os.path.join(SKRIPTPFAD, "config.toml") with open(configfile) as file: return toml.loads(file.read()) SKRIPTPFAD = os.path.abspath(os.path.dirname(__file__)) CONFIG = config_laden() QUELLEN = ["jhu", "who", "rki"] TYP = "verdoppelungsrate" def letzten_eintrag_ermitteln(quelle, country_region): ts = db.CoronaStatistik.select( db.CoronaStatistik.ts).where((db.CoronaStatistik.typ == TYP) & (db.CoronaStatistik.quelle == quelle) & (db.CoronaStatistik.country_region == country_region) ).order_by(db.CoronaStatistik.ts.desc()).limit(1).scalar() return ts def verdoppelungsrate_ermitteln(quelle, land, confirmed, confirmed_ts): halbe_infizierte = confirmed / 2 ts_halbe = db.CoronaDaten.select(db.CoronaDaten.ts).where( (db.CoronaDaten.quelle == quelle) & (db.CoronaDaten.country_region == land) & (db.CoronaDaten.confirmed < halbe_infizierte)).order_by(db.CoronaDaten.confirmed.desc()).limit(1).scalar() try: verdoppelungstage = (confirmed_ts - ts_halbe).total_seconds() except TypeError: verdoppelungstage = None return verdoppelungstage def letzte_verdopplungsraten_berechnen(quelle, land, ts): kein_passender_eintrag = False daten = [] query = db.CoronaDaten.select(db.CoronaDaten.ts, db.CoronaDaten.confirmed).where( (db.CoronaDaten.quelle == quelle) & (db.CoronaDaten.country_region == land) & (db.CoronaDaten.ts > ts)).dicts() for datensatz in query: verdoppelungsrate = verdoppelungsrate_ermitteln(quelle, land, datensatz["confirmed"], datensatz["ts"]) if verdoppelungsrate is not None: verdoppelungsrate_tage = round(verdoppelungsrate_in_tagen(verdoppelungsrate), 3) print(f"Verdoppelungsrate für {land} ({quelle}) am {datensatz['ts']}: {verdoppelungsrate_tage}") daten.append(CoronaStatistik(datensatz["ts"], quelle, land, TYP, verdoppelungsrate_tage)) else: print(f"Kein passender Eintrag gefunden für {land} ({quelle})") kein_passender_eintrag = True if kein_passender_eintrag: print(f"Setze -1 für {land} ({quelle})") daten.append(CoronaStatistik(datetime.datetime.now(), quelle, land, TYP, -1)) return daten def verdoppelungsrate_in_tagen(verdoppelungsrate): try: return verdoppelungsrate / (60 * 60 * 24) except TypeError: return None def main(): with SSHTunnelForwarder( (CONFIG["ssh"]["ip_server"], CONFIG["ssh"]["ssh_port"]), ssh_username=CONFIG["ssh"]["user"], ssh_password=CONFIG["ssh"]["pw"], remote_bind_address=('127.0.0.1', CONFIG["pg"]["pgport"])) as server: db.database.initialize(PostgresqlDatabase(CONFIG["pg"]["pgdb"], user=CONFIG["pg"]["pguser"], password=CONFIG["pg"]["pgpw"], host="127.0.0.1", port=server.local_bind_port)) query_quelle = db.CoronaDaten.select(db.CoronaDaten.quelle).group_by(db.CoronaDaten.quelle) for data in query_quelle: query_country = db.CoronaDaten.select( db.CoronaDaten.country_region).where(db.CoronaDaten.quelle == data.quelle ).group_by(db.CoronaDaten.country_region) for datensatz in query_country: if data.quelle == "jhu" and datensatz.country_region == "Germany": letzte_verdopplungsraten_berechnen(data.quelle, datensatz.country_region, datetime.datetime(2020, 4, 3)) db.Database.close(db.database) if __name__ == "__main__": main()