Source code for nowcast.next_workers

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# SPDX-License-Identifier: Apache-2.0


"""Functions to calculate lists of workers to launch after previous workers
end their work.

Function names **must** be of the form :py:func:`after_worker_name`.
"""
from pathlib import Path

import arrow
from nemo_nowcast import NextWorker


def after_download_weather(msg, config, checklist):
    """Work out which workers to launch once the download_weather worker ends.

    Only ``success 2.5km 06`` and ``success 2.5km 12`` messages trigger
    follow-on work; every other recognized message type yields an empty list.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next.
              For ``success 2.5km 06`` and ``success 2.5km 12`` messages
              the return value is a 2-tuple of the list of workers and a
              set of race condition worker names.
    :rtype: list or tuple
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure 2.5km 00",
            "failure 2.5km 06",
            "failure 2.5km 12",
            "failure 2.5km 18",
            "failure 1km 00",
            "failure 1km 12",
            "success 2.5km 00",
            "success 2.5km 06",
            "success 2.5km 12",
            "success 2.5km 18",
            "success 1km 00",
            "success 1km 12",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    # River data are requested for the day before the present date
    data_date = arrow.now().shift(days=-1).format("YYYY-MM-DD")

    if msg.type.endswith("2.5km 06"):
        launch = next_workers["success 2.5km 06"]
        launch.extend(
            NextWorker(
                "nowcast.workers.collect_river_data",
                args=["ECCC", river_name, "--data-date", data_date],
            )
            for river_name in config["rivers"]["stations"]["ECCC"]
        )
        launch.extend(
            NextWorker("nowcast.workers.get_onc_ctd", args=[stn])
            for stn in config["observations"]["ctd data"]["stations"]
        )
        launch.extend(
            NextWorker("nowcast.workers.get_onc_ferry", args=[ferry])
            for ferry in config["observations"]["ferry data"]["ferries"]
        )
        if "forecast2" in config["run types"]:
            launch.append(
                NextWorker("nowcast.workers.collect_NeahBay_ssh", args=["00"])
            )
        return next_workers[msg.type], {
            "grib_to_netcdf",
            "make_runoff_file",
            "make_v202111_runoff_file",
        }

    if msg.type.endswith("2.5km 12"):
        launch = next_workers["success 2.5km 12"]
        launch.extend(
            NextWorker(
                "nowcast.workers.collect_river_data",
                args=["USGS", river_name, "--data-date", data_date],
            )
            for river_name in config["rivers"]["stations"]["USGS"]
        )
        launch.append(NextWorker("nowcast.workers.make_turbidity_file"))
        launch.append(NextWorker("nowcast.workers.collect_NeahBay_ssh", args=["06"]))
        launch.append(NextWorker("nowcast.workers.download_live_ocean"))
        return next_workers[msg.type], {
            "grib_to_netcdf",
            "make_live_ocean_files",
            "make_runoff_file",
            "make_v202111_runoff_file",
        }

    return next_workers[msg.type]
def after_collect_weather(msg, config, checklist):
    """Calculate the list of workers to launch after the collect_weather
    worker ends.

    The workers that :py:func:`after_download_weather` calculates for the
    same message are always included; successful 2.5km forecast collections
    additionally chain to the collection and cropping of the next forecast.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist; the ``"weather forecast"`` item
                         is read for successful 00 and 18 forecasts.

    :returns: Worker(s) to launch next.
              When :py:func:`after_download_weather` returns race condition
              workers (successful 06 and 12 forecasts) the return value is
              a 2-tuple of the list of workers and that set.
    :rtype: list or tuple
    """
    # Call after_download_weather() exactly once. It validates msg.type
    # (KeyError for unknown types) and returns either a list or a
    # (list, race condition workers) tuple.
    downstream = after_download_weather(msg, config, checklist)
    if isinstance(downstream, tuple):
        next_workers, race_condition_workers = downstream
    else:
        next_workers, race_condition_workers = downstream, None

    if msg.type.startswith("success"):
        if msg.type.endswith("2.5km 00"):
            # Forecast date comes from the directory that holds the 00 files
            grib_dir = Path(checklist["weather forecast"]["00 2.5km"])
            fcst_date = arrow.get(grib_dir.parent.stem, "YYYYMMDD").format(
                "YYYY-MM-DD"
            )
            next_workers.extend(
                [
                    NextWorker("nowcast.workers.collect_weather", args=["06", "2.5km"]),
                    NextWorker(
                        "nowcast.workers.crop_gribs",
                        args=["06", "--fcst-date", fcst_date],
                    ),
                ]
            )
        elif msg.type.endswith("2.5km 06"):
            next_workers.extend(
                [
                    NextWorker("nowcast.workers.collect_weather", args=["12", "2.5km"]),
                    NextWorker("nowcast.workers.crop_gribs", args=["12"]),
                ]
            )
        elif msg.type.endswith("2.5km 12"):
            next_workers.extend(
                [
                    NextWorker("nowcast.workers.collect_weather", args=["18", "2.5km"]),
                    NextWorker("nowcast.workers.crop_gribs", args=["18"]),
                ]
            )
        elif msg.type.endswith("2.5km 18"):
            # The next 00 forecast belongs to the day after the 18 forecast
            grib_dir = Path(checklist["weather forecast"]["18 2.5km"])
            fcst_date = (
                arrow.get(grib_dir.parent.stem, "YYYYMMDD")
                .shift(days=+1)
                .format("YYYY-MM-DD")
            )
            next_workers.extend(
                [
                    NextWorker("nowcast.workers.download_weather", args=["00", "1km"]),
                    NextWorker("nowcast.workers.download_weather", args=["12", "1km"]),
                    NextWorker("nowcast.workers.collect_weather", args=["00", "2.5km"]),
                    NextWorker(
                        "nowcast.workers.crop_gribs",
                        args=["00", "--fcst-date", fcst_date],
                    ),
                ]
            )

    if race_condition_workers is None:
        return next_workers
    return next_workers, race_condition_workers
def after_crop_gribs(msg, config, checklist):
    """Work out which workers to launch once the crop_gribs worker ends.

    A successful 06 crop launches grib_to_netcdf for ``forecast2`` and a
    successful 12 crop launches it for ``nowcast+``; nothing follows any
    other message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure 00",
            "failure 06",
            "failure 12",
            "failure 18",
            "success 00",
            "success 06",
            "success 12",
            "success 18",
        )
    }
    # Message types that lead to a grib_to_netcdf run, and its run type
    netcdf_run_types = {"success 06": "forecast2", "success 12": "nowcast+"}
    grib_run_type = netcdf_run_types.get(msg.type)
    if grib_run_type is not None:
        next_workers[msg.type].append(
            NextWorker("nowcast.workers.grib_to_netcdf", args=[grib_run_type])
        )
    return next_workers[msg.type]
