Source code for polar2grid.filters.day_night

#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) 2021 Space Science and Engineering Center (SSEC),
#  University of Wisconsin-Madison.
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This file is part of the polar2grid software package. Polar2grid takes
# satellite observation data, remaps it, and writes it to a file format for
# input into another program.
# Documentation: http://www.ssec.wisc.edu/software/polar2grid/
"""Filter classes for making decisions based on day or night percentages."""

import logging

try:
    # Python 3.9+
    from functools import cache
except ImportError:
    from functools import lru_cache as cache

import numpy as np
from pyorbital.astronomy import sun_zenith_angle
from pyresample.spherical import SphPolygon
from xarray import DataArray

from ._base import BaseFilter
from ._utils import polygon_for_area

logger = logging.getLogger(__name__)


[docs]@cache def _get_sunlight_coverage(area_def, start_time, sza_threshold=90.0, overpass=None): """Get the sunlight coverage of *area_def* at *start_time* as a value between 0 and 1.""" if area_def.ndim == 1: logger.debug("Source data is 1 dimensional, will not filter based on sunlight coverage.") return False adp = polygon_for_area(area_def) poly = get_twilight_poly(start_time) if overpass is not None: ovp = overpass.boundary.contour_poly cut_area_poly = adp.intersection(ovp) else: cut_area_poly = adp if cut_area_poly is None: if not adp._is_inside(ovp): return 0.0 else: # Should already have been taken care of in pyresample.spherical.intersection cut_area_poly = adp daylight = cut_area_poly.intersection(poly) if daylight is None: if sun_zenith_angle(start_time, *area_def.get_lonlat(0, 0)) < sza_threshold: return 1.0 else: return 0.0 else: daylight_area = daylight.area() total_area = adp.area() return daylight_area / total_area
[docs]def modpi(val, mod=np.pi): """Put *val* between -*mod* and *mod*.""" return (val + mod) % (2 * mod) - mod
[docs]def get_twilight_poly(utctime): """Return a polygon enclosing the sunlit part of the globe at *utctime*.""" from pyorbital import astronomy ra, dec = astronomy.sun_ra_dec(utctime) lon = modpi(ra - astronomy.gmst(utctime)) lat = dec vertices = np.zeros((4, 2)) vertices[0, :] = modpi(lon - np.pi / 2), 0 if lat <= 0: vertices[1, :] = lon, np.pi / 2 + lat vertices[3, :] = modpi(lon + np.pi), -(np.pi / 2 + lat) else: vertices[1, :] = modpi(lon + np.pi), np.pi / 2 - lat vertices[3, :] = lon, -(np.pi / 2 - lat) vertices[2, :] = modpi(lon + np.pi / 2), 0 return SphPolygon(vertices)
[docs]class SunlightCoverageFilter(BaseFilter): """Base class for filtering based on day/night coverage.""" FILTER_MSG = "Unloading '{}' because there is not enough day/night coverage." def __init__(self, product_filter_criteria: dict = None, sza_threshold: float = 100.0, fraction: float = 0.1): """Initialize thresholds and default search criteria.""" super().__init__(product_filter_criteria) self._sza_threshold = sza_threshold self._fraction = fraction
[docs] def _should_be_filtered(self, sunlight_coverage): raise NotImplementedError("Subclass must implement the filter decision")
[docs] def _filter_data_array(self, data_arr: DataArray, _cache: dict): """Check if this DataArray should be removed. Returns: True if it meets the condition and does not have enough day data and should therefore be removed. False otherwise meaning it should be kept. """ if not self._matches_criteria(data_arr): return False area = data_arr.attrs["area"] start_time = data_arr.attrs["start_time"] end_time = data_arr.attrs["end_time"] mid_time = start_time + (end_time - start_time) / 2 sza_threshold = self._sza_threshold if area not in _cache: slc = _get_sunlight_coverage(area, mid_time, sza_threshold) logger.debug("Sunlight is estimated to cover %0.2f%% of the scene.", slc * 100) _cache[area] = self._should_be_filtered(slc) return _cache[area]
[docs]class DayCoverageFilter(SunlightCoverageFilter): """Remove certain products when there is not enough day data.""" FILTER_MSG = "Unloading '{}' because there is not enough day data." def __init__(self, product_filter_criteria: dict = None, sza_threshold: float = 100.0, day_fraction: float = 0.1): """Initialize thresholds and default search criteria.""" super().__init__(product_filter_criteria, sza_threshold=sza_threshold, fraction=day_fraction)
[docs] def _should_be_filtered(self, sunlight_coverage): return sunlight_coverage < self._fraction
[docs]class NightCoverageFilter(SunlightCoverageFilter): """Remove certain products when there is not enough night data.""" FILTER_MSG = "Unloading '{}' because there is not enough night data." def __init__(self, product_filter_criteria: dict = None, sza_threshold: float = 100.0, night_fraction: float = 0.1): """Initialize thresholds and default search criteria.""" super().__init__(product_filter_criteria, sza_threshold=sza_threshold, fraction=night_fraction)
[docs] def _should_be_filtered(self, sunlight_coverage): return (1 - sunlight_coverage) < self._fraction