# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SalishSeaCast worker that generates a sea surface height boundary conditions file from
NOAA Neah Bay observation and forecast values.
"""
import datetime
import logging
import os
from pathlib import Path
import shutil
import arrow
import matplotlib.backends.backend_agg
import matplotlib.figure
from nemo_nowcast import NowcastWorker
import netCDF4
import numpy
import pytz
from salishsea_tools import nc_tools
from nowcast import lib, residuals
NAME = "make_ssh_files"
logger = logging.getLogger(NAME)
[docs]
def main():
"""Set up and run the worker.
For command-line usage see:
:command:`python -m nowcast.workers.make_ssh_file --help`
"""
worker = NowcastWorker(NAME, description=__doc__)
worker.init_cli()
worker.cli.add_argument(
"run_type",
choices={"nowcast", "forecast2"},
help="""
Type of run to prepare open boundary sea surface height file for:
'nowcast' means nowcast & forecast runs,
'forecast2' means preliminary forecast run
""",
)
worker.cli.add_date_option(
"--run-date",
default=arrow.now().floor("day"),
help="""
Date to prepare open boundary sea surface height file for.
Defaults to today.
""",
)
worker.cli.add_argument(
"--text-file",
type=Path,
help="""
Absolute path and file name of legacy file like sshNB_YYYY-MM-DD_HH.txt
to process instead of CSV file from NOAA tarball.
**This option is intended for hindcast boundary file creation and should be used with
the --debug option.**
""",
)
worker.cli.add_argument(
"--archive", action="store_true", help="text-file is archive type"
)
worker.run(make_ssh_file, success, failure)
return worker
def success(parsed_args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:return: Nowcast system message type
:rtype: str
"""
run_date = parsed_args.run_date
run_type = parsed_args.run_type
logger.info(
f"sea surface height boundary file for {run_date.format('YYYY-MM-DD')} {run_type} run created"
)
return f"success {run_type}"
def failure(parsed_args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:return: Nowcast system message type
:rtype: str
"""
run_date = parsed_args.run_date
run_type = parsed_args.run_type
logger.critical(
f"sea surface height boundary file for {run_date.format('YYYY-MM-DD')} {run_type} run creation failed"
)
return f"failure {run_type}"
def make_ssh_file(parsed_args, config, *args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:param :py:class:`nemo_nowcast.Config` config:
:return: Nowcast system checklist items
:rtype: dict
"""
run_date = parsed_args.run_date
yyyymmdd = run_date.format("YYYYMMDD")
run_type = parsed_args.run_type
ssh_forecast = "06" if run_type == "nowcast" else "00"
logger.info(
f"building {run_type} sea surface height boundary conditions file(s) from "
f"{run_date.format('YYYY-MM-DD')} NOAA Neah Bay {ssh_forecast}Z "
f"observation and forecast values"
)
ssh_dir = Path(config["ssh"]["ssh dir"])
tar_file_tmpl = config["ssh"]["download"]["tar file template"]
csv_file = Path(
tar_file_tmpl.format(yyyymmdd=yyyymmdd, forecast=ssh_forecast)
).with_suffix(".csv")
data_file = parsed_args.text_file or ssh_dir / "txt" / csv_file
checklist = {run_type: {}}
if parsed_args.text_file is None:
# Store a copy of the CSV file from the NOAA tarball in the run results directory so that
# there is definitive record of the sea surface height data that was used for the run
_copy_csv_to_results_dir(data_file, run_date, run_type, config)
checklist[run_type].update({"csv": os.fspath(data_file)})
# Grab all sea surface height data in the NOAA data file
tidal_preds_dir = Path(config["ssh"]["tidal predictions"])
neah_bay_hourly_tides = config["ssh"]["neah bay hourly"]
dates, sshs, fflags = residuals.NeahBay_forcing_anom(
data_file,
(
run_date.datetime
if run_type == "nowcast"
else run_date.shift(days=-1).datetime
),
tidal_preds_dir / neah_bay_hourly_tides,
parsed_args.archive,
parsed_args.text_file is None,
)
logger.debug(f"read sea surface height data from {data_file}")
# Identify days with full ssh information
dates_full = _list_full_days(dates, sshs, fflags)
lons, lats = _get_lons_lats(config)
if parsed_args.text_file is None:
fig, ax = _setup_plot()
# Loop through full days and save netcdf file(s)
for ip, d in enumerate(dates_full):
tc, _, sshd, fflagd = _isolate_day(d, dates, sshs, fflags)
forecast_flag = fflagd.any()
# Plotting
if parsed_args.text_file is None and ip < 3:
ax.plot(sshd, "-o", lw=2, label=d.strftime("%d-%b-%Y"))
filepath = _save_netcdf(
d, tc, sshd, forecast_flag, data_file, config, lats, lons
)
logger.info(f"wrote sea surface height boundary file: {filepath}")
filename = os.path.basename(filepath)
lib.fix_perms(filename, grp_name=config["file group"])
if forecast_flag:
if "fcst" not in checklist[run_type]:
checklist[run_type].update({"fcst": [filepath]})
else:
checklist[run_type]["fcst"].append(filepath)
else:
checklist[run_type].update({"obs": filepath})
_ensure_all_files_created(run_date, run_type, ssh_dir, checklist, config)
if parsed_args.text_file is None:
_render_plot(fig, ax, config)
return checklist
def _copy_csv_to_results_dir(data_file, run_date, run_type, config):
results_date = run_date if run_type == "nowcast" else run_date.shift(days=-1)
results_dir = Path(
config["results archive"][run_type], results_date.format("DDMMMYY").lower()
)
lib.mkdir(results_dir, logger, grp_name=config["file group"], exist_ok=True)
shutil.copy2(data_file, results_dir)
logger.debug(f"copied {data_file} to {results_dir}")
def _list_full_days(dates, surges, forecast_flags):
"""Return a list of days that have a full 24 hour data set."""
# Check if first day is a full day
tc, ds, _, _ = _isolate_day(dates[0], dates, surges, forecast_flags)
if ds.shape[0] == tc.shape[0]:
start = dates[0]
else:
start = dates[0] + datetime.timedelta(days=1)
start = datetime.datetime(
start.year, start.month, start.day, tzinfo=pytz.timezone("UTC")
)
# Check if last day is a full day
tc, ds, _, _ = _isolate_day(dates[-1], dates, surges, forecast_flags)
if ds.shape[0] == tc.shape[0]:
end = dates[-1]
else:
end = dates[-1] - datetime.timedelta(days=1)
end = datetime.datetime(end.year, end.month, end.day, tzinfo=pytz.timezone("UTC"))
# list of dates that are full
dates_list = [
start + datetime.timedelta(days=i) for i in range((end - start).days + 1)
]
return dates_list
def _isolate_day(day, dates, surges, forecast_flags):
"""Return array of time_counter and datetime objects over a 24 hour
period covering one full day.
Returns the surge and forecast_flag for that day as well
"""
tc = numpy.arange(24)
dates_r = []
surge_r = []
flag_r = []
for t, surge, flag in zip(dates, surges, forecast_flags):
if t.month == day.month:
if t.day == day.day:
dates_r.append(t)
surge_r.append(surge)
flag_r.append(flag)
return tc, numpy.array(dates_r), numpy.array(surge_r), numpy.array(flag_r)
def _get_lons_lats(config):
coords = Path(config["ssh"]["coordinates"])
with netCDF4.Dataset(coords) as coordinates:
lats = coordinates.variables["nav_lat"][:]
lons = coordinates.variables["nav_lon"][:]
logger.debug(f"loaded lats & lons from {coords}")
return lons, lats
def _save_netcdf(day, tc, surges, forecast_flag, textfile, config, lats, lons):
"""Save the surge for a given day in a netCDF4 file."""
# Western open boundary (JdF) grid parameter values for NEMO
startj, endj, r = 370, 470, 1
lengthj = endj - startj
# netCDF4 file setup
save_path = config["ssh"]["ssh dir"]
filename = config["ssh"]["file template"].format(day)
if forecast_flag:
filepath = os.path.join(save_path, "fcst", filename)
comment = "Prediction from Neah Bay storm surge website"
else:
filepath = os.path.join(save_path, "obs", filename)
try:
# Unlink file path in case it exists as a symlink to a fcst/
# file created by upload_forcing worker because there was
# no obs/ file
os.unlink(filepath)
except OSError:
# File path does not exist
pass
comment = "Observation from Neah Bay storm surge website"
comment = " ".join((comment, f"generated by SalishSeaCast {NAME} worker"))
ssh_file = netCDF4.Dataset(filepath, "w")
nc_tools.init_dataset_attrs(
ssh_file,
title="Neah Bay SSH hourly values",
notebook_name="N/A",
nc_filepath=filepath,
comment=comment,
quiet=True,
)
ssh_file.source = os.fspath(textfile)
ssh_file.references = f"https://github.com/SalishSeaCast/SalishSeaNowcast/blob/main/nowcast/workers/{NAME}.py"
logger.debug(f"created western open boundary file {filepath}")
# Create netCDF dimensions
ssh_file.createDimension("time_counter", None)
ssh_file.createDimension("yb", 1)
ssh_file.createDimension("xbT", lengthj * r)
# Create netCDF variables
time_counter = ssh_file.createVariable("time_counter", "float32", "time_counter")
time_counter.long_name = "Time axis"
time_counter.axis = "T"
time_counter.units = f"hour since 00:00:00 on {day:%Y-%m-%d}"
# Latitudes and longitudes
nav_lat = ssh_file.createVariable("nav_lat", "float32", ("yb", "xbT"))
nav_lat.long_name = "Latitude"
nav_lat.units = "degrees_north"
nav_lon = ssh_file.createVariable("nav_lon", "float32", ("yb", "xbT"))
nav_lon.long_name = "Longitude"
nav_lon.units = "degrees_east"
# Sea surface height
sossheig = ssh_file.createVariable(
"sossheig", "float32", ("time_counter", "yb", "xbT"), zlib=True
)
sossheig.units = "m"
sossheig.long_name = "Sea surface height"
sossheig.grid = "SalishSea2"
# Baroclinic u and v velocity components
vobtcrtx = ssh_file.createVariable(
"vobtcrtx", "float32", ("time_counter", "yb", "xbT"), zlib=True
)
vobtcrtx.units = "m/s"
vobtcrtx.long_name = "Barotropic U Velocity"
vobtcrtx.grid = "SalishSea2"
vobtcrty = ssh_file.createVariable(
"vobtcrty", "float32", ("time_counter", "yb", "xbT"), zlib=True
)
vobtcrty.units = "m/s"
vobtcrty.long_name = "Barotropic V Velocity"
vobtcrty.grid = "SalishSea2"
# Boundary description for NEMO
nbidta = ssh_file.createVariable("nbidta", "int32", ("yb", "xbT"), zlib=True)
nbidta.long_name = "i grid position"
nbidta.units = 1
nbjdta = ssh_file.createVariable("nbjdta", "int32", ("yb", "xbT"), zlib=True)
nbjdta.long_name = "j grid position"
nbjdta.units = 1
nbrdta = ssh_file.createVariable("nbrdta", "int32", ("yb", "xbT"), zlib=True)
nbrdta.long_name = "position from boundary"
nbrdta.units = 1
# Load values
for ir in range(r):
nav_lat[0, ir * lengthj : (ir + 1) * lengthj] = lats[startj:endj, ir]
nav_lon[0, ir * lengthj : (ir + 1) * lengthj] = lons[startj:endj, ir]
nbidta[0, ir * lengthj : (ir + 1) * lengthj] = ir
nbjdta[0, ir * lengthj : (ir + 1) * lengthj] = range(startj, endj)
nbrdta[0, ir * lengthj : (ir + 1) * lengthj] = ir
for ib in range(lengthj * r):
sossheig[:, 0, ib] = surges
time_counter[:] = tc + 1
vobtcrtx[:, 0, ib] = numpy.zeros(len(surges))
vobtcrty[:, 0, ib] = numpy.zeros(len(surges))
ssh_file.close()
try:
lib.fix_perms(filepath)
except PermissionError:
# Can't change permissions/group because we don't own the file
# but that's okay because we were able to write it above
pass
logger.debug(f"saved western open boundary file {filepath}")
return filepath
def _ensure_all_files_created(run_date, run_type, ssh_dir, checklist, config):
"""Confirm that obs files were created. If not, create them by symlinking to fcst file for date."""
earliest_obs_date = (
run_date.shift(days=-4) if run_type == "nowcast" else run_date.shift(days=-5)
)
obs_dates = arrow.Arrow.range("days", earliest_obs_date, run_date.shift(days=-1))
for obs_date in obs_dates:
obs_file = config["ssh"]["file template"].format(obs_date.datetime)
obs_path = ssh_dir / "obs" / obs_file
if not obs_path.exists():
fcst_relative_path = Path("..", "fcst", obs_file)
obs_path.symlink_to(fcst_relative_path)
logger.critical(
f"{obs_path} was not created; using {fcst_relative_path} instead via symlink"
)
checklist[run_type].update({"obs": os.fspath(fcst_relative_path)})
def _setup_plot():
fig = matplotlib.figure.Figure(figsize=(10, 4))
ax = fig.add_subplot(1, 1, 1)
ax.set_title("Neah Bay SSH")
ax.set_ylim([-1, 1])
ax.grid()
ax.set_ylabel("Sea surface height (m)")
return fig, ax
def _render_plot(fig, ax, config):
ax.legend(loc=4)
image_file = config["ssh"]["monitoring image"]
canvas = matplotlib.backends.backend_agg.FigureCanvasAgg(fig)
canvas.print_figure(image_file)
lib.fix_perms(image_file, grp_name=config["file group"])
logger.debug(
f"rendered sea surface height processing monitoring image: {image_file}"
)
if __name__ == "__main__":
main() # pragma: no cover