# Source code for nowcast.workers.watch_NEMO

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that monitors and reports on the
progress of a run on the ONC cloud computing facility or salish.
"""
import logging
import os
import shlex
import subprocess
import time
from pathlib import Path

import arrow
import f90nml
from nemo_nowcast import NowcastWorker

# Worker name; also the name of this module's logger
NAME = "watch_NEMO"
logger = logging.getLogger(NAME)

# Delay between progress checks of a running NEMO process
POLL_INTERVAL = 300  # seconds, i.e. 5 minutes


def main():
    """Build the worker's command-line interface, then run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_NEMO --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    cli = worker.cli
    cli.add_argument("host_name", help="Name of the host to monitor the run on")
    # Run types this worker knows how to monitor
    monitored_run_types = {"nowcast", "nowcast-green", "forecast", "forecast2"}
    cli.add_argument(
        "run_type",
        choices=monitored_run_types,
        help="""
        Type of run to monitor:
        'nowcast' means nowcast physics run,
        'nowcast-green' means nowcast green ocean run,
        'forecast' means updated forecast run,
        'forecast2' means preliminary forecast run,
        """,
    )
    worker.run(watch_NEMO, success, failure)
def success(parsed_args):
    """Log that the monitored NEMO run completed.

    :param parsed_args: Parsed command-line arguments; the ``run_type`` and
        ``host_name`` attributes are used.
    :return: Message type string, e.g. ``"success nowcast"``.
    :rtype: str
    """
    logger.info("{0.run_type} NEMO run on {0.host_name} completed".format(parsed_args))
    msg_type = "success {.run_type}".format(parsed_args)
    return msg_type


def failure(parsed_args):
    """Log that the monitored NEMO run failed.

    :param parsed_args: Parsed command-line arguments; the ``run_type`` and
        ``host_name`` attributes are used.
    :return: Message type string, e.g. ``"failure nowcast"``.
    :rtype: str
    """
    logger.critical("{0.run_type} NEMO run on {0.host_name} failed".format(parsed_args))
    msg_type = "failure {.run_type}".format(parsed_args)
    return msg_type


def watch_NEMO(parsed_args, config, tell_manager):
    """Monitor a NEMO run until its process ends, then check its results.

    While the run process exists, the run directory's time.step file is
    read every POLL_INTERVAL seconds and the model time step, the
    corresponding model date/time, and the fraction complete are logged.

    :param parsed_args: Parsed command-line arguments (``host_name``,
        ``run_type``).
    :param dict config: Nowcast system configuration.
    :param tell_manager: Callable used to request the "NEMO run" info
        payload.
    :return: Checklist of the form
        ``{run_type: {"host": ..., "run date": "YYYY-MM-DD",
        "completed": bool}}``.
    :rtype: dict
    """
    host_name = parsed_args.host_name
    run_type = parsed_args.run_type
    run_info = tell_manager("need", "NEMO run").payload
    run_date = arrow.get(run_info[run_type]["run date"])
    pid = _find_run_pid(run_info[run_type])
    logger.debug(f"{run_type} on {host_name}: run pid: {pid}")
    # Get run time steps and date info from namelist
    run_dir = Path(run_info[run_type]["run dir"])
    namelist = f90nml.read(run_dir / "namelist_cfg")
    it000 = namelist["namrun"]["nn_it000"]
    itend = namelist["namrun"]["nn_itend"]
    date0 = arrow.get(str(namelist["namrun"]["nn_date0"]), "YYYYMMDD")
    rdt = namelist["namdom"]["rn_rdt"]
    # Watch for the run process to end
    while _pid_exists(pid):
        try:
            time_step = _read_time_step(run_dir / "time.step")
        except FileNotFoundError:
            # time.step file not found; assume that run is young and it
            # hasn't been created yet, or has finished and it has been
            # moved to the results directory
            msg = (
                f"{run_type} on {host_name}: time.step not found; "
                f"continuing to watch..."
            )
        except ValueError:
            # time.step exists but does not hold an integer (e.g. it was read
            # while being rewritten); retry on the next poll instead of
            # crashing the worker while the run is still going
            msg = (
                f"{run_type} on {host_name}: time.step unreadable; "
                f"continuing to watch..."
            )
        else:
            model_seconds = (time_step - it000) * rdt
            model_time = date0.shift(seconds=model_seconds).format(
                "YYYY-MM-DD HH:mm:ss UTC"
            )
            fraction_done = (time_step - it000) / (itend - it000)
            msg = (
                f"{run_type} on {host_name}: timestep: "
                f"{time_step} = {model_time}, {fraction_done:.1%} complete"
            )
        logger.info(msg)
        time.sleep(POLL_INTERVAL)
    checklist = {
        run_type: {"host": host_name, "run date": run_date.format("YYYY-MM-DD")}
    }
    run_duration = config["run types"][run_type]["duration"]
    timesteps_per_day = 86400 / rdt
    # Forecast runs: the restart file is expected at the last time step of the
    # first ``duration`` days of the run (it000 is the first step, hence -1);
    # all other runs: at the namelist's final time step
    restart_timestep = (
        int(it000 + int(run_duration) * timesteps_per_day) - 1
        if run_type in ("forecast", "forecast2")
        else itend
    )
    checklist[run_type]["completed"] = _confirm_run_success(
        host_name, run_type, run_date, run_dir, itend, restart_timestep, config
    )
    return checklist


