# Source code for nowcast.workers.watch_NEMO_agrif

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that monitors and reports on the progress of a
NEMO AGRIF run on an HPC cluster that uses the TORQUE/MOAB scheduler.
"""
import logging
import os
import tempfile
import time
from pathlib import Path
from types import SimpleNamespace

import arrow
import f90nml
from nemo_nowcast import NowcastWorker, WorkerError

from nowcast import ssh_sftp

NAME = "watch_NEMO_agrif"
logger = logging.getLogger(NAME)


def main():
    """Set up and run the worker.

    Builds a :py:class:`nemo_nowcast.NowcastWorker`, registers the two
    positional command-line arguments (``host_name`` and ``job_id``),
    and hands control to the worker with the watch, success,
    and failure functions.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_NEMO_agrif --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # Positional arguments, registered in the order they appear on the command-line
    positional_args = (
        ("host_name", "Name of the host to monitor the run on"),
        ("job_id", "Job identifier of the job to monitor"),
    )
    for arg_name, help_text in positional_args:
        worker.cli.add_argument(arg_name, help=help_text)
    worker.run(watch_NEMO_agrif, success, failure)


def success(parsed_args):
    """Log the completion of the watched run at the info level.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.info(
        f"NEMO AGRIF run {parsed_args.job_id} on {parsed_args.host_name} completed"
    )
    msg_type = "success"
    return msg_type


def failure(parsed_args):
    """Log the failure of the watcher at the critical level.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.critical(
        f"NEMO AGRIF run {parsed_args.job_id} on {parsed_args.host_name} "
        f"watcher failed"
    )
    msg_type = "failure"
    return msg_type


def watch_NEMO_agrif(parsed_args, config, *args):
    """Watch a NEMO AGRIF job on a remote host until it is no longer running.

    The job is polled every 60 seconds while it is queued,
    and every 5 minutes while it is running.

    :param :py:class:`argparse.Namespace` parsed_args:
    :param :py:class:`nemo_nowcast.Config` config:

    :return: Nowcast system checklist items
    :rtype: dict

    :raises: :py:exc:`nemo_nowcast.WorkerError` if the queue can't be queried
             or the run id can't be determined
    """
    host_name = parsed_args.host_name
    # Drop any server suffix from the job id; e.g. 1234.orca2.ibb -> 1234
    job_id = parsed_args.job_id.split(".", 1)[0]
    host_config = config["run"]["enabled hosts"][host_name]
    ssh_key = Path(os.environ["HOME"], ".ssh", host_config["ssh key"])
    scratch_dir = Path(host_config["scratch dir"])
    # Connect *before* entering the try block so that a failed connection
    # propagates its own exception instead of being masked by an
    # UnboundLocalError raised when the finally clause tries to close
    # clients that were never created
    ssh_client, sftp_client = ssh_sftp.sftp(host_name, ssh_key)
    try:
        run_id = _get_run_id(ssh_client, host_name, job_id)
        while _is_queued(ssh_client, host_name, job_id, run_id):
            time.sleep(60)
        tmp_run_dir = _get_tmp_run_dir(ssh_client, host_name, scratch_dir, run_id)
        run_info = _get_run_info(sftp_client, host_name, tmp_run_dir)
        while _is_running(ssh_client, host_name, job_id, run_id, tmp_run_dir, run_info):
            time.sleep(60 * 5)
    finally:
        sftp_client.close()
        ssh_client.close()
    checklist = {
        "nowcast-agrif": {
            "host": host_name,
            "job id": job_id,
            # The first 7 characters of the run id are parsed as a DDMMMYY date
            "run date": arrow.get(run_id[:7], "DDMMMYY").format("YYYY-MM-DD"),
            "completed": True,
        }
    }
    return checklist


