You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Corona/verdoppelungsrate.py

93 lines
3.9 KiB
Python

import toml
import os
import db_model as db
from peewee import PostgresqlDatabase
import datetime
from sshtunnel import SSHTunnelForwarder
from corona_statistik import CoronaStatistik
def config_laden():
configfile = os.path.join(SKRIPTPFAD, "config.toml")
with open(configfile) as file:
return toml.loads(file.read())
SKRIPTPFAD = os.path.abspath(os.path.dirname(__file__))
CONFIG = config_laden()
QUELLEN = ["jhu", "who", "rki"]
TYP = "verdoppelungsrate"
def letzten_eintrag_ermitteln(quelle, country_region):
ts = db.CoronaStatistik.select(
db.CoronaStatistik.ts).where((db.CoronaStatistik.typ == TYP) &
(db.CoronaStatistik.quelle == quelle) &
(db.CoronaStatistik.country_region == country_region)
).order_by(db.CoronaStatistik.ts.desc()).limit(1).scalar()
return ts
def verdoppelungsrate_ermitteln(quelle, land, confirmed, confirmed_ts):
halbe_infizierte = confirmed / 2
ts_halbe = db.CoronaDaten.select(db.CoronaDaten.ts).where(
(db.CoronaDaten.quelle == quelle) & (db.CoronaDaten.country_region == land) &
(db.CoronaDaten.confirmed < halbe_infizierte)).order_by(db.CoronaDaten.confirmed.desc()).limit(1).scalar()
try:
verdoppelungstage = (confirmed_ts - ts_halbe).total_seconds()
except TypeError:
verdoppelungstage = None
return verdoppelungstage
def letzte_verdopplungsraten_berechnen(quelle, land, ts):
kein_passender_eintrag = False
daten = []
query = db.CoronaDaten.select(db.CoronaDaten.ts, db.CoronaDaten.confirmed).where(
(db.CoronaDaten.quelle == quelle) & (db.CoronaDaten.country_region == land) & (db.CoronaDaten.ts > ts)).dicts()
for datensatz in query:
verdoppelungsrate = verdoppelungsrate_ermitteln(quelle, land, datensatz["confirmed"], datensatz["ts"])
if verdoppelungsrate is not None:
verdoppelungsrate_tage = round(verdoppelungsrate_in_tagen(verdoppelungsrate), 3)
print(f"Verdoppelungsrate für {land} ({quelle}) am {datensatz['ts']}: {verdoppelungsrate_tage}")
daten.append(CoronaStatistik(datensatz["ts"], quelle, land, TYP, verdoppelungsrate_tage))
else:
print(f"Kein passender Eintrag gefunden für {land} ({quelle})")
kein_passender_eintrag = True
if kein_passender_eintrag:
print(f"Setze -1 für {land} ({quelle})")
daten.append(CoronaStatistik(datetime.datetime.now(), quelle, land, TYP, -1))
return daten
def verdoppelungsrate_in_tagen(verdoppelungsrate):
try:
return verdoppelungsrate / (60 * 60 * 24)
except TypeError:
return None
def main():
with SSHTunnelForwarder(
(CONFIG["ssh"]["ip_server"], CONFIG["ssh"]["ssh_port"]), ssh_username=CONFIG["ssh"]["user"],
ssh_password=CONFIG["ssh"]["pw"], remote_bind_address=('127.0.0.1', CONFIG["pg"]["pgport"])) as server:
db.database.initialize(PostgresqlDatabase(CONFIG["pg"]["pgdb"],
user=CONFIG["pg"]["pguser"], password=CONFIG["pg"]["pgpw"],
host="127.0.0.1",
port=server.local_bind_port))
query_quelle = db.CoronaDaten.select(db.CoronaDaten.quelle).group_by(db.CoronaDaten.quelle)
for data in query_quelle:
query_country = db.CoronaDaten.select(
db.CoronaDaten.country_region).where(db.CoronaDaten.quelle == data.quelle
).group_by(db.CoronaDaten.country_region)
for datensatz in query_country:
if data.quelle == "jhu" and datensatz.country_region == "Germany":
letzte_verdopplungsraten_berechnen(data.quelle, datensatz.country_region, datetime.datetime(2020, 4, 3))
db.Database.close(db.database)
if __name__ == "__main__":
main()