def _find_run_pid(run_info):
    """Return the process id of the run described by ``run_info``.

    Runs whose exec command starts with ``qsub`` are searched for with
    ``pgrep`` using the run id; otherwise the newest process whose full
    command line exactly matches the run's exec command is used.
    ``pgrep`` is re-run until it finds a match.

    :param dict run_info: Run info for one run type; the ``"run exec cmd"``
        item (and ``"run id"`` for qsub runs) are used.
    :return: Process id of the run.
    :rtype: int
    """
    if run_info["run exec cmd"].startswith("qsub"):
        torque_id = run_info["run id"]
        cmd = shlex.split(f"pgrep {torque_id}")
        logger.debug(f"searching processes for {torque_id}")
    else:
        run_exec_cmd = run_info["run exec cmd"]
        cmd = shlex.split(f'pgrep --newest --exact --full "{run_exec_cmd}"')
        logger.debug(f'searching processes for "{run_exec_cmd}"')
    pid = None
    while pid is None:
        # NOTE(review): retries immediately, with no delay, until the process
        # appears; in the qsub case pgrep could print several pids, which
        # int() would reject with ValueError -- confirm that cannot happen
        try:
            proc = subprocess.run(
                cmd, stdout=subprocess.PIPE, check=True, universal_newlines=True
            )
            pid = int(proc.stdout)
        except subprocess.CalledProcessError:
            # pgrep exits non-zero when nothing matches:
            # process has not yet been spawned
            pass
    return pid


def _pid_exists(pid):
    """Check whether pid exists in the current process table.

    From: http://stackoverflow.com/a/6940314

    :param int pid: Process id to check.
    :return: True if a process with that id exists, False otherwise.
    :rtype: bool
    :raises ValueError: If pid is 0.
    """
    if pid < 0:
        return False
    if pid == 0:
        # According to "man 2 kill" PID 0 refers to every process
        # in the process group of the calling process.
        # On certain systems 0 is a valid PID but we have no way
        # to know that in a portable fashion.
        raise ValueError("invalid PID 0")
    try:
        # Signal 0 does error checking only; no signal is actually sent.
        # Any other OSError propagates to the caller.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # PermissionError clearly means there's a process to deny access to
        return True
    return True


def _read_time_step(time_step_file):
    """Return the integer time step stored in a time.step file.

    :param time_step_file: Path of the time.step file.
    :type time_step_file: :py:class:`pathlib.Path`
    :rtype: int
    :raises FileNotFoundError: If the file does not exist.
    :raises ValueError: If the file contents are not an integer.
    """
    with time_step_file.open("rt") as f:
        return int(f.read().strip())


def _any_line_contains(path, marker):
    """Return True if any line of the text file at ``path`` contains ``marker``.

    Reading stops at the first matching line.

    :type path: :py:class:`pathlib.Path`
    :param str marker: Substring to search for.
    :rtype: bool
    :raises FileNotFoundError: If the file does not exist.
    """
    with path.open("rt") as f:
        return any(marker in line for line in f)


