# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-License-Identifier: Apache-2.0
"""SalishSeaCast worker that uploads forcing files for NEMO runs.

Upload the forcing files for a nowcast or forecast run to the HPC/cloud
facility where the run will be executed.
"""
import logging
import os
from pathlib import Path
import arrow
from nemo_nowcast import NowcastWorker
from nowcast import ssh_sftp
# Worker name; it is passed to NowcastWorker in main() and is also used as
# the name of this module's logger.
NAME = "upload_forcing"
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.upload_forcing --help`

    :returns: The worker instance, after its ``run()`` method has been called
              with :py:func:`upload_forcing`, :py:func:`success` and
              :py:func:`failure`.
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_argument(
        "host_name", help="Name of the host to upload forcing files to"
    )
    worker.cli.add_argument(
        "run_type",
        choices={"nowcast+", "forecast2", "ssh", "turbidity"},
        help="""
        Type of run to upload files for:
        'nowcast+' means nowcast & 1st forecast runs,
        'forecast2' means 2nd forecast run,
        'ssh' means Neah Bay sea surface height files only (for forecast run),
        'turbidity' means Fraser River turbidity file only
        (for nowcast-green run).
        """,
    )
    # The default run date is evaluated when main() is called: the start of
    # the current day.
    worker.cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date of the run to upload files for.",
    )
    worker.run(upload_forcing, success, failure)
    return worker
def success(parsed_args):
    """Log that the upload completed and return the success message type.

    :param parsed_args: Parsed command-line arguments; the ``run_type``,
                        ``run_date`` and ``host_name`` attributes are read.

    :return: Message type of the form ``"success <run_type>"``.
    :rtype: str
    """
    message = (
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"forcing files upload to {parsed_args.host_name} completed"
    )
    logger.info(message)
    return f"success {parsed_args.run_type}"
def failure(parsed_args):
    """Log that the upload failed and return the failure message type.

    :param parsed_args: Parsed command-line arguments; the ``run_type``,
                        ``run_date`` and ``host_name`` attributes are read.

    :return: Message type of the form ``"failure <run_type>"``.
    :rtype: str
    """
    message = (
        f'{parsed_args.run_type} {parsed_args.run_date.format("YYYY-MM-DD")} '
        f"forcing files upload to {parsed_args.host_name} failed"
    )
    logger.critical(message)
    return f"failure {parsed_args.run_type}"
def upload_forcing(parsed_args, config, *args):
    """Upload the forcing files for a run to the host that will execute it.

    Sea surface height files are uploaded for every run type.
    For the ``"ssh"`` run type nothing else is uploaded.
    For the ``"turbidity"`` run type only the Fraser River turbidity file is
    uploaded in addition.
    For all other run types the river runoff, weather, and LiveOcean boundary
    conditions files are uploaded in addition.

    :param parsed_args: Parsed command-line arguments; the ``host_name``,
                        ``run_type`` and ``run_date`` attributes are read.
    :param config: Nowcast system configuration mapping; the host's entry
                   under ``config["run"]["enabled hosts"]`` provides the
                   SSH key file name and the remote forcing directories.
    :param args: Ignored.

    :return: Checklist of the form
             ``{host_name: {run_type: {"run date": "YYYY-MM-DD",
             "file types": [...]}}}``.
    :rtype: dict

    :raises KeyError: If ``HOME`` is not set in the environment or an expected
                      key is missing from ``config``.
    """
    host_name = parsed_args.host_name
    run_type = parsed_args.run_type
    run_date = parsed_args.run_date
    host_config = config["run"]["enabled hosts"][host_name]
    ssh_key = Path(os.environ["HOME"], ".ssh", host_config["ssh key"])
    ssh_client, sftp_client = ssh_sftp.sftp(host_name, ssh_key)
    # The connections must be released even when one of the uploads raises,
    # otherwise a failed upload leaks the SSH and SFTP sessions.
    try:
        # Neah Bay sea surface height
        _upload_ssh_files(
            sftp_client, run_type, run_date, config, host_name, host_config
        )
        if run_type == "ssh":
            file_types = ["ssh"]
        elif run_type == "turbidity":
            # Fraser River turbidity only
            _upload_fraser_turbidity_file(
                sftp_client, run_date, config, host_name, host_config
            )
            file_types = ["turbidity"]
        else:
            # Rivers runoff
            _upload_river_runoff_files(
                sftp_client, run_date, config, host_name, host_config
            )
            # Weather
            _upload_weather(
                sftp_client, run_type, run_date, config, host_name, host_config
            )
            # Live Ocean Boundary Conditions
            _upload_live_ocean_files(
                sftp_client, run_type, run_date, config, host_name, host_config
            )
            file_types = ["ssh", "rivers", "weather", "boundary conditions"]
    finally:
        sftp_client.close()
        ssh_client.close()
    return {
        host_name: {
            run_type: {
                "run date": run_date.format("YYYY-MM-DD"),
                "file types": file_types,
            }
        }
    }
def _upload_ssh_files(sftp_client, run_type, run_date, config, host_name, host_config):
    """Upload the sea surface height files for 4 days starting the day before
    ``run_date``.

    The file for the day before ``run_date`` is taken from the ``obs``
    sub-directory and the files for the other days from the ``fcst``
    sub-directory; the same sub-directory is used on the remote host.

    If the ``obs`` file does not exist locally, a symlink with its name is
    created that points to the same-named file in the ``fcst`` sub-directory,
    a warning is logged, and the upload is tried again.
    A missing ``fcst`` file is not recoverable.

    :param sftp_client: SFTP client passed through to
                        :py:func:`ssh_sftp.upload_file`.
    :param run_type: Not used by this function.
    :param run_date: Run date; presumably an :py:class:`arrow.Arrow`
                     instance (``shift(days=...)`` and ``date()`` are called).
    :param config: Nowcast system configuration mapping; the ``"ssh"``
                   section's ``"file template"`` and ``"ssh dir"`` are read.
    :param host_name: Name of the host to upload to.
    :param host_config: Host configuration mapping;
                        ``host_config["forcing"]["ssh dir"]`` is read.

    :raises FileNotFoundError: If a ``fcst`` file does not exist locally,
                               or the upload still fails after the ``obs``
                               symlink has been created.
    """
    for day in range(-1, 3):
        filename = config["ssh"]["file template"].format(
            run_date.shift(days=day).date()
        )
        dest_dir = "obs" if day == -1 else "fcst"
        localpath = Path(config["ssh"]["ssh dir"], dest_dir, filename)
        remotepath = Path(host_config["forcing"]["ssh dir"], dest_dir, filename)
        try:
            ssh_sftp.upload_file(sftp_client, host_name, localpath, remotepath, logger)
        except FileNotFoundError:
            if dest_dir != "obs":
                raise
            # obs file does not exist, so create symlink to corresponding
            # forecast file
            fcst = Path(config["ssh"]["ssh dir"], "fcst", filename)
            try:
                # Path.symlink_to() makes *this path* the link, so the link
                # has to be created at the missing obs path and point at the
                # forecast file (not the other way around).
                localpath.symlink_to(fcst)
            except FileExistsError:
                # This probably happens due to a race condition when 2 or more
                # upload_forcing workers are running concurrently.
                # So, we assume that another instance created the symlink, and
                # don't worry.
                pass
            logger.warning(f"ssh obs file not found; created symlink to {fcst}")
            ssh_sftp.upload_file(sftp_client, host_name, localpath, remotepath, logger)
def _upload_fraser_turbidity_file(
    sftp_client, run_date, config, host_name, host_config
):
    """Upload the Fraser River turbidity forcing file for ``run_date``.

    When the local file is missing, persist the previous day's file by
    creating a symlink with the run date's file name that points to the
    previous day's file name in the same directory, log a critical message,
    and try the upload again.

    :param sftp_client: SFTP client passed through to
                        :py:func:`ssh_sftp.upload_file`.
    :param run_date: Run date; ``shift(days=-1)`` and ``date()`` are called
                     on it.
    :param config: Nowcast system configuration mapping;
                   ``config["rivers"]["turbidity"]`` is read.
    :param host_name: Name of the host to upload to.
    :param host_config: Host configuration mapping;
                        ``host_config["forcing"]["Fraser turbidity dir"]``
                        is read.
    """
    turbidity_config = config["rivers"]["turbidity"]
    filename_tmpl = turbidity_config["file template"]
    run_date_fn = filename_tmpl.format(run_date.date())
    localpath = Path(turbidity_config["forcing dir"]) / run_date_fn
    dest = Path(host_config["forcing"]["Fraser turbidity dir"]) / run_date_fn
    try:
        ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
    except FileNotFoundError:
        # No turbidity file for the run date; fall back to yesterday's file
        # via a symlink.
        prev_day_fn = filename_tmpl.format(run_date.shift(days=-1).date())
        try:
            localpath.symlink_to(localpath.with_name(prev_day_fn))
        except FileExistsError:
            # Most likely another concurrently running upload_forcing worker
            # got here first and already created the symlink; see
            # https://bitbucket.org/salishsea/salishseanowcast/issues/57
            # Nothing more to do in that case.
            pass
        logger.critical(
            f"Fraser River turbidity forcing file not found; "
            f"created symlink to {localpath.with_name(prev_day_fn)}"
        )
        ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
def _upload_river_runoff_files(sftp_client, run_date, config, host_name, host_config):
    """Upload the river runoff forcing files for the day before ``run_date``.

    One file is uploaded for each template in
    ``config["rivers"]["file templates"]``.
    When a local file is missing, persist the file from 2 days before
    ``run_date`` by creating a symlink to it under the missing file's name,
    log a critical message, and try the upload again.

    :param sftp_client: SFTP client passed through to
                        :py:func:`ssh_sftp.upload_file`.
    :param run_date: Run date; ``shift(days=...)`` and ``date()`` are called
                     on it.
    :param config: Nowcast system configuration mapping; the ``"rivers"``
                   section's ``"file templates"`` and ``"rivers dir"`` are
                   read.
    :param host_name: Name of the host to upload to.
    :param host_config: Host configuration mapping;
                        ``host_config["forcing"]["rivers dir"]`` is read.
    """
    for tmpl in config["rivers"]["file templates"].values():
        runoff_fn = tmpl.format(run_date.shift(days=-1).date())
        localpath = Path(config["rivers"]["rivers dir"]) / runoff_fn
        dest = Path(host_config["forcing"]["rivers dir"]) / runoff_fn
        try:
            ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
        except FileNotFoundError:
            # No runoff file for that day; fall back to the file from the
            # day before it via a symlink.
            prev_day_fn = tmpl.format(run_date.shift(days=-2).date())
            try:
                localpath.symlink_to(localpath.with_name(prev_day_fn))
            except FileExistsError:
                # Most likely another concurrently running upload_forcing
                # worker got here first and already created the symlink.
                # Nothing more to do in that case.
                pass
            logger.critical(
                f"Rivers runoff forcing file not found; created symlink to "
                f"{localpath.with_name(prev_day_fn)}"
            )
            ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
def _upload_weather(sftp_client, run_type, run_date, config, host_name, host_config):
    """Upload the weather forcing files for the run.

    Files for ``run_date`` + 1 day and ``run_date`` + 2 days are uploaded
    from/to the ``fcst`` sub-directory for every run type.
    For the ``"nowcast+"`` run type the file for ``run_date`` itself is
    uploaded as well, from/to the top level weather directory.

    :param sftp_client: SFTP client passed through to
                        :py:func:`ssh_sftp.upload_file`.
    :param run_type: Type of run; only ``"nowcast+"`` is treated specially.
    :param run_date: Run date; ``shift(days=...)`` and ``date()`` are called
                     on it.
    :param config: Nowcast system configuration mapping; the ``"weather"``
                   section's ``"file template"`` and ``"ops dir"`` are read.
    :param host_name: Name of the host to upload to.
    :param host_config: Host configuration mapping;
                        ``host_config["forcing"]["weather dir"]`` is read.
    """
    first_day = 0 if run_type == "nowcast+" else 1
    for day in range(first_day, 3):
        weather_fn = config["weather"]["file template"].format(
            run_date.shift(days=day).date()
        )
        # Only the run date's own file lives outside of the fcst/ directory
        sub_dir = "fcst" if day != 0 else ""
        src = Path(config["weather"]["ops dir"], sub_dir, weather_fn)
        dest = Path(host_config["forcing"]["weather dir"], sub_dir, weather_fn)
        ssh_sftp.upload_file(sftp_client, host_name, src, dest, logger)
def _upload_live_ocean_files(
    sftp_client, run_type, run_date, config, host_name, host_config
):
    """Upload the LiveOcean boundary conditions file for ``run_date``.

    When the local file is missing, persist the previous day's file by
    creating a symlink with the run date's file name that points to the
    previous day's file name in the same directory, log a message, and try
    the upload again.
    The message is logged at the info level for ``"forecast2"`` runs and at
    the critical level for all other run types.

    :param sftp_client: SFTP client passed through to
                        :py:func:`ssh_sftp.upload_file`.
    :param run_type: Type of run; selects the logging level of the
                     missing file message.
    :param run_date: Run date; ``shift(days=-1)`` and ``date()`` are called
                     on it.
    :param config: Nowcast system configuration mapping; the
                   ``"temperature salinity"`` section's ``"file template"``
                   and ``"bc dir"`` are read.
    :param host_name: Name of the host to upload to.
    :param host_config: Host configuration mapping;
                        ``host_config["forcing"]["bc dir"]`` is read.
    """
    bc_fn = config["temperature salinity"]["file template"].format(run_date.date())
    localpath = Path(config["temperature salinity"]["bc dir"]) / bc_fn
    dest = Path(host_config["forcing"]["bc dir"]) / bc_fn
    try:
        ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
    except FileNotFoundError:
        # No boundary conditions file for the run date; fall back to
        # yesterday's file via a symlink.
        # forecast2 runs are expected to end up here because they run before
        # the day's LiveOcean product is available; for any other run type
        # it is a cause for concern.
        prev_day_fn = config["temperature salinity"]["file template"].format(
            run_date.shift(days=-1).date()
        )
        try:
            localpath.symlink_to(localpath.with_name(prev_day_fn))
        except FileExistsError:
            # Most likely another concurrently running upload_forcing worker
            # got here first and already created the symlink; see
            # https://bitbucket.org/salishsea/salishseanowcast/issues/57
            # Nothing more to do in that case.
            pass
        if run_type == "forecast2":
            logging_level = logging.INFO
        else:
            logging_level = logging.CRITICAL
        logger.log(
            logging_level,
            f"LiveOcean boundary conditions file not found; "
            f"created symlink to {localpath.with_name(prev_day_fn)}",
        )
        ssh_sftp.upload_file(sftp_client, host_name, localpath, dest, logger)
# Run the worker when this module is executed as a script
# (e.g. via ``python -m nowcast.workers.upload_forcing``).
if __name__ == "__main__":
    main() # pragma: no cover