Coverage for aixweather/imports/DWD.py: 93%
138 statements
1"""
2imports weather data from the DWD
3"""
4import logging
5import zipfile
6import os
7import shutil
8import datetime as dt
9import urllib.request
10import pandas as pd
12from aixweather.imports import utils_import
13from aixweather import definitions
16logger = logging.getLogger(__name__)


def import_DWD_historical(start: dt.datetime, station: str) -> pd.DataFrame:
    """
    Pull historical data from the DWD
    (https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/)
    and format it into a dataframe.

    Args:
        start: Defines how much data must be pulled.
        station: Station ID of the DWD.

    Returns:
        DataFrame with weather data from the DWD that is as raw as possible.
    """
    measurements = [
        "air_temperature",
        "solar",
        "wind",
        "precipitation",
        "soil_temperature",
        "cloudiness",
    ]

    base_url = (
        "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/"
    )

    # if the start of the data lies more than about 530 days in the past,
    # also import data from the historical folder
    days_from_now = (dt.datetime.now() - start).days
    historical_folder = days_from_now >= (530 - 1)
    # create a dataframe in which all data is to be stored
    df_w_ges = pd.DataFrame()

    # get weather data from the DWD per measurement
    for single_measurement in measurements:
        # inconsistent pathing at the DWD is resolved by using the 10-minute
        # values for these two measurements
        if single_measurement in ("solar", "air_temperature"):
            resolution = "10_minutes"
        else:
            resolution = "hourly"

        df_w = _pull_DWD_historical_data(
            f"{base_url}{resolution}/{single_measurement}/recent/",
            station=station,
        )
        if historical_folder:
            df_hist = _pull_DWD_historical_data(
                f"{base_url}{resolution}/{single_measurement}/historical/",
                station=station,
            )
            # add up rows (time periods)
            df_w = pd.concat([df_w, df_hist])
            # the dataframes may overlap with the same values, delete duplicates
            df_w = df_w[~df_w.index.duplicated(keep="first")]

        # concat each measurement (column)
        df_w_ges = pd.concat([df_w_ges, df_w], axis=1, join="outer", sort=True)

    return df_w_ges
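
# A minimal usage sketch (values are placeholders): pull about one year of raw
# historical data for station 15000 (Aachen-Orsbach):
#
#     import datetime as dt
#     from aixweather.imports.DWD import import_DWD_historical
#
#     start = dt.datetime.now() - dt.timedelta(days=365)
#     df_raw = import_DWD_historical(start=start, station="15000")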


def import_DWD_forecast(station: str) -> pd.DataFrame:
    """
    Import weather forecast data from the DWD (German Weather Service) for a
    specified station.

    Args:
        station (str): Station ID of the DWD for which forecast data is to be
            imported. For debugging purposes: station 01028.

    Returns:
        pd.DataFrame: DataFrame containing weather forecast data from the DWD.
    """
    try:
        from wetterdienst.provider.dwd.mosmix import DwdMosmixRequest, DwdMosmixType
    except ImportError:
        raise ImportError(
            "Optional dependency 'DWD_forecast' is not installed, can't import data."
        )
    ### pull forecast data using the package wetterdienst
    stations = DwdMosmixRequest(
        parameter="small", mosmix_type=DwdMosmixType.SMALL
    ).filter_by_station_id(station_id=[station])

    # query the object to get a dataframe with the forecast values
    try:
        values = next(stations.values.query())
    except Exception as excep:
        raise ValueError(
            f"There is no loadable forecast for station {station}."
        ) from excep

    imported_df = values.df.to_pandas()

    ### transform to one column per measurement
    # convert the 'date' column to datetime objects
    imported_df["date"] = pd.to_datetime(imported_df["date"])

    # set the 'date' column as the index
    imported_df.set_index("date", inplace=True)

    # drop unnecessary columns
    imported_df.drop(columns=["station_id", "dataset", "quality"], inplace=True)

    # pivot the dataframe to have each measurement as a separate column
    imported_df = imported_df.pivot(columns="parameter", values="value")

    return imported_df
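
# A minimal usage sketch, assuming the optional 'DWD_forecast' dependency
# (wetterdienst) is installed; station 01028 is the debugging station named in
# the docstring above:
#
#     from aixweather.imports.DWD import import_DWD_forecast
#
#     df_forecast = import_DWD_forecast(station="01028")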


def import_meta_DWD_historical(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station from
    the DWD (Deutscher Wetterdienst) Open Data Interface.

    Args:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (utils_import.MetaData): A MetaData object with populated
            attributes related to the station.
    """
    url = (
        "https://www.dwd.de/DE/leistungen/klimadatendeutschland/"
        "statliste/statlex_rich.txt;jsessionid"
        "=68E14BA255FE50BDC4AD9FF4F835895F.live31092?view=nasPublication&nn=16102"
    )

    # load the station overview
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find the station ID and its values
    # split the data into lines
    lines = data_str.strip().split("\n")

    # get the header line and the line with dashes
    header_line = lines[0]
    dash_line = lines[1]

    # find the column breaks based on the dash line
    column_breaks = [0]
    for i in range(len(dash_line)):
        if dash_line[i] != "-" and (i == 0 or dash_line[i - 1] == "-"):
            column_breaks.append(i)
    column_breaks.append(len(dash_line))

    # split the header line based on the column breaks
    header = [
        header_line[start:end].strip()
        for start, end in zip(column_breaks[:-1], column_breaks[1:])
    ]

    # initialize a dictionary to store the result
    station_data = {}

    # iterate through the rows and find the one with the desired STAT_ID
    for line in lines[2:]:
        values = [
            line[start:end].strip()
            for start, end in zip(column_breaks[:-1], column_breaks[1:])
        ]
        stat_id = str(values[header.index("STAT_ID")])
        if stat_id == station:
            station_data = dict(zip(header, values))
            break

    if station_data == {}:
        raise ValueError(
            f"Station for historical weather data with ID {station} could not be "
            f"found in the station list {url}."
        )

    ### convert to the meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["STAT_ID"]
    meta.station_name = station_data["STAT_NAME"]
    meta.altitude = station_data["HS"]
    meta.longitude = station_data["LA_HIGH"]
    meta.latitude = station_data["BR_HIGH"]
    meta.station_exists_since = station_data["BEGINN"]
    meta.station_exists_until = station_data["ENDE"]
    meta.input_source = "DWD Historical"

    return meta
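
# A toy sketch (invented data, not real DWD output) of the fixed-width parsing
# above: the loop records each position where a run of dashes ends.
#
#     header_line = "STAT_ID STAT_NAME     "
#     dash_line   = "------- --------------"
#     data_line   = "15000   Aachen-Orsbach"
#
# The dash line yields the column breaks [0, 7, 22]; slicing the header and
# data lines with those breaks and stripping gives
# {"STAT_ID": "15000", "STAT_NAME": "Aachen-Orsbach"}.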


def import_meta_DWD_forecast(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station
    from the DWD (Deutscher Wetterdienst) Open Data Interface.

    Args:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (utils_import.MetaData): A MetaData object with populated
            attributes related to the station.
    """
    url = (
        "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/"
        "mosmix_stationskatalog.cfg?view=nasPublication&nn"
        "=16102"
    )

    # load the station overview
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find the station ID and its values
    def extract_info_for_station(data_str, station_id):
        # split the data into lines
        lines = data_str.strip().split("\n")

        # iterate through the lines to find the desired ID
        for line in lines[2:]:
            # split the line into parts
            parts = line.split()

            # extract the ID and check whether it matches the searched ID
            stat_id = parts[0]
            if stat_id == station_id:
                # create a dictionary to store the details
                result_dict = {}
                result_dict["ID"] = stat_id
                result_dict["ICAO"] = parts[1]
                result_dict["NAME"] = " ".join(parts[2:-3])
                result_dict["LAT"] = parts[-3]
                result_dict["LON"] = parts[-2]
                result_dict["ELEV"] = parts[-1]
                return result_dict

        # raise an error because the station does not exist
        raise ValueError(
            f"Station for forecast data with the ID {station_id} could not be found "
            f"in the station list: {url}"
        )
    station_data = extract_info_for_station(data_str, station)

    # convert to the meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["ID"]
    meta.station_name = station_data["NAME"]
    meta.altitude = station_data["ELEV"]
    meta.longitude = station_data["LON"]
    meta.latitude = station_data["LAT"]
    meta.input_source = "DWD Forecast"

    return meta


def _pull_DWD_historical_data(url: str, station: str) -> pd.DataFrame:
    """
    Retrieves the measurement data from the given URL and converts it
    into a pandas DataFrame.

    Args:
        url (str): URL of the DWD folder in which the measurement data is stored.
        station (int/str): Station ID of the DWD weather station,
            e.g. Aachen-Orsbach is 15000.

    Returns:
        pd.DataFrame: The retrieved dataset.
    """
    # first, load all available file names
    http_obj = urllib.request.urlopen(url).read().decode()

    # DWD data contains the stations with leading zeros, the metadata and
    # station lists without leading zeros. Apply leading zeros for pulling
    # DWD data.
    station_with_leading_zeros = station.zfill(5)

    # select only those file names that belong to the station
    zip_names = [
        i
        for i in http_obj.split('"')
        if f"_{station_with_leading_zeros}_" in i and not i.startswith(">")
    ]

    data_total = pd.DataFrame()

    # download and read all available data into a dataframe
    for zip_name in zip_names:
        unzipped_path = _download_DWD_file(url, zip_name)

        # extract the data file path (the data file name starts with "p")
        file_name = list(filter(lambda s: s[0] == "p", os.listdir(unzipped_path)))[0]
        file_path = os.path.join(unzipped_path, file_name)

        # read the data file
        data = pd.read_csv(file_path, sep=";")

        # unify the "MESS_DATUM" format of 10-minute data and hourly data
        # -> convert 2022012400 to 202201240000
        if len(str(data["MESS_DATUM"].iloc[0])) == 10:  # if hourly
            data["MESS_DATUM"] = data["MESS_DATUM"] * 100  # append two zeros

        # make MESS_DATUM the index for concatenating
        data.set_index("MESS_DATUM", inplace=True, drop=True)

        data_total = pd.concat([data_total, data], verify_integrity=True)

    shutil.rmtree(definitions.local_folder_temp)

    return data_total
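
# Note on the timestamp unification above (example values for illustration):
# hourly files report MESS_DATUM as YYYYMMDDHH (10 digits), 10-minute files as
# YYYYMMDDHHMM (12 digits); multiplying an hourly stamp by 100 appends the two
# zero minute digits, e.g. 2022012400 * 100 == 202201240000.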


def _download_DWD_file(url: str, zip_name: str):
    """
    Downloads the file with the given file name from the specified URL and
    unzips it.

    Args:
        url (str): URL of the DWD folder.
        zip_name (str): Name of the file to be downloaded.

    Returns:
        str: Path of the folder that contains the unzipped file contents.

    Raises:
        ConnectionError: If the file could not be retrieved after all attempts.
    """
    folder_unzip = "unzipped_content"

    total_zip_name = os.path.join(definitions.local_folder_temp, zip_name)

    if not os.path.exists(definitions.local_folder_temp):
        os.makedirs(definitions.local_folder_temp)

    for i in range(4):  # try retrieval up to four times
        try:
            urllib.request.urlretrieve(url + zip_name, total_zip_name)
            logger.debug("Loaded: %s", total_zip_name)

            # save the unzipped files to folder_unzip
            extract_path = os.path.join(definitions.local_folder_temp, folder_unzip)
            with zipfile.ZipFile(total_zip_name, "r") as zip_ref:
                zip_ref.extractall(extract_path)

            return extract_path
        except Exception as excep:
            if i == 3:
                raise ConnectionError(
                    f"Not loaded: {total_zip_name} \n"
                    f"with error: {excep}"
                ) from excep
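
# Internal flow sketch (URL and station are placeholder examples):
# _pull_DWD_historical_data lists the zip archives of one station in a DWD
# folder and, for each archive, _download_DWD_file fetches and unzips it into
# definitions.local_folder_temp before the CSV inside is read:
#
#     url = (
#         "https://opendata.dwd.de/climate_environment/CDC/"
#         "observations_germany/climate/hourly/wind/recent/"
#     )
#     df = _pull_DWD_historical_data(url, station="15000")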