# Source code for polar2grid.utils.search_noaa_s3_viirs

#!/usr/bin/env python3
"""Glob for VIIRS SDR objects in NOAA NESDIS S3 buckets.

Example usage:
  # Single band
  python glob_viirs_s3.py --satellite n20 --band I05 \
      --start-time 2026-04-24T01:10 --end-time 2026-04-24T01:12

  # Multiple bands
  python glob_viirs_s3.py --satellite n20 --band I04 --band I05 --band M15 \
      --start-time 2026-04-24T00:00 --end-time 2026-04-24T01:00

  # All bands (the time range may not exceed 60 minutes)
  python glob_viirs_s3.py --satellite n20 --band ALL \
      --start-time 2026-04-24T00:00 --end-time 2026-04-24T00:59
"""

from __future__ import annotations

import argparse
from glob import fnmatch
import os
import re
import sys
from collections.abc import Iterable, Iterator
from datetime import datetime, timedelta, timezone

import s3fs

# Template for the S3 bucket name; "{satellite}" is replaced with the
# lower-cased --satellite value. The template can be overridden through the
# BUCKET_FORMAT_STR environment variable.
BUCKET_FORMAT_STR = os.environ.get("BUCKET_FORMAT_STR", "noaa-nesdis-{satellite}-pds")
# Margin in seconds used by iter_day_prefixes to decide whether the search
# window sits close enough to a boundary that a neighbouring hour must also
# be searched (presumably an upper bound on the length of one granule).
GRANULE_DURATION_SECONDS = 90

# ---------------------------------------------------------------------------
# Band helpers
# ---------------------------------------------------------------------------

# Every name accepted by --band: I01-I05, M01-M16, the DNB band and the
# geolocation products (which are requested as if they were bands).
VALID_BANDS = (
    ["I01", "I02", "I03", "I04", "I05"]
    + [f"M{n:02d}" for n in range(1, 17)]
    # non-terrain corrected geolocation is not available from NOAA buckets
    + ["DNB", "GITCO", "GMTCO", "GDNBO"]  # + ["GIMGO", "GMODO"]
)
# Bands selected by the ALL token; currently the same list object as VALID_BANDS.
DEFAULT_BANDS = VALID_BANDS

# Special token accepted by --band that expands to every band
ALL_BANDS_TOKEN = "ALL"


def parse_band(value: str) -> list[str]:
    """Turn one ``--band`` argument into the list of band names it selects.

    Matching is case-insensitive. The ``ALL`` token yields a fresh copy of
    every default band; any other value must name exactly one known band.

    Args:
        value: Raw string received from the command line.

    Returns:
        A list with the upper-cased band name, or all default bands for ``ALL``.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is neither ``ALL`` nor a
            member of ``VALID_BANDS``.
    """
    normalized = value.upper()
    if normalized == ALL_BANDS_TOKEN:
        return list(DEFAULT_BANDS)
    if normalized in VALID_BANDS:
        return [normalized]
    raise argparse.ArgumentTypeError(
        f"Invalid band {value!r}. Choose from {', '.join(VALID_BANDS)} or '{ALL_BANDS_TOKEN}'."
    )
def band_to_prefix(band: str) -> str:
    """Map a band name to the product directory segment of its S3 key.

    SDR bands lose the zero padding of their number (``"I01"`` becomes
    ``"VIIRS-I1-SDR"``), the day/night band becomes ``"VIIRS-DNB-SDR"`` and
    geolocation "bands" map to their GEO product (``"GITCO"`` becomes
    ``"VIIRS-IMG-GEO-TC"``, ``"GDNBO"`` becomes ``"VIIRS-DNB-GEO"``).

    Args:
        band: Upper-case band name such as ``"I05"``, ``"M15"``, ``"DNB"``
            or ``"GMTCO"``.

    Returns:
        The ``VIIRS-<id>-<type>`` prefix segment.
    """
    if band.startswith("G"):
        geo_kind = band[1]
        # M-band and I-band geolocation are only available terrain-corrected (TC)
        product_type = "GEO-TC" if geo_kind != "D" else "GEO"
        product_id = {"D": "DNB", "I": "IMG", "M": "MOD"}[geo_kind]
    elif band == "DNB":
        product_type = "SDR"
        product_id = band
    else:
        product_type = "SDR"
        # int() strips the leading zero: "I05" -> "I5"
        product_id = f"{band[0]}{int(band[1:])}"
    return f"VIIRS-{product_id}-{product_type}"
