Coverage for aixweather/imports/DWD.py: 93%

138 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-01-06 16:01 +0000

1""" 

2imports weather data from the DWD 

3""" 

4import logging 

5import zipfile 

6import os 

7import shutil 

8import datetime as dt 

9import urllib.request 

10import pandas as pd 

11 

12from aixweather.imports import utils_import 

13from aixweather import definitions 

14 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def import_DWD_historical(start: dt.datetime, station: str) -> pd.DataFrame:
    """
    Pull historical data from DWD:
    (https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/) and
    format them into a dataframe.

    Args:
        start: defines how much data must be pulled
        station: station id of the DWD

    Returns:
        Dataframe weather data from DWD that is as raw as possible.
    """
    measurements = [
        "air_temperature",
        "solar",
        "wind",
        "precipitation",
        "soil_temperature",
        "cloudiness",
    ]

    base_url = (
        "https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/"
    )

    # if start of datapoint older than 530 days from now -> import data from
    # the "historical" folder too (the "recent" folders only cover ~530 days)
    days_from_now = (dt.datetime.now() - start).days
    historical_folder = days_from_now >= (530 - 1)

    # create dataframe in which all data is to be stored
    df_w_ges = pd.DataFrame()

    # get weather data from dwd per measurement
    for single_measurement in measurements:
        # inconsistent pathing from DWD resolved by using the 10-min values
        # for these measurements
        if single_measurement in ("solar", "air_temperature"):
            resolution = "10_minutes"
        else:
            resolution = "hourly"

        df_w = _pull_measurement_with_history(
            f"{base_url}/{resolution}/{single_measurement}",
            station=station,
            include_historical=historical_folder,
        )

        # concat each measurement (column)
        df_w_ges = pd.concat([df_w_ges, df_w], axis=1, join="outer", sort=True)

    return df_w_ges


def _pull_measurement_with_history(
    measurement_url: str, station: str, include_historical: bool
) -> pd.DataFrame:
    """
    Pull one measurement from the "recent" folder and, if requested, extend it
    with the "historical" folder, dropping timestamps that occur in both.

    Args:
        measurement_url: base URL of the measurement (without recent/historical suffix)
        station: station id of the DWD
        include_historical: whether to also pull the "historical" folder

    Returns:
        Dataframe with the measurement's data over the combined time periods.
    """
    df_w = _pull_DWD_historical_data(f"{measurement_url}/recent/", station=station)
    if include_historical:
        df_hist = _pull_DWD_historical_data(
            f"{measurement_url}/historical/", station=station
        )
        # add up rows (time periods)
        df_w = pd.concat([df_w, df_hist])
        # dataframes may overlap with same values, delete duplicates
        df_w = df_w[~df_w.index.duplicated(keep="first")]
    return df_w

91 

92 

def import_DWD_forecast(station: str) -> pd.DataFrame:
    """
    Import weather forecast data from the DWD (German Weather Service) for a specified station.

    Args:
        station (str): Station ID of the DWD for which forecast data is to be imported.
                       For debugging purposes: station 01028.

    Returns:
        pd.DataFrame: DataFrame containing weather forecast data from the DWD.

    Raises:
        ImportError: if the optional 'DWD_forecast' dependency (wetterdienst) is missing.
        ValueError: if no forecast can be loaded for the given station.
    """
    try:
        # optional dependency, only required for forecast imports
        from wetterdienst.provider.dwd.mosmix import DwdMosmixRequest, DwdMosmixType
    except ImportError as err:
        # chain the original error so the root cause remains visible
        raise ImportError(
            "Optional dependency 'DWD_forecast' not installed, can't import data."
        ) from err

    ### pull forecast data using the package wetterdienst
    stations = DwdMosmixRequest(
        parameter="small", mosmix_type=DwdMosmixType.SMALL
    ).filter_by_station_id(station_id=[station])
    # query object to get dataframe with forecast values
    try:
        values = next(stations.values.query())
    except Exception as excep:
        raise ValueError(
            f"There is no loadable forecast for station {station}"
        ) from excep

    # wetterdienst returns a polars dataframe; convert to pandas
    imported_df = values.df.to_pandas()

    ### transform to one column per measurement
    # Convert the 'date' column to a datetime object
    imported_df["date"] = pd.to_datetime(imported_df["date"])

    # Set the 'date' column as the index
    imported_df.set_index("date", inplace=True)

    # Drop unnecessary columns
    imported_df.drop(columns=["station_id", "dataset", "quality"], inplace=True)

    # Pivot the dataframe to have each measurement as a separate column
    imported_df = imported_df.pivot(columns="parameter", values="value")

    return imported_df