def after_collect_river_data(msg, config, checklist):
    """Work out which workers to launch once the collect_river_data worker
    ends.

    No workers follow collect_river_data; the result is an empty list for
    every recognized message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    return next_workers[msg.type]
def after_make_v202111_runoff_file(msg, config, checklist):
    """Work out which workers to launch once the make_v202111_runoff_file
    worker ends.

    No workers follow make_v202111_runoff_file; the result is an empty list
    for every recognized message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    return next_workers[msg.type]
def after_make_runoff_file(msg, config, checklist):
    """Work out which workers to launch once the make_runoff_file worker ends.

    No workers follow make_runoff_file; the result is an empty list for
    every recognized message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    return next_workers[msg.type]
def after_collect_NeahBay_ssh(msg, config, checklist):
    """Calculate the list of workers to launch after the collect_NeahBay_ssh
    worker ends.

    A successful 00 collection launches make_ssh_files for ``forecast2`` and
    a successful 06 collection launches it for ``nowcast``.
    The 12 and 18 forecasts are valid message types but have no run type
    mapped to them, so nothing is launched after them.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist; the ``"data date"`` value of the
                         ``"Neah Bay ssh data"`` item is passed to
                         make_ssh_files as its run date.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        "crash": [],
        "failure 00": [],
        "failure 06": [],
        "failure 12": [],
        "failure 18": [],
        "success 00": [],
        "success 06": [],
        "success 12": [],
        "success 18": [],
    }
    if msg.type.startswith("success"):
        ssh_fcst_run_type_map = {
            "00": "forecast2",
            "06": "nowcast",
        }
        ssh_forecast = msg.type.split()[1]
        # Forecast hours without a mapped run type (12, 18) launch nothing
        # rather than raising KeyError on the map lookup
        run_type = ssh_fcst_run_type_map.get(ssh_forecast)
        if run_type is not None:
            data_date = checklist["Neah Bay ssh data"]["data date"]
            next_workers[f"success {ssh_forecast}"] = [
                NextWorker(
                    "nowcast.workers.make_ssh_files",
                    args=[run_type, "--run-date", data_date],
                )
            ]
    return next_workers[msg.type]
def after_make_ssh_files(msg, config, checklist):
    """Work out which workers to launch once the make_ssh_files worker ends.

    Any success message launches both runoff file workers; failures and
    crashes launch nothing.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure nowcast",
            "failure forecast2",
            "success nowcast",
            "success forecast2",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]
    next_workers[msg.type].extend(
        [
            NextWorker("nowcast.workers.make_v202111_runoff_file"),
            NextWorker("nowcast.workers.make_runoff_file"),
        ]
    )
    return next_workers[msg.type]
def after_grib_to_netcdf(msg, config, checklist):
    """Work out which workers to launch once the grib_to_netcdf worker ends.

    A ``nowcast+`` success pings ERDDAP for the weather datasets.
    A ``forecast2`` success uploads forcing to every enabled host that does
    not use shared storage.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"enabled hosts"`` of the ``"run"``
                 section are read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure nowcast+",
            "failure forecast2",
            "success nowcast+",
            "success forecast2",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    _, run_type = msg.type.split()
    if run_type == "nowcast+":
        next_workers["success nowcast+"].append(
            NextWorker("nowcast.workers.ping_erddap", args=["weather"])
        )
    if run_type == "forecast2":
        enabled_hosts = config["run"]["enabled hosts"]
        for host in enabled_hosts:
            if enabled_hosts[host]["shared storage"]:
                # Shared storage hosts need no upload
                continue
            next_workers[f"success {run_type}"].append(
                NextWorker("nowcast.workers.upload_forcing", args=[host, run_type])
            )
    return next_workers[msg.type]
def after_get_onc_ctd(msg, config, checklist):
    """Work out which workers to launch once the get_onc_ctd worker ends.

    A success message for a CTD station launches ping_erddap for that
    station's ``-CTD`` dataset.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure",
            "success SCVIP",
            "success SEVIP",
            "success USDDL",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]
    # Station name is the 2nd word of the message type
    ctd_stn = msg.type.split()[1]
    next_workers[msg.type].append(
        NextWorker("nowcast.workers.ping_erddap", args=[f"{ctd_stn}-CTD"])
    )
    return next_workers[msg.type]
def after_get_onc_ferry(msg, config, checklist):
    """Work out which workers to launch once the get_onc_ferry worker ends.

    A success message for a ferry platform launches ping_erddap for that
    platform's ``-ferry`` dataset.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure TWDP", "success TWDP")}
    if not msg.type.startswith("success"):
        return next_workers[msg.type]
    # Platform name is the 2nd word of the message type
    ferry_platform = msg.type.split()[1]
    next_workers[msg.type].append(
        NextWorker("nowcast.workers.ping_erddap", args=[f"{ferry_platform}-ferry"])
    )
    return next_workers[msg.type]
