diff --git a/pyproject.toml b/pyproject.toml index d3279e91..e3033ad2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,8 @@ markers = [ packages = [ "src/evalml", "src/verification", - "src/data_input" + "src/data_input", + "src/spectra" ] [tool.uv.sources] diff --git a/src/evalml/config.py b/src/evalml/config.py index c36ea4df..c2822835 100644 --- a/src/evalml/config.py +++ b/src/evalml/config.py @@ -1,7 +1,7 @@ from pathlib import Path -from typing import Dict, List, Any, ClassVar, FrozenSet, Optional +from typing import Dict, List, Any, ClassVar, FrozenSet, Literal, Optional -from pydantic import BaseModel, Field, RootModel, field_validator +from pydantic import BaseModel, Field, RootModel, field_validator, model_validator PROJECT_ROOT = Path(__file__).parents[2] @@ -302,6 +302,57 @@ class ExperimentScorecardConfig(BaseModel): model_config = {"extra": "forbid"} +class SpectraConfig(BaseModel): + """Configuration for power-spectra QC plots in the experiment pipeline.""" + + enabled: bool = Field( + default=False, + description="Whether to compute and plot power spectra.", + ) + method: Literal["dct", "fft"] = Field( + default="dct", + description="Spectral method: 'dct' (default, recommended for LAM) or 'fft'.", + ) + lead_times: List[int] = Field( + default_factory=list, + description="Representative lead times (hours) at which to compute spectra.", + ) + variables: List[str] = Field( + default=["T_2M", "WIND_KE", "TOT_PREC"], + description="Spectra variables. Supported: T_2M, WIND_KE (from U/V_10M), TOT_PREC.", + ) + init_hours: Optional[List[int]] = Field( + default=None, + description="Optional subset of init hours to average over. None = all.", + ) + + @field_validator("variables") + @classmethod + def validate_variables(cls, v: List[str]) -> List[str]: + if not v: + raise ValueError( + "`variables` must list at least one variable when spectra is configured." + ) + allowed = {"T_2M", "WIND_KE", "TOT_PREC"} + invalid = set(v) - allowed + if invalid: + raise ValueError( + f"Unsupported spectra variable(s) {invalid!r}. Must be subset of {allowed}." + ) + return v + + @model_validator(mode="after") + def check_enabled_requirements(self): + if self.enabled and not self.lead_times: + raise ValueError( + "experiment.spectra.enabled is true but `lead_times` is empty; " + "specify at least one lead time (hours)." + ) + return self + + model_config = {"extra": "forbid"} + + class ShowcaseConfig(BaseModel): """Configuration for the showcase workflow.""" @@ -373,6 +424,10 @@ class ExperimentConfig(BaseModel): default=None, description="Scorecard generation configuration. Omit or set enabled: false to disable.", ) + spectra: SpectraConfig = Field( + default_factory=SpectraConfig, + description="Power-spectra QC configuration. Disabled by default.", + ) @field_validator("thresholds") @classmethod diff --git a/src/spectra/__init__.py b/src/spectra/__init__.py new file mode 100644 index 00000000..7599fd4a --- /dev/null +++ b/src/spectra/__init__.py @@ -0,0 +1,7 @@ +"""Variance power-spectra diagnostics for evalml. + +Public surface re-exported for convenience; submodules can also be imported +directly. The spectral `core` is numpy-only and reusable for any model. +""" + +from spectra import compute, core, io, regrid # noqa: F401 diff --git a/src/spectra/compute.py b/src/spectra/compute.py new file mode 100644 index 00000000..336788fa --- /dev/null +++ b/src/spectra/compute.py @@ -0,0 +1,119 @@ +"""Orchestration: per-source spectra computation, over-init aggregation, and the +experiment overlay plot. Defines the spectra.nc schema.""" + +from __future__ import annotations + +from itertools import cycle +from pathlib import Path + +import numpy as np +import xarray as xr + +from spectra import core, io, regrid + +_PALETTE = ["#1f3b73", "#c1272d", "#1b7837", "#6a3d9a", "#e08214", "#4d4d4d"] + + +def compute_source_spectra(ds: xr.Dataset, variables, lead_times, method, label): + """Compute spectra for one source (already-loaded native dataset). + + Returns an x.Dataset with dims (variable, leadtime, wavenumber) and a shared + `wavelength` coordinate. The native grid is detected from the field length. + """ + if not variables or not lead_times: + raise ValueError( + "compute_source_spectra requires non-empty `variables` and `lead_times`." + ) + spectrum_fn = core.SPECTRUM_FUNCS.get(method) + if spectrum_fn is None: + raise ValueError( + f"Unknown spectrum method {method!r}. Valid: {list(core.SPECTRUM_FUNCS)}." + ) + npoints = io.native_field( + ds, io.required_params(variables)[0], lead_times[0] + ).shape[0] + matrix, ny, nx, dx_km = regrid.load_regridder(npoints) + + wavelength = None + power = np.full((len(variables), len(lead_times), min(ny, nx) // 2), np.nan) + for vi, var in enumerate(variables): + for li, step in enumerate(lead_times): + comps, factor = io.native_components(ds, var, step) + grids = [regrid.regrid(c, matrix, ny, nx) for c in comps] + wl, p = core.combined_spectrum(spectrum_fn, grids, dx_km, factor=factor) + wavelength = wl + power[vi, li, :] = p + + return xr.Dataset( + {"power": (("variable", "leadtime", "wavenumber"), power)}, + coords={ + "variable": list(variables), + "leadtime": list(lead_times), + "wavenumber": np.arange(power.shape[-1]), + "wavelength": ("wavenumber", wavelength), + }, + attrs={ + "dx_km": float(dx_km), + "npoints": int(npoints), + "label": label, + "method": method, + }, + ) + + +def aggregate_spectra(spectra_files) -> xr.Dataset: + """Average power over init times (nanmean). All inputs share one grid.""" + datasets = [xr.open_dataset(f) for f in spectra_files] + try: + stacked = xr.concat(datasets, dim="init") + agg = stacked.mean(dim="init", skipna=True) + agg.attrs = datasets[0].attrs + agg["wavelength"] = datasets[0]["wavelength"].copy() + agg.load() + finally: + for ds in datasets: + ds.close() + return agg + + +def plot_experiment_spectra( + truth_file, participant_files, out_dir, variables, lead_times +): + """One overlay figure per (variable, lead time): truth + all participants.""" + out_dir = Path(out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + participants = [xr.open_dataset(f) for f in participant_files] + try: + with xr.open_dataset(truth_file) as truth: + for var in variables: + for step in lead_times: + spectra = {} + t_wl = truth["wavelength"].values + t_p = truth["power"].sel(variable=var, leadtime=step).values + spectra[f"{truth.attrs['label']} (truth)"] = ( + t_wl, + t_p, + "k", + "-", + 1.0, + 2.0, + ) + color = cycle(_PALETTE) + for ds in participants: + wl = ds["wavelength"].values + p = ds["power"].sel(variable=var, leadtime=step).values + spectra[ds.attrs["label"]] = (wl, p, next(color), "-", 0.9, 1.6) + out = out_dir / f"spectrum_{var}_{step:03d}.png" + # Reference grid/eff-res/Nyquist lines use the TRUTH grid's dx (labelled + # "truth"); each participant curve keeps its own wavelength axis. + core.plot_power_spectra( + spectra, + out, + f"Power spectrum — {var} @ +{step}h", + grid_dx_km=float(truth.attrs["dx_km"]), + model_short="truth", + model_color="k", + ) + finally: + for ds in participants: + ds.close() diff --git a/src/spectra/core.py b/src/spectra/core.py new file mode 100644 index 00000000..62ce95e3 --- /dev/null +++ b/src/spectra/core.py @@ -0,0 +1,201 @@ +"""Numpy-only 2D variance (power) spectra of regular-grid fields. + +Ported from the `icon-power-spectra` skill. The only behavioural change vs. the +skill is that `radial_spectrum` returns FIXED-LENGTH arrays (length `nbins`, +`NaN` for empty bins) on a deterministic, edge-centred wavenumber grid, so +spectra from different init times share an identical wavelength coordinate and +can be stacked/averaged. Variance-conserving normalization is unchanged. +""" + +from __future__ import annotations + +from itertools import cycle + +import matplotlib.pyplot as plt +import numpy as np +import scipy.fft + +KM_PER_DEG = 111.2 + +REFERENCE_GRIDS = { + "O96 (~104 km)": 90.0 / 96.0 * KM_PER_DEG, + "N320 (~31 km)": 0.28125 * KM_PER_DEG, + "CERRA (~9 km)": 0.10 * KM_PER_DEG, +} + +EFF_RES_FACTOR = 6.0 +TRANSITION_KM = 400.0 + + +def _hann2d(ny: int, nx: int) -> np.ndarray: + win = np.hanning(ny)[:, None] * np.hanning(nx)[None, :] + return win / np.sqrt((win**2).mean()) + + +def radial_spectrum(coeff, kx, ky, nbins, norm, oro=None): + """Radially bin |coeff|^2 over isotropic |k| [cycles/km] into a density. + + Returns (wavelength_km, power) each of length `nbins`, sorted by ascending + wavelength, with `NaN` in bins that contain no samples. `norm` scales the + summed coefficients to a variance-conserving spectrum so spectra from grids + of different resolution overlap where both resolve. `oro` (transform of the + orography on the same grid), if given, removes the terrain-coherent + component per band (unused in v1). + """ + kk = np.sqrt(kx[None, :] ** 2 + ky[:, None] ** 2).ravel() + c = coeff.ravel() + h = oro.ravel() if oro is not None else None + + k_max = min(kx.max(), ky.max()) + edges = np.linspace(0.0, k_max, nbins + 1) + dk = edges[1] - edges[0] + which = np.digitize(kk, edges) + + power = np.full(nbins, np.nan) + for b in range(1, nbins + 1): + sel = which == b + if not np.any(sel): + continue + cb = c[sel] + s_ff = float(np.vdot(cb, cb).real) + if h is not None: + hb = h[sel] + s_hh = float(np.vdot(hb, hb).real) + if s_hh > 0: + s_fh = np.vdot(hb, cb) + s_ff -= abs(s_fh) ** 2 / s_hh + val = max(s_ff, 0.0) * norm / dk + power[b - 1] = val if val > 0.0 else np.nan + + centre_k = 0.5 * (edges[:-1] + edges[1:]) + wavelength = 1.0 / centre_k + order = np.argsort(wavelength) + return wavelength[order], power[order] + + +def dct_power_spectrum(field, dx_km, nbins=None, oro_coeff=None): + """2D DCT (Denis et al. 2002) variance spectrum of a regular-grid field.""" + field = field - field.mean() + ny, nx = field.shape + nbins = nbins or min(ny, nx) // 2 + coeff = scipy.fft.dctn(field, type=2, norm="ortho") + kx = np.arange(nx) / (2.0 * nx) / dx_km + ky = np.arange(ny) / (2.0 * ny) / dx_km + return radial_spectrum(coeff, kx, ky, nbins, 1.0 / (nx * ny), oro_coeff) + + +def fft_power_spectrum(field, dx_km, nbins=None, oro_coeff=None): + """2D windowed-FFT (Hann) spectrum, normalized to match the DCT.""" + # NOTE: ported verbatim from the icon-power-spectra skill. FFT is the + # optional (non-default) comparison method; the magnitude binning follows + # the reference implementation exactly. Prefer DCT for quantitative use. + field = field - field.mean() + ny, nx = field.shape + nbins = nbins or min(ny, nx) // 2 + coeff = scipy.fft.fft2(field * _hann2d(ny, nx)) + kx = scipy.fft.fftfreq(nx, d=dx_km) + ky = scipy.fft.fftfreq(ny, d=dx_km) + return radial_spectrum(coeff, kx, ky, nbins, 1.0 / (nx * ny) ** 2, oro_coeff) + + +SPECTRUM_FUNCS = {"dct": dct_power_spectrum, "fft": fft_power_spectrum} + + +def combined_spectrum( + spectrum_fn, fields_2d, dx_km, nbins=None, factor=1.0, oro_coeff=None +): + """Sum the spectra of one or more component fields, scaled by `factor`. + + For wind kinetic energy pass [u, v] and factor=0.5 -> 0.5*(|u_hat|^2+|v_hat|^2). + NaN bins are treated as zero in the sum so the common coordinate is preserved. + """ + if not fields_2d: + raise ValueError("combined_spectrum requires at least one field") + wl = total = None + for f in fields_2d: + w, p = spectrum_fn(f, dx_km, nbins, oro_coeff=oro_coeff) + wl = w + total = np.nan_to_num(p) if total is None else total + np.nan_to_num(p) + return wl, factor * total + + +def _add_slope_guide(ax, n, label, wl_range, anchor): + wl = np.array(wl_range, dtype=float) + wl0, p0 = anchor + power = p0 * (wl / wl0) ** n + ax.plot(wl, power, ls=":", color="0.35", lw=1.3) + ax.annotate( + label, + xy=(wl.min(), power[np.argmin(wl)]), + xytext=(3, 3), + textcoords="offset points", + color="0.35", + fontsize=11, + style="italic", + ) + + +def plot_power_spectra( + spectra, + out_path, + title, + grid_dx_km=None, + model_short="model", + model_color="#1f3b73", + extra_lines=None, +): + """Plot one or more spectra vs wavelength with the standard annotations. + + spectra : {label: (wavelength_km, power, color, linestyle, alpha, lw)} + """ + fig, ax = plt.subplots(figsize=(10, 6)) + for label, (wl, power, color, ls, alpha, lw) in spectra.items(): + ax.loglog(wl, power, lw=lw, label=label, color=color, ls=ls, alpha=alpha) + + lines = [(lbl, wl, "0.55") for lbl, wl in REFERENCE_GRIDS.items()] + if grid_dx_km is not None: + lines += [ + (f"{model_short} grid", grid_dx_km, model_color), + (f"{model_short} eff.", EFF_RES_FACTOR * grid_dx_km, model_color), + (f"{model_short} Nyq.", 2.0 * grid_dx_km, model_color), + ] + lines += list(extra_lines or []) + _, ymax = ax.get_ylim() + label_y = cycle([1.0, 0.6, 0.35]) + for label, wl, color in sorted(lines, key=lambda t: t[1]): + ax.axvline(wl, color=color, ls="--", lw=1.0, alpha=0.8) + ax.text( + wl, + ymax * next(label_y), + " " + label, + rotation=90, + va="top", + ha="right", + color=color, + fontsize=8.5, + ) + + ref_wl, ref_p = next(iter(spectra.values()))[:2] + ref_wl = np.asarray(ref_wl) + ref_p = np.asarray(ref_p) + finite = np.isfinite(ref_wl) & np.isfinite(ref_p) + ref_wl, ref_p = ref_wl[finite], ref_p[finite] + meso = (ref_wl >= 5) & (ref_wl <= TRANSITION_KM) + if meso.any(): + _add_slope_guide( + ax, + 5.0 / 3.0, + r"$k^{-5/3}$", + (ref_wl[meso].min(), ref_wl[meso].max()), + (np.median(ref_wl[meso]), 5 * np.median(ref_p[meso])), + ) + + ax.set_xlabel("Wavelength [km]") + ax.set_ylabel("Power (variance density)") + ax.set_title(title) + ax.invert_xaxis() + ax.grid(True, which="both", ls=":", alpha=0.3) + ax.legend(loc="lower left", frameon=True) + fig.tight_layout() + fig.savefig(out_path, dpi=200, bbox_inches="tight") + plt.close(fig) diff --git a/src/spectra/io.py b/src/spectra/io.py new file mode 100644 index 00000000..a289eb59 --- /dev/null +++ b/src/spectra/io.py @@ -0,0 +1,69 @@ +"""Thin evalml I/O for spectra: variable->component mapping and native-field +extraction. Field loading is delegated to data_input loaders.""" + +from __future__ import annotations + +import numpy as np +import xarray as xr + +# Logical spectra variable -> (component GRIB params, scale factor). +# WIND_KE combines U/V as kinetic energy E = 0.5*(|u_hat|^2 + |v_hat|^2). +VARIABLE_COMPONENTS: dict[str, tuple[list[str], float]] = { + "T_2M": (["T_2M"], 1.0), + "WIND_KE": (["U_10M", "V_10M"], 0.5), + "TOT_PREC": (["TOT_PREC"], 1.0), +} + + +def required_params(variables: list[str]) -> list[str]: + """All GRIB params needed to compute the requested spectra variables.""" + params: list[str] = [] + for var in variables: + if var not in VARIABLE_COMPONENTS: + raise KeyError( + f"Unknown spectra variable {var!r}. " + f"Known: {sorted(VARIABLE_COMPONENTS)}." + ) + for p in VARIABLE_COMPONENTS[var][0]: + if p not in params: + params.append(p) + return params + + +def native_field(ds: xr.Dataset, param: str, step: int) -> np.ndarray: + """1-D native field for `param` at lead-time `step` (hours). + + Handles both timedelta64 step coordinates (as produced by the data_input + GRIB/baseline loaders) and plain integer-hour step coordinates. + """ + da = ds[param] + if "step" in da.dims: + step_coord = da.coords["step"] + if np.issubdtype(step_coord.dtype, np.timedelta64): + da = da.sel(step=np.timedelta64(step, "h")) + else: + da = da.sel(step=step) + # Drop singleton non-spatial dims left over from the loaders (e.g. a size-1 + # forecast_reference_time, or a single ensemble `number`), keeping only the + # native grid axis. A genuine multi-member dim survives and trips the guard. + da = da.squeeze(drop=True) + if da.ndim != 1: + raise ValueError( + f"Expected a 1-D native field for {param!r} after step selection, " + f"got shape {da.shape} with dims {da.dims}. Power spectra require " + f"data on the ICON native (unstructured) grid." + ) + return np.asarray(da.values).reshape(-1) + + +def native_components( + ds: xr.Dataset, variable: str, step: int +) -> tuple[list[np.ndarray], float]: + """Native component arrays + scale factor for a spectra variable at `step`.""" + if variable not in VARIABLE_COMPONENTS: + raise KeyError( + f"Unknown spectra variable {variable!r}. " + f"Known: {sorted(VARIABLE_COMPONENTS)}." + ) + params, factor = VARIABLE_COMPONENTS[variable] + return [native_field(ds, p, step) for p in params], factor diff --git a/src/spectra/regrid.py b/src/spectra/regrid.py new file mode 100644 index 00000000..3aa9ce39 --- /dev/null +++ b/src/spectra/regrid.py @@ -0,0 +1,90 @@ +# src/spectra/regrid.py +"""ICON native (unstructured) -> regular rotlatlon regridding via iconremap RBF +weights. The weights file is selected at runtime from the native point count.""" + +from __future__ import annotations + +from functools import lru_cache +from pathlib import Path + +import numpy as np +import scipy.sparse +import xarray as xr + +from spectra import core + +# Hardcoded iconremap RBF weight files on Balfrin, keyed by native cell count. +# CH1 native count is 1,147,980 (per the icon-power-spectra skill). +# TODO confirm CH2 count (see plan Task 2 Step 1). +_WEIGHTS_ROOT = Path("/store_new/mch/msopr/icon_workflow_2/iconremap-weights") +CH1_NPOINTS = 1_147_980 # confirmed vs ICON-CH1-EPS native T_2M field +CH2_NPOINTS = 283_876 # confirmed vs ICON-CH2-EPS native T_2M field +ICON_WEIGHTS = { + CH1_NPOINTS: _WEIGHTS_ROOT / "icon-ch1-eps-rotlatlon.nc", + CH2_NPOINTS: _WEIGHTS_ROOT / "icon-ch2-eps-rotlatlon.nc", +} +# TODO: prefer uuidOfHGrid as the registry key if point counts ever collide. +# TODO: eventually consume precomputed .npz sparse matrices directly. + + +def weights_path_for_npoints(npoints: int) -> Path: + """Resolve the iconremap weights file for a native grid of `npoints` cells.""" + try: + return ICON_WEIGHTS[npoints] + except KeyError as exc: + raise KeyError( + f"No iconremap weights registered for native grid with {npoints} " + f"cells. Known grids: {sorted(ICON_WEIGHTS)}." + ) from exc + + +def build_truncation_matrix(iconremap_nc, n_source): + """Sparse native(n_source) -> regular(ny*nx) matrix from iconremap weights.""" + with xr.open_dataset(iconremap_nc) as ds: + idx = ds["rbf_B_glbidx"].values + wgt = ds["rbf_B_wgt"].values + nx, ny = int(ds.attrs["nx"]), int(ds.attrs["ny"]) + npoints = idx.shape[-1] + idx = idx.reshape(ny * nx, npoints) + wgt = wgt.reshape(ny * nx, npoints) + n_targets = ny * nx + + # iconremap indices are 0-based and used as-is (cf. meteodata-lab's + # `_icon2regular`, which indexes with `np.take(field, indices)`). + rows = np.broadcast_to(np.arange(n_targets)[:, None], idx.shape).ravel() + cols = idx.ravel() + vals = wgt.ravel().astype(np.float32) + + M = scipy.sparse.coo_matrix( + (vals, (rows, cols)), shape=(n_targets, n_source) + ).tocsr() + return M, (ny, nx) + + +def grid_geometry(iconremap_nc): + """(ny, nx, dx_km) of the regular target grid from the iconremap weights.""" + with xr.open_dataset(iconremap_nc) as ds: + return ( + int(ds.attrs["ny"]), + int(ds.attrs["nx"]), + float(ds.attrs["dx"]) * core.KM_PER_DEG, + ) + + +def regrid(native_1d, matrix, ny, nx): + """Apply a truncation matrix to a native field -> (ny, nx) regular array.""" + return (matrix @ np.asarray(native_1d)).reshape(ny, nx) + + +@lru_cache(maxsize=4) +def load_regridder(npoints: int): + """Cached regridder for a native grid: (matrix, ny, nx, dx_km).""" + weights = weights_path_for_npoints(npoints) + matrix, (ny, nx) = build_truncation_matrix(weights, n_source=npoints) + g_ny, g_nx, dx_km = grid_geometry(weights) + if (g_ny, g_nx) != (ny, nx): + raise ValueError( + f"Grid geometry mismatch for {npoints} cells: " + f"matrix target {(ny, nx)} vs grid_geometry {(g_ny, g_nx)}." + ) + return matrix, ny, nx, dx_km diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 4c715931..e34b87a4 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,6 +1,7 @@ import pytest +from pydantic import ValidationError -from evalml.config import ConfigModel +from evalml.config import ConfigModel, SpectraConfig def test_example_forecasters_config(example_forecasters_config): @@ -39,3 +40,53 @@ def test_legacy_top_level_baselines_still_supported(example_forecasters_config): ] _ = ConfigModel.model_validate(cfg) + + +def test_spectra_config_defaults(): + cfg = SpectraConfig() + assert cfg.enabled is False + assert cfg.method == "dct" + assert cfg.variables == ["T_2M", "WIND_KE", "TOT_PREC"] + assert cfg.lead_times == [] + + +def test_spectra_config_accepts_valid(): + cfg = SpectraConfig( + enabled=True, + method="fft", + lead_times=[6, 48, 120], + variables=["T_2M"], + init_hours=[0, 12], + ) + assert cfg.method == "fft" + assert cfg.lead_times == [6, 48, 120] + + +def test_spectra_config_rejects_bad_method(): + with pytest.raises(ValidationError): + SpectraConfig(method="wavelet") + + +def test_spectra_config_rejects_unknown_variable(): + with pytest.raises(ValidationError): + SpectraConfig(variables=["GEOPOTENTIAL"]) + + +def test_spectra_config_rejects_empty_variables(): + with pytest.raises(ValidationError): + SpectraConfig(variables=[]) + + +def test_spectra_config_forbids_extra(): + with pytest.raises(ValidationError): + SpectraConfig(enabled=True, bogus=1) + + +def test_spectra_config_enabled_requires_lead_times(): + with pytest.raises(ValidationError): + SpectraConfig(enabled=True, lead_times=[]) + + +def test_spectra_config_disabled_allows_empty_lead_times(): + cfg = SpectraConfig(enabled=False) + assert cfg.lead_times == [] diff --git a/tests/unit/test_spectra_core.py b/tests/unit/test_spectra_core.py new file mode 100644 index 00000000..19f9deb9 --- /dev/null +++ b/tests/unit/test_spectra_core.py @@ -0,0 +1,103 @@ +import numpy as np +import pytest +import xarray as xr + +from spectra import compute, core + + +def _cosine_field(n=128, wavelength_px=16): + x = np.arange(n) + return np.cos(2 * np.pi * x[None, :] / wavelength_px) * np.ones((n, 1)) + + +def test_radial_spectrum_returns_fixed_length_with_nan_padding(): + # A coeff array with energy only in a couple of modes -> most bins empty (NaN). + coeff = np.zeros((8, 8)) + coeff[0, 3] = 5.0 + kx = np.arange(8) / (2.0 * 8) / 1.0 + ky = np.arange(8) / (2.0 * 8) / 1.0 + wl, power = core.radial_spectrum(coeff, kx, ky, nbins=4, norm=1.0 / 64) + assert wl.shape == (4,) and power.shape == (4,) + assert np.isnan(power).any() # empty bins are NaN, not dropped + assert np.nanmax(power) > 0 + assert np.all(np.diff(wl) > 0) # wavelengths strictly ascending + + +def test_dct_spectrum_peaks_at_input_wavelength(): + dx = 2.0 # km per pixel + wavelength_px = 16 + field = _cosine_field(n=128, wavelength_px=wavelength_px) + wl, power = core.dct_power_spectrum(field, dx) + peak_wl = wl[np.nanargmax(power)] + expected = wavelength_px * dx # 32 km + assert peak_wl == pytest.approx(expected, rel=0.25) + + +def test_constant_field_has_no_power(): + field = np.full((64, 64), 7.3) + wl, power = core.dct_power_spectrum(field, 1.0) + assert np.nansum(power) == pytest.approx(0.0, abs=1e-9) + + +def test_combined_spectrum_scales_and_sums(): + field = _cosine_field(64, 8) + wl1, p1 = core.dct_power_spectrum(field, 1.0) + # two identical fields with factor 0.5 -> 0.5*(p+p) == p + wl2, p2 = core.combined_spectrum( + core.dct_power_spectrum, [field, field], 1.0, factor=0.5 + ) + np.testing.assert_allclose(np.nan_to_num(p2), np.nan_to_num(p1), rtol=1e-6) + + +def test_aggregate_spectra_averages_over_init(tmp_path): + wl = np.array([100.0, 50.0, 25.0]) + + def mk(power, init): + ds = xr.Dataset( + {"power": (("variable", "leadtime", "wavenumber"), np.array([[power]]))}, + coords={ + "variable": ["T_2M"], + "leadtime": [6], + "wavenumber": np.arange(3), + "wavelength": ("wavenumber", wl), + }, + attrs={"dx_km": 1.1, "npoints": 10, "label": "m"}, + ) + p = tmp_path / f"s_{init}.nc" + ds.to_netcdf(p) + return p + + a = mk([1.0, 2.0, 3.0], 0) + b = mk([3.0, 4.0, 5.0], 1) + agg = compute.aggregate_spectra([a, b]) + np.testing.assert_allclose( + agg["power"].sel(variable="T_2M", leadtime=6).values, [2.0, 3.0, 4.0] + ) + assert agg.attrs["label"] == "m" + np.testing.assert_array_equal(agg["wavelength"].values, wl) + + +def test_aggregate_spectra_nanmean_ignores_missing(tmp_path): + wl = np.array([100.0, 50.0]) + + def mk(power, init): + ds = xr.Dataset( + {"power": (("variable", "leadtime", "wavenumber"), np.array([[power]]))}, + coords={ + "variable": ["T_2M"], + "leadtime": [6], + "wavenumber": np.arange(2), + "wavelength": ("wavenumber", wl), + }, + attrs={"dx_km": 1.1, "npoints": 10, "label": "m"}, + ) + p = tmp_path / f"s_{init}.nc" + ds.to_netcdf(p) + return p + + a = mk([np.nan, 2.0], 0) + b = mk([4.0, 4.0], 1) + agg = compute.aggregate_spectra([a, b]) + np.testing.assert_allclose( + agg["power"].sel(variable="T_2M", leadtime=6).values, [4.0, 3.0] + ) diff --git a/tests/unit/test_spectra_io.py b/tests/unit/test_spectra_io.py new file mode 100644 index 00000000..a9359b68 --- /dev/null +++ b/tests/unit/test_spectra_io.py @@ -0,0 +1,87 @@ +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +from spectra import io + + +def _toy_dataset(): + # native grid of 6 cells, two steps + return xr.Dataset( + { + "T_2M": (("step", "values"), np.arange(12.0).reshape(2, 6)), + "U_10M": (("step", "values"), np.ones((2, 6))), + "V_10M": (("step", "values"), 2 * np.ones((2, 6))), + }, + coords={"step": [0, 6], "values": np.arange(6)}, + ) + + +def test_variable_components_known(): + assert io.VARIABLE_COMPONENTS["T_2M"] == (["T_2M"], 1.0) + assert io.VARIABLE_COMPONENTS["WIND_KE"] == (["U_10M", "V_10M"], 0.5) + assert io.VARIABLE_COMPONENTS["TOT_PREC"] == (["TOT_PREC"], 1.0) + + +def test_native_field_squeezes_singleton_reftime_dim(): + # Real GRIB loads carry a size-1 forecast_reference_time dim; native_field + # must squeeze it so the field reduces to the 1-D native grid axis. + ds = xr.Dataset( + { + "T_2M": ( + ("forecast_reference_time", "step", "values"), + np.arange(12.0).reshape(1, 2, 6), + ) + }, + coords={"step": [0, 6], "values": np.arange(6)}, + ) + arr = io.native_field(ds, "T_2M", step=6) + assert arr.shape == (6,) + np.testing.assert_array_equal(arr, np.arange(6, 12)) + + +def test_native_field_extracts_1d_for_step(): + ds = _toy_dataset() + arr = io.native_field(ds, "T_2M", step=6) + assert arr.shape == (6,) + np.testing.assert_array_equal(arr, np.arange(6, 12)) + + +def test_native_components_returns_list_and_factor(): + ds = _toy_dataset() + comps, factor = io.native_components(ds, "WIND_KE", step=0) + assert factor == 0.5 + assert len(comps) == 2 + assert comps[0].shape == (6,) + + +def test_unknown_variable_raises(): + with pytest.raises(KeyError, match="Unknown spectra variable"): + io.native_components(_toy_dataset(), "NOPE", step=0) + + +def test_native_field_handles_timedelta_step(): + ds = xr.Dataset( + {"T_2M": (("step", "values"), np.arange(12.0).reshape(2, 6))}, + coords={"step": pd.to_timedelta([0, 6], unit="h"), "values": np.arange(6)}, + ) + arr = io.native_field(ds, "T_2M", step=6) + np.testing.assert_array_equal(arr, np.arange(6, 12)) + + +def test_native_field_rejects_non_1d(): + ds = xr.Dataset( + {"T_2M": (("step", "y", "x"), np.zeros((1, 3, 4)))}, + coords={"step": [0], "y": np.arange(3), "x": np.arange(4)}, + ) + with pytest.raises(ValueError, match="1-D native field"): + io.native_field(ds, "T_2M", step=0) + + +def test_required_params_deduplicates(): + assert io.required_params(["WIND_KE", "T_2M", "WIND_KE"]) == [ + "U_10M", + "V_10M", + "T_2M", + ] diff --git a/tests/unit/test_spectra_regrid.py b/tests/unit/test_spectra_regrid.py new file mode 100644 index 00000000..bf439796 --- /dev/null +++ b/tests/unit/test_spectra_regrid.py @@ -0,0 +1,82 @@ +# tests/unit/test_spectra_regrid.py +import numpy as np +import pytest +import xarray as xr + +from spectra import regrid + + +def _fake_weights(tmp_path, ny=4, nx=5, n_source=20, npoints=3): + # Realistic iconremap file: 0-based source indices, used as-is. Index 0 is a + # valid source cell (here paired with a nonzero weight), not padding. + rng = np.arange(ny * nx * npoints) % n_source + idx = rng.reshape(ny * nx, npoints).astype("int64") # 0-based, 0..n_source-1 + wgt = np.full((ny * nx, npoints), 1.0 / npoints, dtype="float64") + ds = xr.Dataset( + { + "rbf_B_glbidx": (("cell", "stencil"), idx), + "rbf_B_wgt": (("cell", "stencil"), wgt), + }, + attrs={"nx": nx, "ny": ny, "dx": 0.01}, + ) + path = tmp_path / "weights.nc" + ds.to_netcdf(path) + return path, ny, nx, n_source + + +def test_build_truncation_matrix_applies_weights(tmp_path): + path, ny, nx, n_source = _fake_weights(tmp_path) + M, (rny, rnx) = regrid.build_truncation_matrix(path, n_source=n_source) + assert (rny, rnx) == (ny, nx) + native = np.ones(n_source) + out = regrid.regrid(native, M, ny, nx) + assert out.shape == (ny, nx) + np.testing.assert_allclose(out, 1.0, rtol=1e-6) # average of ones is one + + +def test_grid_geometry_reads_dx_in_km(tmp_path): + path, ny, nx, _ = _fake_weights(tmp_path) + g_ny, g_nx, dx_km = regrid.grid_geometry(path) + assert (g_ny, g_nx) == (ny, nx) + assert dx_km == pytest.approx(0.01 * regrid.core.KM_PER_DEG) + + +def test_weights_path_for_npoints_unknown_raises(): + with pytest.raises(KeyError, match="No iconremap weights"): + regrid.weights_path_for_npoints(999_999) + + +def test_weights_path_for_npoints_known(): + assert "ch1" in str(regrid.weights_path_for_npoints(1_147_980)).lower() + + +def _fake_weights_zero_based_no_zeros(tmp_path, ny=4, nx=5, n_source=20): + # 0-based file (as the real ICON-CH1 weights): no zero entries at all, and + # the smallest *referenced* cell is >= 1, yet max index == n_source - 1. A + # min-index heuristic wrongly calls this 1-based and shifts every reference + # by one -- the off-by-one that injected grid-scale noise into the spectra. + # Single-point stencils (weight 1) so the regridded value IS the referenced + # native cell, making any shift directly observable. + idx = ((np.arange(ny * nx) % (n_source - 5)) + 5).reshape(ny * nx, 1) + assert idx.min() >= 1 and idx.max() == n_source - 1 and (idx != 0).all() + wgt = np.ones((ny * nx, 1), dtype="float64") + ds = xr.Dataset( + { + "rbf_B_glbidx": (("cell", "stencil"), idx.astype("int64")), + "rbf_B_wgt": (("cell", "stencil"), wgt), + }, + attrs={"nx": nx, "ny": ny, "dx": 0.01}, + ) + path = tmp_path / "weights_zero_no_pad.nc" + ds.to_netcdf(path) + return path, ny, nx, n_source, idx.ravel() + + +def test_build_truncation_matrix_zero_based_no_zeros_not_shifted(tmp_path): + # Regression: a 0-based, zero-padding-free file must NOT be treated as 1-based. + path, ny, nx, n_source, ref_cells = _fake_weights_zero_based_no_zeros(tmp_path) + M, _ = regrid.build_truncation_matrix(path, n_source=n_source) + native = np.arange(n_source, dtype="float64") # value == cell index + out = regrid.regrid(native, M, ny, nx).ravel() + # No off-by-one: each target picks up its referenced native cell as-is. + np.testing.assert_array_equal(out, ref_cells) diff --git a/workflow/Snakefile b/workflow/Snakefile index 4a3ec692..9d446892 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -19,6 +19,7 @@ include: "rules/inference.smk" include: "rules/verification.smk" include: "rules/report.smk" include: "rules/plot.smk" +include: "rules/spectra.smk" # about workflow @@ -148,6 +149,11 @@ rule experiment_all: ) else [] ), + ( + expand(rules.spectra_plot.output, experiment=EXPERIMENT_NAME) + if config.get("experiment", {}).get("spectra", {}).get("enabled", False) + else [] + ), rule showcase_all: diff --git a/workflow/rules/spectra.smk b/workflow/rules/spectra.smk new file mode 100644 index 00000000..b7f152ca --- /dev/null +++ b/workflow/rules/spectra.smk @@ -0,0 +1,213 @@ +# ----------------------------------------------------- # +# POWER SPECTRA WORKFLOW # +# ----------------------------------------------------- # + + +include: "common.smk" + + +SPECTRA = config["experiment"].get("spectra", {}) +SPECTRA_LEAD_TIMES = ",".join(str(s) for s in SPECTRA.get("lead_times", [])) +SPECTRA_VARIABLES = ",".join(SPECTRA.get("variables", ["T_2M", "WIND_KE", "TOT_PREC"])) +SPECTRA_METHOD = SPECTRA.get("method", "dct") + + +def _spectra_reftimes(): + hours = SPECTRA.get("init_hours") + if hours is None: + return [t.strftime("%Y%m%d%H%M") for t in REFTIMES] + return [t.strftime("%Y%m%d%H%M") for t in REFTIMES if t.hour in hours] + + +def _spectra_truth_steps(): + candidates = list(collect_all_candidates()) + if candidates: + return RUN_CONFIGS[candidates[0]]["steps"] + # no candidate runs (baseline-only experiment): fall back to any baseline's steps + if BASELINE_CONFIGS: + return next(iter(BASELINE_CONFIGS.values()))["steps"] + return "0/120/6" + + +def spectra_participants(): + """participant key -> aggregated spectra path (runs + baselines).""" + out = {} + for base in BASELINE_CONFIGS: + out[base] = OUT_ROOT / f"data/baselines/{base}/spectra_aggregated.nc" + for run_id, cfg in RUN_CONFIGS.items(): + if cfg.get("_is_candidate", False): + out[run_id] = OUT_ROOT / f"data/runs/{run_id}/spectra_aggregated.nc" + return out + + +SPECTRA_PARTICIPANTS = spectra_participants() + + +rule spectra_compute: + input: + "src/spectra/__init__.py", + "src/data_input/__init__.py", + script="workflow/scripts/spectra_compute.py", + inference_okfile=rules.inference_execute.output.okfile, + eckit_grids=rules.data_download_eckit_geo_grids.output, + output: + OUT_ROOT / "data/runs/{run_id}/{init_time}/spectra.nc", + log: + OUT_ROOT / "logs/spectra_compute/{run_id}-{init_time}.log", + wildcard_constraints: + run_id=r"[^/]+/[^/]+", + init_time=r"\d+", + resources: + cpus_per_task=8, + mem_mb=50_000, + runtime="30m", + params: + grib=lambda wc: ( + Path(OUT_ROOT) / f"data/runs/{wc.run_id}/{wc.init_time}/grib" + ).resolve(), + steps=lambda wc: RUN_CONFIGS[wc.run_id]["steps"], + label=lambda wc: RUN_CONFIGS[wc.run_id].get("label", wc.run_id), + shell: + """ + export ECCODES_DEFINITION_PATH=$(realpath .venv/share/eccodes-cosmo-resources/definitions) + uv run {input.script} \ + --forecast {params.grib} --reftime {wildcards.init_time} \ + --steps "{params.steps}" --lead_times "{SPECTRA_LEAD_TIMES}" \ + --variables "{SPECTRA_VARIABLES}" --method "{SPECTRA_METHOD}" \ + --label "{params.label}" --output {output} >{log} 2>&1 + """ + + +rule spectra_compute_baseline: + input: + "src/spectra/__init__.py", + "src/data_input/__init__.py", + script="workflow/scripts/spectra_compute.py", + forecast=lambda wc: BASELINE_CONFIGS[wc.baseline_id]["root"], + eckit_grids=rules.data_download_eckit_geo_grids.output, + output: + OUT_ROOT / "data/baselines/{baseline_id}/{init_time}/spectra.nc", + log: + OUT_ROOT / "logs/spectra_compute_baseline/{baseline_id}-{init_time}.log", + resources: + cpus_per_task=8, + mem_mb=50_000, + runtime="30m", + params: + steps=lambda wc: BASELINE_CONFIGS[wc.baseline_id]["steps"], + label=lambda wc: BASELINE_CONFIGS[wc.baseline_id].get("label", wc.baseline_id), + shell: + """ + export ECCODES_DEFINITION_PATH=$(realpath .venv/share/eccodes-cosmo-resources/definitions) + uv run {input.script} \ + --forecast {input.forecast} --reftime {wildcards.init_time} \ + --steps "{params.steps}" --lead_times "{SPECTRA_LEAD_TIMES}" \ + --variables "{SPECTRA_VARIABLES}" --method "{SPECTRA_METHOD}" \ + --label "{params.label}" --output {output} >{log} 2>&1 + """ + + +rule spectra_compute_truth: + input: + "src/spectra/__init__.py", + "src/data_input/__init__.py", + script="workflow/scripts/spectra_compute.py", + truth=config["truth"]["root"], + eckit_grids=rules.data_download_eckit_geo_grids.output, + output: + OUT_ROOT / "data/truth/{init_time}/spectra.nc", + log: + OUT_ROOT / "logs/spectra_truth/{init_time}.log", + resources: + cpus_per_task=8, + mem_mb=50_000, + runtime="30m", + params: + steps=_spectra_truth_steps(), + label=config["truth"]["label"], + shell: + """ + export ECCODES_DEFINITION_PATH=$(realpath .venv/share/eccodes-cosmo-resources/definitions) + uv run {input.script} \ + --truth {input.truth} --reftime {wildcards.init_time} \ + --steps "{params.steps}" --lead_times "{SPECTRA_LEAD_TIMES}" \ + --variables "{SPECTRA_VARIABLES}" --method "{SPECTRA_METHOD}" \ + --label "{params.label}" --output {output} >{log} 2>&1 + """ + + +rule spectra_aggregate: + input: + script="workflow/scripts/spectra_aggregate.py", + spectra_nc=lambda wc: expand( + rules.spectra_compute.output, + init_time=_spectra_reftimes(), + allow_missing=True, + ), + output: + OUT_ROOT / "data/runs/{run_id}/spectra_aggregated.nc", + log: + OUT_ROOT / "logs/spectra_aggregate/{run_id}.log", + wildcard_constraints: + run_id=r"[^/]+/[^/]+", + localrule: True + resources: + cpus_per_task=2, + mem_mb=20_000, + runtime="20m", + shell: + "uv run {input.script} {input.spectra_nc} --output {output} >{log} 2>&1" + + +use rule spectra_aggregate as spectra_aggregate_baseline with: + input: + script="workflow/scripts/spectra_aggregate.py", + spectra_nc=lambda wc: expand( + rules.spectra_compute_baseline.output, + init_time=_spectra_reftimes(), + allow_missing=True, + ), + output: + OUT_ROOT / "data/baselines/{baseline_id}/spectra_aggregated.nc", + log: + OUT_ROOT / "logs/spectra_aggregate_baseline/{baseline_id}.log", + localrule: True + + +use rule spectra_aggregate as spectra_aggregate_truth with: + input: + script="workflow/scripts/spectra_aggregate.py", + spectra_nc=expand( + rules.spectra_compute_truth.output, + init_time=_spectra_reftimes(), + ), + output: + OUT_ROOT / "data/truth/spectra_aggregated.nc", + log: + OUT_ROOT / "logs/spectra_aggregate_truth/truth.log", + localrule: True + + +rule spectra_plot: + input: + script="workflow/scripts/spectra_plot.py", + truth=OUT_ROOT / "data/truth/spectra_aggregated.nc", + participants=list(SPECTRA_PARTICIPANTS.values()), + output: + report( + directory(OUT_ROOT / "results/{experiment}/spectra"), + patterns=["{name}.png"], + ), + log: + OUT_ROOT / "logs/spectra_plot/{experiment}.log", + resources: + cpus_per_task=2, + mem_mb=20_000, + runtime="20m", + shell: + """ + uv run {input.script} \ + --truth {input.truth} --participants {input.participants} \ + --variables "{SPECTRA_VARIABLES}" --lead_times "{SPECTRA_LEAD_TIMES}" \ + --output_dir {output} >{log} 2>&1 + """ diff --git a/workflow/scripts/spectra_aggregate.py b/workflow/scripts/spectra_aggregate.py new file mode 100644 index 00000000..5cf1a39b --- /dev/null +++ b/workflow/scripts/spectra_aggregate.py @@ -0,0 +1,28 @@ +"""Average per-init spectra over init times for one participant.""" + +import logging +from argparse import ArgumentParser +from pathlib import Path + +from spectra.compute import aggregate_spectra + +LOG = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def main(): + ap = ArgumentParser() + ap.add_argument("inputs", nargs="+", help="per-init spectra.nc files.") + ap.add_argument("--output", required=True) + args = ap.parse_args() + agg = aggregate_spectra(args.inputs) + out = Path(args.output) + out.parent.mkdir(parents=True, exist_ok=True) + agg.to_netcdf(out) + LOG.info("Saved aggregated spectra to %s", out) + + +if __name__ == "__main__": + main() diff --git a/workflow/scripts/spectra_compute.py b/workflow/scripts/spectra_compute.py new file mode 100644 index 00000000..327ab59f --- /dev/null +++ b/workflow/scripts/spectra_compute.py @@ -0,0 +1,79 @@ +"""Compute power spectra for one source (run, baseline, or truth) at one init.""" + +import logging +from argparse import ArgumentParser +from datetime import datetime +from pathlib import Path + +import pandas as pd + +from data_input import load_forecast_data, load_truth_data, parse_steps +from spectra import io +from spectra.compute import compute_source_spectra + +LOG = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + + +def main(): + ap = ArgumentParser() + ap.add_argument("--forecast", help="Forecast root (grib dir or baseline archive).") + ap.add_argument("--truth", help="Truth zarr root (use instead of --forecast).") + ap.add_argument("--reftime", required=True) + ap.add_argument("--steps", required=True, help="start/stop/step in hours.") + ap.add_argument("--lead_times", required=True, help="comma-separated hours.") + ap.add_argument("--variables", required=True, help="comma-separated names.") + ap.add_argument("--method", default="dct") + ap.add_argument("--label", required=True) + ap.add_argument("--output", required=True) + args = ap.parse_args() + + if bool(args.forecast) == bool(args.truth): + ap.error("Exactly one of --forecast / --truth must be supplied.") + + reftime = datetime.strptime(args.reftime, "%Y%m%d%H%M") + all_steps = parse_steps(args.steps) + lead_times = [int(s) for s in args.lead_times.split(",")] + missing = [s for s in lead_times if s not in all_steps] + if missing: + LOG.warning("Requested lead times %s not in source steps; dropping.", missing) + lead_times = [s for s in lead_times if s in all_steps] + if not lead_times: + LOG.error( + "None of the requested lead times are available in source steps %s; aborting.", + all_steps, + ) + raise SystemExit(1) + variables = args.variables.split(",") + params = io.required_params(variables) + + if args.truth: + ds = load_truth_data(Path(args.truth), reftime, all_steps, params) + if "time" in ds.dims and "step" not in ds.dims: + valid = pd.to_datetime(ds["time"].values) + steps_h = ( + ((valid - pd.Timestamp(reftime)) / pd.Timedelta(hours=1)) + .round() + .astype(int) + ) + if pd.Series(steps_h).duplicated().any(): + raise ValueError( + f"Truth timestamps rounded to integer hours produced duplicate " + f"steps: {steps_h.tolist()}. Sub-hourly truth is unsupported for spectra." + ) + ds = ds.assign_coords(step=("time", steps_h)).swap_dims({"time": "step"}) + else: + ds = load_forecast_data(Path(args.forecast), reftime, all_steps, params) + + result = compute_source_spectra(ds, variables, lead_times, args.method, args.label) + out = Path(args.output) + out.parent.mkdir(parents=True, exist_ok=True) + result.to_netcdf(out) + LOG.info("Saved spectra to %s", out) + + +if __name__ == "__main__": + main() diff --git a/workflow/scripts/spectra_plot.py b/workflow/scripts/spectra_plot.py new file mode 100644 index 00000000..b81cc1a8 --- /dev/null +++ b/workflow/scripts/spectra_plot.py @@ -0,0 +1,32 @@ +"""Overlay truth + all participants into one figure per (variable, lead time).""" + +import logging +from argparse import ArgumentParser + +from spectra.compute import plot_experiment_spectra + +LOG = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def main(): + ap = ArgumentParser() + ap.add_argument("--truth", required=True) + ap.add_argument("--participants", nargs="+", required=True) + ap.add_argument("--variables", required=True) + ap.add_argument("--lead_times", required=True) + ap.add_argument("--output_dir", required=True) + args = ap.parse_args() + plot_experiment_spectra( + args.truth, + args.participants, + args.output_dir, + variables=args.variables.split(","), + lead_times=[int(s) for s in args.lead_times.split(",")], + ) + + +if __name__ == "__main__": + main() diff --git a/workflow/tools/config.schema.json b/workflow/tools/config.schema.json index 5bc95b48..de1de588 100644 --- a/workflow/tools/config.schema.json +++ b/workflow/tools/config.schema.json @@ -269,6 +269,10 @@ ], "default": null, "description": "Scorecard generation configuration. Omit or set enabled: false to disable." + }, + "spectra": { + "$ref": "#/$defs/SpectraConfig", + "description": "Power-spectra QC configuration. Disabled by default." } }, "required": [ @@ -734,6 +738,67 @@ "title": "ShowcaseConfig", "type": "object" }, + "SpectraConfig": { + "additionalProperties": false, + "description": "Configuration for power-spectra QC plots in the experiment pipeline.", + "properties": { + "enabled": { + "default": false, + "description": "Whether to compute and plot power spectra.", + "title": "Enabled", + "type": "boolean" + }, + "method": { + "default": "dct", + "description": "Spectral method: 'dct' (default, recommended for LAM) or 'fft'.", + "enum": [ + "dct", + "fft" + ], + "title": "Method", + "type": "string" + }, + "lead_times": { + "description": "Representative lead times (hours) at which to compute spectra.", + "items": { + "type": "integer" + }, + "title": "Lead Times", + "type": "array" + }, + "variables": { + "default": [ + "T_2M", + "WIND_KE", + "TOT_PREC" + ], + "description": "Spectra variables. Supported: T_2M, WIND_KE (from U/V_10M), TOT_PREC.", + "items": { + "type": "string" + }, + "title": "Variables", + "type": "array" + }, + "init_hours": { + "anyOf": [ + { + "items": { + "type": "integer" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Optional subset of init hours to average over. None = all.", + "title": "Init Hours" + } + }, + "title": "SpectraConfig", + "type": "object" + }, "Stratification": { "description": "Stratification settings for the analysis.", "properties": {