You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			93 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			93 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
| import toml
 | |
| import os
 | |
| import db_model as db
 | |
| from peewee import PostgresqlDatabase
 | |
| import datetime
 | |
| from sshtunnel import SSHTunnelForwarder
 | |
| from corona_statistik import CoronaStatistik
 | |
| 
 | |
| 
 | |
def config_laden():
    """Load the script configuration from ``config.toml``.

    The file is expected in ``SKRIPTPFAD``, i.e. next to this script.

    Returns:
        dict: The parsed TOML document.

    Raises:
        OSError: If the configuration file cannot be opened.
        toml.TomlDecodeError: If the file content is not valid TOML.
    """
    configfile = os.path.join(SKRIPTPFAD, "config.toml")
    # TOML documents are UTF-8 by specification. Without an explicit
    # encoding, open() falls back to the locale encoding, which can
    # mis-decode non-ASCII values on some platforms.
    with open(configfile, encoding="utf-8") as file:
        return toml.loads(file.read())
 | |
| 
 | |
| 
 | |
# Absolute directory of this script; config.toml is resolved relative to it.
SKRIPTPFAD = os.path.dirname(os.path.abspath(__file__))

# Parsed configuration, loaded once when the module is imported.
CONFIG = config_laden()

# Source identifiers; presumably the values found in the "quelle" column
# (verify against the data importer).
QUELLEN = ["jhu", "who", "rki"]

# Statistic type tag used to filter and label CoronaStatistik entries.
TYP = "verdoppelungsrate"
 | |
| 
 | |
| 
 | |
def letzten_eintrag_ermitteln(quelle, country_region):
    """Return the newest timestamp of a stored statistic of type ``TYP``.

    Args:
        quelle: Source identifier the row must match.
        country_region: Country/region the row must match.

    Returns:
        The ``ts`` value of the most recent matching ``db.CoronaStatistik``
        row, as produced by peewee's ``scalar()`` (``None`` if no row
        matches).
    """
    statistik = db.CoronaStatistik
    bedingung = (
        (statistik.typ == TYP)
        & (statistik.quelle == quelle)
        & (statistik.country_region == country_region)
    )
    # Newest first, so the single remaining row carries the latest timestamp.
    neueste_zuerst = (
        statistik.select(statistik.ts)
        .where(bedingung)
        .order_by(statistik.ts.desc())
        .limit(1)
    )
    return neueste_zuerst.scalar()
 | |
| 
 | |
| 
 | |
def verdoppelungsrate_ermitteln(quelle, land, confirmed, confirmed_ts):
    """Return the doubling time in seconds for one data point.

    Looks up the ``db.CoronaDaten`` row of the same source and country with
    the highest ``confirmed`` value that is still strictly below half of
    ``confirmed``, and measures the time between that row's timestamp and
    ``confirmed_ts``.

    Args:
        quelle: Source identifier.
        land: Country/region.
        confirmed: Confirmed case count of the data point.
        confirmed_ts: Timestamp of the data point.

    Returns:
        The elapsed time in seconds, or ``None`` when the subtraction
        raises ``TypeError`` (e.g. because no matching row was found and
        the query yielded ``None``).
    """
    daten = db.CoronaDaten
    schwelle = confirmed / 2
    bedingung = (
        (daten.quelle == quelle)
        & (daten.country_region == land)
        & (daten.confirmed < schwelle)
    )
    # Highest count below the threshold first; only its timestamp is needed.
    ts_halbe = (
        daten.select(daten.ts)
        .where(bedingung)
        .order_by(daten.confirmed.desc())
        .limit(1)
        .scalar()
    )
    try:
        return (confirmed_ts - ts_halbe).total_seconds()
    except TypeError:
        return None
 | |
| 
 | |
| 
 | |