def _confirm_run_success(
    host_name, run_type, run_date, run_dir, itend, restart_timestep, config
):
    """Check the files left by a finished run for signs of failure.

    Every check is performed, and each failure is logged as critical, even
    after an earlier check has failed. Checks are done in the run's results
    directory, or in ``run_dir`` if the results directory does not exist.

    :param str host_name: Name of the host the run was on.
    :param str run_type: Type of run.
    :param run_date: Date of the run; ``run_date.format("DDMMMYY")`` is
        lower-cased to name the results directory.
    :param run_dir: Temporary run directory.
    :type run_dir: :py:class:`pathlib.Path`
    :param int itend: Expected final time step of the run.
    :param int restart_timestep: Time step expected in restart file names.
    :param dict config: Nowcast system configuration.
    :return: True if all checks passed, otherwise False.
    :rtype: bool
    """
    run_succeeded = True
    run_config = config["run"]["enabled hosts"][host_name]["run types"][run_type]
    dmy = run_date.format("DDMMMYY").lower()
    results_dir = Path(run_config["results"], dmy)
    if not results_dir.exists():
        run_succeeded = False
        logger.critical(f"No results directory for {host_name} run: {results_dir}")
        # Continue the rest of the checks in the temporary run directory
        results_dir = run_dir

    abort_file = results_dir / "output.abort.nc"
    if abort_file.exists():
        run_succeeded = False
        logger.critical(f"{host_name} {run_type}/{dmy} run aborted: {abort_file}")

    try:
        time_step = _read_time_step(results_dir / "time.step")
    except FileNotFoundError:
        run_succeeded = False
        logger.critical(f"{host_name} {run_type}/{dmy} run failed; no time.step file")
    else:
        if time_step != itend:
            run_succeeded = False
            logger.critical(
                f"{host_name} {run_type}/{dmy} run failed: "
                f"final time step is {time_step} not {itend}"
            )

    restart_file = results_dir / f"SalishSea_{restart_timestep:08d}_restart.nc"
    if not restart_file.exists():
        run_succeeded = False
        logger.critical(
            f"{host_name} {run_type}/{dmy} run failed; "
            f"no physics restart file: {restart_file}"
        )
    if run_type == "nowcast-green":
        tracer_restart_file = (
            results_dir / f"SalishSea_{restart_timestep:08d}_restart_trc.nc"
        )
        if not tracer_restart_file.exists():
            run_succeeded = False
            logger.critical(
                f"{host_name} {run_type}/{dmy} run failed; "
                f"no tracers restart file: {tracer_restart_file}"
            )

    ocean_output = results_dir / "ocean.output"
    try:
        found_error = _any_line_contains(ocean_output, "E R R O R")
    except FileNotFoundError:
        run_succeeded = False
        logger.critical(
            f"{host_name} {run_type}/{dmy} run failed; no ocean.output file"
        )
    else:
        if found_error:
            run_succeeded = False
            logger.critical(
                f"{host_name} {run_type}/{dmy} run failed; "
                f"1 or more E R R O R in: {ocean_output}"
            )

    solver_stat = results_dir / "solver.stat"
    try:
        found_nan = _any_line_contains(solver_stat, "NaN")
    except FileNotFoundError:
        run_succeeded = False
        logger.critical(f"{host_name} {run_type}/{dmy} run failed; no solver.stat file")
    else:
        if found_nan:
            run_succeeded = False
            logger.critical(
                f"{host_name} {run_type}/{dmy} run failed; NaN in: {solver_stat}"
            )

    if run_type == "nowcast-green":
        tracer_stat = results_dir / "tracer.stat"
        try:
            found_nan = _any_line_contains(tracer_stat, "NaN")
        except FileNotFoundError:
            run_succeeded = False
            logger.critical(
                f"{host_name} {run_type}/{dmy} run failed; no tracer.stat file"
            )
        else:
            if found_nan:
                run_succeeded = False
                # Report the file that actually contains the NaN
                logger.critical(
                    f"{host_name} {run_type}/{dmy} run failed; NaN in: {tracer_stat}"
                )
    return run_succeeded


if __name__ == "__main__":
    main()  # pragma: no cover