# ---------------------------------------------------------------------------
# Date/time helpers
# ---------------------------------------------------------------------------
def parse_datetime(s: str) -> datetime:
    """Convert a command-line date/time string into a UTC-aware datetime.

    Accepted layouts, tried in this order: ``YYYY-MM-DDTHH:MM:SS``,
    ``YYYY-MM-DDTHH:MM`` and ``YYYY-MM-DD``.

    Args:
        s: The string given on the command line.

    Returns:
        The parsed time with ``tzinfo`` set to UTC.

    Raises:
        argparse.ArgumentTypeError: If ``s`` matches none of the layouts.
    """
    accepted_layouts = ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M", "%Y-%m-%d")
    for layout in accepted_layouts:
        try:
            naive = datetime.strptime(s, layout)
        except ValueError:
            continue
        return naive.replace(tzinfo=timezone.utc)
    raise argparse.ArgumentTypeError(f"Cannot parse datetime {s!r}. Use YYYY-MM-DDTHH:MM[:SS] or YYYY-MM-DD.")
def iter_day_prefixes(start: datetime, end: datetime) -> Iterator[datetime]:
    """Yield every hour that has to be searched to cover ``start`` .. ``end``.

    Despite the name, values are produced at hour resolution: each yielded
    datetime is truncated to the top of an hour, in ascending order.

    The hour before ``start`` is included whenever ``start`` lies less than
    ``GRANULE_DURATION_SECONDS`` after the top of its hour. The glob patterns
    built from these values are keyed on the hour in which a granule *starts*,
    so a granule that began just before the hour boundary and is still running
    at ``start`` can only be found by also searching the previous hour. This
    applies to every hour boundary, not only midnight.

    Args:
        start: Beginning of the requested time range.
        end: End of the requested time range.

    Yields:
        Hour-aligned datetimes from the first to the last hour to search
        (inclusive).
    """
    one_hour = timedelta(hours=1)

    curr_time = start.replace(minute=0, second=0, microsecond=0)
    if (start - curr_time).total_seconds() < GRANULE_DURATION_SECONDS:
        # a granule straddling the hour boundary is named after the previous
        # hour (and, at midnight, stored under the previous day)
        curr_time -= one_hour

    end_hour = end.replace(minute=0, second=0, microsecond=0)
    if end_hour.hour == 23 and ((end_hour + one_hour) - end).total_seconds() < GRANULE_DURATION_SECONDS:
        # if we might get a partial granule on the end of the day over midnight
        # then search the first hour of the next day
        end_hour += one_hour

    while curr_time <= end_hour:
        yield curr_time
        curr_time += one_hour
# --------------------------------------------------------------------------- # Filename time extraction # --------------------------------------------------------------------------- _FNAME_RE = re.compile( r"_d(?P<date>\d{8})" # dYYYYMMDD r"_t(?P<tstart>\d{7})" # tHHMMSSd (tenths digit after seconds) r"_e(?P<tend>\d{7})" # eHHMMSSd )
def file_start_end_time(filename: str) -> tuple[datetime, datetime] | tuple[None, None]:
    """Extract the granule start and end times from a VIIRS filename.

    Args:
        filename: File name containing ``_dYYYYMMDD_tHHMMSSd_eHHMMSSd``.

    Returns:
        ``(start, end)`` as UTC-aware datetimes, or ``(None, None)`` when the
        name does not follow the expected scheme or its fields are not a valid
        date/time. In both failure cases a message is written to stderr.
    """
    match = _FNAME_RE.search(filename)
    if match is None:
        print(f"Unexpected filename scheme discovered: {filename}", file=sys.stderr)
        return None, None

    day = match.group("date")  # YYYYMMDD
    # keep HHMMSS only; the seventh digit is tenths of a second
    start_hms = match.group("tstart")[:6]
    end_hms = match.group("tend")[:6]
    try:
        times = _convert_file_times_to_datetimes(day, start_hms, end_hms)
    except ValueError:
        print(f"Could not parse time information: {filename}", file=sys.stderr)
        return None, None
    return times