def letzte_verdopplungsraten_berechnen(quelle, land, ts):
    """Compute doubling rates (in days) for all data points newer than ``ts``.

    Every ``db.CoronaDaten`` row of the given source and country with a
    timestamp greater than ``ts`` is evaluated. Progress is reported via
    ``print``.

    Args:
        quelle: Source identifier.
        land: Country/region.
        ts: Only rows with a timestamp strictly greater than this are used.

    Returns:
        list: One ``CoronaStatistik`` object per row for which a doubling
        rate could be determined (rounded to three decimals). If at least
        one row had no result, a single extra entry with the value ``-1``
        and the current time as timestamp is appended.
    """
    tabelle = db.CoronaDaten
    zeilen = (
        tabelle.select(tabelle.ts, tabelle.confirmed)
        .where(
            (tabelle.quelle == quelle)
            & (tabelle.country_region == land)
            & (tabelle.ts > ts)
        )
        .dicts()
    )

    ergebnisse = []
    luecke_gefunden = False
    for zeile in zeilen:
        sekunden = verdoppelungsrate_ermitteln(quelle, land, zeile["confirmed"], zeile["ts"])
        if sekunden is None:
            print(f"Kein passender Eintrag gefunden für {land} ({quelle})")
            luecke_gefunden = True
            continue
        tage = round(verdoppelungsrate_in_tagen(sekunden), 3)
        print(f"Verdoppelungsrate für {land} ({quelle}) am {zeile['ts']}: {tage}")
        ergebnisse.append(CoronaStatistik(zeile["ts"], quelle, land, TYP, tage))

    # One sentinel entry is enough, no matter how many rows had no result.
    if luecke_gefunden:
        print(f"Setze -1 für {land} ({quelle})")
        ergebnisse.append(CoronaStatistik(datetime.datetime.now(), quelle, land, TYP, -1))
    return ergebnisse
 | |
| 
 | |
| 
 | |
def verdoppelungsrate_in_tagen(verdoppelungsrate):
    """Convert a duration given in seconds into days.

    Args:
        verdoppelungsrate: Duration in seconds.

    Returns:
        The duration in days, or ``None`` if the value cannot be divided
        by an integer (the division raises ``TypeError``, e.g. for
        ``None``).
    """
    sekunden_pro_tag = 24 * 60 * 60
    try:
        tage = verdoppelungsrate / sekunden_pro_tag
    except TypeError:
        tage = None
    return tage
 | |
| 
 | |
| 
 | |
def main():
    """Open an SSH tunnel to the database host and compute doubling rates.

    The tunnel and PostgreSQL credentials are read from ``CONFIG``. The
    database is reached through the local end of the tunnel. All sources
    and their countries are enumerated from ``db.CoronaDaten``, but only
    the combination "jhu" / "Germany" is processed, starting from the
    fixed date 2020-04-03.
    """
    ssh = CONFIG["ssh"]
    pg = CONFIG["pg"]
    with SSHTunnelForwarder(
            (ssh["ip_server"], ssh["ssh_port"]), ssh_username=ssh["user"],
            ssh_password=ssh["pw"], remote_bind_address=('127.0.0.1', pg["pgport"])) as server:

        db.database.initialize(PostgresqlDatabase(pg["pgdb"],
                                                  user=pg["pguser"], password=pg["pgpw"],
                                                  host="127.0.0.1",
                                                  port=server.local_bind_port))
        try:
            query_quelle = db.CoronaDaten.select(db.CoronaDaten.quelle).group_by(db.CoronaDaten.quelle)
            for data in query_quelle:
                query_country = db.CoronaDaten.select(
                    db.CoronaDaten.country_region).where(db.CoronaDaten.quelle == data.quelle
                                                         ).group_by(db.CoronaDaten.country_region)
                for datensatz in query_country:
                    # NOTE(review): only one source/country pair is handled and
                    # the computed list is discarded - looks like work in
                    # progress; confirm the intended behavior.
                    if data.quelle == "jhu" and datensatz.country_region == "Germany":
                        letzte_verdopplungsraten_berechnen(data.quelle, datensatz.country_region,
                                                           datetime.datetime(2020, 4, 3))
        finally:
            # Close unconditionally so the connection is released before the
            # tunnel is torn down, even when a query raises.
            db.Database.close(db.database)
 | |
| 
 | |
| 
 | |
# Run the computation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
 |