# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SalishSeaCast worker that monitors and reports on the progress of a
NEMO hindcast run on an HPC cluster that uses the SLURM scheduler.
"""
import logging
import os
import tempfile
import time
from pathlib import Path
import arrow
import attr
import f90nml
from nemo_nowcast import NowcastWorker, WorkerError
from nowcast import ssh_sftp
# Worker name; used as the logger name and in nowcast system messages
NAME = "watch_NEMO_hindcast"
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.watch_NEMO_hindcast --help`
    """
    # NOTE: a stray Sphinx "[docs]" artifact preceded this function in the
    # pasted source; it is not valid Python and has been removed
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument("host_name", help="Name of the host to monitor the run on")
    worker.cli.add_argument("--run-id", help="Run id to watch; e.g. 01dec14hindcast")
    worker.run(watch_NEMO_hindcast, success, failure)
def success(parsed_args):
    """Log normal termination of the watcher and report success.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.info(f"NEMO hindcast run on {parsed_args.host_name} watcher terminated")
    return "success"
def failure(parsed_args):
    """Log failure of the watcher and report it to the nowcast system.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    logger.critical(f"NEMO hindcast run on {parsed_args.host_name} watcher failed")
    return "failure"
def watch_NEMO_hindcast(parsed_args, config, *args):
    """Watch the hindcast run on the HPC host until it finishes, then report
    its completion state.

    :param :py:class:`argparse.Namespace` parsed_args:
    :param :py:class:`nemo_nowcast.Config` config:

    :raise: WorkerError if the run is cancelled or aborted

    :return: Nowcast system checklist items
    :rtype: dict
    """
    host_name = parsed_args.host_name
    run_id = parsed_args.run_id
    host_config = config["run"]["hindcast hosts"][host_name]
    ssh_key = Path(os.environ["HOME"], ".ssh", host_config["ssh key"])
    users = host_config["users"]
    scratch_dir = Path(host_config["scratch dir"])
    # Choose the job interaction class from the basename of the host's
    # configured queue info command; e.g. /usr/bin/qstat -> qstat
    hpc_job_classes = {"qstat": _QstatHindcastJob, "squeue": _SqueueHindcastJob}
    queue_info_cmd = host_config["queue info cmd"].rsplit("/", 1)[-1]
    # Open the SSH/SFTP connections *before* the try block; previously a
    # connection failure left ssh_client/sftp_client unbound and the finally
    # clause raised NameError, masking the original exception
    ssh_client, sftp_client = ssh_sftp.sftp(host_name, ssh_key)
    try:
        job = hpc_job_classes[queue_info_cmd](
            ssh_client, sftp_client, host_name, users, scratch_dir, run_id
        )
        job.get_run_id()
        # Poll every 5 minutes while the job waits in the queue
        while job.is_queued():
            time.sleep(60 * 5)
        job.get_tmp_run_dir()
        job.get_run_info()
        # Poll every 5 minutes while the job runs
        while job.is_running():
            time.sleep(60 * 5)
        # Poll every minute until the scheduler reports a final state
        while True:
            completion_state = job.get_completion_state()
            if completion_state == "completed":
                break
            if completion_state in {"cancelled", "aborted"}:
                raise WorkerError
            time.sleep(60)
    finally:
        sftp_client.close()
        ssh_client.close()
    checklist = {
        "hindcast": {
            "host": job.host_name,
            "run id": job.run_id,
            # run ids start with a DDMMMYY date; e.g. 01dec14hindcast
            "run date": arrow.get(job.run_id[:7], "DDMMMYY").format("YYYY-MM-DD"),
            "completed": completion_state == "completed",
        }
    }
    return checklist
