# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SalishSeaCast worker that builds a new directory of symlinks to model results files
for the rolling forecast datasets and replaces the previous rolling forecast directory
with the new one.
"""
import logging
import os
import shlex
import shutil
import subprocess
from pathlib import Path
import arrow
from nemo_nowcast import NowcastWorker
# Worker name: used as the logger name here and passed to NowcastWorker in main().
NAME = "update_forecast_datasets"
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    Builds the command-line interface (``model`` and ``run_type`` positional
    arguments plus a ``--run-date`` option that defaults to the start of the
    current day), then runs the worker with :py:func:`update_forecast_datasets`
    and the :py:func:`success` / :py:func:`failure` handlers.

    For command-line usage see:

    :command:`python -m nowcast.workers.update_forecast_datasets -h`

    :return: The worker instance after it has run.
    """
    model_help = (
        "\n"
        "Model to update the rolling forecast datasets for:\n"
        "'fvcom' means the Vancouver Harbour Fraser River (VHFR) FVCOM model,\n"
        "'nemo' means the SalishSeaCast NEMO model,\n"
        "'wwatch3' means the Strait of Georgia WaveWatch3(TM) model.\n"
    )
    run_type_help = (
        "\n"
        "Type of run to update rolling forecast datasets for:\n"
        "'forecast' means afternoon updated forecast runs,\n"
        "'forecast2' means early morning preliminary forecast runs,\n"
    )

    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    cli = worker.cli
    cli.add_argument("model", choices={"fvcom", "nemo", "wwatch3"}, help=model_help)
    cli.add_argument("run_type", choices={"forecast", "forecast2"}, help=run_type_help)
    cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date of the run to update rolling forecast datasets for.",
    )
    worker.run(update_forecast_datasets, success, failure)
    return worker
def success(parsed_args):
    """Log the completed update at info level and build the success message type.

    :param parsed_args: Parsed command-line arguments; the ``model``,
                        ``run_type`` and ``run_date`` attributes are read.

    :return: Message type of the form ``"success {model} {run_type}"``.
    :rtype: str
    """
    model, run_type = parsed_args.model, parsed_args.run_type
    run_day = parsed_args.run_date.format("YYYY-MM-DD")
    logger.info(f"{model} {run_day} {run_type} rolling forecast datasets updated")
    return f"success {model} {run_type}"
def failure(parsed_args):
    """Log the failed update at critical level and build the failure message type.

    :param parsed_args: Parsed command-line arguments; the ``model``,
                        ``run_type`` and ``run_date`` attributes are read.

    :return: Message type of the form ``"failure {model} {run_type}"``.
    :rtype: str
    """
    model, run_type = parsed_args.model, parsed_args.run_type
    run_day = parsed_args.run_date.format("YYYY-MM-DD")
    logger.critical(
        f"{model} {run_day} {run_type} rolling forecast datasets update failed"
    )
    return f"failure {model} {run_type}"
def update_forecast_datasets(parsed_args, config, *args):
    """Update the most recent forecast and/or rolling forecast directories
    that are configured for the requested model.

    Each of the two directories is optional: when its key
    (``"most recent forecast dir"`` or ``"dest dir"``) is absent from
    ``config["rolling forecasts"][model]`` that update is skipped.

    Only the configuration lookup itself is guarded by ``except KeyError``.
    A :py:exc:`KeyError` raised while the update is being carried out
    (e.g. a missing results archive setting) propagates to the caller instead
    of being mistaken for "directory not configured".

    :param parsed_args: Parsed command-line arguments; the ``model``,
                        ``run_type`` and ``run_date`` attributes are read.
    :param config: Nowcast system configuration; accessed as nested mappings.
    :param args: Additional positional arguments; unused.

    :return: Checklist of the form ``{model: {run_type: [updated dir paths]}}``
             where the paths are strings.
    :rtype: dict
    """
    model = parsed_args.model
    run_type = parsed_args.run_type
    run_date = parsed_args.run_date
    updated_dirs = []

    try:
        most_recent_fcst_dir = Path(
            config["rolling forecasts"][model]["most recent forecast dir"]
        )
    except KeyError:
        # no most recent forecast dir for NEMO, and that's okay
        pass
    else:
        _symlink_most_recent_forecast(
            run_date, most_recent_fcst_dir, model, run_type, config
        )
        updated_dirs.append(os.fspath(most_recent_fcst_dir))

    try:
        forecast_dir = Path(config["rolling forecasts"][model]["dest dir"])
    except KeyError:
        # no rolling forecast dir for VHFR FVCOM, and that's okay
        pass
    else:
        _update_rolling_forecast(run_date, forecast_dir, model, run_type, config)
        updated_dirs.append(os.fspath(forecast_dir))

    return {model: {run_type: updated_dirs}}
