Source code for nowcast.workers.get_onc_ctd

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Salish Sea nowcast worker that downloads CTD temperature and salinity data
for a specified UTC day from an ONC Strait of Georgia node.

The data are filtered to include only values for which qaqcFlag == 1
(meaning that all of ONC's automated QA/QC tests were passed).
After filtering the data are aggregated into 15 minute bins.
The aggregation functions are mean, standard deviation, and sample count.

The data are stored as a netCDF-4/HDF5 file that is accessible via
https://salishsea.eos.ubc.ca/erddap/tabledap/index.html?page=1&itemsPerPage=1000.

Development notebook:
https://nbviewer.org/github/SalishSeaCast/analysis-doug/blob/main/notebooks/ONC-CTD-DataToERDDAP.ipynb
"""
import logging
import os
from pathlib import Path

import arrow
import numpy
import xarray
from nemo_nowcast import NowcastWorker, WorkerError
from salishsea_tools import data_tools
from salishsea_tools.places import PLACES

NAME = "get_onc_ctd"
logger = logging.getLogger(NAME)


[docs] def main(): """For command-line usage see: :command:`python -m nowcast.workers.get_onc_ctd -h` """ worker = NowcastWorker(NAME, description=__doc__) worker.init_cli() worker.cli.add_argument( "onc_station", choices={"SCVIP", "SEVIP", "USDDL"}, help="Name of the ONC node station to download data for.", ) worker.cli.add_date_option( "--data-date", default=arrow.utcnow().floor("day").shift(days=-1), help="UTC date to get ONC node CTD data for.", ) worker.run(get_onc_ctd, success, failure) return worker
def success(parsed_args): ymd = parsed_args.data_date.format("YYYY-MM-DD") logger.info(f"{ymd} ONC {parsed_args.onc_station} CTD T&S file created") msg_type = f"success {parsed_args.onc_station}" return msg_type def failure(parsed_args): ymd = parsed_args.data_date.format("YYYY-MM-DD") logger.critical(f"{ymd} ONC {parsed_args.onc_station} CTD T&S file creation failed") msg_type = "failure" return msg_type def get_onc_ctd(parsed_args, config, *args): ymd = parsed_args.data_date.format("YYYY-MM-DD") logger.info(f"requesting ONC {parsed_args.onc_station} CTD T&S data for {ymd}") onc_data = data_tools.get_onc_data( "scalardata", "getByLocation", os.environ["ONC_USER_TOKEN"], locationCode=parsed_args.onc_station, deviceCategoryCode="CTD", sensorCategoryCodes="salinity,temperature", dateFrom=data_tools.onc_datetime(f"{ymd} 00:00", "utc"), dateTo=data_tools.onc_datetime(f"{ymd} 23:59", "utc"), ) try: ctd_data = data_tools.onc_json_to_dataset(onc_data) except TypeError: logger.error(f"No ONC {parsed_args.onc_station} CTD T&S data for {ymd}") raise WorkerError logger.debug( f"ONC {parsed_args.onc_station} CTD T&S data for {ymd} received and parsed" ) logger.debug( f"filtering ONC {parsed_args.onc_station} temperature data for {ymd} " f"to exlude qaqcFlag!=1" ) temperature = _qaqc_filter(ctd_data, "temperature") logger.debug( f"filtering ONC {parsed_args.onc_station} salinity data for {ymd} " f"to exlude qaqcFlag!=1" ) salinity = _qaqc_filter(ctd_data, "salinity") logger.debug(f"creating ONC {parsed_args.onc_station} CTD T&S dataset for {ymd}") ds = _create_dataset(parsed_args.onc_station, temperature, salinity) dest_dir = Path(config["observations"]["ctd data"]["dest dir"]) filepath_tmpl = config["observations"]["ctd data"]["filepath template"] nc_filepath = dest_dir / filepath_tmpl.format( station=parsed_args.onc_station, yyyymmdd=parsed_args.data_date.format("YYYYMMDD"), ) logger.debug( f"storing ONC {parsed_args.onc_station} CTD T&S dataset " f"for {ymd} as {nc_filepath}" ) encoding = { var: {"dtype": "int64", "_FillValue": 0} for var in ds.data_vars if var.endswith("sample_count") } encoding["time"] = {"units": "minutes since 1970-01-01 00:00"} ds.to_netcdf(os.fspath(nc_filepath), encoding=encoding, unlimited_dims=("time",)) checklist = {parsed_args.onc_station: os.fspath(nc_filepath)} return checklist def _qaqc_filter(ctd_data, var): qaqc_mask = ctd_data.data_vars[var].attrs["qaqcFlag"] == 1 filtered_var = xarray.DataArray( name=var, data=ctd_data.data_vars[var][qaqc_mask].values, coords={"time": ctd_data.data_vars[var].sampleTime[qaqc_mask].values}, dims="time", ) return filtered_var def _create_dataset(onc_station, temperature, salinity): def count(values, axis): return values.size metadata = { "SCVIP": { "place_name": "Central node", "ONC_station": "Central", "ONC_stationDescription": "Pacific, Salish Sea, Strait of Georgia, Central, " "Strait of Georgia VENUS Instrument Platform", }, "SEVIP": { "place_name": "East node", "ONC_station": "East", "ONC_stationDescription": "Pacific, Salish Sea, Strait of Georgia, East, " "Strait of Georgia VENUS Instrument Platform", }, "USDDL": { "place_name": "Delta DDL node", "ONC_station": "Delta Upper Slope DDL", "ONC_stationDescription": "Pacific, Salish Sea, Strait of Georgia, Delta, Upper Slope, " "Delta Dynamics Laboratory", }, } try: temperature_mean = temperature.resample(time="15Min").mean() temperature_std_dev = temperature.resample(time="15Min").std() temperature_sample_count = temperature.resample(time="15Min").count() except IndexError: # If the temperature data is messing no dataset can be created raise WorkerError(f"no {onc_station} temperate data; no dataset created") try: salinity_mean = salinity.resample(time="15Min").mean() salinity_std_dev = salinity.resample(time="15Min").std() salinity_sample_count = salinity.resample(time="15Min").count() except IndexError: logger.warning(f"no {onc_station} salinity data") salinity_mean = temperature_mean.copy() salinity_mean.name = "salinity" salinity_mean.data = numpy.full_like(temperature_mean, numpy.nan) salinity_std_dev = temperature_std_dev.copy() salinity_std_dev.name = "salinity_std_dev" salinity_std_dev.data = numpy.full_like(temperature_std_dev, numpy.nan) salinity_sample_count = temperature_sample_count.copy() salinity_sample_count.name = "salinity_sample_count" salinity_sample_count.data = numpy.zeros_like(temperature_sample_count) ds = xarray.Dataset( data_vars={ "temperature": xarray.DataArray( name="temperature", data=temperature_mean, attrs={ "ioos_category": "Temperature", "standard_name": "sea_water_temperature", "long_name": "temperature", "units": "degrees_Celcius", "aggregation_operation": "mean", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), "temperature_std_dev": xarray.DataArray( name="temperature_std_dev", data=temperature_std_dev, attrs={ "ioos_category": "Temperature", "standard_name": "sea_water_temperature_standard_deviation", "long_name": "temperature standard deviation", "units": "degrees_Celcius", "aggregation_operation": "standard deviation", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), "temperature_sample_count": xarray.DataArray( name="temperature_sample_count", data=temperature_sample_count, attrs={ "standard_name": "sea_water_temperature_sample_count", "long_name": "temperature sample count", "aggregation_operation": "count", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), "salinity": xarray.DataArray( name="salinity", data=salinity_mean, attrs={ "ioos_category": "Salinity", "standard_name": "sea_water_reference_salinity", "long_name": "reference salinity", "units": "g/kg", "aggregation_operation": "mean", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), "salinity_std_dev": xarray.DataArray( name="salinity_std_dev", data=salinity_std_dev, attrs={ "ioos_category": "Salinity", "standard_name": "sea_water_reference_salinity_standard_deviation", "long_name": "reference salinity standard deviation", "units": "g/kg", "aggregation_operation": "standard deviation", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), "salinity_sample_count": xarray.DataArray( name="salinity_sample_count", data=salinity_sample_count, attrs={ "standard_name": "sea_water_reference_salinity_sample_count", "long_name": "reference salinity sample count", "aggregation_operation": "count", "aggregation_interval": 15 * 60, "aggregation_interval_units": "seconds", }, ), }, coords={ "depth": PLACES[metadata[onc_station]["place_name"]]["depth"], "longitude": PLACES[metadata[onc_station]["place_name"]]["lon lat"][0], "latitude": PLACES[metadata[onc_station]["place_name"]]["lon lat"][1], }, attrs={ "history": f""" [arrow.now().format('YYYY-MM-DD HH:mm:ss')] Download raw data from ONC scalardata API. [arrow.now().format('YYYY-MM-DD HH:mm:ss')] Filter to exclude data with qaqcFlag != 1. [arrow.now().format('YYYY-MM-DD HH:mm:ss')] Resample data to 15 minute intervals using mean, standard deviation and count as aggregation functions. [arrow.now().format('YYYY-MM-DD HH:mm:ss')] Store as netCDF4 file. """, "ONC_station": metadata[onc_station]["ONC_station"], "ONC_stationCode": onc_station, "ONC_stationDescription": metadata[onc_station]["ONC_stationDescription"], "ONC_data_product_url": f"http://dmas.uvic.ca/DataSearch?location={onc_station}" f"&deviceCategory=CTD", }, ) return ds if __name__ == "__main__": main() # pragma: no cover