# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-License-Identifier: Apache-2.0
"""Salish Sea WaveWatch3 forecast worker that produces the hourly
ocean currents forcing file for a prelim-forecast or forecast run
"""
import logging
import os
import shlex
import subprocess
from pathlib import Path
import arrow
import xarray
from nemo_nowcast import NowcastWorker
from salishsea_tools import viz_tools
NAME = "make_ww3_current_file"
logger = logging.getLogger(NAME)
[docs]
def main():
"""Set up and run the worker.
For command-line usage see:
:command:`python -m nowcast.workers.make_ww3_current_file --help`
"""
worker = NowcastWorker(NAME, description=__doc__)
worker.init_cli()
worker.cli.add_argument(
"host_name", help="Name of the host to create the currents file on"
)
worker.cli.add_argument(
"run_type",
choices={"forecast2", "forecast", "nowcast"},
help="""
Type of run to create the currents file for:
'forecast2' means preliminary forecast run (after NEMO forecast2 run),
'forecast' means updated forecast run (after NEMO forecast run),
'nowcast' means updated 1 day only (for hindcast runs)
""",
)
worker.cli.add_date_option(
"--run-date",
default=arrow.now().floor("day"),
help="Start date of run to create the currents file for.",
)
worker.run(make_ww3_current_file, success, failure)
def success(parsed_args):
logger.info(
f"wwatch3 currents forcing file created "
f"on {parsed_args.host_name} "
f'for {parsed_args.run_date.format("YYYY-MM-DD")} '
f"{parsed_args.run_type} run"
)
msg_type = f"success {parsed_args.run_type}"
return msg_type
def failure(parsed_args):
logger.critical(
f"wwatch3 currents forcing file creation failed "
f"on {parsed_args.host_name} "
f'for {parsed_args.run_date.format("YYYY-MM-DD")} '
f"{parsed_args.run_type} run"
)
msg_type = f"failure {parsed_args.run_type}"
return msg_type
def make_ww3_current_file(parsed_args, config, *args):
host_name = parsed_args.host_name
run_type = parsed_args.run_type
run_date = parsed_args.run_date
ymd = run_date.format("YYYY-MM-DD")
logger.info(f"Creating wwatch3 currents forcing file for {ymd} {run_type} run")
host_config = config["run"]["enabled hosts"][host_name]
grid_dir = Path(config["wave forecasts"]["grid dir"])
mesh_mask = os.fspath(grid_dir / config["run types"]["nowcast"]["mesh mask"])
nemo_dir = Path(host_config["run types"]["nowcast"]["results"]).parent
nemo_file_tmpl = config["wave forecasts"]["NEMO file template"]
dest_dir = Path(config["wave forecasts"]["run prep dir"], "current")
filepath_tmpl = config["wave forecasts"]["current file template"]
nc_filepath = dest_dir / filepath_tmpl.format(yyyymmdd=run_date.format("YYYYMMDD"))
if run_type in {"nowcast", "forecast"}:
datasets = _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl)
if run_type == "forecast":
datasets.update(_calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl))
if run_type == "forecast2":
datasets = _calc_forecast2_datasets(
run_date, nemo_dir, nemo_file_tmpl, dest_dir
)
with xarray.open_dataset(mesh_mask) as grid:
lats = grid.nav_lat[1:, 1:]
lons = grid.nav_lon[1:, 1:] + 360
logger.debug(f"lats and lons from: {mesh_mask}")
drop_vars = {
"area",
"bounds_lon",
"bounds_lat",
"bounds_nav_lon",
"bounds_nav_lat",
"depthu_bounds",
"depthv_bounds",
"time_centered_bounds",
"time_counter_bounds",
}
chunks = {
"u": {
"time_counter": 3,
"depthu": 40,
"y": 898,
"x": 398,
},
"v": {
"time_counter": 3,
"depthv": 40,
"y": 898,
"x": 398,
},
}
with xarray.open_mfdataset(
datasets["u"],
chunks=chunks["u"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as u_nemo:
logger.debug(f'u velocities from {datasets["u"]}')
with xarray.open_mfdataset(
datasets["v"],
chunks=chunks["v"],
compat="override",
coords="minimal",
data_vars="minimal",
drop_variables=drop_vars,
) as v_nemo:
logger.debug(f'v velocities from {datasets["v"]}')
u_unstaggered, v_unstaggered = viz_tools.unstagger(
u_nemo.vozocrtx.isel(depthu=0), v_nemo.vomecrty.isel(depthv=0)
)
del u_unstaggered.coords["time_centered"]
del u_unstaggered.coords["depthu"]
del v_unstaggered.coords["time_centered"]
del v_unstaggered.coords["depthv"]
logger.debug("unstaggered velocity components on to mesh mask lats/lons")
u_current, v_current = viz_tools.rotate_vel(u_unstaggered, v_unstaggered)
logger.debug("rotated velocity components north/south alignment")
ds = _create_dataset(
u_current.time_counter, lats, lons, u_current, v_current, datasets
)
logger.debug("created currents dataset")
ds.to_netcdf(os.fspath(nc_filepath))
logger.debug(f"stored currents forcing file: {nc_filepath}")
checklist = {
run_type: os.fspath(nc_filepath),
"run date": run_date.format("YYYY-MM-DD"),
}
return checklist
def _calc_nowcast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
)
datasets[grid].append(nowcast_file)
logger.debug(f"{grid} dataset: {nowcast_file}")
return datasets
def _calc_forecast_datasets(run_date, nemo_dir, nemo_file_tmpl):
datasets = {"u": [], "v": []}
dmy = run_date.format("DDMMMYY").lower()
s_yyyymmdd = e_yyyymmdd = run_date.format("YYYYMMDD")
for grid in datasets:
nowcast_file = (
nemo_dir
/ Path("nowcast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
)
datasets[grid].append(nowcast_file)
logger.debug(f"{grid} dataset: {nowcast_file}")
s_yyyymmdd = run_date.shift(days=+1).format("YYYYMMDD")
e_yyyymmdd = run_date.shift(days=+2).format("YYYYMMDD")
for grid in datasets:
forecast_file = (
nemo_dir
/ Path("forecast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
)
datasets[grid].append(forecast_file)
logger.debug(f"{grid} dataset: {forecast_file}")
return datasets
def _calc_forecast2_datasets(run_date, nemo_dir, nemo_file_tmpl, dest_dir):
datasets = {"u": [], "v": []}
dmy = run_date.shift(days=-1).format("DDMMMYY").lower()
s_yyyymmdd = run_date.format("YYYYMMDD")
e_yyyymmdd = run_date.shift(days=+1).format("YYYYMMDD")
for grid in datasets:
forecast_file = (
nemo_dir
/ Path("forecast", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
)
forecast_file_24h = dest_dir / nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=s_yyyymmdd, grid=grid.upper()
)
cmd = (
f"/usr/bin/ncks -d time_counter,0,23 "
f"{forecast_file} {forecast_file_24h}"
)
logger.debug(f"running {cmd} in subprocess")
subprocess.run(shlex.split(cmd))
logger.debug(f"extracted 1st 24h of {forecast_file} to {forecast_file_24h}")
datasets[grid].append(forecast_file_24h)
logger.debug(f"{grid} dataset: {forecast_file_24h}")
s_yyyymmdd = run_date.shift(days=+1).format("YYYYMMDD")
e_yyyymmdd = run_date.shift(days=+2).format("YYYYMMDD")
for grid in datasets:
forecast2_file = (
nemo_dir
/ Path("forecast2", dmy)
/ nemo_file_tmpl.format(
s_yyyymmdd=s_yyyymmdd, e_yyyymmdd=e_yyyymmdd, grid=grid.upper()
)
)
datasets[grid].append(forecast2_file)
logger.debug(f"{grid} dataset: {forecast2_file}")
return datasets
def _create_dataset(time, lats, lons, u_current, v_current, datasets):
now = arrow.now()
ds = xarray.Dataset(
data_vars={
"u_current": u_current.rename({"time_counter": "time"}),
"v_current": v_current.rename({"time_counter": "time"}),
},
coords={
"time": time.rename("time").rename({"time_counter": "time"}),
"latitude": lats,
"longitude": lons,
},
attrs={
"creation_date": str(now),
"history": f'[{now.format("YYYY-MM-DD HH:mm:ss")}] '
f"created by SalishSeaNowcast "
f"make_ww3_current_file worker",
"source": f"UBC SalishSeaCast NEMO results datasets: {datasets}",
},
)
return ds
if __name__ == "__main__":
main()