Skip to content

Commit a5a61fe

Browse files
committed
updated to_zarr() method in the fetch_era5 class
1 parent 1c217a8 commit a5a61fe

1 file changed

Lines changed: 124 additions & 103 deletions

File tree

TopoPyScale/fetch_era5.py

Lines changed: 124 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def get_era5_snowmapper(self, surf_plev, lastday):
208208
Function to call fetching of ERA5 data
209209
TODO:
210210
- merge monthly data into one file (cdo?)- this creates massive slow down!
211+
- merge monthly data into one file (cdo?)- this creates massive slow down!
211212
"""
212213
lonW = self.config.project.extent.get('lonW') - 0.4
213214
lonE = self.config.project.extent.get('lonE') + 0.4
@@ -241,25 +242,34 @@ def get_era5_snowmapper(self, surf_plev, lastday):
241242
output_format=output_format
242243
)
243244

244-
def to_zarr(self):
245+
def to_zarr(self, from_daily=True):
246+
"""
247+
Function to convert netcdf files into a Zarr store.
248+
"""
245249

246250
# extract chunk size for time and pressure level
247-
nlevels = len(self.config.climate[self.config.project.climate].plevels)
248251
ntime = int(self.config.climate.era5.timestep[:-1]) * 365
249-
250-
print("Be aware that the conversion to Zarr requires plenty of memory, and may crash the console if resources are not sufficient")
251-
252-
convert_netcdf_stack_to_zarr(
253-
path_to_netcdfs= str(self.config.climate.path / 'yearly'),
254-
zarrout= str(self.config.climate.path/'ERA5.zarr'),
255-
chunks={'latitude':3, 'longitude':3, 'time':ntime, 'level':nlevels},
256-
compressor=None,
257-
plev_name='PLEV_*.nc',
258-
surf_name='SURF_*.nc',
259-
parallelize=False,
260-
n_cores=self.config.project.parallelization.setting.multicore.CPU_cores
252+
if from_daily:
253+
convert_daily_nc_to_zarr(
254+
path_to_netcdfs= str(self.config.climate.path / 'daily'),
255+
zarr_store= str(self.config.climate.path/ self.config.climate.era5.zarr_store),
256+
chunks={'latitude':3, 'longitude':3, 'time':ntime},
257+
compressor=None,
258+
plev_name='dPLEV_*.nc',
259+
surf_name='dSURF_*.nc',
260+
dask_worker=self.config.project.parallelization.dask
261261
)
262262

263+
else:
264+
print('---> Converting yearly netcdf files to Zarr')
265+
convert_yearly_nc_to_zarr(
266+
path_to_netcdfs= str(self.config.climate.path / 'yearly'),
267+
zarr_store= str(self.config.climate.path/ self.config.climate.era5.zarr_store),
268+
chunks={'latitude':3, 'longitude':3, 'time':ntime},
269+
compressor=None,
270+
plev_name='PLEV_*.nc',
271+
surf_name='SURF_*.nc')
272+
263273

264274
def convert_yearly_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
265275
zarr_store='inputs/climate/ERA5.zarr',
@@ -295,32 +305,17 @@ def convert_yearly_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
295305
print(f"===> Combined Zarr store saved to {zarr_store}")
296306

297307

298-
def append_year_to_zarr(path_to_netcdfs='inputs/climate/yearly',
299-
zarr_store='inputs/climate/ERA5.zarr',
300-
chunks={'latitude':3, 'longitude':3, 'time':8760},
301-
compressor=None,
302-
plev_files=[],
303-
surf_files=[]):
304-
"""
305-
Function to append years to an existing Zarr store
306-
"""
307-
# Append remaining years
308-
for surf_file, pres_file in zip(surface_files, pressure_files):
309-
with xr.open_dataset(surf_file) as surface_ds, xr.open_dataset(pres_file) as pressure_ds:
310-
lyear.append(surf_file.name.split('.')[0][-4:])
311-
print(f"---> Appending year {surf_file.name.split('.')[0][-4:]}")
312-
combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
313-
combined_ds.chunk(chunks).to_zarr(zarr_store, append_dim="time")
314-
print(f"===> Years {lyear} appended to the Zarr store {zarr_store}")
315-
316-
317308
def convert_daily_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
318-
zarrout='inputs/climate/ERA5.zarr',
309+
zarr_store='inputs/climate/ERA5.zarr',
319310
chunks={'latitude':3, 'longitude':3, 'time':8760, 'level':13},
320311
compressor=None,
321312
plev_name='dPLEV_*.nc',
322313
surf_name='dSURF_*.nc',
323-
n_cores=4):
314+
n_cores=4,
315+
dask_worker={'n_workers':4,
316+
'threads_per_worker':1,
317+
'memory_target_fraction':0.95,
318+
'memory_limit':'1.5GB'}):
324319
"""
325320
Function to convert a stack of netcdf PLEV and SURF files to a zarr store. Optimizing chunking based on auto detection by dask.
326321
@@ -331,40 +326,64 @@ def convert_daily_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
331326
compressor (obj): Compressor object to use for encoding. See Zarr compression. Default: BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
332327
plev_name (str): file pattern of the netcdf containing the pressure levels. Use * instead of the year
333328
surf_name (str): file pattern of the netcdf containing the surface level. Use * instead of the year
334-
n_cores (int): number of cores on which to distribute the load
329+
dask_worker (dict): specification of the dask worker in a dictionary
335330
336331
"""
332+
cluster = LocalCluster(processes=True, **dask_worker)
333+
334+
with Client(cluster) as client:
335+
print(f"Dask client started with {len(client.scheduler_info()['workers'])} workers")
336+
337+
dwd = Path(path_to_netcdfs)
338+
surface_files = sorted(dwd.glob(surf_name))
339+
pressure_files = sorted(dwd.glob(plev_name))
340+
341+
if compressor is None:
342+
compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
343+
344+
surface_ds = xr.open_mfdataset(
345+
surface_files,
346+
combine="by_coords",
347+
parallel=True,
348+
chunks={"time": chuncks.get("time")} # Chunk time dimension to 1 year
349+
)
350+
351+
pressure_ds = xr.open_mfdataset(
352+
pressure_files,
353+
combine="by_coords",
354+
parallel=True,
355+
chunks={"time": chuncks.get("time")} # Chunk time dimension to 1 year
356+
)
357+
358+
# Rechunk spatial dimensions
359+
surface_ds = surface_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})
360+
pressure_ds = pressure_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})
361+
362+
combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
363+
encoding = {var: {"compressors": [compressor]} for var in combined_ds.data_vars}
364+
combined_ds.to_zarr(zarr_store, mode="w", encoding=encoding, compute=True)
365+
print(f"===> Combined NC files to Zarr store: {zarr_store}")
337366

338-
client = Client(n_workers=n_cores, threads_per_worker=1, dashboard_address=':8787')
339-
dwd = Path(path_to_netcdfs)
340-
surface_files = sorted(dwd.glob(surf_name))
341-
pressure_files = sorted(dwd.glob(plev_name))
342367

343-
if compressor is None:
344-
compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
345368

346-
surface_ds = xr.open_mfdataset(
347-
surface_files,
348-
combine="by_coords",
349-
parallel=True,
350-
chunks={"time": chuncks.get("time")} # Chunk time dimension to 1 year
351-
)
352-
353-
pressure_ds = xr.open_mfdataset(
354-
pressure_files,
355-
combine="by_coords",
356-
parallel=True,
357-
chunks={"time": chuncks.get("time")} # Chunk time dimension to 1 year
358-
)
369+
def append_year_to_zarr(path_to_netcdfs='inputs/climate/yearly',
370+
zarr_store='inputs/climate/ERA5.zarr',
371+
chunks={'latitude':3, 'longitude':3, 'time':8760},
372+
compressor=None,
373+
plev_files=[],
374+
surf_files=[]):
375+
"""
376+
Function to append years to an existing Zarr store
377+
"""
378+
# Append remaining years
379+
for surf_file, pres_file in zip(surface_files, pressure_files):
380+
with xr.open_dataset(surf_file) as surface_ds, xr.open_dataset(pres_file) as pressure_ds:
381+
lyear.append(surf_file.name.split('.')[0][-4:])
382+
print(f"---> Appending year {surf_file.name.split('.')[0][-4:]}")
383+
combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
384+
combined_ds.chunk(chunks).to_zarr(zarr_store, append_dim="time")
385+
print(f"===> Years {lyear} appended to the Zarr store {zarr_store}")
359386

360-
# Rechunk spatial dimensions
361-
surface_ds = surface_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})
362-
pressure_ds = pressure_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})
363-
364-
combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
365-
encoding = {var: {"compressors": [compressor]} for var in combined_ds.data_vars}
366-
combined_ds.to_zarr(zarr_store, mode="w", encoding=encoding, compute=True)
367-
print(f"===> Combined NC files to Zarr store: {zarr_store}")
368387

369388

370389
def to_zarr(ds, fout, chuncks={'x':3, 'y':3, 'time':8760, 'level':7}, compressor=None, mode='w'):
@@ -425,7 +444,7 @@ def fetch_era5_google_from_zarr(eraDir, startDate, endDate, lonW, latS, lonE, la
425444
def retrieve_era5(product, startDate, endDate, eraDir, latN, latS, lonE, lonW, step,
426445
num_threads=10, surf_plev='surf', plevels=None, realtime=False,
427446
output_format='netcdf', download_format="unarchived", new_CDS_API=True,
428-
rm_daily=False, surf_varoi=None, plev_varoi=None):
447+
surf_varoi=None, plev_varoi=None, merge_to_yearly=False, rm_daily=False):
429448
""" Sets up era5 surface retrieval.
430449
* Creates list of year/month pairs to iterate through.
431450
* MARS retrievals are most efficient when subset by time.
@@ -447,12 +466,13 @@ def retrieve_era5(product, startDate, endDate, eraDir, latN, latS, lonE, lonW, s
447466
output_format (str): default is "netcdf", can be "grib".
448467
download_format (str): default "unarchived". Can be "zip"
449468
new_CDS_API: flag to handle new formating of SURF files with the new CDS API (2024).
450-
rm_daily: remove folder containing all daily ERA5 file. Option to clear space of data converted to yearly files.
451469
surf_varoi: list of surface variables to download. Default is None, automatically assigning minimum required for topo_scale
452470
plev_varoi: list of pressure level variables to download. Default is None, automatically assigning minimum required for topo_scale
471+
merge_to_yearly (bool): if true, daily files are merged into yearly netcdf
472+
rm_daily: remove folder containing all daily ERA5 files. Option to clear space of data converted to yearly files.
453473
454474
Returns:
455-
Monthly era surface files stored in disk.
475+
ERA surface and pressure level files stored in disk, either as daily (default) or yearly.
456476
457477
"""
458478
print('\n')
@@ -597,50 +617,51 @@ def retrieve_era5(product, startDate, endDate, eraDir, latN, latS, lonE, lonW, s
597617
if surf_plev == 'plev':
598618
remap_CDSbeta(str(eraDir / "daily" / "dPLEV*.nc"), file_type='plev')
599619

600-
# code to merge daily files to monthly
601-
# - [ ] write option earlier that checks also if monthly files already exist. improve logic that does not require to have both monthly and daily files
602-
cdo = Cdo()
603-
for year in df.year.unique():
604-
print(f"---> Merging daily {surf_plev.upper()} files from {year}")
605-
606-
if surf_plev == 'surf':
607-
fpat = str(eraDir / "daily" / ("dSURF_%04d*.nc" % (year)))
608-
fout = str(eraDir / "yearly" / ("SURF_%04d.nc" % (year)))
620+
if merge_to_yearly:
621+
# code to merge daily files to yearly
622+
# - [ ] Remove if conversion of daily files to Zarr works fine. This would remove the cdo dependency.
623+
cdo = Cdo()
624+
for year in df.year.unique():
625+
print(f"---> Merging daily {surf_plev.upper()} files from {year}")
626+
627+
if surf_plev == 'surf':
628+
fpat = str(eraDir / "daily" / ("dSURF_%04d*.nc" % (year)))
629+
fout = str(eraDir / "yearly" / ("SURF_%04d.nc" % (year)))
609630

610631

611-
try:
612-
cdo.mergetime(input=fpat, output=fout)
613-
except Exception as e:
614-
print(f"CDO warning (non-fatal): {e}")
615-
# Check if output file was actually created despite warnings
616-
if os.path.exists(fout):
617-
print(f"Merge completed successfully: {fout}")
618-
else:
619-
raise e # Re-raise if it's a real error
632+
try:
633+
cdo.mergetime(input=fpat, output=fout)
634+
except Exception as e:
635+
print(f"CDO warning (non-fatal): {e}")
636+
# Check if output file was actually created despite warnings
637+
if os.path.exists(fout):
638+
print(f"Merge completed successfully: {fout}")
639+
else:
640+
raise e # Re-raise if it's a real error
620641

621642

622643

623-
if surf_plev == 'plev':
624-
fpat = str(eraDir / "daily" / ("dPLEV_%04d*.nc" % (year)))
625-
fout = str(eraDir / "yearly" / ("PLEV_%04d.nc" % (year)))
644+
if surf_plev == 'plev':
645+
fpat = str(eraDir / "daily" / ("dPLEV_%04d*.nc" % (year)))
646+
fout = str(eraDir / "yearly" / ("PLEV_%04d.nc" % (year)))
626647

648+
try:
649+
cdo.mergetime(input=fpat, output=fout)
650+
except Exception as e:
651+
print(f"CDO warning (non-fatal): {e}")
652+
# Check if output file was actually created despite warnings
653+
if os.path.exists(fout):
654+
print(f"Merge completed successfully: {fout}")
655+
else:
656+
raise e # Re-raise if it's a real error
657+
658+
659+
if rm_daily:
627660
try:
628-
cdo.mergetime(input=fpat, output=fout)
629-
except Exception as e:
630-
print(f"CDO warning (non-fatal): {e}")
631-
# Check if output file was actually created despite warnings
632-
if os.path.exists(fout):
633-
print(f"Merge completed successfully: {fout}")
634-
else:
635-
raise e # Re-raise if it's a real error
636-
637-
638-
if rm_daily:
639-
try:
640-
shutil.rmtree(eraDir / "daily")
641-
print('---> Daily files removed')
642-
except:
643-
raise IOError('deletion of daily files issue')
661+
shutil.rmtree(eraDir / "daily")
662+
print('---> Daily files removed')
663+
except:
664+
raise IOError('deletion of daily files issue')
644665

645666
print("===> ERA5 netcdf files ready")
646667

0 commit comments

Comments
 (0)