def after_download_live_ocean(msg, config, checklist):
    """Work out which workers to launch once the download_live_ocean worker
    ends.

    On success, make_live_ocean_files is launched with the last key of the
    checklist's ``"Live Ocean products"`` item as its run date.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist; ``"Live Ocean products"`` is read
                         on success.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    if msg.type != "success":
        return next_workers[msg.type]
    product_dates = list(checklist["Live Ocean products"].keys())
    next_workers["success"].append(
        NextWorker(
            "nowcast.workers.make_live_ocean_files",
            args=["--run-date", product_dates[-1]],
        )
    )
    return next_workers[msg.type]
def after_make_live_ocean_files(msg, config, checklist):
    """Work out which workers to launch once the make_live_ocean_files worker
    ends.

    On success, ``nowcast+`` forcing is uploaded to every enabled host that
    does not use shared storage.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"enabled hosts"`` of the ``"run"``
                 section are read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    if msg.type != "success":
        return next_workers[msg.type]
    enabled_hosts = config["run"]["enabled hosts"]
    for host in enabled_hosts:
        if enabled_hosts[host]["shared storage"]:
            # Shared storage hosts need no upload
            continue
        next_workers[msg.type].append(
            NextWorker("nowcast.workers.upload_forcing", args=[host, "nowcast+"])
        )
    return next_workers[msg.type]
def after_make_turbidity_file(msg, config, checklist):
    """Work out which workers to launch once the make_turbidity_file worker
    ends.

    No workers follow make_turbidity_file; the result is an empty list for
    every recognized message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    return next_workers[msg.type]
def after_upload_forcing(msg, config, checklist):
    """Work out which workers to launch once the upload_forcing worker ends.

    On success, make_forcing_links is launched on the host named by the
    first key of the message payload, provided that host is configured to
    make forcing links and runs one of the NEMO run types associated with
    the uploaded forcing.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host name and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"enabled hosts"`` of the ``"run"``
                 section are read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure nowcast+",
            "failure forecast2",
            "failure ssh",
            "failure turbidity",
            "success nowcast+",
            "success forecast2",
            "success ssh",
            "success turbidity",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    forcing_run_type = msg.type.split()[1]
    # NEMO run types that consume each kind of uploaded forcing
    nemo_run_types_by_forcing = {
        "forecast2": ("forecast2",),
        "nowcast+": ("nowcast",),
        "ssh": ("forecast",),
        "turbidity": ("nowcast-green", "nowcast-agrif"),
    }
    run_types = nemo_run_types_by_forcing[forcing_run_type]

    try:
        host_name = list(msg.payload.keys())[0]
        host_config = config["run"]["enabled hosts"][host_name]
    except (AttributeError, IndexError):
        # Malformed payload - no host name in payload;
        # upload_forcing worker probably crashed
        return []
    if not host_config["make forcing links"]:
        return []

    is_turbidity = forcing_run_type == "turbidity"
    for nemo_run_type in run_types:
        if nemo_run_type not in host_config["run types"]:
            continue
        # Turbidity links are made per NEMO run type; other forcing uses
        # the forcing run type itself
        links_run_type = nemo_run_type if is_turbidity else forcing_run_type
        # Assignment (not append): a later matching run type replaces an
        # earlier one
        next_workers[f"success {forcing_run_type}"] = [
            NextWorker(
                "nowcast.workers.make_forcing_links",
                args=[
                    host_name,
                    links_run_type,
                    "--run-date",
                    msg.payload[host_name][forcing_run_type]["run date"],
                ],
            )
        ]
    return next_workers[msg.type]
def after_run_NEMO(msg, config, checklist):
    """Work out which workers to launch once the run_NEMO worker ends.

    On success, watch_NEMO is launched on the host recorded in the message
    payload for the run type.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure nowcast",
            "failure nowcast-green",
            "failure forecast",
            "failure forecast2",
            "success nowcast",
            "success nowcast-green",
            "success forecast",
            "success forecast2",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]
    run_type = msg.type.split()[1]
    host = msg.payload[run_type]["host"]
    watcher = NextWorker("nowcast.workers.watch_NEMO", args=[host, run_type], host=host)
    next_workers[msg.type].append(watcher)
    return next_workers[msg.type]
def after_watch_NEMO(msg, config, checklist):
    """Work out which workers to launch once the watch_NEMO worker ends.

    On success:

    * ``nowcast``: make forcing links for the ssh forcing
    * ``forecast``: make the WaveWatch III wind and current files, and
      upload turbidity forcing to enabled hosts without shared storage
    * ``forecast2``: make the WaveWatch III wind and current files for the
      day after the run date
    * any run type: download results when the run's host does not use
      shared storage

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"wave forecasts"`` and ``"run"``
                 sections are read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next.
              For ``forecast`` and ``forecast2`` successes the return value
              is a 2-tuple of the list of workers and a set of race
              condition worker names.
    :rtype: list or tuple
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure nowcast",
            "failure nowcast-green",
            "failure forecast",
            "failure forecast2",
            "success nowcast",
            "success nowcast-green",
            "success forecast",
            "success forecast2",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    ww3_forcing_workers = (
        "nowcast.workers.make_ww3_wind_file",
        "nowcast.workers.make_ww3_current_file",
    )
    race_condition_workers = None
    run_type = msg.type.split()[1]

    if run_type == "nowcast":
        # TODO: Add a config switch to control running FVCOM VHFR
        #       (make_fvcom_boundary for the x2 nowcast configuration)
        next_workers[msg.type].append(
            NextWorker(
                "nowcast.workers.make_forcing_links",
                args=[
                    msg.payload["nowcast"]["host"],
                    "ssh",
                    "--run-date",
                    msg.payload[run_type]["run date"],
                ],
            )
        )
    elif run_type == "forecast":
        host_name = config["wave forecasts"]["host"]
        next_workers[msg.type].extend(
            NextWorker(
                worker,
                args=[
                    host_name,
                    "forecast",
                    "--run-date",
                    msg.payload[run_type]["run date"],
                ],
                host=host_name,
            )
            for worker in ww3_forcing_workers
        )
        race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}
        enabled_hosts = config["run"]["enabled hosts"]
        for host in enabled_hosts:
            if enabled_hosts[host]["shared storage"]:
                continue
            next_workers[msg.type].append(
                NextWorker(
                    "nowcast.workers.upload_forcing",
                    args=[
                        host,
                        "turbidity",
                        "--run-date",
                        msg.payload[run_type]["run date"],
                    ],
                )
            )
    elif run_type == "forecast2":
        host_name = config["wave forecasts"]["host"]
        # Wave forcing for forecast2 is dated one day after the NEMO run date
        run_date = arrow.get(msg.payload[run_type]["run date"]).shift(days=+1)
        next_workers[msg.type].extend(
            NextWorker(
                worker,
                args=[
                    host_name,
                    "forecast2",
                    "--run-date",
                    run_date.format("YYYY-MM-DD"),
                ],
                host=host_name,
            )
            for worker in ww3_forcing_workers
        )
        race_condition_workers = {"make_ww3_wind_file", "make_ww3_current_file"}

    run_host = msg.payload[run_type]["host"]
    if not config["run"]["enabled hosts"][run_host]["shared storage"]:
        next_workers[msg.type].append(
            NextWorker(
                "nowcast.workers.download_results",
                args=[
                    run_host,
                    run_type,
                    "--run-date",
                    msg.payload[run_type]["run date"],
                ],
            )
        )

    if race_condition_workers:
        return next_workers[msg.type], race_condition_workers
    return next_workers[msg.type]