def _convert_file_times_to_datetimes(date_str: str, tstart: str, tend: str) -> tuple[datetime, datetime]:
    """Combine a filename date with its start/end clock times.

    Args:
        date_str: Date as ``YYYYMMDD``; it applies to the start time.
        tstart: Start time as ``HHMMSS``.
        tend: End time as ``HHMMSS``.

    Returns:
        ``(start, end)`` as UTC-aware datetimes. When the end clock time is
        earlier than the start clock time the granule is taken to cross
        midnight and the end is moved to the following day.

    Raises:
        ValueError: If the strings do not form valid date/times.
    """

    def _as_utc(hhmmss: str) -> datetime:
        return datetime.strptime(date_str + hhmmss, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)

    begin = _as_utc(tstart)
    finish = _as_utc(tend)
    if finish < begin:
        finish = finish + timedelta(days=1)
    return begin, finish
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser for the search tool.

    Returns:
        A parser with the required ``--satellite``, ``--band`` (repeatable,
        stored as ``bands``), ``--start-time`` and ``--end-time`` options and
        the optional ``--print-urls`` flag.
    """
    band_help = (
        "VIIRS band to search for. May be specified multiple times. "
        f"Individual choices: {', '.join(VALID_BANDS)}. "
        f"Use '{ALL_BANDS_TOKEN}' to select every band plus terrain-corrected geolocation."
    )
    time_metavar = "YYYY-MM-DDTHH:MM[:SS]"

    parser = argparse.ArgumentParser(description="Glob VIIRS SDR files in a NOAA NESDIS public S3 bucket.")
    parser.add_argument(
        "-s",
        "--satellite",
        help="Satellite identifier, e.g. n20, n21, snpp.",
        required=True,
    )
    # each occurrence is converted by parse_band, so "bands" is a list of lists
    parser.add_argument(
        "-b",
        "--band",
        dest="bands",
        metavar="BAND",
        type=parse_band,
        action="append",
        help=band_help,
        required=True,
    )
    parser.add_argument(
        "--start-time",
        metavar=time_metavar,
        type=parse_datetime,
        help="Earliest granule start time (UTC).",
        required=True,
    )
    parser.add_argument(
        "--end-time",
        metavar=time_metavar,
        type=parse_datetime,
        help="Latest granule end time (UTC).",
        required=True,
    )
    parser.add_argument(
        "--print-urls",
        help="Print HTTPS URLs instead of s3:// paths.",
        action="store_true",
    )
    return parser
def main():
    """Run the command-line search and print every matching object.

    Parses the arguments, validates the time range, builds one glob pattern
    per requested band and hour, lists the matching objects through anonymous
    S3 access and keeps those whose granule times overlap the requested
    range. Results are printed as ``s3://`` paths, or as HTTPS URLs when
    ``--print-urls`` is given.

    Exits through ``parser.error`` for an invalid time range, and with status
    1 (after a message on stderr) when nothing matches.
    """
    parser = build_parser()
    args = parser.parse_args()

    if args.start_time > args.end_time:
        parser.error("--start-time must be before --end-time")
    # A range of exactly 60 minutes is allowed; only longer ranges are
    # rejected, in line with the wording of the error message.
    if (args.end_time - args.start_time).total_seconds() > 60 * 60:
        parser.error("Time range can't be more than 60 minutes.")

    # Flatten the list-of-lists produced by action="append" + type=parse_band,
    # then deduplicate while preserving the canonical VALID_BANDS order.
    requested = {band for group in args.bands for band in group}
    bands = [b for b in VALID_BANDS if b in requested]

    satellite = args.satellite.lower()
    fs = s3fs.S3FileSystem(anon=True)

    found = []
    for glob_pattern in _generate_glob_patterns(bands, args.start_time, args.end_time, satellite):
        glob_matches = _glob_s3_fs(fs, glob_pattern)
        found.extend(_filter_by_start_end(glob_matches, args.start_time, args.end_time))

    if not found:
        print("No matching objects found.", file=sys.stderr)
        sys.exit(1)

    _print_uris(found, args.print_urls)
def _generate_glob_patterns(
    bands: Iterable[str], start_time: datetime, end_time: datetime, satellite: str
) -> Iterator[str]:
    """Yield one S3 glob pattern per band and per hour to search.

    Args:
        bands: Band names to search for.
        start_time: Beginning of the requested time range.
        end_time: End of the requested time range.
        satellite: Lower-case satellite identifier; one of ``snpp``, ``n20``,
            ``n21``, ``n22`` or ``n23``.

    Yields:
        Patterns of the form
        ``<bucket>/<product>/<YYYY>/<MM>/<DD>/<band id>_<platform>_d<YYYYMMDD>_t<HH>*.h5``.

    Raises:
        KeyError: If ``satellite`` is not one of the known identifiers
            (raised when the generator is first advanced).
    """
    platform_codes = {
        "snpp": "npp",
        "n20": "j01",
        "n21": "j02",
        "n22": "j03",
        "n23": "j04",
    }
    fn_platform = platform_codes[satellite]
    bucket = BUCKET_FORMAT_STR.format(satellite=satellite)

    for band in bands:
        product_dir = band_to_prefix(band)  # e.g. "VIIRS-I5-SDR"
        # SDR files carry an "SV" prefix; geolocation files use the band name as-is
        if band[0] in ("M", "I", "D"):
            file_band_id = f"SV{band}"
        else:
            file_band_id = band

        for search_hour in iter_day_prefixes(start_time, end_time):
            day_dir = search_hour.strftime("%Y/%m/%d")
            day_stamp = day_dir.replace("/", "")
            hour_stamp = search_hour.strftime("%H")
            # e.g. noaa-nesdis-n20-pds/VIIRS-I5-SDR/2026/04/24/
            directory = f"{bucket}/{product_dir}/{day_dir}/"
            # later process (see _glob_s3_fs) is very particular about how many wildcards are used
            yield f"{directory}{file_band_id}_{fn_platform}_d{day_stamp}_t{hour_stamp}*.h5"
def _glob_s3_fs(fs: s3fs.S3FileSystem, glob_pattern: str) -> Iterator[str]:
    """Yield the objects matching ``glob_pattern`` using a prefix listing.

    ``fs.glob`` is avoided: at the time of writing it is slow because it
    iterates over all objects in the bucket instead of using a prefix.
    Instead, the directory part of the pattern is listed with ``fs.find``,
    restricted to the literal text before the first ``*`` of the file name,
    and each result is then checked against the full file-name pattern.

    Args:
        fs: Filesystem object providing ``find(path, prefix=...)``.
        glob_pattern: ``<directory>/<file name pattern>``; only the file name
            part may contain wildcards.

    Yields:
        The paths returned by ``fs.find`` whose file name matches the pattern.
    """
    directory, name_pattern = glob_pattern.rsplit("/", 1)
    name_pattern = name_pattern.rstrip("/")
    # everything before the first wildcard can be used as a listing prefix
    literal_prefix = name_pattern.split("*", 1)[0]

    for candidate in fs.find(directory, prefix=literal_prefix):
        candidate_name = candidate.rsplit("/", 1)[1]
        if fnmatch.fnmatch(candidate_name, name_pattern):
            yield candidate
def _filter_by_start_end(possible_paths: Iterable[str], start_time: datetime, end_time: datetime) -> Iterator[str]:
    """Yield the paths whose file-name times overlap the requested range.

    Args:
        possible_paths: Candidate object paths; only the part after the last
            ``/`` is inspected.
        start_time: Beginning of the requested time range.
        end_time: End of the requested time range.

    Yields:
        Each path, unchanged and in input order, that passes the overlap check.
    """
    yield from (
        candidate
        for candidate in possible_paths
        if _overlaps_time_range(candidate.rsplit("/", 1)[-1], start_time, end_time)
    )
def _overlaps_time_range(fname: str, start_time: datetime, end_time: datetime) -> bool:
    """Tell whether the granule described by ``fname`` overlaps a time range.

    Args:
        fname: File name carrying the granule start and end times.
        start_time: Beginning of the requested time range.
        end_time: End of the requested time range.

    Returns:
        ``True`` when the granule ends at or after ``start_time`` and starts
        at or before ``end_time``; ``False`` otherwise, including when the
        times cannot be read from the file name.
    """
    granule_start, granule_end = file_start_end_time(fname)
    if granule_start is None or granule_end is None:
        return False
    return granule_end >= start_time and granule_start <= end_time
def _print_uris(paths: Iterable[str], print_urls: bool) -> None:
    """Print one line per object path, as an S3 URI or an HTTPS URL.

    Args:
        paths: Object paths of the form ``bucket/key``.
        print_urls: When true, print ``https://<bucket>.s3.amazonaws.com/<key>``;
            otherwise print ``s3://<bucket>/<key>``.
    """
    for object_path in paths:
        if not print_urls:
            print(f"s3://{object_path}")
            continue
        # bucket/key -> https://bucket.s3.amazonaws.com/key (empty key if there is no "/")
        bucket_name, _, object_key = object_path.partition("/")
        print(f"https://{bucket_name}.s3.amazonaws.com/{object_key}")
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()