Coverage for aixweather/imports/DWD.py: 94%

140 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-12-31 11:58 +0000

1""" 

2imports weather data from the DWD 

3""" 

4import logging 

5import zipfile 

6import os 

7import shutil 

8import datetime as dt 

9import urllib.request 

10import pandas as pd 

11 

12from aixweather.imports import utils_import 

13from aixweather import definitions 

14 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def import_DWD_historical(start: dt.datetime, station: str) -> pd.DataFrame:
    """
    Pull historical data from DWD:
    (https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/) and
    format them into a dataframe.

    Args:
        start: defines how much data must be pulled
        station: station id of the DWD

    Returns:
        Dataframe weather data from DWD that is as raw as possible.
    """
    # Measurement -> temporal resolution folder on the DWD server.
    # DWD pathing is inconsistent: solar, air_temperature and wind are pulled
    # as 10-minute values, the remaining measurements as hourly values.
    measurement_resolutions = {
        "air_temperature": "10_minutes",
        "solar": "10_minutes",
        "wind": "10_minutes",
        "precipitation": "hourly",
        "soil_temperature": "hourly",
        "cloudiness": "hourly",
    }

    base_url = (
        "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/"
    )

    # if start of datapoint older than 530 days from now -> import data from
    # the "historical" folder in addition to the "recent" folder
    days_from_now = (dt.datetime.now() - start).days
    historical_folder = days_from_now >= (530 - 1)

    # dataframe in which all data is to be stored
    df_w_ges = pd.DataFrame()

    # get weather data from dwd per measurement
    for single_measurement, resolution in measurement_resolutions.items():
        df_w = _pull_DWD_historical_data(
            f"{base_url}/{resolution}/{single_measurement}/recent/",
            station=station,
        )
        if historical_folder:
            df_hist = _pull_DWD_historical_data(
                f"{base_url}/{resolution}/{single_measurement}/historical/",
                station=station,
            )
            # add up rows (time periods)
            df_w = pd.concat([df_w, df_hist])
            # dataframes may overlap with same values, delete duplicates
            df_w = df_w[~df_w.index.duplicated(keep="first")]

        # concat each measurement (column)
        df_w_ges = pd.concat([df_w_ges, df_w], axis=1, join="outer", sort=True)

    return df_w_ges

91 

92 

def import_DWD_forecast(station: str) -> pd.DataFrame:
    """
    Import weather forecast data from the DWD (German Weather Service) for a specified station.

    Args:
        station (str): Station ID of the DWD for which forecast data is to be imported.
                       For debugging purposes: station 01028.

    Returns:
        pd.DataFrame: DataFrame containing weather forecast data from the DWD,
        indexed by timestamp with one column per forecast parameter.
    """
    # wetterdienst is an optional dependency; fail with a clear message if absent
    try:
        from wetterdienst.provider.dwd.mosmix import DwdMosmixRequest, DwdMosmixType
    except ImportError:
        raise ImportError("Optional dependency 'DWD_forecast' not installed, can't import data.")

    ### pull forecast data using the package wetterdienst
    request = DwdMosmixRequest(parameter="small", mosmix_type=DwdMosmixType.SMALL)
    stations = request.filter_by_station_id(station_id=[station])

    # query object to get dataframe with forecast values
    try:
        values = next(stations.values.query())
    except Exception as excep:
        raise ValueError(
            f"There is no loadable forecast for station {station}"
        ) from excep

    forecast_df = values.df.to_pandas()

    ### transform to one column per measurement
    # parse the timestamp column and use it as the index
    forecast_df["date"] = pd.to_datetime(forecast_df["date"])
    forecast_df = (
        forecast_df.set_index("date")
        # drop unnecessary columns
        .drop(columns=["station_id", "dataset", "quality"])
        # pivot so each measurement becomes a separate column
        .pivot(columns="parameter", values="value")
    )

    return forecast_df

136 

137 

