Module mod17.calibration

Calibration of MOD17 against a representative, global eddy covariance (EC) flux tower network. The model calibration is based on Markov-Chain Monte Carlo (MCMC). Example use:

python calibration.py tune-gpp --pft=1

The general calibration protocol used here involves:

  1. Check how well the chain(s) are mixing by running short: python calibration.py tune-gpp 1 --draws=2000
  2. If any chain is "sticky," run a short chain while tuning the jump scale: python calibration.py tune-gpp 1 --tune=scaling --draws=2000
  3. Using the trace plot from Step (2) as a reference, experiment with different jump scales to try and achieve the same (optimal) mixing when tuning on lambda (default) instead, e.g.: python calibration.py tune-gpp 1 --scaling=1e-2 --draws=2000
  4. When the right jump scale is found, run a chain at the desired length.

Once a good mixture is obtained, it is necessary to prune the samples to eliminate autocorrelation, e.g., in Python:

sampler = MOD17StochasticSampler(...)
sampler.plot_autocorr(burn = 1000, thin = 10)
trace = sampler.get_trace(burn = 1000, thin = 10)

A thinned posterior can be exported from the command line:

$ python calibration.py export-bplut output.csv --burn=1000 --thin=10

References

Madani, N., Kimball, J. S., & Running, S. W. (2017). "Improving global gross primary productivity estimates by computing optimum light use efficiencies using flux tower data." Journal of Geophysical Research: Biogeosciences, 122(11), 2939–2951.

Expand source code
'''
Calibration of MOD17 against a representative, global eddy covariance (EC)
flux tower network. The model calibration is based on Markov-Chain Monte
Carlo (MCMC). Example use:

    python calibration.py tune-gpp --pft=1

The general calibration protocol used here involves:

1. Check how well the chain(s) are mixing by running short:
`python calibration.py tune-gpp 1 --draws=2000`
2. If any chain is "sticky," run a short chain while tuning the jump scale:
`python calibration.py tune-gpp 1 --tune=scaling --draws=2000`
3. Using the trace plot from Step (2) as a reference, experiment with
different jump scales to try and achieve the same (optimal) mixing when
tuning on `lambda` (default) instead, e.g.:
`python calibration.py tune-gpp 1 --scaling=1e-2 --draws=2000`
4. When the right jump scale is found, run a chain at the desired length.

Once a good mixture is obtained, it is necessary to prune the samples to
eliminate autocorrelation, e.g., in Python:

    sampler = MOD17StochasticSampler(...)
    sampler.plot_autocorr(burn = 1000, thin = 10)
    trace = sampler.get_trace(burn = 1000, thin = 10)

A thinned posterior can be exported from the command line:

```py
$ python calibration.py export-bplut output.csv --burn=1000 --thin=10
```

References:

    Madani, N., Kimball, J. S., & Running, S. W. (2017).
      "Improving global gross primary productivity estimates by computing
      optimum light use efficiencies using flux tower data."
      Journal of Geophysical Research: Biogeosciences, 122(11), 2939–2951.
'''

import datetime
import json
import os
import warnings
import numpy as np
import h5py
import arviz as az
import pymc as pm
import aesara.tensor as at
import mod17
from functools import partial
from multiprocessing import get_context, set_start_method
from numbers import Number
from typing import Callable, Sequence
from pathlib import Path
from matplotlib import pyplot
from scipy import signal
from mod17 import MOD17, PFT_VALID
from mod17.utils import pft_dominant, restore_bplut, write_bplut, rmsd

MOD17_DIR = os.path.dirname(mod17.__file__)
# This matplotlib setting prevents labels from overplotting
pyplot.rcParams['figure.constrained_layout.use'] = True


