# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-License-Identifier: Apache-2.0
"""SalishSeaCast worker that produces daily average Fraser River
turbidity file from hourly real-time turbidity data collected from Environment
and Climate Change Canada Fraser River water quality buoy.
"""
import datetime as dt
import logging
import os
from pathlib import Path
import arrow
import netCDF4 as nc
import numpy as np
import pandas as pd
import pytz
from nemo_nowcast import NowcastWorker
# Worker name: used for the logging config and nowcast system messaging
NAME = "make_turbidity_file"
# Module-level logger, keyed by the worker name
logger = logging.getLogger(NAME)
def main():
    """Set up and run the worker.

    For command-line usage see:

    :command:`python -m nowcast.workers.make_turbidity_file --help`
    """
    # NOTE: a stray "[docs]" Sphinx HTML artifact preceded this function;
    # it referenced the undefined name ``docs`` and would have raised
    # NameError at import time, so it has been removed.
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    worker.cli.add_date_option(
        "--run-date",
        default=arrow.now().floor("day"),
        help="Date of the run to produce turbidity file for.",
    )
    worker.run(make_turbidity_file, success, failure)
def success(parsed_args):
    """Log a success message and return the success message type.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    ymd = parsed_args.run_date.format("YYYY-MM-DD")
    logger.info(f"{ymd} Fraser River turbidity file creation complete")
    return "success"
def failure(parsed_args):
    """Log a failure message and return the failure message type.

    :param :py:class:`argparse.Namespace` parsed_args:

    :return: Nowcast system message type
    :rtype: str
    """
    ymd = parsed_args.run_date.format("YYYY-MM-DD")
    logger.critical(f"{ymd} Fraser River turbidity file creation failed")
    return "failure"
def make_turbidity_file(parsed_args, config, *args):
    """Create a daily average Fraser River turbidity file from hourly real-time
    turbidity data.

    :param :py:class:`argparse.Namespace` parsed_args:
    :param :py:class:`nemo_nowcast.Config` config:

    :return: Nowcast system checklist item (path of the stored file),
             or :py:obj:`None` when there is insufficient data
    :rtype: str
    """
    run_date = parsed_args.run_date
    ymd = run_date.format("YYYY-MM-DD")
    logger.info(f"Creating Fraser River turbidity forcing file for {ymd}")
    turbidity_config = config["rivers"]["turbidity"]
    turbidity_csv = turbidity_config["ECget Fraser turbidity"]
    # Pick time as 19:00, which means selected times will begin with 19:10
    # on prev day and go to 18:10
    anchor = dt.datetime(run_date.year, run_date.month, run_date.day, 19, 0, 0)
    anchor_dd = _dateTimeToDecDay(anchor)
    # Read most recent 24 hours of data + extra for interpolation from
    # turbidity_csv; mthresh is the max number of missing data points to
    # interpolate over (number of hours) + 1.01 to account for the difference
    # between last and next hour (1) and rounding errors (.01)
    mthresh = 5.01
    try:
        observations = _loadturb(anchor_dd, turbidity_csv, mthresh, ymd)
    except ValueError:
        return None
    # Interpolate and average data and write netcdf file to nc_filepath
    gridded = _interpTurb(observations, anchor_dd, mthresh)
    try:
        daily_avg = _calcAvgT(gridded, mthresh, ymd)
    except ValueError:
        return None
    dest_dir = Path(turbidity_config["forcing dir"])
    file_tmpl = turbidity_config["file template"]
    nc_filepath = os.fspath(dest_dir / file_tmpl.format(run_date.date()))
    _writeTFile(nc_filepath, daily_avg)
    logger.debug(f"stored Fraser River turbidity forcing file: {nc_filepath}")
    return nc_filepath
def _loadturb(idate, turbidity_csv, mthresh, ymd):
    """Load hourly turbidity observations covering the 24 hr period ending
    at decimal day ``idate``, plus ``mthresh`` hours of padding on each side
    for interpolation.

    :param float idate: decimal day (since 1900-01-01, UTC) at the end of
                        the averaging window
    :param str turbidity_csv: path of the ECget Fraser turbidity CSV file
    :param float mthresh: interpolation window padding (hours + rounding
                          allowance)
    :param str ymd: run date string for log/error messages

    :return: observations sorted by decimal day, re-indexed from 0
    :rtype: :py:class:`pandas.DataFrame`

    :raises ValueError: if fewer than 5 observations are available
    """
    # Read file into pandas dataframe
    raw = pd.read_csv(turbidity_csv, header=0)
    raw["dtdate"] = pd.to_datetime(
        raw["# date"] + " " + raw["time"], format="%Y-%m-%d %H:%M:%S"
    )
    # Observation timestamps are Pacific local time; convert to UTC
    # decimal days
    raw["DD"] = [_dateTimeToDecDay(t) for t in _pacToUTC(raw["dtdate"].values)]
    # Select current 24 hr period + extra for interpolation
    # this will break if np.datetime64 string format changes
    lower = idate - 1.0 - mthresh / 24
    upper = idate + mthresh / 24
    window = raw.loc[(raw["DD"] > lower) & (raw["DD"] <= upper)]
    window = window.sort_values("DD").copy()
    window.drop_duplicates(inplace=True)
    window.index = range(len(window))
    if len(window) < 5:
        # data read can't satisfy coverage criteria
        msg = (
            f"Insufficient data to proceed to turbidity interpolation; "
            f"cannot create Fraser River turbidity file "
            f"for {ymd}"
        )
        logger.warning(msg)
        raise ValueError(msg)
    logger.debug(f"read turbidity data from {turbidity_csv}")
    return window
def _interpTurb(tdf2, idate, mthresh):
    """Resample turbidity observations onto a regular hourly grid.

    Builds a dataframe with one row per hour spanning the 24 hr averaging
    window ending at ``idate`` plus ``int(mthresh)`` hours of padding on
    each side.  Gaps shorter than ``mthresh`` hours are filled by linear
    interpolation; longer gaps are left as NaN.

    :param :py:class:`pandas.DataFrame` tdf2: observations with ``DD``
                                              (decimal day) and
                                              ``turbidity`` columns,
                                              sorted by ``DD`` and
                                              indexed 0..n-1
    :param float idate: decimal day at the end of the averaging window
    :param float mthresh: max gap to interpolate across (hours + rounding
                          allowance)

    :return: hourly dataframe with ``hDD`` and ``turbidity`` columns
    :rtype: :py:class:`pandas.DataFrame`

    :raises ValueError: if an observation time does not line up with the
                        expected output hour
    """
    dfout = pd.DataFrame(
        index=range(int(mthresh) * 2 + 24), columns=("hDD", "turbidity")
    )
    # Regular hourly grid centred on the 24 hr window ending at idate
    dfout["hDD"] = [
        idate - 1 + (ind - int(mthresh)) / 24.0 for ind in range(len(dfout))
    ]
    iout = 0
    for ind, row in tdf2.iterrows():
        if ind == 0:
            ddlast = row["DD"]
            if (dfout.loc[iout]["hDD"] - ddlast) < 0:
                # insert NaNs if no data at beginning of cycle
                nint = int(np.round((ddlast - dfout.loc[iout]["hDD"]) * 24))
                for ii in range(0, nint):
                    iout += 1
        if ((row["DD"] - ddlast) < mthresh / 24.0) & (
            (row["DD"] - ddlast) > 1.5 / 24.0
        ):
            # if a break consists of 4 missing data points or less, linearly
            # interpolate through
            tlast = tdf2.loc[ind - 1]["turbidity"]
            tnext = row["turbidity"]
            ddnext = row["DD"]
            nint = int(np.round((ddnext - ddlast) * 24) - 1)
            for ii in range(1, nint + 1):
                dd0 = ddlast + ii / 24.0
                tur0 = tlast + (dd0 - ddlast) / (ddnext - ddlast) * (tnext - tlast)
                if (dfout.loc[iout]["hDD"] - dd0) < 0.5 / 24.0:
                    dfout.loc[iout, "turbidity"] = tur0
                    iout += 1
                else:
                    # interpolated hour drifted away from the expected output
                    # hour; the data cannot be trusted
                    # (message previously said "consistent" — fixed)
                    msg = (
                        f"Anticipated and output hour were inconsistent: "
                        f'{dfout.loc[iout]["hDD"]} {dd0} '
                        f'{dfout.loc[iout]["hDD"] - dd0}'
                    )
                    logger.error(msg)
                    raise ValueError(msg)
        elif (row["DD"] - ddlast) >= mthresh / 24.0:
            # insert NaNs in larger holes
            nint = int(np.round((row["DD"] - ddlast) * 24) - 1)
            for ii in range(1, nint + 1):
                dd0 = ddlast + ii / 24.0
                if (dfout.loc[iout]["hDD"] - dd0) < 0.5 / 24.0:
                    iout += 1
                else:
                    # (message previously said "consistent" — fixed)
                    msg = "Anticipated and output hour were inconsistent"
                    logger.error(msg)
                    raise ValueError(msg)
        # always append current tdf2 row's value
        if np.abs(dfout.loc[iout]["hDD"] - row["DD"]) < 0.5 / 24.0:
            dfout.loc[iout, "turbidity"] = row["turbidity"]
        else:
            # (message previously said "consistent" — fixed)
            msg = (
                f"Anticipated and output hour were inconsistent: "
                f'iout={iout} ind={ind} {dfout.loc[iout]["hDD"]} {row["DD"]}'
            )
            logger.error(msg)
            raise ValueError(msg)
        iout += 1
        ddlast = row["DD"]
    logger.debug("interpolated turbidity data")
    return dfout
def _calcAvgT(dfout, mthresh, ymd):
    """Average the 24 hourly turbidity values in the averaging window.

    :param :py:class:`pandas.DataFrame` dfout: hourly-gridded turbidity
                                               from :py:func:`_interpTurb`
    :param float mthresh: interpolation window padding; its integer part
                          is the number of padding rows on each side
    :param str ymd: run date string for log/error messages

    :return: daily average turbidity
    :rtype: float

    :raises ValueError: if fewer than 20 of the 24 hours hold valid
                        (positive) turbidity values
    """
    start = int(mthresh)
    # .loc slicing is label-based and inclusive: 24 rows
    window = dfout.loc[start : start + 23]
    valid = window.loc[window["turbidity"] > 0]
    if len(valid) <= 19:
        # data read doesn't satisfy coverage criteria
        msg = (
            f"Insufficient data after interpolation to create Fraser River "
            f"turbidity file "
            f"for {ymd}"
        )
        logger.warning(msg)
        raise ValueError(msg)
    return np.mean(valid["turbidity"].values)
def _writeTFile(
    fname, iTurb, dimTemplate="/results/forcing/rivers/RLonFraCElse_y2016m01d23.nc"
):
    """Write the daily average Fraser River turbidity value to a netCDF file.

    :param str fname: path of the netCDF file to create
    :param float iTurb: daily average turbidity value
    :param str dimTemplate: path of an existing rivers forcing file used
                            only as a template for the dimensions and the
                            nav_lon/nav_lat/time_counter values
    """
    f = nc.Dataset(dimTemplate)  # example for dims
    try:
        new = nc.Dataset(fname, "w")
        # Copy dimensions
        for dname, the_dim in f.dimensions.items():
            new.createDimension(
                dname, len(the_dim) if not the_dim.isunlimited() else None
            )
        # create dimension variables:
        new_x = new.createVariable("nav_lat", np.float32, ("y", "x"), zlib=True)
        new_x[:] = f.variables["nav_lat"][:, :]
        new_y = new.createVariable("nav_lon", np.float32, ("y", "x"), zlib=True)
        new_y[:] = f.variables["nav_lon"][:, :]
        new_tc = new.createVariable(
            "time_counter", np.float32, "time_counter", zlib=True
        )
        new_tc[:] = f.variables["time_counter"][:]
        new_run = new.createVariable(
            "turb", float, ("time_counter", "y", "x"), zlib=True
        )
        new_run[:, :, :] = -999.99  # most cells are masked with negative numbers
        # NOTE(review): grid indices 400:448, 338:380 are assumed to cover
        # the Fraser River region of the model grid — confirm against the
        # grid definition
        new_run[:, 400:448, 338:380] = iTurb  # set turbidity to daily average
        new.close()
    finally:
        # Close the template dataset; the original implementation leaked
        # this file handle
        f.close()
    logger.debug(f"wrote file to {fname}")
    return
def _dateTimeToDecDay(dtin):
    """Convert a datetime to decimal days since 1900-01-01 00:00:00.

    Note: uses ``timedelta.seconds`` (not ``total_seconds()``), so
    sub-second precision is deliberately discarded, matching the
    original behaviour.
    """
    delta = dtin - dt.datetime(1900, 1, 1)
    return delta.days + delta.seconds / 86400
def _pacToUTC(pactime0):
    """Convert naive Pacific-time datetime(s) to naive UTC datetime(s).

    Input is a datetime object without tzinfo in Pacific Time (or a 1-D
    array of them, possibly as np.datetime64); output is the equivalent
    datetime object(s) without tzinfo in UTC.
    """
    values = np.array(pactime0, ndmin=1)
    if values.ndim > 1:
        raise Exception("Error: ndim>1")
    # handle case where array of numpy.datetime64 is input by round-tripping
    # through the ISO string representation;
    # this will break if np.datetime64 string format changes
    if isinstance(values[0], np.datetime64):
        values = np.array(
            [dt.datetime.strptime(str(v)[:19], "%Y-%m-%dT%H:%M:%S") for v in values]
        )
    pacific = pytz.timezone("Canada/Pacific")
    converted = np.empty(values.shape, dtype=object)
    for idx in range(len(values)):
        aware = pacific.localize(values[idx])
        converted[idx] = aware.astimezone(pytz.utc).replace(tzinfo=None)
    # NOTE(review): np.isscalar is False for datetime objects, so a single
    # datetime input returns a length-1 array, not a scalar — confirm
    # callers rely on the array form
    return converted[0] if np.isscalar(pactime0) else converted
# Script entry point: run the worker when invoked as a module/script
if __name__ == "__main__":
    main()  # pragma: no cover