def _symlink_most_recent_forecast(
    run_date, most_recent_fcst_dir, model, run_type, config
):
    """Replace the contents of the most recent forecast directory with symlinks
    to the ``*.nc`` files of the run's results directory.

    Every entry in ``most_recent_fcst_dir`` is unlinked, then each ``*.nc`` file
    in ``<results archive>/<ddmmmyy>`` whose name does not contain ``restart``
    is symlinked into it.

    :param run_date: Run date; ``.format()`` is called on it
                     (presumably an :py:class:`arrow.Arrow` instance).
    :param most_recent_fcst_dir: Directory to refresh.
    :type most_recent_fcst_dir: :py:class:`pathlib.Path`
    :param str model: One of ``"nemo"``, ``"fvcom"`` or ``"wwatch3"``.
    :param str run_type: Run type; used in log messages and to select the
                         wwatch3 results archive.
    :param config: Nowcast system configuration; accessed as nested mappings.

    :raises KeyError: If ``model`` is not recognized, or if the results archive
                      setting for ``model`` is missing from ``config``.
    """
    ddmmmyy = run_date.format("DDMMMYY").lower()
    logger.info(
        f"updating {model} most_recent_forecast directory from {run_type}/{ddmmmyy} run"
    )
    # Look up the archive lazily so that only the requested model's settings
    # have to be present in the configuration, and do it before anything is
    # deleted so that a configuration problem leaves the existing symlinks intact.
    archive_lookups = {
        "nemo": lambda: config["results archive"]["nowcast"],
        "fvcom": lambda: config["vhfr fvcom runs"]["results archive"]["forecast x2"],
        "wwatch3": lambda: config["wave forecasts"]["results archive"][run_type],
    }
    results_archive = Path(archive_lookups[model]())

    for f in most_recent_fcst_dir.iterdir():
        f.unlink()
    logger.debug(f"deleted previous forecast symlinks from {most_recent_fcst_dir}")

    for f in (results_archive / ddmmmyy).glob("*.nc"):
        if "restart" not in f.name:
            (most_recent_fcst_dir / f.name).symlink_to(f)
    logger.debug(
        f"symlinked *.nc files from {results_archive/ddmmmyy} in to {most_recent_fcst_dir}"
    )
    logger.info(
        f"updated {model} most_recent_forecast directory from {run_type}/{ddmmmyy} run"
    )
