# Source code for nowcast.workers.watch_fvcom

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Salish Sea nowcast worker that monitors and reports on the
progress of an FVCOM run on the ONC cloud computing facility.
"""
import logging
import os
import shlex
import subprocess
import time
from pathlib import Path

from nemo_nowcast import NowcastWorker

NAME = "watch_fvcom"
logger = logging.getLogger(NAME)

POLL_INTERVAL = 5 * 60  # seconds


def main():
    """Set up and run the worker.

    Builds the worker's command-line interface (host name, model
    configuration, and run type positional arguments) and then hands
    control to the worker with :py:func:`watch_fvcom` as the work function
    and :py:func:`success`/:py:func:`failure` as the completion handlers.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_fvcom --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument("host_name", help="Name of the host to monitor the run on")
    worker.cli.add_argument(
        "model_config",
        # Tuples rather than sets so that the choices are listed in the same
        # order every time they are shown in argparse-style usage and error
        # messages (set iteration order of strings varies between runs).
        choices=("r12", "x2"),
        help="""
        Model configuration to monitor run for:
        'r12' means the r12 resolution
        'x2' means the x2 resolution
        """,
    )
    worker.cli.add_argument(
        "run_type",
        choices=("nowcast", "forecast"),
        help="""
        Type of run to monitor:
        'nowcast' means nowcast run (after NEMO nowcast run)
        'forecast' means updated forecast run (next 36h UTC, after NEMO forecast run)
        """,
    )
    worker.run(watch_fvcom, success, failure)
def success(parsed_args):
    """Log the completion of the watched run and build the success message type.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.info(
        f"{parsed_args.model_config} {parsed_args.run_type} FVCOM VH-FR run "
        f"on {parsed_args.host_name} completed"
    )
    return f"success {parsed_args.model_config} {parsed_args.run_type}"


def failure(parsed_args):
    """Log the failure of the watched run and build the failure message type.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.critical(
        f"{parsed_args.model_config} {parsed_args.run_type} FVCOM VH-FR run on "
        f"{parsed_args.host_name} failed"
    )
    return f"failure {parsed_args.model_config} {parsed_args.run_type}"


def watch_fvcom(parsed_args, config, tell_manager):
    """Watch an FVCOM run process until it ends, logging its progress.

    The run's details are requested from the manager, the process id of the
    run is looked up, and then, for as long as that process exists, a
    progress message derived from :file:`fvcom.log` in the run directory is
    logged every :py:data:`POLL_INTERVAL` seconds.

    :param :py:class:`argparse.Namespace` parsed_args:

    :param :py:class:`nemo_nowcast.Config` config: Not used by this worker.

    :param :py:func:`nemo_nowcast.NowcastWorker.tell_manager` tell_manager:

    :return: Nowcast system checklist items
    :rtype: dict
    """
    host_name = parsed_args.host_name
    model_config = parsed_args.model_config
    run_type = parsed_args.run_type
    run_key = f"{model_config} {run_type}"
    run_info = tell_manager("need", "FVCOM run").payload
    this_run = run_info[run_key]
    pid = _find_run_pid(this_run)
    logger.debug(f"{run_type} on {host_name}: run pid: {pid}")
    fvcom_log = Path(this_run["run dir"]) / "fvcom.log"
    run_desc = f"{run_key} on {host_name}"
    # Watch for the run process to end
    while _pid_exists(pid):
        logger.info(_progress_message(fvcom_log, run_desc))
        time.sleep(POLL_INTERVAL)
    return {
        run_key: {
            "host": host_name,
            "model config": model_config,
            "run date": this_run["run date"],
            "completed": True,
        }
    }


def _parse_status_line(line):
    """Extract the run progress fields from one line of :file:`fvcom.log`.

    A status line starts with ``!``, ends with ``|``, and has at least
    4 whitespace-separated fields between them.

    :param str line: Line from the log file.

    :return: ``(time_step, model_time, time_to_finish)``, or :py:obj:`None`
             if the line is not a status line.
    :rtype: tuple or None
    """
    stripped = line.strip()
    if not (stripped.startswith("!") and stripped.endswith("|")):
        return None
    fields = stripped.strip("!").split(maxsplit=3)
    if len(fields) < 4:
        # Decorated line (e.g. a separator) without the expected fields;
        # not a status line, so don't let it stop the watcher.
        return None
    time_step, model_time, time_to_finish, _ = fields
    return time_step, model_time, time_to_finish


def _progress_message(fvcom_log, run_desc):
    """Compose the progress log message for the present state of the log file.

    :param :py:class:`pathlib.Path` fvcom_log: Path of the run's log file.

    :param str run_desc: Text that identifies the run at the start of the
                         message.

    :return: Progress message
    :rtype: str
    """
    try:
        with fvcom_log.open("rt") as f:
            lines = f.readlines()
    except FileNotFoundError:
        # fvcom.log file not found; assume that run is young and it
        # hasn't been created yet, or has finished and it has been
        # moved to the results directory
        return f"{run_desc}: fvcom.log not found; continuing to watch..."
    # Search from the end of the file so that the most recent status wins
    for line in reversed(lines):
        status = _parse_status_line(line)
        if status is None:
            continue
        time_step, model_time, time_to_finish = status
        return (
            f"{run_desc}: timestep: "
            f'{time_step} = {model_time[:-7].replace("T", " ")} '
            f"UTC estimated time to finish: {time_to_finish}"
        )
    # fvcom.log file found, but no run status line found
    return f"{run_desc}: no run progress found in fvcom.log ; continuing to watch..."


# Delay between searches of the process table for the run process
_PID_SEARCH_INTERVAL = 1  # seconds


def _find_run_pid(run_info):
    """Search the process table until the run's process id is found.

    :param dict run_info: Run description; its ``"run exec cmd"`` item is the
                          command line that is searched for with
                          :program:`pgrep`.

    :return: Process id of the run.
    :rtype: int
    """
    run_exec_cmd = run_info["run exec cmd"]
    # Pass the command line to pgrep as a single argv item so that quotes
    # or backslashes in it can't change how the pattern is split up.
    cmd = ["pgrep", "--newest", "--exact", "--full", run_exec_cmd]
    logger.debug(f'searching processes for "{run_exec_cmd}"')
    while True:
        try:
            proc = subprocess.run(
                cmd, stdout=subprocess.PIPE, check=True, universal_newlines=True
            )
        except subprocess.CalledProcessError:
            # Process has not yet been spawned; pause so that we don't
            # launch pgrep subprocesses back-to-back while we wait
            time.sleep(_PID_SEARCH_INTERVAL)
            continue
        return int(proc.stdout)


def _pid_exists(pid):
    """Check whether pid exists in the current process table.

    From: http://stackoverflow.com/a/6940314

    :param int pid: Process id to check.

    :return: :py:obj:`True` if a process with that id exists,
             otherwise :py:obj:`False`.
    :rtype: bool

    :raises: :py:exc:`ValueError` if pid is 0
    """
    if pid < 0:
        return False
    if pid == 0:
        # According to "man 2 kill" PID 0 refers to every process
        # in the process group of the calling process.
        # On certain systems 0 is a valid PID but we have no way
        # to know that in a portable fashion.
        raise ValueError("invalid PID 0")
    try:
        # Signal 0 does error checking only; nothing is sent to the process
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # PermissionError clearly means there's a process to deny access to
        return True
    # Any other OSError propagates to the caller
    return True


if __name__ == "__main__":
    main()  # pragma: no cover