Source code for nowcast.workers.run_fvcom

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Salish Sea FVCOM Vancouver Harbour and Fraser River model worker that
prepares the temporary run directory and bash run script for a nowcast or
forecast run on the ONC cloud, and launches the run.
"""
import logging
import os
import shlex
import shutil
import subprocess
import tempfile
import textwrap
from datetime import timedelta
from pathlib import Path

import arrow
import f90nml
import fvcom_cmd.api
import yaml
from nemo_nowcast import NowcastWorker

from nowcast import lib

NAME = "run_fvcom"
logger = logging.getLogger(NAME)


[docs] def main(): """Set up and run the worker. For command-line usage see: :command:`python -m nowcast.workers.run_fvcom --help` """ worker = NowcastWorker(NAME, description=__doc__) worker.init_cli() worker.cli.add_argument("host_name", help="Name of the host to execute the run on") worker.cli.add_argument( "model_config", choices={"r12", "x2"}, help=""" Model configuration of run to execute: 'r12' means the r12 resolution 'x2' means the x2 resolution """, ) worker.cli.add_argument( "run_type", choices={"nowcast", "forecast"}, help=""" Type of run to execute: 'nowcast' means run for present UTC day (after NEMO nowcast run) 'forecast' means updated forecast run (next 36h UTC, after NEMO forecast run) """, ) worker.cli.add_date_option( "--run-date", default=arrow.now().floor("day"), help="Date to execute the run for.", ) worker.run(run_fvcom, success, failure)
def success(parsed_args): """ :param :py:class:`argparse.Namespace` parsed_args: :return: Nowcast system message type :rtype: str """ logger.info( f"{parsed_args.model_config} {parsed_args.run_type} FVCOM VH-FR run for " f'{parsed_args.run_date.format("YYYY-MM-DD")} ' f"on {parsed_args.host_name} started" ) msg_type = f"success {parsed_args.model_config} {parsed_args.run_type}" return msg_type def failure(parsed_args): """ :param :py:class:`argparse.Namespace` parsed_args: :return: Nowcast system message type :rtype: str """ logger.critical( f"{parsed_args.model_config} {parsed_args.run_type} FVCOM VH-FR run for " f'{parsed_args.run_date.format("YYYY-MM-DD")} ' f"on {parsed_args.host_name} failed" ) msg_type = f"failure {parsed_args.model_config} {parsed_args.run_type}" return msg_type def run_fvcom(parsed_args, config, *args): """ :param :py:class:`argparse.Namespace` parsed_args: :param :py:class:`nemo_nowcast.Config` config: :return: Nowcast system checklist items :rtype: dict """ host_name = parsed_args.host_name model_config = parsed_args.model_config run_type = parsed_args.run_type run_date = parsed_args.run_date run_desc_file_path = _create_run_desc_file(run_date, model_config, run_type, config) tmp_run_dir = fvcom_cmd.api.prepare(run_desc_file_path) logger.debug(f"{run_type}: temporary run directory: {tmp_run_dir}") ## TODO: It would be nice if prepare() copied YAML file to tmp run dir shutil.copy2(run_desc_file_path, tmp_run_dir / run_desc_file_path.name) _prep_fvcom_input_dir(run_date, model_config, run_type, config) run_script_path = _create_run_script( run_date, model_config, run_type, tmp_run_dir, run_desc_file_path, config ) run_desc_file_path.unlink() run_exec_cmd = _launch_run_script(run_type, run_script_path, host_name) return { f"{model_config} {run_type}": { "host": host_name, "run dir": os.fspath(tmp_run_dir), "run exec cmd": run_exec_cmd, "model config": model_config, "run date": run_date.format("YYYY-MM-DD"), } } def _create_run_desc_file(run_date, model_config, run_type, config): """ :param :py:class:`arrow.Arrow` run_date: :param str model_config: :param str run_type: :param :py:class:`nemo_nowcast.Config` config: :return: Run description file path :rtype: :py:class:`pathlib.Path` """ ddmmmyy = run_date.format("DDMMMYY").lower() run_id = f"{ddmmmyy}fvcom-{model_config}-{run_type}" run_prep_dir = Path(config["vhfr fvcom runs"]["run prep dir"]) run_desc = _run_description( run_id, run_date, model_config, run_type, run_prep_dir, config ) run_desc_file_path = run_prep_dir / f"{run_id}.yaml" with run_desc_file_path.open("wt") as f: yaml.safe_dump(run_desc, f, default_flow_style=False) logger.debug(f"{run_type}: run description file: {run_desc_file_path}") return run_desc_file_path def _run_description(run_id, run_date, model_config, run_type, run_prep_dir, config): """ :param str run_id: :param :py:class:`arrow.Arrow` run_date: :param str model_config: :param str run_type: :param :py:class:`pathlib.Path` run_prep_dir: :param :py:class:`nemo_nowcast.Config` config: :return: Run description :rtype dict: """ casename = config["vhfr fvcom runs"]["case name"][model_config] _edit_namelists(casename, run_date, model_config, run_type, run_prep_dir, config) namelist_path = _assemble_namelist(casename, run_type, run_prep_dir, config) run_desc = { "run_id": run_id, "casename": casename, "nproc": config["vhfr fvcom runs"]["number of processors"][model_config], "paths": { "FVCOM": os.fspath( Path(config["vhfr fvcom runs"]["FVCOM exe path"]).resolve() ), "runs directory": os.fspath(run_prep_dir.resolve()), "input": os.fspath((run_prep_dir / f"input.{model_config}").resolve()), }, "namelist": os.fspath(namelist_path.resolve()), ## TODO: Add VCS revision tracking, but need to be able to handle Git ## repos to do so. } return run_desc def _edit_namelists(casename, run_date, model_config, run_type, run_prep_dir, config): """ :param str casename: :param :py:class:`arrow.Arrow` run_date: :param str model_config: :param str run_type: :param :py:class:`pathlib.Path` run_prep_dir: :param :py:class:`nemo_nowcast.Config` config: """ start_offsets = {"nowcast": timedelta(hours=0), "forecast": timedelta(hours=24)} start_date = run_date + start_offsets[run_type] run_durations = {"nowcast": timedelta(hours=24), "forecast": timedelta(hours=36)} atmos_file_tmpl = config["vhfr fvcom runs"]["atmospheric forcing"][ "atmos file template" ] atmos_files = { field_type: atmos_file_tmpl.format( model_config=model_config, run_type=run_type, field_type=field_type, yyyymmdd=start_date.format("YYYYMMDD"), ) for field_type in config["vhfr fvcom runs"]["atmospheric forcing"][ "field types" ] } rivers_file_tmpl = config["vhfr fvcom runs"]["rivers forcing"][ "rivers file template" ] rivers_file = Path( rivers_file_tmpl.format( model_config=model_config, run_type=run_type, yyyymmdd=start_date.format("YYYYMMDD"), ) ).with_suffix(".nc_riv.nml") bdy_file_tmpl = config["vhfr fvcom runs"]["nemo coupling"]["boundary file template"] bdy_file = bdy_file_tmpl.format( model_config=model_config, run_type=run_type, yyyymmdd=start_date.format("YYYYMMDD"), ) time_step = config["vhfr fvcom runs"]["run types"][f"{run_type} {model_config}"][ "time step" ] patches = { run_prep_dir / "namelist.case": { "nml_case": { "case_title": casename, "start_date": start_date.format("YYYY-MM-DD HH:mm:ss.00"), "end_date": ( (start_date + run_durations[run_type]).format( "YYYY-MM-DD HH:mm:ss.00" ) ), } }, run_prep_dir / "namelist.startup.hotstart": { "nml_startup": {"startup_file": f"vh_{model_config}_restart_0001.nc"} }, run_prep_dir / "namelist.numerics": {"nml_integration": {"extstep_seconds": time_step}}, run_prep_dir / "namelist.restart": { "nml_restart": { "rst_first_out": start_date.shift(days=+1).format( "YYYY-MM-DD HH:mm:ss.00" ) } }, run_prep_dir / "namelist.netcdf": { "nml_netcdf": { "nc_first_out": start_date.format("YYYY-MM-DD 01:00:00.00"), "nc_output_stack": 24 if run_type == "nowcast" else 36, } }, run_prep_dir / "namelist.physics": { "nml_heating_calculated": {"heating_calculate_file": atmos_files["hfx"]} }, run_prep_dir / "namelist.surface": { "nml_surface_forcing": { "wind_file": atmos_files["wnd"], "precipitation_file": atmos_files["precip"], "airpressure_file": atmos_files["hfx"], } }, run_prep_dir / f"namelist.rivers.{model_config}": { "nml_river_type": {"river_info_file": os.fspath(rivers_file)} }, run_prep_dir / "namelist.obc": { "nml_open_boundary_control": { "obc_node_list_file": config["vhfr fvcom runs"]["fvcom grid"][ model_config ]["obc nodes file"] } }, run_prep_dir / "namelist.grid": { "nml_grid_coordinates": { "grid_file": config["vhfr fvcom runs"]["fvcom grid"][model_config][ "grid file" ], "sigma_levels_file": config["vhfr fvcom runs"]["fvcom grid"][ model_config ]["sigma file"], "depth_file": config["vhfr fvcom runs"]["fvcom grid"][model_config][ "depths file" ], "coriolis_file": config["vhfr fvcom runs"]["fvcom grid"][model_config][ "coriolis file" ], "sponge_file": config["vhfr fvcom runs"]["fvcom grid"][model_config][ "sponge file" ], } }, run_prep_dir / "namelist.nesting": {"nml_nesting": {"nesting_file_name": bdy_file}}, run_prep_dir / "namelist.station_timeseries": { "nml_station_timeseries": { "station_file": Path( config["vhfr fvcom runs"]["output station timeseries"][model_config] ).name } }, } for namelist_path, patch in patches.items(): _patch_namelist(namelist_path, patch) def _patch_namelist(namelist_path, patch): """ :param :py:class:`pathlib.Path` namelist_path: :param dict patch: """ # f90nml insists on writing the patched namelist to a file, # so we use an ephemeral temporary file with tempfile.TemporaryFile("wt") as tmp_patched_namelist: nml = f90nml.patch(namelist_path, patch, tmp_patched_namelist) with namelist_path.open("wt") as patched_nameslist: nml.write(patched_nameslist) logger.debug(f"patched namelist: {namelist_path}") def _assemble_namelist(casename, run_type, run_prep_dir, config): """ :param str casename: :param str run_type: :param :py:class:`pathlib.Path` run_prep_dir: :param :py:class:`nemo_nowcast.Config` config: :return: Namelist file path :rtype: :py:class:`pathlib.Path` """ namelist_file = f"{casename}_run.nml" namelist_files = config["vhfr fvcom runs"]["namelists"][namelist_file] with (run_prep_dir / namelist_file).open("wt") as namelist: for nml in namelist_files: nml_path = Path(nml) if not nml_path.is_absolute(): nml_path = run_prep_dir / nml with nml_path.open("rt") as f: namelist.writelines(f.readlines()) namelist.write("\n") logger.debug(f"{run_type}: namelist file: {run_prep_dir/namelist_file}") return run_prep_dir / namelist_file def _prep_fvcom_input_dir(run_date, model_config, run_type, config): """ :param :py:class:`arrow.Arrow` run_date: :param str model_config: :param str run_type: :param :py:class:`nemo_nowcast.Config` config: """ fvcom_input_dir = Path(config["vhfr fvcom runs"]["input dir"][model_config]) grid_dir = Path(config["vhfr fvcom runs"]["fvcom grid"]["grid dir"]) for grid_file in ( "grid file", "depths file", "sigma file", "coriolis file", "sponge file", "obc nodes file", ): f = Path(config["vhfr fvcom runs"]["fvcom grid"][model_config][grid_file]) (fvcom_input_dir / f).symlink_to(grid_dir / f) logger.debug(f"symlinked {grid_dir} files into {fvcom_input_dir}") output_timeseries_file = Path( config["vhfr fvcom runs"]["output station timeseries"][model_config] ) (fvcom_input_dir / output_timeseries_file.name).symlink_to(output_timeseries_file) logger.debug(f"symlinked {output_timeseries_file} file into {fvcom_input_dir}") casename = config["vhfr fvcom runs"]["case name"][model_config] restart_dir = Path( config["vhfr fvcom runs"]["run types"][f"nowcast {model_config}"]["results"] ) restart_file_date = run_date.shift(days=-1) if run_type == "nowcast" else run_date restart_file = ( restart_dir / restart_file_date.format("DDMMMYY").lower() / f"{casename}_restart_0001.nc" ) (fvcom_input_dir / restart_file.name).symlink_to(restart_file) logger.debug(f"symlinked {restart_file} file into {fvcom_input_dir}") def _create_run_script( run_date, model_config, run_type, tmp_run_dir, run_desc_file_path, config ): """ :param :py:class:`arrow.Arrow` run_date: :param str model_config: :param str run_type: :param :py:class:`pathlib.Path` tmp_run_dir: :param :py:class:`pathlib.Path` run_desc_file_path: :param :py:class:`nemo_nowcast.Config` config: :return: Run script file path :rtype: :py:class:`pathlib.Path` """ results_dir = Path( config["vhfr fvcom runs"]["run types"][f"{run_type} {model_config}"]["results"] ) ddmmmyy = run_date.format("DDMMMYY").lower() script = _build_script( tmp_run_dir, run_desc_file_path, results_dir / ddmmmyy, model_config, config ) run_script_path = tmp_run_dir / "VHFR_FVCOM.sh" with run_script_path.open("wt") as f: f.write(script) lib.fix_perms( run_script_path, mode=int(lib.FilePerms(user="rwx", group="rwx", other="r")) ) logger.debug(f"{run_type}: run script: {run_script_path}") return run_script_path def _build_script(tmp_run_dir, run_desc_file_path, results_dir, model_config, config): """ :param :py:class:`pathlib.Path` tmp_run_dir: :param :py:class:`pathlib.Path` run_desc_file_path: :param :py:class:`pathlib.Path` results_dir: :param str model_config: :param :py:class:`nemo_nowcast.Config` config: :return: Run script :rtype: str """ with run_desc_file_path.open("rt") as f: run_desc = yaml.safe_load(f) script = "#!/bin/bash\n" script = "\n".join( ( script, "{defns}\n" "{execute}\n" "{fix_permissions}\n" "{cleanup}".format( defns=_definitions( run_desc, tmp_run_dir, run_desc_file_path, results_dir, model_config, config, ), execute=_execute(model_config, config), fix_permissions=_fix_permissions(), cleanup=_cleanup(), ), ) ) return script def _definitions( run_desc, tmp_run_dir, run_desc_file_path, results_dir, model_config, config ): """ :param dict run_desc: :param :py:class:`pathlib.Path` tmp_run_dir: :param :py:class:`pathlib.Path` run_desc_file_path: :param :py:class:`pathlib.Path` results_dir: :param str model_config: :param :py:class:`nemo_nowcast.Config` config: :return: Run script definitions :rtype: str """ run_id = run_desc["run_id"] fvc_cmd = config["vhfr fvcom runs"]["fvc_cmd"] mpi_hosts_file = config["vhfr fvcom runs"]["mpi hosts file"][model_config] defns = ( f'RUN_ID="{run_id}"\n' f'RUN_DESC="{run_desc_file_path.name}"\n' f'WORK_DIR="{tmp_run_dir}"\n' f'RESULTS_DIR="{results_dir}"\n' f'MPIRUN="mpirun --mca btl ^openib --mca orte_tmpdir_base /dev/shm --hostfile {mpi_hosts_file}"\n' f'GATHER="{fvc_cmd} gather"\n' ) return defns def _execute(model_config, config): """ :param str model_config: :param :py:class:`nemo_nowcast.Config` config: :return: Run script model execution commands :rtype: str """ n_processors = config["vhfr fvcom runs"]["number of processors"][model_config] casename = config["vhfr fvcom runs"]["case name"][model_config] script = textwrap.dedent( f"""\ mkdir -p ${{RESULTS_DIR}} cd ${{WORK_DIR}} echo "working dir: $(pwd)" >>${{RESULTS_DIR}}/stdout echo "Starting run at $(date)" >>${{RESULTS_DIR}}/stdout ${{MPIRUN}} -np {n_processors} --bind-to none ./fvcom \\ --casename={casename} --logfile=./fvcom.log \\ >>${{RESULTS_DIR}}/stdout 2>>${{RESULTS_DIR}}/stderr echo "Ended run at $(date)" >>${{RESULTS_DIR}}/stdout /bin/rm -f --verbose ${{WORK_DIR}}/.fvcomtestfile /bin/rmdir --verbose ${{WORK_DIR}}/output echo "Results gathering started at $(date)" >>${{RESULTS_DIR}}/stdout ${{GATHER}} ${{RESULTS_DIR}} --debug >>${{RESULTS_DIR}}/stdout echo "Results gathering ended at $(date)" >>${{RESULTS_DIR}}/stdout """ ) return script def _fix_permissions(): """ :return: Run script results directory and files permissions adjustment commands :rtype: str """ script = ( "chmod g+rwx ${RESULTS_DIR}\n" "chmod g+rw ${RESULTS_DIR}/*\n" "chmod o+rx ${RESULTS_DIR}\n" "chmod o+r ${RESULTS_DIR}/*\n" ) return script def _cleanup(): """ :return: Run script commands to delete temporary run directory :rtype: str """ script = ( 'echo "Deleting run directory" >>${RESULTS_DIR}/stdout\n' "rmdir $(pwd)\n" 'echo "Finished at $(date)" >>${RESULTS_DIR}/stdout\n' ) return script def _launch_run_script(run_type, run_script_path, host_name): """ :param str run_type: :param :py:class:`pathlib.Path` run_script_path: :param str host_name: :param :py:class:`nemo_nowcast.Config` config: :return: Run execution command :rtype: str """ logger.info(f"{run_type}: launching {run_script_path} on {host_name}") run_exec_cmd = f"bash {run_script_path}" logger.debug( f"{run_type}: running command in subprocess: {shlex.split(run_exec_cmd)}" ) subprocess.Popen(shlex.split(run_exec_cmd)) run_process_pid = None while not run_process_pid: try: proc = subprocess.run( shlex.split(f'pgrep --newest --exact --full "{run_exec_cmd}"'), stdout=subprocess.PIPE, check=True, universal_newlines=True, ) run_process_pid = int(proc.stdout) except subprocess.CalledProcessError: # Process has not yet been spawned pass logger.debug(f"{run_type} on {host_name}: run pid: {run_process_pid}") return run_exec_cmd if __name__ == "__main__": main() # pragma: no cover