def after_run_NEMO_agrif(msg, config, checklist):
    """Work out which workers to launch once the run_NEMO_agrif worker ends.

    On success, watch_NEMO_agrif is launched with the host and job id that
    the message payload records for the ``nowcast-agrif`` run.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host and job id.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    if not msg.type.startswith("success"):
        return next_workers[msg.type]
    host = msg.payload["nowcast-agrif"]["host"]
    job_id = msg.payload["nowcast-agrif"]["job id"]
    watcher = NextWorker("nowcast.workers.watch_NEMO_agrif", args=[host, job_id])
    next_workers[msg.type].append(watcher)
    return next_workers[msg.type]
def after_watch_NEMO_agrif(msg, config, checklist):
    """Work out which workers to launch once the watch_NEMO_agrif worker ends.

    On success, download_results is launched for the ``nowcast-agrif`` run
    using the host and run date recorded in the message payload.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    if msg.type != "success":
        return next_workers[msg.type]
    next_workers[msg.type].append(
        NextWorker(
            "nowcast.workers.download_results",
            args=[
                msg.payload["nowcast-agrif"]["host"],
                "nowcast-agrif",
                "--run-date",
                msg.payload["nowcast-agrif"]["run date"],
            ],
        )
    )
    return next_workers[msg.type]
def after_watch_NEMO_hindcast(msg, config, checklist):
    """Work out which workers to launch once the watch_NEMO_hindcast worker
    ends.

    On success, three workers are launched for the host recorded in the
    message payload: download_results for the finished run, then
    watch_NEMO_hindcast and run_NEMO_hindcast.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    if msg.type != "success":
        return next_workers[msg.type]
    launch = next_workers[msg.type]
    launch.append(
        NextWorker(
            "nowcast.workers.download_results",
            args=[
                msg.payload["hindcast"]["host"],
                "hindcast",
                "--run-date",
                msg.payload["hindcast"]["run date"],
            ],
        )
    )
    launch.append(
        NextWorker(
            "nowcast.workers.watch_NEMO_hindcast",
            args=[msg.payload["hindcast"]["host"]],
        )
    )
    launch.append(
        NextWorker(
            "nowcast.workers.run_NEMO_hindcast",
            args=[msg.payload["hindcast"]["host"]],
        )
    )
    return next_workers[msg.type]
def after_run_NEMO_hindcast(msg, config, checklist):
    """Work out which workers to launch once the run_NEMO_hindcast worker
    ends.

    No workers follow run_NEMO_hindcast; the result is an empty list for
    every recognized message type.

    :arg msg: Nowcast system message; its ``type`` selects the result.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {key: [] for key in ("crash", "failure", "success")}
    return next_workers[msg.type]
def after_make_fvcom_boundary(msg, config, checklist):
    """Work out which workers to launch once the make_fvcom_boundary worker
    ends.

    On success, make_fvcom_atmos_forcing is launched on ``localhost`` and
    make_fvcom_rivers_forcing is launched on the configured VHFR FVCOM host.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the model configuration and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"vhfr fvcom runs"`` host is read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure x2 nowcast",
            "failure r12 nowcast",
            "success x2 nowcast",
            "success r12 nowcast",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    # Run type is the 3rd word of the message type
    run_type = msg.type.split()[2]
    model_config = msg.payload[run_type]["model config"]
    run_date = msg.payload[run_type]["run date"]
    atmos_forcing = NextWorker(
        "nowcast.workers.make_fvcom_atmos_forcing",
        args=[model_config, run_type, "--run-date", run_date],
        host="localhost",
    )
    rivers_forcing = NextWorker(
        "nowcast.workers.make_fvcom_rivers_forcing",
        args=[
            (config["vhfr fvcom runs"]["host"]),
            model_config,
            run_type,
            "--run-date",
            run_date,
        ],
        host=(config["vhfr fvcom runs"]["host"]),
    )
    next_workers[msg.type].extend([atmos_forcing, rivers_forcing])
    return next_workers[msg.type]
def after_make_fvcom_rivers_forcing(msg, config, checklist):
    """Calculate the list of workers to launch after the
    make_fvcom_rivers_forcing worker ends.

    No workers follow make_fvcom_rivers_forcing, so an empty list is
    returned regardless of the message type.

    :arg msg: Nowcast system message (not used by this function).
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next; always empty
    :rtype: list
    """
    return []
def after_make_fvcom_atmos_forcing(msg, config, checklist):
    """Work out which workers to launch once the make_fvcom_atmos_forcing
    worker ends.

    On success, upload_fvcom_atmos_forcing is launched for the configured
    VHFR FVCOM host.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the model configuration and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"vhfr fvcom runs"`` host is read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure x2 nowcast",
            "failure r12 nowcast",
            "success x2 nowcast",
            "success r12 nowcast",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    host_name = config["vhfr fvcom runs"]["host"]
    # Run type is the 3rd word of the message type
    run_type = msg.type.split()[2]
    run_info = msg.payload[run_type]
    model_config = run_info["model config"]
    run_date = run_info["run date"]
    uploader = NextWorker(
        "nowcast.workers.upload_fvcom_atmos_forcing",
        args=[host_name, model_config, run_type, "--run-date", run_date],
    )
    next_workers[msg.type].append(uploader)
    return next_workers[msg.type]
