Module mod17.calibration
Calibration of MOD17 against a representative, global eddy covariance (EC) flux tower network. The model calibration is based on Markov chain Monte Carlo (MCMC). Example use:
python calibration.py tune-gpp --pft=1
The general calibration protocol used here involves:
1. Check how well the chain(s) are mixing by running short chains:
python calibration.py tune-gpp 1 --draws=2000
2. If any chain is "sticky," run a short chain while tuning the jump scale:
python calibration.py tune-gpp 1 --tune=scaling --draws=2000
3. Using the trace plot from Step (2) as a reference, experiment with different jump scales to try and achieve the same (optimal) mixing when tuning on lambda (default) instead, e.g.:
python calibration.py tune-gpp 1 --scaling=1e-2 --draws=2000
4. When the right jump scale is found, run a chain at the desired length.
Once well-mixed chains are obtained, it is necessary to thin the samples to eliminate autocorrelation, e.g., in Python:
sampler = MOD17StochasticSampler(...)
sampler.plot_autocorr(burn = 1000, thin = 10)
trace = sampler.get_trace(burn = 1000, thin = 10)
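Mixing can also be checked quantitatively, e.g., using arviz directly (a sketch; the backend path below is hypothetical):
import arviz as az
trace = az.from_netcdf('MOD17_GPP_calibration_PFT1.nc4')
print(az.ess(trace))   # Effective sample size, per parameter
print(az.rhat(trace))  # Should be close to 1.0 for well-mixed chains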
A thinned posterior can be exported from the command line:
$ python calibration.py export-bplut output.csv --burn=1000 --thin=10
References
Madani, N., Kimball, J. S., & Running, S. W. (2017). "Improving global gross primary productivity estimates by computing optimum light use efficiencies using flux tower data." Journal of Geophysical Research: Biogeosciences, 122(11), 2939–2951.
'''
Calibration of MOD17 against a representative, global eddy covariance (EC)
flux tower network. The model calibration is based on Markov chain Monte
Carlo (MCMC). Example use:
    python calibration.py tune-gpp --pft=1
The general calibration protocol used here involves:
1. Check how well the chain(s) are mixing by running short chains:
`python calibration.py tune-gpp 1 --draws=2000`
2. If any chain is "sticky," run a short chain while tuning the jump scale:
`python calibration.py tune-gpp 1 --tune=scaling --draws=2000`
3. Using the trace plot from Step (2) as a reference, experiment with
different jump scales to try and achieve the same (optimal) mixing when
tuning on `lambda` (default) instead, e.g.:
`python calibration.py tune-gpp 1 --scaling=1e-2 --draws=2000`
4. When the right jump scale is found, run a chain at the desired length.
Once well-mixed chains are obtained, it is necessary to thin the samples to
eliminate autocorrelation, e.g., in Python:
    sampler = MOD17StochasticSampler(...)
    sampler.plot_autocorr(burn = 1000, thin = 10)
    trace = sampler.get_trace(burn = 1000, thin = 10)
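Mixing can also be checked quantitatively, e.g., using `arviz` directly (a
sketch; the backend path below is hypothetical):
    import arviz as az
    trace = az.from_netcdf('MOD17_GPP_calibration_PFT1.nc4')
    print(az.ess(trace))   # Effective sample size, per parameter
    print(az.rhat(trace))  # Should be close to 1.0 for well-mixed chains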
A thinned posterior can be exported from the command line:
```py
$ python calibration.py export-bplut output.csv --burn=1000 --thin=10
```
References:
    Madani, N., Kimball, J. S., & Running, S. W. (2017).
      "Improving global gross primary productivity estimates by computing
      optimum light use efficiencies using flux tower data."
      Journal of Geophysical Research: Biogeosciences, 122(11), 2939–2951.
'''
import datetime
import json
import os
import warnings
import numpy as np
import h5py
import arviz as az
import pymc as pm
import aesara.tensor as at
import mod17
from functools import partial
from multiprocessing import get_context, set_start_method
from numbers import Number
from typing import Callable, Sequence
from pathlib import Path
from matplotlib import pyplot
from scipy import signal
from mod17 import MOD17, PFT_VALID
from mod17.utils import pft_dominant, restore_bplut, write_bplut, rmsd
MOD17_DIR = os.path.dirname(mod17.__file__)
# This matplotlib setting prevents labels from overplotting
pyplot.rcParams['figure.constrained_layout.use'] = True
class BlackBoxLikelihood(at.Op):
    '''
    A custom Aesara (formerly Theano) Op that calculates the "likelihood" of
    model parameters; it takes a vector of values (the parameters that define
    our model) and returns a single scalar value (the log-likelihood).
    Parameters
    ----------
    model : Callable
        An arbitrary "black box" function that takes two arguments: the
        model parameters ("params") and the forcing data ("x")
    observed : numpy.ndarray
        The "observed" data that our log-likelihood function takes in
    x : numpy.ndarray or None
        The forcing data (input drivers) that our model requires, or None
        if no driver data are required
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    objective : str
        Name of the objective (or "loss") function to use, one of
        ('rmsd', 'rmse', 'gaussian', 'kge'); defaults to "rmsd"
    '''
    itypes = [at.dvector] # Expects a vector of parameter values when called
    otypes = [at.dscalar] # Outputs a single scalar value (the log likelihood)
    def __init__(
            self, model: Callable, observed: Sequence, x: Sequence = None,
            weights: Sequence = None, objective: str = 'rmsd'):
        '''
        Initialise the Op with various things that our log-likelihood function
        requires. The observed data ("observed") and drivers ("x") must be
        stored on the instance so the Theano Op can work seamlessly.
        '''
        self.model = model
        self.observed = observed
        self.x = x
        self.weights = weights
        if objective in ('rmsd', 'rmse'):
            self._loglik = self.loglik
        elif objective == 'gaussian':
            self._loglik = self.loglik_gaussian
        elif objective == 'kge':
            self._loglik = self.loglik_kge
        else:
            raise ValueError('Unknown "objective" function specified')
    def loglik(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Pseudo-log likelihood, based on the root-mean squared deviation
        (RMSD). The sign of the RMSD is forced to be negative so as to allow
        for maximization of this objective function.
        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data
        Returns
        -------
        Number
            The (negative) root-mean squared deviation (RMSD) between the
            predicted and observed values
        '''
        predicted = self.model(params, *x)
        if self.weights is not None:
            return -np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        return -np.sqrt(np.nanmean(((predicted - observed)) ** 2))
    def loglik_gaussian(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        '''
        Gaussian log-likelihood, assuming independent, identically distributed
        observations.
        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data
        Returns
        -------
        Number
            The log-likelihood
        '''
        predicted = self.model(params, *x)
        sigma = params[-1]
        # Number of non-NaN observations, N
        n = np.isfinite(predicted - observed).sum()
        # Gaussian log-likelihood:
        # -\frac{N}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2)
        #   - \frac{1}{2\hat{\sigma}^2} \sum (\hat{y} - y)^2
        return -0.5 * n * np.log(2 * np.pi * sigma**2) - (0.5 / sigma**2) *\
            np.nansum((predicted - observed)**2)
    def loglik_kge(
            self, params: Sequence, observed: Sequence,
            x: Sequence = None) -> Number:
        r'''
        Kling-Gupta efficiency (KGE), used here as a pseudo log-likelihood to
        be maximized:
        $$
        KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2}
        $$
        where $r$ is the Pearson correlation between predicted and observed
        values, $\alpha$ is the ratio of their standard deviations, and
        $\beta$ is the ratio of their totals (equivalently, their means).
        Parameters
        ----------
        params : Sequence
            One or more model parameters
        observed : Sequence
            The observed values
        x : Sequence or None
            Input driver data
        Returns
        -------
        Number
            The Kling-Gupta efficiency
        '''
        predicted = self.model(params, *x)
        r = np.corrcoef(predicted, observed)[0, 1]
        alpha = np.std(predicted) / np.std(observed)
        beta = np.sum(predicted) / np.sum(observed)
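        # NOTE: A KGE of 1 indicates perfect agreement with observations;
        #   KGE > -0.41 indicates more skill than predicting the observed
        #   mean (Knoben et al., 2019)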
        return 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)
    def perform(self, node, inputs, outputs):
        '''
        The method that is used when calling the Op.
        Parameters
        ----------
        node
        inputs : Sequence
        outputs : Sequence
        '''
        (params,) = inputs
        logl = self._loglik(params, self.observed, self.x)
        outputs[0][0] = np.array(logl) # Output the log-likelihood
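# Example (sketch): the likelihood can also be evaluated outside of a PyMC
#   model, e.g., for debugging; "my_model" and "my_drivers" below are
#   hypothetical stand-ins for a model function and its driver datasets:
#
#     op = BlackBoxLikelihood(my_model, observed, x = my_drivers)
#     print(op._loglik(params, observed, my_drivers))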
class AbstractSampler(object):
    '''
    Generic algorithm for fitting a model to data based on observed values
    similar to what we can produce with our model. Not intended to be called
    directly.
    '''
    def get_posterior(self, thin: int = 1) -> np.ndarray:
        '''
        Returns a stacked posterior array, with optional thinning, combining
        all the chains together.
        Parameters
        ----------
        thin : int
        Returns
        -------
        numpy.ndarray
        '''
        trace = az.from_netcdf(self.backend)
        return np.stack([ # i.e., get every ith element, each chain
            trace['posterior'][p].values[:,::thin].ravel()
            for p in self.required_parameters[self.name]
        ], axis = -1)
    def get_trace(
            self, thin: int = None, burn: int = None
        ) -> az.data.inference_data.InferenceData:
        '''
        Extracts the trace from the backend data store.
        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        trace = az.from_netcdf(self.backend)
        if thin is None and burn is None:
            return trace
        return trace.sel(draw = slice(burn, None, thin))
    def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs):
        '''
        Auto-correlation plot for an MCMC sample.
        Parameters
        ----------
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        **kwargs
            Additional keyword arguments to `arviz.plot_autocorr()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        kwargs.setdefault('combined', True)
        if thin is None:
            az.plot_autocorr(trace, **kwargs)
        else:
            burn = 0 if burn is None else burn
            az.plot_autocorr(
                trace.sel(draw = slice(burn, None, thin))['posterior'],
                **kwargs)
        pyplot.show()
    def plot_forest(self, **kwargs):
        '''
        Forest plot for an MCMC sample.
        In particular:
        - `hdi_prob`: A float indicating the highest density interval (HDI) to
            plot
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_forest(trace, **kwargs)
        pyplot.show()
    def plot_pair(self, **kwargs):
        '''
        Paired variables plot for an MCMC sample.
        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_pair()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_pair(trace, **kwargs)
        pyplot.show()
    def plot_partial_score(
            self, observed: Sequence, drivers: Sequence, fit: dict = None):
        '''
        Plots the "partial effect" of a single parameter: the score of the
        model at that parameter's current value against a sweep of possible
        parameter values. All other parameters are held fixed at the best-fit
        values.
        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        fit : dict or None
            The best-fit parameter values used for those parameters that are
            fixed
        '''
        trace = az.from_netcdf(self.backend)
        if fit is None:
            # Posterior means are the "best fit" values
            fit = trace['posterior'].mean()
        fit_params = list(filter(
            lambda p: p in fit, self.required_parameters[self.name]))
        # The NPP model depends on constants not included in the fit
        constants = []
        if self.name == 'NPP':
            constants = [
                self.params[p]
                for p in ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1']
            ]
        n = len(fit_params)
        nrow = n
        ncol = 1
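        # For more than 4 parameters, use a 2-row grid with ceil(n / 2) columns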
        if n > 4:
            nrow = 2
            ncol = n - (n // 2)
        fig, axes = pyplot.subplots(
            nrow, ncol, figsize = (n * 2, n), sharey = True)
        i = 0
        for j in range(nrow):
            for k in range(ncol):
                if i >= n:
                    break
                free_param = fit_params[i]
                fixed = np.stack([
                    fit[p].values for p in fit_params
                ])[np.newaxis,:].repeat(30, axis = 0)
                sweep = np.linspace(
                    trace['posterior'][free_param].min(),
                    trace['posterior'][free_param].max(), num = 30)
                fixed[:,i] = sweep
                # Need to concatenate GPP parameters at beginning of "fixed"
                scores = -np.array(self.score_posterior(
                    observed, drivers, [
                        [*constants, *f] for f in fixed.tolist()
                    ]))
                axes[j,k].plot(sweep, scores, 'k-')
                axes[j,k].axvline(
                    fit[free_param], color = 'red', linestyle = 'dashed',
                    label = 'Posterior Mean')
                axes[j,k].set_xlabel(free_param)
                axes[j,k].set_title(free_param)
                axes[j,k].legend()
                i += 1
        # Delete the last empty subplot
        if n % 2 != 0:
            fig.delaxes(axes.flatten()[-1])
        axes[0, 0].set_ylabel('Score')
        pyplot.tight_layout()
        pyplot.show()
    def plot_posterior(self, **kwargs):
        '''
        Plots the posterior distribution for an MCMC sample.
        Parameters
        ----------
        **kwargs
            Additional keyword arguments to `arviz.plot_posterior()`.
        '''
        assert os.path.exists(self.backend),\
            'Could not find file backend!'
        trace = az.from_netcdf(self.backend)
        az.plot_posterior(trace, **kwargs)
        pyplot.show()
    def score_posterior(
            self, observed: Sequence, drivers: Sequence, posterior: Sequence,
            method: str = 'rmsd') -> Number:
        '''
        Returns a goodness-of-fit score based on the existing calibration.
        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        posterior : list or tuple
            Sequence of posterior parameter sets (i.e., nested sequence); each
            nested sequence will be scored
        method : str
            The method for generating a goodness-of-fit score
            (Default: "rmsd")
        Returns
        -------
        float
        '''
        if method != 'rmsd':
            raise NotImplementedError('"method" must be one of: "rmsd"')
        score_func = partial(
            rmsd, func = self.model, observed = observed, drivers = drivers)
        with get_context('spawn').Pool() as pool:
            scores = pool.map(score_func, posterior)
        return scores
class StochasticSampler(AbstractSampler):
    '''
    A Markov chain Monte Carlo (MCMC) sampler for an arbitrary model. The
    specific sampler used is the Differential Evolution (DE) MCMC algorithm
    described by Ter Braak (2008), though the implementation is specific to
    the PyMC library.
    NOTE: The `model` (a function) should be named "_name" where "name" is
    some uniquely identifiable model name. This helps `StochasticSampler.run()`
    to find the correct compiler for the model. The compiler function should
    be named `compiled_name_model()` (where "name" is the model name) and be
    defined on a subclass of `StochasticSampler`.
    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    def __init__(
            self, config: dict, model: Callable, params_dict: dict = None,
            backend: str = None, weights: Sequence = None,
            model_name: str = None):
        self.backend = backend
        # Convert the BOUNDS into nested dicts for easy use
        self.bounds = dict([
            (key, dict([('lower', b[0]), ('upper', b[1])]))
            for key, b in config['optimization']['bounds'].items()
        ])
        self.config = config
        self.model = model
        if hasattr(model, '__name__'):
            self.name = model.__name__.strip('_').upper() # "_gpp" = "GPP"
        else:
            self.name = model_name
        self.params = params_dict
        # Set the model's prior distribution assumptions
        self.prior = dict()
        for key in self.required_parameters[self.name]:
            # NOTE: This is the default only for LUE_max; other parameters,
            #   with Uniform distributions, don't use anything here
            self.prior.setdefault(key, {
                'mu': params_dict[key],
                'sigma': 2e-4
            })
        self.weights = weights
        if backend is not None:
            assert os.path.exists(os.path.dirname(backend)),\
                'Directory of the file backend does not exist!'
    def run(
            self, observed: Sequence, drivers: Sequence,
            draws = 1000, chains = 3, tune = 'lambda',
            scaling: float = 1e-3, prior: dict = dict(),
            check_shape: bool = False, save_fig: bool = False,
            var_names: Sequence = None) -> None:
        '''
        Fits the model using the DE-MCMC(Z) approach. `tune="lambda"` (default) is
        recommended; lambda is related to the scale of the jumps learned from
        other chains, but epsilon ("scaling") controls the scale directly.
        Using a larger value for `scaling` (Default: 1e-3) will produce larger
        jumps and may directly address "sticky" chains.
        Parameters
        ----------
        observed : Sequence
            The observed data the model will be calibrated against
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        draws : int
            Number of samples to draw (on each chain); defaults to 1000
        chains : int
            Number of chains; defaults to 3
        tune : str or None
            Which hyperparameter to tune: Defaults to 'lambda', but can also
            be 'scaling' or None.
        scaling : float
            Initial scale factor for epsilon (Default: 1e-3)
        prior : dict
        check_shape : bool
            True to require that input driver datasets have the same shape as
            the observed values (Default: False)
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        var_names : Sequence
            One or more variable names to show in the plot
        '''
        assert not check_shape or drivers[0].shape == observed.shape,\
            'Driver data should have the same shape as the "observed" data'
        assert len(drivers) == len(self.required_drivers[self.name]),\
            'Did not receive expected number of driver datasets!'
        assert tune in ('lambda', 'scaling') or tune is None
        # Update prior assumptions
        self.prior.update(prior)
        # Generate an initial goodness-of-fit score
        predicted = self.model([
            self.params[p] for p in self.required_parameters[self.name]
        ], *drivers)
        if self.weights is not None:
            score = np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        else:
            score = np.sqrt(np.nanmean(((predicted - observed)) ** 2))
        print('-- RMSD at the initial point: %.3f' % score)
        print('Compiling model...')
        try:
            compiler = getattr(self, 'compile_%s_model' % self.name.lower())
        except AttributeError:
            raise AttributeError(
                ('Could not find a compiler for model named "%s"; make sure '
                 'that a function "compile_%s_model()" is defined on this '
                 'class') % (self.name, self.name))
        with compiler(observed, drivers) as model:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                step_func = pm.DEMetropolisZ(tune = tune, scaling = scaling)
                trace = pm.sample(
                    draws = draws, step = step_func, cores = chains,
                    chains = chains, idata_kwargs = {'log_likelihood': True})
            if self.backend is not None:
                print('Writing results to file...')
                trace.to_netcdf(self.backend)
            if var_names is None:
                az.plot_trace(trace, var_names = ['~log_likelihood'])
            else:
                az.plot_trace(trace, var_names = var_names)
            if save_fig:
                pyplot.savefig('.'.join(self.backend.split('.')[:-1]) + '.png')
            else:
                pyplot.show()
class MOD17StochasticSampler(StochasticSampler):
    '''
    A Markov chain Monte Carlo (MCMC) sampler for MOD17. The specific sampler
    used is the Differential Evolution (DE) MCMC algorithm described by
    Ter Braak (2008), though the implementation is specific to the PyMC
    library.
    Considerations:
    1. Tower GPP is censored when values are < 0 or when APAR is
        < 0.1 MJ m-2 d-1.
    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this function
        should take driver data as positional arguments and the model
        parameters as a `*Sequence`; it should require no external state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the model;
        i.e., model is scored by how close its predicted values are to the
        observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and as
        the basis for constructing a new dictionary of optimized parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as in
        weighted least squares)
    '''
    # NOTE: This differs from mod17.MOD17 because we haven't yet
    #   figured out how the respiration terms are calculated
    required_parameters = {
        'GPP': ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1'],
        'NPP': MOD17.required_parameters
    }
    required_drivers = {
        'GPP': ['fPAR', 'Tmin', 'VPD', 'PAR'],
        'NPP': ['fPAR', 'Tmin', 'VPD', 'PAR', 'LAI', 'Tmean', 'years']
    }
    def compile_gpp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new GPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.
        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # (Stochastic) Priors for unknown model parameters
            LUE_max = pm.TruncatedNormal('LUE_max',
                **self.prior['LUE_max'], **self.bounds['LUE_max'])
            # NOTE: tmin0, vpd0 are fixed at Collection 6.1 values
            tmin0 = self.params['tmin0']
            tmin1 = pm.Uniform('tmin1', **self.bounds['tmin1'])
            # NOTE: Upper bound on `vpd1` is set by the maximum observed VPD
            vpd0 = self.params['vpd0']
            vpd1 = pm.Uniform('vpd1',
                lower = self.bounds['vpd1']['lower'],
                upper = drivers[2].max().round(0))
            # Convert model parameters to a tensor vector
            params_list = [LUE_max, tmin0, tmin1, vpd0, vpd1]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model
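    # Example (sketch): a compiled model can also be sampled directly,
    #   bypassing `run()`, e.g., for a quick pilot chain:
    #
    #     with sampler.compile_gpp_model(observed, drivers):
    #         trace = pm.sample(draws = 500, step = pm.DEMetropolisZ())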
    def compile_npp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new NPP model based on the prior distribution. Model can be
        re-compiled multiple times, e.g., for cross validation.
        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate the model;
            i.e., model is scored by how close its predicted values are to the
            observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the indented
        #   code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # GPP parameters are known and fixed at their calibrated values
            LUE_max = self.params['LUE_max']
            tmin0   = self.params['tmin0']
            tmin1   = self.params['tmin1']
            vpd0    = self.params['vpd0']
            vpd1    = self.params['vpd1']
            # SLA fixed at prior mean
            SLA = np.exp(self.prior['SLA']['mu'])
            # Allometry ratios prescribe narrow range around Collection 6.1 values
            froot_leaf_ratio = pm.Triangular(
                'froot_leaf_ratio', **self.prior['froot_leaf_ratio'])
            # (Stochastic) Priors for unknown model parameters
            Q10_froot = pm.TruncatedNormal(
                'Q10_froot', **self.prior['Q10_froot'], **self.bounds['Q10'])
            leaf_mr_base = pm.LogNormal(
                'leaf_mr_base', **self.prior['leaf_mr_base'])
            froot_mr_base = pm.LogNormal(
                'froot_mr_base', **self.prior['froot_mr_base'])
            # For GRS and CRO, livewood mass and respiration are zero
            if np.equal(list(self.prior['livewood_mr_base'].values()), [0, 0]).all():
                livewood_leaf_ratio = 0
                livewood_mr_base = 0
                Q10_livewood = 0
            else:
                livewood_leaf_ratio = pm.Triangular(
                    'livewood_leaf_ratio', **self.prior['livewood_leaf_ratio'])
                livewood_mr_base = pm.LogNormal(
                    'livewood_mr_base', **self.prior['livewood_mr_base'])
                Q10_livewood = pm.TruncatedNormal(
                    'Q10_livewood', **self.prior['Q10_livewood'],
                    **self.bounds['Q10'])
            # Convert model parameters to a tensor vector
            params_list = [
                LUE_max, tmin0, tmin1, vpd0, vpd1, SLA,
                Q10_livewood, Q10_froot, froot_leaf_ratio, livewood_leaf_ratio,
                leaf_mr_base, froot_mr_base, livewood_mr_base
            ]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model
class CalibrationAPI(object):
    '''
    Convenience class for calibrating the MOD17 GPP and NPP models. Meant to
    be used with `fire.Fire()`.
    '''
    def __init__(self, config = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                MOD17_DIR, 'data/MOD17_calibration_config.json')
        with open(config_file, 'r') as file:
            self.config = json.load(file)
        self.hdf5 = self.config['data']['file']
    def _clean(self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP'):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 MJ m-2 day-1
            apar = drivers['fPAR'] * drivers['PAR']
            return np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))
    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data
    def clean_observed(
            self, raw: Sequence, drivers: Sequence, driver_labels: Sequence,
            protocol: str = 'GPP', filter_length: int = 2) -> Sequence:
        '''
        Cleans observed tower flux data according to a prescribed protocol.
        - For GPP data: Removes observations where GPP < 0 or where APAR is
            < 0.1 MJ m-2 day-1
        Parameters
        ----------
        raw : Sequence
        drivers : Sequence
        driver_labels : Sequence
        protocol : str
        filter_length : int
            The window size for the smoothing filter, applied to the observed
            data
        Returns
        -------
        Sequence
        '''
        if protocol != 'GPP':
            raise NotImplementedError('"protocol" must be one of: "GPP"')
        # Read in the observed data and apply smoothing filter
        obs = self._filter(raw, filter_length)
        obs = self._clean(obs, dict([
            (driver_labels[i], data)
            for i, data in enumerate(drivers)
        ]), protocol = 'GPP')
        return obs
    def export_bplut(
            self, model: str, output_path: str, thin: int = 10,
            burn: int = 1000):
        '''
        Export the BPLUT using the posterior mean from the MCMC sampler. NOTE:
        The posterior mean is usually not the best estimate for poorly
        identified parameters.
        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        output_path : str
            The output CSV file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(
                draw = slice(burn, None, thin))['posterior'].mean()
            for each in MOD17.required_parameters:
                try:
                    bplut[each][pft] = float(fit[each])
                except KeyError:
                    continue
        write_bplut(bplut, output_path)
    def export_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, k_folds: int = 1):
        '''
        Exports posterior distribution for a parameter, for each PFT to HDF5.
        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        k_folds : int
            The number of k-folds used in cross-calibration/validation;
            if more than one (default), the folds for each PFT will be
            combined into a single HDF5 file
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        for pft in PFT_VALID:
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            backend = self.config['optimization']['backend_template'] %\
                (model, pft)
            post_by_fold = []
            for fold in range(1, k_folds + 1):
                if k_folds > 1:
                    backend = self.config['optimization']['backend_template'] %\
                        (f'{model}-k{fold}', pft)
                sampler = MOD17StochasticSampler(
                    self.config, getattr(MOD17, '_%s' % model.lower()), params,
                    backend = backend)
                trace = sampler.get_trace()
                fit = trace.sel(draw = slice(burn, None, thin))['posterior']
                num_samples = fit.sizes['chain'] * fit.sizes['draw']
                if param in fit:
                    post_by_fold.append(
                        az.extract_dataset(fit, combined = True)[param].values)
                else:
                    # In case there is, e.g., a parameter that takes on a
                    #   constant value for a specific PFT
                    if k_folds > 1:
                        post_by_fold.append(np.ones((1, num_samples)) * np.nan)
                    else:
                        post_by_fold.append(np.ones(num_samples) * np.nan)
            if k_folds > 1:
                post.append(np.vstack(post_by_fold))
            else:
                post.extend(post_by_fold)
        # If not every PFT's posterior has the same number of samples (e.g.,
        #   when one set of chains was run longer than another)...
        if not all([p.shape == post[0].shape for p in post]):
            max_len = max([p.shape for p in post])[0]
            # ...Reshape all posteriors to match the greatest sample size
            post = [
                np.pad(
                    p.astype(np.float32), (0, max_len - p.size),
                    mode = 'constant', constant_values = (np.nan,))
                for p in post
            ]
        with h5py.File(output_path, 'a') as hdf:
            post = np.stack(post)
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = f'CalibrationAPI.export_posterior() on {ts}'
    def export_likely_posterior(
            self, model: str, param: str, output_path: str, thin: int = 10,
            burn: int = 1000, ptile: int = 95):
        '''
        Exports posterior distribution for a parameter based on likelihood
        Parameters
        ----------
        model : str
            The name of the model ("GPP" or "NPP")
        param : str
            The model parameter to export
        output_path : str
            The output HDF5 file path
        thin : int
            Thinning rate
        burn : int
            The burn-in (i.e., first N samples to discard)
        ptile : int
            The percentile cutoff for likelihood; only samples at or above
            this likelihood cutoff will be included
        '''
        params_dict = restore_bplut(self.config['BPLUT'][model])
        bplut = params_dict.copy()
        # Filter the parameters to just those for the PFT of interest
        post = []
        likelihood = []
        for pft in PFT_VALID:
            backend = self.config['optimization']['backend_template'] % (model, pft)
            params = dict([(k, v[pft]) for k, v in params_dict.items()])
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(draw = slice(burn, None, thin))
            # Find the likelihood value associated with the cutoff percentile
            ll = az.extract_dataset(
                fit, combined = True)['log_likelihood'].values
            values = az.extract_dataset(fit, combined = True)[param].values
            cutoff = np.percentile(ll, ptile)
            post.append(values[ll >= cutoff])
            likelihood.append(ll[ll >= cutoff])
        with h5py.File(output_path, 'a') as hdf:
            n = max([len(p) for p in post])
            # Make sure all arrays are the same size
            post = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in post
            ])
            likelihood = np.stack([
                np.concatenate((p, np.full((n - len(p),), np.nan)))
                for p in likelihood
            ])
            ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
            dataset = hdf.create_dataset(
                f'{param}_posterior', post.shape, np.float32, post)
            dataset.attrs['description'] = f'CalibrationAPI.export_likely_posterior() on {ts}'
            dataset = hdf.create_dataset(
                f'{param}_likelihood', likelihood.shape, np.float32, likelihood)
    def tune_gpp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, **kwargs):
        '''
        Run the MOD17 GPP calibration.
        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to plot the trace for a previous calibration run; this will
            also NOT start a new calibration (Default: False)
        ipdb : bool
            True to drop the user into an ipdb prompt, prior to and instead of
            running calibration
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys() and key not in kwargs.keys():
                kwargs[key] = self.config['optimization'][key]
        # Filter the parameters to just those for the PFT of interest
        params_dict = restore_bplut(self.config['BPLUT']['GPP'])
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        objective = self.config['optimization']['objective'].lower()
        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            sites = hdf['FLUXNET/site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = pft_dominant(hdf['state/PFT'][:], site_list = sites)
            # Blacklist various sites
            pft_mask = np.logical_and(pft_map == pft, ~np.in1d(sites, blacklist))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf['MERRA2/T10M_daytime'][:][:,pft_mask] - 273.15
            qv10m = hdf['MERRA2/QV10M_daytime'][:][:,pft_mask]
            ps = hdf['MERRA2/PS_daytime'][:][:,pft_mask]
            drivers = [ # fPAR, Tmin, VPD, PAR
                hdf['MODIS/MOD15A2HGF_fPAR_interp'][:][:,pft_mask],
                hdf['MERRA2/Tmin'][:][:,pft_mask] - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf['MERRA2/SWGDN'][:][:,pft_mask]),
            ]
            # Convert fPAR from (%) to [0,1]
            drivers[0] = np.nanmean(drivers[0], axis = -1) / 100
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if objective in ('rmsd', 'rmse'):
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(tday.shape[0], axis = 0)
            # Check that driver data do not contain NaNs
            for d, each in enumerate(drivers):
                name = ('fPAR', 'Tmin', 'VPD', 'PAR')[d]
                assert not np.isnan(each).any(),\
                    f'Driver dataset "{name}" contains NaNs'
            tower_gpp = hdf['FLUXNET/GPP'][:][:,pft_mask]
            # Read the validation mask; mask out observations that are
            #   reserved for validation
            print('Masking out validation data...')
            mask = hdf['FLUXNET/validation_mask'][pft]
            tower_gpp[mask] = np.nan
        # Clean observations, then mask out driver data where there are no
        #   observations
        tower_gpp = self.clean_observed(
            tower_gpp, drivers, MOD17StochasticSampler.required_drivers['GPP'],
            protocol = 'GPP')
        if weights is not None:
            weights = weights[~np.isnan(tower_gpp)]
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][~np.isnan(tower_gpp)]
        tower_gpp = tower_gpp[~np.isnan(tower_gpp)]
        print('Initializing sampler...')
        backend = self.config['optimization']['backend_template'] % ('GPP', pft)
        sampler = MOD17StochasticSampler(
            self.config, MOD17._gpp, params_dict, backend = backend,
            weights = weights)
        if plot_trace or ipdb:
            if ipdb:
                import ipdb
                ipdb.set_trace()
            trace = sampler.get_trace()
            az.plot_trace(trace, var_names = MOD17.required_parameters[0:5])
            pyplot.show()
            return
        # Get (informative) priors for just those parameters that have them
        with open(self.config['optimization']['prior'], 'r') as file:
            prior = json.load(file)
        prior_params = filter(
            lambda p: p in prior.keys(), sampler.required_parameters['GPP'])
        prior = dict([
            (p, {'mu': prior[p]['mu'][pft], 'sigma': prior[p]['sigma'][pft]})
            for p in prior_params
        ])
        sampler.run(
            tower_gpp, drivers, prior = prior, save_fig = save_fig, **kwargs)
    def tune_npp(
            self, pft: int, plot_trace: bool = False, ipdb: bool = False,
            save_fig: bool = False, climatology = False,
            cutoff: Number = 2385, k_folds: int = 1, **kwargs):
        r'''
        Run the MOD17 NPP calibration. If k-folds cross-validation is used,
        the model is calibrated on $k$ random subsets of the data and a
        series of files is created, e.g.:
            MOD17_NPP_calibration_PFT1.h5
            MOD17_NPP-k1_calibration_PFT1.nc4
            MOD17_NPP-k2_calibration_PFT1.nc4
            ...
        Where each `.nc4` file is a standard `arviz` backend and the `.h5`
        indicates which indices from the NPP observations vector, after
        removing NaNs, were excluded (i.e., the indices of the test data).
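        For example, to calibrate PFT 1 with 5-fold cross-validation:
            python calibration.py tune-npp 1 --k-folds=5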
        Parameters
        ----------
        pft : int
            The Plant Functional Type (PFT) to calibrate
        plot_trace : bool
            True to display the trace plot ONLY and not run calibration
            (Default: False)
        ipdb : bool
            True to drop into an interactive Python debugger (`ipdb`) after
            loading an existing trace (Default: False)
        save_fig : bool
            True to save the post-calibration trace plot to a file instead of
            displaying it (Default: False)
        climatology : bool
            True to use a MERRA-2 climatology (and look for it in the drivers
            file), i.e., use `MERRA2_climatology` group instead of
            `surface_met_MERRA2` group (Default: False)
        cutoff : Number
            Maximum value of observed NPP (g C m-2 year-1); values above this
            cutoff will be discarded and not used in calibration
            (Default: 2385)
        k_folds : int
            Number of folds to use in k-folds cross-validation; defaults to
            k=1, i.e., no cross-validation is performed.
        **kwargs
            Additional keyword arguments passed to
            `MOD17StochasticSampler.run()`
        '''
        assert pft in PFT_VALID, f'Invalid PFT: {pft}'
        prefix = 'MERRA2_climatology' if climatology else 'surface_met_MERRA2'
        params_dict = restore_bplut(self.config['BPLUT']['NPP'])
        # Filter the parameters to just those for the PFT of interest
        params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
        model = MOD17(params_dict)
        kwargs.update({'var_names': [
            '~LUE_max', '~tmin0', '~tmin1', '~vpd0', '~vpd1', '~log_likelihood'
        ]})
        # Pass configuration parameters to MOD17StochasticSampler.run()
        for key in ('chains', 'draws', 'tune', 'scaling'):
            if key in self.config['optimization'].keys():
                kwargs[key] = self.config['optimization'][key]
        print('Loading driver datasets...')
        with h5py.File(self.hdf5, 'r') as hdf:
            # NOTE: This is only recorded at the site-level; no need to
            #   determine modal PFT across subgrid
            pft_map = hdf['NPP/PFT'][:]
            # Leave out sites where there is no fPAR (and no LAI) data
            fpar = hdf['NPP/MOD15A2H_fPAR_clim_filt'][:]
            mask = np.logical_and(
                    pft_map == pft, ~np.isnan(np.nanmean(fpar, axis = -1))\
                .all(axis = 0))
            # NOTE: Converting from Kelvin to Celsius
            tday = hdf[f'NPP/{prefix}/T10M_daytime'][:][:,mask] - 273.15
            qv10m = hdf[f'NPP/{prefix}/QV10M_daytime'][:][:,mask]
            ps = hdf[f'NPP/{prefix}/PS_daytime'][:][:,mask]
            drivers = [ # fPAR, Tmin, VPD, PAR, LAI, Tmean, years
                hdf['NPP/MOD15A2H_fPAR_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/Tmin'][:][:,mask]  - 273.15,
                MOD17.vpd(qv10m, ps, tday),
                MOD17.par(hdf[f'NPP/{prefix}/SWGDN'][:][:,mask]),
                hdf['NPP/MOD15A2H_LAI_clim_filt'][:][:,mask],
                hdf[f'NPP/{prefix}/T10M'][:][:,mask] - 273.15,
                np.full(ps.shape, 1) # i.e., A 365-day climatological year ("Year 1")
            ]
            observed_npp = hdf['NPP/NPP_total'][:][mask]
        if cutoff is not None:
            observed_npp[observed_npp > cutoff] = np.nan
        # Set negative VPD to zero
        drivers[2] = np.where(drivers[2] < 0, 0, drivers[2])
        # Convert fPAR from (%) to [0,1] and re-scale LAI; reshape fPAR, LAI
        drivers[0] = np.nanmean(drivers[0], axis = -1) * 0.01
        drivers[4] = np.nanmean(drivers[4], axis = -1) * 0.1
        # Mask out driver data where there are no observations
        for d, _ in enumerate(drivers):
            drivers[d] = drivers[d][:,~np.isnan(observed_npp)]
        observed_npp = observed_npp[~np.isnan(observed_npp)]
        if k_folds > 1:
            # Back-up the original (complete) datasets
            _drivers = [d.copy() for d in drivers]
            _observed_npp = observed_npp.copy()
            # Randomize the indices of the NPP data
            indices = np.arange(0, observed_npp.size)
            np.random.shuffle(indices)
            # Get the starting and ending index of each fold
            fold_size = indices.size // k_folds
            fold_idx = np.arange(0, k_folds) * fold_size
            fold_idx = list(map(list, zip(fold_idx, fold_idx + fold_size)))
            # Ensure the entire dataset is used: the last fold absorbs any
            #   remainder
            fold_idx[-1][-1] = indices.size
            idx_test = [indices[start:end] for start, end in fold_idx]
        # Loop over each fold (or the entire dataset, if num. folds == 1)
        for k, fold in enumerate(range(1, k_folds + 1)):
            backend = self.config['optimization']['backend_template'] % ('NPP', pft)
            if k_folds > 1 and fold == 1:
                # Create an HDF5 file with the same name as the (original)
                #   netCDF4 back-end, store the test indices
                with h5py.File(backend.replace('nc4', 'h5'), 'w') as hdf:
                    out = list(idx_test)
                    size = indices.size // k_folds
                    try:
                        out = np.stack(out)
                    except ValueError:
                        size = max((o.size for o in out))
                        for i in range(0, len(out)):
                            out[i] = np.concatenate((out[i], [np.nan] * (size - out[i].size)))
                    hdf.create_dataset(
                        'test_indices', (k_folds, size), np.int32, np.stack(out))
            if k_folds > 1:
                # Restore the original NPP dataset, then set this fold's test
                #   indices to NaN
                observed_npp = _observed_npp.copy()
                idx = idx_test[k]
                observed_npp[idx] = np.nan
                # Same for drivers, after restoring from the original
                drivers = [d.copy()[:,~np.isnan(observed_npp)] for d in _drivers]
                observed_npp = observed_npp[~np.isnan(observed_npp)]
            # Use a different naming scheme for the backend
            if k_folds > 1:
                backend = self.config['optimization']['backend_template'] % (f'NPP-k{fold}', pft)
            print('Initializing sampler...')
            sampler = MOD17StochasticSampler(
                self.config, MOD17._npp, params_dict, backend = backend,
                model_name = 'NPP')
            if plot_trace or ipdb:
                if ipdb:
                    import ipdb
                    ipdb.set_trace()
                trace = sampler.get_trace()
                az.plot_trace(
                    trace, var_names = MOD17.required_parameters[5:])
                pyplot.show()
                return
            # Get (informative) priors for just those parameters that have them
            with open(self.config['optimization']['prior'], 'r') as file:
                prior = json.load(file)
            prior_params = filter(
                lambda p: p in prior.keys(), sampler.required_parameters['NPP'])
            prior = dict([
                (p, prior[p]) for p in prior_params
            ])
            for key in prior.keys():
                # And make sure to subset to the chosen PFT!
                for arg in prior[key].keys():
                    prior[key][arg] = prior[key][arg][pft]
            sampler.run(
                observed_npp, drivers, prior = prior, save_fig = save_fig,
                **kwargs)
if __name__ == '__main__':
    import fire
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        fire.Fire(CalibrationAPI)
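# Example (sketch): the CalibrationAPI can also be used programmatically;
#   the PFT and number of draws below are illustrative:
#
#     from mod17.calibration import CalibrationAPI
#     api = CalibrationAPI()  # Uses the default configuration file
#     api.tune_gpp(pft = 1, draws = 2000)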
- class AbstractSampler
- 
class AbstractSampler(object)

Generic algorithm for fitting a model to data based on observed values
similar to what we can produce with our model. Not intended to be called
directly.

Methods

- def get_posterior(self, thin: int = 1) -> numpy.ndarray

Returns a stacked posterior array, with optional thinning, combining all
the chains together.

Parameters
----------
thin : int
    Thinning rate

Returns
-------
numpy.ndarray

```py
def get_posterior(self, thin: int = 1) -> np.ndarray:
    trace = az.from_netcdf(self.backend)
    return np.stack([
        # i.e., get every ith element, each chain
        trace['posterior'][p].values[:,::thin].ravel()
        for p in self.required_parameters[self.name]
    ], axis = -1)
```
- def get_trace(self, thin: int = None, burn: int = None) -> arviz.data.inference_data.InferenceData

Extracts the trace from the backend data store.

Parameters
----------
thin : int
    Thinning rate
burn : int
    The burn-in (i.e., first N samples to discard)

```py
def get_trace(
        self, thin: int = None, burn: int = None
    ) -> az.data.inference_data.InferenceData:
    trace = az.from_netcdf(self.backend)
    if thin is None and burn is None:
        return trace
    return trace.sel(draw = slice(burn, None, thin))
```
- def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs)

Auto-correlation plot for an MCMC sample.

Parameters
----------
thin : int
    Thinning rate
burn : int
    The burn-in (i.e., first N samples to discard)
**kwargs
    Additional keyword arguments to `arviz.plot_autocorr()`.

```py
def plot_autocorr(self, thin: int = None, burn: int = None, **kwargs):
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    kwargs.setdefault('combined', True)
    if thin is None:
        az.plot_autocorr(trace, **kwargs)
    else:
        burn = 0 if burn is None else burn
        az.plot_autocorr(
            trace.sel(draw = slice(burn, None, thin))['posterior'],
            **kwargs)
    pyplot.show()
```
- def plot_forest(self, **kwargs)

Forest plot for an MCMC sample. Keyword arguments are passed through to
`arviz.plot_forest()`; in particular:

- `hdi_prob`: A float indicating the highest density interval (HDI) to plot

```py
def plot_forest(self, **kwargs):
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_forest(trace, **kwargs)
    pyplot.show()
```
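A usage sketch, assuming a sampler whose backend file already exists;
`hdi_prob` is forwarded to `arviz.plot_forest()`:

```py
# Plot 95% highest density intervals for each calibrated parameter
sampler.plot_forest(hdi_prob = 0.95)
```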
- def plot_pair(self, **kwargs)

Paired variables plot for an MCMC sample.

Parameters
----------
**kwargs
    Additional keyword arguments to `arviz.plot_pair()`.

```py
def plot_pair(self, **kwargs):
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_pair(trace, **kwargs)
    pyplot.show()
```
- def plot_partial_score(self, observed: Sequence, drivers: Sequence, fit: dict = None)

Plots the "partial effect" of a single parameter: the score of the model
at that parameter's current value against a sweep of possible parameter
values. All other parameters are held fixed at the best-fit values.

Parameters
----------
observed : Sequence
    Sequence of observed values that will be used to calibrate the model;
    i.e., model is scored by how close its predicted values are to the
    observed values
drivers : list or tuple
    Sequence of driver datasets to be supplied, in order, to the model's
    run function
fit : dict or None
    The best-fit parameter values used for those parameters that are fixed

```py
def plot_partial_score(
        self, observed: Sequence, drivers: Sequence, fit: dict = None):
    trace = az.from_netcdf(self.backend)
    if fit is None:
        # Mean of posterior are "best fit" values
        fit = trace['posterior'].mean()
    fit_params = list(filter(
        lambda p: p in fit, self.required_parameters[self.name]))
    # The NPP model depends on constants not included in the fit
    constants = []
    if self.name == 'NPP':
        constants = [
            self.params[p]
            for p in ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1']
        ]
    n = len(fit_params)
    nrow = n
    ncol = 1
    if n > 4:
        nrow = 2
        ncol = n - (n // 2)
    fig, axes = pyplot.subplots(
        nrow, ncol, figsize = (n * 2, n), sharey = True)
    i = 0
    for j in range(nrow):
        for k in range(ncol):
            if i >= n:
                break
            free_param = fit_params[i]
            fixed = np.stack([
                fit[p].values for p in fit_params
            ])[np.newaxis,:].repeat(30, axis = 0)
            sweep = np.linspace(
                trace['posterior'][free_param].min(),
                trace['posterior'][free_param].max(), num = 30)
            fixed[:,i] = sweep
            # Need to concatenate GPP parameters at beginning of fixed
            scores = -np.array(self.score_posterior(
                observed, drivers, [
                    [*constants, *f] for f in fixed.tolist()
                ]))
            axes[j,k].plot(sweep, scores, 'k-')
            axes[j,k].axvline(
                fit[free_param], color = 'red', linestyle = 'dashed',
                label = 'Posterior Mean')
            axes[j,k].set_xlabel(free_param)
            axes[j,k].set_title(free_param)
            axes[j,k].legend()
            i += 1
    # Delete the last empty subplot
    if n % 2 != 0:
        fig.delaxes(axes.flatten()[-1])
    axes[0, 0].set_ylabel('Score')
    pyplot.tight_layout()
    pyplot.show()
```
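A usage sketch, assuming `sampler`, `tower_gpp`, and `drivers` were
prepared as in `CalibrationAPI.tune_gpp()` (the names are illustrative):

```py
# Sweep each free parameter against the (negated) goodness-of-fit score,
#   holding the other parameters at their posterior means
sampler.plot_partial_score(tower_gpp, drivers)
```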
- def plot_posterior(self, **kwargs)

Plots the posterior distribution for an MCMC sample.

Parameters
----------
**kwargs
    Additional keyword arguments to `arviz.plot_posterior()`.

```py
def plot_posterior(self, **kwargs):
    assert os.path.exists(self.backend),\
        'Could not find file backend!'
    trace = az.from_netcdf(self.backend)
    az.plot_posterior(trace, **kwargs)
    pyplot.show()
```
- def score_posterior(self, observed: Sequence, drivers: Sequence, posterior: Sequence, method: str = 'rmsd') -> numbers.Number

Returns a goodness-of-fit score based on the existing calibration.

Parameters
----------
observed : Sequence
    Sequence of observed values that will be used to calibrate the model;
    i.e., model is scored by how close its predicted values are to the
    observed values
drivers : list or tuple
    Sequence of driver datasets to be supplied, in order, to the model's
    run function
posterior : list or tuple
    Sequence of posterior parameter sets (i.e., nested sequence); each
    nested sequence will be scored
method : str
    The method for generating a goodness-of-fit score (Default: "rmsd")

Returns
-------
float

```py
def score_posterior(
        self, observed: Sequence, drivers: Sequence, posterior: Sequence,
        method: str = 'rmsd') -> Number:
    if method != 'rmsd':
        raise NotImplementedError('"method" must be one of: "rmsd"')
    score_func = partial(
        rmsd, func = self.model, observed = observed, drivers = drivers)
    with get_context('spawn').Pool() as pool:
        scores = pool.map(score_func, posterior)
    return scores
```
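For example, a thinned posterior from `get_posterior()` can be scored
directly; this is a sketch in which `tower_gpp` and `drivers` are
illustrative names for data prepared elsewhere:

```py
# Score every 10th posterior sample against the observations, using the
#   default "rmsd" method
posterior = sampler.get_posterior(thin = 10)
scores = sampler.score_posterior(tower_gpp, drivers, posterior.tolist())
# RMSD for each sampled parameter vector (lower is better)
print(min(scores))
```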
 
- class BlackBoxLikelihood(model: Callable, observed: Sequence, x: Sequence = None, weights: Sequence = None, objective: str = 'rmsd')

A custom Theano operator that calculates the "likelihood" of model
parameters; it takes a vector of values (the parameters that define our
model) and returns a single "scalar" value (the log-likelihood).

Parameters
----------
model : Callable
    An arbitrary "black box" function that takes two arguments: the model
    parameters ("params") and the forcing data ("x")
observed : numpy.ndarray
    The "observed" data that our log-likelihood function takes in
x : numpy.ndarray or None
    The forcing data (input drivers) that our model requires, or None if
    no driver data are required
weights : Sequence or None
    Optional sequence of weights applied to the model residuals (as in
    weighted least squares)
objective : str
    Name of the objective (or "loss") function to use, one of
    ('rmsd', 'gaussian', 'kge'); defaults to "rmsd"

The constructor initialises the Op with various things that our
log-likelihood function requires. The observed data ("observed") and
drivers ("x") must be stored on the instance so the Theano Op can work
seamlessly.

```py
class BlackBoxLikelihood(at.Op):
    itypes = [at.dvector] # Expects a vector of parameter values when called
    otypes = [at.dscalar] # Outputs a single scalar value (the log likelihood)

    def __init__(
            self, model: Callable, observed: Sequence, x: Sequence = None,
            weights: Sequence = None, objective: str = 'rmsd'):
        self.model = model
        self.observed = observed
        self.x = x
        self.weights = weights
        if objective in ('rmsd', 'rmse'):
            self._loglik = self.loglik
        elif objective == 'gaussian':
            self._loglik = self.loglik_gaussian
        elif objective == 'kge':
            self._loglik = self.loglik_kge
        else:
            raise ValueError('Unknown "objective" function specified')
```

Ancestors
- aesara.graph.op.Op
- aesara.graph.utils.MetaObject
Class variables
- var default_output : Optional[int]
- var destroy_map : Dict[int, List[int]]
- var itypes : Optional[Sequence[Type]]
- var otypes : Optional[Sequence[Type]]
- var params_type : Optional[aesara.link.c.params_type.ParamsType]
- var view_map : Dict[int, List[int]]
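Putting the pieces together, a minimal sketch of how this Op is used
inside a PyMC model (cf. `compile_gpp_model()` below); `my_model`,
`observed`, and `drivers` are placeholder names, not part of this module:

```py
# Wrap an arbitrary model function as a log-likelihood Op
log_likelihood = BlackBoxLikelihood(
    my_model, observed, x = drivers, objective = 'rmsd')
with pm.Model() as model:
    # A single free parameter, for illustration only
    LUE_max = pm.Uniform('LUE_max', lower = 0, upper = 0.01)
    params = at.as_tensor_variable([LUE_max])
    # Register the black-box log-likelihood as an added potential
    pm.Potential('likelihood', log_likelihood(params))
```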
Methods
- def loglik(self, params: Sequence, observed: Sequence, x: Sequence = None) -> numbers.Number
Pseudo-log likelihood, based on the root-mean squared deviation (RMSD).
The sign of the RMSD is forced to be negative so as to allow for
maximization of this objective function.

Parameters
----------
params : Sequence
    One or more model parameters
observed : Sequence
    The observed values
x : Sequence or None
    Input driver data

Returns
-------
Number
    The (negative) root-mean squared deviation (RMSD) between the
    predicted and observed values

```py
def loglik(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    predicted = self.model(params, *x)
    if self.weights is not None:
        return -np.sqrt(
            np.nanmean(((predicted - observed) * self.weights) ** 2))
    return -np.sqrt(np.nanmean(((predicted - observed)) ** 2))
```
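In symbols, this objective is the negated, optionally weighted RMSD
(with $w_i = 1$ when no weights are supplied, and the mean taken over
non-NaN residuals):

$$ -\sqrt{\frac{1}{N} \sum_{i=1}^{N} \left(w_i\,(\hat{y}_i - y_i)\right)^2} $$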
- def loglik_gaussian(self, params: Sequence, observed: Sequence, x: Sequence = None) -> numbers.Number

Gaussian log-likelihood, assuming independent, identically distributed
observations.

Parameters
----------
params : Sequence
    One or more model parameters
observed : Sequence
    The observed values
x : Sequence or None
    Input driver data

Returns
-------
Number
    The (negative) log-likelihood

```py
def loglik_gaussian(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    predicted = self.model(params, *x)
    sigma = params[-1]
    # Gaussian log-likelihood;
    # -\frac{N}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2)
    #   - \frac{1}{2\hat{\sigma}^2} \sum (\hat{y} - y)^2
    return -0.5 * np.log(2 * np.pi * sigma**2) - (0.5 / sigma**2) *\
        np.nansum((predicted - observed)**2)
```
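As implemented, the returned quantity is:

$$ -\frac{1}{2}\,\mathrm{log}(2\pi\hat{\sigma}^2) - \frac{1}{2\hat{\sigma}^2} \sum_i (\hat{y}_i - y_i)^2 $$

where $\hat{\sigma}$ is read from the last element of `params`
(presumably sampled alongside the model parameters).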
- def loglik_kge(self, params: Sequence, observed: Sequence, x: Sequence = None) -> numbers.Number

Kling-Gupta efficiency:

$$ KGE = 1 - \sqrt{(r - 1)^2 + (\alpha - 1)^2 + (\beta - 1)^2} $$

Parameters
----------
params : Sequence
    One or more model parameters
observed : Sequence
    The observed values
x : Sequence or None
    Input driver data

Returns
-------
Number
    The Kling-Gupta efficiency

```py
def loglik_kge(
        self, params: Sequence, observed: Sequence,
        x: Sequence = None) -> Number:
    predicted = self.model(params, *x)
    r = np.corrcoef(predicted, observed)[0, 1]
    alpha = np.std(predicted) / np.std(observed)
    beta = np.sum(predicted) / np.sum(observed)
    return 1 - np.sqrt((r - 1)**2 + (alpha - 1)**2 + (beta - 1)**2)
```
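Here $r$ is the Pearson correlation between predicted and observed
values, $\alpha = \sigma_{\hat{y}} / \sigma_{y}$ is the ratio of their
standard deviations, and $\beta = \sum \hat{y} / \sum y$ is a bias ratio;
a perfect fit gives $KGE = 1$.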
- def perform(self, node, inputs, outputs)

The method that is used when calling the Op.

Parameters
----------
node
inputs : Sequence
outputs : Sequence

```py
def perform(self, node, inputs, outputs):
    (params,) = inputs
    logl = self._loglik(params, self.observed, self.x)
    outputs[0][0] = np.array(logl) # Output the log-likelihood
```
 
- class CalibrationAPI(config=None)

Convenience class for calibrating the MOD17 GPP and NPP models. Meant to
be used with `fire.Fire()`.

```py
class CalibrationAPI(object):
    def __init__(self, config = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                MOD17_DIR, 'data/MOD17_calibration_config.json')
        with open(config_file, 'r') as file:
            self.config = json.load(file)
        self.hdf5 = self.config['data']['file']

    def _clean(self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP'):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 MJ m-2 day-1
            apar = drivers['fPAR'] * drivers['PAR']
            return np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))

    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data
```

Methods

- def clean_observed(self, raw: Sequence, drivers: Sequence, driver_labels: Sequence, protocol: str = 'GPP', filter_length: int = 2) -> Sequence
Cleans observed tower flux data according to a prescribed protocol.

- For GPP data: Removes observations where GPP < 0 or where APAR is
  < 0.1 MJ m-2 day-1

Parameters
----------
raw : Sequence
    The raw (uncleaned) observed data
drivers : Sequence
    Sequence of driver datasets
driver_labels : Sequence
    Names of the driver datasets, in order
protocol : str
    The cleaning protocol to apply (Default: "GPP")
filter_length : int
    The window size for the smoothing filter, applied to the observed data

Returns
-------
Sequence

```py
def clean_observed(
        self, raw: Sequence, drivers: Sequence, driver_labels: Sequence,
        protocol: str = 'GPP', filter_length: int = 2) -> Sequence:
    if protocol != 'GPP':
        raise NotImplementedError('"protocol" must be one of: "GPP"')
    # Read in the observed data and apply smoothing filter
    obs = self._filter(raw, filter_length)
    obs = self._clean(obs, dict([
        (driver_labels[i], data)
        for i, data in enumerate(drivers)
    ]), protocol = 'GPP')
    return obs
```
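A minimal sketch of the censoring rule applied by the GPP protocol (the
array values here are made up for illustration):

```py
import numpy as np

gpp = np.array([2.0, -0.5, 3.1, 1.2])   # Hypothetical daily GPP
fpar = np.array([0.5, 0.6, 0.01, 0.55]) # fPAR on [0,1]
par = np.array([8.0, 9.0, 5.0, 0.1])    # PAR (MJ m-2 day-1)
apar = fpar * par                        # Absorbed PAR
# Censor negative GPP and observations where APAR < 0.1
clean = np.where(apar < 0.1, np.nan, np.where(gpp < 0, np.nan, gpp))
print(clean) # [2.0, nan, nan, nan]
```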
- def export_bplut(self, model: str, output_path: str, thin: int = 10, burn: int = 1000)

Export the BPLUT using the posterior mean from the MCMC sampler. NOTE:
The posterior mean is usually not the best estimate for poorly
identified parameters.

Parameters
----------
model : str
    The name of the model ("GPP" or "NPP")
output_path : str
    The output CSV file path
thin : int
    Thinning rate
burn : int
    The burn-in (i.e., first N samples to discard)

```py
def export_bplut(
        self, model: str, output_path: str, thin: int = 10,
        burn: int = 1000):
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    for pft in PFT_VALID:
        backend = self.config['optimization']['backend_template'] %\
            (model, pft)
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        sampler = MOD17StochasticSampler(
            self.config, getattr(MOD17, '_%s' % model.lower()), params,
            backend = backend)
        trace = sampler.get_trace()
        fit = trace.sel(
            draw = slice(burn, None, thin))['posterior'].mean()
        for each in MOD17.required_parameters:
            try:
                bplut[each][pft] = float(fit[each])
            except KeyError:
                continue
    write_bplut(bplut, output_path)
```
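A Python-side usage sketch (the output filename is illustrative):

```py
api = CalibrationAPI()
# Write posterior-mean parameters for every PFT to a new BPLUT CSV
api.export_bplut('GPP', 'MOD17_BPLUT_updated.csv', thin = 10, burn = 1000)
```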
- def export_likely_posterior(self, model: str, param: str, output_path: str, thin: int = 10, burn: int = 1000, ptile: int = 95)

Exports the posterior distribution for a parameter, retaining only the
most likely samples.

Parameters
----------
model : str
    The name of the model ("GPP" or "NPP")
param : str
    The model parameter to export
output_path : str
    The output HDF5 file path
thin : int
    Thinning rate
burn : int
    The burn-in (i.e., first N samples to discard)
ptile : int
    The percentile cutoff for likelihood; only samples at or above this
    likelihood cutoff will be included

```py
def export_likely_posterior(
        self, model: str, param: str, output_path: str, thin: int = 10,
        burn: int = 1000, ptile: int = 95):
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    post = []
    likelihood = []
    for pft in PFT_VALID:
        backend = self.config['optimization']['backend_template'] %\
            (model, pft)
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        sampler = MOD17StochasticSampler(
            self.config, getattr(MOD17, '_%s' % model.lower()), params,
            backend = backend)
        trace = sampler.get_trace()
        fit = trace.sel(draw = slice(burn, None, thin))
        # Find the likelihood value associated with the cutoff percentile
        ll = az.extract_dataset(
            fit, combined = True)['log_likelihood'].values
        values = az.extract_dataset(fit, combined = True)[param].values
        cutoff = np.percentile(ll, ptile)
        post.append(values[ll >= cutoff])
        likelihood.append(ll[ll >= cutoff])
    with h5py.File(output_path, 'a') as hdf:
        n = max([len(p) for p in post])
        # Make sure all arrays are the same size
        post = np.stack([
            np.concatenate((p, np.full((n - len(p),), np.nan)))
            for p in post
        ])
        likelihood = np.stack([
            np.concatenate((p, np.full((n - len(p),), np.nan)))
            for p in likelihood
        ])
        ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
        dataset = hdf.create_dataset(
            f'{param}_posterior', post.shape, np.float32, post)
        dataset.attrs['description'] =\
            f'CalibrationAPI.export_likely_posterior() on {ts}'
        dataset = hdf.create_dataset(
            f'{param}_likelihood', likelihood.shape, np.float32, likelihood)
```
- def export_posterior(self, model: str, param: str, output_path: str, thin: int = 10, burn: int = 1000, k_folds: int = 1)

Exports the posterior distribution for a parameter, for each PFT, to HDF5.

Parameters
----------
model : str
    The name of the model ("GPP" or "NPP")
param : str
    The model parameter to export
output_path : str
    The output HDF5 file path
thin : int
    Thinning rate
burn : int
    The burn-in (i.e., first N samples to discard)
k_folds : int
    The number of k-folds used in cross-calibration/validation; if more
    than one, the folds for each PFT will be combined into a single HDF5
    file (Default: 1)

```py
def export_posterior(
        self, model: str, param: str, output_path: str, thin: int = 10,
        burn: int = 1000, k_folds: int = 1):
    params_dict = restore_bplut(self.config['BPLUT'][model])
    bplut = params_dict.copy()
    # Filter the parameters to just those for the PFT of interest
    post = []
    for pft in PFT_VALID:
        params = dict([(k, v[pft]) for k, v in params_dict.items()])
        backend = self.config['optimization']['backend_template'] %\
            (model, pft)
        post_by_fold = []
        for fold in range(1, k_folds + 1):
            if k_folds > 1:
                backend = self.config['optimization']['backend_template'] %\
                    (f'{model}-k{fold}', pft)
            sampler = MOD17StochasticSampler(
                self.config, getattr(MOD17, '_%s' % model.lower()), params,
                backend = backend)
            trace = sampler.get_trace()
            fit = trace.sel(draw = slice(burn, None, thin))['posterior']
            num_samples = fit.sizes['chain'] * fit.sizes['draw']
            if param in fit:
                post_by_fold.append(
                    az.extract_dataset(fit, combined = True)[param].values)
            else:
                # In case there is, e.g., a parameter that takes on a
                #   constant value for a specific PFT
                if k_folds > 1:
                    post_by_fold.append(np.ones((1, num_samples)) * np.nan)
                else:
                    post_by_fold.append(np.ones(num_samples) * np.nan)
        if k_folds > 1:
            post.append(np.vstack(post_by_fold))
        else:
            post.extend(post_by_fold)
    # If not every PFT's posterior has the same number of samples (e.g.,
    #   when one set of chains was run longer than another)...
    if not all([p.shape == post[0].shape for p in post]):
        max_len = max([p.shape for p in post])[0]
        # ...Reshape all posteriors to match the greatest sample size
        import ipdb
        ipdb.set_trace() # FIXME
        post = [
            np.pad(
                p.astype(np.float32), (0, max_len - p.size),
                mode = 'constant', constant_values = (np.nan,))
            for p in post
        ]
    with h5py.File(output_path, 'a') as hdf:
        post = np.stack(post)
        ts = datetime.date.today().strftime('%Y-%m-%d') # Today's date
        dataset = hdf.create_dataset(
            f'{param}_posterior', post.shape, np.float32, post)
        dataset.attrs['description'] =\
            f'CalibrationAPI.export_posterior() on {ts}'
```
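A usage sketch (the output path is illustrative):

```py
# Export the thinned LUE_max posterior for every PFT to one HDF5 file
api.export_posterior(
    'GPP', 'LUE_max', 'LUE_max_posterior.h5', thin = 10, burn = 1000)
```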
- def tune_gpp(self, pft: int, plot_trace: bool = False, ipdb: bool = False, save_fig: bool = False, **kwargs)

Run the MOD17 GPP calibration.

Parameters
----------
pft : int
    The Plant Functional Type (PFT) to calibrate
plot_trace : bool
    True to plot the trace for a previous calibration run; this will
    also NOT start a new calibration (Default: False)
ipdb : bool
    True to drop the user into an ipdb prompt, prior to and instead of
    running calibration
save_fig : bool
    True to save figures to files instead of showing them
    (Default: False)
**kwargs
    Additional keyword arguments passed to `MOD17StochasticSampler.run()`

```py
def tune_gpp(
        self, pft: int, plot_trace: bool = False, ipdb: bool = False,
        save_fig: bool = False, **kwargs):
    assert pft in PFT_VALID, f'Invalid PFT: {pft}'
    # Pass configuration parameters to MOD17StochasticSampler.run()
    for key in ('chains', 'draws', 'tune', 'scaling'):
        if key in self.config['optimization'].keys() and not key in kwargs.keys():
            kwargs[key] = self.config['optimization'][key]
    # Filter the parameters to just those for the PFT of interest
    params_dict = restore_bplut(self.config['BPLUT']['GPP'])
    # Load blacklisted sites (if any)
    blacklist = self.config['data']['sites_blacklisted']
    params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
    model = MOD17(params_dict)
    objective = self.config['optimization']['objective'].lower()
    print('Loading driver datasets...')
    with h5py.File(self.hdf5, 'r') as hdf:
        sites = hdf['FLUXNET/site_id'][:]
        if hasattr(sites[0], 'decode'):
            sites = list(map(lambda x: x.decode('utf-8'), sites))
        # Get dominant PFT
        pft_map = pft_dominant(hdf['state/PFT'][:], site_list = sites)
        # Blacklist various sites
        pft_mask = np.logical_and(
            pft_map == pft, ~np.in1d(sites, blacklist))
        # NOTE: Converting from Kelvin to Celsius
        tday = hdf['MERRA2/T10M_daytime'][:][:,pft_mask] - 273.15
        qv10m = hdf['MERRA2/QV10M_daytime'][:][:,pft_mask]
        ps = hdf['MERRA2/PS_daytime'][:][:,pft_mask]
        drivers = [ # fPAR, Tmin, VPD, PAR
            hdf['MODIS/MOD15A2HGF_fPAR_interp'][:][:,pft_mask],
            hdf['MERRA2/Tmin'][:][:,pft_mask] - 273.15,
            MOD17.vpd(qv10m, ps, tday),
            MOD17.par(hdf['MERRA2/SWGDN'][:][:,pft_mask]),
        ]
        # Convert fPAR from (%) to [0,1]
        drivers[0] = np.nanmean(drivers[0], axis = -1) / 100
        # If RMSE is used, then we want to pay attention to weighting
        weights = None
        if objective in ('rmsd', 'rmse'):
            weights = hdf['weights'][pft_mask][np.newaxis,:]\
                .repeat(tday.shape[0], axis = 0)
        # Check that driver data do not contain NaNs
        for d, each in enumerate(drivers):
            name = ('fPAR', 'Tmin', 'VPD', 'PAR')[d]
            assert not np.isnan(each).any(),\
                f'Driver dataset "{name}" contains NaNs'
        tower_gpp = hdf['FLUXNET/GPP'][:][:,pft_mask]
        # Read the validation mask; mask out observations that are
        #   reserved for validation
        print('Masking out validation data...')
        mask = hdf['FLUXNET/validation_mask'][pft]
        tower_gpp[mask] = np.nan
    # Clean observations, then mask out driver data where there are no
    #   observations
    tower_gpp = self.clean_observed(
        tower_gpp, drivers,
        MOD17StochasticSampler.required_drivers['GPP'], protocol = 'GPP')
    if weights is not None:
        weights = weights[~np.isnan(tower_gpp)]
    for d, _ in enumerate(drivers):
        drivers[d] = drivers[d][~np.isnan(tower_gpp)]
    tower_gpp = tower_gpp[~np.isnan(tower_gpp)]
    print('Initializing sampler...')
    backend = self.config['optimization']['backend_template'] % ('GPP', pft)
    sampler = MOD17StochasticSampler(
        self.config, MOD17._gpp, params_dict, backend = backend,
        weights = weights)
    if plot_trace or ipdb:
        if ipdb:
            import ipdb
            ipdb.set_trace()
        trace = sampler.get_trace()
        az.plot_trace(trace, var_names = MOD17.required_parameters[0:5])
        pyplot.show()
        return
    # Get (informative) priors for just those parameters that have them
    with open(self.config['optimization']['prior'], 'r') as file:
        prior = json.load(file)
    prior_params = filter(
        lambda p: p in prior.keys(), sampler.required_parameters['GPP'])
    prior = dict([
        (p, {'mu': prior[p]['mu'][pft], 'sigma': prior[p]['sigma'][pft]})
        for p in prior_params
    ])
    sampler.run(
        tower_gpp, drivers, prior = prior, save_fig = save_fig, **kwargs)
```
- def tune_npp(self, pft: int, plot_trace: bool = False, ipdb: bool = False, save_fig: bool = False, climatology=False, cutoff: numbers.Number = 2385, k_folds: int = 1, **kwargs)

Run the MOD17 NPP calibration. If k-folds cross-validation is used, the
model is calibrated on $k$ random subsets of the data and a series of
files is created, e.g.:

    MOD17_NPP_calibration_PFT1.h5
    MOD17_NPP-k1_calibration_PFT1.nc4
    MOD17_NPP-k2_calibration_PFT1.nc4
    ...

where each `.nc4` file is a standard `arviz` backend and the `.h5` file
indicates which indices from the NPP observations vector, after removing
NaNs, were excluded (i.e., the indices of the test data).

Parameters
----------
pft : int
    The Plant Functional Type (PFT) to calibrate
plot_trace : bool
    True to display the trace plot ONLY and not run calibration
    (Default: False)
ipdb : bool
    True to drop into an interactive Python debugger (`ipdb`) after
    loading an existing trace (Default: False)
save_fig : bool
    True to save the post-calibration trace plot to a file instead of
    displaying it (Default: False)
climatology : bool
    True to use a MERRA-2 climatology (and look for it in the drivers
    file), i.e., use the `MERRA2_climatology` group instead of the
    `surface_met_MERRA2` group (Default: False)
cutoff : Number
    Maximum value of observed NPP (g C m-2 year-1); values above this
    cutoff will be discarded and not used in calibration (Default: 2385)
k_folds : int
    Number of folds to use in k-folds cross-validation; defaults to
    k=1, i.e., no cross-validation is performed.
**kwargs
    Additional keyword arguments passed to `MOD17StochasticSampler.run()`

```py
def tune_npp(
        self, pft: int, plot_trace: bool = False, ipdb: bool = False,
        save_fig: bool = False, climatology = False,
        cutoff: Number = 2385, k_folds: int = 1, **kwargs):
    assert pft in PFT_VALID, f'Invalid PFT: {pft}'
    prefix = 'MERRA2_climatology' if climatology else 'surface_met_MERRA2'
    params_dict = restore_bplut(self.config['BPLUT']['NPP'])
    # Filter the parameters to just those for the PFT of interest
    params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
    model = MOD17(params_dict)
    kwargs.update({'var_names': [
        '~LUE_max', '~tmin0', '~tmin1', '~vpd0', '~vpd1',
        '~log_likelihood'
    ]})
    # Pass configuration parameters to MOD17StochasticSampler.run()
    for key in ('chains', 'draws', 'tune', 'scaling'):
        if key in self.config['optimization'].keys():
            kwargs[key] = self.config['optimization'][key]
    print('Loading driver datasets...')
    with h5py.File(self.hdf5, 'r') as hdf:
        # NOTE: This is only recorded at the site-level; no need to
        #   determine modal PFT across subgrid
        pft_map = hdf['NPP/PFT'][:]
        # Leave out sites where there is no fPAR (and no LAI) data
        fpar = hdf['NPP/MOD15A2H_fPAR_clim_filt'][:]
        mask = np.logical_and(
            pft_map == pft, ~np.isnan(np.nanmean(fpar, axis = -1))\
                .all(axis = 0))
        # NOTE: Converting from Kelvin to Celsius
        tday = hdf[f'NPP/{prefix}/T10M_daytime'][:][:,mask] - 273.15
        qv10m = hdf[f'NPP/{prefix}/QV10M_daytime'][:][:,mask]
        ps = hdf[f'NPP/{prefix}/PS_daytime'][:][:,mask]
        drivers = [ # fPAR, Tmin, VPD, PAR, LAI, Tmean, years
            hdf['NPP/MOD15A2H_fPAR_clim_filt'][:][:,mask],
            hdf[f'NPP/{prefix}/Tmin'][:][:,mask] - 273.15,
            MOD17.vpd(qv10m, ps, tday),
            MOD17.par(hdf[f'NPP/{prefix}/SWGDN'][:][:,mask]),
            hdf['NPP/MOD15A2H_LAI_clim_filt'][:][:,mask],
            hdf[f'NPP/{prefix}/T10M'][:][:,mask] - 273.15,
            np.full(ps.shape, 1) # i.e., A 365-day climatological year ("Year 1")
        ]
        observed_npp = hdf['NPP/NPP_total'][:][mask]
    if cutoff is not None:
        observed_npp[observed_npp > cutoff] = np.nan
    # Set negative VPD to zero
    drivers[2] = np.where(drivers[2] < 0, 0, drivers[2])
    # Convert fPAR from (%) to [0,1] and re-scale LAI; reshape fPAR, LAI
    drivers[0] = np.nanmean(drivers[0], axis = -1) * 0.01
    drivers[4] = np.nanmean(drivers[4], axis = -1) * 0.1
    # TODO Mask out driver data where there are no observations
    for d, _ in enumerate(drivers):
        drivers[d] = drivers[d][:,~np.isnan(observed_npp)]
    observed_npp = observed_npp[~np.isnan(observed_npp)]
    if k_folds > 1:
        # Back-up the original (complete) datasets
        _drivers = [d.copy() for d in drivers]
        _observed_npp = observed_npp.copy()
        # Randomize the indices of the NPP data
        indices = np.arange(0, observed_npp.size)
        np.random.shuffle(indices)
        # Get the starting and ending index of each fold
        fold_idx = np.array(
            [indices.size // k_folds] * k_folds) * np.arange(0, k_folds)
        fold_idx = list(map(
            list, zip(fold_idx, fold_idx + indices.size // k_folds)))
        # Ensure that the entire dataset is used
        fold_idx[-1][-1] = indices.max()
        idx_test = [indices[start:end] for start, end in fold_idx]
    # Loop over each fold (or the entire dataset, if num. folds == 1)
    for k, fold in enumerate(range(1, k_folds + 1)):
        backend = self.config['optimization']['backend_template'] %\
            ('NPP', pft)
        if k_folds > 1 and fold == 1:
            # Create an HDF5 file with the same name as the (original)
            #   netCDF4 back-end, store the test indices
            with h5py.File(backend.replace('nc4', 'h5'), 'w') as hdf:
                out = list(idx_test)
                size = indices.size // k_folds
                try:
                    out = np.stack(out)
                except ValueError:
                    size = max((o.size for o in out))
                    for i in range(0, len(out)):
                        out[i] = np.concatenate(
                            (out[i], [np.nan] * (size - out[i].size)))
                hdf.create_dataset(
                    'test_indices', (k_folds, size), np.int32,
                    np.stack(out))
        if k_folds > 1:
            # Restore the original NPP dataset
            observed_npp = _observed_npp.copy()
            # Set to NaN all the test indices
            idx = idx_test[k]
            observed_npp[idx] = np.nan
            # Same for drivers, after restoring from the original
            drivers = [
                d.copy()[:,~np.isnan(observed_npp)] for d in _drivers
            ]
            observed_npp = observed_npp[~np.isnan(observed_npp)]
            # Use a different naming scheme for the backend
            backend = self.config['optimization']['backend_template'] %\
                (f'NPP-k{fold}', pft)
        print('Initializing sampler...')
        sampler = MOD17StochasticSampler(
            self.config, MOD17._npp, params_dict, backend = backend,
            model_name = 'NPP')
        if plot_trace or ipdb:
            if ipdb:
                import ipdb
                ipdb.set_trace()
            trace = sampler.get_trace()
            az.plot_trace(
                trace, var_names = MOD17.required_parameters[5:])
            pyplot.show()
            return
        # Get (informative) priors for just those parameters that have them
        with open(self.config['optimization']['prior'], 'r') as file:
            prior = json.load(file)
        prior_params = filter(
            lambda p: p in prior.keys(),
            sampler.required_parameters['NPP'])
        prior = dict([
            (p, prior[p]) for p in prior_params
        ])
        for key in prior.keys():
            # And make sure to subset to the chosen PFT!
            for arg in prior[key].keys():
                prior[key][arg] = prior[key][arg][pft]
        sampler.run(
            observed_npp, drivers, prior = prior, save_fig = save_fig,
            **kwargs)
```
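Assuming the same `fire`-based CLI as the GPP examples, a 5-fold NPP
cross-calibration for PFT 1 might be launched as (a sketch; flag
spelling follows python-fire's convention for keyword arguments):

    python calibration.py tune-npp 1 --k-folds=5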
 
class MOD17StochasticSampler(config: dict, model: Callable, params_dict: dict = None, backend: str = None, weights: Sequence = None, model_name: str = None)

A Markov Chain-Monte Carlo (MCMC) sampler for MOD17. The specific sampler used is the Differential Evolution (DE) MCMC algorithm described by Ter Braak (2008), though the implementation is specific to the PyMC3 library.

Considerations:

1. Tower GPP is censored when values are < 0 or when APAR is < 0.1 MJ m-2 d-1.
Parameters
- config : dict
  Dictionary of configuration parameters
- model : Callable
  The function to call (with driver data and parameters); this function should take driver data as positional arguments and the model parameters as a `*Sequence`; it should require no external state.
- observed : Sequence
  Sequence of observed values that will be used to calibrate the model; i.e., the model is scored by how close its predicted values are to the observed values
- params_dict : dict or None
  Dictionary of model parameters, to be used as initial values and as the basis for constructing a new dictionary of optimized parameters
- backend : str or None
  Path to a NetCDF4 file backend (Default: None)
- weights : Sequence or None
  Optional sequence of weights applied to the model residuals (as in weighted least squares)
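The sampler can also be constructed outside of the command-line workflow. The following is a minimal sketch following the pattern used by `tune_npp()` above; `config` is assumed to be the loaded configuration dictionary, and the `MOD17._gpp` function and `'GPP'` BPLUT key are assumptions made by analogy with the NPP case:

```py
# A minimal sketch of constructing the sampler directly; `config` is
# the calibration configuration dict and `pft` is any valid PFT code.
from mod17 import MOD17
from mod17.utils import restore_bplut

pft = 1
params_dict = restore_bplut(config['BPLUT']['GPP'])  # assumed key, cf. 'NPP'
# Subset the parameters to just those for the PFT of interest
params_dict = dict([(k, v[pft]) for k, v in params_dict.items()])
sampler = MOD17StochasticSampler(
    config, MOD17._gpp, params_dict,  # "_gpp" resolves to model name "GPP"
    backend = config['optimization']['backend_template'] % ('GPP', pft))
```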
Expand source code

```py
class MOD17StochasticSampler(StochasticSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for MOD17. The specific
    sampler used is the Differential Evolution (DE) MCMC algorithm
    described by Ter Braak (2008), though the implementation is specific
    to the PyMC3 library.

    Considerations:

    1. Tower GPP is censored when values are < 0 or when APAR is
        < 0.1 MJ m-2 d-1.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this
        function should take driver data as positional arguments and the
        model parameters as a `*Sequence`; it should require no external
        state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the
        model; i.e., the model is scored by how close its predicted
        values are to the observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and
        as the basis for constructing a new dictionary of optimized
        parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as
        in weighted least squares)
    '''
    # NOTE: This is different than for mod17.MOD17 because we haven't yet
    #   figured out how the respiration terms are calculated
    required_parameters = {
        'GPP': ['LUE_max', 'tmin0', 'tmin1', 'vpd0', 'vpd1'],
        'NPP': MOD17.required_parameters
    }
    required_drivers = {
        'GPP': ['fPAR', 'Tmin', 'VPD', 'PAR'],
        'NPP': ['fPAR', 'Tmin', 'VPD', 'PAR', 'LAI', 'Tmean', 'years']
    }

    def compile_gpp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new GPP model based on the prior distribution. Model can
        be re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate
            the model; i.e., the model is scored by how close its
            predicted values are to the observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the
        #   indented code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # (Stochastic) Priors for unknown model parameters
            LUE_max = pm.TruncatedNormal('LUE_max',
                **self.prior['LUE_max'], **self.bounds['LUE_max'])
            # NOTE: tmin0, vpd0 are fixed at Collection 6.1 values
            tmin0 = self.params['tmin0']
            tmin1 = pm.Uniform('tmin1', **self.bounds['tmin1'])
            # NOTE: Upper bound on `vpd1` is set by the maximum observed VPD
            vpd0 = self.params['vpd0']
            vpd1 = pm.Uniform('vpd1',
                lower = self.bounds['vpd1']['lower'],
                upper = drivers[2].max().round(0))
            # Convert model parameters to a tensor vector
            params_list = [LUE_max, tmin0, tmin1, vpd0, vpd1]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model

    def compile_npp_model(
            self, observed: Sequence, drivers: Sequence) -> pm.Model:
        '''
        Creates a new NPP model based on the prior distribution. Model can
        be re-compiled multiple times, e.g., for cross validation.

        Parameters
        ----------
        observed : Sequence
            Sequence of observed values that will be used to calibrate
            the model; i.e., the model is scored by how close its
            predicted values are to the observed values
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function

        Returns
        -------
        pm.Model
        '''
        # Define the objective/ likelihood function
        log_likelihood = BlackBoxLikelihood(
            self.model, observed, x = drivers, weights = self.weights,
            objective = self.config['optimization']['objective'].lower())
        # With this context manager, "all PyMC3 objects introduced in the
        #   indented code block...are added to the model behind the scenes."
        with pm.Model() as model:
            # Setting GPP parameters that are known--EXCEPT tmin1
            LUE_max = self.params['LUE_max']
            tmin0 = self.params['tmin0']
            tmin1 = self.params['tmin1']
            vpd0 = self.params['vpd0']
            vpd1 = self.params['vpd1']
            # SLA fixed at prior mean
            SLA = np.exp(self.prior['SLA']['mu'])
            # Allometry ratios prescribe a narrow range around
            #   Collection 6.1 values
            froot_leaf_ratio = pm.Triangular(
                'froot_leaf_ratio', **self.prior['froot_leaf_ratio'])
            # (Stochastic) Priors for unknown model parameters
            Q10_froot = pm.TruncatedNormal(
                'Q10_froot', **self.prior['Q10_froot'], **self.bounds['Q10'])
            leaf_mr_base = pm.LogNormal(
                'leaf_mr_base', **self.prior['leaf_mr_base'])
            froot_mr_base = pm.LogNormal(
                'froot_mr_base', **self.prior['froot_mr_base'])
            # For GRS and CRO, livewood mass and respiration are zero
            if np.equal(
                    list(self.prior['livewood_mr_base'].values()),
                    [0, 0]).all():
                livewood_leaf_ratio = 0
                livewood_mr_base = 0
                Q10_livewood = 0
            else:
                livewood_leaf_ratio = pm.Triangular(
                    'livewood_leaf_ratio',
                    **self.prior['livewood_leaf_ratio'])
                livewood_mr_base = pm.LogNormal(
                    'livewood_mr_base', **self.prior['livewood_mr_base'])
                Q10_livewood = pm.TruncatedNormal(
                    'Q10_livewood', **self.prior['Q10_livewood'],
                    **self.bounds['Q10'])
            # Convert model parameters to a tensor vector
            params_list = [
                LUE_max, tmin0, tmin1, vpd0, vpd1, SLA,
                Q10_livewood, Q10_froot, froot_leaf_ratio,
                livewood_leaf_ratio, leaf_mr_base, froot_mr_base,
                livewood_mr_base
            ]
            params = at.as_tensor_variable(params_list)
            # Key step: Define the log-likelihood as an added potential
            pm.Potential('likelihood', log_likelihood(params))
        return model
```

Ancestors
- StochasticSampler
- AbstractSampler

Class variables
- var required_drivers
- var required_parameters
Methods

def compile_gpp_model(self, observed: Sequence, drivers: Sequence) -> pm.Model

Creates a new GPP model based on the prior distribution. Model can be re-compiled multiple times, e.g., for cross validation.

Parameters
- observed : Sequence
  Sequence of observed values that will be used to calibrate the model; i.e., the model is scored by how close its predicted values are to the observed values
- drivers : list or tuple
  Sequence of driver datasets to be supplied, in order, to the model's run function

Returns
- pm.Model
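The compiled model can also be sampled by hand. The sketch below mirrors what `StochasticSampler.run()` does internally; `sampler`, `observed`, and `drivers` are assumed to be defined as above:

```py
# A minimal sketch of compiling the GPP model and sampling it manually
# with the DE-MCMC(z) step method used by run().
import pymc as pm
import arviz as az

model = sampler.compile_gpp_model(observed, drivers)
with model:
    step = pm.DEMetropolisZ(tune = 'lambda', scaling = 1e-3)
    trace = pm.sample(draws = 2000, step = step, chains = 3, cores = 3)
az.plot_trace(trace)
```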
def compile_npp_model(self, observed: Sequence, drivers: Sequence) -> pm.Model

Creates a new NPP model based on the prior distribution. Model can be re-compiled multiple times, e.g., for cross validation.

Parameters
- observed : Sequence
  Sequence of observed values that will be used to calibrate the model; i.e., the model is scored by how close its predicted values are to the observed values
- drivers : list or tuple
  Sequence of driver datasets to be supplied, in order, to the model's run function

Returns
- pm.Model
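For orientation, the NPP compiler reads its hyperparameters from `self.prior`. The dictionary below is a hypothetical example: the values are placeholders, and the hyperparameter names are assumptions inferred from the distributions used in the source (in the calibration workflow each entry is stored as a per-PFT array in the JSON prior file and subset to scalars before use):

```py
# Hypothetical prior for compile_npp_model(); all numbers are
# placeholders, not calibrated values.
prior = {
    'SLA': {'mu': 3.0},                            # np.exp(mu) is the fixed SLA
    'froot_leaf_ratio': {'lower': 0.8, 'c': 1.2, 'upper': 2.0},  # pm.Triangular
    'Q10_froot': {'mu': 2.0, 'sigma': 0.5},        # pm.TruncatedNormal (+ Q10 bounds)
    'leaf_mr_base': {'mu': -4.0, 'sigma': 1.0},    # pm.LogNormal
    'froot_mr_base': {'mu': -4.5, 'sigma': 1.0},   # pm.LogNormal
    # Both values zero => livewood terms are fixed at zero (GRS, CRO)
    'livewood_mr_base': {'mu': 0, 'sigma': 0},
}
sampler.run(observed, drivers, prior = prior)  # merged via self.prior.update()
```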
 Inherited members
class StochasticSampler(config: dict, model: Callable, params_dict: dict = None, backend: str = None, weights: Sequence = None, model_name: str = None)

A Markov Chain-Monte Carlo (MCMC) sampler for an arbitrary model. The specific sampler used is the Differential Evolution (DE) MCMC algorithm described by Ter Braak (2008), though the implementation is specific to the PyMC3 library.

NOTE: The `model` (a function) should be named "_name" where "name" is some uniquely identifiable model name. This helps `StochasticSampler.run()` to find the correct compiler for the model. The compiler function should be named `compile_name_model()` (where "name" is the model name) and be defined on a subclass of `StochasticSampler`; a sketch of this convention follows the parameter list below.

Parameters
- config : dict
  Dictionary of configuration parameters
- model : Callable
  The function to call (with driver data and parameters); this function should take driver data as positional arguments and the model parameters as a `*Sequence`; it should require no external state.
- observed : Sequence
  Sequence of observed values that will be used to calibrate the model; i.e., the model is scored by how close its predicted values are to the observed values
- params_dict : dict or None
  Dictionary of model parameters, to be used as initial values and as the basis for constructing a new dictionary of optimized parameters
- backend : str or None
  Path to a NetCDF4 file backend (Default: None)
- weights : Sequence or None
  Optional sequence of weights applied to the model residuals (as in weighted least squares)
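The following hypothetical subclass illustrates the naming convention; every name in it is illustrative and not part of mod17. A model function named `_soc` resolves to `self.name == 'SOC'`, so `run()` looks up `compile_soc_model()` on the subclass:

```py
import pymc as pm

def _soc(params, *drivers):
    ...  # the model: takes driver data and a sequence of parameters

class SOCStochasticSampler(StochasticSampler):
    # Per-model parameter and driver names, keyed by model name
    required_parameters = {'SOC': ['k0', 'k1']}
    required_drivers = {'SOC': ['litterfall', 'Tsoil']}

    def compile_soc_model(self, observed, drivers) -> pm.Model:
        with pm.Model() as model:
            ...  # declare priors, then pm.Potential('likelihood', ...)
        return model
```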
Expand source code

```py
class StochasticSampler(AbstractSampler):
    '''
    A Markov Chain-Monte Carlo (MCMC) sampler for an arbitrary model.
    The specific sampler used is the Differential Evolution (DE) MCMC
    algorithm described by Ter Braak (2008), though the implementation is
    specific to the PyMC3 library.

    NOTE: The `model` (a function) should be named "_name" where "name"
    is some uniquely identifiable model name. This helps
    `StochasticSampler.run()` to find the correct compiler for the model.
    The compiler function should be named `compile_name_model()` (where
    "name" is the model name) and be defined on a subclass of
    `StochasticSampler`.

    Parameters
    ----------
    config : dict
        Dictionary of configuration parameters
    model : Callable
        The function to call (with driver data and parameters); this
        function should take driver data as positional arguments and the
        model parameters as a `*Sequence`; it should require no external
        state.
    observed : Sequence
        Sequence of observed values that will be used to calibrate the
        model; i.e., the model is scored by how close its predicted
        values are to the observed values
    params_dict : dict or None
        Dictionary of model parameters, to be used as initial values and
        as the basis for constructing a new dictionary of optimized
        parameters
    backend : str or None
        Path to a NetCDF4 file backend (Default: None)
    weights : Sequence or None
        Optional sequence of weights applied to the model residuals (as
        in weighted least squares)
    '''
    def __init__(
            self, config: dict, model: Callable, params_dict: dict = None,
            backend: str = None, weights: Sequence = None,
            model_name: str = None):
        self.backend = backend
        # Convert the BOUNDS into nested dicts for easy use
        self.bounds = dict([
            (key, dict([('lower', b[0]), ('upper', b[1])]))
            for key, b in config['optimization']['bounds'].items()
        ])
        self.config = config
        self.model = model
        if hasattr(model, '__name__'):
            self.name = model.__name__.strip('_').upper() # "_gpp" = "GPP"
        else:
            self.name = model_name
        self.params = params_dict
        # Set the model's prior distribution assumptions
        self.prior = dict()
        for key in self.required_parameters[self.name]:
            # NOTE: This is the default only for LUE_max; other parameters,
            #   with Uniform distributions, don't use anything here
            self.prior.setdefault(key, {
                'mu': params_dict[key],
                'sigma': 2e-4
            })
        self.weights = weights
        if backend is not None:
            assert os.path.exists(os.path.dirname(backend))

    def run(
            self, observed: Sequence, drivers: Sequence, draws = 1000,
            chains = 3, tune = 'lambda', scaling: float = 1e-3,
            prior: dict = dict(), check_shape: bool = False,
            save_fig: bool = False, var_names: Sequence = None) -> None:
        '''
        Fits the model using the DE-MCMCz approach. `tune="lambda"`
        (default) is recommended; lambda is related to the scale of the
        jumps learned from other chains, but epsilon ("scaling") controls
        the scale directly. Using a larger value for `scaling` (Default:
        1e-3) will produce larger jumps and may directly address "sticky"
        chains.

        Parameters
        ----------
        observed : Sequence
            The observed data the model will be calibrated against
        drivers : list or tuple
            Sequence of driver datasets to be supplied, in order, to the
            model's run function
        draws : int
            Number of samples to draw (on each chain); defaults to 1000
        chains : int
            Number of chains; defaults to 3
        tune : str or None
            Which hyperparameter to tune: Defaults to 'lambda', but can
            also be 'scaling' or None.
        scaling : float
            Initial scale factor for epsilon (Default: 1e-3)
        prior : dict
            Updates to the prior distribution assumptions (Default: empty)
        check_shape : bool
            True to require that input driver datasets have the same
            shape as the observed values (Default: False)
        save_fig : bool
            True to save figures to files instead of showing them
            (Default: False)
        var_names : Sequence
            One or more variable names to show in the plot
        '''
        assert not check_shape or drivers[0].shape == observed.shape,\
            'Driver data should have the same shape as the "observed" data'
        assert len(drivers) == len(self.required_drivers[self.name]),\
            'Did not receive expected number of driver datasets!'
        assert tune in ('lambda', 'scaling') or tune is None
        # Update prior assumptions
        self.prior.update(prior)
        # Generate an initial goodness-of-fit score
        predicted = self.model([
            self.params[p] for p in self.required_parameters[self.name]
        ], *drivers)
        if self.weights is not None:
            score = np.sqrt(
                np.nanmean(((predicted - observed) * self.weights) ** 2))
        else:
            score = np.sqrt(np.nanmean((predicted - observed) ** 2))
        print('-- RMSD at the initial point: %.3f' % score)
        print('Compiling model...')
        try:
            compiler = getattr(self, 'compile_%s_model' % self.name.lower())
        except AttributeError:
            raise AttributeError('''Could not find a compiler for model
            named "%s"; make sure that a function "compile_%s_model()"
            is defined on this class''' % (self.name, self.name))
        with compiler(observed, drivers) as model:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                step_func = pm.DEMetropolisZ(tune = tune, scaling = scaling)
                trace = pm.sample(
                    draws = draws, step = step_func, cores = chains,
                    chains = chains,
                    idata_kwargs = {'log_likelihood': True})
            if self.backend is not None:
                print('Writing results to file...')
                trace.to_netcdf(self.backend)
            if var_names is None:
                az.plot_trace(trace, var_names = ['~log_likelihood'])
            else:
                az.plot_trace(trace, var_names = var_names)
            if save_fig:
                pyplot.savefig(
                    '.'.join(self.backend.split('.')[:-1]) + '.png')
            else:
                pyplot.show()
```

Ancestors
- AbstractSampler

Subclasses
- MOD17StochasticSampler

Methods

def run(self, observed: Sequence, drivers: Sequence, draws=1000, chains=3, tune='lambda', scaling: float = 0.001, prior: dict = {}, check_shape: bool = False, save_fig: bool = False, var_names: Sequence = None) -> None
Fits the model using the DE-MCMCz approach. `tune="lambda"` (default) is recommended; lambda is related to the scale of the jumps learned from other chains, but epsilon ("scaling") controls the scale directly. Using a larger value for `scaling` (Default: 1e-3) will produce larger jumps and may directly address "sticky" chains.

Parameters
- observed : Sequence
  The observed data the model will be calibrated against
- drivers : list or tuple
  Sequence of driver datasets to be supplied, in order, to the model's run function
- draws : int
  Number of samples to draw (on each chain); defaults to 1000
- chains : int
  Number of chains; defaults to 3
- tune : str or None
  Which hyperparameter to tune: Defaults to 'lambda', but can also be 'scaling' or None.
- scaling : float
  Initial scale factor for epsilon (Default: 1e-3)
- prior : dict
  Updates to the sampler's prior distribution assumptions (Default: empty)
- check_shape : bool
  True to require that input driver datasets have the same shape as the observed values (Default: False)
- save_fig : bool
  True to save figures to files instead of showing them (Default: False)
- var_names : Sequence
  One or more variable names to show in the plot
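As a usage note, a short diagnostic run while tuning the jump scale directly might look like the following minimal sketch; `sampler`, `observed`, and `drivers` are assumed to be defined as above:

```py
# A short chain tuning epsilon ("scaling") directly, which can help
# diagnose and address "sticky" chains before a full-length run.
sampler.run(
    observed, drivers, draws = 2000, chains = 3,
    tune = 'scaling',      # tune epsilon instead of lambda
    scaling = 1e-2,        # larger initial jumps than the 1e-3 default
    var_names = ['~log_likelihood'])
```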
 Inherited members