def _update_rolling_forecast(run_date, forecast_dir, model, run_type, config):
    """Build a fresh rolling forecast directory and swap it in for the existing one.

    Steps, in order: create the ``<forecast_dir>_new`` directory, remove any
    existing ``<model>_forecast`` temporary results archive, populate the new
    directory with past days and forecast results symlinks, delete
    ``forecast_dir``, and move the new directory into its place.

    :param run_date: Run date; ``.format()`` is called on it here and it is
                     passed on to the helpers
                     (presumably an :py:class:`arrow.Arrow` instance).
    :param forecast_dir: Rolling forecast directory to replace.
    :type forecast_dir: :py:class:`pathlib.Path`
    :param str model: Model name; used in paths, log messages and by the helpers.
    :param str run_type: Run type; used in log messages and by the helpers.
    :param config: Nowcast system configuration; accessed as nested mappings.
    """
    run_day = run_date.format("DDMMMYY").lower()
    logger.info(f"updating {model} forecast directory for {run_type}/{run_day} run")

    staging_dir = _create_new_forecast_dir(forecast_dir, model, run_type)
    rolling_config = config["rolling forecasts"]
    n_past_days = rolling_config["days from past"]
    tmp_archive = Path(rolling_config["temporary results archives"]) / (
        f"{model}_forecast"
    )

    # Start from a clean temporary archive; it's okay if there isn't one yet
    try:
        shutil.rmtree(tmp_archive)
    except FileNotFoundError:
        pass

    _add_past_days_results(
        run_date, n_past_days, staging_dir, model, run_type, config
    )
    _add_forecast_results(
        run_date, staging_dir, tmp_archive, model, run_type, config
    )

    # Swap: drop the old directory, then rename the staged one into its place
    shutil.rmtree(os.fspath(forecast_dir))
    staging_dir.replace(forecast_dir)
    logger.info(
        f"updated {model} forecast directory for {run_type}/{run_day} run: {forecast_dir}"
    )
def _create_new_forecast_dir(forecast_dir, model, run_type):
    """Create the sibling directory of ``forecast_dir`` named ``<name>_new``.

    :param forecast_dir: Existing rolling forecast directory path.
    :type forecast_dir: :py:class:`pathlib.Path`
    :param str model: Model name; only used in the log message.
    :param str run_type: Run type; only used in the log message.

    :return: Path of the newly created directory.
    :rtype: :py:class:`pathlib.Path`

    :raises FileExistsError: If the ``_new`` directory already exists
                             (``mkdir()`` is called without ``exist_ok``).
    """
    staging_name = f"{forecast_dir.name}_new"
    staging_dir = forecast_dir.with_name(staging_name)
    staging_dir.mkdir()
    logger.debug(
        f"created new {model} forecast directory for {run_type} run: {staging_dir}"
    )
    return staging_dir
def _add_past_days_results(
    run_date, days_from_past, new_forecast_dir, model, run_type, config
):
    """Symlink nowcast results for a range of days into the new forecast directory.

    The range starts ``days_from_past`` days before ``run_date`` for
    ``"forecast"`` runs and one day later for any other run type. It ends on
    ``run_date``, except for wwatch3 ``"forecast2"`` runs where it ends on the
    day before ``run_date``.

    :param run_date: Run date; ``.shift()`` is called on it
                     (presumably an :py:class:`arrow.Arrow` instance).
    :param int days_from_past: Number of days before ``run_date`` to start from.
    :param new_forecast_dir: Directory in which the day directories are created.
    :type new_forecast_dir: :py:class:`pathlib.Path`
    :param str model: ``"nemo"`` selects ``config["results archive"]``; any other
                      value selects ``config["wave forecasts"]["results archive"]``.
    :param str run_type: Run type.
    :param config: Nowcast system configuration; accessed as nested mappings.
    """
    if model == "nemo":
        archive_path = config["results archive"]["nowcast"]
    else:
        archive_path = config["wave forecasts"]["results archive"]["nowcast"]
    nowcast_archive = Path(archive_path)

    days_back = days_from_past if run_type == "forecast" else days_from_past - 1
    first_day = run_date.shift(days=-days_back)

    last_day = run_date
    if model == "wwatch3" and run_type == "forecast2":
        last_day = run_date.shift(days=-1)

    for day in arrow.Arrow.range("day", first_day, last_day):
        _symlink_results(nowcast_archive, day, new_forecast_dir, day, model, run_type)
