Source code for nowcast.workers.download_results

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# SPDX-License-Identifier: Apache-2.0


"""SalishSeaCast worker that downloads the results files from an HPC/cloud facility run
to archival storage.
"""

import logging
import os
import shlex
from pathlib import Path

import arrow
from nemo_nowcast import NowcastWorker, WorkerError

from nowcast import lib

NAME = "download_results"
logger = logging.getLogger(NAME)


def main():
    """Build the worker's command-line interface and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.download_results --help`

    :return: The worker instance, returned after its ``run()`` call completes.
    """
    # Run types accepted for the positional ``run_type`` argument.
    run_types = {
        "nowcast",
        "nowcast-green",
        "forecast",
        "forecast2",
        "hindcast",
        "nowcast-agrif",
    }

    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    cli = worker.cli

    # Positional arguments
    cli.add_argument(
        "host_name", help="Name of the host to download results files from"
    )
    cli.add_argument(
        "run_type",
        choices=run_types,
        help="Type of run to download results files from.",
    )

    # Options
    cli.add_argument(
        "--dest-host",
        default="localhost",
        help="Name of the host to download results files to. Default is ``localhost``.",
    )
    # The default run date is the start of the current day,
    # evaluated when the CLI is built.
    start_of_today = arrow.now().floor("day")
    cli.add_date_option(
        "--run-date",
        default=start_of_today,
        help="Date of the run to download results files from.",
    )

    worker.run(download_results, success, failure)
    return worker
def success(parsed_args):
    """Log the successful download at info level and build the success
    message type.

    :param parsed_args: Parsed command-line arguments; the ``run_type``,
                        ``run_date`` and ``host_name`` attributes are read.

    :return: Message type string of the form ``success <run_type>``.
    :rtype: str
    """
    logger.info(
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"results files from {parsed_args.host_name} downloaded"
    )
    msg_type = f"success {parsed_args.run_type}"
    return msg_type


def failure(parsed_args):
    """Log the failed download at critical level and build the failure
    message type.

    :param parsed_args: Parsed command-line arguments; the ``run_type``,
                        ``run_date`` and ``host_name`` attributes are read.

    :return: Message type string of the form ``failure <run_type>``.
    :rtype: str
    """
    logger.critical(
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"results files download from {parsed_args.host_name} failed"
    )
    msg_type = f"failure {parsed_args.run_type}"
    return msg_type


def download_results(parsed_args, config, *args):
    """Copy a run's results directory from the run host to the results
    archive with :command:`scp -pr`.

    :param parsed_args: Parsed command-line arguments; the ``host_name``,
                        ``run_type``, ``dest_host`` and ``run_date``
                        attributes are read.

    :param config: Nowcast system configuration mapping. The ``run`` and
                   ``results archive`` items are read here, and
                   ``file group`` is read when the destination is
                   ``localhost``.

    :param args: Additional positional arguments; accepted but unused.

    :return: Checklist keyed by run type. It always contains ``run date``.
             For a ``localhost`` destination it also contains ``1h`` and
             ``1d`` lists of the matching results file paths in the archive
             directory; otherwise it contains ``destination``.
    :rtype: dict

    :raises: :py:exc:`nemo_nowcast.WorkerError` if ``host_name`` is in
             neither the hindcast hosts nor the enabled hosts configuration.
    """
    host_name = parsed_args.host_name
    run_type = parsed_args.run_type
    dest_host = parsed_args.dest_host
    run_date = parsed_args.run_date
    try:
        try:
            # Hindcast special case 1st due to hindcast host in enabled hosts
            # with empty run types collection to enable forcing uploads
            host_config = config["run"]["hindcast hosts"][host_name]
        except KeyError:
            host_config = config["run"]["enabled hosts"][host_name]
    except KeyError as exc:
        logger.critical(f"unrecognized host: {host_name}")
        raise WorkerError from exc
    # Results directories are named for the run date; e.g. 05jan24
    results_dir = run_date.format("DDMMMYY").lower()
    run_type_results = Path(host_config["run types"][run_type]["results"])
    src_dir = run_type_results / results_dir
    src = f"{host_name}:{src_dir}"
    try:
        # Simple case: the archive config for the run type is a single path
        dest = Path(config["results archive"][run_type])
    except TypeError:
        # Otherwise the archive config for the run type is indexed by
        # destination host name
        dest_path = Path(config["results archive"][run_type][dest_host])
        dest = dest_path if dest_host == "localhost" else f"{dest_host}:{dest_path}"
    logger.info(f"downloading results from {src} to {dest}")
    # Build the argument list directly instead of interpolating the paths
    # into a string and splitting it with shlex: that re-tokenizes a path
    # that contains whitespace into several arguments, and raises ValueError
    # on a path that contains an unbalanced quote.
    cmd = ["scp", "-pr", src, os.fspath(dest)]
    lib.run_in_subprocess(cmd, logger.debug, logger.error)
    checklist = {run_type: {"run date": run_date.format("YYYY-MM-DD")}}
    if dest_host == "localhost":
        results_archive_dir = _tidy_localhost(run_type, dest, results_dir, config)
        for freq in ("1h", "1d"):
            checklist[run_type][freq] = [
                os.fspath(results_file)
                for results_file in results_archive_dir.glob(
                    f"*SalishSea_{freq}_*.nc"
                )
            ]
    else:
        checklist[run_type]["destination"] = dest
    return checklist


def _tidy_localhost(run_type, dest, results_dir, config):
    """Apply :py:func:`nowcast.lib.fix_perms` to the downloaded results
    directory and to each of the entries directly inside it.

    :param run_type: Type of run; accepted but unused here.

    :param dest: Local results archive directory for the run type.
    :type dest: :py:class:`pathlib.Path`

    :param str results_dir: Name of the run's results directory within
                            ``dest``.

    :param config: Nowcast system configuration mapping; the ``file group``
                   item is read.

    :return: Path of the run's results directory in the archive.
    :rtype: :py:class:`pathlib.Path`
    """
    results_archive_dir = dest / results_dir
    # The directory itself gets an explicit mode as well as the group
    lib.fix_perms(
        results_archive_dir,
        mode=int(lib.FilePerms(user="rwx", group="rwx", other="rx")),
        grp_name=config["file group"],
    )
    # Directory entries get the group only; no mode is passed for them
    for filepath in results_archive_dir.glob("*"):
        lib.fix_perms(filepath, grp_name=config["file group"])
    return results_archive_dir


if __name__ == "__main__":
    main()  # pragma: no cover