def _get_qstat_field(queue_info, field):
    """Return the value of the first :kbd:`field = value` line in qstat output.

    The value is the 3rd whitespace-separated token of the line;
    i.e. the token that follows the :kbd:`=`.
    Matching lines that have no value token are skipped.

    :param str queue_info: Output of the qstat command
    :param str field: Name that the stripped line must start with

    :return: Field value, or :py:obj:`None` if no line provides one
    :rtype: str or None
    """
    for line in queue_info.splitlines():
        if line.strip().startswith(field):
            tokens = line.split()
            if len(tokens) >= 3:
                return tokens[2]
    return None


def _get_run_id(ssh_client, host_name, job_id):
    """Get the run id from the :kbd:`Job_Name` field of the job's queue info.

    :param :py:class:`paramiko.client.SSHClient` ssh_client:
    :param str host_name:
    :param str job_id:

    :return: run id
    :rtype: str

    :raises: :py:exc:`nemo_nowcast.WorkerError` if the queue info contains
             no :kbd:`Job_Name` value
    """
    queue_info = _get_queue_info(ssh_client, host_name, job_id)
    run_id = _get_qstat_field(queue_info, "Job_Name")
    if run_id is None:
        # Without a run id there is nothing to watch; fail explicitly rather
        # than with an UnboundLocalError
        logger.error(
            f"no Job_Name found in queue info for job {job_id} on {host_name}"
        )
        raise WorkerError
    logger.info(f"watching {run_id} job {job_id} on {host_name}")
    return run_id


def _is_queued(ssh_client, host_name, job_id, run_id):
    """Report whether the job's :kbd:`job_state` is :kbd:`Q`.

    :param :py:class:`paramiko.client.SSHClient` ssh_client:
    :param str host_name:
    :param str job_id:
    :param str run_id:

    :return: Flag indicating whether or not run is queued
    :rtype: boolean
    """
    queue_info = _get_queue_info(ssh_client, host_name, job_id)
    state = _get_qstat_field(queue_info, "job_state") or "UNKNOWN"
    if state != "Q":
        return False
    msg = f"{run_id} job {job_id} is queued on {host_name}"
    logger.info(msg)
    return True


def _is_running(ssh_client, host_name, job_id, run_id, tmp_run_dir, run_info):
    """Report whether the job's :kbd:`job_state` is :kbd:`R`,
    and log the run's progress calculated from its :file:`time.step` file.

    :param :py:class:`paramiko.client.SSHClient` ssh_client:
    :param str host_name:
    :param str job_id:
    :param str run_id:
    :param :py:class:`pathlib.Path` tmp_run_dir:
    :param :py:class:`types.SimpleNamespace` run_info:

    :return: Flag indicating whether or not run is executing
    :rtype: boolean
    """
    # A job that the scheduler no longer knows about is reported as UNKNOWN
    # rather than raising, so the watch loop ends
    queue_info = _get_queue_info(ssh_client, host_name, job_id, ignore_unknown_job=True)
    state = _get_qstat_field(queue_info, "job_state") or "UNKNOWN"
    if state != "R":
        return False
    try:
        stdout = ssh_sftp.ssh_exec_command(
            ssh_client, f"cat {tmp_run_dir}/time.step", host_name, logger
        )
    except ssh_sftp.SSHCommandError:
        # time.step file not found or empty; assume that run is young and it
        # hasn't been created yet, or has finished and it has been
        # moved to the results directory
        logger.info(
            f"{run_id} on {host_name}: time.step not found; continuing to watch..."
        )
        return True
    time_step_lines = stdout.splitlines()
    if not time_step_lines or not time_step_lines[0].strip():
        # The command succeeded but produced no time step number;
        # presumably the file was read while it was being rewritten,
        # so try again on the next poll instead of failing
        logger.info(
            f"{run_id} on {host_name}: time.step is empty; continuing to watch..."
        )
        return True
    time_step = int(time_step_lines[0].strip())
    steps_done = time_step - run_info.it000
    model_seconds = steps_done * run_info.rdt
    model_time = run_info.date0.shift(seconds=model_seconds).format(
        "YYYY-MM-DD HH:mm:ss UTC"
    )
    fraction_done = steps_done / (run_info.itend - run_info.it000)
    logger.info(
        f"{run_id} on {host_name}: timestep: "
        f"{time_step} = {model_time}, {fraction_done:.1%} complete"
    )
    return True