def _add_forecast_results(
    run_date, new_forecast_dir, tmp_forecast_results_archive, model, run_type, config
):
    """Symlink forecast run results into the new forecast directory.

    For ``"forecast"`` runs the ``run_date`` results are symlinked into the
    day directory for the day after ``run_date``.

    For any other run type (i.e. ``"forecast2"``) two day directories are
    populated: the first from the 24h extract that
    :py:func:`_extract_1st_forecast_day` writes to the temporary archive,
    the second from the ``run_date`` results of the ``run_type`` archive.
    For wwatch3 those two days are one day earlier than for the other models.

    :param run_date: Run date; ``.shift()`` is called on it
                     (presumably an :py:class:`arrow.Arrow` instance).
    :param new_forecast_dir: Directory in which the day directories are created.
    :type new_forecast_dir: :py:class:`pathlib.Path`
    :param tmp_forecast_results_archive: Temporary archive that receives the
                                         24h extracts.
    :type tmp_forecast_results_archive: :py:class:`pathlib.Path`
    :param str model: ``"nemo"`` selects ``config["results archive"]``; any other
                      value selects ``config["wave forecasts"]["results archive"]``.
    :param str run_type: Run type; also the key of the results archive to use.
    :param config: Nowcast system configuration; accessed as nested mappings.
    """
    if model == "nemo":
        archive_path = config["results archive"][run_type]
    else:
        archive_path = config["wave forecasts"]["results archive"][run_type]
    run_archive = Path(archive_path)

    if run_type != "forecast":
        # Preliminary forecast (run_type == 'forecast2'):
        # use 1st 24h of forecast run from previous day ...
        _extract_1st_forecast_day(
            tmp_forecast_results_archive, run_date, model, config
        )
        base_day = run_date.shift(days=-1) if model == "wwatch3" else run_date
        extract_day = base_day.shift(days=+1)
        _symlink_results(
            tmp_forecast_results_archive,
            extract_day,
            new_forecast_dir,
            extract_day,
            model,
            run_type,
        )
        # ... and the forecast2 run for the rest of the forecast
        _symlink_results(
            run_archive,
            run_date,
            new_forecast_dir,
            base_day.shift(days=+2),
            model,
            run_type,
        )
        return

    _symlink_results(
        run_archive,
        run_date,
        new_forecast_dir,
        run_date.shift(days=+1),
        model,
        run_type,
    )
def _extract_1st_forecast_day(tmp_forecast_results_archive, run_date, model, config):
    """Use :command:`ncks` to copy the leading time steps of each forecast
    results file into a day directory of the temporary forecast results archive.

    Model-specific settings:

    * ``"nemo"``: files are read from the ``run_date`` directory of
      ``config["results archive"]["forecast"]`` and written to the directory
      for the day after ``run_date``; the time dimension is ``time_counter``.
    * ``"wwatch3"``: files are read from the directory for the day before
      ``run_date`` in ``config["wave forecasts"]["results archive"]["forecast"]``
      and written to the ``run_date`` directory; the time dimension is ``time``.

    Files whose names start with ``SalishSea_1d`` or ``VENUS``, or end with
    ``restart.nc``, are skipped.

    A non-zero :command:`ncks` exit status is logged as an error for that file
    and processing continues with the next file.

    :param tmp_forecast_results_archive: Temporary archive in which the day
                                         directory is created.
    :type tmp_forecast_results_archive: :py:class:`pathlib.Path`
    :param run_date: Run date; ``.shift()`` and ``.format()`` are called on it
                     (presumably an :py:class:`arrow.Arrow` instance).
    :param str model: ``"nemo"`` or ``"wwatch3"``.
    :param config: Nowcast system configuration; accessed as nested mappings.

    :raises KeyError: If ``model`` is not recognized, or if the forecast results
                      archive setting for ``model`` is missing from ``config``.
    """
    ddmmmyy_m1 = run_date.shift(days=-1).format("DDMMMYY").lower()
    ddmmmyy = run_date.format("DDMMMYY").lower()
    ddmmmyy_p1 = run_date.shift(days=+1).format("DDMMMYY").lower()

    # Resolve settings for the requested model only, so that the other model's
    # configuration section does not have to be present.
    # ``reduced_prefixes``/``reduced_count``: files with these name prefixes have
    # ``reduced_count`` time steps extracted; all other files get ``default_count``.
    if model == "nemo":
        day_dir = tmp_forecast_results_archive / ddmmmyy_p1
        results_archive = Path(config["results archive"]["forecast"])
        forecast_day = ddmmmyy
        time_var = "time_counter"
        reduced_prefixes = ("SalishSea_1h", "CHS_currents")
        reduced_count = 24
    elif model == "wwatch3":
        day_dir = tmp_forecast_results_archive / ddmmmyy
        results_archive = Path(config["wave forecasts"]["results archive"]["forecast"])
        forecast_day = ddmmmyy_m1
        time_var = "time"
        reduced_prefixes = ("SoG_ww3_fields",)
        reduced_count = 24 * 2
    else:
        raise KeyError(model)
    default_count = 24 * 6

    # Create the destination directory; it's okay if it already exists
    day_dir.mkdir(parents=True, exist_ok=True)
    logger.debug(f"created new {model} temporary forecast directory: {day_dir}")

    for forecast_file in (results_archive / forecast_day).glob("*.nc"):
        name = forecast_file.name
        if name.startswith(("SalishSea_1d", "VENUS")) or name.endswith("restart.nc"):
            continue
        forecast_file_24h = day_dir / name
        forecast_times = (
            reduced_count if name.startswith(reduced_prefixes) else default_count
        )
        # ncks hyperslab on the time dimension: indices 0 .. forecast_times-1
        time_slab = f"{time_var},0,{forecast_times-1}"
        cmd = f"/usr/bin/ncks -d {time_slab} {forecast_file} {forecast_file_24h}"
        logger.debug(f"running {cmd} in subprocess")
        # Pass the arguments as a list rather than re-splitting the command
        # string so that paths containing whitespace reach ncks intact.
        proc = subprocess.run(
            [
                "/usr/bin/ncks",
                "-d",
                time_slab,
                os.fspath(forecast_file),
                os.fspath(forecast_file_24h),
            ]
        )
        if proc.returncode != 0:
            # Don't claim success for a file that ncks failed to produce
            logger.error(
                f"ncks exited with status {proc.returncode} while extracting "
                f"1st 24h of {forecast_file} to {forecast_file_24h}"
            )
            continue
        logger.debug(f"extracted 1st 24h of {forecast_file} to {forecast_file_24h}")
