Source code for polar2grid.glue

#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) 2021 Space Science and Engineering Center (SSEC),
#  University of Wisconsin-Madison.
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This file is part of the polar2grid software package. Polar2grid takes
# satellite observation data, remaps it, and writes it to a file format for
# input into another program.
# Documentation: http://www.ssec.wisc.edu/software/polar2grid/
"""Connect various satpy components together to go from satellite data to output imagery format."""

from __future__ import annotations

import contextlib
import logging
import os
import shutil
import sys
import tempfile
from collections.abc import Iterable
from datetime import datetime
from typing import Optional, Union

import dask
import satpy
from dask.diagnostics import ProgressBar
from pyresample import SwathDefinition
from satpy import DataID, Scene
from satpy.writers import compute_writer_results

from polar2grid._glue_argparser import GlueArgumentParser, get_p2g_defaults_env_var
from polar2grid.core.script_utils import (
    create_exc_handler,
    rename_log_file,
    setup_logging,
)
from polar2grid.filters import filter_scene
from polar2grid.readers._base import ReaderProxyBase
from polar2grid.resample import resample_scene
from polar2grid.utils.config import add_polar2grid_config_paths
from polar2grid.utils.dynamic_imports import get_reader_attr
from polar2grid.utils.legacy_compat import get_sensor_alias

LOG = logging.getLogger(__name__)

_PLATFORM_ALIASES = {
    "suomi-npp": "npp",
    "snpp": "npp",
    "n20": "noaa20",
    "n21": "noaa21",
    "n22": "noaa22",
    "n23": "noaa23",
    "noaa-18": "noaa18",
    "noaa-19": "noaa19",
    "noaa-20": "noaa20",
    "noaa-21": "noaa21",
    "noaa-22": "noaa22",
    "noaa-23": "noaa23",
    "jpss-1": "noaa20",
    "jpss-2": "noaa21",
    "jpss-3": "noaa22",
    "jpss-4": "noaa23",
    "j1": "noaa20",
    "j2": "noaa21",
    "j3": "noaa22",
    "j4": "noaa23",
    "fy-3b": "fy3b",
    "fy-3c": "fy3c",
    "fy-3d": "fy3d",
    "eos-aqua": "aqua",
    "eos-terra": "terra",
    "aqua": "aqua",
    "terra": "terra",
    "gcom-w1": "gcom-w1",
    "metop-a": "metopa",
    "metop-b": "metopb",
    "metop-c": "metopc",
}


def _get_platform_name_alias(satpy_platform_name):
    return _PLATFORM_ALIASES.get(satpy_platform_name.lower(), satpy_platform_name)

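
# Illustrative behavior of the alias lookup above (doctest-style sketch, not
# part of the original module):
#
#     >>> _get_platform_name_alias("Suomi-NPP")
#     'npp'
#     >>> _get_platform_name_alias("JPSS-1")
#     'noaa20'
#     >>> _get_platform_name_alias("GOES-16")  # unknown names pass through
#     'GOES-16'
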

def _overwrite_platform_name_with_aliases(scn):
    """Change 'platform_name' for every DataArray to Polar2Grid expectations."""
    for data_arr in scn:
        if "platform_name" not in data_arr.attrs:
            continue
        pname = _get_platform_name_alias(data_arr.attrs["platform_name"])
        data_arr.attrs["platform_name"] = pname


def _overwrite_sensor_with_aliases(scn):
    """Change 'sensor' for every DataArray to Polar2Grid expectations."""
    for data_arr in scn:
        if "sensor" not in data_arr.attrs:
            continue
        pname = get_sensor_alias(data_arr.attrs["sensor"])
        data_arr.attrs["sensor"] = pname


def _write_scene(
    scn: Scene,
    writers: list[str],
    writer_args: dict[str, dict],
    data_ids: list[DataID],
    to_save: Optional[list] = None,
):
    if to_save is None:
        to_save = []
    if not data_ids:
        # no datasets to save
        return to_save

    _assign_default_native_area_id(scn, data_ids)
    for writer_name in writers:
        wargs = writer_args[writer_name]
        _write_scene_with_writer(scn, writer_name, data_ids, wargs, to_save)
    return to_save


def _assign_default_native_area_id(scn: Scene, data_ids: list[DataID]) -> None:
    for data_id in data_ids:
        area_def = scn[data_id].attrs.get("area")
        if area_def is None or hasattr(area_def, "area_id"):
            continue
        scn[data_id].attrs["area"].area_id = "native"


def _write_scene_with_writer(scn: Scene, writer_name: str, data_ids: list[DataID], wargs: dict, to_save: list) -> None:
    res = scn.save_datasets(writer=writer_name, compute=False, datasets=data_ids, **wargs)
    if res and isinstance(res[0], (tuple, list)):
        # list of (dask-array, file-obj) tuples
        to_save.extend(zip(*res))
    else:
        # list of delayed objects
        to_save.extend(res)

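
# Sketch of the two result shapes handled above (inferred from the branching
# logic; the actual return types come from Satpy's writers):
#
#     res = [(dask_array_1, target_1), (dask_array_2, target_2)]
#     # zip(*res) regroups this into (all_sources, all_targets), which
#     # compute_writer_results() later computes and writes in one pass.
#
#     res = [delayed_write_1, delayed_write_2]
#     # a list of dask Delayed objects, extended into to_save as-is.
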

def _print_list_products(reader_info, is_polar2grid: bool, p2g_only: bool):
    available_p2g_names, available_custom_names, available_satpy_names = reader_info.get_available_products()
    available_satpy_names = ["*" + _sname for _sname in available_satpy_names]
    available_custom_names = ["*" + _sname for _sname in available_custom_names]
    project_name = "Polar2Grid" if is_polar2grid else "Geo2Grid"
    print("### Custom User Products")
    print("\n".join(sorted(available_custom_names)) if available_custom_names else "<None>")
    print()
    if not p2g_only:
        print("### Non-standard Satpy Products")
        print("\n".join(sorted(available_satpy_names)) if available_satpy_names else "<None>")
        print()
    print(f"### Standard Available {project_name} Products")
    print("\n".join(sorted(available_p2g_names)) if available_p2g_names else "<None>")


def _create_scene(scene_creation: dict) -> Optional[Scene]:
    try:
        scn = Scene(**scene_creation)
    except ValueError as e:
        LOG.error("{} | Enable debug message (-vvv) or see log file for details.".format(str(e)))
        LOG.debug("Further error information: ", exc_info=True)
        return
    except OSError:
        LOG.error("Could not open files. Enable debug message (-vvv) or see log file for details.")
        LOG.debug("Further error information: ", exc_info=True)
        return
    return scn


def _resample_scene_to_grids(
    scn: Scene,
    reader_names: list[str],
    resample_args: dict,
    filter_kwargs: dict,
    preserve_resolution: bool,
    use_polar2grid_defaults: bool,
) -> list[tuple]:
    ll_bbox = resample_args.pop("ll_bbox")
    if ll_bbox:
        scn = scn.crop(ll_bbox=ll_bbox)

    scn = filter_scene(
        scn,
        reader_names,
        **filter_kwargs,
    )
    if scn is None:
        LOG.info("No remaining products after filtering.")
        return []

    areas_to_resample = resample_args.pop("grids")
    antimeridian_mode = resample_args.pop("antimeridian_mode")
    if "ewa_persist" in resample_args:
        resample_args["persist"] = resample_args.pop("ewa_persist")
    scenes_to_save = resample_scene(
        scn,
        areas_to_resample,
        antimeridian_mode=antimeridian_mode,
        preserve_resolution=preserve_resolution,
        is_polar2grid=use_polar2grid_defaults,
        **resample_args,
    )
    return scenes_to_save

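
# Hypothetical resample_args for the function above (key names taken from the
# pops and renames in _resample_scene_to_grids; the values are illustrative
# only):
#
#     resample_args = {
#         "ll_bbox": None,              # or (lon_min, lat_min, lon_max, lat_max)
#         "grids": ["wgs84_fit"],
#         "antimeridian_mode": "modify_crs",
#         "ewa_persist": True,          # renamed to "persist" before resampling
#     }
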

def _save_scenes(scenes_to_save: list[tuple], reader_info, writer_args) -> list:
    to_save = []
    for scene_to_save, products_to_save in scenes_to_save:
        _overwrite_platform_name_with_aliases(scene_to_save)
        _overwrite_sensor_with_aliases(scene_to_save)
        reader_info.apply_p2g_name_to_scene(scene_to_save)
        to_save = _write_scene(
            scene_to_save,
            writer_args["writers"],
            writer_args,
            products_to_save,
            to_save=to_save,
        )
    return to_save


def _get_glue_name(args):
    reader_name = "NONE" if args.readers is None else args.readers[0]
    writer_names = "-".join(args.writers or [])
    return f"{reader_name}_{writer_names}"

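
# Example (illustrative): with args.readers == ["viirs_sdr"] and
# args.writers == ["geotiff", "awips_tiled"], the glue name is
# "viirs_sdr_geotiff-awips_tiled"; it is later used to name the log file
# and any profiling output.
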

@contextlib.contextmanager
def _create_profile_html_if(create_profile: Union[bool, None, str], project_name: str, glue_name: str):
    from dask.diagnostics import CacheProfiler, Profiler, ResourceProfiler, visualize

    if create_profile is False:
        yield
        return
    if create_profile is None:
        profile_filename = "{project_name}_{glue_name}_{start_time:%Y%m%d_%H%M%S}.html"
    else:
        profile_filename = create_profile

    start_time = datetime.now()
    with CacheProfiler() as cprof, ResourceProfiler() as rprof, Profiler() as prof:
        yield
    end_time = datetime.now()

    profile_filename = profile_filename.format(
        project_name=project_name,
        glue_name=glue_name,
        start_time=start_time,
        end_time=end_time,
    )
    profile_filename = os.path.abspath(profile_filename)
    visualize([prof, rprof, cprof], file_path=profile_filename, show=False)
    print(f"Profile HTML: file://{profile_filename}")

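
# Usage sketch for the profiling context manager above (values illustrative):
#
#     with _create_profile_html_if(False, "polar2grid", "viirs_sdr_geotiff"):
#         ...  # profiling disabled; the body runs unchanged
#
#     with _create_profile_html_if(None, "polar2grid", "viirs_sdr_geotiff"):
#         ...  # profiled; HTML report written to a timestamped default name
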

def main(argv=sys.argv[1:]):
    ret = -1
    try:
        processor = _GlueProcessor(argv)
    except FileNotFoundError:
        return ret
    try:
        ret = processor()
    finally:
        processor.cleanup()
    return ret


class _GlueProcessor:
    """Helper class to make calling processing steps easier."""

    def __init__(self, argv):
        add_polar2grid_config_paths()
        self.is_polar2grid = get_p2g_defaults_env_var()
        self.arg_parser = GlueArgumentParser(argv, self.is_polar2grid)
        self.glue_name = _get_glue_name(self.arg_parser._args)
        self.rename_log = _prepare_initial_logging(self.arg_parser, self.glue_name)
        self.tmp_config_paths = []
        self._handle_extra_config_paths(self.arg_parser._args)
        self._clean = False

    def _handle_extra_config_paths(self, args):
        if not args.extra_config_path:
            return
        _check_valid_config_paths(args.extra_config_path)
        new_config_paths = []
        # Preserve user's specified order to handle inheritance/overrides
        for extra_config_path in args.extra_config_path:
            if os.path.isdir(extra_config_path):
                new_config_paths.append(extra_config_path)
                continue
            tmp_config_path = _create_tmp_enhancement_config_dir(extra_config_path)
            new_config_paths.append(tmp_config_path)
            self.tmp_config_paths.append(tmp_config_path)
        _add_extra_config_paths(new_config_paths)
        LOG.debug(f"Satpy config path is: {satpy.config.get('config_path')}")

    def cleanup(self):
        self._clean = True
        for tmp_config_path in self.tmp_config_paths:
            LOG.debug(f"Deleting temporary config directory: {tmp_config_path}")
            shutil.rmtree(tmp_config_path, ignore_errors=True)

    def __call__(self):
        # Set up dask and the number of workers
        common_args = self.arg_parser._args
        if common_args.num_workers:
            dask.config.set(num_workers=common_args.num_workers)

        with _create_profile_html_if(
            common_args.create_profile,
            "polar2grid" if self.is_polar2grid else "geo2grid",
            self.glue_name,
        ):
            return self._run_processing()

    def _run_processing(self):
        LOG.info("Sorting and reading input files...")
        arg_parser = self.arg_parser
        preferred_chunk_size = get_reader_attr(arg_parser._scene_creation["reader"], "PREFERRED_CHUNK_SIZE", 1024)
        _set_preferred_chunk_size(preferred_chunk_size)
        scn = _create_scene(arg_parser._scene_creation)
        if scn is None:
            return -1

        if self.rename_log:
            stime = getattr(scn, "start_time", scn.attrs.get("start_time"))
            rename_log_file(self.glue_name + stime.strftime("_%Y%m%d_%H%M%S.log"))

        # Load the actual data arrays and metadata (lazy loaded as dask arrays)
        LOG.info("Loading product metadata from files...")
        load_args = arg_parser._load_args.copy()
        user_products = load_args.pop("products")
        reader_info = ReaderProxyBase.from_reader_name(arg_parser._scene_creation["reader"], scn, user_products)
        if arg_parser._args.list_products or arg_parser._args.list_products_all:
            _print_list_products(reader_info, self.is_polar2grid, not arg_parser._args.list_products_all)
            return 0

        products = reader_info.get_satpy_products_to_load()
        persist_geolocation = not arg_parser._reader_args.pop("no_persist_geolocation", False)
        if not products:
            return -1
        scn.load(products, **load_args, generate=False)
        if persist_geolocation:
            scn = _persist_swath_definition_in_scene(scn)
        scn.generate_possible_composites(True)

        reader_args = arg_parser._reader_args
        filter_kwargs = {
            "sza_threshold": reader_args["sza_threshold"],
            "day_fraction": reader_args["filter_day_products"],
            "night_fraction": reader_args["filter_night_products"],
        }
        scenes_to_save = _resample_scene_to_grids(
            scn,
            arg_parser._reader_names,
            arg_parser._resample_args,
            filter_kwargs,
            arg_parser._args.preserve_resolution,
            self.is_polar2grid,
        )
        to_save = _save_scenes(scenes_to_save, reader_info, arg_parser._writer_args)

        if arg_parser._args.progress:
            pbar = ProgressBar()
            pbar.register()

        LOG.info("Computing products and saving data to writers...")
        if not to_save:
            LOG.warning(
                "No product files produced given available valid data and "
                "resampling settings. This can happen if the writer "
                "detects that no valid output will be written or the "
                "input data does not overlap with the target grid."
            )
        compute_writer_results(to_save)
        LOG.info("SUCCESS")
        return 0


def _prepare_initial_logging(arg_parser, glue_name: str) -> bool:
    global LOG
    LOG = logging.getLogger(glue_name)

    # Prepare logging
    args = arg_parser._args
    rename_log = False
    if args.log_fn is None:
        rename_log = True
        args.log_fn = glue_name + "_fail.log"
    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    setup_logging(console_level=levels[min(3, args.verbosity)], log_filename=args.log_fn)
    logging.getLogger("rasterio").setLevel(levels[min(1, args.verbosity)])
    logging.getLogger("fsspec").setLevel(levels[min(2, args.verbosity)])
    logging.getLogger("s3fs").setLevel(levels[min(2, args.verbosity)])
    logging.getLogger("aiobotocore").setLevel(levels[min(2, args.verbosity)])
    logging.getLogger("botocore").setLevel(levels[min(2, args.verbosity)])
    sys.excepthook = create_exc_handler(LOG.name)
    if levels[min(3, args.verbosity)] > logging.DEBUG:
        import warnings

        warnings.filterwarnings("ignore")
    LOG.debug("Starting script with arguments: %s", " ".join(arg_parser.argv))
    return rename_log

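
# Console log level per -v count, as selected by the ``levels`` indexing above
# (third-party loggers like rasterio and fsspec are capped one or two levels
# quieter):
#
#     (no -v) -> ERROR, -v -> WARNING, -vv -> INFO, -vvv or more -> DEBUG
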

def _add_extra_config_paths(extra_paths: list[str]):
    config_path = satpy.config.get("config_path")
    LOG.info(f"Adding additional configuration paths: {extra_paths}")
    satpy.config.set(config_path=config_path + extra_paths)


def _check_valid_config_paths(extra_config_paths: Iterable):
    single_file_configs = [config_path for config_path in extra_config_paths if os.path.isfile(config_path)]
    config_paths = [config_path for config_path in extra_config_paths if os.path.isdir(config_path)]
    invalid_paths = set(extra_config_paths) - (set(single_file_configs) | set(config_paths))
    if invalid_paths:
        str_paths = "\n\t".join(sorted(invalid_paths))
        msg = f"Specified extra config paths don't exist:\n\t{str_paths}"
        LOG.error(msg)
        raise FileNotFoundError(msg)


def _create_tmp_enhancement_config_dir(enh_yaml_file: str) -> str:
    config_dir = tempfile.mkdtemp(prefix="p2g_tmp_config")
    enh_dir = os.path.join(config_dir, "enhancements")
    LOG.debug(f"Creating temporary config directory for enhancement file '{enh_yaml_file}': {enh_dir}")
    os.makedirs(enh_dir)
    new_enh_file = os.path.join(enh_dir, "generic.yaml")
    shutil.copy(enh_yaml_file, new_enh_file)
    return config_dir

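
# Resulting layout (illustrative; the directory lands wherever tempfile
# chooses, e.g. under /tmp):
#
#     p2g_tmp_configXXXXXX/
#         enhancements/
#             generic.yaml   <- copy of the user's enhancement YAML file
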

def _set_preferred_chunk_size(preferred_chunk_size: int) -> None:
    pcs_in_mb = (preferred_chunk_size * preferred_chunk_size) * 8 // (1024 * 1024)
    if "PYTROLL_CHUNK_SIZE" not in os.environ:
        LOG.debug(f"Setting preferred chunk size to {preferred_chunk_size} pixels or {pcs_in_mb:d}MiB")
        satpy.CHUNK_SIZE = preferred_chunk_size
        os.environ["PYTROLL_CHUNK_SIZE"] = f"{preferred_chunk_size:d}"
        dask.config.set({"array.chunk-size": f"{pcs_in_mb:d}MiB"})
    else:
        LOG.debug(f"Using environment variable chunk size: {os.environ['PYTROLL_CHUNK_SIZE']}")

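
# Worked example of the arithmetic above: for the default preferred chunk size
# of 1024 pixels, a 1024 x 1024 chunk of 8-byte (64-bit) values is
# 1024 * 1024 * 8 bytes = 8 MiB, so dask's "array.chunk-size" becomes "8MiB".
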

def _persist_swath_definition_in_scene(scn: Scene) -> Scene:
    to_persist_swath_defs = _swaths_to_persist(scn)
    if not to_persist_swath_defs:
        return scn
    to_update_data_arrays, to_persist_lonlats = zip(*to_persist_swath_defs.values())
    LOG.info("Loading swath geolocation into memory...")
    persisted_lonlats = dask.persist(*to_persist_lonlats)
    persisted_swath_defs = [SwathDefinition(plons, plats) for plons, plats in persisted_lonlats]
    new_scn = scn.copy()
    for arrays_to_update, persisted_swath_def in zip(to_update_data_arrays, persisted_swath_defs):
        for array_to_update in arrays_to_update:
            array_to_update.attrs["area"] = persisted_swath_def
            new_scn._datasets[array_to_update.attrs["_satpy_id"]] = array_to_update
    LOG.debug(f"{len(to_persist_swath_defs)} unique swath definitions persisted")
    return new_scn


def _swaths_to_persist(scn: Scene) -> dict:
    to_persist_swath_defs = {}
    for data_arr in scn.values():
        swath_def = data_arr.attrs.get("area")
        if not isinstance(swath_def, SwathDefinition):
            continue
        this_swath_data_array_copies, _ = to_persist_swath_defs.setdefault(
            swath_def, ([], (swath_def.lons, swath_def.lats))
        )
        this_swath_data_array_copies.append(data_arr.copy())
    return to_persist_swath_defs

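
# Shape of the mapping returned above (illustrative):
#
#     {
#         swath_def: ([data_arr_copy1, data_arr_copy2, ...],
#                     (swath_def.lons, swath_def.lats)),
#         ...
#     }
#
# Keying on the SwathDefinition itself groups DataArrays that share
# geolocation, so each unique lon/lat pair is persisted only once.
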

if __name__ == "__main__":
    sys.exit(main())