@attr.s
class _QstatHindcastJob:
    """Interact with the hindcast job on an HPC host that uses :command:`qstat`."""

    # paramiko SSH client connected to the HPC host
    ssh_client = attr.ib()
    # paramiko SFTP client connected to the HPC host
    sftp_client = attr.ib()
    # name of the HPC host on which the run is being watched
    host_name = attr.ib(type=str)
    # user id(s) whose queued jobs to search for the hindcast run
    users = attr.ib()
    # scratch directory in which temporary run directories are created
    scratch_dir = attr.ib(type=Path)
    # salishsea run id; e.g. 01dec14hindcast
    run_id = attr.ib(default=None, type=str)
    # TORQUE/MOAB job id (numeric part only)
    job_id = attr.ib(default=None, type=str)
    # temporary run directory of the job on the HPC host
    tmp_run_dir = attr.ib(default=None, type=Path)
    # starting time step number of the run
    it000 = attr.ib(default=None, type=int)
    # ending time step number of the run
    itend = attr.ib(default=None, type=int)
    # calendar date of the start of the run
    date0 = attr.ib(default=None, type=arrow.Arrow)
    # baroclinic time step of the run [s]
    rdt = attr.ib(default=None, type=float)

    def get_run_id(self):
        """Query the queue manager to get the job id, and the salishsea run id
        of the hindcast run.
        """
        queue_info = self._get_queue_info()
        self.job_id, _, _, self.run_id = queue_info.split()[:4]
        # qstat job ids look like 1234567.server.domain; keep only the number
        self.job_id = self.job_id.rsplit(".", 2)[0]
        logger.info(f"watching {self.run_id} job {self.job_id} on {self.host_name}")

    def is_queued(self):
        """Query the queue manager to get the state of the hindcast run.

        :raise: WorkerError if the job has disappeared from the queue

        :return: Flag indicating whether or not run is queued
        :rtype: boolean
        """
        queue_info = self._get_queue_info()
        try:
            state = queue_info.split()[9]
        except AttributeError:
            # job has disappeared from the queue; maybe cancelled
            logger.error(
                f"{self.run_id} job {self.job_id} not found on {self.host_name} queue"
            )
            raise WorkerError
        if state != "Q":
            return False
        logger.info(f"{self.run_id} job {self.job_id} is queued")
        return True

    def get_tmp_run_dir(self):
        """Query the HPC host file system to get the temporary run directory of the
        hindcast job.
        """
        cmd = f"ls -d {self.scratch_dir/self.run_id}*"
        stdout = self._ssh_exec_command(cmd)
        self.tmp_run_dir = Path(stdout.splitlines()[0].strip())
        logger.debug(f"found tmp run dir: {self.host_name}:{self.tmp_run_dir}")

    def get_run_info(self):
        """Download the hindcast job namelist_cfg file from the HPC host and extract
        NEMO run parameters from it:

        * it000: starting time step number
        * itend: ending time step number
        * date0: calendar date of the start of the run
        * rdt: baroclinic time step
        """
        with tempfile.NamedTemporaryFile("wt") as namelist_cfg:
            self.sftp_client.get(f"{self.tmp_run_dir}/namelist_cfg", namelist_cfg.name)
            logger.debug(f"downloaded {self.host_name}:{self.tmp_run_dir}/namelist_cfg")
            namelist = f90nml.read(namelist_cfg.name)
            self.it000 = namelist["namrun"]["nn_it000"]
            self.itend = namelist["namrun"]["nn_itend"]
            self.date0 = arrow.get(str(namelist["namrun"]["nn_date0"]), "YYYYMMDD")
            self.rdt = namelist["namdom"]["rn_rdt"]
        logger.debug(
            f"{self.run_id} on {self.host_name}: "
            f"it000={self.it000}, itend={self.itend}, date0={self.date0}, rdt={self.rdt}"
        )

    def is_running(self):
        """Query the queue manager to get the state of the hindcast run.

        While the job is running, report its progress via a log message.
        If one or more "E R R O R" lines are found in the ocean.output file,
        cancel the job.

        :return: Flag indicating whether or not run is in R state
        :rtype: boolean
        """
        if self._get_job_state() != "R":
            return False
        # Keep checking until we find a time.step file
        try:
            time_step_file = ssh_sftp.ssh_exec_command(
                self.ssh_client,
                f"cat {self.tmp_run_dir}/time.step",
                self.host_name,
                logger,
            )
        except ssh_sftp.SSHCommandError:
            logger.info(
                f"{self.run_id} on {self.host_name}: time.step not found; continuing to watch..."
            )
            return True
        self._report_progress(time_step_file)
        # grep ocean.output file for "E R R O R" lines
        try:
            ocean_output_errors = ssh_sftp.ssh_exec_command(
                self.ssh_client,
                f"grep 'E R R O R' {self.tmp_run_dir}/ocean.output",
                self.host_name,
                logger,
            )
        except ssh_sftp.SSHCommandError:
            logger.error(f"{self.run_id} on {self.host_name}: ocean.output not found")
            return False
        error_lines = ocean_output_errors.splitlines()
        if not error_lines:
            return True
        # Cancel run if "E R R O R" in ocean.output
        logger.error(
            f"{self.run_id} on {self.host_name}: "
            f"found {len(error_lines)} 'E R R O R' line(s) in ocean.output"
        )
        cmd = f"/usr/bin/qdel {self.job_id}"
        self._ssh_exec_command(
            cmd, f"{self.run_id} on {self.host_name}: cancelled {self.job_id}"
        )
        return False

    def _get_job_state(self):
        """Query the queue manager to get the state of the hindcast run.

        :return: Run state reported by queue manager or "UNKNOWN" if the job is not on the queue.
        :rtype: str
        """
        try:
            queue_info = self._get_queue_info()
            state = queue_info.split()[9]
        except (WorkerError, AttributeError):
            # job has disappeared from the queue; finished or cancelled
            logger.info(
                f"{self.run_id} job {self.job_id} not found on {self.host_name} queue"
            )
            state = "UNKNOWN"
        return state

    def _report_progress(self, time_step_file):
        """Calculate and log run progress based on value in time.step file."""
        time_step = int(time_step_file.splitlines()[0].strip())
        model_seconds = (time_step - self.it000) * self.rdt
        model_time = self.date0.shift(seconds=model_seconds).format(
            "YYYY-MM-DD HH:mm:ss UTC"
        )
        fraction_done = (time_step - self.it000) / (self.itend - self.it000)
        logger.info(
            f"{self.run_id} on {self.host_name}: timestep: "
            f"{time_step} = {model_time}, {fraction_done:.1%} complete"
        )

    def get_completion_state(self):
        """TORQUE/MOAB doesn't provide a way to query resource use records for the completion
        state of the hindcast run, so the best we can do is assume that it completed.

        :return: Completion state of the run: "completed"
        :rtype: str
        """
        return "completed"

    def _ssh_exec_command(self, cmd, success_msg="", accept_stderr=""):
        """Execute cmd on the HPC host, returning its stdout.

        If cmd is successful, and success_msg is provided, log success_msg at the
        INFO level.

        If cmd fails, log stderr from the HPC host at the ERROR level, and raise
        WorkerError — unless the stderr line starts with the accept_stderr prefix,
        in which case the command's stdout is returned.

        :param str cmd:
        :param str success_msg:
        :param str accept_stderr: Prefix of stderr lines that are acceptable

        :raise: WorkerError

        :return: Standard output from the executed command.
        :rtype: str with newline separators
        """
        try:
            stdout = ssh_sftp.ssh_exec_command(
                self.ssh_client, cmd, self.host_name, logger
            )
            if success_msg:
                logger.info(success_msg)
            return stdout
        except ssh_sftp.SSHCommandError as exc:
            for line in exc.stderr.splitlines():
                # Only a non-empty accept_stderr prefix marks stderr as benign;
                # the former bare line.startswith("") check was always true, so
                # every failure was silently accepted and WorkerError never raised
                if accept_stderr and line.startswith(accept_stderr):
                    return exc.stdout
                logger.error(line)
            raise WorkerError

    def _get_queue_info(self):
        """Query the queue manager to get the state of the hindcast run.

        :return: None or 1 line of output from queue info command that describes
                 the run's state
        :rtype: str
        """
        qstat_cmd = "/usr/bin/qstat -a"
        cmd = (
            f"{qstat_cmd} -u {self.users}"
            if self.job_id is None
            else f"{qstat_cmd} {self.job_id}"
        )
        stdout = self._ssh_exec_command(cmd, accept_stderr="qstat: Unknown Job Id")
        if not stdout:
            if self.job_id is None:
                logger.error(f"no jobs found on {self.host_name} queue")
                raise WorkerError
            else:
                # Various callers handle job id not on queue in different ways
                return
        # Skip the 5 header lines of qstat -a output
        for queue_info in stdout.splitlines()[5:]:
            queue_info_parts = queue_info.strip().split()
            run_id, state = queue_info_parts[3], queue_info_parts[9]
            if state == "C":
                # completed jobs linger in qstat output for a while; ignore them
                continue
            if self.run_id is not None:
                if self.run_id == run_id:
                    return queue_info.strip()
            else:
                if "hindcast" in run_id:
                    return queue_info.strip()