def _symlink_results(
    results_archive, results_day, forecast_dir, forecast_day, model, run_type
):
    """Create a day directory in ``forecast_dir`` and fill it with symlinks to
    the ``*.nc`` results files of one day in ``results_archive``.

    Both directory names are the lower-cased ``DDMMMYY`` format of the
    respective date. Files whose names start with ``SalishSea_1d`` or ``VENUS``,
    or end with ``restart.nc``, are not symlinked.

    :param results_archive: Archive directory that holds the source day directory.
    :type results_archive: :py:class:`pathlib.Path`
    :param results_day: Date of the source day directory; ``.format()`` is called
                        on it (presumably an :py:class:`arrow.Arrow` instance).
    :param forecast_dir: Directory in which the destination day directory
                         is created.
    :type forecast_dir: :py:class:`pathlib.Path`
    :param forecast_day: Date of the destination day directory; ``.format()`` is
                         called on it.
    :param str model: Model name; only used in the log message.
    :param str run_type: Run type; only used in the log message.

    :raises FileExistsError: If the destination day directory already exists
                             (``mkdir()`` is called without ``exist_ok``).
    """
    # Destination: one directory per forecast day
    dest_dir = forecast_dir / forecast_day.format("DDMMMYY").lower()
    dest_dir.mkdir()
    logger.debug(
        f"created new {model} forecast directory for {run_type} run: {dest_dir}"
    )

    # Source: the results archive directory for the results day
    src_dir = results_archive / results_day.format("DDMMMYY").lower()
    excluded_prefixes = ("SalishSea_1d", "VENUS")
    for results_file in src_dir.glob("*.nc"):
        name = results_file.name
        if name.startswith(excluded_prefixes) or name.endswith("restart.nc"):
            continue
        (dest_dir / name).symlink_to(results_file)
    logger.debug(f"symlinked *.nc files from {src_dir} in to {dest_dir}")
# Run the worker when this module is executed as a script.
if __name__ == "__main__":
    main()  # pragma: no cover