def after_upload_fvcom_atmos_forcing(msg, config, checklist):
    """Work out which workers to launch once the upload_fvcom_atmos_forcing
    worker ends.

    On success, run_fvcom is launched on the configured VHFR FVCOM host.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` (keyed by host name, then run type) supplies
              the model configuration and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"vhfr fvcom runs"`` host is read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure x2 nowcast",
            "failure r12 nowcast",
            "success x2 nowcast",
            "success r12 nowcast",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    host_name = config["vhfr fvcom runs"]["host"]
    # Run type is the 3rd word of the message type
    run_type = msg.type.split()[2]
    model_config = msg.payload[host_name][run_type]["model config"]
    run_date = msg.payload[host_name][run_type]["run date"]
    runner = NextWorker(
        "nowcast.workers.run_fvcom",
        args=[host_name, model_config, run_type, "--run-date", run_date],
        host=host_name,
    )
    next_workers[msg.type].append(runner)
    return next_workers[msg.type]
def after_run_fvcom(msg, config, checklist):
    """Work out which workers to launch once the run_fvcom worker ends.

    On success, watch_fvcom is launched on the host that the message
    payload records under the ``"<model config> <run type>"`` key.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: Nowcast system configuration (not used by this function).
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure x2 nowcast",
            "failure r12 nowcast",
            "success x2 nowcast",
            "success r12 nowcast",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    _, model_config, run_type = msg.type.split()
    host_name = msg.payload[f"{model_config} {run_type}"]["host"]
    watcher = NextWorker(
        "nowcast.workers.watch_fvcom",
        args=[host_name, model_config, run_type],
        host=host_name,
    )
    next_workers[msg.type].append(watcher)
    return next_workers[msg.type]
def after_watch_fvcom(msg, config, checklist):
    """Work out which workers to launch once the watch_fvcom worker ends.

    On success, download_fvcom_results is launched for the finished run.
    After a successful ``x2 nowcast`` run, make_fvcom_boundary is also
    launched for the ``r12 nowcast`` run on the configured VHFR FVCOM host.

    :arg msg: Nowcast system message; its ``type`` selects the result and
              its ``payload`` supplies the host and run date.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration; the ``"vhfr fvcom runs"`` host is read.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist (not used by this function).

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        key: []
        for key in (
            "crash",
            "failure x2 nowcast",
            "failure r12 nowcast",
            "success x2 nowcast",
            "success r12 nowcast",
        )
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    _, model_config, run_type = msg.type.split()
    next_workers[msg.type].append(
        NextWorker(
            "nowcast.workers.download_fvcom_results",
            args=[
                msg.payload[f"{model_config} {run_type}"]["host"],
                model_config,
                run_type,
                "--run-date",
                msg.payload[f"{model_config} {run_type}"]["run date"],
            ],
        )
    )
    if (model_config, run_type) == ("x2", "nowcast"):
        # The r12 run is chained after the x2 run for the same run date
        next_workers[msg.type].append(
            NextWorker(
                "nowcast.workers.make_fvcom_boundary",
                args=[
                    (config["vhfr fvcom runs"]["host"]),
                    "r12",
                    "nowcast",
                    "--run-date",
                    msg.payload[f"{model_config} {run_type}"]["run date"],
                ],
                host=(config["vhfr fvcom runs"]["host"]),
            )
        )
    return next_workers[msg.type]
def after_make_ww3_wind_file(msg, config, checklist):
    """Calculate the list of workers to launch after the make_ww3_wind_file
    worker ends.

    No workers are launched, whatever the message type is.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch
