# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-License-Identifier: Apache-2.0
"""SalishSeaCast worker that loads ECCC MSC 2.5 km rotated lat-lon continental grid HRDPS GRIB2
files, crops them to the subdomain needed for SalishSeaCast NEMO forcing, and writes them to
new GRIB2 files.
"""
# Development notebook:
#
# * https://github.com/SalishSeaCast/analysis-doug/tree/main/notebooks/continental-HRDPS/crop-grib-to-SSC-domain.ipynb.ipynb
import logging
import os
import warnings
from pathlib import Path
import arrow
import watchdog.observers
import watchdog.events
import xarray
from cfgrib import xarray_to_grib
from nemo_nowcast import NowcastWorker
from nowcast import lib
NAME = "crop_gribs"
logger = logging.getLogger(NAME)
[docs]
def main():
"""For command-line usage see:
:command:`python -m nowcast.workers.crop_gribs --help`
"""
worker = NowcastWorker(NAME, description=__doc__)
worker.init_cli()
worker.cli.add_argument(
"forecast",
choices={"00", "06", "12", "18"},
help="Name of forecast to crop files in.",
)
worker.cli.add_date_option(
"--fcst-date",
default=arrow.now().floor("day"),
help="Forecast date to crop files in.",
)
worker.cli.add_argument(
"--backfill",
action="store_true",
help="""
Crop files immediately instead of waiting for file system events to signal their
existence. This is intended for use in recovery from automation failures.
""",
)
worker.cli.add_argument(
"--var-hour",
type=int,
help="Forecast hour to crop file for specific variable in; must be used with --var",
)
worker.cli.add_argument(
"--var",
dest="msc_var_name",
help="Forecast variable to crop file for; must be used with --var-hour",
)
worker.run(crop_gribs, success, failure)
return worker
def success(parsed_args):
ymd = parsed_args.fcst_date.format("YYYY-MM-DD")
logger.info(f"{ymd} {parsed_args.forecast} GRIBs cropping complete")
msg_type = f"success {parsed_args.forecast}"
return msg_type
def failure(parsed_args):
ymd = parsed_args.fcst_date.format("YYYY-MM-DD")
logger.critical(f"{ymd} {parsed_args.forecast} GRIBs cropping failed")
msg_type = f"failure {parsed_args.forecast}"
return msg_type
def crop_gribs(parsed_args, config, *args):
"""Collect weather forecast results from hourly GRIB2 files
and produces day-long NEMO atmospheric forcing netCDF files.
"""
checklist = {}
fcst_hr = parsed_args.forecast
fcst_date = parsed_args.fcst_date
backfill = parsed_args.backfill
var_hour = parsed_args.var_hour
msc_var_name = parsed_args.msc_var_name
logger.info(
f"cropping {fcst_date.format('YYYY-MM-DD')} ECCC HRDPS 2.5km continental "
f"{fcst_hr}Z GRIB files to SalishSeaCast subdomain"
)
eccc_file_tmpl = config["weather"]["download"]["2.5 km"]["ECCC file template"]
var_names = config["weather"]["download"]["2.5 km"]["variables"]
fcst_dur = var_hour or config["weather"]["download"]["2.5 km"]["forecast duration"]
msc_var_names = [vars[0] for vars in var_names]
eccc_grib_files = _calc_grib_file_paths(
eccc_file_tmpl,
fcst_date,
fcst_hr,
fcst_dur,
msc_var_names,
config,
msc_var_name,
)
if backfill:
for eccc_grib_file in eccc_grib_files:
_write_ssc_grib_file(eccc_grib_file, config)
logger.info(
f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}"
)
checklist[fcst_hr] = "cropped to SalishSeaCast subdomain"
return checklist
if msc_var_name and var_hour:
# Crop a single variable-hour file
eccc_grib_file = eccc_grib_files.pop()
_write_ssc_grib_file(eccc_grib_file, config)
logger.info(
f"finished cropping ECCC grib file to SalishSeaCast subdomain: {eccc_grib_file}"
)
checklist[fcst_hr] = "cropped to SalishSeaCast subdomain"
return checklist
handler = _GribFileEventHandler(eccc_grib_files, config)
observer = watchdog.observers.Observer()
grib_dir = Path(config["weather"]["download"]["2.5 km"]["GRIB dir"])
fcst_yyyymmdd = fcst_date.format("YYYYMMDD")
grib_fcst_dir = grib_dir / Path(fcst_yyyymmdd, fcst_hr)
observer.schedule(handler, os.fspath(grib_fcst_dir), recursive=True)
logger.info(f"starting to watch for ECCC grib files to crop in {grib_fcst_dir}/")
# Create the directory we're going to watch to handle 2 rare situations that can cause
# a FileNotFoundError to be raised (issue #197):
# 1. collect_weather and crop_gribs are launched concurrently. Occasionally crop_gribs tries
# to start watching the directory before collect_weather creates it.
# 2. When collect_weather --backfill has to be run to recover from automation problems we need
# to start crop_gribs first. So, it needs to create the directory.
grp_name = config["file group"]
lib.mkdir(grib_fcst_dir, logger, grp_name=grp_name)
observer.start()
start_time = arrow.now()
while eccc_grib_files:
if (arrow.now() - start_time).seconds > 3600 * 8: # 8 hours
_handle_stalled_observer(eccc_grib_files, fcst_hr, config)
break
# We need to have a timeout on the observer thread so that the status
# of the ECCC grib files set gets checked, otherwise the worker never
# finishes because the main thread is blocked by the observer thread.
observer.join(timeout=0.5)
observer.stop()
observer.join()
logger.info(
f"finished cropping ECCC grib files to SalishSeaCast subdomain in {grib_fcst_dir}/"
)
checklist[fcst_hr] = "cropped to SalishSeaCast subdomain"
return checklist
def _calc_grib_file_paths(
file_tmpl, fcst_date, fcst_hr, fcst_dur, msc_var_names, config, msc_var_name=None
):
"""
:param str file_tmpl:
:param :py:class:`arrow.Arrow` fcst_date:
:param str fcst_hr:
:param int fcst_dur:
:param list msc_var_names:
:param :py:class:`nemo_nowcast.Config` config:
:param msc_var_name:
:rtype: set
"""
grib_dir = Path(config["weather"]["download"]["2.5 km"]["GRIB dir"])
fcst_yyyymmdd = fcst_date.format("YYYYMMDD")
logger.debug(
f"creating ECCC GRIB file paths list for {fcst_yyyymmdd} {fcst_hr}Z forecast"
)
fcst_steps_start = 1
if msc_var_name:
msc_var_names = [msc_var_name]
fcst_steps_start = fcst_dur
grib_files = set()
for msc_var in msc_var_names:
for fcst_step in range(fcst_steps_start, fcst_dur + 1):
grib_hr_dir = grib_dir / Path(fcst_yyyymmdd, fcst_hr, f"{fcst_step:03d}")
grib_file = file_tmpl.format(
date=fcst_yyyymmdd,
forecast=fcst_hr,
variable=msc_var,
hour=f"{fcst_step:03d}",
)
grib_files.add(grib_hr_dir / grib_file)
return grib_files
def _write_ssc_grib_file(eccc_grib_file, config):
"""
:param :py:class:`pathlib.Path` eccc_grib_file:
:param :py:class:`nemo_nowcast.Config` config:
"""
ssc_grib_file = (
f"{eccc_grib_file.parent / eccc_grib_file.stem}_SSC{eccc_grib_file.suffix}"
)
if Path(ssc_grib_file).exists():
logger.debug(
f"cropping skipped because SalishSeaCast subdomain GRIB file exist: {ssc_grib_file}"
)
return
y_min, y_max = config["weather"]["download"]["2.5 km"]["lat indices"]
x_min, x_max = config["weather"]["download"]["2.5 km"]["lon indices"]
# We need 1 point more than the final domain size to facilitate calculation of the
# grid rotation angle for the wind components
y_slice = slice(y_min, y_max + 1)
x_slice = slice(x_min, x_max + 1)
ny, nx = y_max - y_min + 1, x_max - x_min + 1
with xarray.open_dataset(
eccc_grib_file, engine="cfgrib", backend_kwargs={"indexpath": ""}
) as eccc_ds:
ssc_ds = eccc_ds.sel(y=y_slice, x=x_slice)
# **NOTE:** This is brittle if the ECCC HRDPS file name convention changes
msc_var = eccc_grib_file.stem.split("HRDPS_")[1].split("_RLatLon")[0]
vars = config["weather"]["download"]["2.5 km"]["variables"]
for var in vars:
if var[0] == msc_var:
grib_var = var[1]
break
ssc_ds[grib_var].attrs.update(
{
"GRIB_numberOfPoints": nx * ny,
"GRIB_Nx": nx,
"GRIB_Ny": ny,
}
)
_xarray_to_grib(ssc_ds, ssc_grib_file)
logger.debug(f"wrote GRIB file cropped to SalishSeaCast subdomain: {ssc_grib_file}")
def _xarray_to_grib(ssc_ds, ssc_grib_file):
"""Write GRIB file.
This is a separate function to facilitate unit testing of _write_ssc_grib_file().
:param :py:class:`xarray.Dataset` ssc_ds:
:param :py:class:`pathlib.Path` ssc_grib_file:
"""
with warnings.catch_warnings():
warnings.simplefilter("ignore")
xarray_to_grib.to_grib(ssc_ds, ssc_grib_file)
def _handle_stalled_observer(eccc_grib_files, fcst_hr, config):
"""
:param set eccc_grib_files:
:param str fcst_hr:
:param :py:class:`nemo_nowcast.Config` config:
"""
if (remaining_files := len(eccc_grib_files)) > 10:
logger.critical(
f"crop_gribs {fcst_hr} has watched for 8h and {remaining_files} "
f"files remain unprocessed: "
f"{' '.join(os.fspath(eccc_grib_file) for eccc_grib_file in eccc_grib_files)}"
)
# We need to break rather than raise WorkerError because crop_gribs needs to
# appear to finish successfully regardless so that the instance for the next
# forecast is started.
return
logger.info(
f"crop_gribs {fcst_hr} has watched for 8h; retrying remaining {remaining_files} file(s)"
)
for eccc_grib_file in eccc_grib_files:
try:
_write_ssc_grib_file(eccc_grib_file, config)
except FileNotFoundError:
logger.critical(
f"crop_gribs {fcst_hr} has watched for 8h and at least 1 file has not "
f"yet been downloaded: "
f"{' '.join(os.fspath(eccc_grib_file) for eccc_grib_file in eccc_grib_files)}"
)
# We need to return rather than raise WorkerError because crop_gribs needs to
# appear to finish successfully regardless so that the instance for the next
# forecast is started.
return
class _GribFileEventHandler(watchdog.events.FileSystemEventHandler):
"""watchdog file system event handler that detects completion of HRDPS file moves
from the downloads directory into the atmospheric forcing tree.
"""
def __init__(self, eccc_grib_files, config):
super().__init__()
self.eccc_grib_files = eccc_grib_files
self.config = config
def on_closed(self, event):
super().on_closed(event)
if Path(event.src_path) in self.eccc_grib_files:
eccc_grib_file = Path(event.src_path)
_write_ssc_grib_file(eccc_grib_file, self.config)
self.eccc_grib_files.remove(eccc_grib_file)
logger.debug(
f"observer thread files remaining to process: {len(self.eccc_grib_files)}"
)
if __name__ == "__main__":
main() # pragma: no cover