def _get_queue_info(ssh_client, host_name, job_id, ignore_unknown_job=False):
    """Run :command:`qstat -f -1` for the job on the remote host.

    :param :py:class:`paramiko.client.SSHClient` ssh_client:
    :param str host_name:
    :param str job_id:
    :param boolean ignore_unknown_job: If true, an unknown job id error from
                                       qstat produces a synthetic
                                       :kbd:`job_state = UNKNOWN` result
                                       instead of an exception

    :return: Output from TORQUE/MOAB qstat command that describes the run's state
    :rtype: str

    :raises: :py:exc:`nemo_nowcast.WorkerError` if the qstat command fails
    """
    try:
        stdout = ssh_sftp.ssh_exec_command(
            ssh_client,
            f"/global/system/torque/bin/qstat -f -1 {job_id}",
            host_name,
            logger,
        )
    except ssh_sftp.SSHCommandError as exc:
        if ignore_unknown_job:
            # NOTE(review): this match is tied to the .orca2.ibb server suffix,
            # so unknown jobs on other hosts fall through to the error path
            # below - confirm that is intended
            if exc.stderr == f"qstat: Unknown Job Id {job_id}.orca2.ibb\n":
                return "job_state = UNKNOWN\n"
        for line in exc.stderr.splitlines():
            logger.error(line)
        raise WorkerError from exc
    return stdout


def _get_tmp_run_dir(ssh_client, host_name, scratch_dir, run_id):
    """Find the run's temporary run directory by listing
    :file:`scratch_dir/run_id_*` on the remote host.

    The first line of the listing is used.

    :param :py:class:`paramiko.client.SSHClient` ssh_client:
    :param str host_name:
    :param :py:class:`pathlib.Path` scratch_dir:
    :param str run_id:

    :return: Temporary run directory
    :rtype: :py:class:`pathlib.Path`
    """
    stdout = ssh_sftp.ssh_exec_command(
        ssh_client, f"ls -d {scratch_dir/run_id}_*", host_name, logger
    )
    tmp_run_dir = Path(stdout.splitlines()[0].strip())
    logger.debug(f"found tmp run dir: {host_name}:{tmp_run_dir}")
    return tmp_run_dir


def _get_run_info(sftp_client, host_name, tmp_run_dir):
    """Download the run's :file:`namelist_cfg` file and extract
    the run timing information from it.

    :param :py:class:`paramiko.sftp_client.SFTPClient` sftp_client:
    :param str host_name:
    :param :py:class:`pathlib.Path` tmp_run_dir:

    :return: Namespace of run timing info:
               it000: 1st time step number
               itend: last time step number
               date0: run start date
               rdt: time step in seconds
    :rtype: :py:class:`types.SimpleNamespace`
    """
    # The namelist is downloaded to a temporary file that is deleted
    # when the context manager exits, so it has to be parsed inside the block
    with tempfile.NamedTemporaryFile("wt") as namelist_cfg:
        sftp_client.get(f"{tmp_run_dir}/namelist_cfg", namelist_cfg.name)
        logger.debug(f"downloaded {host_name}:{tmp_run_dir}/namelist_cfg")
        namelist = f90nml.read(namelist_cfg.name)
        run_info = SimpleNamespace(
            it000=namelist["namrun"]["nn_it000"],
            itend=namelist["namrun"]["nn_itend"],
            date0=arrow.get(str(namelist["namrun"]["nn_date0"]), "YYYYMMDD"),
            rdt=namelist["namdom"]["rn_rdt"],
        )
    return run_info


if __name__ == "__main__":
    main()  # pragma: no cover