def after_make_ww3_current_file(msg, config, checklist):
    """Calculate the list of workers to launch after the make_ww3_current_file
    worker ends.

    On success, the run_ww3 worker is launched on the host given by the
    ``wave forecasts`` section of the configuration, for the run date in
    the message payload.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for known_run_type in ("forecast2", "nowcast", "forecast"):
        followups[f"failure {known_run_type}"] = []
        followups[f"success {known_run_type}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    wave_host = config["wave forecasts"]["host"]
    ww3_run_type = msg.type.split()[1]
    if ww3_run_type == "forecast":
        # We make the current files for the period of nowcast+forecast,
        # but run nowcast then forecast separately
        ww3_run_type = "nowcast"
    launch_list = followups[msg.type]
    launch_list.append(
        NextWorker(
            "nowcast.workers.run_ww3",
            args=[
                wave_host,
                ww3_run_type,
                "--run-date",
                msg.payload["run date"],
            ],
            host=wave_host,
        )
    )
    return launch_list
def after_run_ww3(msg, config, checklist):
    """Calculate the list of workers to launch after the run_ww3 worker ends.

    On success, the watch_ww3 worker is launched on the host recorded in
    the message payload for the run type named in the message type.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for known_run_type in ("forecast2", "nowcast", "forecast"):
        followups[f"failure {known_run_type}"] = []
        followups[f"success {known_run_type}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    # Message type is "success <run type>"
    run_type = msg.type.split()[1]
    run_host = msg.payload[run_type]["host"]
    watcher = NextWorker(
        "nowcast.workers.watch_ww3", args=[run_host, run_type], host=run_host
    )
    followups[msg.type].append(watcher)
    return followups[msg.type]
def after_watch_ww3(msg, config, checklist):
    """Calculate the list of workers to launch after the watch_ww3 worker ends.

    On success, the download_wwatch3_results worker is launched for the run
    described in the message payload. When the run that finished was the
    nowcast, the run_ww3 worker is also launched for the forecast run on
    the same host and for the same run date.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        "crash": [],
        "failure forecast2": [],
        "failure nowcast": [],
        "failure forecast": [],
        "success forecast2": [],
        "success nowcast": [],
        "success forecast": [],
    }
    if msg.type.startswith("success"):
        # Message type is "success <run type>"
        run_type = msg.type.split()[1]
        run_info = msg.payload[run_type]
        next_workers[msg.type].append(
            NextWorker(
                "nowcast.workers.download_wwatch3_results",
                args=[
                    run_info["host"],
                    run_type,
                    "--run-date",
                    run_info["run date"],
                ],
            )
        )
        if run_type == "nowcast":
            # Only the nowcast run chains into the forecast run.
            # Launching run_ww3 forecast after a forecast or forecast2 run
            # would re-trigger this function with another success message
            # and so launch forecast runs without end.
            next_workers[msg.type].append(
                NextWorker(
                    "nowcast.workers.run_ww3",
                    args=[
                        run_info["host"],
                        "forecast",
                        "--run-date",
                        run_info["run date"],
                    ],
                    host=run_info["host"],
                )
            )
    return next_workers[msg.type]
def after_download_results(msg, config, checklist):
    """Calculate the list of workers to launch after the download_results
    worker ends.

    Nothing is launched for crash or failure messages. For success messages
    the workers depend on the run type named in the message type:

    * hindcast: split_results
    * any nowcast run type: make_plots for the research plots
    * nowcast: additionally make_plots for the comparison plots of the day
      before the run date, and make_CHS_currents_file
    * nowcast-green: additionally ping_erddap, make_averaged_dataset for the
      day-average of each variable group, and, when the run date is the
      last day of a month, archive_tarball
    * forecast run types: make_CHS_currents_file

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    next_workers = {
        "crash": [],
        "failure nowcast": [],
        "failure nowcast-green": [],
        "failure forecast": [],
        "failure forecast2": [],
        "failure hindcast": [],
        "failure nowcast-agrif": [],
        "success nowcast": [],
        "success nowcast-green": [],
        "success forecast": [],
        "success forecast2": [],
        "success hindcast": [],
        "success nowcast-agrif": [],
    }
    if not msg.type.startswith("success"):
        return next_workers[msg.type]

    # Message type is "success <run type>"
    run_type = msg.type.split()[1]
    run_date = msg.payload[run_type]["run date"]
    launch_list = next_workers[msg.type]

    if run_type == "hindcast":
        launch_list.append(
            NextWorker("nowcast.workers.split_results", args=[run_type, run_date])
        )
        return launch_list

    if run_type.startswith("nowcast"):
        launch_list.append(
            NextWorker(
                "nowcast.workers.make_plots",
                args=["nemo", run_type, "research", "--run-date", run_date],
            )
        )

    if run_type == "nowcast":
        # Comparison plots are made for the day before the run date
        compare_date = arrow.get(run_date).shift(days=-1).format("YYYY-MM-DD")
        launch_list.extend(
            [
                NextWorker(
                    "nowcast.workers.make_plots",
                    args=[
                        "nemo",
                        run_type,
                        "comparison",
                        "--run-date",
                        compare_date,
                    ],
                ),
                NextWorker(
                    "nowcast.workers.make_CHS_currents_file",
                    args=[run_type, "--run-date", run_date],
                ),
            ]
        )

    if run_type == "nowcast-green":
        launch_list.append(
            NextWorker("nowcast.workers.ping_erddap", args=["nowcast-green"]),
        )
        # Iterate a tuple, not a set: set order for strings changes from one
        # process to the next, which made the launch order unpredictable.
        for var_group in ("biology", "chemistry", "physics"):
            launch_list.append(
                NextWorker(
                    "nowcast.workers.make_averaged_dataset",
                    args=["day", var_group, "--run-date", run_date],
                )
            )
        run_day = arrow.get(run_date)
        # The day after the run date being the 1st means that the run date
        # is the last day of its month
        if run_day.shift(days=+1).day == 1:
            yyyymmm = run_day.format("YYYY-MMM").lower()
            launch_list.append(
                NextWorker(
                    "nowcast.workers.archive_tarball",
                    args=["nowcast-green", yyyymmm, "robot.graham"],
                )
            )
        return launch_list

    if run_type.startswith("forecast"):
        launch_list.append(
            NextWorker(
                "nowcast.workers.make_CHS_currents_file",
                args=[run_type, "--run-date", run_date],
            )
        )
    return launch_list
def after_make_averaged_dataset(msg, config, checklist):
    """Calculate the list of workers to launch after the make_averaged_dataset
    worker ends.

    When a day-averaged dataset has been made successfully for the last day
    of a month, make_averaged_dataset is launched again on ``localhost`` to
    make the month-averaged dataset for the same variable group, with the
    first day of that month as its run date. All other messages launch
    nothing.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for avg_period in ("day", "month"):
        for var_group in ("biology", "chemistry", "physics"):
            followups[f"failure {avg_period} {var_group}"] = []
            followups[f"success {avg_period} {var_group}"] = []

    if not msg.type.startswith("success day"):
        return followups[msg.type]

    # Variable group is the last word of the message type
    reshapr_var_group = msg.type.split()[-1]
    run_day = arrow.get(msg.payload[f"day {reshapr_var_group}"]["run date"])
    is_last_day_of_month = run_day.shift(days=+1).day == 1
    if is_last_day_of_month:
        month_start = run_day.format("YYYY-MM-01")
        followups[msg.type].append(
            NextWorker(
                "nowcast.workers.make_averaged_dataset",
                args=["month", reshapr_var_group, "--run-date", month_start],
                host="localhost",
            )
        )
    return followups[msg.type]
def after_archive_tarball(msg, config, checklist):
    """Calculate the list of workers to launch after the archive_tarball
    worker ends.

    No workers are launched, whatever the message type is.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch
def after_make_CHS_currents_file(msg, config, checklist):
    """Calculate the list of workers to launch after the make_CHS_currents_file
    worker ends.

    Success messages for the forecast run types launch the
    update_forecast_datasets worker for the NEMO model, with the run date
    taken from the message payload. All other messages launch nothing.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for known_run_type in ("nowcast", "forecast", "forecast2"):
        followups[f"failure {known_run_type}"] = []
        followups[f"success {known_run_type}"] = []

    if not msg.type.startswith("success forecast"):
        return followups[msg.type]

    run_type = msg.type.split()[1]
    run_date = msg.payload[run_type]["run date"]
    updater = NextWorker(
        "nowcast.workers.update_forecast_datasets",
        args=["nemo", run_type, "--run-date", run_date],
    )
    followups[msg.type].append(updater)
    return followups[msg.type]
def after_split_results(msg, config, checklist):
    """Calculate the list of workers to launch after the split_results worker
    ends.

    The archive_tarball worker is launched for the hindcast when the worker
    succeeded, the ``archive hindcast`` item in the ``results tarballs``
    section of the configuration is true, and the latest date among the
    message payload items is the last day of a month.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": [], "failure hindcast": [], "success hindcast": []}
    succeeded = msg.type.startswith("success")
    # Configuration is only consulted for success messages
    if succeeded and config["results tarballs"]["archive hindcast"]:
        last_date = max(arrow.get(item) for item in msg.payload)
        # The next day being the 1st means that last_date ends its month
        if last_date.shift(days=+1).day == 1:
            yyyymmm = last_date.format("YYYY-MMM").lower()
            followups[msg.type].append(
                NextWorker(
                    "nowcast.workers.archive_tarball",
                    args=["hindcast", yyyymmm, "robot.graham"],
                )
            )
    return followups[msg.type]
def after_download_wwatch3_results(msg, config, checklist):
    """Calculate the list of workers to launch after the
    download_wwatch3_results worker ends.

    Success messages for the forecast run types launch the
    update_forecast_datasets worker for the wwatch3 model. The run date is
    read from the ``WWATCH3 run`` item of the checklist for every success
    message, including the nowcast, for which nothing is launched.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for known_run_type in ("forecast2", "nowcast", "forecast"):
        followups[f"failure {known_run_type}"] = []
        followups[f"success {known_run_type}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    run_type = msg.type.split()[1]
    run_date = checklist["WWATCH3 run"][run_type]["run date"]
    if run_type.startswith("forecast"):
        updater = NextWorker(
            "nowcast.workers.update_forecast_datasets",
            args=["wwatch3", run_type, "--run-date", run_date],
        )
        followups[msg.type].append(updater)
    return followups[msg.type]
def after_download_fvcom_results(msg, config, checklist):
    """Calculate the list of workers to launch after the
    download_fvcom_results worker ends.

    On success, the get_vfpa_hadcp and make_plots (research plots) workers
    are launched for the run date and model configuration in the message
    payload. For nowcast runs the ping_erddap worker is also launched for
    the model configuration's nowcast dataset.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for model_run in ("x2 nowcast", "r12 nowcast"):
        followups[f"failure {model_run}"] = []
        followups[f"success {model_run}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    # Message type is "success <model config> <run type>";
    # the payload is keyed by run type alone
    run_type = msg.type.split()[2]
    run_date = msg.payload[run_type]["run date"]
    model_config = msg.payload[run_type]["model config"]
    launch_list = followups[msg.type]
    hadcp_worker = NextWorker(
        "nowcast.workers.get_vfpa_hadcp", args=["--data-date", run_date]
    )
    plots_worker = NextWorker(
        "nowcast.workers.make_plots",
        args=[
            "fvcom",
            f"{run_type}-{model_config}",
            "research",
            "--run-date",
            run_date,
        ],
    )
    launch_list.extend([hadcp_worker, plots_worker])
    if run_type == "nowcast":
        launch_list.append(
            NextWorker(
                "nowcast.workers.ping_erddap",
                args=[f"fvcom-{model_config}-nowcast"],
            )
        )
    return launch_list
def after_get_vfpa_hadcp(msg, config, checklist):
    """Calculate the list of workers to launch after the get_vfpa_hadcp worker
    ends.

    On success, the ping_erddap worker is launched for the ``VFPA-HADCP``
    dataset. Crash and failure messages launch nothing.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {
        "crash": [],
        "failure": [],
        "success": [],
    }
    if not msg.type.startswith("success"):
        return followups[msg.type]

    launch_list = followups[msg.type]
    launch_list.append(
        NextWorker("nowcast.workers.ping_erddap", args=["VFPA-HADCP"])
    )
    return launch_list
def after_update_forecast_datasets(msg, config, checklist):
    """Calculate the list of workers to launch after the
    update_forecast_datasets worker ends.

    On success, the ping_erddap worker is launched for the model's forecast
    dataset. For the nemo model, the make_plots (publish plots) and
    make_surface_current_tiles workers are launched as well, for the run
    date recorded in the checklist.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for model_run in (
        "nemo forecast",
        "nemo forecast2",
        "wwatch3 forecast",
        "wwatch3 forecast2",
    ):
        followups[f"failure {model_run}"] = []
        followups[f"success {model_run}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    # Message type is "success <model> <run type>"
    words = msg.type.split()
    model = words[1]
    run_type = words[2]
    # Checklist items for model runs are keyed like "NEMO run"
    run_date = checklist[f"{model.upper()} run"][run_type]["run date"]
    launch_list = followups[msg.type]
    launch_list.append(
        NextWorker("nowcast.workers.ping_erddap", args=[f"{model}-forecast"])
    )
    if model == "nemo":
        plots_worker = NextWorker(
            "nowcast.workers.make_plots",
            args=["nemo", run_type, "publish", "--run-date", run_date],
        )
        tiles_worker = NextWorker(
            "nowcast.workers.make_surface_current_tiles",
            args=[run_type, "--run-date", run_date],
        )
        launch_list.extend([plots_worker, tiles_worker])
    return launch_list
def after_ping_erddap(msg, config, checklist):
    """Calculate the list of workers to launch after the ping_erddap worker
    ends.

    Only two message types launch anything:

    * ``success wwatch3-forecast`` launches make_plots for the wwatch3
      publish plots, for the forecast2 run if the checklist has one,
      otherwise for the forecast run.
    * ``success VFPA-HADCP`` launches make_plots for the fvcom publish plots
      of the runs that the checklist shows as completed. A completed r12
      nowcast run replaces anything collected so far and ends the search.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for dataset in (
        "weather",
        "SCVIP-CTD",
        "SEVIP-CTD",
        "USDDL-CTD",
        "TWDP-ferry",
        "VFPA-HADCP",
        "nowcast-green",
        "nemo-forecast",
        "wwatch3-forecast",
        "fvcom-x2-nowcast",
        "fvcom-r12-nowcast",
    ):
        followups[f"success {dataset}"] = []
        followups[f"failure {dataset}"] = []

    if msg.type == "success wwatch3-forecast":
        ww3_runs = checklist["WWATCH3 run"]
        if "forecast2" in ww3_runs.keys():
            run_type = "forecast2"
        else:
            run_type = "forecast"
        run_date = ww3_runs[run_type]["run date"]
        followups[msg.type].append(
            NextWorker(
                "nowcast.workers.make_plots",
                args=["wwatch3", run_type, "publish", "--run-date", run_date],
            )
        )
    elif msg.type == "success VFPA-HADCP":
        try:
            fvcom_run_keys = checklist["FVCOM run"].keys()
        except KeyError:
            # "FVCOM run" is only in the checklist after runs.
            # If it's too early in the day, just return.
            return followups[msg.type]
        fvcom_runs = checklist["FVCOM run"]
        for run_key in fvcom_run_keys:
            # Checklist keys are "<model config> <run type>"
            model_config, run_type = run_key.split()
            run_label = f"{model_config} {run_type}"
            run_date = fvcom_runs[run_label]["run date"]
            r12_nowcast_done = (
                model_config == "r12" and "completed" in fvcom_runs["r12 nowcast"]
            )
            if r12_nowcast_done:
                # Discard anything collected so far and stop looking
                followups[msg.type] = [
                    NextWorker(
                        "nowcast.workers.make_plots",
                        args=[
                            "fvcom",
                            "nowcast-r12",
                            "publish",
                            "--run-date",
                            run_date,
                        ],
                    )
                ]
                break
            if "completed" in fvcom_runs[run_label]:
                followups[msg.type].append(
                    NextWorker(
                        "nowcast.workers.make_plots",
                        args=[
                            "fvcom",
                            f"{run_type}-{model_config}",
                            "publish",
                            "--run-date",
                            run_date,
                        ],
                    )
                )
    return followups[msg.type]
def after_make_plots(msg, config, checklist):
    """Calculate the list of workers to launch after the make_plots worker
    ends.

    Success messages for the nemo model with a run type that contains
    ``forecast`` launch the make_feeds worker, for the run date recorded in
    the ``NEMO run`` item of the checklist. All other messages launch
    nothing.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    followups = {"crash": []}
    for plots in (
        "nemo nowcast research",
        "nemo nowcast comparison",
        "nemo nowcast publish",
        "nemo nowcast-green research",
        "nemo nowcast-agrif research",
        "nemo forecast publish",
        "nemo forecast2 publish",
        "fvcom nowcast-x2 publish",
        "fvcom nowcast-r12 publish",
        "fvcom nowcast-x2 research",
        "fvcom nowcast-r12 research",
        "wwatch3 forecast publish",
        "wwatch3 forecast2 publish",
    ):
        followups[f"failure {plots}"] = []
        followups[f"success {plots}"] = []

    if not msg.type.startswith("success"):
        return followups[msg.type]

    # Message type is "success <model> <run type> <plot type>"
    _outcome, model, run_type, _plot_type = msg.type.split()
    if model != "nemo" or "forecast" not in run_type:
        return followups[msg.type]

    launch_list = followups[msg.type]
    launch_list.append(
        NextWorker(
            "nowcast.workers.make_feeds",
            args=[
                run_type,
                "--run-date",
                checklist["NEMO run"][run_type]["run date"],
            ],
        )
    )
    return launch_list
def after_make_surface_current_tiles(msg, config, checklist):
    """Calculate the list of workers to launch after the
    make_surface_current_tiles worker ends.

    No workers are launched, whatever the message type is.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch
def after_make_feeds(msg, config, checklist):
    """Calculate the list of workers to launch after the make_feeds worker
    ends.

    Only the ``success forecast2`` message launches anything: the
    clear_checklist worker.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    clear_checklist = NextWorker("nemo_nowcast.workers.clear_checklist")
    followups = {
        "crash": [],
        "failure forecast": [],
        "failure forecast2": [],
        "success forecast": [],
        "success forecast2": [clear_checklist],
    }
    launch_list = followups[msg.type]
    return launch_list
def after_clear_checklist(msg, config, checklist):
    """Calculate the list of workers to launch after the clear_checklist
    worker ends.

    The ``success`` message launches the rotate_logs worker; crash and
    failure messages launch nothing.

    :arg msg: Nowcast system message.
    :type msg: :py:class:`nemo_nowcast.message.Message`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next
    :rtype: list
    """
    rotate_logs = NextWorker("nemo_nowcast.workers.rotate_logs")
    followups = {
        "crash": [],
        "failure": [],
        "success": [rotate_logs],
    }
    launch_list = followups[msg.type]
    return launch_list
def after_rotate_logs(msg, config, checklist):
    """Calculate the list of workers to launch after the rotate_logs worker
    ends, but it is an empty list because rotate_logs is the last worker in
    the daily automation cycle.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`collections.namedtuple`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch
def after_rotate_hindcast_logs(msg, config, checklist):
    """Calculate the list of workers to launch after the rotate_hindcast_logs
    worker ends, but it is an empty list because rotate_hindcast_logs is a
    maintenance tool that is outside the flow of automation.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`collections.namedtuple`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch
def after_launch_remote_worker(msg, config, checklist):
    """Calculate the list of workers to launch after the launch_remote_worker
    worker ends, but it is an empty list because launch_remote_worker is a
    maintenance tool that is outside the flow of automation.

    :arg msg: Nowcast system message. Not used by this function.
    :type msg: :py:class:`collections.namedtuple`

    :arg config: :py:class:`dict`-like object that holds the nowcast system
                 configuration that is loaded from the system configuration
                 file. Not used by this function.
    :type config: :py:class:`nemo_nowcast.config.Config`

    :arg dict checklist: System checklist: data structure containing the
                         present state of the nowcast system.
                         Not used by this function.

    :returns: Worker(s) to launch next; always an empty list
    :rtype: list
    """
    nothing_to_launch = []
    return nothing_to_launch