# Source code for nowcast.workers.make_averaged_dataset

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
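#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.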
#  See the License for the specific language governing permissions and
#  limitations under the License.

# SPDX-License-Identifier: Apache-2.0

"""SalishSeaCast worker that creates a down-sampled time-series dataset netCDF4 file from
another model product file using the Reshapr API.
"""
# Intended uses:
# * create day-averaged datasets from hour-averaged NEMO output files as a model run
#   post-processing step
# * create month-averaged datasets from day-averaged at the end of each month of running
import logging
import os
from pathlib import Path

import arrow
import structlog
from nemo_nowcast import NowcastWorker, WorkerError
import reshapr.api.v1.extract
import xarray
from tenacity import retry, stop_after_attempt, wait_random, retry_if_exception_type

NAME = "make_averaged_dataset"
logger = logging.getLogger(NAME)

def main():
    """Set up and run the worker.

    Configures structlog, builds the worker's command-line interface
    (two positional arguments plus a ``--run-date`` option), and runs the
    worker with :py:func:`make_averaged_dataset` as its work function and
    :py:func:`success` / :py:func:`failure` as its completion callbacks.

    For command-line usage see:

    :command:`python -m nowcast.workers.make_averaged_dataset --help`

    :return: The worker instance that was run.
    """
    _configure_structlog()
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument(
        "avg_time_interval",
        choices={"day", "month"},
        help="Time interval over which to average the dataset",
    )
    worker.cli.add_argument(
        "reshapr_var_group",
        choices={"biology", "chemistry", "physics"},
        help="Dataset variable group to run extraction for",
    )
    worker.cli.add_date_option(
        "--run-date",
        # Default to the start of the current day.
        default=arrow.now().floor("day"),
        help="""
        Date of the run to calculate the day-averaged dataset for,
        or start date of the collection of daily runs to calculate the
        month-averaged dataset for.
        """,
    )
    worker.run(make_averaged_dataset, success, failure)
    return worker
def _configure_structlog(): """Configure structlog (used by Reshapr) to pass a formatted log message string to the :py:mod:`logging`. ref: """ structlog.configure( processors=[ # If log level is too low, abort pipeline and throw away log entry. structlog.stdlib.filter_by_level, # If the "stack_info" key in the event dict is true, remove it and # render the current stack trace in the "stack" key. structlog.processors.StackInfoRenderer(), # If the "exc_info" key in the event dict is either true or a # sys.exc_info() tuple, remove "exc_info" and render the exception # with traceback into the "exception" key. structlog.processors.format_exc_info, # If some value is in bytes, decode it to a unicode str. structlog.processors.UnicodeDecoder(), # Render the final event dict nicely aligned and ordered, but without colours # because we're generally logging to files., ], # ``wrapper_class`` is the bound logger that you get back from # get_logger(). This one imitates the API of ``logging.Logger``. wrapper_class=structlog.stdlib.BoundLogger, # ``logger_factory`` is used to create wrapped loggers that are used for # OUTPUT. This one returns a ``logging.Logger``. The final value (a string) # from the final processor (``ConsoleRenderer``) will be passed to # the method of the same name as that called on the bound logger. logger_factory=structlog.stdlib.LoggerFactory(), # Effectively freeze configuration after creating the first bound logger. 
cache_logger_on_first_use=True, ) def success(parsed_args): avg_time_interval = parsed_args.avg_time_interval run_date = parsed_args.run_date reshapr_var_group = parsed_args.reshapr_var_group match avg_time_interval: case "day": f"{avg_time_interval}-averaged dataset for {run_date.format('DD-MMM-YYYY')} " f"{reshapr_var_group} created" ) case "month": f"{avg_time_interval}-averaged dataset for {run_date.format('MMM-YYYY')} " f"{reshapr_var_group} created" ) msg_type = f"success {avg_time_interval} {reshapr_var_group}" return msg_type def failure(parsed_args): avg_time_interval = parsed_args.avg_time_interval run_date = parsed_args.run_date reshapr_var_group = parsed_args.reshapr_var_group match avg_time_interval: case "day": logger.critical( f"{avg_time_interval}-averaged dataset for {run_date.format('DD-MMM-YYYY')} " f"{reshapr_var_group} creation failed" ) case "month": logger.critical( f"{avg_time_interval}-averaged dataset for {run_date.format('MMM-YYYY')} " f"{reshapr_var_group} creation failed" ) msg_type = f"failure {avg_time_interval} {reshapr_var_group}" return msg_type def make_averaged_dataset(parsed_args, config, *args): avg_time_interval = parsed_args.avg_time_interval reshapr_var_group = parsed_args.reshapr_var_group run_date = parsed_args.run_date if avg_time_interval == "month" and != 1: logger.error( f"Month-averaging must start on the first day of a month but " f"run_date = {run_date.format('YYYY-MM-DD')}" ) raise WorkerError reshapr_config_dir = Path(config["averaged datasets"]["reshapr config dir"]) reshapr_config_yaml = config["averaged datasets"][avg_time_interval][ reshapr_var_group ]["reshapr config"] match avg_time_interval: case "day": f"creating {avg_time_interval}-averaged dataset for " f"{run_date.format('DD-MMM-YYYY')} {reshapr_var_group}" ) reshapr_config = reshapr.api.v1.extract.load_extraction_config( reshapr_config_dir / reshapr_config_yaml, run_date, run_date ) dest_dir = Path(reshapr_config["extracted dataset"]["dest dir"]) 
ddmmmyy = run_date.format("DDMMMYY").lower() reshapr_config["extracted dataset"]["dest dir"] = dest_dir / ddmmmyy case "month": f"creating {avg_time_interval}-averaged dataset for " f"{run_date.format('MMM-YYYY')} {reshapr_var_group}" ) start_date = run_date end_date = run_date.shift(months=+1, days=-1) reshapr_config = reshapr.api.v1.extract.load_extraction_config( reshapr_config_dir / reshapr_config_yaml, start_date, end_date ) nc_path = _extract_netcdf(reshapr_config, reshapr_config_yaml) nc_path.chmod(0o664) if avg_time_interval == "day": file_pattern = config["averaged datasets"][avg_time_interval][ reshapr_var_group ]["file pattern"] dest_nc_filename = file_pattern.format(yyyymmdd=run_date.format("YYYYMMDD")) nc_path = nc_path.rename(nc_path.with_name(dest_nc_filename)) return { f"{avg_time_interval} {reshapr_var_group}": { "run date": run_date.format("YYYY-MM-DD"), "file path": os.fspath(nc_path), } } @retry( retry=retry_if_exception_type((WorkerError, OSError, RuntimeError)), reraise=True, stop=stop_after_attempt(5), wait=wait_random(min=5, max=10), ) def _extract_netcdf(reshapr_config, reshapr_config_yaml): nc_path = reshapr.api.v1.extract.extract_netcdf(reshapr_config, reshapr_config_yaml) if not nc_path.exists(): logger.error(f"reshapr extraction failed for {os.fspath(nc_path)}") raise WorkerError() nc_path_size = nc_path.stat().st_size if nc_path_size < 1_000_000: logger.error( f"reshapr extraction failed: {os.fspath(nc_path)} is too small: {nc_path_size}" ) raise WorkerError() try: xarray.open_dataset(nc_path) except OSError as exc: logger.error(exc) raise WorkerError return nc_path if __name__ == "__main__": main() # pragma: no cover