class BlackBoxLikelihood(at.Op):
    '''
    A custom Theano operator that calculates the "likelihood" of model
    parameters; it takes a vector of values (the parameters that define our
    model) and returns a single "scalar" value (the log-likelihood).

    Parameters
    ----------
    model : Callable
        An arbitrary "black box" function that takes two arguments: the
        model parameters ("params") and the forcing data ("x")
    observed : numpy.ndarray
        The "observed" data that our log-likelihood function takes in
    x : numpy.ndarray or None
        The forcing data (input drivers) that our model requires, or None
        if no driver data are required
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    objective : str
        Name of the objective (or "loss") function to use, one of
        ('rmsd', 'gaussian', 'kge'); defaults to "rmsd"
    '''
    itypes = [at.dvector] # Expects a vector of parameter values when called
    otypes = [at.dscalar] # Outputs a single scalar value (the log likelihood)

    def __init__(
            self, model: Callable, observed: Sequence, x: Sequence = None,
            weights: Sequence = None, objective: str = 'rmsd'):
        '''
        Initialise the Op with various things that our log-likelihood function
        requires. The observed data ("observed") and drivers ("x") must be
        stored on the instance so the Theano Op can work seamlessly.
        '''
        self.model = model
        self.observed = observed
        self.x = x
        self.weights = weights
        if objective in ('rmsd', 'rmse'):
            self._loglik = self.loglik
        elif objective == 'gaussian':
            self._loglik = self.loglik_gaussian
        elif objective == 'kge':
            self._loglik = self.loglik_kge
        else:
            raise ValueError('Unknown "objective" function specified')

    def loglik(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Pseudo-log likelihood, based on the root-mean squared deviation
        (RMSD). The sign of the RMSD is forced to be negative so as to allow
        for maximization of this objective function.

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The (negative) root-mean squared deviation (RMSD) between the
            predicted and observed values
        '''
        predicted = self.model(params, *x)
        if self.weights is not None:
            return -np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        return -np.sqrt(np.nanmean(((predicted - observed)) ** 2))

    def loglik_gaussian(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Gaussian log-likelihood, assuming independent, identically distributed
        observations.

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The (negative) log-likelihood
        '''
        predicted = self.model(params, *x)
        sigma = params[-1]
        # Gaussian log-likelihood;
        # -\frac{N}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2)
        #   - \frac{1}{2\hat{\sigma}^2} \sum (\hat{y} - y)^2
        return -0.5 * np.log(2 * np.pi * sigma**2) - (0.5 / sigma**2) *\
            np.nansum((predicted - observed)**2)

    def loglik_kge(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        r'''
        Kling-Gupta efficiency.

        $$
        KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2}
        $$

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The Kling-Gupta efficiency
        '''
        predicted = self.model(params, *x)
        r = np.corrcoef(predicted, observed)[0, 1]
        alpha = np.std(predicted) / np.std(observed)
        beta = np.sum(predicted) / np.sum(observed)
        return 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)

    def perform(self, node, inputs, outputs):
        '''
        The method that is used when calling the Op.

        Parameters
        ----------
        node
        inputs : Sequence
        outputs : Sequence
        '''
        (params,) = inputs
        logl = self._loglik(params, self.observed, self.x)
        outputs[0][0] = np.array(logl) # Output the log-likelihood


class AbstractSampler(object):
    '''
    Generic algorithm for fitting a model to data based on observed values
    similar to what we can produce with our model. Not intended to be called
    directly.
    '''

    def get_posterior(self, thin: int = 1) -> np.ndarray:
        '''
        Returns a stacked posterior array, with optional thinning, combining
        all the chains together.

        Parameters
        ----------
        thin : int

        Returns
        -------
        numpy.ndarray
        '''
        trace = az.from_netcdf(self.backend)
        return np.stack([ # i.e., get every ith element, each chain
            trace['posterior'][p].values[:,::thin].ravel()
            for p in self.required_parameters[self.name]
        ], axis = -1)

    def get_trace(
            self, thin: int = None, burn: int = None
        ) -> az.data.inference_data.InferenceData:
        '''
        Extracts the trace from the backend data store.

        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        trace = az.from_netcdf(self.backend)
        if thin is None and burn is None:
            return trace
        return trace.sel(draw = slice(burn, None, thin))

    def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs):
        '''
        Auto-correlation plot for an MCMC sample.

        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        **kwargs
            Additional keyword arguments to `arviz.plot_autocorr()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        kwargs.setdefault('combined', True)
        if thin is None:
            az.plot_autocorr(trace, **kwargs)
        else:
            burn = 0 if burn is None else burn
            az.plot_autocorr(
                trace.sel(draw = slice(burn, None, thin))['posterior'],
                **kwargs)
        pyplot.show()

    def plot_forest(self, **kwargs):
        '''
        Forest plot for an MCMC sample.

        In particular:

        - `hdi_prob`: A float indicating the highest density interval (HDF) to
            plot
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_forest(trace, **kwargs)
        pyplot.show()

    def plot_pair(self, **kwargs):
        '''
        Paired variables plot for an MCMC sample.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_pair()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_pair(trace, **kwargs)
        pyplot.show()

    def plot_partial_score(
            self, observed: Sequence, drivers: Sequence, fit: dict = None):
        '''
        Plots the "partial effect" of a single parameter: the score of the
        model at that parameter's current value against a sweep of possible
        parameter values. All other parameters are held fixed at the best-fit
        values.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        fit : dict or None
            The best-fit parameter values used for those parameters that are
            fixed
        '''
        trace = az.from_netcdf(self.backend)
        if fit is None:
            # Mean of posterior are "best fit" values
            fit = trace['posterior'].mean()
        fit_params = list(filter(
            lambda p: p in fit, self.required_parameters[self.name]))
        # The NPP model depends on constants not included in the fit
        constants = []
        if self.name == 'NPP':
            constants = [
                self.params[p]
                for p in ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1']
            ]
        n = len(fit_params)
        nrow = n
        ncol = 1
        if n > 4:
            nrow = 2
            ncol = n - (n // 2)
        fig, axes = pyplot.subplots(
            nrow, ncol, figsize = (n * 2, n), sharey = True)
        i = 0
        for j in range(nrow):
            for k in range(ncol):
                if i >= n:
                    break
                free_param = fit_params[i]
                fixed = np.stack([
                    fit[p].values for p in fit_params
                ])[np.newaxis,:].repeat(30, axis = 0)
                sweep = np.linspace(
                    trace['posterior'][free_param].min(),
                    trace['posterior'][free_param].max(), num = 30)
                fixed[:,i] = sweep
                # Need to concatenate GPP parameters at begining of fixed
                scores = -np.array(self.score_posterior(
                    observed, drivers, [
                        [*constants, *f] for f in fixed.tolist()
                    ]))
                axes[j,k].plot(sweep, scores, 'k-')
                axes[j,k].axvline(
                    fit[free_param], color = 'red', linestyle = 'dashed',
                    label = 'Posterior Mean')
                axes[j,k].set_xlabel(free_param)
                axes[j,k].set_title(free_param)
                axes[j,k].legend()
                i += 1
        # Delete the last empty subplot
        if n % 2 != 0:
            fig.delaxes(axes.flatten()[-1])
        axes[0, 0].set_ylabel('Score')
        pyplot.tight_layout()
        pyplot.show()

    def plot_posterior(self, **kwargs):
        '''
        Plots the posterior distribution for an MCMC sample.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_posterior()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_posterior(trace, **kwargs)
        pyplot.show()

    def score_posterior(
            self, observed: Sequence, drivers: Sequence, posterior: Sequence,
            method: str = 'rmsd') -> Number:
        '''
        Returns a goodness-of-fit score based on the existing calibration.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        posterior : list or tuple
            Sequence of posterior parameter sets (i.e., nested sequence); each
            nested sequence will be scored
        method : str
            The method for generating a goodness-of-git score
            (Default: "rmsd")

        Returns
        -------
        float
        '''
        if method != 'rmsd':
            raise NotImplementedError('"method" must be one of: "rmsd"')
        score_func = partial(
            rmsd, func = self.model, observed = observed, drivers = drivers)
        with get_context('spawn').Pool() as pool:
            scores = pool.map(score_func, posterior)
        return scores


class StochasticSampler(AbstractSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for an arbitrary model. The
    specific sampler used is the Differential Evolution (DE) MCMC algorithm
    described by Ter Braak (2008), though the implementation is specific to
    the PyMC3 library.

    NOTE: The `model` (a function) should be named "_name" where "name" is
    some uniquely identifiable model name. This helps `StochasticSampler.run()`
    to find the correct compiler for the model. The compiler function should
    be named `compiled_name_model()` (where "name" is the model name) and be
    defined on a subclass of `StochasticSampler`.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    def __init__(
            self, config: dict, model: Callable, params_dict: dict = None,
            backend: str = None, weights: Sequence = None,
            model_name: str = None):
        self.backend = backend
        # Convert the BOUNDS into nested dicts for easy use
        self.bounds = dict([
            (key, dict([('lower', b[0]), ('upper', b[1])]))
            for key, b in config['optimization']['bounds'].items()
        ])
        self.config = config
        self.model = model
        if hasattr(model, '__name__'):
            self.name = model.__name__.strip('_').upper() # "_gpp" = "GPP"
        else:
            self.name = model_name
        self.params = params_dict
        # Set the model's prior distribution assumptions
        self.prior = dict()
        for key in self.required_parameters[self.name]:
            # NOTE: This is the default only for LUE_max; other parameters,
            #   with Uniform distributions, don't use anything here
            self.prior.setdefault(key, {
                'mu': params_dict[key],
                'sigma': 2e-4
            })
        self.weights = weights
        assert os.path.exists(os.path.dirname(backend))

    def run(
            self, observed: Sequence, drivers: Sequence,
            draws = 1000, chains = 3, tune = 'lambda',
            scaling: float = 1e-3, prior: dict = dict(),
            check_shape: bool = False, save_fig: bool = False,
            var_names: Sequence = None) -> None:
        '''
        Fits the model using DE-MCMCz approach. `tune="lambda"` (default) is
        recommended; lambda is related to the scale of the jumps learned from
        other chains, but epsilon ("scaling") controls the scale directly.
        Using a larger value for `scaling` (Default: 1e-3) will produce larger
        jumps and may directly address "sticky" chains.

        Parameters
        ----------
        observed : Sequence
            The observed data the model will be calibrated against
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        draws : int
            Number of samples to draw (on each chain); defaults to 1000
        chains : int
            Number of chains; defaults to 3
        tune : str or None
            Which hyperparameter to tune: Defaults to 'lambda', but can also
            be 'scaling' or None.
        scaling : float
            Initial scale factor for epsilon (Default: 1e-3)
        prior : dict
        check_shape : bool
            True to require that input driver datasets have the same shape as
            the observed values (Default: False)
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        var_names : Sequence
            One or more variable names to show in the plot
        '''
        assert not check_shape or drivers[0].shape == observed.shape,\
            'Driver data should have the same shape as the "observed" data'
        assert len(drivers) == len(self.required_drivers[self.name]),\
            'Did not receive expected number of driver datasets!'
        assert tune in ('lambda', 'scaling') or tune is None
        # Update prior assumptions
        self.prior.update(prior)
        # Generate an initial goodness-of-fit score
        predicted = self.model([
            self.params[p] for p in self.required_parameters[self.name]
        ], *drivers)
        if self.weights is not None:
            score = np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        else:
            score = np.sqrt(np.nanmean(((predicted - observed)) ** 2))
        print('-- RMSD at the initial point: %.3f' % score)
        print('Compiling model...')
        try:
            compiler = getattr(self, 'compile_%s_model' % self.name.lower())
        except AttributeError:
            raise AttributeError('''Could not find a compiler for model named
            "%s"; make sure that a function "compile_%s_model()" is defined on
             this class''' % (model_name, model_name))
        with compiler(observed, drivers) as model:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                step_func = pm.DEMetropolisZ(tune = tune, scaling = scaling)
                trace = pm.sample(
                    draws = draws, step = step_func, cores = chains,
                    chains = chains, idata_kwargs = {'log_likelihood': True})
            if self.backend is not None:
                print('Writing results to file...')
                trace.to_netcdf(self.backend)
            if var_names is None:
                az.plot_trace(trace, var_names = ['~log_likelihood'])
            else:
                az.plot_trace(trace, var_names = var_names)
            if save_fig:
                pyplot.savefig('.'.join(self.backend.split('.')[:-1]) + '.png')
            else:
                pyplot.show()


class MOD17StochasticSampler(StochasticSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for MOD17. The specific sampler
    used is the Differential Evolution (DE) MCMC algorithm described by
    Ter Braak (2008), though the implementation is specific to the PyMC3
    library.

    Considerations:

    1. Tower GPP is censored when values are < 0 or when APAR is
        < 0.1 MJ m-2 d-1.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    # NOTE: This is different than for mod17.MOD17 because we haven't yet
    #   figured out how the respiration terms are calculated
    required_parameters = {
        'GPP': ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1'],
        'NPP': MOD17.required_parameters
    }
    required_drivers = {
        'GPP': ['fPAR', 'Tmin', 'VPD', 'PAR'],
        'NPP': ['fPAR', 'Tmin', 'VPD', 'PAR', 'LAI', 'Tmean', 'years']
    }

    def compile_gpp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new GPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # (Stochstic) Priors for unknown model parameters
            LUE_max = pm.TruncatedNormal('LUE_max',
                **self.prior['LUE_max'], **self.bounds['LUE_max'])
            # NOTE: tmin0, vpd0 are fixed at Collection 6.1 values
            tmin0 = self.params['tmin0']
            tmin1 = pm.Uniform('tmin1', **self.bounds['tmin1'])
            # NOTE: Upper bound on `vpd1` is set by the maximum observed VPD
            vpd0 = self.params['vpd0']
            vpd1 = pm.Uniform('vpd1',
                lower = self.bounds['vpd1']['lower'],
                upper = drivers[2].max().round(0))
            # Convert model parameters to a tensor vector
            params_list = [LUE_max, tmin0, tmin1, vpd0, vpd1]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model

    def compile_npp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new NPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # Setting GPP parameters that are known--EXCEPT tmin1
            LUE_max = self.params['LUE_max']
            tmin0   = self.params['tmin0']
            tmin1   = self.params['tmin1']
            vpd0    = self.params['vpd0']
            vpd1    = self.params['vpd1']
            # SLA fixed at prior mean
            SLA = np.exp(self.prior['SLA']['mu'])
            # Allometry ratios prescribe narrow range around Collection 6.1 values
            froot_leaf_ratio = pm.Triangular(
                'froot_leaf_ratio', **self.prior['froot_leaf_ratio'])
            # (Stochstic) Priors for unknown model parameters
            Q10_froot = pm.TruncatedNormal(
                'Q10_froot', **self.prior['Q10_froot'], **self.bounds['Q10'])
            leaf_mr_base = pm.LogNormal(
                'leaf_mr_base', **self.prior['leaf_mr_base'])
            froot_mr_base = pm.LogNormal(
                'froot_mr_base', **self.prior['froot_mr_base'])
            # For GRS and CRO, livewood mass and respiration are zero
            if np.equal(list(self.prior['livewood_mr_base'].values()), [0, 0]).all():
                livewood_leaf_ratio = 0
                livewood_mr_base = 0
                Q10_livewood = 0
            else:
                livewood_leaf_ratio = pm.Triangular(
                    'livewood_leaf_ratio', **self.prior['livewood_leaf_ratio'])
                livewood_mr_base = pm.LogNormal(
                    'livewood_mr_base', **self.prior['livewood_mr_base'])
                Q10_livewood = pm.TruncatedNormal(
                    'Q10_livewood', **self.prior['Q10_livewood'],
                    **self.bounds['Q10'])
            # Convert model parameters to a tensor vector
            params_list = [
                LUE_max, tmin0, tmin1, vpd0, vpd1, SLA,
                Q10_livewood, Q10_froot, froot_leaf_ratio, livewood_leaf_ratio,
                leaf_mr_base, froot_mr_base, livewood_mr_base
            ]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model


class CalibrationAPI(object):
    '''
    Convenience class for calibrating the MOD17 GPP and NPP models. Meant to
    be used with `fire.Fire()`.
    '''
    def __init__(self, config = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                MOD17_DIR, 'data/MOD17_calibration_config.json')
        with open(config_file, 'r') as file:
            self.config = json.load(file)
        self.hdf5 = self.config['data']['file']

    def _clean(self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP'):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 g C m-2 day-1
            apar = drivers['fPAR'] * drivers['PAR']
            return np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))

    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data

    def clean_observed(
            self, raw: Sequence, drivers: Sequence, driver_labels: Sequence,
            protocol: str = 'GPP', filter_length: int = 2) -> Sequence:
        '''
        Cleans observed tower flux data according to a prescribed protocol.

        - For GPP data: Removes observations where GPP < 0 or where APAR is
            < 0.1 MJ m-2 day-1

        Parameters
        ----------
        raw : Sequence
        drivers : Sequence
        driver_labels : Sequence
        protocol : str
        filter_length : int
            The window size for the smoothing filter, applied to the observed
            data

        Returns
        -------
        Sequence
        '''
        if protocol != 'GPP':
            raise NotImplementedError('"protocol" must be one of: "GPP"')
        # Read in the observed data and apply smoothing filter
        obs = self._filter(raw, filter_length)
        obs = self._clean(obs, dict([
            (driver_labels[i], data)
            for i, data in enumerate(drivers)
        ]), protocol = 'GPP')
        return obs

    def export_bplut(
            self, model: str, output_path: str, thin: int = 10,
            burn: int = 1000):
        '''
        Export the BPLUT using the posterior mean from the MCMC sampler. NOTE:
        The posterior mean is usually not the best estimate for poorly
        identified parameters.

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        output_path : str
            The output CSV file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(
                draw = slice(burn, None, thin))['posterior'].mean()
            for each in MOD17.required_parameters:
                try:
                    bplut[each][pft] = float(fit[each])
                except KeyError:
                    continue
        write_bplut(bplut, output_path)

    def export_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, k_folds: int = 1):
        '''
        Exports posterior distribution for a parameter, for each PFT to HDF5.

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        k_folds : int
            The number of k-folds used in cross-calibration/validation;
            if more than one (default), the folds for each PFT will be
            combined into a single HDF5 file
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        for pft in PFT_VALID:
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            post_by_fold = []
            for fold in range(1, k_folds + 1):
                if k_folds > 1:
                    backend = self.config['optimization']['backend_template'] %\
                        (f'{model}-k{fold}', pft)
                sampler = MOD17StochasticSampler(
                    self.config, getattr(MOD17, '_%s' % model.lower()), params,
                    backend = backend)
                trace = sampler.get_trace()
                fit = trace.sel(draw = slice(burn, None, thin))['posterior']
                num_samples = fit.sizes['chain'] * fit.sizes['draw']
                if param in fit:
                    post_by_fold.append(
                        az.extract_dataset(fit, combined = True)[param].values)
                else:
                    # In case there is, e.g., a parameter that takes on a
                    #   constant value for a specific PFT
                    if k_folds > 1:
                        post_by_fold.append(np.ones((1, num_samples)) * np.nan)
                    else:
                        post_by_fold.append(np.ones(num_samples) * np.nan)
            if k_folds > 1:
                post.append(np.vstack(post_by_fold))
            else:
                post.extend(post_by_fold)
        # If not every PFT's posterior has the same number of samples (e.g.,
        #   when one set of chains was run longer than another)...
        if not all([p.shape == post[0].shape for p in post]):
            max_len = max([p.shape for p in post])[0]
            # ...Reshape all posteriors to match the greatest sample size
            import ipdb
            ipdb.set_trace()#FIXME
            post = [
                np.pad(
                    p.astype(np.float32), (0, max_len - p.size),
                    mode = 'constant', constant_values = (np.nan,))
                for p in post
            ]
        with h5py.File(output_path, 'a') as hdf:
            post = np.stack(post)
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = 'CalibrationAPI.export_posterior() on {ts}'

    def export_likely_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, ptile: int = 95):
        '''
        Exports posterior distribution for a parameter based on likelihood

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        ptile : int
            The percentile cutoff for likelihood; only samples at or above
            this likelihood cutoff will be included
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        likelihood = []
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] % (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(draw = slice(burn, None, thin))
            # Find the likelihood value associated with the cutoff percentile
            ll = az.extract_dataset(
                fit, combined = True)['log_likelihood'].values
            values = az.extract_dataset(fit, combined = True)[param].values
            cutoff = np.percentile(ll, ptile)
            post.append(values[ll >= cutoff])
            likelihood.append(ll[ll >= cutoff])
        with h5py.File(output_path, 'a') as hdf:
            n = max([len(p) for p in post])
            # Make sure all arrays are the same size
            post = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in post
            ])
            likelihood = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in likelihood
            ])
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = 'CalibrationAPI.export_likely_posterior() on {ts}'
            dataset = hdf.create_dataset(
                f'{param}_likelihood', likelihood.shape, np.float32, likelihood)

    def tune_gpp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, **kwargs):
        '''
        Run the MOD17 GPP calibration.

        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to plot the trace for a previous calibration run; this will
            also NOT start a new calibration (Default: False)
        ipdb : bool
            True to drop the user into an ipdb prompt, prior to and instead of
            running calibration
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys() and not key in kwargs.keys():
                kwargs[key] = self.config['optimization'][key]
        # Filter the parameters to just those for the PFT of interest
        params_dict = restore_bplut(self.config['BPLUT']['GPP'])
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        objective = self.config['optimization']['objective'].lower()

        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            sites = hdf['FLUXNET/site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = pft_dominant(hdf['state/PFT'][:], site_list = sites)
            # Blacklist various sites
            pft_mask = np.logical_and(pft_map == pft, ~np.in1d(sites, blacklist))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf['MERRA2/T10M_daytime'][:][:,pft_mask] - 273.15
            qv10m = hdf['MERRA2/QV10M_daytime'][:][:,pft_mask]
            ps = hdf['MERRA2/PS_daytime'][:][:,pft_mask]
            drivers = [ # fPAR, Tmin, VPD, PAR
                hdf['MODIS/MOD15A2HGF_fPAR_interp'][:][:,pft_mask],
                hdf['MERRA2/Tmin'][:][:,pft_mask] - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf['MERRA2/SWGDN'][:][:,pft_mask]),
            ]
            # Convert fPAR from (%) to [0,1]
            drivers[0] = np.nanmean(drivers[0], axis = -1) / 100
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if objective in ('rmsd', 'rmse'):
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(tday.shape[0], axis = 0)
            # Check that driver data do not contain NaNs
            for d, each in enumerate(drivers):
                name = ('fPAR', 'Tmin', 'VPD', 'PAR')[d]
                assert not np.isnan(each).any(),\
                    f'Driver dataset "{name}" contains NaNs'
            tower_gpp = hdf['FLUXNET/GPP'][:][:,pft_mask]
            # Read the validation mask; mask out observations that are
            #   reserved for validation
            print('Masking out validation data...')
            mask = hdf['FLUXNET/validation_mask'][pft]
            tower_gpp[mask] = np.nan

        # Clean observations, then mask out driver data where the are no
        #   observations
        tower_gpp = self.clean_observed(
            tower_gpp, drivers, MOD17StochasticSampler.required_drivers['GPP'],
            protocol = 'GPP')
        if weights is not None:
            weights = weights[~np.isnan(tower_gpp)]
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][~np.isnan(tower_gpp)]
        tower_gpp = tower_gpp[~np.isnan(tower_gpp)]

        print('Initializing sampler...')
        backend = self.config['optimization']['backend_template'] % ('GPP', pft)
        sampler = MOD17StochasticSampler(
            self.config, MOD17._gpp, params_dict, backend = backend,
            weights = weights)
        if plot_trace or ipdb:
            if ipdb:
                import ipdb
                ipdb.set_trace()
            trace = sampler.get_trace()
            az.plot_trace(trace, var_names = MOD17.required_parameters[0:5])
            pyplot.show()
            return
        # Get (informative) priors for just those parameters that have them
        with open(self.config['optimization']['prior'], 'r') as file:
            prior = json.load(file)
        prior_params = filter(
            lambda p: p in prior.keys(), sampler.required_parameters['GPP'])
        prior = dict([
            (p, {'mu': prior[p]['mu'][pft], 'sigma': prior[p]['sigma'][pft]})
            for p in prior_params
        ])
        sampler.run(
            tower_gpp, drivers, prior = prior, save_fig = save_fig, **kwargs)

    def tune_npp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, climatology = False,
            cutoff: Number = 2385, k_folds: int = 1, **kwargs):
        r'''
        Run the MOD17 NPP calibration. If k-folds cross-validation is used,
        the model is calibrated on $k$ random subsets of the data and a
        series of file is created, e.g., as:

            MOD17_NPP_calibration_PFT1.h5
            MOD17_NPP-k1_calibration_PFT1.nc4
            MOD17_NPP-k2_calibration_PFT1.nc4
            ...

        Where each `.nc4` file is a standard `arviz` backend and the `.h5`
        indicates which indices from the NPP observations vector, after
        removing NaNs, were excluded (i.e., the indices of the test data).

        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to display the trace plot ONLY and not run calibration
            (Default: False)
        ipdb : bool
            True to drop into an interactive Python debugger (`ipdb`) after
            loading an existing trace (Default: False)
        save_fig : bool
            True to save the post-calibration trace plot to a file instead of
            displaying it (Default: False)
        climatology : bool
            True to use a MERRA-2 climatology (and look for it in the drivers
            file), i.e., use `MERRA2_climatology` group instead of
            `surface_met_MERRA2` group (Default: False)
        cutoff : Number
            Maximum value of observed NPP (g C m-2 year-1); values above this
            cutoff will be discarded and not used in calibration
            (Default: 2385)
        k_folds : int
            Number of folds to use in k-folds cross-validation; defaults to
            k=1, i.e., no cross-validation is performed.
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        prefix = 'MERRA2_climatology' if climatology else 'surface_met_MERRA2'
        params_dict = restore_bplut(self.config['BPLUT']['NPP'])
        # Filter the parameters to just those for the PFT of interest
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        kwargs.update({'var_names': [
            '~LUE_max', '~tmin0', '~tmin1', '~vpd0', '~vpd1', '~log_likelihood'
        ]})
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys():
                kwargs[key] = self.config['optimization'][key]
        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            # NOTE: This is only recorded at the site-level; no need to
            #   determine modal PFT across subgrid
            pft_map = hdf['NPP/PFT'][:]
            # Leave out sites where there is no fPAR (and no LAI) data
            fpar = hdf['NPP/MOD15A2H_fPAR_clim_filt'][:]
            mask = np.logical_and(
                    pft_map == pft, ~np.isnan(np.nanmean(fpar, axis = -1))\
                .all(axis = 0))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf[f'NPP/{prefix}/T10M_daytime'][:][:,mask] - 273.15
            qv10m = hdf[f'NPP/{prefix}/QV10M_daytime'][:][:,mask]
            ps = hdf[f'NPP/{prefix}/PS_daytime'][:][:,mask]
            drivers = [ # fPAR, Tmin, VPD, PAR, LAI, Tmean, years
                hdf['NPP/MOD15A2H_fPAR_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/Tmin'][:][:,mask]  - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf[f'NPP/{prefix}/SWGDN'][:][:,mask]),
                hdf['NPP/MOD15A2H_LAI_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/T10M'][:][:,mask] - 273.15,
                np.full(ps.shape, 1) # i.e., A 365-day climatological year ("Year 1")
            ]
            observed_npp = hdf['NPP/NPP_total'][:][mask]
        if cutoff is not None:
            observed_npp[observed_npp > cutoff] = np.nan
        # Set negative VPD to zero
        drivers[2] = np.where(drivers[2] < 0, 0, drivers[2])
        # Convert fPAR from (%) to [0,1] and re-scale LAI; reshape fPAR, LAI
        drivers[0] = np.nanmean(drivers[0], axis = -1) * 0.01
        drivers[4] = np.nanmean(drivers[4], axis = -1) * 0.1
        # TODO Mask out driver data where the are no observations
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][:,~np.isnan(observed_npp)]
        observed_npp = observed_npp[~np.isnan(observed_npp)]

        if k_folds > 1:
            # Back-up the original (complete) datasets
            _drivers = [d.copy() for d in drivers]
            _observed_npp = observed_npp.copy()
            # Randomize the indices of the NPP data
            indices = np.arange(0, observed_npp.size)
            np.random.shuffle(indices)
            # Get the starting and ending index of each fold
            fold_idx = np.array([indices.size // k_folds] * k_folds) * np.arange(0, k_folds)
            fold_idx = list(map(list, zip(fold_idx, fold_idx + indices.size // k_folds)))
            # Ensure that the entire dataset is used
            fold_idx[-1][-1] = indices.max()
            idx_test = [indices[start:end] for start, end in fold_idx]

        # Loop over each fold (or the entire dataset, if num. folds == 1)
        for k, fold in enumerate(range(1, k_folds + 1)):
            backend = self.config['optimization']['backend_template'] % ('NPP', pft)
            if k_folds > 1 and fold == 1:
                # Create an HDF5 file with the same name as the (original)
                #   netCDF4 back-end, store the test indices
                with h5py.File(backend.replace('nc4', 'h5'), 'w') as hdf:
                    out = list(idx_test)
                    size = indices.size // k_folds
                    try:
                        out = np.stack(out)
                    except ValueError:
                        size = max((o.size for o in out))
                        for i in range(0, len(out)):
                            out[i] = np.concatenate((out[i], [np.nan] * (size - out[i].size)))
                    hdf.create_dataset(
                        'test_indices', (k_folds, size), np.int32, np.stack(out))
                # Restore the original NPP dataset
                observed_npp = _observed_npp.copy()
                # Set to NaN all the test indices
                idx = idx_test[k]
                observed_npp[idx] = np.nan
                # Same for drivers, after restoring from the original
                drivers = [d.copy()[:,~np.isnan(observed_npp)] for d in _drivers]
                observed_npp = observed_npp[~np.isnan(observed_npp)]
            # Use a different naming scheme for the backend
            if k_folds > 1:
                backend = self.config['optimization']['backend_template'] % (f'NPP-k{fold}', pft)

            print('Initializing sampler...')
            sampler = MOD17StochasticSampler(
                self.config, MOD17._npp, params_dict, backend = backend,
                model_name = 'NPP')
            if plot_trace or ipdb:
                if ipdb:
                    import ipdb
                    ipdb.set_trace()
                trace = sampler.get_trace()
                az.plot_trace(
                    trace, var_names = MOD17.required_parameters[5:])
                pyplot.show()
                return
            # Get (informative) priors for just those parameters that have them
            with open(self.config['optimization']['prior'], 'r') as file:
                prior = json.load(file)
            prior_params = filter(
                lambda p: p in prior.keys(), sampler.required_parameters['NPP'])
            prior = dict([
                (p, prior[p]) for p in prior_params
            ])
            for key in prior.keys():
                # And make sure to subset to the chosen PFT!
                for arg in prior[key].keys():
                    prior[key][arg] = prior[key][arg][pft]
            sampler.run(
                observed_npp, drivers, prior = prior, save_fig = save_fig,
                **kwargs)


if __name__ == '__main__':
    import fire
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        fire.Fire(CalibrationAPI)

Classes

class AbstractSampler

Generic algorithm for fitting a model to data based on observed values similar to what we can produce with our model. Not intended to be called directly.

Expand source code
class AbstractSampler(object):
    '''
    Generic algorithm for fitting a model to data based on observed values
    similar to what we can produce with our model. Not intended to be called
    directly.
    '''

    def get_posterior(self, thin: int = 1) -> np.ndarray:
        '''
        Returns a stacked posterior array, with optional thinning, combining
        all the chains together.

        Parameters
        ----------
        thin : int

        Returns
        -------
        numpy.ndarray
        '''
        trace = az.from_netcdf(self.backend)
        return np.stack([ # i.e., get every ith element, each chain
            trace['posterior'][p].values[:,::thin].ravel()
            for p in self.required_parameters[self.name]
        ], axis = -1)

    def get_trace(
            self, thin: int = None, burn: int = None
        ) -> az.data.inference_data.InferenceData:
        '''
        Extracts the trace from the backend data store.

        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        trace = az.from_netcdf(self.backend)
        if thin is None and burn is None:
            return trace
        return trace.sel(draw = slice(burn, None, thin))

    def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs):
        '''
        Auto-correlation plot for an MCMC sample.

        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        **kwargs
            Additional keyword arguments to `arviz.plot_autocorr()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        kwargs.setdefault('combined', True)
        if thin is None:
            az.plot_autocorr(trace, **kwargs)
        else:
            burn = 0 if burn is None else burn
            az.plot_autocorr(
                trace.sel(draw = slice(burn, None, thin))['posterior'],
                **kwargs)
        pyplot.show()

    def plot_forest(self, **kwargs):
        '''
        Forest plot for an MCMC sample.

        In particular:

        - `hdi_prob`: A float indicating the highest density interval (HDF) to
            plot
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_forest(trace, **kwargs)
        pyplot.show()

    def plot_pair(self, **kwargs):
        '''
        Paired variables plot for an MCMC sample.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_pair()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_pair(trace, **kwargs)
        pyplot.show()

    def plot_partial_score(
            self, observed: Sequence, drivers: Sequence, fit: dict = None):
        '''
        Plots the "partial effect" of a single parameter: the score of the
        model at that parameter's current value against a sweep of possible
        parameter values. All other parameters are held fixed at the best-fit
        values.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        fit : dict or None
            The best-fit parameter values used for those parameters that are
            fixed
        '''
        trace = az.from_netcdf(self.backend)
        if fit is None:
            # Mean of posterior are "best fit" values
            fit = trace['posterior'].mean()
        fit_params = list(filter(
            lambda p: p in fit, self.required_parameters[self.name]))
        # The NPP model depends on constants not included in the fit
        constants = []
        if self.name == 'NPP':
            constants = [
                self.params[p]
                for p in ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1']
            ]
        n = len(fit_params)
        nrow = n
        ncol = 1
        if n > 4:
            nrow = 2
            ncol = n - (n // 2)
        fig, axes = pyplot.subplots(
            nrow, ncol, figsize = (n * 2, n), sharey = True)
        i = 0
        for j in range(nrow):
            for k in range(ncol):
                if i >= n:
                    break
                free_param = fit_params[i]
                fixed = np.stack([
                    fit[p].values for p in fit_params
                ])[np.newaxis,:].repeat(30, axis = 0)
                sweep = np.linspace(
                    trace['posterior'][free_param].min(),
                    trace['posterior'][free_param].max(), num = 30)
                fixed[:,i] = sweep
                # Need to concatenate GPP parameters at begining of fixed
                scores = -np.array(self.score_posterior(
                    observed, drivers, [
                        [*constants, *f] for f in fixed.tolist()
                    ]))
                axes[j,k].plot(sweep, scores, 'k-')
                axes[j,k].axvline(
                    fit[free_param], color = 'red', linestyle = 'dashed',
                    label = 'Posterior Mean')
                axes[j,k].set_xlabel(free_param)
                axes[j,k].set_title(free_param)
                axes[j,k].legend()
                i += 1
        # Delete the last empty subplot
        if n % 2 != 0:
            fig.delaxes(axes.flatten()[-1])
        axes[0, 0].set_ylabel('Score')
        pyplot.tight_layout()
        pyplot.show()

    def plot_posterior(self, **kwargs):
        '''
        Plots the posterior distribution for an MCMC sample.

        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_posterior()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_posterior(trace, **kwargs)
        pyplot.show()

    def score_posterior(
            self, observed: Sequence, drivers: Sequence, posterior: Sequence,
            method: str = 'rmsd') -> Number:
        '''
        Returns a goodness-of-fit score based on the existing calibration.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        posterior : list or tuple
            Sequence of posterior parameter sets (i.e., nested sequence); each
            nested sequence will be scored
        method : str
            The method for generating a goodness-of-git score
            (Default: "rmsd")

        Returns
        -------
        float
        '''
        if method != 'rmsd':
            raise NotImplementedError('"method" must be one of: "rmsd"')
        score_func = partial(
            rmsd, func = self.model, observed = observed, drivers = drivers)
        with get_context('spawn').Pool() as pool:
            scores = pool.map(score_func, posterior)
        return scores

Subclasses

Methods

def get_posterior(self, thin: int = 1) ‑> numpy.ndarray

Returns a stacked posterior array, with optional thinning, combining all the chains together.

Parameters

thin : int
 

Returns

numpy.ndarray
 
Expand source code
def get_posterior(self, thin: int = 1) -> np.ndarray:
    '''
    Returns a stacked posterior array, with optional thinning, combining
    all the chains together.

    Parameters
    ----------
    thin : int

    Returns
    -------
    numpy.ndarray
    '''
    trace = az.from_netcdf(self.backend)
    return np.stack([ # i.e., get every ith element, each chain
        trace['posterior'][p].values[:,::thin].ravel()
        for p in self.required_parameters[self.name]
    ], axis = -1)
def get_trace(self, thin: int = None, burn: int = None) ‑> arviz.data.inference_data.InferenceData

Extracts the trace from the backend data store.

Parameters

thin : int
Thinning rate
burn : int
The burn-in (i.e., first N samples to discard)
Expand source code
def get_trace(
        self, thin: int = None, burn: int = None
    ) -> az.data.inference_data.InferenceData:
    '''
    Extracts the trace from the backend data store.

    Parameters
    ----------
    thin : int
        Thinning rate
    burn : int
        The burn-in (i.e., first N samples to discard)
    '''
    trace = az.from_netcdf(self.backend)
    if thin is None and burn is None:
        return trace
    return trace.sel(draw = slice(burn, None, thin))
def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs)

Auto-correlation plot for an MCMC sample.

Parameters

thin : int
Thinning rate
burn : int
The burn-in (i.e., first N samples to discard)
**kwargs
Additional keyword arguments to arviz.plot_autocorr().
Expand source code
def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs):
    '''
    Auto-correlation plot for an MCMC sample.

    Parameters
    ----------
    thin : int
        Thinning rate
    burn : int
        The burn-in (i.e., first N samples to discard)
    **kwargs
        Additional keyword arguments to `arviz.plot_autocorr()`.
    '''
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    kwargs.setdefault('combined', True)
    if thin is None:
        az.plot_autocorr(trace, **kwargs)
    else:
        burn = 0 if burn is None else burn
        az.plot_autocorr(
            trace.sel(draw = slice(burn, None, thin))['posterior'],
            **kwargs)
    pyplot.show()
def plot_forest(self, **kwargs)

Forest plot for an MCMC sample.

In particular:

  • hdi_prob: A float indicating the highest density interval (HDF) to plot
Expand source code
def plot_forest(self, **kwargs):
    '''
    Forest plot for an MCMC sample.

    In particular:

    - `hdi_prob`: A float indicating the highest density interval (HDF) to
        plot
    '''
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_forest(trace, **kwargs)
    pyplot.show()
def plot_pair(self, **kwargs)

Paired variables plot for an MCMC sample.

Parameters

**kwargs
Additional keyword arguments to arviz.plot_pair().
Expand source code
def plot_pair(self, **kwargs):
    '''
    Paired variables plot for an MCMC sample.

    Parameters
    ----------
    **kwargs
        Additional keyword arguments to `arviz.plot_pair()`.
    '''
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_pair(trace, **kwargs)
    pyplot.show()
def plot_partial_score(self, observed: Sequence[+T_co], drivers: Sequence[+T_co], fit: dict = None)

Plots the "partial effect" of a single parameter: the score of the model at that parameter's current value against a sweep of possible parameter values. All other parameters are held fixed at the best-fit values.

Parameters

observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
drivers : list or tuple
Sequence of driver datasets to be supplied, in order, to the model's run function
fit : dict or None
The best-fit parameter values used for those parameters that are fixed
Expand source code
def plot_partial_score(
        self, observed: Sequence, drivers: Sequence, fit: dict = None):
    '''
    Plots the "partial effect" of a single parameter: the score of the
    model at that parameter's current value against a sweep of possible
    parameter values. All other parameters are held fixed at the best-fit
    values.

    Parameters
    ----------
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    drivers : list or tuple
        Sequence of driver datasets to be supplied, in order, to the
        model's run function
    fit : dict or None
        The best-fit parameter values used for those parameters that are
        fixed
    '''
    trace = az.from_netcdf(self.backend)
    if fit is None:
        # Mean of posterior are "best fit" values
        fit = trace['posterior'].mean()
    fit_params = list(filter(
        lambda p: p in fit, self.required_parameters[self.name]))
    # The NPP model depends on constants not included in the fit
    constants = []
    if self.name == 'NPP':
        constants = [
            self.params[p]
            for p in ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1']
        ]
    n = len(fit_params)
    nrow = n
    ncol = 1
    if n > 4:
        nrow = 2
        ncol = n - (n // 2)
    fig, axes = pyplot.subplots(
        nrow, ncol, figsize = (n * 2, n), sharey = True)
    i = 0
    for j in range(nrow):
        for k in range(ncol):
            if i >= n:
                break
            free_param = fit_params[i]
            fixed = np.stack([
                fit[p].values for p in fit_params
            ])[np.newaxis,:].repeat(30, axis = 0)
            sweep = np.linspace(
                trace['posterior'][free_param].min(),
                trace['posterior'][free_param].max(), num = 30)
            fixed[:,i] = sweep
            # Need to concatenate GPP parameters at begining of fixed
            scores = -np.array(self.score_posterior(
                observed, drivers, [
                    [*constants, *f] for f in fixed.tolist()
                ]))
            axes[j,k].plot(sweep, scores, 'k-')
            axes[j,k].axvline(
                fit[free_param], color = 'red', linestyle = 'dashed',
                label = 'Posterior Mean')
            axes[j,k].set_xlabel(free_param)
            axes[j,k].set_title(free_param)
            axes[j,k].legend()
            i += 1
    # Delete the last empty subplot
    if n % 2 != 0:
        fig.delaxes(axes.flatten()[-1])
    axes[0, 0].set_ylabel('Score')
    pyplot.tight_layout()
    pyplot.show()
def plot_posterior(self, **kwargs)

Plots the posterior distribution for an MCMC sample.

Parameters

**kwargs
Additional keyword arguments to arviz.plot_posterior().
Expand source code
def plot_posterior(self, **kwargs):
    '''
    Plots the posterior distribution for an MCMC sample.

    Parameters
    ----------
    **kwargs
        Additional keyword arguments to `arviz.plot_posterior()`.
    '''
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_posterior(trace, **kwargs)
    pyplot.show()
def score_posterior(self, observed: Sequence[+T_co], drivers: Sequence[+T_co], posterior: Sequence[+T_co], method: str = 'rmsd') ‑> numbers.Number

Returns a goodness-of-fit score based on the existing calibration.

Parameters

observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
drivers : list or tuple
Sequence of driver datasets to be supplied, in order, to the model's run function
posterior : list or tuple
Sequence of posterior parameter sets (i.e., nested sequence); each nested sequence will be scored
method : str
The method for generating a goodness-of-git score (Default: "rmsd")

Returns

float
 
Expand source code
def score_posterior(
        self, observed: Sequence, drivers: Sequence, posterior: Sequence,
        method: str = 'rmsd') -> Number:
    '''
    Returns a goodness-of-fit score based on the existing calibration.

    Parameters
    ----------
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    drivers : list or tuple
        Sequence of driver datasets to be supplied, in order, to the
        model's run function
    posterior : list or tuple
        Sequence of posterior parameter sets (i.e., nested sequence); each
        nested sequence will be scored
    method : str
        The method for generating a goodness-of-git score
        (Default: "rmsd")

    Returns
    -------
    float
    '''
    if method != 'rmsd':
        raise NotImplementedError('"method" must be one of: "rmsd"')
    score_func = partial(
        rmsd, func = self.model, observed = observed, drivers = drivers)
    with get_context('spawn').Pool() as pool:
        scores = pool.map(score_func, posterior)
    return scores
class BlackBoxLikelihood (model: Callable, observed: Sequence[+T_co], x: Sequence[+T_co] = None, weights: Sequence[+T_co] = None, objective: str = 'rmsd')

A custom Theano operator that calculates the "likelihood" of model parameters; it takes a vector of values (the parameters that define our model) and returns a single "scalar" value (the log-likelihood).

Parameters

model : Callable
An arbitrary "black box" function that takes two arguments: the model parameters ("params") and the forcing data ("x")
observed : numpy.ndarray
The "observed" data that our log-likelihood function takes in
x : numpy.ndarray or None
The forcing data (input drivers) that our model requires, or None if no driver data are required
weights : Sequence or None
Optional sequence of weights applied to the model residuals (as in weighted least squares)
objective : str
Name of the objective (or "loss") function to use, one of ('rmsd', 'gaussian', 'kge'); defaults to "rmsd"

Initialise the Op with various things that our log-likelihood function requires. The observed data ("observed") and drivers ("x") must be stored on the instance so the Theano Op can work seamlessly.

Expand source code
class BlackBoxLikelihood(at.Op):
    '''
    A custom Theano operator that calculates the "likelihood" of model
    parameters; it takes a vector of values (the parameters that define our
    model) and returns a single "scalar" value (the log-likelihood).

    Parameters
    ----------
    model : Callable
        An arbitrary "black box" function that takes two arguments: the
        model parameters ("params") and the forcing data ("x")
    observed : numpy.ndarray
        The "observed" data that our log-likelihood function takes in
    x : numpy.ndarray or None
        The forcing data (input drivers) that our model requires, or None
        if no driver data are required
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    objective : str
        Name of the objective (or "loss") function to use, one of
        ('rmsd', 'gaussian', 'kge'); defaults to "rmsd"
    '''
    itypes = [at.dvector] # Expects a vector of parameter values when called
    otypes = [at.dscalar] # Outputs a single scalar value (the log likelihood)

    def __init__(
            self, model: Callable, observed: Sequence, x: Sequence = None,
            weights: Sequence = None, objective: str = 'rmsd'):
        '''
        Initialise the Op with various things that our log-likelihood function
        requires. The observed data ("observed") and drivers ("x") must be
        stored on the instance so the Theano Op can work seamlessly.
        '''
        self.model = model
        self.observed = observed
        self.x = x
        self.weights = weights
        if objective in ('rmsd', 'rmse'):
            self._loglik = self.loglik
        elif objective == 'gaussian':
            self._loglik = self.loglik_gaussian
        elif objective == 'kge':
            self._loglik = self.loglik_kge
        else:
            raise ValueError('Unknown "objective" function specified')

    def loglik(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Pseudo-log likelihood, based on the root-mean squared deviation
        (RMSD). The sign of the RMSD is forced to be negative so as to allow
        for maximization of this objective function.

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The (negative) root-mean squared deviation (RMSD) between the
            predicted and observed values
        '''
        predicted = self.model(params, *x)
        if self.weights is not None:
            return -np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        return -np.sqrt(np.nanmean(((predicted - observed)) ** 2))

    def loglik_gaussian(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Gaussian log-likelihood, assuming independent, identically distributed
        observations.

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The (negative) log-likelihood
        '''
        predicted = self.model(params, *x)
        sigma = params[-1]
        # Gaussian log-likelihood;
        # -\frac{N}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2)
        #   - \frac{1}{2\hat{\sigma}^2} \sum (\hat{y} - y)^2
        return -0.5 * np.log(2 * np.pi * sigma**2) - (0.5 / sigma**2) *\
            np.nansum((predicted - observed)**2)

    def loglik_kge(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        r'''
        Kling-Gupta efficiency.

        $$
        KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2}
        $$

        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data

        Returns
        -------
        Number
            The Kling-Gupta efficiency
        '''
        predicted = self.model(params, *x)
        r = np.corrcoef(predicted, observed)[0, 1]
        alpha = np.std(predicted) / np.std(observed)
        beta = np.sum(predicted) / np.sum(observed)
        return 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)

    def perform(self, node, inputs, outputs):
        '''
        The method that is used when calling the Op.

        Parameters
        ----------
        node
        inputs : Sequence
        outputs : Sequence
        '''
        (params,) = inputs
        logl = self._loglik(params, self.observed, self.x)
        outputs[0][0] = np.array(logl) # Output the log-likelihood

Ancestors

  • aesara.graph.op.Op
  • aesara.graph.utils.MetaObject

Class variables

var default_output : Optional[int]
var destroy_map : Dict[int, List[int]]
var itypes : Optional[Sequence[Type]]
var otypes : Optional[Sequence[Type]]
var params_type : Optional[aesara.link.c.params_type.ParamsType]
var view_map : Dict[int, List[int]]

Methods

def loglik(self, params: Sequence[+T_co], observed: Sequence[+T_co], x: Sequence[+T_co] = None) ‑> numbers.Number

Pseudo-log likelihood, based on the root-mean squared deviation (RMSD). The sign of the RMSD is forced to be negative so as to allow for maximization of this objective function.

Parameters

params : Sequence
One or more model parameters
observed : Sequence
The observed values
x : Sequence or None
Input driver data

Returns

Number
The (negative) root-mean squared deviation (RMSD) between the predicted and observed values
Expand source code
def loglik(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    '''
    Pseudo-log likelihood, based on the root-mean squared deviation
    (RMSD). The sign of the RMSD is forced to be negative so as to allow
    for maximization of this objective function.

    Parameters
    ----------
    params : Sequence
        One or more model parameters
    observed : Sequence
        The observed values
    x : Sequence or None
        Input driver data

    Returns
    -------
    Number
        The (negative) root-mean squared deviation (RMSD) between the
        predicted and observed values
    '''
    predicted = self.model(params, *x)
    if self.weights is not None:
        return -np.sqrt(
            np.nanmean(((predicted - observed) * self.weights) ** 2))
    return -np.sqrt(np.nanmean(((predicted - observed)) ** 2))
def loglik_gaussian(self, params: Sequence[+T_co], observed: Sequence[+T_co], x: Sequence[+T_co] = None) ‑> numbers.Number

Gaussian log-likelihood, assuming independent, identically distributed observations.

Parameters

params : Sequence
One or more model parameters
observed : Sequence
The observed values
x : Sequence or None
Input driver data

Returns

Number
The (negative) log-likelihood
Expand source code
def loglik_gaussian(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    '''
    Gaussian log-likelihood, assuming independent, identically distributed
    observations.

    Parameters
    ----------
    params : Sequence
        One or more model parameters
    observed : Sequence
        The observed values
    x : Sequence or None
        Input driver data

    Returns
    -------
    Number
        The (negative) log-likelihood
    '''
    predicted = self.model(params, *x)
    sigma = params[-1]
    # Gaussian log-likelihood;
    # -\frac{N}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2)
    #   - \frac{1}{2\hat{\sigma}^2} \sum (\hat{y} - y)^2
    return -0.5 * np.log(2 * np.pi * sigma**2) - (0.5 / sigma**2) *\
        np.nansum((predicted - observed)**2)
def loglik_kge(self, params: Sequence[+T_co], observed: Sequence[+T_co], x: Sequence[+T_co] = None) ‑> numbers.Number

Kling-Gupta efficiency.

KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2}

Parameters

params : Sequence
One or more model parameters
observed : Sequence
The observed values
x : Sequence or None
Input driver data

Returns

Number
The Kling-Gupta efficiency
Expand source code
def loglik_kge(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    r'''
    Kling-Gupta efficiency.

    $$
    KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2}
    $$

    Parameters
    ----------
    params : Sequence
        One or more model parameters
    observed : Sequence
        The observed values
    x : Sequence or None
        Input driver data

    Returns
    -------
    Number
        The Kling-Gupta efficiency
    '''
    predicted = self.model(params, *x)
    r = np.corrcoef(predicted, observed)[0, 1]
    alpha = np.std(predicted) / np.std(observed)
    beta = np.sum(predicted) / np.sum(observed)
    return 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)
def perform(self, node, inputs, outputs)

The method that is used when calling the Op.

Parameters

node
 
inputs : Sequence
 
outputs : Sequence
 
Expand source code
def perform(self, node, inputs, outputs):
    '''
    The method that is used when calling the Op.

    Parameters
    ----------
    node
    inputs : Sequence
    outputs : Sequence
    '''
    (params,) = inputs
    logl = self._loglik(params, self.observed, self.x)
    outputs[0][0] = np.array(logl) # Output the log-likelihood
class CalibrationAPI (config=None)

Convenience class for calibrating the MOD17 GPP and NPP models. Meant to be used with fire.Fire().

Expand source code
class CalibrationAPI(object):
    '''
    Convenience class for calibrating the MOD17 GPP and NPP models. Meant to
    be used with `fire.Fire()`.
    '''
    def __init__(self, config = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                MOD17_DIR, 'data/MOD17_calibration_config.json')
        with open(config_file, 'r') as file:
            self.config = json.load(file)
        self.hdf5 = self.config['data']['file']

    def _clean(self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP'):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 g C m-2 day-1
            apar = drivers['fPAR'] * drivers['PAR']
            return np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))

    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data

    def clean_observed(
            self, raw: Sequence, drivers: Sequence, driver_labels: Sequence,
            protocol: str = 'GPP', filter_length: int = 2) -> Sequence:
        '''
        Cleans observed tower flux data according to a prescribed protocol.

        - For GPP data: Removes observations where GPP < 0 or where APAR is
            < 0.1 MJ m-2 day-1

        Parameters
        ----------
        raw : Sequence
        drivers : Sequence
        driver_labels : Sequence
        protocol : str
        filter_length : int
            The window size for the smoothing filter, applied to the observed
            data

        Returns
        -------
        Sequence
        '''
        if protocol != 'GPP':
            raise NotImplementedError('"protocol" must be one of: "GPP"')
        # Read in the observed data and apply smoothing filter
        obs = self._filter(raw, filter_length)
        obs = self._clean(obs, dict([
            (driver_labels[i], data)
            for i, data in enumerate(drivers)
        ]), protocol = 'GPP')
        return obs

    def export_bplut(
            self, model: str, output_path: str, thin: int = 10,
            burn: int = 1000):
        '''
        Export the BPLUT using the posterior mean from the MCMC sampler. NOTE:
        The posterior mean is usually not the best estimate for poorly
        identified parameters.

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        output_path : str
            The output CSV file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(
                draw = slice(burn, None, thin))['posterior'].mean()
            for each in MOD17.required_parameters:
                try:
                    bplut[each][pft] = float(fit[each])
                except KeyError:
                    continue
        write_bplut(bplut, output_path)

    def export_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, k_folds: int = 1):
        '''
        Exports posterior distribution for a parameter, for each PFT to HDF5.

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        k_folds : int
            The number of k-folds used in cross-calibration/validation;
            if more than one (default), the folds for each PFT will be
            combined into a single HDF5 file
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        for pft in PFT_VALID:
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            post_by_fold = []
            for fold in range(1, k_folds + 1):
                if k_folds > 1:
                    backend = self.config['optimization']['backend_template'] %\
                        (f'{model}-k{fold}', pft)
                sampler = MOD17StochasticSampler(
                    self.config, getattr(MOD17, '_%s' % model.lower()), params,
                    backend = backend)
                trace = sampler.get_trace()
                fit = trace.sel(draw = slice(burn, None, thin))['posterior']
                num_samples = fit.sizes['chain'] * fit.sizes['draw']
                if param in fit:
                    post_by_fold.append(
                        az.extract_dataset(fit, combined = True)[param].values)
                else:
                    # In case there is, e.g., a parameter that takes on a
                    #   constant value for a specific PFT
                    if k_folds > 1:
                        post_by_fold.append(np.ones((1, num_samples)) * np.nan)
                    else:
                        post_by_fold.append(np.ones(num_samples) * np.nan)
            if k_folds > 1:
                post.append(np.vstack(post_by_fold))
            else:
                post.extend(post_by_fold)
        # If not every PFT's posterior has the same number of samples (e.g.,
        #   when one set of chains was run longer than another)...
        if not all([p.shape == post[0].shape for p in post]):
            max_len = max([p.shape for p in post])[0]
            # ...Reshape all posteriors to match the greatest sample size
            import ipdb
            ipdb.set_trace()#FIXME
            post = [
                np.pad(
                    p.astype(np.float32), (0, max_len - p.size),
                    mode = 'constant', constant_values = (np.nan,))
                for p in post
            ]
        with h5py.File(output_path, 'a') as hdf:
            post = np.stack(post)
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = 'CalibrationAPI.export_posterior() on {ts}'

    def export_likely_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, ptile: int = 95):
        '''
        Exports posterior distribution for a parameter based on likelihood

        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        ptile : int
            The percentile cutoff for likelihood; only samples at or above
            this likelihood cutoff will be included
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        likelihood = []
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] % (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(draw = slice(burn, None, thin))
            # Find the likelihood value associated with the cutoff percentile
            ll = az.extract_dataset(
                fit, combined = True)['log_likelihood'].values
            values = az.extract_dataset(fit, combined = True)[param].values
            cutoff = np.percentile(ll, ptile)
            post.append(values[ll >= cutoff])
            likelihood.append(ll[ll >= cutoff])
        with h5py.File(output_path, 'a') as hdf:
            n = max([len(p) for p in post])
            # Make sure all arrays are the same size
            post = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in post
            ])
            likelihood = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in likelihood
            ])
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = 'CalibrationAPI.export_likely_posterior() on {ts}'
            dataset = hdf.create_dataset(
                f'{param}_likelihood', likelihood.shape, np.float32, likelihood)

    def tune_gpp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, **kwargs):
        '''
        Run the MOD17 GPP calibration.

        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to plot the trace for a previous calibration run; this will
            also NOT start a new calibration (Default: False)
        ipdb : bool
            True to drop the user into an ipdb prompt, prior to and instead of
            running calibration
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys() and not key in kwargs.keys():
                kwargs[key] = self.config['optimization'][key]
        # Filter the parameters to just those for the PFT of interest
        params_dict = restore_bplut(self.config['BPLUT']['GPP'])
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        objective = self.config['optimization']['objective'].lower()

        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            sites = hdf['FLUXNET/site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = pft_dominant(hdf['state/PFT'][:], site_list = sites)
            # Blacklist various sites
            pft_mask = np.logical_and(pft_map == pft, ~np.in1d(sites, blacklist))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf['MERRA2/T10M_daytime'][:][:,pft_mask] - 273.15
            qv10m = hdf['MERRA2/QV10M_daytime'][:][:,pft_mask]
            ps = hdf['MERRA2/PS_daytime'][:][:,pft_mask]
            drivers = [ # fPAR, Tmin, VPD, PAR
                hdf['MODIS/MOD15A2HGF_fPAR_interp'][:][:,pft_mask],
                hdf['MERRA2/Tmin'][:][:,pft_mask] - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf['MERRA2/SWGDN'][:][:,pft_mask]),
            ]
            # Convert fPAR from (%) to [0,1]
            drivers[0] = np.nanmean(drivers[0], axis = -1) / 100
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if objective in ('rmsd', 'rmse'):
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(tday.shape[0], axis = 0)
            # Check that driver data do not contain NaNs
            for d, each in enumerate(drivers):
                name = ('fPAR', 'Tmin', 'VPD', 'PAR')[d]
                assert not np.isnan(each).any(),\
                    f'Driver dataset "{name}" contains NaNs'
            tower_gpp = hdf['FLUXNET/GPP'][:][:,pft_mask]
            # Read the validation mask; mask out observations that are
            #   reserved for validation
            print('Masking out validation data...')
            mask = hdf['FLUXNET/validation_mask'][pft]
            tower_gpp[mask] = np.nan

        # Clean observations, then mask out driver data where the are no
        #   observations
        tower_gpp = self.clean_observed(
            tower_gpp, drivers, MOD17StochasticSampler.required_drivers['GPP'],
            protocol = 'GPP')
        if weights is not None:
            weights = weights[~np.isnan(tower_gpp)]
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][~np.isnan(tower_gpp)]
        tower_gpp = tower_gpp[~np.isnan(tower_gpp)]

        print('Initializing sampler...')
        backend = self.config['optimization']['backend_template'] % ('GPP', pft)
        sampler = MOD17StochasticSampler(
            self.config, MOD17._gpp, params_dict, backend = backend,
            weights = weights)
        if plot_trace or ipdb:
            if ipdb:
                import ipdb
                ipdb.set_trace()
            trace = sampler.get_trace()
            az.plot_trace(trace, var_names = MOD17.required_parameters[0:5])
            pyplot.show()
            return
        # Get (informative) priors for just those parameters that have them
        with open(self.config['optimization']['prior'], 'r') as file:
            prior = json.load(file)
        prior_params = filter(
            lambda p: p in prior.keys(), sampler.required_parameters['GPP'])
        prior = dict([
            (p, {'mu': prior[p]['mu'][pft], 'sigma': prior[p]['sigma'][pft]})
            for p in prior_params
        ])
        sampler.run(
            tower_gpp, drivers, prior = prior, save_fig = save_fig, **kwargs)

    def tune_npp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, climatology = False,
            cutoff: Number = 2385, k_folds: int = 1, **kwargs):
        r'''
        Run the MOD17 NPP calibration. If k-folds cross-validation is used,
        the model is calibrated on $k$ random subsets of the data and a
        series of file is created, e.g., as:

            MOD17_NPP_calibration_PFT1.h5
            MOD17_NPP-k1_calibration_PFT1.nc4
            MOD17_NPP-k2_calibration_PFT1.nc4
            ...

        Where each `.nc4` file is a standard `arviz` backend and the `.h5`
        indicates which indices from the NPP observations vector, after
        removing NaNs, were excluded (i.e., the indices of the test data).

        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to display the trace plot ONLY and not run calibration
            (Default: False)
        ipdb : bool
            True to drop into an interactive Python debugger (`ipdb`) after
            loading an existing trace (Default: False)
        save_fig : bool
            True to save the post-calibration trace plot to a file instead of
            displaying it (Default: False)
        climatology : bool
            True to use a MERRA-2 climatology (and look for it in the drivers
            file), i.e., use `MERRA2_climatology` group instead of
            `surface_met_MERRA2` group (Default: False)
        cutoff : Number
            Maximum value of observed NPP (g C m-2 year-1); values above this
            cutoff will be discarded and not used in calibration
            (Default: 2385)
        k_folds : int
            Number of folds to use in k-folds cross-validation; defaults to
            k=1, i.e., no cross-validation is performed.
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        prefix = 'MERRA2_climatology' if climatology else 'surface_met_MERRA2'
        params_dict = restore_bplut(self.config['BPLUT']['NPP'])
        # Filter the parameters to just those for the PFT of interest
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        kwargs.update({'var_names': [
            '~LUE_max', '~tmin0', '~tmin1', '~vpd0', '~vpd1', '~log_likelihood'
        ]})
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys():
                kwargs[key] = self.config['optimization'][key]
        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            # NOTE: This is only recorded at the site-level; no need to
            #   determine modal PFT across subgrid
            pft_map = hdf['NPP/PFT'][:]
            # Leave out sites where there is no fPAR (and no LAI) data
            fpar = hdf['NPP/MOD15A2H_fPAR_clim_filt'][:]
            mask = np.logical_and(
                    pft_map == pft, ~np.isnan(np.nanmean(fpar, axis = -1))\
                .all(axis = 0))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf[f'NPP/{prefix}/T10M_daytime'][:][:,mask] - 273.15
            qv10m = hdf[f'NPP/{prefix}/QV10M_daytime'][:][:,mask]
            ps = hdf[f'NPP/{prefix}/PS_daytime'][:][:,mask]
            drivers = [ # fPAR, Tmin, VPD, PAR, LAI, Tmean, years
                hdf['NPP/MOD15A2H_fPAR_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/Tmin'][:][:,mask]  - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf[f'NPP/{prefix}/SWGDN'][:][:,mask]),
                hdf['NPP/MOD15A2H_LAI_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/T10M'][:][:,mask] - 273.15,
                np.full(ps.shape, 1) # i.e., A 365-day climatological year ("Year 1")
            ]
            observed_npp = hdf['NPP/NPP_total'][:][mask]
        if cutoff is not None:
            observed_npp[observed_npp > cutoff] = np.nan
        # Set negative VPD to zero
        drivers[2] = np.where(drivers[2] < 0, 0, drivers[2])
        # Convert fPAR from (%) to [0,1] and re-scale LAI; reshape fPAR, LAI
        drivers[0] = np.nanmean(drivers[0], axis = -1) * 0.01
        drivers[4] = np.nanmean(drivers[4], axis = -1) * 0.1
        # TODO Mask out driver data where the are no observations
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][:,~np.isnan(observed_npp)]
        observed_npp = observed_npp[~np.isnan(observed_npp)]

        if k_folds > 1:
            # Back-up the original (complete) datasets
            _drivers = [d.copy() for d in drivers]
            _observed_npp = observed_npp.copy()
            # Randomize the indices of the NPP data
            indices = np.arange(0, observed_npp.size)
            np.random.shuffle(indices)
            # Get the starting and ending index of each fold
            fold_idx = np.array([indices.size // k_folds] * k_folds) * np.arange(0, k_folds)
            fold_idx = list(map(list, zip(fold_idx, fold_idx + indices.size // k_folds)))
            # Ensure that the entire dataset is used
            fold_idx[-1][-1] = indices.max()
            idx_test = [indices[start:end] for start, end in fold_idx]

        # Loop over each fold (or the entire dataset, if num. folds == 1)
        for k, fold in enumerate(range(1, k_folds + 1)):
            backend = self.config['optimization']['backend_template'] % ('NPP', pft)
            if k_folds > 1 and fold == 1:
                # Create an HDF5 file with the same name as the (original)
                #   netCDF4 back-end, store the test indices
                with h5py.File(backend.replace('nc4', 'h5'), 'w') as hdf:
                    out = list(idx_test)
                    size = indices.size // k_folds
                    try:
                        out = np.stack(out)
                    except ValueError:
                        size = max((o.size for o in out))
                        for i in range(0, len(out)):
                            out[i] = np.concatenate((out[i], [np.nan] * (size - out[i].size)))
                    hdf.create_dataset(
                        'test_indices', (k_folds, size), np.int32, np.stack(out))
                # Restore the original NPP dataset
                observed_npp = _observed_npp.copy()
                # Set to NaN all the test indices
                idx = idx_test[k]
                observed_npp[idx] = np.nan
                # Same for drivers, after restoring from the original
                drivers = [d.copy()[:,~np.isnan(observed_npp)] for d in _drivers]
                observed_npp = observed_npp[~np.isnan(observed_npp)]
            # Use a different naming scheme for the backend
            if k_folds > 1:
                backend = self.config['optimization']['backend_template'] % (f'NPP-k{fold}', pft)

            print('Initializing sampler...')
            sampler = MOD17StochasticSampler(
                self.config, MOD17._npp, params_dict, backend = backend,
                model_name = 'NPP')
            if plot_trace or ipdb:
                if ipdb:
                    import ipdb
                    ipdb.set_trace()
                trace = sampler.get_trace()
                az.plot_trace(
                    trace, var_names = MOD17.required_parameters[5:])
                pyplot.show()
                return
            # Get (informative) priors for just those parameters that have them
            with open(self.config['optimization']['prior'], 'r') as file:
                prior = json.load(file)
            prior_params = filter(
                lambda p: p in prior.keys(), sampler.required_parameters['NPP'])
            prior = dict([
                (p, prior[p]) for p in prior_params
            ])
            for key in prior.keys():
                # And make sure to subset to the chosen PFT!
                for arg in prior[key].keys():
                    prior[key][arg] = prior[key][arg][pft]
            sampler.run(
                observed_npp, drivers, prior = prior, save_fig = save_fig,
                **kwargs)

Subclasses

Methods

def clean_observed(self, raw: Sequence[+T_co], drivers: Sequence[+T_co], driver_labels: Sequence[+T_co], protocol: str = 'GPP', filter_length: int = 2) ‑> Sequence[+T_co]

Cleans observed tower flux data according to a prescribed protocol.

  • For GPP data: Removes observations where GPP < 0 or where APAR is < 0.1 MJ m-2 day-1

Parameters

raw : Sequence
 
drivers : Sequence
 
driver_labels : Sequence
 
protocol : str
 
filter_length : int
The window size for the smoothing filter, applied to the observed data

Returns

Sequence
 
Expand source code
def clean_observed(
        self, raw: Sequence, drivers: Sequence, driver_labels: Sequence,
        protocol: str = 'GPP', filter_length: int = 2) -> Sequence:
    '''
    Cleans observed tower flux data according to a prescribed protocol.

    - For GPP data: Removes observations where GPP < 0 or where APAR is
        < 0.1 MJ m-2 day-1

    Parameters
    ----------
    raw : Sequence
    drivers : Sequence
    driver_labels : Sequence
    protocol : str
    filter_length : int
        The window size for the smoothing filter, applied to the observed
        data

    Returns
    -------
    Sequence
    '''
    if protocol != 'GPP':
        raise NotImplementedError('"protocol" must be one of: "GPP"')
    # Read in the observed data and apply smoothing filter
    obs = self._filter(raw, filter_length)
    obs = self._clean(obs, dict([
        (driver_labels[i], data)
        for i, data in enumerate(drivers)
    ]), protocol = 'GPP')
    return obs
def export_bplut(self, model: str, output_path: str, thin: int = 10, burn: int = 1000)

Export the BPLUT using the posterior mean from the MCMC sampler. NOTE: The posterior mean is usually not the best estimate for poorly identified parameters.

Parameters

model : str
The name of the model ("GPP" or "NPP")
output_path : str
The output CSV file path
thin : int
Thinning rate
burn : int
The burn-in (i.e., first N samples to discard)
Expand source code
def export_bplut(
        self, model: str, output_path: str, thin: int = 10,
        burn: int = 1000):
    '''
    Export the BPLUT using the posterior mean from the MCMC sampler. NOTE:
    The posterior mean is usually not the best estimate for poorly
    identified parameters.

    Parameters
    ----------
    model : str
        The name of the model ("GPP" or "NPP")
    output_path : str
        The output CSV file path
    thin : int
        Thinning rate
    burn : int
        The burn-in (i.e., first N samples to discard)
    '''
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    for pft in PFT_VALID:
        backend = self.config['optimization']['backend_template'] %\
            (model, pft)
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        sampler = MOD17StochasticSampler(
            self.config, getattr(MOD17, '_%s' % model.lower()), params,
            backend = backend)
        trace = sampler.get_trace()
        fit = trace.sel(
            draw = slice(burn, None, thin))['posterior'].mean()
        for each in MOD17.required_parameters:
            try:
                bplut[each][pft] = float(fit[each])
            except KeyError:
                continue
    write_bplut(bplut, output_path)
def export_likely_posterior(self, model: str, param: str, output_path: str, thin: int = 10, burn: int = 1000, ptile: int = 95)

Exports posterior distribution for a parameter based on likelihood

Parameters

model : str
The name of the model ("GPP" or "NPP")
param : str
The model parameter to export
output_path : str
The output HDF5 file path
thin : int
Thinning rate
burn : int
The burn-in (i.e., first N samples to discard)
ptile : int
The percentile cutoff for likelihood; only samples at or above this likelihood cutoff will be included
Expand source code
def export_likely_posterior(
        self, model: str, param: str, output_path: str, thin: int = 10,
        burn: int = 1000, ptile: int = 95):
    '''
    Exports posterior distribution for a parameter based on likelihood

    Parameters
    ----------
    model : str
        The name of the model ("GPP" or "NPP")
    param : str
        The model parameter to export
    output_path : str
        The output HDF5 file path
    thin : int
        Thinning rate
    burn : int
        The burn-in (i.e., first N samples to discard)
    ptile : int
        The percentile cutoff for likelihood; only samples at or above
        this likelihood cutoff will be included
    '''
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    post = []
    likelihood = []
    for pft in PFT_VALID:
        backend = self.config['optimization']['backend_template'] % (model, pft)
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        sampler = MOD17StochasticSampler(
            self.config, getattr(MOD17, '_%s' % model.lower()), params,
            backend = backend)
        trace = sampler.get_trace()
        fit = trace.sel(draw = slice(burn, None, thin))
        # Find the likelihood value associated with the cutoff percentile
        ll = az.extract_dataset(
            fit, combined = True)['log_likelihood'].values
        values = az.extract_dataset(fit, combined = True)[param].values
        cutoff = np.percentile(ll, ptile)
        post.append(values[ll >= cutoff])
        likelihood.append(ll[ll >= cutoff])
    with h5py.File(output_path, 'a') as hdf:
        n = max([len(p) for p in post])
        # Make sure all arrays are the same size
        post = np.stack([
            np.concatenate((p, np.full((n - len(p),), np.nan)))
            for p in post
        ])
        likelihood = np.stack([
            np.concatenate((p, np.full((n - len(p),), np.nan)))
            for p in likelihood
        ])
        ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
        dataset = hdf.create_dataset(
            f'{param}_posterior', post.shape, np.float32, post)
        dataset.attrs['description'] = 'CalibrationAPI.export_likely_posterior() on {ts}'
        dataset = hdf.create_dataset(
            f'{param}_likelihood', likelihood.shape, np.float32, likelihood)
def export_posterior(self, model: str, param: str, output_path: str, thin: int = 10, burn: int = 1000, k_folds: int = 1)

Exports posterior distribution for a parameter, for each PFT to HDF5.

Parameters

model : str
The name of the model ("GPP" or "NPP")
param : str
The model parameter to export
output_path : str
The output HDF5 file path
thin : int
Thinning rate
burn : int
The burn-in (i.e., first N samples to discard)
k_folds : int
The number of k-folds used in cross-calibration/validation; if more than one (default), the folds for each PFT will be combined into a single HDF5 file
Expand source code
def export_posterior(
        self, model: str, param: str, output_path: str, thin: int = 10,
        burn: int = 1000, k_folds: int = 1):
    '''
    Exports posterior distribution for a parameter, for each PFT to HDF5.

    Parameters
    ----------
    model : str
        The name of the model ("GPP" or "NPP")
    param : str
        The model parameter to export
    output_path : str
        The output HDF5 file path
    thin : int
        Thinning rate
    burn : int
        The burn-in (i.e., first N samples to discard)
    k_folds : int
        The number of k-folds used in cross-calibration/validation;
        if more than one (default), the folds for each PFT will be
        combined into a single HDF5 file
    '''
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    post = []
    for pft in PFT_VALID:
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        backend = self.config['optimization']['backend_template'] %\
            (model, pft)
        post_by_fold = []
        for fold in range(1, k_folds + 1):
            if k_folds > 1:
                backend = self.config['optimization']['backend_template'] %\
                    (f'{model}-k{fold}', pft)
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(draw = slice(burn, None, thin))['posterior']
            num_samples = fit.sizes['chain'] * fit.sizes['draw']
            if param in fit:
                post_by_fold.append(
                    az.extract_dataset(fit, combined = True)[param].values)
            else:
                # In case there is, e.g., a parameter that takes on a
                #   constant value for a specific PFT
                if k_folds > 1:
                    post_by_fold.append(np.ones((1, num_samples)) * np.nan)
                else:
                    post_by_fold.append(np.ones(num_samples) * np.nan)
        if k_folds > 1:
            post.append(np.vstack(post_by_fold))
        else:
            post.extend(post_by_fold)
    # If not every PFT's posterior has the same number of samples (e.g.,
    #   when one set of chains was run longer than another)...
    if not all([p.shape == post[0].shape for p in post]):
        max_len = max([p.shape for p in post])[0]
        # ...Reshape all posteriors to match the greatest sample size
        import ipdb
        ipdb.set_trace()#FIXME
        post = [
            np.pad(
                p.astype(np.float32), (0, max_len - p.size),
                mode = 'constant', constant_values = (np.nan,))
            for p in post
        ]
    with h5py.File(output_path, 'a') as hdf:
        post = np.stack(post)
        ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
        dataset = hdf.create_dataset(
            f'{param}_posterior', post.shape, np.float32, post)
        dataset.attrs['description'] = 'CalibrationAPI.export_posterior() on {ts}'
def tune_gpp(self, pft: int, plot_trace: bool = False, ipdb: bool = False, save_fig: bool = False, **kwargs)

Run the MOD17 GPP calibration.

Parameters

pft : int
The Plant Functional Type (PFT) to calibrate
plot_trace : bool
True to plot the trace for a previous calibration run; this will also NOT start a new calibration (Default: False)
ipdb : bool
True to drop the user into an ipdb prompt, prior to and instead of running calibration
save_fig : bool
True to save figures to files instead of showing them (Default: False)
**kwargs
Additional keyword arguments passed to StochasticSampler.run()
Expand source code
def tune_gpp(
        self, pft: int, plot_trace: bool = False, ipdb: bool = False,
        save_fig: bool = False, **kwargs):
    '''
    Run the MOD17 GPP calibration.

    Parameters
    ----------
    pft : int
        The Plant Functional Type (PFT) to calibrate
    plot_trace : bool
        True to plot the trace for a previous calibration run; this will
        also NOT start a new calibration (Default: False)
    ipdb : bool
        True to drop the user into an ipdb prompt, prior to and instead of
        running calibration
    save_fig : bool
        True to save figures to files instead of showing them
        (Default: False)
    **kwargs
        Additional keyword arguments passed to
        `MOD17StochasticSampler.run()`
    '''
    assert pft in PFT_VALID, f'Invalid PFT: {pft}'
    # Pass configuration parameters to MOD17StochasticSampler.run()
    for key in ('chains', 'draws', 'tune', 'scaling'):
        if key in self.config['optimization'].keys() and not key in kwargs.keys():
            kwargs[key] = self.config['optimization'][key]
    # Filter the parameters to just those for the PFT of interest
    params_dict = restore_bplut(self.config['BPLUT']['GPP'])
    # Load blacklisted sites (if any)
    blacklist = self.config['data']['sites_blacklisted']
    params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
    model = MOD17(params_dict)
    objective = self.config['optimization']['objective'].lower()

    print('Loading driver datasets...')
    with h5py.File(self.hdf5, 'r') as hdf:
        sites = hdf['FLUXNET/site_id'][:]
        if hasattr(sites[0], 'decode'):
            sites = list(map(lambda x: x.decode('utf-8'), sites))
        # Get dominant PFT
        pft_map = pft_dominant(hdf['state/PFT'][:], site_list = sites)
        # Blacklist various sites
        pft_mask = np.logical_and(pft_map == pft, ~np.in1d(sites, blacklist))
        # NOTE: Converting from Kelvin to Celsius
        tday = hdf['MERRA2/T10M_daytime'][:][:,pft_mask] - 273.15
        qv10m = hdf['MERRA2/QV10M_daytime'][:][:,pft_mask]
        ps = hdf['MERRA2/PS_daytime'][:][:,pft_mask]
        drivers = [ # fPAR, Tmin, VPD, PAR
            hdf['MODIS/MOD15A2HGF_fPAR_interp'][:][:,pft_mask],
            hdf['MERRA2/Tmin'][:][:,pft_mask] - 273.15,
            MOD17.vpd(qv10m, ps, tday),
            MOD17.par(hdf['MERRA2/SWGDN'][:][:,pft_mask]),
        ]
        # Convert fPAR from (%) to [0,1]
        drivers[0] = np.nanmean(drivers[0], axis = -1) / 100
        # If RMSE is used, then we want to pay attention to weighting
        weights = None
        if objective in ('rmsd', 'rmse'):
            weights = hdf['weights'][pft_mask][np.newaxis,:]\
                .repeat(tday.shape[0], axis = 0)
        # Check that driver data do not contain NaNs
        for d, each in enumerate(drivers):
            name = ('fPAR', 'Tmin', 'VPD', 'PAR')[d]
            assert not np.isnan(each).any(),\
                f'Driver dataset "{name}" contains NaNs'
        tower_gpp = hdf['FLUXNET/GPP'][:][:,pft_mask]
        # Read the validation mask; mask out observations that are
        #   reserved for validation
        print('Masking out validation data...')
        mask = hdf['FLUXNET/validation_mask'][pft]
        tower_gpp[mask] = np.nan

    # Clean observations, then mask out driver data where the are no
    #   observations
    tower_gpp = self.clean_observed(
        tower_gpp, drivers, MOD17StochasticSampler.required_drivers['GPP'],
        protocol = 'GPP')
    if weights is not None:
        weights = weights[~np.isnan(tower_gpp)]
    for d, _ in enumerate(drivers):
        drivers[d] = drivers[d][~np.isnan(tower_gpp)]
    tower_gpp = tower_gpp[~np.isnan(tower_gpp)]

    print('Initializing sampler...')
    backend = self.config['optimization']['backend_template'] % ('GPP', pft)
    sampler = MOD17StochasticSampler(
        self.config, MOD17._gpp, params_dict, backend = backend,
        weights = weights)
    if plot_trace or ipdb:
        if ipdb:
            import ipdb
            ipdb.set_trace()
        trace = sampler.get_trace()
        az.plot_trace(trace, var_names = MOD17.required_parameters[0:5])
        pyplot.show()
        return
    # Get (informative) priors for just those parameters that have them
    with open(self.config['optimization']['prior'], 'r') as file:
        prior = json.load(file)
    prior_params = filter(
        lambda p: p in prior.keys(), sampler.required_parameters['GPP'])
    prior = dict([
        (p, {'mu': prior[p]['mu'][pft], 'sigma': prior[p]['sigma'][pft]})
        for p in prior_params
    ])
    sampler.run(
        tower_gpp, drivers, prior = prior, save_fig = save_fig, **kwargs)
def tune_npp(self, pft: int, plot_trace: bool = False, ipdb: bool = False, save_fig: bool = False, climatology=False, cutoff: numbers.Number = 2385, k_folds: int = 1, **kwargs)

Run the MOD17 NPP calibration. If k-folds cross-validation is used, the model is calibrated on $k$ random subsets of the data and a series of file is created, e.g., as:

MOD17_NPP_calibration_PFT1.h5
MOD17_NPP-k1_calibration_PFT1.nc4
MOD17_NPP-k2_calibration_PFT1.nc4
...

Where each .nc4 file is a standard arviz backend and the .h5 indicates which indices from the NPP observations vector, after removing NaNs, were excluded (i.e., the indices of the test data).

Parameters

pft : int
The Plant Functional Type (PFT) to calibrate
plot_trace : bool
True to display the trace plot ONLY and not run calibration (Default: False)
ipdb : bool
True to drop into an interactive Python debugger (ipdb) after loading an existing trace (Default: False)
save_fig : bool
True to save the post-calibration trace plot to a file instead of displaying it (Default: False)
climatology : bool
True to use a MERRA-2 climatology (and look for it in the drivers file), i.e., use MERRA2_climatology group instead of surface_met_MERRA2 group (Default: False)
cutoff : Number
Maximum value of observed NPP (g C m-2 year-1); values above this cutoff will be discarded and not used in calibration (Default: 2385)
k_folds : int
Number of folds to use in k-folds cross-validation; defaults to k=1, i.e., no cross-validation is performed.
**kwargs
Additional keyword arguments passed to StochasticSampler.run()
Expand source code
def tune_npp(
        self, pft: int, plot_trace: bool = False, ipdb: bool = False,
        save_fig: bool = False, climatology = False,
        cutoff: Number = 2385, k_folds: int = 1, **kwargs):
    r'''
    Run the MOD17 NPP calibration. If k-folds cross-validation is used,
    the model is calibrated on $k$ random subsets of the data and a
    series of file is created, e.g., as:

        MOD17_NPP_calibration_PFT1.h5
        MOD17_NPP-k1_calibration_PFT1.nc4
        MOD17_NPP-k2_calibration_PFT1.nc4
        ...

    Where each `.nc4` file is a standard `arviz` backend and the `.h5`
    indicates which indices from the NPP observations vector, after
    removing NaNs, were excluded (i.e., the indices of the test data).

    Parameters
    ----------
    pft : int
        The Plant Functional Type (PFT) to calibrate
    plot_trace : bool
        True to display the trace plot ONLY and not run calibration
        (Default: False)
    ipdb : bool
        True to drop into an interactive Python debugger (`ipdb`) after
        loading an existing trace (Default: False)
    save_fig : bool
        True to save the post-calibration trace plot to a file instead of
        displaying it (Default: False)
    climatology : bool
        True to use a MERRA-2 climatology (and look for it in the drivers
        file), i.e., use `MERRA2_climatology` group instead of
        `surface_met_MERRA2` group (Default: False)
    cutoff : Number
        Maximum value of observed NPP (g C m-2 year-1); values above this
        cutoff will be discarded and not used in calibration
        (Default: 2385)
    k_folds : int
        Number of folds to use in k-folds cross-validation; defaults to
        k=1, i.e., no cross-validation is performed.
    **kwargs
        Additional keyword arguments passed to
        `MOD17StochasticSampler.run()`
    '''
    assert pft in PFT_VALID, f'Invalid PFT: {pft}'
    prefix = 'MERRA2_climatology' if climatology else 'surface_met_MERRA2'
    params_dict = restore_bplut(self.config['BPLUT']['NPP'])
    # Filter the parameters to just those for the PFT of interest
    params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
    model = MOD17(params_dict)
    kwargs.update({'var_names': [
        '~LUE_max', '~tmin0', '~tmin1', '~vpd0', '~vpd1', '~log_likelihood'
    ]})
    # Pass configuration parameters to MOD17StochasticSampler.run()
    for key in ('chains', 'draws', 'tune', 'scaling'):
        if key in self.config['optimization'].keys():
            kwargs[key] = self.config['optimization'][key]
    print('Loading driver datasets...')
    with h5py.File(self.hdf5, 'r') as hdf:
        # NOTE: This is only recorded at the site-level; no need to
        #   determine modal PFT across subgrid
        pft_map = hdf['NPP/PFT'][:]
        # Leave out sites where there is no fPAR (and no LAI) data
        fpar = hdf['NPP/MOD15A2H_fPAR_clim_filt'][:]
        mask = np.logical_and(
                pft_map == pft, ~np.isnan(np.nanmean(fpar, axis = -1))\
            .all(axis = 0))
        # NOTE: Converting from Kelvin to Celsius
        tday = hdf[f'NPP/{prefix}/T10M_daytime'][:][:,mask] - 273.15
        qv10m = hdf[f'NPP/{prefix}/QV10M_daytime'][:][:,mask]
        ps = hdf[f'NPP/{prefix}/PS_daytime'][:][:,mask]
        drivers = [ # fPAR, Tmin, VPD, PAR, LAI, Tmean, years
            hdf['NPP/MOD15A2H_fPAR_clim_filt'][:][:,mask],
            hdf[f'NPP/{prefix}/Tmin'][:][:,mask]  - 273.15,
            MOD17.vpd(qv10m, ps, tday),
            MOD17.par(hdf[f'NPP/{prefix}/SWGDN'][:][:,mask]),
            hdf['NPP/MOD15A2H_LAI_clim_filt'][:][:,mask],
            hdf[f'NPP/{prefix}/T10M'][:][:,mask] - 273.15,
            np.full(ps.shape, 1) # i.e., A 365-day climatological year ("Year 1")
        ]
        observed_npp = hdf['NPP/NPP_total'][:][mask]
    if cutoff is not None:
        observed_npp[observed_npp > cutoff] = np.nan
    # Set negative VPD to zero
    drivers[2] = np.where(drivers[2] < 0, 0, drivers[2])
    # Convert fPAR from (%) to [0,1] and re-scale LAI; reshape fPAR, LAI
    drivers[0] = np.nanmean(drivers[0], axis = -1) * 0.01
    drivers[4] = np.nanmean(drivers[4], axis = -1) * 0.1
    # TODO Mask out driver data where the are no observations
    for d, _ in enumerate(drivers):
        drivers[d] = drivers[d][:,~np.isnan(observed_npp)]
    observed_npp = observed_npp[~np.isnan(observed_npp)]

    if k_folds > 1:
        # Back-up the original (complete) datasets
        _drivers = [d.copy() for d in drivers]
        _observed_npp = observed_npp.copy()
        # Randomize the indices of the NPP data
        indices = np.arange(0, observed_npp.size)
        np.random.shuffle(indices)
        # Get the starting and ending index of each fold
        fold_idx = np.array([indices.size // k_folds] * k_folds) * np.arange(0, k_folds)
        fold_idx = list(map(list, zip(fold_idx, fold_idx + indices.size // k_folds)))
        # Ensure that the entire dataset is used
        fold_idx[-1][-1] = indices.max()
        idx_test = [indices[start:end] for start, end in fold_idx]

    # Loop over each fold (or the entire dataset, if num. folds == 1)
    for k, fold in enumerate(range(1, k_folds + 1)):
        backend = self.config['optimization']['backend_template'] % ('NPP', pft)
        if k_folds > 1 and fold == 1:
            # Create an HDF5 file with the same name as the (original)
            #   netCDF4 back-end, store the test indices
            with h5py.File(backend.replace('nc4', 'h5'), 'w') as hdf:
                out = list(idx_test)
                size = indices.size // k_folds
                try:
                    out = np.stack(out)
                except ValueError:
                    size = max((o.size for o in out))
                    for i in range(0, len(out)):
                        out[i] = np.concatenate((out[i], [np.nan] * (size - out[i].size)))
                hdf.create_dataset(
                    'test_indices', (k_folds, size), np.int32, np.stack(out))
            # Restore the original NPP dataset
            observed_npp = _observed_npp.copy()
            # Set to NaN all the test indices
            idx = idx_test[k]
            observed_npp[idx] = np.nan
            # Same for drivers, after restoring from the original
            drivers = [d.copy()[:,~np.isnan(observed_npp)] for d in _drivers]
            observed_npp = observed_npp[~np.isnan(observed_npp)]
        # Use a different naming scheme for the backend
        if k_folds > 1:
            backend = self.config['optimization']['backend_template'] % (f'NPP-k{fold}', pft)

        print('Initializing sampler...')
        sampler = MOD17StochasticSampler(
            self.config, MOD17._npp, params_dict, backend = backend,
            model_name = 'NPP')
        if plot_trace or ipdb:
            if ipdb:
                import ipdb
                ipdb.set_trace()
            trace = sampler.get_trace()
            az.plot_trace(
                trace, var_names = MOD17.required_parameters[5:])
            pyplot.show()
            return
        # Get (informative) priors for just those parameters that have them
        with open(self.config['optimization']['prior'], 'r') as file:
            prior = json.load(file)
        prior_params = filter(
            lambda p: p in prior.keys(), sampler.required_parameters['NPP'])
        prior = dict([
            (p, prior[p]) for p in prior_params
        ])
        for key in prior.keys():
            # And make sure to subset to the chosen PFT!
            for arg in prior[key].keys():
                prior[key][arg] = prior[key][arg][pft]
        sampler.run(
            observed_npp, drivers, prior = prior, save_fig = save_fig,
            **kwargs)
class MOD17StochasticSampler (config: dict, model: Callable, params_dict: dict = None, backend: str = None, weights: Sequence[+T_co] = None, model_name: str = None)

A Markov Chain-Monte Carlo (MCMC) sampler for MOD17. The specific sampler used is the Differential Evolution (DE) MCMC algorithm described by Ter Braak (2008), though the implementation is specific to the PyMC3 library.

Considerations:

  1. Tower GPP is censored when values are < 0 or when APAR is < 0.1 MJ m-2 d-1.

Parameters

config : dict
Dictionary of configuration parameters
model : Callable
The function to call (with driver data and parameters); this function should take driver data as positional arguments and the model parameters as a *Sequence; it should require no external state.
observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
params_dict : dict or None
Dictionary of model parameters, to be used as initial values and as the basis for constructing a new dictionary of optimized parameters
backend : str or None
Path to a NetCDF4 file backend (Default: None)
weights : Sequence or None
Optional sequence of weights applied to the model residuals (as in weighted least squares)
Expand source code
class MOD17StochasticSampler(StochasticSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for MOD17. The specific sampler
    used is the Differential Evolution (DE) MCMC algorithm described by
    Ter Braak (2008), though the implementation is specific to the PyMC3
    library.

    Considerations:

    1. Tower GPP is censored when values are < 0 or when APAR is
        < 0.1 MJ m-2 d-1.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    # NOTE: This is different than for mod17.MOD17 because we haven't yet
    #   figured out how the respiration terms are calculated
    required_parameters = {
        'GPP': ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1'],
        'NPP': MOD17.required_parameters
    }
    required_drivers = {
        'GPP': ['fPAR', 'Tmin', 'VPD', 'PAR'],
        'NPP': ['fPAR', 'Tmin', 'VPD', 'PAR', 'LAI', 'Tmean', 'years']
    }

    def compile_gpp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new GPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # (Stochstic) Priors for unknown model parameters
            LUE_max = pm.TruncatedNormal('LUE_max',
                **self.prior['LUE_max'], **self.bounds['LUE_max'])
            # NOTE: tmin0, vpd0 are fixed at Collection 6.1 values
            tmin0 = self.params['tmin0']
            tmin1 = pm.Uniform('tmin1', **self.bounds['tmin1'])
            # NOTE: Upper bound on `vpd1` is set by the maximum observed VPD
            vpd0 = self.params['vpd0']
            vpd1 = pm.Uniform('vpd1',
                lower = self.bounds['vpd1']['lower'],
                upper = drivers[2].max().round(0))
            # Convert model parameters to a tensor vector
            params_list = [LUE_max, tmin0, tmin1, vpd0, vpd1]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model

    def compile_npp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new NPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # Setting GPP parameters that are known--EXCEPT tmin1
            LUE_max = self.params['LUE_max']
            tmin0   = self.params['tmin0']
            tmin1   = self.params['tmin1']
            vpd0    = self.params['vpd0']
            vpd1    = self.params['vpd1']
            # SLA fixed at prior mean
            SLA = np.exp(self.prior['SLA']['mu'])
            # Allometry ratios prescribe narrow range around Collection 6.1 values
            froot_leaf_ratio = pm.Triangular(
                'froot_leaf_ratio', **self.prior['froot_leaf_ratio'])
            # (Stochstic) Priors for unknown model parameters
            Q10_froot = pm.TruncatedNormal(
                'Q10_froot', **self.prior['Q10_froot'], **self.bounds['Q10'])
            leaf_mr_base = pm.LogNormal(
                'leaf_mr_base', **self.prior['leaf_mr_base'])
            froot_mr_base = pm.LogNormal(
                'froot_mr_base', **self.prior['froot_mr_base'])
            # For GRS and CRO, livewood mass and respiration are zero
            if np.equal(list(self.prior['livewood_mr_base'].values()), [0, 0]).all():
                livewood_leaf_ratio = 0
                livewood_mr_base = 0
                Q10_livewood = 0
            else:
                livewood_leaf_ratio = pm.Triangular(
                    'livewood_leaf_ratio', **self.prior['livewood_leaf_ratio'])
                livewood_mr_base = pm.LogNormal(
                    'livewood_mr_base', **self.prior['livewood_mr_base'])
                Q10_livewood = pm.TruncatedNormal(
                    'Q10_livewood', **self.prior['Q10_livewood'],
                    **self.bounds['Q10'])
            # Convert model parameters to a tensor vector
            params_list = [
                LUE_max, tmin0, tmin1, vpd0, vpd1, SLA,
                Q10_livewood, Q10_froot, froot_leaf_ratio, livewood_leaf_ratio,
                leaf_mr_base, froot_mr_base, livewood_mr_base
            ]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model

Ancestors

Subclasses

Class variables

var required_drivers
var required_parameters

Methods

def compile_gpp_model(self, observed: Sequence[+T_co], drivers: Sequence[+T_co]) ‑> pymc.model.Model

Creates a new GPP model based on the prior distribution. Model can be re-compiled multiple times, e.g., for cross validation.

Parameters

observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
drivers : list or tuple
Sequence of driver datasets to be supplied, in order, to the model's run function

Returns

pm.Model
 
Expand source code
def compile_gpp_model(
        self, observed: Sequence, drivers: Sequence) -> pm.Model:
    '''
    Creates a new GPP model based on the prior distribution. Model can be
    re-compiled multiple times, e.g., for cross validation.

    Parameters
    ----------
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    drivers : list or tuple
        Sequence of driver datasets to be supplied, in order, to the
        model's run function

    Returns
    -------
    pm.Model
    '''
    # Define the objective/ likelihood function
    log_likelihood = BlackBoxLikelihood(
        self.model, observed, x = drivers, weights = self.weights,
        objective = self.config['optimization']['objective'].lower())
    # With this context manager, "all PyMC3 objects introduced in the indented
    #   code block...are added to the model behind the scenes."
    with pm.Model() as model:
        # (Stochstic) Priors for unknown model parameters
        LUE_max = pm.TruncatedNormal('LUE_max',
            **self.prior['LUE_max'], **self.bounds['LUE_max'])
        # NOTE: tmin0, vpd0 are fixed at Collection 6.1 values
        tmin0 = self.params['tmin0']
        tmin1 = pm.Uniform('tmin1', **self.bounds['tmin1'])
        # NOTE: Upper bound on `vpd1` is set by the maximum observed VPD
        vpd0 = self.params['vpd0']
        vpd1 = pm.Uniform('vpd1',
            lower = self.bounds['vpd1']['lower'],
            upper = drivers[2].max().round(0))
        # Convert model parameters to a tensor vector
        params_list = [LUE_max, tmin0, tmin1, vpd0, vpd1]
        params = at.as_tensor_variable(params_list)
        # Key step: Define the log-likelihood as an added potential
        pm.Potential('likelihood', log_likelihood(params))
    return model
def compile_npp_model(self, observed: Sequence[+T_co], drivers: Sequence[+T_co]) ‑> pymc.model.Model

Creates a new NPP model based on the prior distribution. Model can be re-compiled multiple times, e.g., for cross validation.

Parameters

observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
drivers : list or tuple
Sequence of driver datasets to be supplied, in order, to the model's run function

Returns

pm.Model
 
Expand source code
def compile_npp_model(
        self, observed: Sequence, drivers: Sequence) -> pm.Model:
    '''
    Creates a new NPP model based on the prior distribution. Model can be
    re-compiled multiple times, e.g., for cross validation.

    Parameters
    ----------
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    drivers : list or tuple
        Sequence of driver datasets to be supplied, in order, to the
        model's run function

    Returns
    -------
    pm.Model
    '''
    # Define the objective/ likelihood function
    log_likelihood = BlackBoxLikelihood(
        self.model, observed, x = drivers, weights = self.weights,
        objective = self.config['optimization']['objective'].lower())
    # With this context manager, "all PyMC3 objects introduced in the indented
    #   code block...are added to the model behind the scenes."
    with pm.Model() as model:
        # Setting GPP parameters that are known--EXCEPT tmin1
        LUE_max = self.params['LUE_max']
        tmin0   = self.params['tmin0']
        tmin1   = self.params['tmin1']
        vpd0    = self.params['vpd0']
        vpd1    = self.params['vpd1']
        # SLA fixed at prior mean
        SLA = np.exp(self.prior['SLA']['mu'])
        # Allometry ratios prescribe narrow range around Collection 6.1 values
        froot_leaf_ratio = pm.Triangular(
            'froot_leaf_ratio', **self.prior['froot_leaf_ratio'])
        # (Stochstic) Priors for unknown model parameters
        Q10_froot = pm.TruncatedNormal(
            'Q10_froot', **self.prior['Q10_froot'], **self.bounds['Q10'])
        leaf_mr_base = pm.LogNormal(
            'leaf_mr_base', **self.prior['leaf_mr_base'])
        froot_mr_base = pm.LogNormal(
            'froot_mr_base', **self.prior['froot_mr_base'])
        # For GRS and CRO, livewood mass and respiration are zero
        if np.equal(list(self.prior['livewood_mr_base'].values()), [0, 0]).all():
            livewood_leaf_ratio = 0
            livewood_mr_base = 0
            Q10_livewood = 0
        else:
            livewood_leaf_ratio = pm.Triangular(
                'livewood_leaf_ratio', **self.prior['livewood_leaf_ratio'])
            livewood_mr_base = pm.LogNormal(
                'livewood_mr_base', **self.prior['livewood_mr_base'])
            Q10_livewood = pm.TruncatedNormal(
                'Q10_livewood', **self.prior['Q10_livewood'],
                **self.bounds['Q10'])
        # Convert model parameters to a tensor vector
        params_list = [
            LUE_max, tmin0, tmin1, vpd0, vpd1, SLA,
            Q10_livewood, Q10_froot, froot_leaf_ratio, livewood_leaf_ratio,
            leaf_mr_base, froot_mr_base, livewood_mr_base
        ]
        params = at.as_tensor_variable(params_list)
        # Key step: Define the log-likelihood as an added potential
        pm.Potential('likelihood', log_likelihood(params))
    return model

Inherited members

class StochasticSampler (config: dict, model: Callable, params_dict: dict = None, backend: str = None, weights: Sequence[+T_co] = None, model_name: str = None)

A Markov Chain-Monte Carlo (MCMC) sampler for an arbitrary model. The specific sampler used is the Differential Evolution (DE) MCMC algorithm described by Ter Braak (2008), though the implementation is specific to the PyMC3 library.

NOTE: The model (a function) should be named "_name" where "name" is some uniquely identifiable model name. This helps StochasticSampler.run() to find the correct compiler for the model. The compiler function should be named compiled_name_model() (where "name" is the model name) and be defined on a subclass of StochasticSampler.

Parameters

config : dict
Dictionary of configuration parameters
model : Callable
The function to call (with driver data and parameters); this function should take driver data as positional arguments and the model parameters as a *Sequence; it should require no external state.
observed : Sequence
Sequence of observed values that will be used to calibrate the model; i.e., model is scored by how close its predicted values are to the observed values
params_dict : dict or None
Dictionary of model parameters, to be used as initial values and as the basis for constructing a new dictionary of optimized parameters
backend : str or None
Path to a NetCDF4 file backend (Default: None)
weights : Sequence or None
Optional sequence of weights applied to the model residuals (as in weighted least squares)
Expand source code
class StochasticSampler(AbstractSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for an arbitrary model. The
    specific sampler used is the Differential Evolution (DE) MCMC algorithm
    described by Ter Braak (2008), though the implementation is specific to
    the PyMC3 library.

    NOTE: The `model` (a function) should be named "_name" where "name" is
    some uniquely identifiable model name. This helps `StochasticSampler.run()`
    to find the correct compiler for the model. The compiler function should
    be named `compiled_name_model()` (where "name" is the model name) and be
    defined on a subclass of `StochasticSampler`.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    def __init__(
            self, config: dict, model: Callable, params_dict: dict = None,
            backend: str = None, weights: Sequence = None,
            model_name: str = None):
        self.backend = backend
        # Convert the BOUNDS into nested dicts for easy use
        self.bounds = dict([
            (key, dict([('lower', b[0]), ('upper', b[1])]))
            for key, b in config['optimization']['bounds'].items()
        ])
        self.config = config
        self.model = model
        if hasattr(model, '__name__'):
            self.name = model.__name__.strip('_').upper() # "_gpp" = "GPP"
        else:
            self.name = model_name
        self.params = params_dict
        # Set the model's prior distribution assumptions
        self.prior = dict()
        for key in self.required_parameters[self.name]:
            # NOTE: This is the default only for LUE_max; other parameters,
            #   with Uniform distributions, don't use anything here
            self.prior.setdefault(key, {
                'mu': params_dict[key],
                'sigma': 2e-4
            })
        self.weights = weights
        assert os.path.exists(os.path.dirname(backend))

    def run(
            self, observed: Sequence, drivers: Sequence,
            draws = 1000, chains = 3, tune = 'lambda',
            scaling: float = 1e-3, prior: dict = dict(),
            check_shape: bool = False, save_fig: bool = False,
            var_names: Sequence = None) -> None:
        '''
        Fits the model using DE-MCMCz approach. `tune="lambda"` (default) is
        recommended; lambda is related to the scale of the jumps learned from
        other chains, but epsilon ("scaling") controls the scale directly.
        Using a larger value for `scaling` (Default: 1e-3) will produce larger
        jumps and may directly address "sticky" chains.

        Parameters
        ----------
        observed : Sequence
            The observed data the model will be calibrated against
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        draws : int
            Number of samples to draw (on each chain); defaults to 1000
        chains : int
            Number of chains; defaults to 3
        tune : str or None
            Which hyperparameter to tune: Defaults to 'lambda', but can also
            be 'scaling' or None.
        scaling : float
            Initial scale factor for epsilon (Default: 1e-3)
        prior : dict
        check_shape : bool
            True to require that input driver datasets have the same shape as
            the observed values (Default: False)
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        var_names : Sequence
            One or more variable names to show in the plot
        '''
        assert not check_shape or drivers[0].shape == observed.shape,\
            'Driver data should have the same shape as the "observed" data'
        assert len(drivers) == len(self.required_drivers[self.name]),\
            'Did not receive expected number of driver datasets!'
        assert tune in ('lambda', 'scaling') or tune is None
        # Update prior assumptions
        self.prior.update(prior)
        # Generate an initial goodness-of-fit score
        predicted = self.model([
            self.params[p] for p in self.required_parameters[self.name]
        ], *drivers)
        if self.weights is not None:
            score = np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        else:
            score = np.sqrt(np.nanmean(((predicted - observed)) ** 2))
        print('-- RMSD at the initial point: %.3f' % score)
        print('Compiling model...')
        try:
            compiler = getattr(self, 'compile_%s_model' % self.name.lower())
        except AttributeError:
            raise AttributeError('''Could not find a compiler for model named
            "%s"; make sure that a function "compile_%s_model()" is defined on
             this class''' % (model_name, model_name))
        with compiler(observed, drivers) as model:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                step_func = pm.DEMetropolisZ(tune = tune, scaling = scaling)
                trace = pm.sample(
                    draws = draws, step = step_func, cores = chains,
                    chains = chains, idata_kwargs = {'log_likelihood': True})
            if self.backend is not None:
                print('Writing results to file...')
                trace.to_netcdf(self.backend)
            if var_names is None:
                az.plot_trace(trace, var_names = ['~log_likelihood'])
            else:
                az.plot_trace(trace, var_names = var_names)
            if save_fig:
                pyplot.savefig('.'.join(self.backend.split('.')[:-1]) + '.png')
            else:
                pyplot.show()

Ancestors

Subclasses

Methods

def run(self, observed: Sequence[+T_co], drivers: Sequence[+T_co], draws=1000, chains=3, tune='lambda', scaling: float = 0.001, prior: dict = {}, check_shape: bool = False, save_fig: bool = False, var_names: Sequence[+T_co] = None) ‑> None

Fits the model using DE-MCMCz approach. tune="lambda" (default) is recommended; lambda is related to the scale of the jumps learned from other chains, but epsilon ("scaling") controls the scale directly. Using a larger value for scaling (Default: 1e-3) will produce larger jumps and may directly address "sticky" chains.

Parameters

observed : Sequence
The observed data the model will be calibrated against
drivers : list or tuple
Sequence of driver datasets to be supplied, in order, to the model's run function
draws : int
Number of samples to draw (on each chain); defaults to 1000
chains : int
Number of chains; defaults to 3
tune : str or None
Which hyperparameter to tune: Defaults to 'lambda', but can also be 'scaling' or None.
scaling : float
Initial scale factor for epsilon (Default: 1e-3)
prior : dict
 
check_shape : bool
True to require that input driver datasets have the same shape as the observed values (Default: False)
save_fig : bool
True to save figures to files instead of showing them (Default: False)
var_names : Sequence
One or more variable names to show in the plot
Expand source code
def run(
        self, observed: Sequence, drivers: Sequence,
        draws = 1000, chains = 3, tune = 'lambda',
        scaling: float = 1e-3, prior: dict = dict(),
        check_shape: bool = False, save_fig: bool = False,
        var_names: Sequence = None) -> None:
    '''
    Fits the model using DE-MCMCz approach. `tune="lambda"` (default) is
    recommended; lambda is related to the scale of the jumps learned from
    other chains, but epsilon ("scaling") controls the scale directly.
    Using a larger value for `scaling` (Default: 1e-3) will produce larger
    jumps and may directly address "sticky" chains.

    Parameters
    ----------
    observed : Sequence
        The observed data the model will be calibrated against
    drivers : list or tuple
        Sequence of driver datasets to be supplied, in order, to the
        model's run function
    draws : int
        Number of samples to draw (on each chain); defaults to 1000
    chains : int
        Number of chains; defaults to 3
    tune : str or None
        Which hyperparameter to tune: Defaults to 'lambda', but can also
        be 'scaling' or None.
    scaling : float
        Initial scale factor for epsilon (Default: 1e-3)
    prior : dict
    check_shape : bool
        True to require that input driver datasets have the same shape as
        the observed values (Default: False)
    save_fig : bool
        True to save figures to files instead of showing them
        (Default: False)
    var_names : Sequence
        One or more variable names to show in the plot
    '''
    assert not check_shape or drivers[0].shape == observed.shape,\
        'Driver data should have the same shape as the "observed" data'
    assert len(drivers) == len(self.required_drivers[self.name]),\
        'Did not receive expected number of driver datasets!'
    assert tune in ('lambda', 'scaling') or tune is None
    # Update prior assumptions
    self.prior.update(prior)
    # Generate an initial goodness-of-fit score
    predicted = self.model([
        self.params[p] for p in self.required_parameters[self.name]
    ], *drivers)
    if self.weights is not None:
        score = np.sqrt(
            np.nanmean(((predicted - observed) * self.weights) ** 2))
    else:
        score = np.sqrt(np.nanmean(((predicted - observed)) ** 2))
    print('-- RMSD at the initial point: %.3f' % score)
    print('Compiling model...')
    try:
        compiler = getattr(self, 'compile_%s_model' % self.name.lower())
    except AttributeError:
        raise AttributeError('''Could not find a compiler for model named
        "%s"; make sure that a function "compile_%s_model()" is defined on
         this class''' % (model_name, model_name))
    with compiler(observed, drivers) as model:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            step_func = pm.DEMetropolisZ(tune = tune, scaling = scaling)
            trace = pm.sample(
                draws = draws, step = step_func, cores = chains,
                chains = chains, idata_kwargs = {'log_likelihood': True})
        if self.backend is not None:
            print('Writing results to file...')
            trace.to_netcdf(self.backend)
        if var_names is None:
            az.plot_trace(trace, var_names = ['~log_likelihood'])
        else:
            az.plot_trace(trace, var_names = var_names)
        if save_fig:
            pyplot.savefig('.'.join(self.backend.split('.')[:-1]) + '.png')
        else:
            pyplot.show()

Inherited members