Corona-Daten des RKI mit Python herunterladen

Auch Ende 2020 hält uns Corona fest im Griff. Vor allem vor den Feiertagen habe ich mir die Frage gestellt, wie sinnvoll es ist, wenn Personen aus unterschiedlichen Landesteilen zusammenkommen. Ich wollte also auf einen Blick für meine Familie sehen können, wie sich die Lage in den jeweiligen Landkreisen entwickelt. Idealerweise automatisiert jeden Morgen auf meinem Handy - ohne die völlig benutzerunfreundlichen Veröffentlichungen des RKI durchstöbern zu müssen.

Die Idee war geboren. Ein Python-Skript sollte mir jede Nacht die Daten vom RKI abziehen, in einer Grafik aufbereiten und via Telegram an meine Familie verteilen. Wenn die Muse anhält, werde ich in den nächsten Tagen also kurz beschreiben, wie ich Daten vom RKI mit Python abziehe, diese mit Plotly grafisch aufbereite und schließlich in einen Telegram-Kanal jeden Morgen poste.

Artikel in dieser Serie

Beginnen wir mit dem Abziehen der Daten aus dem NPGEO Corona Hub. Das RKI veröffentlicht die Daten - aus meiner Sicht sehr dürftig dokumentiert - über arcgis.com, das mittels API von Python aus angesprochen werden kann. Wir benötigen zunächst also die URL.

Da ich mich für Daten auf Landkreisebene interessiere, öffnen wir https://npgeo-corona-npgeo-de.hub.arcgis.com/search?collection=Dataset und wählen den Landkreis-Datensatz aus. Über den API-Explorer-Tab können wir Filter setzen, die relevanten Felder auswählen und die sich ergebende URL kopieren.

API Explorer
API Explorer

Dann bereiten wir unser Python-Skript vor, indem wir die notwendigen Bibliotheken laden.

# imports
import json
import os
import pandas as pd
import requests

from datetime import datetime
from pandas.io.json import json_normalize

Im nächsten Schritt definieren wir ein paar Variablen.

# define variables
api_url = r'https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/RKI_Landkreisdaten/FeatureServer/0/query?where=1%3D1&outFields=*&outSR=4326&f=json'
app_dir = os.path.abspath(os.path.dirname(__file__))
data_dir = app_dir + '/data/'

Der eigentliche Download verfolgt über eine Kombination von json.loads() mit requests.get(). Mittels json_normalize() wandeln wir das heruntergeladen JSON-File in einen Pandas-Datensatz um.

# download and normalize
d = json.loads(requests.get(api_url).text)
df = json_normalize(d, record_path=['features'])

Perfekt. Mittels df.head() können wir einen Blick auf die Daten werfen. Mir persönlich gefällt das attributes.-Präfix bei den Spaltennamen nicht. Lass es uns also entfernen. Dafür erzeugen wir ein leeres Array, schleifen über die Spaltennamen, ersetzen dabei jeweils “attributes.” mit "" und überschreiben schließlich die Spaltennamen mit dem Array. Es gibt dafür sicher einen besseren Weg.

# remove "attributes." from field names
colnames = []
for i in range(len(df.columns)):
    colnames.append(df.columns[i].replace('attributes.', ''))

df.columns = colnames

Das RKI veröffentlicht über die API lediglich die neusten Daten. Sofern wir einen Zeitverlauf analysieren wollen, müssen wir die Daten also abspeichern. Der Einfachheit halber sichern wir den heruntergeladenen Datensatz als CSV mittels Pandas to_csv()-Funktion. Damit wir die Dateien den einzelnen Tagen zuordnen können, extrahieren wir aus der Spalte last_update das Datum und bauen das in den Namen der exportierten CSV-Datei ein.

# extract last update date (for filename)
file_date = datetime.strptime(df['last_update'][0], '%d.%m.%Y, %H:%M Uhr').strftime("%Y%m%d_%H%M")

# epxort CSV to data directory, using last update date for filename
df.to_csv(
    path_or_buf = data_dir + "rki_" + file_date + ".csv",
    sep = ";",
    header = True,
    index = False,
    encoding = 'utf-8'
)

Und damit sind wir eigentlich auch schon fertig. Damit ich nicht für jede Anwendung die Daten neu einlesen und zusammenfügen muss, habe ich meinem Python-Skript noch einen letzten Absatz mitgegeben. Dieser erzeugt zunächst eine Liste aller unter data/ abgespeicherten Dateien (deren Dateiname mit rki_ beginnt), liest diese anschließend der Reihe nach ein, fügt sie zu einem Datensatz zusammen und speichert diesen als zusätzliches CSV-File ab.

# create list of all exported CSV files
filepaths = [f for f in os.listdir((data_dir)) if f.startswith('rki_')]
filepaths

# append exported CSV files
rkitmp = []
for f in filepaths:
    dftmp = pd.read_csv(data_dir + f, sep = ";", decimal=",")
    rkitmp.append(dftmp)

# concat to Pandas data frame
rki = pd.concat(rkitmp, axis=0, ignore_index=True, sort = True)
rki = rki.drop_duplicates()

# export combined data
rki.to_csv(
    path_or_buf = data_dir + "rki.csv",
    sep = ";",
    header = True,
    index = False,
    encoding = 'utf-8'
)

#Script #Data

Reply to this post
I used to have a comments box on this blog, but found that maintaining another system and dealing with the cookies is cumbersome. So, if you have any thoughts on what I wrote in this article, please write me an email and we can have a conversation about it.
reply via email ✉️

Enjoyed reading this?
I put a lot of work into maintaining this blog and I really enjoy the interactions I get with you, dear readers. I you liked what you just read and want to do me a little favor, please tip me a cup of coffee as it's the fuel that keeps me going:
buy me a coffee ☕️