# Copyright 2013 – present by the SalishSeaCast Project contributors
# and The University of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SalishSeaCast worker that collects a file containing sea surface height observations and
forecast values at Neah Bay from an HTTPS server and stores them locally for subsequent processing
by another worker to produce a sea surface height boundary condition file.
"""
import logging
import os
import tarfile
import tempfile
from pathlib import Path
import arrow
from nemo_nowcast import get_web_data, NowcastWorker
NAME = "collect_NeahBay_ssh"
logger = logging.getLogger(NAME)
[docs]
def main():
"""Set up and run the worker.
For command-line usage see:
:command:`python -m nowcast.workers.collect_NeahBay_ssh --help`
"""
worker = NowcastWorker(NAME, description=__doc__)
worker.init_cli()
worker.cli.add_argument(
"forecast",
choices={"00", "06", "12", "18"},
help="Name of forecast to collect files for.",
)
worker.cli.add_date_option(
"--data-date",
default=arrow.now().floor("day"),
help="Date to collect Neah Bay ssh data for.",
)
worker.run(collect_NeahBay_ssh, success, failure)
return worker
def success(parsed_args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:return: Nowcast system message type
:rtype: str
"""
data_date = parsed_args.data_date
forecast = parsed_args.forecast
logger.info(
f"{data_date} Neah Bay ssh {forecast}Z obs/forecast data collection complete"
)
return f"success {forecast}"
def failure(parsed_args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:return: Nowcast system message type
:rtype: str
"""
data_date = parsed_args.data_date
forecast = parsed_args.forecast
logger.critical(
f"{data_date} Neah Bay ssh {forecast}Z obs/forecast data collection failed"
)
return f"failure {forecast}"
def collect_NeahBay_ssh(parsed_args, config, *args):
"""
:param :py:class:`argparse.Namespace` parsed_args:
:param :py:class:`nemo_nowcast.Config` config:
:return: Nowcast system checklist items
:rtype: dict
"""
data_date = parsed_args.data_date
yyyymmdd = arrow.get(data_date).format("YYYYMMDD")
forecast = parsed_args.forecast
logger.info(
f"collecting Neah Bay ssh {forecast}Z obs/forecast for {data_date.format('YYYY-MM-DD')}"
)
url_tmpl = config["ssh"]["download"]["url template"]
tar_url = url_tmpl.format(yyyymmdd=yyyymmdd, forecast=forecast)
tar_file_tmpl = config["ssh"]["download"]["tar file template"]
tar_file = tar_file_tmpl.format(yyyymmdd=yyyymmdd, forecast=forecast)
ssh_dir = Path(config["ssh"]["ssh dir"])
csv_file = Path(tar_file).with_suffix(".csv")
csv_file_path = ssh_dir / "txt" / csv_file
tar_csv_tmpl = config["ssh"]["download"]["tarball csv file template"]
tar_csv_member = tar_csv_tmpl.format(yyyymmdd=yyyymmdd, forecast=forecast)
with tempfile.TemporaryDirectory() as tmp_dir:
tar_file_path = Path(tmp_dir, tar_file)
logger.debug(f"downloading {tar_url}")
get_web_data(tar_url, NAME, tar_file_path)
size = os.stat(tar_file_path).st_size
logger.debug(f"downloaded {size} bytes from {tar_url}")
_extract_csv(tar_csv_member, tar_file_path, csv_file_path)
checklist = {
"data date": data_date.format("YYYY-MM-DD"),
f"{forecast}": os.fspath(csv_file_path),
}
return checklist
def _extract_csv(tar_csv_member, tar_file_path, csv_file_path):
with tarfile.open(tar_file_path) as tar:
logger.debug(f"extracting {tar_csv_member} from tarball to {csv_file_path}")
buff = tar.extractfile(tar.getmember(tar_csv_member))
with csv_file_path.open("wb") as f:
f.write(buff.read())
size = os.stat(csv_file_path).st_size
logger.debug(f"wrote {size} bytes to {csv_file_path}")
if __name__ == "__main__":
main() # pragma: no cover