@attr.s
class _SqueueHindcastJob:
    """Interact with the hindcast job on an HPC host that uses :command:`squeue`."""

    # paramiko SSH client connected to the HPC host
    ssh_client = attr.ib()
    # paramiko SFTP client connected to the HPC host
    sftp_client = attr.ib()
    # name of the HPC host on which the run is being watched
    host_name = attr.ib(type=str)
    # user id(s) whose queued jobs to search for the hindcast run
    users = attr.ib()
    # scratch directory in which temporary run directories are created
    scratch_dir = attr.ib(type=Path)
    # salishsea run id; e.g. 01dec14hindcast
    run_id = attr.ib(default=None, type=str)
    # slurm job id
    job_id = attr.ib(default=None, type=str)
    # temporary run directory of the job on the HPC host
    tmp_run_dir = attr.ib(default=None, type=Path)
    # starting time step number of the run
    it000 = attr.ib(default=None, type=int)
    # ending time step number of the run
    itend = attr.ib(default=None, type=int)
    # calendar date of the start of the run
    date0 = attr.ib(default=None, type=arrow.Arrow)
    # baroclinic time step of the run [s]
    rdt = attr.ib(default=None, type=float)

    def get_run_id(self):
        """Query the queue manager to get the job id, and the salishsea run id
        of the hindcast run.
        """
        queue_info = self._get_queue_info()
        self.job_id, self.run_id = queue_info.split()[:2]
        logger.info(f"watching {self.run_id} job {self.job_id} on {self.host_name}")

    def is_queued(self):
        """Query the queue manager to get the state of the hindcast run.

        :raise: WorkerError if the job has disappeared from the queue

        :return: Flag indicating whether or not run is queued in PENDING state
        :rtype: boolean
        """
        queue_info = self._get_queue_info()
        try:
            # NOTE(review): assumes the state, reason, and start time fields
            # each contain no whitespace — confirm against squeue --Format output
            state, reason, start_time = queue_info.split()[2:]
        except AttributeError:
            # job has disappeared from the queue; maybe cancelled
            logger.error(
                f"{self.run_id} job {self.job_id} not found on {self.host_name} queue"
            )
            raise WorkerError
        if state != "PENDING":
            return False
        msg = f"{self.run_id} job {self.job_id} pending due to {reason.lower()}"
        if start_time != "N/A":
            msg = f"{msg}, scheduled for {start_time}"
        logger.info(msg)
        return True

    def get_tmp_run_dir(self):
        """Query the HPC host file system to get the temporary run directory of the
        hindcast job.
        """
        cmd = f"ls -d {self.scratch_dir/self.run_id}*"
        stdout = self._ssh_exec_command(cmd)
        self.tmp_run_dir = Path(stdout.splitlines()[0].strip())
        logger.debug(f"found tmp run dir: {self.host_name}:{self.tmp_run_dir}")

    def get_run_info(self):
        """Download the hindcast job namelist_cfg file from the HPC host and extract
        NEMO run parameters from it:

        * it000: starting time step number
        * itend: ending time step number
        * date0: calendar date of the start of the run
        * rdt: baroclinic time step
        """
        with tempfile.NamedTemporaryFile("wt") as namelist_cfg:
            self.sftp_client.get(f"{self.tmp_run_dir}/namelist_cfg", namelist_cfg.name)
            logger.debug(f"downloaded {self.host_name}:{self.tmp_run_dir}/namelist_cfg")
            namelist = f90nml.read(namelist_cfg.name)
            self.it000 = namelist["namrun"]["nn_it000"]
            self.itend = namelist["namrun"]["nn_itend"]
            self.date0 = arrow.get(str(namelist["namrun"]["nn_date0"]), "YYYYMMDD")
            self.rdt = namelist["namdom"]["rn_rdt"]
        logger.debug(
            f"{self.run_id} on {self.host_name}: "
            f"it000={self.it000}, itend={self.itend}, date0={self.date0}, rdt={self.rdt}"
        )

    def is_running(self):
        """Query the queue manager to get the state of the hindcast run.

        While the job is running, report its progress via a log message.
        If one or more "E R R O R" lines are found in the ocean.output file,
        cancel the job.
        If exactly one "E R R O R" line is found, assume that the run got "stuck" and
        handle it accordingly.

        :return: Flag indicating whether or not run is in RUNNING state
        :rtype: boolean
        """
        if self._get_job_state() != "RUNNING":
            return False
        # Keep checking until we find a time.step file
        try:
            time_step_file = ssh_sftp.ssh_exec_command(
                self.ssh_client,
                f"cat {self.tmp_run_dir}/time.step",
                self.host_name,
                logger,
            )
        except ssh_sftp.SSHCommandError:
            logger.info(
                f"{self.run_id} on {self.host_name}: time.step not found; continuing to watch..."
            )
            return True
        self._report_progress(time_step_file)
        # grep ocean.output file for "E R R O R" lines
        try:
            ocean_output_errors = ssh_sftp.ssh_exec_command(
                self.ssh_client,
                f"grep 'E R R O R' {self.tmp_run_dir}/ocean.output",
                self.host_name,
                logger,
            )
        except ssh_sftp.SSHCommandError:
            logger.error(f"{self.run_id} on {self.host_name}: ocean.output not found")
            return False
        error_lines = ocean_output_errors.splitlines()
        if not error_lines:
            return True
        # Cancel run if "E R R O R" in ocean.output
        logger.error(
            f"{self.run_id} on {self.host_name}: "
            f"found {len(error_lines)} 'E R R O R' line(s) in ocean.output"
        )
        cmd = f"/opt/software/slurm/bin/scancel {self.job_id}"
        self._ssh_exec_command(
            cmd, f"{self.run_id} on {self.host_name}: cancelled {self.job_id}"
        )
        if len(error_lines) != 1:
            # More than 1 "E R R O R" line means the run failed irrevocably
            return False
        # Exactly 1 "E R R O R" line means the run is "stuck" and it can be re-queued
        self._handle_stuck_job()
        while self.is_queued():
            time.sleep(60 * 5)
        self.get_tmp_run_dir()
        self.get_run_info()
        return True

    def _get_job_state(self):
        """Query the queue manager to get the state of the hindcast run.

        :return: Run state reported by queue manager or "UNKNOWN" if the job is not on the queue.
        :rtype: str
        """
        try:
            queue_info = self._get_queue_info()
            state = queue_info.split()[2]
        except (WorkerError, AttributeError):
            # job has disappeared from the queue; finished or cancelled
            logger.info(
                f"{self.run_id} job {self.job_id} not found on {self.host_name} queue"
            )
            state = "UNKNOWN"
        return state

    def _report_progress(self, time_step_file):
        """Calculate and log run progress based on value in time.step file."""
        time_step = int(time_step_file.splitlines()[0].strip())
        model_seconds = (time_step - self.it000) * self.rdt
        model_time = self.date0.shift(seconds=model_seconds).format(
            "YYYY-MM-DD HH:mm:ss UTC"
        )
        fraction_done = (time_step - self.it000) / (self.itend - self.it000)
        logger.info(
            f"{self.run_id} on {self.host_name}: timestep: "
            f"{time_step} = {model_time}, {fraction_done:.1%} complete"
        )

    def _handle_stuck_job(self):
        """Exactly 1 "E R R O R" line is usually a symptom of a run that got stuck
        because a processor was unable to read from a forcing file but NEMO didn't
        bubble the error up to cause the run to fail, so the run will time out
        with no further advancement of the time step.
        So, we re-queue the run for another try, then we re-queue the next hindcast run
        (if we find its temporary run directory) with a dependency on the re-queued
        stuck run.
        """
        # Re-queue the stuck run and update slurm run id
        sbatch = "/opt/software/slurm/bin/sbatch"
        cmd = f"{sbatch} {self.tmp_run_dir}/SalishSeaNEMO.sh"
        self._ssh_exec_command(cmd, f"{self.run_id} on {self.host_name}: re-queued")
        self.job_id = None
        self.get_run_id()
        # Find next run, and requeue it with afterok dependence on newly queued run
        # NOTE(review): ls -dtr sorts oldest-first and [0] takes the first line;
        # confirm that this selects the intended "next" tmp run dir
        cmd = f"ls -dtr {self.scratch_dir}/*hindcast*"
        stdout = self._ssh_exec_command(cmd)
        next_tmp_run_dir = Path(stdout.splitlines()[0].strip())
        # run ids are the first 15 characters of the tmp run dir name
        next_run_id = next_tmp_run_dir.name[:15]
        logger.debug(f"found next run tmp run dir: {self.host_name}:{next_tmp_run_dir}")
        cmd = f"{sbatch} -d afterok:{self.job_id} {next_tmp_run_dir}/SalishSeaNEMO.sh"
        self._ssh_exec_command(cmd, f"{next_run_id} on {self.host_name}: re-queued")

    def get_completion_state(self):
        """Query the slurm resource use records to get the completion state of the
        hindcast run.

        :return: Completion state of the run: "unknown", "completed", "cancelled",
                 or "aborted".
        :rtype: str
        """
        sacct_cmd = f"/opt/software/slurm/bin/sacct --user {self.users}"
        cmd = f"{sacct_cmd} --job {self.job_id}.batch --format=state"
        stdout = self._ssh_exec_command(cmd)
        if len(stdout.splitlines()) == 2:
            # only the 2 sacct header lines; the batch step record isn't
            # available yet
            logger.debug(
                f"{self.job_id} batch step not found in sacct report; continuing to look..."
            )
            return "unknown"
        state = stdout.splitlines()[2].strip()
        logger.info(f"{self.run_id} on {self.host_name}: {state}")
        if state in {"COMPLETED", "CANCELLED"}:
            return state.lower()
        return "aborted"

    def _ssh_exec_command(self, cmd, success_msg=""):
        """Execute cmd on the HPC host, returning its stdout.

        If cmd is successful, and success_msg is provided, log success_msg at the
        INFO level.

        If cmd fails, log stderr from the HPC host at the ERROR level, and raise
        WorkerError.

        :param str cmd:
        :param str success_msg:

        :raise: WorkerError

        :return: Standard output from the executed command.
        :rtype: str with newline separators
        """
        try:
            stdout = ssh_sftp.ssh_exec_command(
                self.ssh_client, cmd, self.host_name, logger
            )
            if success_msg:
                logger.info(success_msg)
            return stdout
        except ssh_sftp.SSHCommandError as exc:
            for line in exc.stderr.splitlines():
                logger.error(line)
            raise WorkerError

    def _get_queue_info(self):
        """Query the queue manager to get the state of the hindcast run.

        :return: None or 1 line of output from queue info command that describes
                 the run's state
        :rtype: str
        """
        squeue_cmd = f"/opt/software/slurm/bin/squeue --user {self.users}"
        queue_info_format = '--Format "jobid,name,state,reason,starttime"'
        cmd = (
            f"{squeue_cmd} {queue_info_format}"
            if self.job_id is None
            else f"{squeue_cmd} --job {self.job_id} {queue_info_format}"
        )
        stdout = self._ssh_exec_command(cmd)
        if len(stdout.splitlines()) == 1:
            # only the squeue header line; no jobs on the queue
            if self.job_id is None:
                logger.error(f"no jobs found on {self.host_name} queue")
                raise WorkerError
            else:
                # Various callers handle job id not on queue in different ways
                return
        # Skip the squeue header line
        for queue_info in stdout.splitlines()[1:]:
            if self.run_id is not None:
                if self.run_id in queue_info.strip().split()[1]:
                    return queue_info.strip()
            else:
                if "hindcast" in queue_info.strip().split()[1]:
                    return queue_info.strip()
# Run the worker when this module is executed as a script
if __name__ == "__main__":
    main()  # pragma: no cover