Source code for nowcast.workers.make_forcing_links

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that creates forcing files symlinks for NEMO runs.

Create the forcing file symlinks for a nowcast run on the HPC/cloud
facility where the run will be executed.
"""
import logging
import os
import shutil
from pathlib import Path

import arrow
from nemo_nowcast import NowcastWorker

from nowcast import ssh_sftp

NAME = "make_forcing_links"
logger = logging.getLogger(NAME)


def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.make_forcing_links --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument(
        "host_name", help="Name of the host to symlink forcing files on"
    )
    worker.cli.add_argument(
        "run_type",
        choices={"nowcast+", "forecast2", "ssh", "nowcast-green", "nowcast-agrif"},
        help="""
        Type of run to symlink files for:
        'nowcast+' means nowcast & 1st forecast runs,
        'forecast2' means 2nd forecast run,
        'ssh' means Neah Bay sea surface height files only (for forecast run),
        'nowcast-green' means nowcast green ocean run,
        'nowcast-agrif' means nowcast green ocean run with AGRIF sub-domains.
        """,
    )
    worker.cli.add_argument(
        "--shared-storage",
        action="store_true",
        help="""
        If running on a machine (Salish) that directly accesses the repo datafiles,
        copy the forcing files instead of symlinking them so that they do not get
        changed as a result of preparations for faster runs on remote hosts.
        """,
    )
    worker.cli.add_date_option(
        "--run-date",
        default=(arrow.now().floor("day")),
        help="Date of the run to symlink files for.",
    )
    worker.run(make_forcing_links, success, failure)
    return worker
def success(parsed_args):
    logger.info(
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"forcing file links on {parsed_args.host_name} created"
    )
    msg_type = f"success {parsed_args.run_type}"
    return msg_type


def failure(parsed_args):
    logger.critical(
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"forcing file links creation on {parsed_args.host_name} failed"
    )
    msg_type = f"failure {parsed_args.run_type}"
    return msg_type


def make_forcing_links(parsed_args, config, *args):
    host_name = parsed_args.host_name
    run_type = parsed_args.run_type
    run_date = parsed_args.run_date
    shared_storage = parsed_args.shared_storage
    ssh_key = Path(
        os.environ["HOME"], ".ssh", config["run"]["enabled hosts"][host_name]["ssh key"]
    )
    ssh_client, sftp_client = ssh_sftp.sftp(host_name, ssh_key)
    _make_NeahBay_ssh_links(sftp_client, run_date, config, host_name, shared_storage)
    if run_type == "ssh":
        sftp_client.close()
        ssh_client.close()
        checklist = {
            host_name: {
                "links": f"{parsed_args.run_type} "
                f'{parsed_args.run_date.format("YYYY-MM-DD")} ssh',
                "run date": parsed_args.run_date.format("YYYY-MM-DD"),
            }
        }
        return checklist
    _make_runoff_links(sftp_client, run_type, run_date, config, host_name)
    _make_weather_links(sftp_client, run_date, config, host_name, run_type)
    _make_live_ocean_links(sftp_client, run_date, config, host_name, shared_storage)
    sftp_client.close()
    ssh_client.close()
    checklist = {
        host_name: {
            "links": f"{parsed_args.run_type} "
            f'{parsed_args.run_date.format("YYYY-MM-DD")} '
            f"ssh rivers weather LiveOcean",
            "run date": parsed_args.run_date.format("YYYY-MM-DD"),
        }
    }
    return checklist


def _make_NeahBay_ssh_links(sftp_client, run_date, config, host_name, shared_storage):
    host_config = config["run"]["enabled hosts"][host_name]
    run_prep_dir = Path(host_config["run prep dir"])
    _clear_links(sftp_client, run_prep_dir, "ssh")
    for day in range(-1, 3):
        filename = config["ssh"]["file template"].format(
            run_date.shift(days=day).date()
        )
        src = Path(host_config["forcing"]["ssh dir"], "fcst", filename)
        dest = run_prep_dir / "ssh" / filename
        if shared_storage:
            shutil.copy2(src, dest)
            logger.debug(f"{src} copied to {dest} on {host_name}")
        else:
            _create_symlink(sftp_client, host_name, src, dest)


def _make_runoff_links(sftp_client, run_type, run_date, config, host_name):
    host_config = config["run"]["enabled hosts"][host_name]
    run_prep_dir = Path(host_config["run prep dir"])
    _clear_links(sftp_client, run_prep_dir, "rivers")
    for tmpl in config["rivers"]["file templates"].values():
        src = Path(
            host_config["forcing"]["rivers dir"],
            tmpl.format(run_date.shift(days=-1).date()),
        )
        for day in range(-1, 3):
            filename = tmpl.format(run_date.shift(days=day).date())
            dest = run_prep_dir / "rivers" / filename
            _create_symlink(sftp_client, host_name, src, dest)
    if run_type in {"nowcast-green", "nowcast-agrif"}:
        src = Path(
            host_config["forcing"]["Fraser turbidity dir"],
            config["rivers"]["turbidity"]["file template"].format(run_date.date()),
        )
        dest = run_prep_dir / "rivers" / src.name
        _create_symlink(sftp_client, host_name, src, dest)


def _make_weather_links(sftp_client, run_date, config, host_name, run_type):
    host_config = config["run"]["enabled hosts"][host_name]
    run_prep_dir = Path(host_config["run prep dir"])
    _clear_links(sftp_client, run_prep_dir, "NEMO-atmos/")
    NEMO_atmos_dir = run_prep_dir / "NEMO-atmos"
    nowcast_runs = {"nowcast+", "nowcast-green", "nowcast-agrif"}
    weather_start = -1 if run_type in nowcast_runs else 0
    for day in range(weather_start, 3):
        filename = config["weather"]["file template"].format(
            run_date.shift(days=day).date()
        )
        if run_type in nowcast_runs:
            dir = "" if day <= 0 else "fcst"
        else:
            dir = "fcst"
        src = Path(host_config["forcing"]["weather dir"], dir, filename)
        dest = NEMO_atmos_dir / filename
        _create_symlink(sftp_client, host_name, src, dest)


def _make_live_ocean_links(sftp_client, run_date, config, host_name, shared_storage):
    host_config = config["run"]["enabled hosts"][host_name]
    run_prep_dir = Path(host_config["run prep dir"])
    dest_path = Path("LiveOcean")
    _clear_links(sftp_client, run_prep_dir, dest_path)
    filename_template = config["temperature salinity"]["file template"]
    filename = filename_template.format(run_date.date())
    bc_dir = Path(host_config["forcing"]["bc dir"])
    src = bc_dir / filename
    for day in range(-1, 3):
        filename = filename_template.format(run_date.shift(days=day).date())
        src = bc_dir / filename if day <= 0 else src
        dest = run_prep_dir / dest_path / filename
        if shared_storage:
            shutil.copy2(os.fspath(src), os.fspath(dest))
        else:
            _create_symlink(sftp_client, host_name, src, dest)


def _clear_links(sftp_client, run_prep_dir, forcing_dir):
    links_dir = run_prep_dir / forcing_dir
    for linkname in sftp_client.listdir(os.fspath(links_dir)):
        sftp_client.unlink(os.fspath(links_dir / linkname))
    logger.debug(f"{links_dir} symlinks cleared")


def _create_symlink(sftp_client, host_name, src, dest):
    sftp_client.symlink(os.fspath(src), os.fspath(dest))
    logger.debug(f"{src} symlinked as {dest} on {host_name}")


if __name__ == "__main__":
    main()  # pragma: no cover