# Source code for nowcast.workers.collect_river_data

#  Copyright 2013 – present by the SalishSeaCast Project contributors
#  and The University of British Columbia
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""SalishSeaCast worker that collects river discharge observation data from
an ECCC datamart CSV file mirror, or the USGS Water Service REST service,
and appends a day-average discharge to a SOG-format forcing file.
"""
import logging
from pathlib import Path

import arrow
import httpx
import pandas
import sentry_sdk
from nemo_nowcast import NowcastWorker, WorkerError

NAME = "collect_river_data"
logger = logging.getLogger(NAME)


def main():
    """Build the command-line interface for this worker and run it.

    For command-line usage see:

    :command:`python -m nowcast.workers.collect_river_data --help`
    """
    worker = NowcastWorker(NAME, description=__doc__)
    worker.init_cli()
    # worker.cli only exists once init_cli() has been called
    cli = worker.cli
    cli.add_argument(
        "data_src",
        choices={"ECCC", "USGS"},
        help="Name of the river data service.",
    )
    cli.add_argument(
        "river_name",
        help="Name of the river to collect data for.",
    )
    today = arrow.now().floor("day")
    cli.add_date_option(
        "--data-date",
        default=today,
        help="Date to collect river data for.",
    )
    worker.run(collect_river_data, success, failure)
    return worker
def success(parsed_args):
    """Log successful completion of the river data collection.

    :param parsed_args: Command-line arguments parsed by the worker.
    :type parsed_args: :py:class:`argparse.Namespace`

    :return: Nowcast system message type
    :rtype: str
    """
    logger.info(
        f"{parsed_args.data_src} {parsed_args.river_name} river data collection for "
        f"{parsed_args.data_date.format('YYYY-MM-DD')} completed"
    )
    return "success"


def failure(parsed_args):
    """Log failure of the river data collection.

    :param parsed_args: Command-line arguments parsed by the worker.
    :type parsed_args: :py:class:`argparse.Namespace`

    :return: Nowcast system message type
    :rtype: str
    """
    logger.critical(
        f"Calculation of {parsed_args.data_src} {parsed_args.river_name} "
        f"river average discharge for {parsed_args.data_date.format('YYYY-MM-DD')} or "
        f"appending it to {parsed_args.river_name}_flow file failed"
    )
    return "failure"


def collect_river_data(parsed_args, config, *args):
    """Get the day-average discharge of a river and append it to its SOG-format file.

    The data source (``ECCC`` or ``USGS``) selects which helper calculates or
    fetches the day-average discharge. The value is then appended to the file
    given by ``config["rivers"]["SOG river files"][river_name]``.

    :param parsed_args: Command-line arguments parsed by the worker.
    :type parsed_args: :py:class:`argparse.Namespace`

    :param config: Nowcast system configuration.
    :type config: :py:class:`nemo_nowcast.Config`

    :return: Nowcast system checklist items
    :rtype: dict
    """
    data_src = parsed_args.data_src
    river_name = parsed_args.river_name
    stn_id = config["rivers"]["stations"][data_src][river_name]
    sentry_sdk.set_tag("stn-id", stn_id)
    sentry_sdk.set_tag("river-name", river_name)
    data_date = parsed_args.data_date
    logger.info(
        f"Collecting {data_src} {river_name} river data for {data_date.format('YYYY-MM-DD')}"
    )
    day_avg_discharge_funcs = {
        # data_src: function
        "ECCC": _calc_eccc_day_avg_discharge,
        "USGS": _get_usgs_day_avg_discharge,
    }
    day_avg_discharge = day_avg_discharge_funcs[data_src](river_name, data_date, config)
    daily_avg_file = Path(config["rivers"]["SOG river files"][river_name])
    _store_day_avg_discharge(data_date, day_avg_discharge, daily_avg_file)
    checklist = {"river name": river_name, "data date": data_date.format("YYYY-MM-DD")}
    logger.info(
        f"Appended {data_src} {river_name} river average discharge for "
        f"{data_date.format('YYYY-MM-DD')} to: {daily_avg_file}"
    )
    return checklist


def _calc_eccc_day_avg_discharge(river_name, data_date, config):
    """Calculate a river's day-average discharge from an ECCC datamart CSV file.

    :param str river_name: Name of the river; key into the ECCC stations config.

    :param data_date: Date to calculate the average discharge for.
    :type data_date: :py:class:`Arrow.arrow`

    :param config: Nowcast system configuration.
    :type config: :py:class:`nemo_nowcast.Config`

    :return: Mean of the ``Discharge / Débit (cms)`` column over ``data_date``.
    :rtype: float

    :raises: :py:exc:`nemo_nowcast.WorkerError` if there are no non-NaN
             discharge values for ``data_date``.
    """
    csv_file_template = config["rivers"]["csv file template"]
    stn_id = config["rivers"]["stations"]["ECCC"][river_name]
    csv_file = Path(config["rivers"]["datamart dir"]) / csv_file_template.format(
        stn_id=stn_id
    )
    discharge_col = "Discharge / Débit (cms)"
    df = pandas.read_csv(csv_file, usecols=["Date", discharge_col], index_col="Date")
    # Drop everything after the last "-" of each timestamp before parsing.
    # Presumably that suffix is a UTC offset such as "-08:00" (TODO confirm against
    # the datamart CSV format). The result is a naive DatetimeIndex that supports
    # partial-date indexing. This replaces read_csv(date_parser=...), which is
    # deprecated since pandas 2.0.
    df.index = pandas.to_datetime(
        df.index.map(lambda timestamp: timestamp.rpartition("-")[0])
    )
    yyyymmdd = data_date.format("YYYY-MM-DD")
    # Partial string indexing selects all of the rows within the date
    day_avg_discharge = df.loc[yyyymmdd, discharge_col].mean()
    if pandas.isna(day_avg_discharge):
        # Don't append "nan" to the forcing file
        msg = f"No {river_name} discharge values for {yyyymmdd} in {csv_file}"
        logger.critical(msg)
        raise WorkerError(msg)
    logger.debug(
        f"average discharge for {yyyymmdd} from {csv_file}: {day_avg_discharge:.6e} m^3/s"
    )
    return day_avg_discharge


# Multiplier used to convert USGS discharge from ft^3/s ("cfs") to m^3/s
_CUBIC_FEET_TO_CUBIC_METRES = 0.0283168


def _is_no_data(value, no_data_value):
    """Return True if a USGS timeSeries value equals the series' no-data sentinel.

    The USGS JSON appears to carry values as strings (e.g. ``"-999999"``), while
    ``noDataValue`` may be numeric (e.g. ``-999999.0``). A plain ``==`` would then
    never match, so the values are also compared numerically.

    :param value: Value from the timeSeries ``values`` list.
    :param no_data_value: The timeSeries ``variable.noDataValue``.
    :rtype: bool
    """
    if value == no_data_value:
        return True
    try:
        return float(value) == float(no_data_value)
    except (TypeError, ValueError):
        return False


def _get_usgs_day_avg_discharge(river_name, data_date, config):
    """Get a river's day-average discharge from the USGS Water Service REST API.

    :param str river_name: Name of the river; key into the USGS stations config.

    :param data_date: Date to get the average discharge for.
    :type data_date: :py:class:`Arrow.arrow`

    :param config: Nowcast system configuration.
    :type config: :py:class:`nemo_nowcast.Config`

    :return: Day-average discharge in m^3/s.
    :rtype: float

    :raises: :py:exc:`nemo_nowcast.WorkerError` if the request fails, or the
             response has no usable discharge value.
    """
    usgs_url = config["rivers"]["usgs url"]
    stn_id = config["rivers"]["stations"]["USGS"][river_name]
    yyyymmdd = data_date.format("YYYY-MM-DD")
    # Build a new dict instead of update()-ing the params mapping from the config,
    # so that the shared config is not mutated by this request
    usgs_params = {
        **config["rivers"]["usgs params"],
        "sites": stn_id,
        "startDT": yyyymmdd,
        "endDT": yyyymmdd,
    }
    with httpx.Client() as client:
        try:
            response = client.get(usgs_url, params=usgs_params, follow_redirects=True)
            response.raise_for_status()
        except httpx.RequestError as exc:
            msg = f"Error while requesting {exc.request.url}"
            logger.critical(msg)
            raise WorkerError(msg) from exc
        except httpx.HTTPStatusError as exc:
            msg = f"Error response {exc.response.status_code} while requesting {exc.request.url}"
            logger.critical(msg)
            raise WorkerError(msg) from exc
    try:
        timeseries = response.json()["value"]["timeSeries"][0]
    except IndexError:
        msg = f"{river_name} {yyyymmdd} timeSeries is empty"
        logger.critical(msg)
        raise WorkerError(msg)
    no_data_value = timeseries["variable"]["noDataValue"]
    try:
        cfs = timeseries["values"][0]["value"][0]["value"]
    except IndexError:
        msg = f"IndexError in {river_name} {yyyymmdd} timeSeries JSON"
        logger.critical(msg)
        raise WorkerError(msg)
    except KeyError:
        msg = f"KeyError in {river_name} {yyyymmdd} timeSeries JSON"
        logger.critical(msg)
        raise WorkerError(msg)
    if _is_no_data(cfs, no_data_value):
        msg = (
            f"Got no-data value ({no_data_value}) in {river_name} {yyyymmdd} timeSeries"
        )
        logger.critical(msg)
        raise WorkerError(msg)
    day_avg_discharge = float(cfs) * _CUBIC_FEET_TO_CUBIC_METRES
    logger.debug(
        f"average discharge for {river_name} on {yyyymmdd} from {usgs_url}: {day_avg_discharge:.6e} m^3/s"
    )
    return day_avg_discharge


def _store_day_avg_discharge(data_date, day_avg_discharge, sog_flow_file):
    """Append a ``YYYY MM DD discharge`` line to a SOG-format river flow file.

    :param data_date: Date of the day-average discharge.
    :type data_date: :py:class:`Arrow.arrow`

    :param float day_avg_discharge: Day-average discharge; written in ``.6e`` format.

    :param sog_flow_file: File to append to.
    :type sog_flow_file: :py:class:`pathlib.Path`
    """
    with sog_flow_file.open("at") as f:
        f.write(f"{data_date.format('YYYY MM DD')} {day_avg_discharge:.6e}\n")
    logger.debug(
        f"appended {data_date.format('YYYY MM DD')} {day_avg_discharge:.6e} to: {sog_flow_file}"
    )


if __name__ == "__main__":
    main()  # pragma: no cover