def import_meta_DWD_historical(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station from
    the DWD (Deutscher Wetterdienst) Open Data Interface.

    Parameters:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (meta_data object): An object of the meta_data class with
                                 populated attributes related to the station.

    Raises:
        ValueError: If the station ID is not found in the DWD station list.
    """

    url = (
        "https://www.dwd.de/DE/leistungen/klimadatendeutschland/"
        "statliste/statlex_rich.txt;jsessionid"
        "=68E14BA255FE50BDC4AD9FF4F835895F.live31092?view=nasPublication&nn=16102"
    )

    # load station overview (latin-1 encoded fixed-width text)
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find station ID and its values
    # Splitting the data into lines
    lines = data_str.strip().split("\n")

    # Getting the header line and the line with dashes
    header_line = lines[0]
    dash_line = lines[1]

    # Finding the column breaks based on the dash line: a new column starts
    # wherever a run of dashes begins
    column_breaks = [0]
    for i in range(len(dash_line)):
        if dash_line[i] != "-" and (i == 0 or dash_line[i - 1] == "-"):
            column_breaks.append(i)
    column_breaks.append(len(dash_line))

    # Splitting the header line based on column breaks
    header = [
        header_line[start:end].strip()
        for start, end in zip(column_breaks[:-1], column_breaks[1:])
    ]

    # Initializing a dictionary to store the result
    station_data = {}

    # hoist the column lookup out of the loop; it is invariant per file
    stat_id_column = header.index("STAT_ID")

    # Iterating through the rows and finding the one with the desired STAT_ID
    for line in lines[2:]:
        values = [
            line[start:end].strip()
            for start, end in zip(column_breaks[:-1], column_breaks[1:])
        ]
        if str(values[stat_id_column]) == station:
            station_data = dict(zip(header, values))
            break

    if not station_data:
        raise ValueError(
            f"Station for historical weatherdata with ID {station} could not be "
            f"found in station list {url}."
        )

    ### convert to meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["STAT_ID"]
    meta.station_name = station_data["STAT_NAME"]
    meta.altitude = station_data["HS"]
    meta.longitude = station_data["LA_HIGH"]
    meta.latitude = station_data["BR_HIGH"]
    meta.station_exists_since = station_data["BEGINN"]
    meta.station_exists_until = station_data["ENDE"]
    meta.input_source = "DWD Historical"
    meta.set_imported_timezone(1)  # Always in Germany, used for later export

    return meta

214 

215 

def import_meta_DWD_forecast(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station
    from the DWD (Deutscher Wetterdienst) Open Data Interface.

    Parameters:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (meta_data object): An object of the meta_data class with
                                 populated attributes related to the station.

    Raises:
        ValueError: If the station ID is not found in the MOSMIX station list.
    """
    url = (
        "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/"
        "mosmix_stationskatalog.cfg?view=nasPublication&nn"
        "=16102"
    )

    # load station overview
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find station ID and its values
    def extract_info_for_station(data_str, station_id):
        # Splitting the data by lines; the first two lines are headers
        lines = data_str.strip().split("\n")

        # Iterating through the lines to find the desired ID
        for line in lines[2:]:
            # Splitting the line into parts
            parts = line.split()
            # skip blank lines to avoid an IndexError on parts[0]
            if not parts:
                continue

            # Extracting the ID and checking if it matches the search ID
            # (renamed from "id" to avoid shadowing the builtin)
            current_id = parts[0]
            if current_id == station_id:
                # station names may contain spaces, hence the join of the
                # middle columns; LAT/LON/ELEV are the fixed last three
                return {
                    "ID": current_id,
                    "ICAO": parts[1],
                    "NAME": " ".join(parts[2:-3]),
                    "LAT": parts[-3],
                    "LON": parts[-2],
                    "ELEV": parts[-1],
                }

        # the station does not exist in the catalogue -> fail loudly
        raise ValueError(
            f"Station for forecast data with the ID {station_id} could not be found in the "
            f"station list: {url}"
        )

    station_data = extract_info_for_station(data_str, station)

    # convert to meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["ID"]
    meta.station_name = station_data["NAME"]
    meta.altitude = station_data["ELEV"]
    meta.longitude = station_data["LON"]
    meta.latitude = station_data["LAT"]
    meta.input_source = "DWD Forecast"
    meta.set_imported_timezone(1)  # Always in Germany, used for later export

    return meta

279 

280 

def _pull_DWD_historical_data(url: str, station: str) -> pd.DataFrame:
    """
    Retrieve the measurement data from the given URL and convert it into a
    pandas DataFrame.

    Args:
        url: URL of the DWD folder in which the measurement data is stored.
        station: Station id of the DWD weather station,
            e.g. Aachen-Orsbach is 15000.

    Returns:
        DataFrame with the retrieved data, indexed by "MESS_DATUM"
        (empty if no matching files were found).
    """

    # First, load all available filenames
    http_obj = urllib.request.urlopen(url).read().decode()

    # DWD data contains the stations with leading zeros, the meta-data and station lists without
    # leading zeros. Apply leading zeros for pulling DWD data.
    station_with_leading_zeros = station.zfill(5)

    # select only those file names that belong to the station
    zip_names = [
        i
        for i in http_obj.split('"')
        if f"_{station_with_leading_zeros}_" in i and not i.startswith(">")
    ]

    data_total = pd.DataFrame()

    # download and read all available data to df
    for zip_name in zip_names:
        unzipped_path = _download_DWD_file(url, zip_name)

        # extract data file path: the data file is the one starting with "p"
        file_name = list(filter(lambda s: s[0] == "p", os.listdir(unzipped_path)))[0]
        file_path = os.path.join(unzipped_path, file_name)

        # read data file
        data = pd.read_csv(file_path, sep=";")

        # unify 10min data with 1h data for "MESS_DATUM" format
        # -> convert 2022012400 to 202201240000
        if len(str(data["MESS_DATUM"].iloc[0])) == 10:  # if hourly
            data["MESS_DATUM"] = data["MESS_DATUM"] * 100  # add two zeros

        # make MESS_DATUM the index for concatenating
        data.set_index("MESS_DATUM", inplace=True, drop=True)

        data_total = pd.concat([data_total, data], verify_integrity=False)

    # clean up the temp folder; ignore_errors avoids a FileNotFoundError when
    # no file was downloaded and the folder was therefore never created
    shutil.rmtree(definitions.local_folder_temp, ignore_errors=True)

    return data_total

331 

332 

def _download_DWD_file(url: str, zip_name: str):
    """
    Downloads the file with the given filename from the specified URL and unzip.

    Retries up to 3 additional times (4 attempts total) before giving up.

    Parameters:
        url (str): URL of the DWD folder.
        zip_name (str): Name of the file to be downloaded.

    Returns:
        str: Path of the folder the archive was extracted to.

    Raises:
        ConnectionError: If the file could not be retrieved and extracted
            after all attempts.
    """
    folder_unzip = "unzipped_content"

    total_zip_name = os.path.join(definitions.local_folder_temp, zip_name)

    # create the temp folder if needed (exist_ok avoids a check-then-create race)
    os.makedirs(definitions.local_folder_temp, exist_ok=True)

    last_attempt = 3
    for i in range(last_attempt + 1):  # 4 attempts in total
        try:
            urllib.request.urlretrieve(url + zip_name, total_zip_name)
            logger.debug("Loaded: %s", total_zip_name)

            # save unzipped files to folder_unzip
            extract_path = os.path.join(definitions.local_folder_temp, folder_unzip)
            with zipfile.ZipFile(total_zip_name, "r") as zip_ref:
                zip_ref.extractall(extract_path)

            return extract_path
        except Exception as excep:
            # only give up after the final attempt; earlier failures retry
            if i == last_attempt:
                raise ConnectionError(
                    f"Not loaded: {total_zip_name} \n" f"with error: {excep}"
                ) from excep