136 

137 

def import_meta_DWD_historical(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station from
    the DWD (Deutscher Wetterdienst) Open Data Interface.

    Parameters:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (meta_data object): An object of the meta_data class with
        populated attributes related to the station.

    Raises:
        ValueError: if the station ID cannot be found in the DWD station list.
    """

    url = (
        "https://www.dwd.de/DE/leistungen/klimadatendeutschland/"
        "statliste/statlex_rich.txt;jsessionid"
        "=68E14BA255FE50BDC4AD9FF4F835895F.live31092?view=nasPublication&nn=16102"
    )

    # load station overview (fixed-width text table, latin-1 encoded)
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find station ID and its values
    # Splitting the data into lines
    lines = data_str.strip().split("\n")

    # Getting the header line and the line with dashes
    header_line = lines[0]
    dash_line = lines[1]

    # Finding the column breaks based on the dash line: a column starts
    # wherever a run of dashes begins
    column_breaks = [0]
    for i in range(len(dash_line)):
        if dash_line[i] != "-" and (i == 0 or dash_line[i - 1] == "-"):
            column_breaks.append(i)
    column_breaks.append(len(dash_line))

    # Splitting the header line based on column breaks
    header = [
        header_line[start:end].strip()
        for start, end in zip(column_breaks[:-1], column_breaks[1:])
    ]

    # Initializing a dictionary to store the result
    station_data = {}

    # Iterating through the rows and finding the one with the desired STAT_ID
    for line in lines[2:]:
        values = [
            line[start:end].strip()
            for start, end in zip(column_breaks[:-1], column_breaks[1:])
        ]
        stat_id = str(values[header.index("STAT_ID")])
        if stat_id == station:
            station_data = dict(zip(header, values))
            break

    if not station_data:
        # fixed: the two f-string parts previously concatenated without a
        # space ("could not befound")
        raise ValueError(
            f"Station for historical weather data with ID {station} could not be "
            f"found in station list {url}."
        )

    ### convert to meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["STAT_ID"]
    meta.station_name = station_data["STAT_NAME"]
    meta.altitude = station_data["HS"]
    meta.longitude = station_data["LA_HIGH"]
    meta.latitude = station_data["BR_HIGH"]
    meta.station_exists_since = station_data["BEGINN"]
    meta.station_exists_until = station_data["ENDE"]
    meta.input_source = "DWD Historical"

    return meta

213 

214 

def import_meta_DWD_forecast(station: str) -> utils_import.MetaData:
    """
    Downloads and extracts metadata related to the specified station
    from the DWD (Deutscher Wetterdienst) Open Data Interface.

    Parameters:
        station: Station ID for which metadata is to be retrieved.

    Returns:
        meta (meta_data object): An object of the meta_data class with
        populated attributes related to the station.

    Raises:
        ValueError: if the station ID cannot be found in the MOSMIX station list.
    """
    url = (
        "https://www.dwd.de/DE/leistungen/met_verfahren_mosmix/"
        "mosmix_stationskatalog.cfg?view=nasPublication&nn"
        "=16102"
    )

    # load station overview (whitespace-separated text table, latin-1 encoded)
    data_str = urllib.request.urlopen(url).read().decode("latin-1")

    ### find station ID and its values
    def extract_info_for_station(data_str, station_id):
        # Splitting the data by lines
        lines = data_str.strip().split("\n")

        # Iterating through the lines to find the desired ID
        for line in lines[2:]:
            # Splitting the line into parts
            parts = line.split()

            # Extracting the ID and checking if it matches the search ID
            # (renamed from "id" to avoid shadowing the builtin)
            line_id = parts[0]
            if line_id == station_id:
                # Creating a dictionary to store the details; the NAME may
                # contain spaces, hence the join over the middle columns
                result_dict = {}
                result_dict["ID"] = line_id
                result_dict["ICAO"] = parts[1]
                result_dict["NAME"] = " ".join(parts[2:-3])
                result_dict["LAT"] = parts[-3]
                result_dict["LON"] = parts[-2]
                result_dict["ELEV"] = parts[-1]
                return result_dict

        # warn that the station does not exist
        raise ValueError(
            f"Station for forecast data with the ID {station_id} could not be found in the "
            f"station list: {url}"
        )

    station_data = extract_info_for_station(data_str, station)

    # convert to meta class
    meta = utils_import.MetaData()
    meta.station_id = station_data["ID"]
    meta.station_name = station_data["NAME"]
    meta.altitude = station_data["ELEV"]
    meta.longitude = station_data["LON"]
    meta.latitude = station_data["LAT"]
    meta.input_source = "DWD Forecast"

    return meta

277 

278 

def _pull_DWD_historical_data(url: str, station: str) -> pd.DataFrame:
    """
    Retrieve the measurement data from the given URL and convert it into a
    pandas DataFrame.

    Args:
        url: URL of the DWD folder in which the measurement data is stored.
        station: station id of the DWD weather station, e.g. Aachen-Orsbach is 15000.

    Returns:
        DataFrame with the retrieved data, indexed by "MESS_DATUM".
    """

    # First, load all available filenames
    http_obj = urllib.request.urlopen(url).read().decode()

    # DWD data contains the stations with leading zeros, the meta-data and station lists without
    # leading zeros. Apply leading zeros for pulling DWD data.
    station_with_leading_zeros = station.zfill(5)

    # select only those file names that belong to the station
    zip_names = [
        i
        for i in http_obj.split('"')
        if f"_{station_with_leading_zeros}_" in i and not i.startswith(">")
    ]

    data_total = pd.DataFrame()

    # download and read all available data to df
    for zip_name in zip_names:
        unzipped_path = _download_DWD_file(url, zip_name)

        # extract data file path; the data file is the one starting with "p"
        # (presumably "produkt_*" — the other files are metadata)
        file_name = [s for s in os.listdir(unzipped_path) if s.startswith("p")][0]
        file_path = os.path.join(unzipped_path, file_name)

        # read data file
        data = pd.read_csv(file_path, sep=";")

        # unify 10min data with 1h data for "MESS_DATUM" format
        # -> convert 2022012400 to 202201240000
        if len(data.iloc[0]["MESS_DATUM"].astype(str)) == 10:  # if hourly
            data["MESS_DATUM"] = data["MESS_DATUM"] * 100  # add two zeros

        # make MESS_DATUM the index for concatenating
        data.set_index("MESS_DATUM", inplace=True, drop=True)

        data_total = pd.concat([data_total, data], verify_integrity=True)

    # clean up the temp folder; ignore_errors avoids a FileNotFoundError when
    # no zip matched the station and the folder was never created
    shutil.rmtree(definitions.local_folder_temp, ignore_errors=True)

    return data_total

329 

330 

def _download_DWD_file(url: str, zip_name: str) -> str:
    """
    Downloads the file with the given filename from the specified URL and unzips it.

    Parameters:
        url (str): URL of the DWD folder.
        zip_name (str): Name of the zip file to be downloaded.

    Returns:
        str: Path of the folder the archive was extracted to.
        (Docstring fixed: the previous version wrongly claimed a
        (success, location) tuple / (False, None) return.)

    Raises:
        ConnectionError: if the download still fails after all attempts.
    """
    folder_unzip = "unzipped_content"

    total_zip_name = os.path.join(definitions.local_folder_temp, zip_name)

    if not os.path.exists(definitions.local_folder_temp):
        os.makedirs(definitions.local_folder_temp)

    for i in range(4):  # up to 4 download attempts
        try:
            urllib.request.urlretrieve(url + zip_name, total_zip_name)
            logger.debug("Loaded: %s", total_zip_name)

            # save unzipped files to folder_unzip
            extract_path = os.path.join(definitions.local_folder_temp, folder_unzip)
            with zipfile.ZipFile(total_zip_name, "r") as zip_ref:
                zip_ref.extractall(extract_path)

            return extract_path
        except Exception as excep:
            if i == 3:  # last attempt failed -> give up
                raise ConnectionError(
                    f"Not loaded: {total_zip_name} \n" f"with error: {excep}"
                ) from excep