Source code for nowcast.workers.run_NEMO_agrif

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that repares the YAML run description file and
bash run script for a NEMO SMELT AGRIF run on an HPC cluster that uses the
TORQUE/MOAB scheduler, and queues the run.
"""
import logging
import os
import tempfile
from pathlib import Path
from types import SimpleNamespace

import arrow
import f90nml
import yaml
from nemo_nowcast import NowcastWorker, WorkerError

from nowcast import ssh_sftp

NAME = "run_NEMO_agrif"
logger = logging.getLogger(NAME)


[docs] def main(): """Set up and run the worker. For command-line usage see: :command:`python -m nowcast.workers.run_NEMO_agrif --help` """ worker = NowcastWorker(NAME, description=__doc__) worker.init_cli() worker.cli.add_argument("host_name", help="Name of the host to queue the run on") worker.cli.add_argument( "run_type", choices={"nowcast-agrif"}, help=""" Type of run to execute: 'nowcast-agrif' means nowcast green ocean run with AGRIF sub-grids """, ) worker.cli.add_date_option( "--run-date", default=arrow.now().floor("day"), help="Date to execute the run for.", ) worker.run(run_NEMO_agrif, success, failure)
def success(parsed_args): """ :param :py:class:`argparse.Namespace` parsed_args: :return: Nowcast system message type :rtype: str """ logger.info( f"NEMO nowcast-agrif run for " f'{parsed_args.run_date.format("YYYY-MM-DD")} ' f"queued on {parsed_args.host_name}" ) msg_type = "success" return msg_type def failure(parsed_args): """ :param :py:class:`argparse.Namespace` parsed_args: :return: Nowcast system message type :rtype: str """ logger.critical( f"NEMO nowcast-agrif run for " f'{parsed_args.run_date.format("YYYY-MM-DD")} ' f"failed to queue on {parsed_args.host_name}" ) msg_type = "failure" return msg_type def run_NEMO_agrif(parsed_args, config, *args): """ :param :py:class:`argparse.Namespace` parsed_args: :param :py:class:`nemo_nowcast.Config` config: :return: Nowcast system checklist items :rtype: dict """ host_name = parsed_args.host_name run_date = parsed_args.run_date ssh_key = Path( os.environ["HOME"], ".ssh", config["run"]["enabled hosts"][host_name]["ssh key"] ) run_id = f'{run_date.format("DDMMMYY").lower()}nowcast-agrif' try: ssh_client, sftp_client = ssh_sftp.sftp(host_name, ssh_key) prev_run_namelists_info = _get_prev_run_namelists_info( sftp_client, host_name, run_date.shift(days=-1), config ) _edit_namelist_times( sftp_client, host_name, prev_run_namelists_info, run_date, config ) _edit_run_desc( sftp_client, host_name, prev_run_namelists_info, run_id, run_date, config ) run_dir, job_id = _launch_run(ssh_client, host_name, run_id, config) finally: sftp_client.close() ssh_client.close() checklist = { "nowcast-agrif": { "host": host_name, "run id": run_id, "run dir": run_dir, "job id": job_id, "run date": run_date.format("YYYY-MM-DD"), } } return checklist def _get_prev_run_namelists_info(sftp_client, host_name, prev_run_date, config): """ :param :py:class:`paramiko.sftp_client.SFTPClient` sftp_client: :param str host_name: :param :py:class:`arrow.Arrow` prev_run_date: :param :py:class:`nemo_nowcast.Config` config: :return: Namespace of run timing info from previous run namelists: itend: last time step number in full domain rdt: time step in seconds in full domain 1_rdt: time step in seconds in 1st sub-grid 2_rdt: time step in seconds in 2nd sub-grid :rtype: :py:class:`types.SimpleNamespace` """ scratch_dir = Path(config["run"]["enabled hosts"][host_name]["scratch dir"]) dmy = prev_run_date.format("DDMMMYY").lower() prev_namelist_cfgs = ["namelist_cfg", "1_namelist_cfg"] prev_run_namelists_info = SimpleNamespace() for i, namelist in enumerate(prev_namelist_cfgs): prev_namelist_cfg = scratch_dir / dmy / namelist with tempfile.NamedTemporaryFile("wt") as namelist_cfg: sftp_client.get(os.fspath(prev_namelist_cfg), namelist_cfg.name) logger.debug(f"downloaded {host_name}:{prev_namelist_cfg}") namelist = f90nml.read(namelist_cfg.name) if i == 0: prev_run_namelists_info.itend = namelist["namrun"]["nn_itend"] prev_run_namelists_info.rdt = namelist["namdom"]["rn_rdt"] else: setattr( prev_run_namelists_info, f"{i}_rdt", namelist["namdom"]["rn_rdt"] ) return prev_run_namelists_info def _edit_namelist_times( sftp_client, host_name, prev_run_namelists_info, run_date, config ): """ :param :py:class:`paramiko.sftp_client.SFTPClient` sftp_client: :param str host_name: :param :py:class:`types.SimpleNamespace` prev_run_namelists_info: :param :py:class:`arrow.Arrow` run_date: :param :py:class:`nemo_nowcast.Config` config: """ itend = prev_run_namelists_info.itend + 24 * 60 * 60 / prev_run_namelists_info.rdt patches = { "namelist.time": { "namrun": { "nn_it000": prev_run_namelists_info.itend + 1, "nn_itend": int(itend), "nn_date0": int(run_date.format("YYYYMMDD")), } }, "namelist.time.BS": {"namrun": {"nn_date0": int(run_date.format("YYYYMMDD"))}}, } run_prep_dir = Path(config["run"]["enabled hosts"][host_name]["run prep dir"]) for i, namelist in enumerate(patches): sftp_client.get( os.fspath(run_prep_dir / namelist), f"/tmp/nowcast-agrif.{namelist}" ) logger.debug(f"downloaded {host_name}:{run_prep_dir/namelist}") f90nml.patch( f"/tmp/nowcast-agrif.{namelist}", patches[namelist], f"/tmp/patched_nowcast-agrif.{namelist}", ) logger.debug(f"patched {namelist}") sftp_client.put( f"/tmp/patched_nowcast-agrif.{namelist}", os.fspath(run_prep_dir / namelist) ) logger.debug(f"uploaded new {host_name}:{run_prep_dir/namelist}") def _edit_run_desc( sftp_client, host_name, prev_run_namelists_info, run_id, run_date, config, yaml_tmpl="/tmp/nowcast-agrif_template.yaml", ): """ :param :py:class:`paramiko.sftp_client.SFTPClient` sftp_client: :param str host_name: :param :py:class:`types.SimpleNamespace` prev_run_namelists_info: :param str run_id: :param :py:class:`arrow.Arrow` run_date: :param :py:class:`nemo_nowcast.Config` config: :param str yaml_tmpl: """ run_prep_dir = Path(config["run"]["enabled hosts"][host_name]["run prep dir"]) sftp_client.get( f"{run_prep_dir}/nowcast-agrif_template.yaml", "/tmp/nowcast-agrif_template.yaml", ) logger.debug(f"downloaded {host_name}:{run_prep_dir}/nowcast-agrif_template.yaml") with Path(yaml_tmpl).open("rt") as run_desc_tmpl: run_desc = yaml.safe_load(run_desc_tmpl) run_desc["run_id"] = run_id logger.debug(f"set run_id to {run_id}") scratch_dir = Path(config["run"]["enabled hosts"][host_name]["scratch dir"]) prev_run_dir = scratch_dir / (run_date.shift(days=-1).format("DDMMMYY").lower()) restart_file = ( f"{prev_run_dir}/SalishSea_{prev_run_namelists_info.itend:08d}_restart.nc" ) run_desc["restart"]["restart.nc"] = restart_file logger.debug(f"set restart.nc to {restart_file}") restart_trc_file = ( f"{prev_run_dir}/" f"SalishSea_{prev_run_namelists_info.itend:08d}_restart_trc.nc" ) run_desc["restart"]["restart_trc.nc"] = restart_trc_file logger.debug(f"set restart_trc.nc to {restart_trc_file}") for i in range(1, 2): itend = int( prev_run_namelists_info.itend * prev_run_namelists_info.rdt / getattr(prev_run_namelists_info, f"{i}_rdt") ) restart_file = f"{prev_run_dir}/{i}_SalishSea_{itend:08d}_restart.nc" run_desc["restart"][f"AGRIF_{i}"]["restart.nc"] = restart_file logger.debug(f"set AGRIF_{i} restart.nc to {restart_file}") restart_trc_file = f"{prev_run_dir}/{i}_SalishSea_{itend:08d}_restart_trc.nc" run_desc["restart"][f"AGRIF_{i}"]["restart_trc.nc"] = restart_trc_file logger.debug(f"set AGRIF_{i} restart_trc.nc to {restart_trc_file}") with Path(yaml_tmpl).open("wt") as run_desc_tmpl: yaml.safe_dump(run_desc, run_desc_tmpl, default_flow_style=False) sftp_client.put("/tmp/nowcast-agrif_template.yaml", f"{run_prep_dir}/{run_id}.yaml") logger.debug(f"uploaded {host_name}:{run_prep_dir}/{run_id}.yaml") def _launch_run(ssh_client, host_name, run_id, config): """ :param :py:class:`paramiko.client.SSHClient` :param str host_name: :param str run_id: :param :py:class:`nemo_nowcast.Config` config: :returns: Job id from TORQUE/MOAD resource manager :rtype: str """ salishsea_cmd = config["run"]["enabled hosts"][host_name]["salishsea cmd"] run_prep_dir = Path(config["run"]["enabled hosts"][host_name]["run prep dir"]) run_desc = run_prep_dir / f"{run_id}.yaml" scratch_dir = Path(config["run"]["enabled hosts"][host_name]["scratch dir"]) results_dir = scratch_dir / run_id[:7] cmd = f"{salishsea_cmd} run {run_desc} {results_dir} --debug" logger.debug(f"launching run on {host_name}: {cmd}") try: stdout = ssh_sftp.ssh_exec_command(ssh_client, cmd, host_name, logger) except ssh_sftp.SSHCommandError as exc: for line in exc.stderr.splitlines(): logger.error(line) raise WorkerError run_dir = stdout.splitlines()[-3].split()[-1] logger.debug(f"temporary run dir: {host_name}:{run_dir}") job_id = stdout.splitlines()[-2].split()[-1] logger.info(f"job id for {run_id}: {job_id}") return run_dir, job_id if __name__ == "__main__": main() # pragma: no cover