Module pyl4c.apps.calibration.optimize
To calibrate the GPP or RECO model for a specific PFT:
python optimize.py pft <pft> tune-gpp
python optimize.py pft <pft> tune-reco
To plot a specific PFT's (optimized) response to a driver:
python optimize.py pft <pft> plot-gpp <driver>
python optimize.py pft <pft> plot-reco <driver>
Where:
- For plot-gpp, <driver> is one of: smrz, vpd, tmin.
- For plot-reco, <driver> is one of: smsf, tsoil.
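For example, to plot the optimized GPP response to root-zone soil moisture (or the RECO response to soil temperature) for PFT 3, where PFT codes run from 1 through 8:
python optimize.py pft 3 plot-gpp smrz
python optimize.py pft 3 plot-reco tsoil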
It's important to specify a configuration file; otherwise, the default, pyl4c/data/files/config_L4C_calibration.yaml, will be used:
python optimize.py pft <pft> tune-gpp --config=my_config.yaml
python optimize.py pft <pft> tune-soc --config=my_config.yaml
python optimize.py pft <pft> tune-reco --config=my_config.yaml
To plot the steady-state SOC versus IGBP SOC pit measurements, and then select potential new decay rates:
python optimize.py pft <pft> tune-soc
To get the goodness-of-fit statistics for the updated parameters:
python optimize.py pft <pft> score GPP
python optimize.py pft <pft> score RECO
To view the current (potentially updated) BPLUT:
python optimize.py bplut show
# View parameter values for <param> across ALL PFTs
python optimize.py bplut show None <param>
# View a PFT's values across ALL parameters
python optimize.py bplut show <pft> None
Classes
class CalibrationAPI (config: str = None, pft: int = None)
class CalibrationAPI(object):
    '''
    Convenience class for calibrating the L4C GPP and RECO models. Meant
    to be used with `fire.Fire()`. Uses:

        # Run the calibration for a specific PFT
        python optimize.py tune-gpp --pft=<pft>

        # Get access to the sampler (and debugger), after calibration is run
        python optimize.py tune-gpp --pft=<pft> --ipdb
    '''
    _driver_bounds = {'apar': (2, np.inf)}
    _metadata = {
        'tmin': {'units': 'deg K'},
        'vpd': {'units': 'Pa'},
        'smrz': {'units': '%'},
        'smsf': {'units': '%'},
        'tsoil': {'units': 'deg K'},
    }
    _required_parameters = {
        'GPP': ['LUE', 'tmin0', 'tmin1', 'vpd0', 'vpd1', 'smrz0', 'smrz1', 'ft0'],
        'RECO': ['CUE', 'tsoil', 'smsf0', 'smsf1'],
        'SOC': ['decay_rates0', 'decay_rates1', 'decay_rates2']
    }
    _required_drivers = {
        # Tsurf = Surface skin temperature; Tmin = Minimum daily temperature
        'GPP': ['fPAR', 'PAR', 'Tmin', 'VPD', 'SMRZ', 'FT'],
        'RECO': ['Tsoil', 'SMSF']
    }

    def __init__(self, config: str = None, pft: int = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                L4C_DIR, 'data/files/config_L4C_calibration.yaml')
        print(f'Using configuration file: {config_file}')
        with open(config_file, 'r') as file:
            self.config = yaml.safe_load(file)
        if pft is not None:
            assert pft in PFT_VALID, f'Invalid PFT: {pft}'
            self._pft = pft
        self.hdf5 = self.config['data']['file']
        self.bplut = BPLUT(
            restore_bplut(self.config['BPLUT']),
            hdf5_path = self.config['optimization']['backend'])

    def _bounds(self, init_params, bounds, model, fixed = None):
        'Defines bounds; optionally "fixes" parameters by fixing bounds'
        params = init_params
        lower = []
        upper = []
        # Then, set the bounds; for free parameters, this is what was given;
        #   for fixed parameters, this is the fixed value plus/minus some
        #   tolerance
        for i, p in enumerate(self._required_parameters[model]):
            # This is a parameter to be optimized; use default bounds
            lower.append(bounds[p][0])
            upper.append(bounds[p][1])
            if fixed is not None:
                if p in fixed:
                    if fixed[p] is not None:
                        lower.pop()
                        upper.pop()
                        lower.append(fixed[p] - 1e-3)
                        upper.append(fixed[p] + 1e-3)
        return (np.array(lower), np.array(upper))

    def _clean(
            self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP',
            num_std: int = 5):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 MJ m-2 day-1
            apar = np.nanmean(drivers['fPAR'], axis = -1) * drivers['PAR']
            cleaned = np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))
            return np.apply_along_axis(
                lambda x: np.where(
                    x > (num_std * np.nanstd(x)), np.nan, x), 0, cleaned)
        elif protocol == 'RECO':
            # Remove negative values
            return np.where(raw < 0, np.nan, raw)

    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data

    def _get_params(self, model: str):
        # Filter the parameters to just those for the PFT of interest
        return self.bplut.flat(
            self._pft, self._required_parameters[model.upper()])

    def _load_gpp_data(
            self, filter_length, store = None, prefix = '', load_gpp = True,
            check_validity = True):
        'Load the required datasets for GPP, for a single PFT'
        blacklist = self.config['data']['sites_blacklisted']
        if store is None:
            store = self.hdf5
        with h5py.File(store, 'r') as hdf:
            n_steps = hdf[f'{prefix}time'].shape[0]
            sites = hdf[f'{prefix}site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = hdf[f'{prefix}state/PFT'][:]
            if pft_map.ndim > 1:
                pft_map = pft_dominant(pft_map, sites)
            # Blacklist validation sites
            pft_mask = np.logical_and(
                np.in1d(pft_map, self._pft), ~np.in1d(sites, blacklist))
            drivers = dict()
            field_map = self.config['data']['fields']
            for field in self._required_drivers['GPP']:
                # Try reading the field exactly as described in config file
                if field in field_map:
                    if field_map[field] in hdf:
                        # Preserve 1-km subgrid for fPAR
                        if field == 'fPAR':
                            drivers[field] = hdf[field_map[field]][:,pft_mask,:]
                        else:
                            drivers[field] = hdf[field_map[field]][:,pft_mask]
                elif field == 'PAR':
                    if 'SWGDN' not in field_map:
                        raise ValueError('Could not find PAR or SWGDN data')
                    drivers[field] = par(hdf[field_map['SWGDN']][:,pft_mask])
                elif field == 'VPD':
                    qv2m = hdf[field_map['QV2M']][:,pft_mask]
                    ps = hdf[field_map['PS']][:,pft_mask]
                    t2m = hdf[field_map['T2M']][:,pft_mask]
                    drivers[field] = vpd(qv2m, ps, t2m)
                elif field == 'SMRZ':
                    smrz = hdf[field_map['SMRZ0']][:,pft_mask]
                    smrz_min = smrz.min(axis = 0)
                    drivers[field] = rescale_smrz(smrz, smrz_min)
                elif field == 'FT':
                    tsurf = hdf[field_map['Tsurf']][:,pft_mask]
                    # Classify soil as frozen (FT=0) or unfrozen (FT=1) based
                    #   on threshold freezing point of water
                    drivers[field] = np.where(tsurf <= 273.15, 0, 1)
            # Check units on fPAR
            if np.nanmax(drivers['fPAR'][:]) > 10:
                drivers['fPAR'] = drivers['fPAR'].astype(np.float32) / 100
            assert len(set(self._required_drivers['GPP'])\
                .difference(set(drivers.keys()))) == 0,\
                'Did not find all required drivers for the GPP model!'
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if 'weights' in hdf.keys():
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(n_steps, axis = 0)
            else:
                print('WARNING - "weights" not found in HDF5 file!')
            if load_gpp and 'GPP' not in hdf.keys():
                with h5py.File(
                        self.config['data']['supplemental_file'], 'r') as _hdf:
                    tower_gpp = _hdf['GPP'][:][:,pft_mask]
            elif load_gpp:
                tower_gpp = hdf['GPP'][:][:,pft_mask]
        # Check that driver data do not contain NaNs
        if check_validity:
            for field in drivers.keys():
                if field == 'fPAR':
                    continue # The 1-km subgrid may have NaNs
                assert not np.isnan(drivers[field]).any(),\
                    f'Driver dataset "{field}" contains NaNs'
        # If we don't want to load observational data, return the drivers data
        if not load_gpp:
            return (drivers, None, None, None, weights)
        # Clean observations, then mask out driver data where there are no
        #   observations
        tower_gpp = self._filter(tower_gpp, filter_length)
        tower_gpp = self._clean(tower_gpp, drivers, protocol = 'GPP')
        # Subset all datasets to just the valid observation site-days
        tower_gpp_flat = tower_gpp[~np.isnan(tower_gpp)]
        if weights is not None:
            weights = weights[~np.isnan(tower_gpp)]
        drivers_flat = list()
        # For everything other than fPAR, add a trailing axis to the flat
        #   view; this will enable datasets to line up with fPAR's 1-km subgrid
        for field in drivers.keys():
            flat = drivers[field][~np.isnan(tower_gpp)]
            drivers_flat.append(
                flat[:,np.newaxis] if field != 'fPAR' else flat)
        return (drivers, drivers_flat, tower_gpp, tower_gpp_flat, weights)

    def _load_reco_data(
            self, filter_length, store = None, prefix = '', load_reco = True,
            check_validity = True):
        'Load the required datasets for RECO, for a single PFT'
        blacklist = self.config['data']['sites_blacklisted']
        if store is None:
            store = self.hdf5
        with h5py.File(store, 'r') as hdf:
            n_steps = hdf[f'{prefix}time'].shape[0]
            sites = hdf[f'{prefix}site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = hdf[f'{prefix}state/PFT'][:]
            if pft_map.ndim > 1:
                pft_map = pft_dominant(pft_map, sites)
            # Blacklist validation sites
            pft_mask = np.logical_and(
                np.in1d(pft_map, self._pft), ~np.in1d(sites, blacklist))
            drivers = dict()
            field_map = self.config['data']['fields']
            for field in self._required_drivers['RECO']:
                # Try reading the field exactly as described in config file
                if field in field_map:
                    if field_map[field] in hdf:
                        drivers[field] = hdf[field_map[field]][:,pft_mask]
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if 'weights' in hdf.keys():
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(n_steps, axis = 0)
            else:
                print('WARNING - "weights" not found in HDF5 file!')
            if load_reco and 'RECO' not in hdf.keys():
                with h5py.File(
                        self.config['data']['supplemental_file'], 'r') as _hdf:
                    tower_reco = _hdf['RECO'][:][:,pft_mask]
            elif load_reco:
                tower_reco = hdf['RECO'][:][:,pft_mask]
        # Check that driver data do not contain NaNs
        if check_validity:
            for field in drivers.keys():
                assert not np.isnan(drivers[field]).any(),\
                    f'Driver dataset "{field}" contains NaNs'
        # If we don't want to load observational data, return the drivers data
        if not load_reco:
            return (drivers, None, weights)
        # Clean observations, then mask out driver data where there are no
        #   observations
        tower_reco = self._filter(tower_reco, filter_length)
        tower_reco = self._clean(tower_reco, drivers, protocol = 'RECO')
        drivers = [drivers[k] for k in self._required_drivers['RECO']]
        return (drivers, tower_reco, weights)

    def _report(self, old_params, new_params, model, prec = 2):
        'Prints a report on the updated (optimized) parameters'
        labels = self._required_parameters[model.upper()]
        pad = max(len(l) for l in labels) + 1
        fmt_string = '-- {:<%d} {:>%d} [{:>%d}]' % (pad, 5 + prec, 7 + prec)
        print('%s parameters report, %s (PFT %d):' % (
            f'{model.upper()} Optimization', PFT[self._pft][0], self._pft))
        print((' {:>%d} {:>%d}' % (8 + pad + prec, 8 + prec))\
            .format('NEW', 'INITIAL'))
        for i, label in enumerate(labels):
            new = ('%%.%df' % prec) % new_params[i]\
                if new_params[i] is not None else ''
            old = ('%%.%df' % prec) % old_params[i]
            print(fmt_string.format(('%s:' % label), new, old))

    @staticmethod
    def e_mult(params, tmin, vpd, smrz, ft):
        # Calculate E_mult based on current parameters
        f_tmin = linear_constraint(params[1], params[2])
        f_vpd = linear_constraint(params[3], params[4], 'reversed')
        f_smrz = linear_constraint(params[5], params[6])
        f_ft = linear_constraint(params[7], 1.0, 'binary')
        return f_tmin(tmin) * f_vpd(vpd) * f_smrz(smrz) * f_ft(ft)

    @staticmethod
    def k_mult(params, tsoil, smsf):
        # Calculate K_mult based on current parameters
        f_tsoil = partial(arrhenius, beta0 = params[1])
        f_smsf = linear_constraint(params[2], params[3])
        return f_tsoil(tsoil) * f_smsf(smsf)

    @staticmethod
    def gpp(params, fpar, par, tmin, vpd, smrz, ft):
        # Calculate GPP based on the provided BPLUT parameters
        apar = fpar * par
        return apar * params[0] *\
            CalibrationAPI.e_mult(params, tmin, vpd, smrz, ft)

    @staticmethod
    def reco(params, tower_reco, tower_gpp, tsoil, smsf, q_rh, q_k):
        # Calculate RH as (RECO - RA) or (RECO - (faut * GPP))
        ra = ((1 - params[0]) * tower_gpp)
        rh = tower_reco - ra
        rh = np.where(rh < 0, 0, rh) # Mask out negative RH values
        kmult0 = CalibrationAPI.k_mult(params, tsoil, smsf)
        cbar0 = cbar(rh, kmult0, q_rh, q_k)
        return ra + (kmult0 * cbar0)

    def pft(self, pft):
        '''
        Sets the PFT class for the next calibration step.

        Parameters
        ----------
        pft : int
            The PFT class to use in calibration

        Returns
        -------
        CLI
        '''
        assert pft in range(1, 9), 'Unrecognized PFT class'
        self._pft = pft
        return self

    def plot_gpp(
            self, driver, filter_length = 2, coefs = None, xlim = None,
            ylim = None, alpha = 0.1, marker = '.'):
        '''
        Using the current or optimized BPLUT coefficients, plots the GPP
        ramp function for a given driver. NOTE: Values where APAR < 2.0 are
        not shown.

        Parameters
        ----------
        driver : str
            Name of the driver to plot on the horizontal axis
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        coefs : list or tuple or numpy.ndarray
            (Optional) Instead of using what's in the BPLUT, specify the
            exact parameters, e.g., [tmin0, tmin1]
        xlim : list or tuple
            (Optional) A 2-element sequence: The x-axis limits
        ylim : list or tuple
            (Optional) A 2-element sequence: The y-axis limits
        alpha : float
            (Optional) The alpha value (Default: 0.1)
        marker : str
            (Optional) The marker symbol (Default: ".")
        '''
        @suppress_warnings
        def empirical_lue(apar, gpp):
            # Mask low APAR values
            lower, _ = self._driver_bounds.get('apar', (0, None))
            apar = np.where(apar < lower, np.nan, apar)
            # Calculate empirical light-use efficiency: GPP/APAR
            return np.where(apar > 0, np.divide(gpp, apar), 0)

        np.seterr(invalid = 'ignore')
        # Read in GPP and APAR data
        assert driver.lower() in ('tmin', 'vpd', 'smrz'),\
            'Requested driver "%s" cannot be plotted for GPP' % driver
        if coefs is not None:
            assert hasattr(coefs, 'index') and not hasattr(coefs, 'title'),\
                "Argument --coefs expects a list [values,] with NO spaces"
        coefs0 = [ # Original coefficients
            self.bplut[driver.lower()][i][self._pft]
            for i in (0, 1)
        ]
        # Load APAR and tower GPP data
        driver_data, _, tower_gpp, _, _ = self._load_gpp_data(filter_length)
        apar = np.nanmean(driver_data['fPAR'], axis = -1) * driver_data['PAR']
        # Based on the bounds, create an empirical ramp function that
        #   spans the range of the driver
        bounds = [
            self.config['optimization']['bounds'][f'{driver.lower()}{i}'][i]
            for i in (0, 1) # i.e., min(tmin0) and max(tmin1)
        ]
        domain = np.arange(bounds[0], bounds[1], 0.1)
        ramp_func_original = linear_constraint(
            *coefs0, 'reversed' if driver.lower() == 'vpd' else None)
        if coefs is not None:
            ramp_func = linear_constraint(
                *coefs, 'reversed' if driver.lower() == 'vpd' else None)
        if driver.lower() == 'vpd':
            x0 = driver_data['VPD']
        elif driver.lower() == 'tmin':
            x0 = driver_data['Tmin']
        elif driver.lower() == 'smrz':
            x0 = driver_data['SMRZ']
        # Update plotting parameters
        lue = empirical_lue(apar, tower_gpp)
        # Mask out negative LUE values and values with APAR<2
        pyplot.scatter(x0, np.where(
                np.logical_or(lue == 0, apar < 2), np.nan, lue),
            alpha = alpha, marker = marker)
        # Plot the original ramp function (black line)
        pyplot.plot(domain, ramp_func_original(domain) *\
            self.bplut['LUE'][:,self._pft], 'k-')
        # Optionally, plot a proposed ramp function (red line)
        if coefs is not None:
            pyplot.plot(domain, ramp_func(domain) *\
                self.bplut['LUE'][:,self._pft], 'r-')
        pyplot.xlabel('%s (%s)' % (driver, self._metadata[driver]['units']))
        pyplot.ylabel('GPP/APAR (g C MJ-1 d-1)')
        if xlim is not None:
            pyplot.xlim(xlim[0], xlim[1])
        if ylim is not None:
            pyplot.ylim(ylim[0], ylim[1])
        pyplot.title(
            '%s (PFT %d): GPP Response to "%s"' % (
                PFT[self._pft][0], self._pft, driver))
        pyplot.show()

    def plot_reco(
            self, driver, filter_length: int = 2, coefs = None, q_rh = 75,
            q_k = 50, xlim = None, ylim = None, alpha = 0.1, marker = '.'):
        '''
        Using the current or optimized BPLUT coefficients, plots the RECO
        ramp function for a given driver. The ramp function is shown on a
        plot of RH/Cbar, which is equivalent to Kmult (as Cbar is an upper
        quantile of the RH/Kmult distribution).

        Parameters
        ----------
        driver : str
            Name of the driver to plot on the horizontal axis
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        coefs : list or tuple or numpy.ndarray
            (Optional) Instead of using what's in the BPLUT, specify the
            exact parameters, e.g., `[smsf0, smsf1]`
        q_rh : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
        q_k : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
        xlim : list or tuple
            (Optional) A 2-element sequence: The x-axis limits
        ylim : list or tuple
            (Optional) A 2-element sequence: The y-axis limits
        alpha : float
            (Optional) The alpha value (Default: 0.1)
        marker : str
            (Optional) The marker symbol (Default: ".")
        '''
        xlabels = {
            'smsf': 'Surface Soil Moisture',
            'tsoil': 'Soil Temperature'
        }
        np.seterr(invalid = 'ignore')
        assert driver.lower() in ('tsoil', 'smsf'),\
            'Requested driver "%s" cannot be plotted for RECO' % driver
        if coefs is not None:
            assert hasattr(coefs, 'index') and not hasattr(coefs, 'title'),\
                "Argument --coefs expects a list [values,] with NO spaces"
        drivers, reco, _ = self._load_reco_data(filter_length)
        _, _, gpp, _, _ = self._load_gpp_data(filter_length)
        tsoil, smsf = drivers
        # Calculate k_mult based on original parameters
        f_smsf = linear_constraint(*self.bplut['smsf'][:,self._pft])
        k_mult = f_smsf(smsf) *\
            arrhenius(tsoil, self.bplut['tsoil'][0,self._pft])
        # Calculate RH as (RECO - RA)
        rh = reco - ((1 - self.bplut['CUE'][0,self._pft]) * gpp)
        # Set negative RH values to zero
        rh = np.where(suppress_warnings(np.less)(rh, 0), 0, rh)
        cbar0 = suppress_warnings(cbar)(rh, k_mult, q_rh, q_k)
        gpp = reco = None
        # Update plotting parameters
        pyplot.scatter( # Plot RH/Cbar against either Tsoil or SMSF
            tsoil if driver == 'tsoil' else smsf,
            suppress_warnings(np.divide)(rh, cbar0),
            alpha = alpha, marker = marker)
        if driver == 'tsoil':
            domain = np.arange(tsoil.min(), tsoil.max(), 0.1)
            pyplot.plot(domain,
                arrhenius(domain, self.bplut['tsoil'][0,self._pft]), 'k-')
        elif driver == 'smsf':
            domain = np.arange(smsf.min(), smsf.max(), 0.001)
            pyplot.plot(domain, f_smsf(domain).ravel(), 'k-')
        if coefs is not None:
            if driver == 'tsoil':
                pyplot.plot(domain, arrhenius(domain, *coefs), 'r-')
            elif driver == 'smsf':
                pyplot.plot(domain, linear_constraint(*coefs)(domain), 'r-')
        pyplot.xlabel('%s (%s)' % (
            xlabels[driver], self._metadata[driver]['units']))
        pyplot.ylabel('RH/Cbar')
        if xlim is not None:
            pyplot.xlim(xlim[0], xlim[1])
        if ylim is not None:
            pyplot.ylim(ylim[0], ylim[1])
        pyplot.title(
            '%s (PFT %d): RECO Response to "%s"' % (
                PFT[self._pft][0], self._pft,
                'SMSF' if driver.lower() == 'smsf' else 'Tsoil'))
        pyplot.show()

    def score(
            self, model: str, filter_length: int = 2, q_rh: int = 75,
            q_k: int = 50):
        '''
        Scores the current model (for a specific PFT) based on the parameters
        in the BPLUT.

        Parameters
        ----------
        model : str
            One of: "GPP" or "RECO"
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        q_rh : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
            (RECO only)
        q_k : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
            (RECO only)
        '''
        gpp_drivers, gpp_drivers_flat, tower_gpp, tower_gpp_flat, weights =\
            self._load_gpp_data(filter_length)
        if model.upper() == 'GPP':
            observed = tower_gpp_flat
        elif model.upper() == 'RECO':
            reco_drivers, tower_reco, weights =\
                self._load_reco_data(filter_length)
            observed = tower_reco
        # Print the parameters table
        old_params = restore_bplut_flat(self.config['BPLUT'])
        old_params = [
            old_params[k][:,self._pft]
            for k in self._required_parameters[model.upper()]
        ]
        params = self._get_params(model.upper())
        self._report(old_params, params, model.upper())
        # Get goodness-of-fit statistics
        if model.upper() == 'GPP':
            predicted = self.gpp(params, *gpp_drivers_flat).mean(axis = -1)
        elif model.upper() == 'RECO':
            predicted = self.reco(
                params, tower_reco, tower_gpp, *reco_drivers,
                q_rh = q_rh, q_k = q_k)
        r2, rmse_score, ub_rmse, bias = report_fit_stats(
            observed, predicted, weights, verbose = False)
        print(f'{model.upper()} model goodness-of-fit statistics:')
        print(('-- {:<13} {:>5}').format('R-squared:', '%.3f' % r2))
        print(('-- {:<13} {:>5}').format('RMSE:', '%.2f' % rmse_score))
        print(('-- {:<13} {:>5}').format('Unbiased RMSE:', '%.2f' % ub_rmse))
        print(('-- {:<13} {:>5}').format('Bias:', '%.2f' % bias))

    def set(self, parameter, value):
        '''
        Sets the named parameter to the given value for the specified PFT
        class. This updates the initial parameters, affecting any subsequent
        optimization.

        Parameters
        ----------
        parameter : str
            Name of the parameter to be set
        value : int or float
            Value of the named parameter to be set

        Returns
        -------
        CLI
        '''
        # Update the BPLUT in memory but NOT the file BPLUT (this is temporary)
        self.bplut.update(self._pft, (value,), (parameter,), flush = False)
        return self

    def tune_gpp(
            self, filter_length: int = 2, optimize: bool = True,
            use_nlopt: bool = True):
        '''
        Run the L4C GPP calibration.

        - For GPP data: Removes observations where GPP < 0 or where APAR is
          < 0.1 MJ m-2 day-1

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        optimize : bool
            True to run the optimization (Default) or False if you just want
            the goodness-of-fit to be reported
        use_nlopt : bool
            True to use `nlopt` for optimization (Default)
        '''
        def residuals(params, drivers, observed, weights):
            gpp0 = self.gpp(params, *drivers).mean(axis = -1)
            diff = np.subtract(observed, gpp0)
            # Objective function: Difference between tower GPP and L4C GPP,
            #   multiplied by the tower weights
            return (weights * diff)[~np.isnan(diff)]

        init_params = self._get_params('GPP')
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        print('Loading driver datasets...')
        drivers, drivers_flat, tower_gpp, tower_gpp_flat, weights =\
            self._load_gpp_data(filter_length)
        print(f'NOTE: Counts of (days, towers) are: {tower_gpp.shape}')
        # Configure the optimization, get bounds for the parameter search
        fixed = None
        if self.config['optimization']['fixed'] is not None:
            fixed = dict([ # Get any fixed parameters as {param: fixed_value}
                (k, v[self._pft])
                for k, v in self.config['optimization']['fixed'].items()
            ])
        step_size_global = [
            self.config['optimization']['step_size'][p]
            for p in self._required_parameters['GPP']
        ]
        bounds_dict = self.config['optimization']['bounds']
        bounds = self._bounds(init_params, bounds_dict, 'GPP', fixed)
        trials = self.config['optimization']['trials']
        params = [] # Optimized parameters
        params0 = [] # Initial (random) parameters
        scores = []
        # Will be a (100, P) space, where P is the number of parameters
        param_space = np.linspace(bounds[0], bounds[1], 100)
        for t in range(0, trials):
            # If multiple trials, randomize the initial parameter values
            #   and score the model in each trial
            if trials > 1:
                p = param_space.shape[1] # Number of parameters
                idx = np.random.randint(0, param_space.shape[0], p)
                init_params = param_space[idx,np.arange(0, p)]
                params0.append(init_params)
            # If we're optimizing (with any library), define the bounds and
            #   the objective function
            if optimize:
                bounds = self._bounds(init_params, bounds_dict, 'GPP', fixed)
                # Set initial value to a fixed value if specified
                if fixed is not None:
                    for key, value in fixed.items():
                        if value is not None and\
                                key in self._required_parameters['GPP']:
                            init_params[
                                self._required_parameters['GPP'].index(key)
                            ] = value
                objective = partial(
                    residuals, drivers = drivers_flat,
                    observed = tower_gpp_flat, weights = weights)
            # Apply constrained, non-linear least-squares optimization, using
            #   either SciPy or NLOPT
            if optimize and not use_nlopt:
                solution = solve_least_squares(
                    objective, init_params,
                    labels = self._required_parameters['GPP'],
                    bounds = bounds, loss = 'arctan')
                fitted = solution.x.tolist()
                print(solution.message)
            elif optimize and use_nlopt:
                opt = GenericOptimization(
                    objective, bounds, step_size = step_size_global)
                fitted = opt.solve(init_params)
            else:
                fitted = [None for i in range(0, len(init_params))]
            # Record the found solution and its goodness-of-fit score
            params.append(fitted)
            predicted = self.gpp(
                fitted if optimize else init_params,
                *drivers_flat).mean(axis = -1)
            _, rmse_score, _, _ = report_fit_stats(
                tower_gpp_flat, predicted, weights, verbose = False)
            print('[%s/%s] RMSE score of last trial: %.3f' % (
                str(t + 1).zfill(2), str(trials).zfill(2), rmse_score))
            scores.append(rmse_score)
        # Select the fit params with the best score
        if trials > 1:
            fitted = params[np.argmin(scores)]
            init_params = params0[np.argmin(scores)]
        # Generate and print a report, update the BPLUT parameters
        self._report(init_params, fitted, 'GPP')
        if optimize:
            self.bplut.update(
                self._pft, fitted, self._required_parameters['GPP'])

    def tune_reco(
            self, filter_length: int = 2, q_rh: int = 75, q_k: int = 50,
            optimize: bool = True, use_nlopt: bool = True):
        '''
        Optimizes RECO. The 9-km mean L4C RECO is fit to the tower-observed
        RECO using constrained, non-linear least-squares optimization.

        Considerations:

        - Negative RH values (i.e., NPP > RECO) are set to zero.

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        q_rh : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
        q_k : int
            Additional arguments to `pyl4c.apps.calibration.cbar()`
        optimize : bool
            False to only report parameters and their fit statistics instead
            of optimizing (Default: True)
        use_nlopt : bool
            True to use the nlopt library for optimization (Default: True)
        '''
        def residuals(
                params, drivers, observed_reco, observed_gpp, weights,
                q_rh, q_k):
            # Objective function: Difference between tower RECO and L4C RECO
            reco0 = self.reco(
                params, observed_reco, observed_gpp, *drivers, q_rh, q_k)
            diff = np.subtract(observed_reco, reco0)
            missing = np.logical_or(np.isnan(observed_reco), np.isnan(reco0))
            # Multiply by the tower weights
            return (weights * diff)[~missing]

        assert q_rh >= 0 and q_rh <= 100 and q_k >= 0 and q_k <= 100,\
            'Invalid setting for "q_rh" or "q_k" parameters'
        init_params = self._get_params('RECO')
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        print('Loading driver datasets...')
        _, _, tower_gpp, _, _ = self._load_gpp_data(filter_length)
        drivers, tower_reco, weights = self._load_reco_data(filter_length)
        print(f'NOTE: Counts of (days, towers) are: {tower_reco.shape}')
        # Configure the optimization, get bounds for the parameter search
        fixed = None
        if self.config['optimization']['fixed'] is not None:
            fixed = dict([ # Get any fixed parameters as {param: fixed_value}
                (k, v[self._pft])
                for k, v in self.config['optimization']['fixed'].items()
            ])
        step_size_global = [
            self.config['optimization']['step_size'][p]
            for p in self._required_parameters['RECO']
        ]
        bounds_dict = self.config['optimization']['bounds']
        bounds = self._bounds(init_params, bounds_dict, 'RECO', fixed)
        trials = self.config['optimization']['trials']
        params = [] # Optimized parameters
        params0 = [] # Initial (random) parameters
        scores = []
        # Will be a (100, P) space, where P is the number of parameters
        param_space = np.linspace(bounds[0], bounds[1], 100)
        for t in range(0, trials):
            # If multiple trials, randomize the initial parameter values
            #   and score the model in each trial
            if trials > 1:
                p = param_space.shape[1] # Number of parameters
                idx = np.random.randint(0, param_space.shape[0], p)
                init_params = param_space[idx,np.arange(0, p)]
                params0.append(init_params)
            # If we're optimizing (with any library), define the bounds and
            #   the objective function
            if optimize:
                bounds = self._bounds(init_params, bounds_dict, 'RECO', fixed)
                # Set initial value to a fixed value if specified
                if fixed is not None:
                    for key, value in fixed.items():
                        if value is not None and\
                                key in self._required_parameters['RECO']:
                            init_params[
                                self._required_parameters['RECO'].index(key)
                            ] = value
                objective = partial(
                    residuals, drivers = drivers, weights = weights,
                    observed_gpp = tower_gpp, observed_reco = tower_reco,
                    q_rh = q_rh, q_k = q_k)
            # Apply constrained, non-linear least-squares optimization, using
            #   either SciPy or NLOPT
            if optimize and not use_nlopt:
                solution = solve_least_squares(
                    objective, init_params,
                    labels = self._required_parameters['RECO'],
                    bounds = bounds, loss = 'arctan')
                fitted = solution.x.tolist()
                print(solution.message)
            elif optimize and use_nlopt:
                opt = GenericOptimization(
                    objective, bounds, step_size = step_size_global)
                fitted = opt.solve(init_params)
            else:
                fitted = [None for i in range(0, len(init_params))]
            # Record the found solution and its goodness-of-fit score
            params.append(fitted)
            predicted = self.reco(
                fitted if optimize else init_params, tower_reco, tower_gpp,
                *drivers, q_rh = q_rh, q_k = q_k)
            _, rmse_score, _, _ = report_fit_stats(
                tower_reco, predicted, weights, verbose = False)
            print('[%s/%s] RMSE score of last trial: %.3f' % (
                str(t + 1).zfill(2), str(trials).zfill(2), rmse_score))
            scores.append(rmse_score)
        # Select the fit params with the best score
        if trials > 1:
            fitted = params[np.argmin(scores)]
            init_params = params0[np.argmin(scores)]
        # Generate and print a report, update the BPLUT parameters
        self._report(init_params, fitted, 'RECO')
        if optimize:
            self.bplut.update(
                self._pft, fitted, self._required_parameters['RECO'])

    def tune_soc(self, filter_length: int = 2, xlim = None, alpha = 0.6):
        '''
        Starts interactive calibration procedure for the soil organic carbon
        (SOC) decay parameters for a given PFT. Uses any updated parameters
        in the attached storage: `optimization.backend` in the configuration
        file.

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        xlim : int or None
            The largest SOC value that should be shown on the plot (i.e.,
            the X and Y axis limit, for a square plot); if None, defaults to
            the largest value in the inventory dataset
        alpha : float
            The alpha applied to the SOC scatterplot points (Default: 0.6)
        '''
        soc_file = self.config['data']['soil_organic_carbon']['file']
        # Load SOC pit measurements
        with h5py.File(soc_file, 'r') as hdf:
            dates = hdf[
                self.config['data']['soil_organic_carbon']['fields']['time']
            ][:]
            dates = [datetime.date(*ymd) for ymd in dates.tolist()]
            field = self.config['data']['soil_organic_carbon']['fields']['observed']
            field_pft = self.config['data']['soil_organic_carbon']['fields']['PFT']
            pft_mask = hdf[field_pft][:]
            target_soc = hdf[field][:][pft_mask == self._pft]
        print('Loading driver datasets...')
        drivers_for_gpp, _, _, _, _ = self._load_gpp_data(
            filter_length, store = soc_file, prefix = 'ISCN/',
            load_gpp = False, check_validity = False)
        drivers_for_reco, _, weights = self._load_reco_data(
            filter_length, store = soc_file, prefix = 'ISCN/',
            load_reco = False, check_validity = False)
        drivers_for_reco = [ # Convert from dict to list
            drivers_for_reco[key]
            for key in self._required_drivers['RECO']
        ]
        # Calculate GPP based on the updated parameters
        init_params = restore_bplut_flat(self.config['BPLUT'])
        params_gpp = self._get_params('GPP')
        params_reco = self._get_params('RECO')
        gpp = self.gpp(params_gpp, *[
            drivers_for_gpp[d] if d != 'fPAR'
            else np.nanmean(drivers_for_gpp[d], axis = -1)
            for d in self._required_drivers['GPP']
        ])
        # Calculate a 365-day climatology of NPP
        cue = params_reco[self._required_parameters['RECO'].index('CUE')]
        npp = gpp * cue
        # Make the time axis (currently 0) be the trailing axis
        npp_clim = climatology365(npp.swapaxes(0, 1), dates)
        # Calculate litterfall
        litter = npp_clim.sum(axis = 0) / 365
        # Calculate a 365-day climatology of Kmult
        kmult = self.k_mult(params_reco, *drivers_for_reco)
        # Make the time axis (currently 0) be the trailing axis
        kmult_clim = climatology365(kmult.swapaxes(0, 1), dates)
        # Inferred steady-state storage
        sigma = npp_clim.sum(axis = 0) / kmult_clim.sum(axis = 0)
        fmet = init_params['f_metabolic'][:,self._pft][0]
        fstr = init_params['f_structural'][:,self._pft][0]
        decay_rates = self._get_params('SOC')
        decay_rates = decay_rates[:,np.newaxis]
        # Begin user-interaction loop to manually calibrate decay rates
        prev = None
        while True:
            init_soc = soc_analytical_spinup(
                litter, kmult_clim, fmet, fstr, decay_rates)
            soc, _ = soc_numerical_spinup(
                np.stack(init_soc), litter, kmult_clim, fmet, fstr,
                decay_rates, verbose = True)
            soc = np.stack(soc).sum(axis = 0)
            _, ax = pyplot.subplots(figsize = (6,6))
            ax.plot([0, 1], [0, 1],
                transform = ax.transAxes, linestyle = 'dotted')
            if prev is not None:
                pyplot.plot(
                    target_soc / 1e3, prev / 1e3, 'o',
                    c = 'gray', alpha = alpha / 2)
            try:
                pyplot.plot(target_soc / 1e3, soc / 1e3, 'o', alpha = alpha)
            except:
                import ipdb
                ipdb.set_trace() # FIXME
            xmin, xmax = np.nanpercentile(target_soc / 1e3, (0, 100))
            print(f'-- Min/Max of Inventory SOC: {xmin.round(1), xmax.round(1)}')
            print(f'-- Min/Max of Predicted SOC: {np.nanmin(soc / 1e3).round(1), np.nanmax(soc / 1e3).round(1)}')
            pyplot.xlim(0, xmax if xlim is None else xlim)
            pyplot.ylim(0, xmax if xlim is None else xlim)
            pyplot.xlabel('Inventory SOC (kg m$^{-2}$)')
            pyplot.ylabel('Modeled Equilibrium SOC (kg m$^{-2}$)')
            pyplot.title(f'For PFT={self._pft}')
            pyplot.show()
            # Calculate correlation coefficient
            mask = np.isnan(target_soc)
            r = np.corrcoef(target_soc[~mask], soc[~mask])[0,1]
            rmse = rmsd(target_soc[~mask], soc[~mask])
            print(f'Current metabolic rate (r={r.round(3)}, RMSE={round(rmse, 1)}):')
            print('%.5f\n' % decay_rates[0])
            proposal = input('New metabolic rate [Q to quit]:\n')
            if proposal == 'Q':
                break
            value = float(proposal)
            # NOTE: The "structural" and "recalcitrant" pool decay rates
            #   here should be the actual decay rates, i.e., the "metabolic"
            #   rate scaled by fixed constants
            decay_rates = np.array([
                value, value * 0.4, value * 0.0093
            ]).reshape((3, 1))
            prev = soc.copy()
        print(f'Updated BPLUT decay rates for PFT={self._pft}')
        self.bplut.update(
            self._pft, decay_rates.ravel(),
            ['decay_rates0', 'decay_rates1', 'decay_rates2'])
Convenience class for calibrating the L4C GPP and RECO models. Meant to be used with fire.Fire(). Uses:

    # Run the calibration for a specific PFT
    python optimize.py tune-gpp --pft=<pft>

    # Get access to the sampler (and debugger), after calibration is run
    python optimize.py tune-gpp --pft=<pft> --ipdb
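The chained subcommands described at the top of this module work because fire exposes the class directly and methods like pft() return self. A minimal sketch of the wiring, assuming the module's usual __main__ guard:

    import fire

    if __name__ == '__main__':
        fire.Fire(CalibrationAPI)

    # e.g., "python optimize.py pft 3 tune-gpp" resolves to:
    #   CalibrationAPI().pft(3).tune_gpp()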
Static methods
def e_mult(params, tmin, vpd, smrz, ft)
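e_mult() composes one multiplier per driver (see the full source above). The sketch below illustrates the idea with a stand-in ramp() function; it assumes linear_constraint() behaves as a clipped linear ramp (0 below the first parameter, 1 above the second, with 'reversed' descending), and all parameter values are made up:

    import numpy as np

    def ramp(xmin, xmax, form=None):
        # Hypothetical stand-in for linear_constraint(): a linear ramp from
        # 0 at xmin to 1 at xmax, clipped to [0, 1]; 'reversed' flips it
        def f(x):
            y = np.clip((np.asarray(x) - xmin) / (xmax - xmin), 0, 1)
            return 1 - y if form == 'reversed' else y
        return f

    # params = [LUE, tmin0, tmin1, vpd0, vpd1, smrz0, smrz1, ft0]
    f_tmin = ramp(262.0, 285.0)              # deg K
    f_vpd = ramp(500.0, 4000.0, 'reversed')  # Pa
    f_smrz = ramp(10.0, 70.0)                # %
    # The FT multiplier is taken as 1 (thawed) for this example
    e_mult = f_tmin(275.0) * f_vpd(1200.0) * f_smrz(45.0) * 1.0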
def gpp(params, fpar, par, tmin, vpd, smrz, ft)
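In other words, GPP = fPAR x PAR x LUE x E_mult, with params[0] being the light-use efficiency (LUE). A worked example with illustrative values:

    fpar, par = 0.6, 8.0       # unitless; MJ m-2 day-1 (illustrative)
    lue, e_mult = 1.8, 0.75    # g C MJ-1; unitless (illustrative)
    apar = fpar * par          # 4.8 MJ m-2 day-1
    gpp = apar * lue * e_mult  # 6.48 g C m-2 day-1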
def k_mult(params, tsoil, smsf)
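k_mult() pairs a soil-temperature response with a soil-moisture ramp. The sketch below assumes arrhenius() follows a Lloyd and Taylor (1994) form normalized to 1 at a reference temperature; the actual pyl4c implementation and constants may differ, and all values here are made up:

    import numpy as np

    def arrhenius_sketch(tsoil, beta0, tref=273.15, t0=227.13):
        # Assumed form only: exp{beta0 * [1/(tref - t0) - 1/(tsoil - t0)]}
        return np.exp(beta0 * (1.0 / (tref - t0) - 1.0 / (tsoil - t0)))

    f_smsf = ramp(25.0, 75.0)  # ramp() as sketched under e_mult(); % wetness
    k_mult = arrhenius_sketch(290.0, beta0=330.0) * f_smsf(50.0)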
def reco(params, tower_reco, tower_gpp, tsoil, smsf, q_rh, q_k)
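reco() inverts the tower data rather than running the full SOC model: autotrophic respiration (RA) is taken as a fixed fraction of tower GPP, the remainder is heterotrophic respiration (RH), and cbar() supplies a scaling constant Cbar such that RH is approximately Kmult x Cbar. A worked example with illustrative numbers:

    cue = 0.55                        # params[0], carbon use efficiency
    tower_gpp, tower_reco = 6.0, 4.5  # g C m-2 day-1 (illustrative)
    ra = (1 - cue) * tower_gpp        # 2.7
    rh = max(tower_reco - ra, 0.0)    # 1.8; negative RH is zeroed
    # Predicted RECO = ra + kmult0 * cbar0, with cbar0 from
    #   cbar(rh, kmult0, q_rh, q_k)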
Methods
def pft(self, pft)
Sets the PFT class for the next calibration step.
Parameters
pft : int
- The PFT class to use in calibration
Returns
CLI
def plot_gpp(self, driver, filter_length=2, coefs=None, xlim=None, ylim=None, alpha=0.1, marker='.')
Using the current or optimized BPLUT coefficients, plots the GPP ramp function for a given driver. NOTE: Values where APAR < 2.0 are not shown.
Parameters
driver : str
- Name of the driver to plot on the horizontal axis
filter_length : int
- The window size for the smoothing filter, applied to the observed data
coefs : list or tuple or numpy.ndarray
- (Optional) Instead of using what's in the BPLUT, specify the exact parameters, e.g., [tmin0, tmin1]
xlim : list or tuple
- (Optional) A 2-element sequence: the x-axis limits
ylim : list or tuple
- (Optional) A 2-element sequence: the y-axis limits
alpha : float
- (Optional) The alpha value (Default: 0.1)
marker : str
- (Optional) The marker symbol (Default: ".")
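For example, to overlay a proposed VPD ramp for PFT 3 (coefficient values illustrative; note the no-spaces list syntax required by --coefs, per the assertion in the source):
python optimize.py pft 3 plot-gpp vpd --coefs=[500.0,4000.0]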
def plot_reco(self, driver, filter_length: int = 2, coefs=None, q_rh=75, q_k=50, xlim=None, ylim=None, alpha=0.1, marker='.')
Using the current or optimized BPLUT coefficients, plots the RECO ramp function for a given driver. The ramp function is shown on a plot of RH/Cbar, which is equivalent to Kmult (as Cbar is an upper quantile of the RH/Kmult distribution).
Parameters
driver : str
- Name of the driver to plot on the horizontal axis
filter_length : int
- The window size for the smoothing filter, applied to the observed data
coefs : list or tuple or numpy.ndarray
- (Optional) Instead of using what's in the BPLUT, specify the exact parameters, e.g., [smsf0, smsf1]
q_rh : int
- Additional arguments to pyl4c.apps.calibration.cbar()
q_k : int
- Additional arguments to pyl4c.apps.calibration.cbar()
xlim : list or tuple
- (Optional) A 2-element sequence: the x-axis limits
ylim : list or tuple
- (Optional) A 2-element sequence: the y-axis limits
alpha : float
- (Optional) The alpha value (Default: 0.1)
marker : str
- (Optional) The marker symbol (Default: ".")
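Similarly, to overlay a proposed SMSF ramp for PFT 3 (coefficient values illustrative):
python optimize.py pft 3 plot-reco smsf --coefs=[25.0,75.0]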
def score(self, model: str, filter_length: int = 2, q_rh: int = 75, q_k: int = 50)
Scores the current model (for a specific PFT) based on the parameters in the BPLUT.
Parameters
model : str
- One of: "GPP" or "RECO"
filter_length : int
- The window size for the smoothing filter, applied to the observed data
q_rh : int
- (RECO only) Additional arguments to pyl4c.apps.calibration.cbar()
q_k : int
- (RECO only) Additional arguments to pyl4c.apps.calibration.cbar()
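The reported statistics relate in the usual way; a minimal, unweighted sketch (the library's report_fit_stats() may handle weights and missing values differently):

    import numpy as np

    def fit_stats_sketch(obs, pred):
        # Assumes no NaNs and equal weights (illustrative only)
        bias = np.mean(pred - obs)
        rmse = np.sqrt(np.mean((pred - obs) ** 2))
        ub_rmse = np.sqrt(rmse ** 2 - bias ** 2)  # unbiased RMSE
        r2 = np.corrcoef(obs, pred)[0, 1] ** 2
        return r2, rmse, ub_rmse, bias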
def set(self, parameter, value)
Sets the named parameter to the given value for the specified PFT class. This updates the initial parameters, affecting any subsequent optimization.
Parameters
parameter : str
- Name of the parameter to be set
value : int or float
- Value of the named parameter to be set
Returns
CLI
def tune_gpp(self, filter_length: int = 2, optimize: bool = True, use_nlopt: bool = True)
Run the L4C GPP calibration.
- For GPP data: Removes observations where GPP < 0 or where APAR is < 0.1 MJ m-2 day-1
Parameters
filter_length : int
- The window size for the smoothing filter, applied to the observed data
optimize : bool
- True to run the optimization (Default) or False if you just want the goodness-of-fit to be reported
use_nlopt : bool
- True to use nlopt for optimization (Default)
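When config['optimization']['trials'] is greater than 1, each trial draws a fresh starting point from a 100-step grid between the parameter bounds (per the source above). The random draw works like this, with made-up bounds for three parameters:

    import numpy as np

    lower = np.array([0.5, 230.0, 270.0])         # illustrative lower bounds
    upper = np.array([4.0, 275.0, 330.0])         # illustrative upper bounds
    param_space = np.linspace(lower, upper, 100)  # shape (100, 3)
    p = param_space.shape[1]
    idx = np.random.randint(0, param_space.shape[0], p)
    init_params = param_space[idx, np.arange(p)]  # one random level per parameter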
def tune_reco(self, filter_length: int = 2, q_rh: int = 75, q_k: int = 50, optimize: bool = True, use_nlopt: bool = True)
Optimizes RECO. The 9-km mean L4C RECO is fit to the tower-observed RECO using constrained, non-linear least-squares optimization. Considerations:
- Negative RH values (i.e., NPP > RECO) are set to zero.
Parameters
filter_length : int
- The window size for the smoothing filter, applied to the observed data
q_rh : int
- Additional arguments to pyl4c.apps.calibration.cbar()
q_k : int
- Additional arguments to pyl4c.apps.calibration.cbar()
optimize : bool
- False to only report parameters and their fit statistics instead of optimizing (Default: True)
use_nlopt : bool
- True to use the nlopt library for optimization (Default: True)
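Parameters listed under config['optimization']['fixed'] are pinned rather than dropped: _bounds() collapses their search interval to plus/minus 1e-3 around the fixed value. A sketch with made-up numbers:

    fixed = {'CUE': 0.62}  # illustrative per-PFT fixed value
    bounds = {'CUE': (0.0, 1.0), 'tsoil': (200.0, 400.0),
              'smsf0': (0.0, 50.0), 'smsf1': (25.0, 100.0)}
    lower, upper = [], []
    for p in ['CUE', 'tsoil', 'smsf0', 'smsf1']:
        lo, hi = bounds[p]
        if p in fixed and fixed[p] is not None:
            lo, hi = fixed[p] - 1e-3, fixed[p] + 1e-3
        lower.append(lo)
        upper.append(hi)
    # lower[0], upper[0] -> 0.619, 0.621; CUE is effectively frozen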
def tune_soc(self, filter_length: int = 2, xlim=None, alpha=0.6)
-
Expand source code
```python
def tune_soc(self, filter_length: int = 2, xlim = None, alpha = 0.6):
    '''
    Starts an interactive calibration procedure for the soil organic
    carbon (SOC) decay parameters for a given PFT. Uses any updated
    parameters in the attached storage: `optimization.backend` in the
    configuration file.

    Parameters
    ----------
    filter_length : int
        The window size for the smoothing filter, applied to the
        observed data
    xlim : int or None
        The largest SOC value that should be shown on the plot (i.e.,
        the X and Y axis limit, for a square plot); if None, defaults
        to the largest value in the inventory dataset
    alpha : float
        The alpha applied to the SOC scatterplot points (Default: 0.6)
    '''
    soc_file = self.config['data']['soil_organic_carbon']['file']
    # Load SOC pit measurements
    with h5py.File(soc_file, 'r') as hdf:
        dates = hdf[
            self.config['data']['soil_organic_carbon']['fields']['time']
        ][:]
        dates = [datetime.date(*ymd) for ymd in dates.tolist()]
        field = self.config['data']['soil_organic_carbon']['fields']['observed']
        field_pft = self.config['data']['soil_organic_carbon']['fields']['PFT']
        pft_mask = hdf[field_pft][:]
        target_soc = hdf[field][:][pft_mask == self._pft]
    print('Loading driver datasets...')
    drivers_for_gpp, _, _, _, _ = self._load_gpp_data(
        filter_length, store = soc_file, prefix = 'ISCN/',
        load_gpp = False, check_validity = False)
    drivers_for_reco, _, weights = self._load_reco_data(
        filter_length, store = soc_file, prefix = 'ISCN/',
        load_reco = False, check_validity = False)
    drivers_for_reco = [ # Convert from dict to list
        drivers_for_reco[key]
        for key in self._required_drivers['RECO']
    ]
    # Calculate GPP based on the updated parameters
    init_params = restore_bplut_flat(self.config['BPLUT'])
    params_gpp = self._get_params('GPP')
    params_reco = self._get_params('RECO')
    gpp = self.gpp(params_gpp, *[
        drivers_for_gpp[d] if d != 'fPAR'
        else np.nanmean(drivers_for_gpp[d], axis = -1)
        for d in self._required_drivers['GPP']
    ])
    # Calculate a 365-day climatology of NPP
    cue = params_reco[self._required_parameters['RECO'].index('CUE')]
    npp = gpp * cue
    # Make the time axis (currently 0) be the trailing axis
    npp_clim = climatology365(npp.swapaxes(0, 1), dates)
    # Calculate litterfall
    litter = npp_clim.sum(axis = 0) / 365
    # Calculate a 365-day climatology of Kmult
    kmult = self.k_mult(params_reco, *drivers_for_reco)
    # Make the time axis (currently 0) be the trailing axis
    kmult_clim = climatology365(kmult.swapaxes(0, 1), dates)
    # Inferred steady-state storage
    sigma = npp_clim.sum(axis = 0) / kmult_clim.sum(axis = 0)
    fmet = init_params['f_metabolic'][:,self._pft][0]
    fstr = init_params['f_structural'][:,self._pft][0]
    decay_rates = self._get_params('SOC')
    decay_rates = decay_rates[:,np.newaxis]
    # Begin user-interaction loop to manually calibrate decay rates
    prev = None
    while True:
        init_soc = soc_analytical_spinup(
            litter, kmult_clim, fmet, fstr, decay_rates)
        soc, _ = soc_numerical_spinup(
            np.stack(init_soc), litter, kmult_clim, fmet, fstr,
            decay_rates, verbose = True)
        soc = np.stack(soc).sum(axis = 0)
        _, ax = pyplot.subplots(figsize = (6, 6))
        ax.plot([0, 1], [0, 1], transform = ax.transAxes,
            linestyle = 'dotted')
        if prev is not None:
            pyplot.plot(
                target_soc / 1e3, prev / 1e3, 'o', c = 'gray',
                alpha = alpha / 2)
        try:
            pyplot.plot(target_soc / 1e3, soc / 1e3, 'o', alpha = alpha)
        except:
            import ipdb
            ipdb.set_trace() # FIXME
        xmin, xmax = np.nanpercentile(target_soc / 1e3, (0, 100))
        print(f'-- Min/Max of Inventory SOC: {xmin.round(1), xmax.round(1)}')
        print(f'-- Min/Max of Predicted SOC: {np.nanmin(soc / 1e3).round(1), np.nanmax(soc / 1e3).round(1)}')
        pyplot.xlim(0, xmax if xlim is None else xlim)
        pyplot.ylim(0, xmax if xlim is None else xlim)
        pyplot.xlabel('Inventory SOC (kg m$^{-2}$)')
        pyplot.ylabel('Modeled Equilibrium SOC (kg m$^{-2}$)')
        pyplot.title(f'For PFT={self._pft}')
        pyplot.show()
        # Calculate correlation coefficient
        mask = np.isnan(target_soc)
        r = np.corrcoef(target_soc[~mask], soc[~mask])[0,1]
        rmse = rmsd(target_soc[~mask], soc[~mask])
        print(f'Current metabolic rate (r={r.round(3)}, RMSE={round(rmse, 1)}):')
        print('%.5f\n' % decay_rates[0])
        proposal = input('New metabolic rate [Q to quit]:\n')
        if proposal == 'Q':
            break
        value = float(proposal)
        # NOTE: The "structural" and "recalcitrant" pool decay rates
        #   here should be the actual decay rates, i.e., the "metabolic"
        #   rate scaled by fixed constants
        decay_rates = np.array([
            value, value * 0.4, value * 0.0093
        ]).reshape((3, 1))
        prev = soc.copy()
    print(f'Updated BPLUT decay rates for PFT={self._pft}')
    self.bplut.update(
        self._pft, decay_rates.ravel(),
        ['decay_rates0', 'decay_rates1', 'decay_rates2'])
```
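In the interactive loop above, a single user-proposed "metabolic" decay rate determines all three pool decay rates through fixed scaling constants. A worked instance of that scaling, using a hypothetical proposal of 0.02:

```python
import numpy as np

value = 0.02  # Hypothetical user-proposed metabolic decay rate
decay_rates = np.array([
    value,           # Metabolic pool
    value * 0.4,     # Structural pool: 40% of the metabolic rate
    value * 0.0093,  # Recalcitrant pool: 0.93% of the metabolic rate
]).reshape((3, 1))   # Column vector, as the spin-up functions expect
```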
Starts an interactive calibration procedure for the soil organic carbon (SOC) decay parameters for a given PFT. Uses any updated parameters in the attached storage: `optimization.backend` in the configuration file.

Parameters

`filter_length` : `int`
- The window size for the smoothing filter, applied to the observed data

`xlim` : `int` or `None`
- The largest SOC value that should be shown on the plot (i.e., the X and Y axis limit, for a square plot); if None, defaults to the largest value in the inventory dataset

`alpha` : `float`
- The alpha applied to the SOC scatterplot points (Default: 0.6)
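For reference, a minimal sketch of invoking this method directly from Python instead of through the command line; the configuration path and PFT code below are hypothetical:

```python
from pyl4c.apps.calibration.optimize import CalibrationAPI

# Hypothetical configuration file and PFT code
api = CalibrationAPI(config = 'my_config.yaml', pft = 7)
# Interactively propose new "metabolic" decay rates; here, the plot
# axes are capped at 40 kg C m-2 (an illustrative value)
api.tune_soc(filter_length = 2, xlim = 40)
```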
class GenericOptimization (func, bounds, method: int = nlopt.LD_SLSQP, step_size=None, verbose=True)
-
```python
class GenericOptimization(object):
    '''
    A more generic and expansive tool for optimization; includes many
    more algorithms for minimization/maximization problems, including
    sequential quadratic programming (SQP), which is the default here
    and is closest to what is performed in Matlab's `fmincon`. Despite
    the similarity to `fmincon`, SQP will tend to deviate strongly from
    the initial parameters derived via `fmincon`. This solver is SLOW
    for gradient descent methods relative to
    `scipy.optimize.least_squares()`, because the gradient is
    calculated with a finite difference approach.

        opt = GenericOptimization(residuals, OPT_BOUNDS['gpp'],
            step_size = (0.01, 0.1, 0.1, 1, 1, 0.1, 0.1, 0.05))
        opt.solve(init_params)

    See: https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/

    Parameters
    ----------
    func : function
        Function to calculate the residuals
    bounds : list or tuple
        2-element sequence of (lower, upper) bounds where each element
        is an array
    method : int
        One of the nlopt algorithms (Default: `nlopt.LD_SLSQP`)
    step_size : list or tuple or numpy.ndarray
        Sequence of steps to take in gradient descent; not needed for
        derivative-free methods
    verbose : bool
        True to print all output to the screen
    '''
    def __init__(
            self, func, bounds, method: int = nlopt.LD_SLSQP,
            step_size = None, verbose = True):
        # https://nlopt.readthedocs.io/en/latest/NLopt_Algorithms/#slsqp
        assert isinstance(method, int), 'Did not recognize "method" argument'
        self._bounds = bounds
        self._method = method
        self._residuals = func
        self._step_size = step_size
        self._verbose = verbose

    def solve(self, init_params, ftol = 1e-8, xtol = 1e-8, maxeval = 500):
        '''
        Using the sum-of-squared errors (SSE) as the objective function,
        solves a minimization problem.

        Parameters
        ----------
        init_params : list or tuple or numpy.ndarray
            Sequence of starting parameters (or "initial guesses")
        ftol : float
            Absolute tolerance on changes in the objective function value
        xtol : float
            Absolute tolerance on changes in the parameter values
        maxeval : int
            Maximum number of objective function evaluations

        Returns
        -------
        numpy.ndarray
            The optimized parameter vector
        '''
        @suppress_warnings
        def sse(x):
            return np.power(self._residuals(x), 2).sum()

        @suppress_warnings
        def objf(x, grad):
            if grad.size > 0:
                # Approximate the gradient by finite differences
                grad[...] = optimize.approx_fprime(
                    x, sse, self._step_size)
            return sse(x)

        opt = nlopt.opt(self._method, len(init_params))
        # https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/#localsubsidiary-optimization-algorithm
        if self._method == nlopt.G_MLSL_LDS:
            opt.set_local_optimizer(
                nlopt.opt(nlopt.LN_COBYLA, len(init_params)))
        opt.set_min_objective(objf)
        opt.set_lower_bounds(self._bounds[0])
        opt.set_upper_bounds(self._bounds[1])
        opt.set_ftol_abs(ftol)
        opt.set_xtol_abs(xtol)
        opt.set_maxeval(maxeval)
        if self._verbose:
            print('Solving...')
        return opt.optimize(init_params)
```
A more generic and expansive tool for optimization; includes many more algorithms for minimization/maximization problems, including sequential quadratic programming (SQP), which is the default here and is closest to what is performed in Matlab's `fmincon`. Despite the similarity to `fmincon`, SQP will tend to deviate strongly from the initial parameters derived via `fmincon`. This solver is SLOW for gradient descent methods relative to `scipy.optimize.least_squares()`, because the gradient is calculated with a finite difference approach.

```python
opt = GenericOptimization(residuals, OPT_BOUNDS['gpp'],
    step_size = (0.01, 0.1, 0.1, 1, 1, 0.1, 0.1, 0.05))
opt.solve(init_params)
```
See: https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/
Parameters

`func` : `function`
- Function to calculate the residuals

`bounds` : `list` or `tuple`
- 2-element sequence of (lower, upper) bounds where each element is an array

`method` : `int`
- One of the nlopt algorithms (Default: `nlopt.LD_SLSQP`)

`step_size` : `list` or `tuple` or `numpy.ndarray`
- Sequence of steps to take in gradient descent; not needed for derivative-free methods

`verbose` : `bool`
- True to print all output to the screen
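To make the usage concrete, here is a minimal, self-contained sketch of `GenericOptimization` on a toy linear least-squares problem; the synthetic data, `residuals` function, and bounds are illustrative and not part of the L4C calibration:

```python
import numpy as np
from pyl4c.apps.calibration.optimize import GenericOptimization

# Synthetic observations from a line with slope 2.0 and intercept 0.5
rng = np.random.default_rng(42)
x = np.linspace(0, 1, 50)
y_obs = 2.0 * x + 0.5 + rng.normal(0, 0.01, x.size)

def residuals(params):
    'Residuals between the observations and a candidate line'
    slope, intercept = params
    return y_obs - (slope * x + intercept)

# (lower, upper) bounds for (slope, intercept)
bounds = (np.array([0.0, -1.0]), np.array([5.0, 1.0]))
opt = GenericOptimization(residuals, bounds, step_size = (0.01, 0.01))
fitted = opt.solve(np.array([1.0, 0.0]))
print(fitted)  # Expected to be close to [2.0, 0.5]
```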
Methods
def solve(self, init_params, ftol=1e-08, xtol=1e-08, maxeval=500)
-
```python
def solve(self, init_params, ftol = 1e-8, xtol = 1e-8, maxeval = 500):
    '''
    Using the sum-of-squared errors (SSE) as the objective function,
    solves a minimization problem.

    Parameters
    ----------
    init_params : list or tuple or numpy.ndarray
        Sequence of starting parameters (or "initial guesses")
    ftol : float
        Absolute tolerance on changes in the objective function value
    xtol : float
        Absolute tolerance on changes in the parameter values
    maxeval : int
        Maximum number of objective function evaluations

    Returns
    -------
    numpy.ndarray
        The optimized parameter vector
    '''
    @suppress_warnings
    def sse(x):
        return np.power(self._residuals(x), 2).sum()

    @suppress_warnings
    def objf(x, grad):
        if grad.size > 0:
            # Approximate the gradient by finite differences
            grad[...] = optimize.approx_fprime(
                x, sse, self._step_size)
        return sse(x)

    opt = nlopt.opt(self._method, len(init_params))
    # https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/#localsubsidiary-optimization-algorithm
    if self._method == nlopt.G_MLSL_LDS:
        opt.set_local_optimizer(
            nlopt.opt(nlopt.LN_COBYLA, len(init_params)))
    opt.set_min_objective(objf)
    opt.set_lower_bounds(self._bounds[0])
    opt.set_upper_bounds(self._bounds[1])
    opt.set_ftol_abs(ftol)
    opt.set_xtol_abs(xtol)
    opt.set_maxeval(maxeval)
    if self._verbose:
        print('Solving...')
    return opt.optimize(init_params)
```
Using the sum-of-squared errors (SSE) as the objective function, solves a minimization problem.

Parameters

`init_params` : `list` or `tuple` or `numpy.ndarray`
- Sequence of starting parameters (or "initial guesses")

`ftol` : `float`
- Absolute tolerance on changes in the objective function value

`xtol` : `float`
- Absolute tolerance on changes in the parameter values

`maxeval` : `int`
- Maximum number of objective function evaluations

Returns

`numpy.ndarray`
- The optimized parameter vector
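With a derivative-free algorithm, `solve()` never fills in the gradient, so no `step_size` is needed. A minimal sketch using `nlopt.LN_NELDERMEAD`; the one-parameter residual function is purely illustrative:

```python
import nlopt
import numpy as np
from pyl4c.apps.calibration.optimize import GenericOptimization

def residuals(params):
    'Single residual with a root (and SSE minimum) at params[0] = 3'
    return np.array([params[0] - 3.0])

bounds = (np.array([0.0]), np.array([10.0]))
opt = GenericOptimization(residuals, bounds, method = nlopt.LN_NELDERMEAD)
solution = opt.solve(np.array([1.0]), ftol = 1e-10, maxeval = 1000)
print(solution)  # Expected to be close to [3.0]
```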