Source code for nowcast.workers.crop_gribs

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# SPDX-License-Identifier: Apache-2.0


"""SalishSeaCast worker that loads ECCC MSC 2.5 km rotated lat-lon continental grid HRDPS GRIB2
files, crops them to the subdomain needed for SalishSeaCast NEMO forcing, and writes them to
new GRIB2 files.
"""
# Development notebook:
#
# * https://github.com/SalishSeaCast/analysis-doug/tree/main/notebooks/continental-HRDPS/crop-grib-to-SSC-domain.ipynb.ipynb


import logging
import os
import warnings
from pathlib import Path

import arrow
import watchdog.observers
import watchdog.events
import xarray
from cfgrib import xarray_to_grib
from nemo_nowcast import NowcastWorker

from nowcast import lib


NAME = "crop_gribs"
logger = logging.getLogger(NAME)


[docs] def main(): """For command-line usage see: :command:`python -m nowcast.workers.crop_gribs --help` """ worker = NowcastWorker(NAME, description=__doc__) worker.init_cli() worker.cli.add_argument( "forecast", choices={"00", "06", "12", "18"}, help="Name of forecast to crop files in.", ) worker.cli.add_date_option( "--fcst-date", default=arrow.now().floor("day"), help="Forecast date to crop files in.", ) worker.cli.add_argument( "--backfill", action="store_true", help=""" Crop files immediately instead of waiting for file system events to signal their existence. This is intended for use in recovery from automation failures. """, ) worker.cli.add_argument( "--var-hour", type=int, help="Forecast hour to crop file for specific variable in; must be used with --var", ) worker.cli.add_argument( "--var", dest="msc_var_name", help="Forecast variable to crop file for; must be used with --var-hour", ) worker.run(crop_gribs, success, failure) return worker
def success(parsed_args): ymd = parsed_args.fcst_date.format("YYYY-MM-DD") logger.info(f"{ymd} {parsed_args.forecast} GRIBs cropping complete") msg_type = f"success {parsed_args.forecast}" return msg_type def failure(parsed_args): ymd = parsed_args.fcst_date.format("YYYY-MM-DD") logger.critical(f"{ymd} {parsed_args.forecast} GRIBs cropping failed") msg_type = f"failure {parsed_args.forecast}" return msg_type def crop_gribs(parsed_args, config, *args): """Collect weather forecast results from hourly GRIB2 files and produces day-long NEMO atmospheric forcing netCDF files. """ checklist = {} fcst_hr = parsed_args.forecast fcst_date = parsed_args.fcst_date backfill = parsed_args.backfill var_hour = parsed_args.var_hour msc_var_name = parsed_args.msc_var_name logger.info( f"cropping {fcst_date.format('YYYY-MM-DD')} ECCC HRDPS 2.5km continental " f"{fcst_hr}Z GRIB files to SalishSeaCast subdomain" ) eccc_file_tmpl = config["weather"]["download"]["2.5 km"]["ECCC file template"] var_names = config["weather"]["download"]["2.5 km"]["variables"] fcst_dur = var_hour or config["weather"]["download"]["2.5 km"]["forecast duration"] msc_var_names = [vars[0] for vars in var_names] eccc_grib_files = _calc_grib_file_paths( eccc_file_tmpl, fcst_date, fcst_hr, fcst_dur, msc_var_names, config, msc_var_name, ) if backfill: for eccc_grib_file in eccc_grib_files: _write_ssc_grib_file(eccc_grib_file, config) logger.info( f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}" ) checklist[fcst_hr] = "cropped to SalishSeaCast subdomain" return checklist if msc_var_name and var_hour: # Crop a single variable-hour file eccc_grib_file = eccc_grib_files.pop() _write_ssc_grib_file(eccc_grib_file, config) logger.info( f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}" ) checklist[fcst_hr] = "cropped to SalishSeaCast subdomain" return checklist handler = _GribFileEventHandler(eccc_grib_files, config) observer = watchdog.observers.Observer() grib_dir = Path(config["weather"]["download"]["2.5 km"]["GRIB dir"]) fcst_yyyymmdd = fcst_date.format("YYYYMMDD") grib_fcst_dir = grib_dir / Path(fcst_yyyymmdd, fcst_hr) observer.schedule(handler, os.fspath(grib_fcst_dir), recursive=True) logger.info(f"starting to watch for ECCC grib files to crop in {grib_fcst_dir}/") # Create the directory we're going to watch to handle 2 rare situations that can cause # a FileNotFoundError to be raised (issue #197): # 1. collect_weather and crop_gribs are launched concurrently. Occasionally crop_gribs tries # to start watching the directory before collect_weather creates it. # 2. When collect_weather --backfill has to be run to recover from automation problems we need # to start crop_gribs first. So, it needs to create the directory. grp_name = config["file group"] lib.mkdir(grib_fcst_dir, logger, grp_name=grp_name) observer.start() start_time = arrow.now() while eccc_grib_files: if (arrow.now() - start_time).seconds > 3600 * 8: # 8 hours _handle_stalled_observer(eccc_grib_files, fcst_hr, config) break # We need to have a timeout on the observer thread so that the status # of the ECCC grib files set gets checked, otherwise the worker never # finishes because the main thread is blocked by the observer thread. observer.join(timeout=0.5) observer.stop() observer.join() logger.info( f"finished cropping ECCC grib files to SalishSeaCast subdomain in {grib_fcst_dir}/" ) checklist[fcst_hr] = "cropped to SalishSeaCast subdomain" return checklist def _calc_grib_file_paths( file_tmpl, fcst_date, fcst_hr, fcst_dur, msc_var_names, config, msc_var_name=None ): """ :param str file_tmpl: :param :py:class:`arrow.Arrow` fcst_date: :param str fcst_hr: :param int fcst_dur: :param list msc_var_names: :param :py:class:`nemo_nowcast.Config` config: :param msc_var_name: :rtype: set """ grib_dir = Path(config["weather"]["download"]["2.5 km"]["GRIB dir"]) fcst_yyyymmdd = fcst_date.format("YYYYMMDD") logger.debug( f"creating ECCC GRIB file paths list for {fcst_yyyymmdd} {fcst_hr}Z forecast" ) fcst_steps_start = 1 if msc_var_name: msc_var_names = [msc_var_name] fcst_steps_start = fcst_dur grib_files = set() for msc_var in msc_var_names: for fcst_step in range(fcst_steps_start, fcst_dur + 1): grib_hr_dir = grib_dir / Path(fcst_yyyymmdd, fcst_hr, f"{fcst_step:03d}") grib_file = file_tmpl.format( date=fcst_yyyymmdd, forecast=fcst_hr, variable=msc_var, hour=f"{fcst_step:03d}", ) grib_files.add(grib_hr_dir / grib_file) return grib_files def _write_ssc_grib_file(eccc_grib_file, config): """ :param :py:class:`pathlib.Path` eccc_grib_file: :param :py:class:`nemo_nowcast.Config` config: """ ssc_grib_file = ( f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}" ) if Path(ssc_grib_file).exists(): logger.debug( f"cropping skipped because SalishSeaCast subdomain GRIB file exist: {ssc_grib_file}" ) return y_min, y_max = config["weather"]["download"]["2.5 km"]["lat indices"] x_min, x_max = config["weather"]["download"]["2.5 km"]["lon indices"] # We need 1 point more than the final domain size to facilitate calculation of the # grid rotation angle for the wind components y_slice = slice(y_min, y_max + 1) x_slice = slice(x_min, x_max + 1) ny, nx = y_max - y_min + 1, x_max - x_min + 1 with xarray.open_dataset( eccc_grib_file, engine="cfgrib", backend_kwargs={"indexpath": ""} ) as eccc_ds: ssc_ds = eccc_ds.sel(y=y_slice, x=x_slice) # **NOTE:** This is brittle if the ECCC HRDPS file name convention changes msc_var = eccc_grib_file.stem.split("HRDPS_")[1].split("_RLatLon")[0] vars = config["weather"]["download"]["2.5 km"]["variables"] for var in vars: if var[0] == msc_var: grib_var = var[1] break ssc_ds[grib_var].attrs.update( { "GRIB_numberOfPoints": nx * ny, "GRIB_Nx": nx, "GRIB_Ny": ny, } ) _xarray_to_grib(ssc_ds, ssc_grib_file) logger.debug(f"wrote GRIB file cropped to SalishSeaCast subdomain: {ssc_grib_file}") def _xarray_to_grib(ssc_ds, ssc_grib_file): """Write GRIB file. This is a separate function to facilitate unit testing of _write_ssc_grib_file(). :param :py:class:`xarray.Dataset` ssc_ds: :param :py:class:`pathlib.Path` ssc_grib_file: """ with warnings.catch_warnings(): warnings.simplefilter("ignore") xarray_to_grib.to_grib(ssc_ds, ssc_grib_file) def _handle_stalled_observer(eccc_grib_files, fcst_hr, config): """ :param set eccc_grib_files: :param str fcst_hr: :param :py:class:`nemo_nowcast.Config` config: """ if (remaining_files := len(eccc_grib_files)) > 10: logger.critical( f"crop_gribs {fcst_hr} has watched for 8h and {remaining_files} " f"files remain unprocessed: " f"{' '.join(os.fspath(eccc_grib_file) for eccc_grib_file in eccc_grib_files)}" ) # We need to break rather than raise WorkerError because crop_gribs needs to # appear to finish successfully regardless so that the instance for the next # forecast is started. return logger.info( f"crop_gribs {fcst_hr} has watched for 8h; retrying remaining {remaining_files} file(s)" ) for eccc_grib_file in eccc_grib_files: try: _write_ssc_grib_file(eccc_grib_file, config) except FileNotFoundError: logger.critical( f"crop_gribs {fcst_hr} has watched for 8h and at least 1 file has not " f"yet been downloaded: " f"{' '.join(os.fspath(eccc_grib_file) for eccc_grib_file in eccc_grib_files)}" ) # We need to return rather than raise WorkerError because crop_gribs needs to # appear to finish successfully regardless so that the instance for the next # forecast is started. return class _GribFileEventHandler(watchdog.events.FileSystemEventHandler): """watchdog file system event handler that detects completion of HRDPS file moves from the downloads directory into the atmospheric forcing tree. """ def __init__(self, eccc_grib_files, config): super().__init__() self.eccc_grib_files = eccc_grib_files self.config = config def on_closed(self, event): super().on_closed(event) if Path(event.src_path) in self.eccc_grib_files: eccc_grib_file = Path(event.src_path) _write_ssc_grib_file(eccc_grib_file, self.config) self.eccc_grib_files.remove(eccc_grib_file) logger.debug( f"observer thread files remaining to process: {len(self.eccc_grib_files)}" ) if __name__ == "__main__": main() # pragma: no cover