# Source code for nowcast.workers.run_NEMO

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that prepares the YAML run description file and bash run script
for a nowcast, nowcast-green, forecast or forecast2 run on the ONC cloud, and launches the run.
"""
import logging
import os
import shlex
import subprocess
import time
from pathlib import Path

import arrow
import f90nml
import nemo_cmd.prepare
import salishsea_cmd.api
import salishsea_cmd.run
import yaml
from nemo_nowcast import NowcastWorker, WorkerError

from nowcast import lib

# Worker name; used as the name of the module-level logger below,
# and passed to NowcastWorker() in main()
NAME = "run_NEMO"
# Module-level logger used by all of the functions in this worker
logger = logging.getLogger(NAME)


def main():
    """Build the worker's command-line interface and start the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.run_NEMO --help`
    """
    run_type_help = """
        Type of run to execute:
        'nowcast' means nowcast physics run,
        'nowcast-green' means nowcast green ocean run,
        'forecast' means updated forecast run,
        'forecast2' means preliminary forecast run,
        """
    run_type_choices = {"nowcast", "nowcast-green", "forecast", "forecast2"}

    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # Positional arguments: the host to run on, then the type of run
    worker.cli.add_argument("host_name", help="Name of the host to execute the run on")
    worker.cli.add_argument("run_type", choices=run_type_choices, help=run_type_help)
    # The run date defaults to the start of the current day
    worker.cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date to execute the run for.",
    )
    # Hand over to the worker: do the work, then report success or failure
    worker.run(run_NEMO, success, failure)
def success(parsed_args):
    """Log that the run was started and return the success message type.

    :param :py:class:`argparse.Namespace` parsed_args: Parsed command-line
        arguments; ``run_type``, ``run_date`` and ``host_name`` are used.

    :return: Nowcast system message type
    :rtype: str
    """
    logger.info(
        f"{parsed_args.run_type} NEMO run for "
        f'{parsed_args.run_date.format("YYYY-MM-DD")} '
        f"on {parsed_args.host_name} started"
    )
    msg_type = f"success {parsed_args.run_type}"
    return msg_type


def failure(parsed_args):
    """Log that the run failed and return the failure message type.

    :param :py:class:`argparse.Namespace` parsed_args: Parsed command-line
        arguments; ``run_type``, ``run_date`` and ``host_name`` are used.

    :return: Nowcast system message type
    :rtype: str
    """
    logger.critical(
        f"{parsed_args.run_type} NEMO run for "
        f'{parsed_args.run_date.format("YYYY-MM-DD")} '
        f"on {parsed_args.host_name} failed"
    )
    msg_type = f"failure {parsed_args.run_type}"
    return msg_type


def run_NEMO(parsed_args, config, tell_manager):
    """Prepare the run description file and run script, then launch the run.

    The run description YAML file is only needed to build the temporary run
    directory and the run script; it is deleted before the run is launched.

    :param :py:class:`argparse.Namespace` parsed_args: Parsed command-line
        arguments; ``host_name``, ``run_type`` and ``run_date`` are used.
    :param :py:class:`nemo_nowcast.Config` config: Nowcast system configuration.
    :param :py:func:`nemo_nowcast.NowcastWorker.tell_manager` tell_manager:
        Function used to request information from the manager.

    :return: Nowcast system checklist items
    :rtype: dict
    """
    host_name = parsed_args.host_name
    run_type = parsed_args.run_type
    run_date = parsed_args.run_date
    if not run_type.startswith("nowcast"):
        # For run types other than nowcast*, prefer the run date that the manager
        # has recorded for the nowcast run over the command-line value
        try:
            run_info = tell_manager("need", "NEMO run").payload
            run_date = arrow.get(run_info["nowcast"]["run date"])
        except AttributeError:
            # nowcast run date is unavailable, so use run date from command-line
            pass
    run_desc_filepath = _create_run_desc_file(run_date, run_type, host_name, config)
    run_dir = Path(salishsea_cmd.api.prepare(run_desc_filepath))
    logger.debug(f"{run_type}: temporary run directory: {run_dir}")
    run_script_filepath = _create_run_script(
        run_date, run_type, run_dir, run_desc_filepath, host_name, config
    )
    run_desc_filepath.unlink()
    run_exec_cmd, run_id = _launch_run_script(
        run_type, run_script_filepath, host_name, config
    )
    return {
        run_type: {
            "host": host_name,
            "run dir": os.fspath(run_dir),
            "run exec cmd": run_exec_cmd,
            "run id": run_id,
            "run date": run_date.format("YYYY-MM-DD"),
        }
    }


def _create_run_desc_file(run_date, run_type, host_name, config):
    """Update the time namelist and write the YAML run description file.

    The file is written to the host's run prep directory as
    :file:`{ddmmmyy}{run_type}.yaml`.

    :param run_date: Run date; an object with arrow-style ``format()`` and
        ``shift()`` methods.
    :param str run_type: Type of run.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: Path of the run description file that was written.
    :rtype: :py:class:`pathlib.Path`
    """
    dmy = run_date.format("DDMMMYY").lower()
    run_id = f"{dmy}{run_type}"
    # First day covered by each run type; forecasts start 1 or 2 days after run_date
    run_days = {
        "nowcast": run_date,
        "nowcast-green": run_date,
        "forecast": run_date.shift(days=1),
        "forecast2": run_date.shift(days=2),
    }
    run_duration = config["run types"][run_type]["duration"]
    host_config = config["run"]["enabled hosts"][host_name]
    run_prep_dir = Path(host_config["run prep dir"])
    restart_timestep = _update_time_namelist(
        run_date, run_type, run_duration, host_config, run_prep_dir
    )
    run_desc = _run_description(
        run_days[run_type], run_type, run_id, restart_timestep, host_name, config
    )
    run_desc_filepath = run_prep_dir / f"{run_id}.yaml"
    with run_desc_filepath.open("wt") as f:
        yaml.safe_dump(run_desc, f, default_flow_style=False)
    logger.debug(f"{run_type}: run description file: {run_desc_filepath}")
    return run_desc_filepath


def _update_time_namelist(run_date, run_type, run_duration, host_config, run_prep_dir):
    """Rewrite :file:`namelist.time` in the run prep directory for this run.

    The first time step and time step length are read from the
    :file:`namelist_cfg` file in the results directory of the run that this run
    is based on.

    :param run_date: Run date; an object with arrow-style ``format()`` and
        ``shift()`` methods.
    :param str run_type: Type of run.
    :param run_duration: Value of the run type's ``duration`` config item.
    :param dict host_config: Configuration of the host the run will execute on.
    :param :py:class:`pathlib.Path` run_prep_dir: Directory containing
        :file:`namelist.time`.

    :return: Time step number of the restart file to start the run from.
    :rtype: int
    """
    prev_runs = {
        # run-type: based-on run-type, date offset
        "nowcast": ("nowcast", -1),
        "nowcast-green": ("nowcast-green", -1),
        "forecast": ("nowcast", 0),
        "forecast2": ("forecast", 0),
    }
    prev_run_type, date_offset = prev_runs[run_type]
    results_dir = Path(host_config["run types"][prev_run_type]["results"])
    dmy = run_date.shift(days=date_offset).format("DDMMMYY").lower()
    prev_run_namelist = f90nml.read(results_dir / dmy / "namelist_cfg")
    prev_it000 = prev_run_namelist["namrun"]["nn_it000"]
    rdt = prev_run_namelist["namdom"]["rn_rdt"]
    # NOTE(review): presumes rn_rdt is the time step length in seconds - confirm
    timesteps_per_day = 86400 / rdt
    namelist_time = run_prep_dir / "namelist.time"
    ## TODO: Refactor to use f90nml.patch()
    with namelist_time.open("rt") as f:
        lines = f.readlines()
    new_lines, restart_timestep = _calc_new_namelist_lines(
        run_date, run_type, run_duration, prev_it000, timesteps_per_day, lines
    )
    with namelist_time.open("wt") as f:
        f.writelines(new_lines)
    return restart_timestep


def _calc_new_namelist_lines(
    run_date, run_type, run_duration, prev_it000, timesteps_per_day, lines
):
    """Update the time-related values in the lines of a time namelist.

    The values of ``nn_it000``, ``nn_itend``, ``nn_date0`` and ``nn_stocklist``
    are replaced. ``lines`` is modified in place and also returned.

    :param run_date: Run date; an object with an arrow-style ``shift()`` method.
    :param str run_type: Type of run.
    :param run_duration: Run duration in days.
    :param int prev_it000: First time step number of the run this one is based on.
    :param timesteps_per_day: Number of time steps in a day.
    :param list lines: Lines of the namelist file.

    :return: 2-tuple of the updated lines and the restart time step number.
    :rtype: tuple
    """
    it000_line, it000 = _get_namelist_value("nn_it000", lines)
    itend_line, itend = _get_namelist_value("nn_itend", lines)
    new_it000 = int(prev_it000 + timesteps_per_day)
    lines[it000_line] = lines[it000_line].replace(it000, str(new_it000))
    # The restart time step is calculated from the whole number of days
    # in the run duration, but the end time step uses the full duration
    restart_timestep = int((prev_it000 - 1) + int(run_duration) * timesteps_per_day)
    new_itend = int(restart_timestep + (run_duration * timesteps_per_day))
    lines[itend_line] = lines[itend_line].replace(itend, str(new_itend))
    date0_line, date0 = _get_namelist_value("nn_date0", lines)
    run_date_offset = {
        "nowcast": 0,
        "nowcast-green": 0,
        "forecast": 1,
        "forecast2": 2,
    }
    new_date0 = run_date.shift(days=run_date_offset[run_type])
    lines[date0_line] = lines[date0_line].replace(date0, new_date0.format("YYYYMMDD"))
    stocklist_line, stocklist = _get_namelist_value("nn_stocklist", lines)
    next_restart_timestep = int(
        restart_timestep + int(run_duration) * timesteps_per_day
    )
    lines[stocklist_line] = lines[stocklist_line].replace(
        stocklist, f"{next_restart_timestep},"
    )
    return lines, restart_timestep


def _get_namelist_value(key, lines):
    """Find the last line that sets ``key`` and return its index and value.

    A line matches when its first whitespace-separated token is exactly ``key``;
    the value is the line's third token (i.e. lines like ``key = value``).

    :param str key: Namelist variable name.
    :param list lines: Lines of the namelist file.

    :return: 2-tuple of the index of the matching line and the value string.
    :rtype: tuple

    :raises IndexError: If no line sets ``key``, or the matching line has fewer
        than 3 tokens.
    """
    matching_indices = [
        i for i, line in enumerate(lines) if line.strip() and line.split()[0] == key
    ]
    if not matching_indices:
        # Same exception type as indexing an empty list,
        # but the message says which variable is missing
        raise IndexError(f"namelist variable not found: {key}")
    line_index = matching_indices[-1]
    value = lines[line_index].split()[2]
    return line_index, value


def _run_description(run_date, run_type, run_id, restart_timestep, host_name, config):
    """Build the run description data structure for the run.

    :param run_date: First day of the run; an object with arrow-style
        ``format()`` and ``shift()`` methods.
    :param str run_type: Type of run.
    :param str run_id: Run identifier.
    :param int restart_timestep: Time step number of the restart file(s)
        to start the run from.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: Run description
    :rtype: dict

    :raises :py:exc:`nemo_nowcast.WorkerError`: If the host's configuration has
        no results directory for the run type to restart from.
    """
    host_config = config["run"]["enabled hosts"][host_name]
    restart_from = config["run types"][run_type]["restart from"]
    try:
        restart_dir = Path(host_config["run types"][restart_from]["results"])
    except KeyError:
        logger.critical(
            f"no results directory to get {restart_from} restart file from "
            f"in {host_name} run config"
        )
        raise WorkerError
    prev_run_dmys = {
        # run-type: previous run's ddmmmyy results directory name
        "nowcast": run_date.shift(days=-1).format("DDMMMYY").lower(),
        "nowcast-green": run_date.shift(days=-1).format("DDMMMYY").lower(),
        "forecast": run_date.shift(days=-1).format("DDMMMYY").lower(),
        "forecast2": run_date.shift(days=-2).format("DDMMMYY").lower(),
    }
    restart_filepaths = {
        "restart.nc": os.fspath(
            Path(
                restart_dir
                / prev_run_dmys[run_type]
                / f"SalishSea_{restart_timestep:08d}_restart.nc"
            ).resolve()
        )
    }
    if run_type == "nowcast-green":
        # nowcast-green runs also need a restart_trc.nc restart file
        restart_filepaths["restart_trc.nc"] = os.fspath(
            Path(
                restart_dir
                / prev_run_dmys[run_type]
                / f"SalishSea_{restart_timestep:08d}_restart_trc.nc"
            ).resolve()
        )
    run_prep_dir = Path(host_config["run prep dir"])
    bottom_friction_mask = Path(host_config["forcing"]["bottom friction mask"])
    forcing = {
        "NEMO-atmos": {"link to": os.fspath((run_prep_dir / "NEMO-atmos").resolve())},
        "ssh": {"link to": os.fspath((run_prep_dir / "ssh/").resolve())},
        "tides": {"link to": os.fspath((run_prep_dir / "tides/").resolve())},
        "tracers": {"link to": os.fspath((run_prep_dir / "tracers/").resolve())},
        "LiveOcean": {"link to": os.fspath((run_prep_dir / "LiveOcean/").resolve())},
        "rivers": {"link to": os.fspath((run_prep_dir / "rivers/").resolve())},
        "river_turb": {"link to": os.fspath((run_prep_dir / "rivers/").resolve())},
        "bfr_coef.nc": {"link to": os.fspath(bottom_friction_mask)},
        "grid": {"link to": os.fspath((run_prep_dir / "grid/").resolve())},
        "rivers-climatology": {
            "link to": os.fspath((run_prep_dir / "rivers-climatology/").resolve())
        },
    }
    run_sets_dir = Path(host_config["run types"][run_type]["run sets dir"])
    # namelist_cfg starts with the time namelist from the run prep directory,
    # followed by the section files from the run sets directory
    namelists = {
        "namelist_cfg": [os.fspath((run_prep_dir / "namelist.time").resolve())]
    }
    namelist_sections = (
        "namelist.domain",
        "namelist.atmos_rivers",
        "namelist.light",
        "namelist.lateral",
        "namelist.bottom",
        "namelist.tracer",
        "namelist.dynamics",
        "namelist.vertical",
        "namelist.compute",
    )
    namelists["namelist_cfg"].extend(
        [
            os.fspath((run_sets_dir / namelist).resolve())
            for namelist in namelist_sections
        ]
    )
    if run_type == "nowcast-green":
        namelist_top_sections = (
            "namelist_top_restart",
            "namelist_top_TracerDefAndBdy",
            "namelist_top_physics",
        )
        namelists["namelist_top_cfg"] = [
            os.fspath((run_sets_dir / namelist).resolve())
            for namelist in namelist_top_sections
        ]
        namelist_smelt_sections = (
            "namelist_smelt_biology",
            "namelist_smelt_rivers",
            "namelist_smelt_skog",
        )
        namelists["namelist_smelt_cfg"] = [
            os.fspath((run_sets_dir / namelist).resolve())
            for namelist in namelist_smelt_sections
        ]
    nemo_config_name = config["run types"][run_type]["config name"]
    run_desc = salishsea_cmd.api.run_description(
        run_id=run_id,
        config_name=nemo_config_name,
        mpi_decomposition=(host_config["run types"][run_type]["mpi decomposition"]),
        walltime=(host_config["run types"][run_type].get("walltime")),
        NEMO_code_config=os.fspath(
            (run_prep_dir / "../NEMO-3.6-code" / "NEMOGCM" / "CONFIG").resolve()
        ),
        XIOS_code=os.fspath((run_prep_dir / "../XIOS-2/").resolve()),
        forcing_path=os.fspath(run_prep_dir.resolve()),
        runs_dir=os.fspath(run_prep_dir.resolve()),
        forcing=forcing,
        namelists=namelists,
    )
    grid_dir = Path(host_config["grid dir"])
    run_desc["grid"]["coordinates"] = os.fspath(
        grid_dir / config["run types"][run_type]["coordinates"]
    )
    run_desc["grid"]["bathymetry"] = os.fspath(
        grid_dir / config["run types"][run_type]["bathymetry"]
    )
    lpe_filename = config["run types"][run_type]["land processor elimination"]
    run_desc["grid"]["land processor elimination"] = os.fspath(grid_dir / lpe_filename)
    run_desc["restart"] = restart_filepaths
    run_desc["output"].update(
        {
            "iodefs": os.fspath((run_sets_dir / "iodef.xml").resolve()),
            "domaindefs": os.fspath((run_sets_dir / "domain_def.xml").resolve()),
            "fielddefs": os.fspath((run_sets_dir / "field_def.xml").resolve()),
        }
    )
    # Drop the "domain" and "fields" output items from the generated description;
    # the "domaindefs" and "fielddefs" items set above are used instead
    del run_desc["output"]["domain"]
    del run_desc["output"]["fields"]
    if (run_sets_dir / "file_def.xml").exists():
        run_desc["output"]["filedefs"] = os.fspath(
            (run_sets_dir / "file_def.xml").resolve()
        )
    run_desc["vcs revisions"] = {
        "git": [
            os.fspath((run_prep_dir / "../grid").resolve()),
            os.fspath((run_prep_dir / "../moad_tools").resolve()),
            os.fspath((run_prep_dir / "../NEMO-Cmd").resolve()),
            os.fspath((run_prep_dir / "../NEMO_Nowcast").resolve()),
            os.fspath((run_prep_dir / "../rivers-climatology").resolve()),
            os.fspath((run_prep_dir / "../SalishSeaCmd").resolve()),
            os.fspath((run_prep_dir / "../SalishSeaNowcast").resolve()),
            os.fspath((run_prep_dir / "../SS-run-sets").resolve()),
            os.fspath((run_prep_dir / "../tides").resolve()),
            os.fspath((run_prep_dir / "../tools").resolve()),
            os.fspath((run_prep_dir / "../tracers").resolve()),
            os.fspath((run_prep_dir / "../XIOS-ARCH").resolve()),
        ],
    }
    return run_desc


def _create_run_script(
    run_date, run_type, run_dir, run_desc_filepath, host_name, config
):
    """Write the :file:`SalishSeaNEMO.sh` run script into the run directory.

    :param run_date: Run date; an object with an arrow-style ``format()`` method.
    :param str run_type: Type of run.
    :param :py:class:`pathlib.Path` run_dir: Temporary run directory.
    :param :py:class:`pathlib.Path` run_desc_filepath: Run description file path.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: Path of the run script that was written.
    :rtype: :py:class:`pathlib.Path`
    """
    host_config = config["run"]["enabled hosts"][host_name]
    dmy = run_date.format("DDMMMYY").lower()
    results_dir = Path(host_config["run types"][run_type]["results"])
    script = _build_script(
        run_dir, run_type, run_desc_filepath, results_dir / dmy, host_name, config
    )
    run_script_filepath = run_dir / "SalishSeaNEMO.sh"
    with run_script_filepath.open("wt") as f:
        f.write(script)
    lib.fix_perms(
        run_script_filepath, mode=int(lib.FilePerms(user="rwx", group="rwx", other="r"))
    )
    logger.debug(f"{run_type}: run script: {run_script_filepath}")
    return run_script_filepath


def _build_script(run_dir, run_type, run_desc_filepath, results_dir, host_name, config):
    """Assemble the text of the bash run script.

    The script is the shebang line, PBS directives when the host's
    ``job exec cmd`` is ``qsub``, and then the definitions, execution,
    permissions fixing and clean-up sections.

    :param :py:class:`pathlib.Path` run_dir: Temporary run directory.
    :param str run_type: Type of run.
    :param :py:class:`pathlib.Path` run_desc_filepath: Run description file path.
    :param :py:class:`pathlib.Path` results_dir: Directory to store the run
        results in.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: Run script text
    :rtype: str
    """
    run_desc = nemo_cmd.prepare.load_run_desc(run_desc_filepath)
    host_config = config["run"]["enabled hosts"][host_name]
    nemo_processors = nemo_cmd.prepare.get_n_processors(run_desc, run_dir)
    xios_processors = int(run_desc["output"]["XIOS servers"])
    email = host_config.get("email", "nobody@example.com")
    xios_host = host_config.get("xios host")
    script = "#!/bin/bash\n"
    if host_config["job exec cmd"] == "qsub":
        script = "\n".join(
            (
                script,
                "{pbs_common}".format(
                    pbs_common=salishsea_cmd.run._pbs_directives(
                        run_desc,
                        nemo_processors + xios_processors,
                        email,
                        results_dir,
                        stderr_stdout=False,
                    )
                ),
            )
        )
    script = "\n".join(
        (
            script,
            "{defns}\n"
            "{execute}\n"
            "{fix_permissions}\n"
            "{cleanup}".format(
                defns=_definitions(
                    run_type,
                    run_desc,
                    run_desc_filepath,
                    run_dir,
                    results_dir,
                    host_name,
                    config,
                ),
                execute=_execute(nemo_processors, xios_processors, xios_host),
                fix_permissions=_fix_permissions(),
                cleanup=_cleanup(),
            ),
        )
    )
    return script


def _definitions(
    run_type, run_desc, run_desc_filepath, run_dir, results_dir, host_name, config
):
    """Return the shell variable definitions section of the run script.

    :param str run_type: Type of run; not used by this function.
    :param dict run_desc: Run description; its ``run_id`` item is used.
    :param :py:class:`pathlib.Path` run_desc_filepath: Run description file path;
        only its file name is used.
    :param run_dir: Temporary run directory.
    :param results_dir: Directory to store the run results in.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: Definitions section of the run script
    :rtype: str
    """
    host_config = config["run"]["enabled hosts"][host_name]
    mpirun = "mpirun"
    if host_config.get("mpi hosts file") is not None:
        # Hosts that have an MPI hosts file configured get extra mpirun options
        mpirun = (
            "mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm "
            f'--hostfile {host_config["mpi hosts file"]}'
        )
    defns = (
        'RUN_ID="{run_id}"\n'
        'RUN_DESC="{run_desc_file}"\n'
        'WORK_DIR="{run_dir}"\n'
        'RESULTS_DIR="{results_dir}"\n'
        'MPIRUN="{mpirun}"\n'
        'COMBINE="{salishsea_cmd} combine"\n'
        'GATHER="{salishsea_cmd} gather"\n'
    ).format(
        run_id=run_desc["run_id"],
        run_desc_file=run_desc_filepath.name,
        run_dir=run_dir,
        results_dir=results_dir,
        mpirun=mpirun,
        salishsea_cmd=host_config["salishsea_cmd"],
    )
    return defns


def _execute(nemo_processors, xios_processors, xios_host):
    """Return the section of the run script that executes the run.

    The section creates the results directory, runs NEMO and the XIOS server(s)
    via ``${MPIRUN}``, and then runs the results combining and gathering commands.
    Output is appended to :file:`stdout` and :file:`stderr` in the results
    directory.

    :param int nemo_processors: Number of processes to run :file:`nemo.exe` on.
    :param int xios_processors: Number of processes to run
        :file:`xios_server.exe` on.
    :param xios_host: Name of the host to run the XIOS server(s) on,
        or :py:obj:`None` to not specify a host.

    :return: Execution section of the run script
    :rtype: str
    """
    mpirun = (
        f"${{MPIRUN}} -np {nemo_processors} --bind-to none ./nemo.exe : "
        f"-np {xios_processors} --bind-to none ./xios_server.exe"
    )
    if xios_host is not None:
        mpirun = (
            f"${{MPIRUN}} -np {nemo_processors} --bind-to none ./nemo.exe : "
            f"-host {xios_host} -np {xios_processors} --bind-to none "
            f"./xios_server.exe"
        )
    script = (
        "mkdir -p ${RESULTS_DIR}\n"
        "\n"
        "cd ${WORK_DIR}\n"
        'echo "working dir: $(pwd)" >>${RESULTS_DIR}/stdout\n'
        "\n"
        'echo "Starting run at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    script += f"{mpirun} >>${{RESULTS_DIR}}/stdout 2>>${{RESULTS_DIR}}/stderr\n"
    script += (
        'echo "Ended run at $(date)" >>${RESULTS_DIR}/stdout\n'
        "\n"
        'echo "Results combining started at $(date)" >>${RESULTS_DIR}/stdout\n'
        "${COMBINE} ${RUN_DESC} --debug >>${RESULTS_DIR}/stdout\n"
        'echo "Results combining ended at $(date)" >>${RESULTS_DIR}/stdout\n'
        "\n"
        'echo "Results gathering started at $(date)" >>${RESULTS_DIR}/stdout\n'
        "${GATHER} ${RESULTS_DIR} --debug >>${RESULTS_DIR}/stdout\n"
        'echo "Results gathering ended at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    return script


def _fix_permissions():
    """Return the section of the run script that fixes results permissions.

    Group read/write/execute and other read/execute are added to the results
    directory, and group read/write and other read are added to its contents.

    :return: Permissions fixing section of the run script
    :rtype: str
    """
    script = (
        "chmod g+rwx ${RESULTS_DIR}\n"
        "chmod g+rw ${RESULTS_DIR}/*\n"
        "chmod o+rx ${RESULTS_DIR}\n"
        "chmod o+r ${RESULTS_DIR}/*\n"
    )
    return script


def _cleanup():
    """Return the section of the run script that removes the run directory.

    The script's current working directory is removed with :command:`rmdir`,
    which only succeeds if the directory is empty.

    :return: Clean-up section of the run script
    :rtype: str
    """
    script = (
        'echo "Deleting run directory" >>${RESULTS_DIR}/stdout\n'
        "rmdir $(pwd)\n"
        'echo "Finished at $(date)" >>${RESULTS_DIR}/stdout\n'
    )
    return script


def _launch_run_script(run_type, run_script_filepath, host_name, config):
    """Launch the run script and wait until :command:`pgrep` finds the run.

    When the host's ``job exec cmd`` is ``qsub`` the script is submitted and the
    command's output is used as the run id. Otherwise the script is started as
    a detached subprocess and the run id is :py:obj:`None`.

    :param str run_type: Type of run.
    :param :py:class:`pathlib.Path` run_script_filepath: Run script file path.
    :param str host_name: Key of the host in ``config["run"]["enabled hosts"]``.
    :param config: Nowcast system configuration.

    :return: 2-tuple of the command used to execute the run script
        and the run id.
    :rtype: tuple
    """
    host_config = config["run"]["enabled hosts"][host_name]
    logger.info(f"{run_type}: launching {run_script_filepath} on {host_name}")
    cmd = f'{host_config["job exec cmd"]} {run_script_filepath}'
    run_exec_cmd = cmd
    logger.debug(f"{run_type}: running command in subprocess: {shlex.split(cmd)}")
    if host_config["job exec cmd"] == "qsub":
        proc = subprocess.run(
            shlex.split(cmd),
            stdout=subprocess.PIPE,
            check=True,
            universal_newlines=True,
        )
        torque_id = proc.stdout.strip()
        logger.debug(f"{run_type}: TORQUE/PBD job id: {torque_id}")
        cmd = shlex.split(f"pgrep {torque_id}")
        run_id = torque_id
    else:
        run_id = None
        subprocess.Popen(shlex.split(cmd))
        # Match the newest process whose full command line is exactly
        # the launch command
        cmd = shlex.split(f'pgrep --newest --exact --full "{cmd}"')
    run_process_pid = None
    while not run_process_pid:
        try:
            proc = subprocess.run(
                cmd, stdout=subprocess.PIPE, check=True, universal_newlines=True
            )
            run_process_pid = int(proc.stdout)
        except subprocess.CalledProcessError:
            # Process has not yet been spawned; pause briefly before polling again
            # so that pgrep processes are not spawned back-to-back in a busy loop
            time.sleep(0.1)
    logger.debug(f"{run_type} on {host_name}: run pid: {run_process_pid}")
    return run_exec_cmd, run_id


if __name__ == "__main__":
    main()  # pragma: no cover