"""
Imports weather data from the DWD (German Weather Service).
"""
import logging
import zipfile
import os
import shutil
import datetime as dt
import urllib.request

import pandas as pd

from aixweather.imports import utils_import
from aixweather import definitions

logger = logging.getLogger(__name__)


def import_DWD_historical(start: dt.datetime, station: str) -> pd.DataFrame:
"""
    Pull historical data from the DWD
    (https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/)
    and format it into a dataframe.

    Args:
        start: Earliest timestamp to pull; defines how much data must be fetched.
        station: Station ID of the DWD station.

    Returns:
        Dataframe containing weather data from the DWD, kept as raw as possible.
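
    Example (a minimal usage sketch; station 15000 is Aachen-Orsbach and is
    used here purely for illustration)::

        import datetime as dt

        df_raw = import_DWD_historical(
            start=dt.datetime.now() - dt.timedelta(days=2 * 365),
            station="15000",
        )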
"""
measurements = [
"air_temperature",
"solar",
"wind",
"precipitation",
"soil_temperature",
"cloudiness",
]
base_url = (
"https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/"
)
    # if the start of the requested period lies more than ~530 days in the past,
    # data must be pulled from the "historical" folder in addition to "recent"
    days_from_now = (dt.datetime.now() - start).days
    historical_folder = days_from_now >= (530 - 1)
    # create the dataframe in which all data will be stored
    df_w_ges = pd.DataFrame()
    # get weather data from the DWD, one measurement at a time
for single_measurement in measurements:
        # inconsistent pathing on the DWD server is resolved by using the
        # 10-minute values for these measurements
        if single_measurement in ("solar", "air_temperature"):
df_w = _pull_DWD_historical_data(
f"{base_url}/10_minutes/{single_measurement}/recent/",
station=station,
)
if historical_folder:
df_hist = _pull_DWD_historical_data(
f"{base_url}/10_minutes/{single_measurement}/historical/",
station=station,
)
                # concatenate rows (append the older time period)
                df_w = pd.concat([df_w, df_hist])
                # the dataframes may overlap with identical values; drop duplicate timestamps
                df_w = df_w[~df_w.index.duplicated(keep="first")]
else:
df_w = _pull_DWD_historical_data(
f"{base_url}/hourly/{single_measurement}/recent/",
station=station,
)
if historical_folder:
df_hist = _pull_DWD_historical_data(
f"{base_url}/hourly/{single_measurement}/historical/",
station=station,
)
                # concatenate rows (append the older time period)
                df_w = pd.concat([df_w, df_hist])
                # the dataframes may overlap with identical values; drop duplicate timestamps
                df_w = df_w[~df_w.index.duplicated(keep="first")]
        # concatenate the measurements column-wise
df_w_ges = pd.concat([df_w_ges, df_w], axis=1, join="outer", sort=True)
return df_w_ges


def import_DWD_forecast(station: str) -> pd.DataFrame:
"""
    Import weather forecast data from the DWD (German Weather Service) for a specified station.

    Args:
        station (str): Station ID of the DWD station for which forecast data is
                       to be imported. Station 01028 can be used for debugging.

    Returns:
        pd.DataFrame: DataFrame containing weather forecast data from the DWD.
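
    Example (a minimal usage sketch, using the debugging station mentioned
    above)::

        df_forecast = import_DWD_forecast("01028")
        print(df_forecast.head())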
"""
try:
from wetterdienst.provider.dwd.mosmix import DwdMosmixRequest, DwdMosmixType
except ImportError:
raise ImportError("Optional dependency 'DWD_forecast' not installed, can't import data.")
### pull forecast data using the package wetterdienst
stations = DwdMosmixRequest(
parameter="small", mosmix_type=DwdMosmixType.SMALL
).filter_by_station_id(station_id=[station])
# query object to get dataframe with forecast values
try:
values = next(stations.values.query())
except Exception as excep:
raise ValueError(
f"There is no loadable forecast for station {station}"
) from excep
imported_df = values.df.to_pandas()
### transform to one column per measurement
    # Convert the 'date' column to datetime objects
    imported_df["date"] = pd.to_datetime(imported_df["date"])
    # Set the 'date' column as the index
    imported_df.set_index("date", inplace=True)
# Drop unnecessary columns
imported_df.drop(columns=["station_id", "dataset", "quality"], inplace=True)
# Pivot the dataframe to have each measurement as a separate column
imported_df = imported_df.pivot(columns="parameter", values="value")
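    # the pivoted frame now has one column per forecast parameter, roughly:
    # (illustrative layout only; actual parameter names come from wetterdienst)
    # parameter            temperature_air  wind_speed  ...
    # date
    # 2024-01-01 00:00:00  278.15           3.4         ...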
return imported_df


def _pull_DWD_historical_data(url: str, station: str) -> pd.DataFrame:
"""
    Pulls the measurement data from the given URL and converts it into a
    pandas DataFrame.

    :param url: URL of the DWD folder in which the measurement data is stored.
    :param station: Station ID of the DWD weather station, e.g. 15000 for Aachen-Orsbach.
    :return: pandas DataFrame containing the retrieved data set.
"""
# First, load all available filenames
http_obj = urllib.request.urlopen(url).read().decode()
# DWD data contains the stations with leading zeros, the meta-data and station lists without
# leading zeros. Apply leading zeros for pulling DWD data.
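    # e.g. a station ID "44" becomes "00044"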
station_with_leading_zeros = station.zfill(5)
# select only those file names that belong to the station
zip_names = [
i
for i in http_obj.split('"')
if f"_{station_with_leading_zeros}_" in i and not i.startswith(">")
]
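    # zip_names now holds the station's archive names; DWD uses a naming scheme
    # like "stundenwerte_TU_15000_akt.zip" (illustrative example)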
data_total = pd.DataFrame()
# download and read all available data to df
for zip_name in zip_names:
unzipped_path = _download_DWD_file(url, zip_name)
        # extract the data file path; DWD data files start with "produkt"
        file_name = next(s for s in os.listdir(unzipped_path) if s.startswith("p"))
file_path = os.path.join(unzipped_path, file_name)
# read data file
data = pd.read_csv(file_path, sep=";")
        # unify the "MESS_DATUM" format of 10-min and hourly data
        # -> convert e.g. 2022012400 (hourly) to 202201240000
        if len(str(data["MESS_DATUM"].iloc[0])) == 10:  # if hourly
            data["MESS_DATUM"] = data["MESS_DATUM"] * 100  # append two zeros
        # make MESS_DATUM the index for concatenating
data.set_index("MESS_DATUM", inplace=True, drop=True)
data_total = pd.concat([data_total, data], verify_integrity=True)
    # clean up temporary files; ignore_errors avoids failing if nothing was downloaded
    shutil.rmtree(definitions.local_folder_temp, ignore_errors=True)
return data_total


def _download_DWD_file(url: str, zip_name: str):
"""
    Downloads the file with the given filename from the specified URL and unzips it.

    Parameters:
        url (str): URL of the DWD folder.
        zip_name (str): Name of the file to be downloaded.

    Returns:
        str: Path to the folder containing the unzipped files.

    Raises:
        ConnectionError: If the download still fails after all retry attempts.
"""
folder_unzip = "unzipped_content"
total_zip_name = os.path.join(definitions.local_folder_temp, zip_name)
    os.makedirs(definitions.local_folder_temp, exist_ok=True)
    for i in range(4):  # try retrieval up to 4 times
try:
urllib.request.urlretrieve(url + zip_name, total_zip_name)
logger.debug("Loaded: %s", total_zip_name)
# save unzipped files to folder_unzip
extract_path = os.path.join(definitions.local_folder_temp, folder_unzip)
with zipfile.ZipFile(total_zip_name, "r") as zip_ref:
zip_ref.extractall(extract_path)
return extract_path
except Exception as excep:
if i == 3:
                raise ConnectionError(
                    f"Not loaded: {total_zip_name}\nwith error: {excep}"
                ) from excep