Module pyl4c.apps.calibration.optimize
To calibrate the GPP or RECO model for a specific PFT:
python optimize.py pft <pft> tune-gpp
python optimize.py pft <pft> tune-reco
To plot a specific PFT's (optimized) response to a driver:
python optimize.py pft <pft> plot-gpp <driver>
python optimize.py pft <pft> plot-reco <driver>
Where:
- For plot-gpp, <driver> is one of: smrz, vpd, tmin.
- For plot-reco, <driver> is one of: smsf, tsoil.
It's important to specify a configuration file; otherwise, the default,
pyl4c/data/files/config_L4C_calibration.yaml, will be used:
python optimize.py pft <pft> tune-gpp --config=my_config.yaml
python optimize.py pft <pft> tune-soc --config=my_config.yaml
python optimize.py pft <pft> tune-reco --config=my_config.yaml
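The configuration file is parsed with yaml.safe_load(). As a rough sketch, shown here as the Python mapping the parser returns, the keys below are the ones the CalibrationAPI source (further down this page) actually reads; all paths and values are placeholders and the real schema may include more entries:

    # Hypothetical illustration only; key names are taken from the source code
    config = {
        'BPLUT': 'path/to/bplut_file',             # restored with restore_bplut()
        'data': {
            'file': 'path/to/drivers.h5',          # HDF5 driver/observation data
            'fields': {},                          # maps driver names to HDF5 paths
            'sites_blacklisted': [],               # tower sites to exclude
            'supplemental_file': 'path/to/obs.h5', # GPP/RECO if not in main file
            # 'soil_organic_carbon': {...}         # required for tune-soc
        },
        'optimization': {
            'backend': 'path/to/backend.h5',       # stores the updated BPLUT
            'bounds': {},                          # {parameter: (lower, upper)}
            'fixed': None,                         # or {parameter: value per PFT}
            'step_size': {},                       # {parameter: gradient step}
            'trials': 1                            # number of randomized restarts
        }
    }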
To plot the steady-state SOC versus ISCN SOC pit measurements, and then select potential new decay rates:
python optimize.py pft <pft> tune-soc
To get the goodness-of-fit statistics for the updated parameters:
python optimize.py pft <pft> score GPP
python optimize.py pft <pft> score RECO
To view the current (potentially updated) BPLUT:
python optimize.py bplut show
# View parameter values for <param> across ALL PFTs
python optimize.py bplut show None <param>
# View a PFT's values across ALL parameters
python optimize.py bplut show <pft> None
Classes
class CalibrationAPI(config: str = None, pft: int = None)
class CalibrationAPI(object):
    '''
    Convenience class for calibrating the L4C GPP and RECO models. Meant
    to be used with `fire.Fire()`. Uses:

        # Run the calibration for a specific PFT
        python optimize.py tune-gpp --pft=<pft>

        # Get access to the sampler (and debugger), after calibration is run
        python optimize.py tune-gpp --pft=<pft> --ipdb
    '''
    _driver_bounds = {'apar': (2, np.inf)}
    _metadata = {
        'tmin':  {'units': 'deg K'},
        'vpd':   {'units': 'Pa'},
        'smrz':  {'units': '%'},
        'smsf':  {'units': '%'},
        'tsoil': {'units': 'deg K'},
    }
    _required_parameters = {
        'GPP':  ['LUE', 'tmin0', 'tmin1', 'vpd0', 'vpd1', 'smrz0', 'smrz1', 'ft0'],
        'RECO': ['CUE', 'tsoil', 'smsf0', 'smsf1'],
        'SOC':  ['decay_rates0', 'decay_rates1', 'decay_rates2']
    }
    _required_drivers = {
        # Tsurf = Surface skin temperature; Tmin = Minimum daily temperature
        'GPP':  ['fPAR', 'PAR', 'Tmin', 'VPD', 'SMRZ', 'FT'],
        'RECO': ['Tsoil', 'SMSF']
    }

    def __init__(self, config: str = None, pft: int = None):
        config_file = config
        if config_file is None:
            config_file = os.path.join(
                L4C_DIR, 'data/files/config_L4C_calibration.yaml')
        print(f'Using configuration file: {config_file}')
        with open(config_file, 'r') as file:
            self.config = yaml.safe_load(file)
        if pft is not None:
            assert pft in PFT_VALID, f'Invalid PFT: {pft}'
            self._pft = pft
        self.hdf5 = self.config['data']['file']
        self.bplut = BPLUT(
            restore_bplut(self.config['BPLUT']),
            hdf5_path = self.config['optimization']['backend'])

    def _bounds(self, init_params, bounds, model, fixed = None):
        'Defines bounds; optionally "fixes" parameters by fixing bounds'
        params = init_params
        lower = []
        upper = []
        # Then, set the bounds; for free parameters, this is what was given;
        #   for fixed parameters, this is the fixed value plus/minus some
        #   tolerance
        for i, p in enumerate(self._required_parameters[model]):
            # This is a parameter to be optimized; use default bounds
            lower.append(bounds[p][0])
            upper.append(bounds[p][1])
            if fixed is not None:
                if p in fixed:
                    if fixed[p] is not None:
                        lower.pop()
                        upper.pop()
                        lower.append(fixed[p] - 1e-3)
                        upper.append(fixed[p] + 1e-3)
        return (np.array(lower), np.array(upper))

    def _clean(
            self, raw: Sequence, drivers: Sequence, protocol: str = 'GPP',
            num_std: int = 5):
        'Cleans up data values according to a prescribed protocol'
        if protocol == 'GPP':
            # Filter out observed GPP values when GPP is negative or when
            #   APAR < 0.1 MJ m-2 day-1
            apar = np.nanmean(drivers['fPAR'], axis = -1) * drivers['PAR']
            cleaned = np.where(
                apar < 0.1, np.nan, np.where(raw < 0, np.nan, raw))
            return np.apply_along_axis(
                lambda x: np.where(
                    x > (num_std * np.nanstd(x)), np.nan, x), 0, cleaned)
        elif protocol == 'RECO':
            # Remove negative values
            return np.where(raw < 0, np.nan, raw)

    def _filter(self, raw: Sequence, size: int):
        'Apply a smoothing filter with zero phase offset'
        if size > 1:
            window = np.ones(size) / size
            return np.apply_along_axis(
                lambda x: signal.filtfilt(window, np.ones(1), x), 0, raw)
        return raw # Or, revert to the raw data

    def _get_params(self, model: str):
        # Filter the parameters to just those for the PFT of interest
        return self.bplut.flat(
            self._pft, self._required_parameters[model.upper()])

    def _load_gpp_data(
            self, filter_length, store = None, prefix = '',
            load_gpp = True, check_validity = True):
        'Load the required datasets for GPP, for a single PFT'
        blacklist = self.config['data']['sites_blacklisted']
        if store is None:
            store = self.hdf5
        with h5py.File(store, 'r') as hdf:
            n_steps = hdf[f'{prefix}time'].shape[0]
            sites = hdf[f'{prefix}site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = hdf[f'{prefix}state/PFT'][:]
            if pft_map.ndim > 1:
                pft_map = pft_dominant(pft_map, sites)
            # Blacklist validation sites
            pft_mask = np.logical_and(
                np.in1d(pft_map, self._pft), ~np.in1d(sites, blacklist))
            drivers = dict()
            field_map = self.config['data']['fields']
            for field in self._required_drivers['GPP']:
                # Try reading the field exactly as described in config file
                if field in field_map:
                    if field_map[field] in hdf:
                        # Preserve 1-km subgrid for fPAR
                        if field == 'fPAR':
                            drivers[field] = hdf[field_map[field]][:,pft_mask,:]
                        else:
                            drivers[field] = hdf[field_map[field]][:,pft_mask]
                elif field == 'PAR':
                    if 'SWGDN' not in field_map:
                        raise ValueError("Could not find PAR or SWGDN data")
                    drivers[field] = par(hdf[field_map['SWGDN']][:,pft_mask])
                elif field == 'VPD':
                    qv2m = hdf[field_map['QV2M']][:,pft_mask]
                    ps = hdf[field_map['PS']][:,pft_mask]
                    t2m = hdf[field_map['T2M']][:,pft_mask]
                    drivers[field] = vpd(qv2m, ps, t2m)
                elif field == 'SMRZ':
                    smrz = hdf[field_map['SMRZ0']][:,pft_mask]
                    smrz_min = smrz.min(axis = 0)
                    drivers[field] = rescale_smrz(smrz, smrz_min)
                elif field == 'FT':
                    tsurf = hdf[field_map['Tsurf']][:,pft_mask]
                    # Classify soil as frozen (FT=0) or unfrozen (FT=1) based
                    #   on threshold freezing point of water
                    drivers[field] = np.where(tsurf <= 273.15, 0, 1)
            # Check units on fPAR
            if np.nanmax(drivers['fPAR'][:]) > 10:
                drivers['fPAR'] = drivers['fPAR'].astype(np.float32) / 100
            assert len(set(self._required_drivers['GPP'])\
                .difference(set(drivers.keys()))) == 0,\
                'Did not find all required drivers for the GPP model!'
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if 'weights' in hdf.keys():
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(n_steps, axis = 0)
            else:
                print('WARNING - "weights" not found in HDF5 file!')
            if load_gpp and 'GPP' not in hdf.keys():
                with h5py.File(
                        self.config['data']['supplemental_file'], 'r') as _hdf:
                    tower_gpp = _hdf['GPP'][:][:,pft_mask]
            elif load_gpp:
                tower_gpp = hdf['GPP'][:][:,pft_mask]
        # Check that driver data do not contain NaNs
        if check_validity:
            for field in drivers.keys():
                if field == 'fPAR':
                    continue # The 1-km subgrid may have NaNs
                assert not np.isnan(drivers[field]).any(),\
                    f'Driver dataset "{field}" contains NaNs'
        # If we don't want to load observational data, return the drivers data
        if not load_gpp:
            return (drivers, None, None, None, weights)
        # Clean observations, then mask out driver data where there are no
        #   observations
        tower_gpp = self._filter(tower_gpp, filter_length)
        tower_gpp = self._clean(tower_gpp, drivers, protocol = 'GPP')
        # Subset all datasets to just the valid observation site-days
        tower_gpp_flat = tower_gpp[~np.isnan(tower_gpp)]
        if weights is not None:
            weights = weights[~np.isnan(tower_gpp)]
        # For everything other than fPAR, add a trailing axis to the flat view;
        #   this will enable datasets to line up with fPAR's 1-km subgrid
        drivers_flat = list()
        for field in drivers.keys():
            flat = drivers[field][~np.isnan(tower_gpp)]
            drivers_flat.append(
                flat[:,np.newaxis] if field != 'fPAR' else flat)
        return (drivers, drivers_flat, tower_gpp, tower_gpp_flat, weights)

    def _load_reco_data(
            self, filter_length, store = None, prefix = '',
            load_reco = True, check_validity = True):
        'Load the required datasets for RECO, for a single PFT'
        blacklist = self.config['data']['sites_blacklisted']
        if store is None:
            store = self.hdf5
        with h5py.File(store, 'r') as hdf:
            n_steps = hdf[f'{prefix}time'].shape[0]
            sites = hdf[f'{prefix}site_id'][:]
            if hasattr(sites[0], 'decode'):
                sites = list(map(lambda x: x.decode('utf-8'), sites))
            # Get dominant PFT
            pft_map = hdf[f'{prefix}state/PFT'][:]
            if pft_map.ndim > 1:
                pft_map = pft_dominant(pft_map, sites)
            # Blacklist validation sites
            pft_mask = np.logical_and(
                np.in1d(pft_map, self._pft), ~np.in1d(sites, blacklist))
            drivers = dict()
            field_map = self.config['data']['fields']
            for field in self._required_drivers['RECO']:
                # Try reading the field exactly as described in config file
                if field in field_map:
                    if field_map[field] in hdf:
                        drivers[field] = hdf[field_map[field]][:,pft_mask]
            # If RMSE is used, then we want to pay attention to weighting
            weights = None
            if 'weights' in hdf.keys():
                weights = hdf['weights'][pft_mask][np.newaxis,:]\
                    .repeat(n_steps, axis = 0)
            else:
                print('WARNING - "weights" not found in HDF5 file!')
            if load_reco and 'RECO' not in hdf.keys():
                with h5py.File(
                        self.config['data']['supplemental_file'], 'r') as _hdf:
                    tower_reco = _hdf['RECO'][:][:,pft_mask]
            elif load_reco:
                tower_reco = hdf['RECO'][:][:,pft_mask]
        # Check that driver data do not contain NaNs
        if check_validity:
            for field in drivers.keys():
                assert not np.isnan(drivers[field]).any(),\
                    f'Driver dataset "{field}" contains NaNs'
        # If we don't want to load observational data, return the drivers data
        if not load_reco:
            return (drivers, None, weights)
        # Clean observations, then mask out driver data where there are no
        #   observations
        tower_reco = self._filter(tower_reco, filter_length)
        tower_reco = self._clean(tower_reco, drivers, protocol = 'RECO')
        drivers = [drivers[k] for k in self._required_drivers['RECO']]
        return (drivers, tower_reco, weights)

    def _report(self, old_params, new_params, model, prec = 2):
        'Prints a report on the updated (optimized) parameters'
        labels = self._required_parameters[model.upper()]
        pad = max(len(l) for l in labels) + 1
        fmt_string = '-- {:<%d} {:>%d} [{:>%d}]' % (pad, 5 + prec, 7 + prec)
        print('%s parameters report, %s (PFT %d):' % (
            f'{model.upper()} Optimization', PFT[self._pft][0], self._pft))
        print((' {:>%d} {:>%d}' % (8 + pad + prec, 8 + prec))\
            .format('NEW', 'INITIAL'))
        for i, label in enumerate(labels):
            new = ('%%.%df' % prec) % new_params[i]\
                if new_params[i] is not None else ''
            old = ('%%.%df' % prec) % old_params[i]
            print(fmt_string.format(('%s:' % label), new, old))

    @staticmethod
    def e_mult(params, tmin, vpd, smrz, ft):
        # Calculate E_mult based on current parameters
        f_tmin = linear_constraint(params[1], params[2])
        f_vpd = linear_constraint(params[3], params[4], 'reversed')
        f_smrz = linear_constraint(params[5], params[6])
        f_ft = linear_constraint(params[7], 1.0, 'binary')
        return f_tmin(tmin) * f_vpd(vpd) * f_smrz(smrz) * f_ft(ft)

    @staticmethod
    def k_mult(params, tsoil, smsf):
        # Calculate K_mult based on current parameters
        f_tsoil = partial(arrhenius, beta0 = params[1])
        f_smsf = linear_constraint(params[2], params[3])
        return f_tsoil(tsoil) * f_smsf(smsf)

    @staticmethod
    def gpp(params, fpar, par, tmin, vpd, smrz, ft):
        # Calculate GPP based on the provided BPLUT parameters
        apar = fpar * par
        return apar * params[0] *\
            CalibrationAPI.e_mult(params, tmin, vpd, smrz, ft)

    @staticmethod
    def reco(params, tower_reco, tower_gpp, tsoil, smsf, q_rh, q_k):
        # Calculate RH as (RECO - RA) or (RECO - (faut * GPP))
        ra = ((1 - params[0]) * tower_gpp)
        rh = tower_reco - ra
        rh = np.where(rh < 0, 0, rh) # Mask out negative RH values
        kmult0 = CalibrationAPI.k_mult(params, tsoil, smsf)
        # Make an initial guess at steady-state SOC, essentially, based on
        #   inverting the observed RH flux
        cbar0 = cbar(rh, kmult0, q_rh, q_k)
        return ra + (kmult0 * cbar0)

    def pft(self, pft):
        '''
        Sets the PFT class for the next calibration step.

        Parameters
        ----------
        pft : int
            The PFT class to use in calibration

        Returns
        -------
        CLI
        '''
        assert pft in range(1, 9), 'Unrecognized PFT class'
        self._pft = pft
        return self

    def plot_gpp(
            self, driver, filter_length = 2, coefs = None, xlim = None,
            ylim = None, alpha = 0.1, marker = '.'):
        '''
        Using the current or optimized BPLUT coefficients, plots the GPP
        ramp function for a given driver. NOTE: Values where APAR < 2.0
        are not shown.

        Parameters
        ----------
        driver : str
            Name of the driver to plot on the horizontal axis
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        coefs : list or tuple or numpy.ndarray
            (Optional) Instead of using what's in the BPLUT, specify the
            exact parameters, e.g., [tmin0, tmin1]
        xlim : list or tuple
            (Optional) A 2-element sequence: The x-axis limits
        ylim : list or tuple
            (Optional) A 2-element sequence: The y-axis limits
        alpha : float
            (Optional) The alpha value (Default: 0.1)
        marker : str
            (Optional) The marker symbol (Default: ".")
        '''
        @suppress_warnings
        def empirical_lue(apar, gpp):
            # Mask low APAR values
            lower, _ = self._driver_bounds.get('apar', (0, None))
            apar = np.where(apar < lower, np.nan, apar)
            # Calculate empirical light-use efficiency: GPP/APAR
            return np.where(apar > 0, np.divide(gpp, apar), 0)

        np.seterr(invalid = 'ignore')
        # Read in GPP and APAR data
        assert driver.lower() in ('tmin', 'vpd', 'smrz'),\
            'Requested driver "%s" cannot be plotted for GPP' % driver
        if coefs is not None:
            assert hasattr(coefs, 'index') and not hasattr(coefs, 'title'),\
                "Argument --coefs expects a list [values,] with NO spaces"
        coefs0 = [ # Original coefficients
            self.bplut[driver.lower()][i][self._pft]
            for i in (0, 1)
        ]
        # Load APAR and tower GPP data
        driver_data, _, tower_gpp, _, _ = self._load_gpp_data(filter_length)
        apar = np.nanmean(driver_data['fPAR'], axis = -1) * driver_data['PAR']
        # Based on the bounds, create an empirical ramp function that
        #   spans the range of the driver
        bounds = [
            self.config['optimization']['bounds'][f'{driver.lower()}{i}'][i]
            for i in (0, 1) # i.e., min(tmin0) and max(tmin1)
        ]
        domain = np.arange(bounds[0], bounds[1], 0.1)
        ramp_func_original = linear_constraint(
            *coefs0, 'reversed' if driver.lower() == 'vpd' else None)
        if coefs is not None:
            ramp_func = linear_constraint(
                *coefs, 'reversed' if driver.lower() == 'vpd' else None)
        if driver.lower() == 'vpd':
            x0 = driver_data['VPD']
        elif driver.lower() == 'tmin':
            x0 = driver_data['Tmin']
        elif driver.lower() == 'smrz':
            x0 = driver_data['SMRZ']
        # Update plotting parameters
        lue = empirical_lue(apar, tower_gpp)
        # Mask out negative LUE values and values with APAR < 2
        pyplot.scatter(x0, np.where(
            np.logical_or(lue == 0, apar < 2), np.nan, lue),
            alpha = alpha, marker = marker)
        # Plot the original ramp function (black line)
        pyplot.plot(domain, ramp_func_original(domain) *\
            self.bplut['LUE'][:,self._pft], 'k-')
        # Optionally, plot a proposed ramp function (red line)
        if coefs is not None:
            pyplot.plot(domain, ramp_func(domain) *\
                self.bplut['LUE'][:,self._pft], 'r-')
        pyplot.xlabel('%s (%s)' % (driver, self._metadata[driver]['units']))
        pyplot.ylabel('GPP/APAR (g C MJ-1 d-1)')
        if xlim is not None:
            pyplot.xlim(xlim[0], xlim[1])
        if ylim is not None:
            pyplot.ylim(ylim[0], ylim[1])
        pyplot.title(
            '%s (PFT %d): GPP Response to "%s"' % (
                PFT[self._pft][0], self._pft, driver))
        pyplot.show()

    def plot_reco(
            self, driver, filter_length: int = 2, coefs = None, q_rh = 75,
            q_k = 50, xlim = None, ylim = None, alpha = 0.1, marker = '.'):
        '''
        Using the current or optimized BPLUT coefficients, plots the RECO
        ramp function for a given driver. The ramp function is shown on a
        plot of RH/Cbar, which is equivalent to Kmult (as Cbar is an upper
        quantile of the RH/Kmult distribution).

        Parameters
        ----------
        driver : str
            Name of the driver to plot on the horizontal axis
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        coefs : list or tuple or numpy.ndarray
            (Optional) Instead of using what's in the BPLUT, specify the
            exact parameters, e.g., `[smsf0, smsf1]`
        q_rh : int
            Additional argument to `pyl4c.apps.calibration.cbar()`
        q_k : int
            Additional argument to `pyl4c.apps.calibration.cbar()`
        xlim : list or tuple
            (Optional) A 2-element sequence: The x-axis limits
        ylim : list or tuple
            (Optional) A 2-element sequence: The y-axis limits
        alpha : float
            (Optional) The alpha value (Default: 0.1)
        marker : str
            (Optional) The marker symbol (Default: ".")
        '''
        xlabels = {
            'smsf': 'Surface Soil Moisture',
            'tsoil': 'Soil Temperature'
        }
        np.seterr(invalid = 'ignore')
        assert driver.lower() in ('tsoil', 'smsf'),\
            'Requested driver "%s" cannot be plotted for RECO' % driver
        if coefs is not None:
            assert hasattr(coefs, 'index') and not hasattr(coefs, 'title'),\
                "Argument --coefs expects a list [values,] with NO spaces"
        drivers, reco, _ = self._load_reco_data(filter_length)
        _, _, gpp, _, _ = self._load_gpp_data(filter_length)
        tsoil, smsf = drivers
        # Calculate k_mult based on original parameters
        f_smsf = linear_constraint(*self.bplut['smsf'][:,self._pft])
        k_mult = f_smsf(smsf) *\
            arrhenius(tsoil, self.bplut['tsoil'][0,self._pft])
        # Calculate RH as (RECO - RA)
        rh = reco - ((1 - self.bplut['CUE'][0,self._pft]) * gpp)
        # Set negative RH values to zero
        rh = np.where(suppress_warnings(np.less)(rh, 0), 0, rh)
        cbar0 = suppress_warnings(cbar)(rh, k_mult, q_rh, q_k)
        gpp = reco = None
        # Update plotting parameters
        pyplot.scatter( # Plot RH/Cbar against either Tsoil or SMSF
            tsoil if driver == 'tsoil' else smsf,
            suppress_warnings(np.divide)(rh, cbar0),
            alpha = alpha, marker = marker)
        if driver == 'tsoil':
            domain = np.arange(tsoil.min(), tsoil.max(), 0.1)
            pyplot.plot(domain,
                arrhenius(domain, self.bplut['tsoil'][0,self._pft]), 'k-')
        elif driver == 'smsf':
            domain = np.arange(smsf.min(), smsf.max(), 0.001)
            pyplot.plot(domain, f_smsf(domain).ravel(), 'k-')
        if coefs is not None:
            if driver == 'tsoil':
                pyplot.plot(domain, arrhenius(domain, *coefs), 'r-')
            elif driver == 'smsf':
                pyplot.plot(domain, linear_constraint(*coefs)(domain), 'r-')
        pyplot.xlabel('%s (%s)' % (
            xlabels[driver], self._metadata[driver]['units']))
        pyplot.ylabel('RH/Cbar')
        if xlim is not None:
            pyplot.xlim(xlim[0], xlim[1])
        if ylim is not None:
            pyplot.ylim(ylim[0], ylim[1])
        pyplot.title(
            '%s (PFT %d): RECO Response to "%s"' % (
                PFT[self._pft][0], self._pft,
                'SMSF' if driver.lower() == 'smsf' else 'Tsoil'))
        pyplot.show()

    def score(
            self, model: str, filter_length: int = 2, q_rh: int = 75,
            q_k: int = 50):
        '''
        Scores the current model (for a specific PFT) based on the
        parameters in the BPLUT.

        Parameters
        ----------
        model : str
            One of: "GPP" or "RECO"
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        q_rh : int
            Additional argument to `pyl4c.apps.calibration.cbar()` (RECO only)
        q_k : int
            Additional argument to `pyl4c.apps.calibration.cbar()` (RECO only)
        '''
        gpp_drivers, gpp_drivers_flat, tower_gpp, tower_gpp_flat, weights =\
            self._load_gpp_data(filter_length)
        if model.upper() == 'GPP':
            observed = tower_gpp_flat
        elif model.upper() == 'RECO':
            reco_drivers, tower_reco, weights =\
                self._load_reco_data(filter_length)
            observed = tower_reco
        # Print the parameters table
        old_params = restore_bplut_flat(self.config['BPLUT'])
        old_params = [
            old_params[k][:,self._pft]
            for k in self._required_parameters[model.upper()]
        ]
        params = self._get_params(model.upper())
        self._report(old_params, params, model.upper())
        # Get goodness-of-fit statistics
        if model.upper() == 'GPP':
            predicted = self.gpp(params, *gpp_drivers_flat).mean(axis = -1)
        elif model.upper() == 'RECO':
            predicted = self.reco(
                params, tower_reco, tower_gpp, *reco_drivers,
                q_rh = q_rh, q_k = q_k)
        r2, rmse_score, ub_rmse, bias = report_fit_stats(
            observed, predicted, weights, verbose = False)
        print(f'{model.upper()} model goodness-of-fit statistics:')
        print(('-- {:<13} {:>5}').format('R-squared:', '%.3f' % r2))
        print(('-- {:<13} {:>5}').format('RMSE:', '%.2f' % rmse_score))
        print(('-- {:<13} {:>5}').format('Unbiased RMSE:', '%.2f' % ub_rmse))
        print(('-- {:<13} {:>5}').format('Bias:', '%.2f' % bias))

    def set(self, parameter, value):
        '''
        Sets the named parameter to the given value for the specified PFT
        class. This updates the initial parameters, affecting any
        subsequent optimization.

        Parameters
        ----------
        parameter : str
            Name of the parameter to be set
        value : int or float
            Value of the named parameter to be set

        Returns
        -------
        CLI
        '''
        # Update the BPLUT in memory but NOT the file BPLUT (this is temporary)
        self.bplut.update(self._pft, (value,), (parameter,), flush = False)
        return self

    def tune_gpp(
            self, filter_length: int = 2, optimize: bool = True,
            use_nlopt: bool = True):
        '''
        Run the L4C GPP calibration.

        - For GPP data: Removes observations where GPP < 0 or where APAR
          is < 0.1 MJ m-2 day-1

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        optimize : bool
            True to run the optimization (Default) or False if you just
            want the goodness-of-fit to be reported
        use_nlopt : bool
            True to use the `nlopt` library for optimization (Default:
            True); otherwise, uses `scipy`
        '''
        def residuals(params, drivers, observed, weights):
            gpp0 = self.gpp(params, *drivers).mean(axis = -1)
            diff = np.subtract(observed, gpp0)
            # Objective function: Difference between tower GPP and L4C GPP,
            #   multiplied by the tower weights
            return (weights * diff)[~np.isnan(diff)]

        init_params = self._get_params('GPP')
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        print('Loading driver datasets...')
        drivers, drivers_flat, tower_gpp, tower_gpp_flat, weights =\
            self._load_gpp_data(filter_length)
        print(f'NOTE: Counts of (days, towers) are: {tower_gpp.shape}')
        # Configure the optimization, get bounds for the parameter search
        fixed = None
        if self.config['optimization']['fixed'] is not None:
            fixed = dict([ # Get any fixed parameters as {param: fixed_value}
                (k, v[self._pft])
                for k, v in self.config['optimization']['fixed'].items()
            ])
        step_size_global = [
            self.config['optimization']['step_size'][p]
            for p in self._required_parameters['GPP']
        ]
        bounds_dict = self.config['optimization']['bounds']
        bounds = self._bounds(init_params, bounds_dict, 'GPP', fixed)
        trials = self.config['optimization']['trials']
        params = [] # Optimized parameters
        params0 = [] # Initial (random) parameters
        scores = []
        # Will be a (100, P) space, where P is the number of parameters
        param_space = np.linspace(bounds[0], bounds[1], 100)
        for t in range(0, trials):
            # If multiple trials, randomize the initial parameter values
            #   and score the model in each trial
            if trials > 1:
                p = param_space.shape[1] # Number of parameters
                idx = np.random.randint(0, param_space.shape[0], p)
                init_params = param_space[idx,np.arange(0, p)]
                params0.append(init_params)
            # If we're optimizing (with any library), define the bounds and
            #   the objective function
            if optimize:
                bounds = self._bounds(init_params, bounds_dict, 'GPP', fixed)
                # Set initial value to a fixed value if specified
                if fixed is not None:
                    for key, value in fixed.items():
                        if value is not None and\
                                key in self._required_parameters['GPP']:
                            init_params[
                                self._required_parameters['GPP'].index(key)
                            ] = value
                objective = partial(
                    residuals, drivers = drivers_flat,
                    observed = tower_gpp_flat, weights = weights)
            # Apply constrained, non-linear least-squares optimization, using
            #   either SciPy or NLOPT
            if optimize and not use_nlopt:
                solution = solve_least_squares(
                    objective, init_params,
                    labels = self._required_parameters['GPP'],
                    bounds = bounds, loss = 'arctan')
                fitted = solution.x.tolist()
                print(solution.message)
            elif optimize and use_nlopt:
                opt = GenericOptimization(
                    objective, bounds, step_size = step_size_global)
                fitted = opt.solve(init_params)
            else:
                fitted = [None for i in range(0, len(init_params))]
            # Record the found solution and its goodness-of-fit score
            params.append(fitted)
            predicted = self.gpp(
                fitted if optimize else init_params,
                *drivers_flat).mean(axis = -1)
            _, rmse_score, _, _ = report_fit_stats(
                tower_gpp_flat, predicted, weights, verbose = False)
            print('[%s/%s] RMSE score of last trial: %.3f' % (
                str(t + 1).zfill(2), str(trials).zfill(2), rmse_score))
            scores.append(rmse_score)
        # Select the fit params with the best score
        if trials > 1:
            fitted = params[np.argmin(scores)]
            init_params = params0[np.argmin(scores)]
        # Generate and print a report, update the BPLUT parameters
        self._report(init_params, fitted, 'GPP')
        if optimize:
            self.bplut.update(
                self._pft, fitted, self._required_parameters['GPP'])

    def tune_reco(
            self, filter_length: int = 2, q_rh: int = 75, q_k: int = 50,
            optimize: bool = True, use_nlopt: bool = True):
        '''
        Optimizes RECO. The 9-km mean L4C RECO is fit to the
        tower-observed RECO using constrained, non-linear least-squares
        optimization. Considerations:

        - Negative RH values (i.e., NPP > RECO) are set to zero.

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        q_rh : int
            Additional argument to `pyl4c.apps.calibration.cbar()`
        q_k : int
            Additional argument to `pyl4c.apps.calibration.cbar()`
        optimize : bool
            False to only report parameters and their fit statistics
            instead of optimizing (Default: True)
        use_nlopt : bool
            True to use the `nlopt` library for optimization (Default:
            True); otherwise, uses `scipy`
        '''
        def residuals(
                params, drivers, observed_reco, observed_gpp, weights,
                q_rh, q_k):
            # Objective function: Difference between tower RECO and L4C RECO
            reco0 = self.reco(
                params, observed_reco, observed_gpp, *drivers, q_rh, q_k)
            diff = np.subtract(observed_reco, reco0)
            missing = np.logical_or(
                np.isnan(observed_reco), np.isnan(reco0))
            # Multiply by the tower weights
            return (weights * diff)[~missing]

        assert q_rh >= 0 and q_rh <= 100 and q_k >= 0 and q_k <= 100,\
            'Invalid setting for "q_rh" or "q_k" parameters'
        init_params = self._get_params('RECO')
        # Load blacklisted sites (if any)
        blacklist = self.config['data']['sites_blacklisted']
        print('Loading driver datasets...')
        _, _, tower_gpp, _, _ = self._load_gpp_data(filter_length)
        drivers, tower_reco, weights = self._load_reco_data(filter_length)
        print(f'NOTE: Counts of (days, towers) are: {tower_reco.shape}')
        # Configure the optimization, get bounds for the parameter search
        fixed = None
        if self.config['optimization']['fixed'] is not None:
            fixed = dict([ # Get any fixed parameters as {param: fixed_value}
                (k, v[self._pft])
                for k, v in self.config['optimization']['fixed'].items()
            ])
        step_size_global = [
            self.config['optimization']['step_size'][p]
            for p in self._required_parameters['RECO']
        ]
        bounds_dict = self.config['optimization']['bounds']
        bounds = self._bounds(init_params, bounds_dict, 'RECO', fixed)
        trials = self.config['optimization']['trials']
        params = [] # Optimized parameters
        params0 = [] # Initial (random) parameters
        scores = []
        # Will be a (100, P) space, where P is the number of parameters
        param_space = np.linspace(bounds[0], bounds[1], 100)
        for t in range(0, trials):
            # If multiple trials, randomize the initial parameter values
            #   and score the model in each trial
            if trials > 1:
                p = param_space.shape[1] # Number of parameters
                idx = np.random.randint(0, param_space.shape[0], p)
                init_params = param_space[idx,np.arange(0, p)]
                params0.append(init_params)
            # If we're optimizing (with any library), define the bounds and
            #   the objective function
            if optimize:
                bounds = self._bounds(init_params, bounds_dict, 'RECO', fixed)
                # Set initial value to a fixed value if specified
                if fixed is not None:
                    for key, value in fixed.items():
                        if value is not None and\
                                key in self._required_parameters['RECO']:
                            init_params[
                                self._required_parameters['RECO'].index(key)
                            ] = value
                objective = partial(
                    residuals, drivers = drivers, weights = weights,
                    observed_gpp = tower_gpp, observed_reco = tower_reco,
                    q_rh = q_rh, q_k = q_k)
            # Apply constrained, non-linear least-squares optimization, using
            #   either SciPy or NLOPT
            if optimize and not use_nlopt:
                solution = solve_least_squares(
                    objective, init_params,
                    labels = self._required_parameters['RECO'],
                    bounds = bounds, loss = 'arctan')
                fitted = solution.x.tolist()
                print(solution.message)
            elif optimize and use_nlopt:
                opt = GenericOptimization(
                    objective, bounds, step_size = step_size_global)
                fitted = opt.solve(init_params)
            else:
                fitted = [None for i in range(0, len(init_params))]
            # Record the found solution and its goodness-of-fit score
            params.append(fitted)
            predicted = self.reco(
                fitted if optimize else init_params, tower_reco, tower_gpp,
                *drivers, q_rh = q_rh, q_k = q_k)
            _, rmse_score, _, _ = report_fit_stats(
                tower_reco, predicted, weights, verbose = False)
            print('[%s/%s] RMSE score of last trial: %.3f' % (
                str(t + 1).zfill(2), str(trials).zfill(2), rmse_score))
            scores.append(rmse_score)
        # Select the fit params with the best score
        if trials > 1:
            fitted = params[np.argmin(scores)]
            init_params = params0[np.argmin(scores)]
        # Generate and print a report, update the BPLUT parameters
        self._report(init_params, fitted, 'RECO')
        if optimize:
            self.bplut.update(
                self._pft, fitted, self._required_parameters['RECO'])

    def tune_soc(self, filter_length: int = 2, xlim = None, alpha = 0.6):
        '''
        Starts the interactive calibration procedure for the soil organic
        carbon (SOC) decay parameters for a given PFT. Uses any updated
        parameters in the attached storage: `optimization.backend` in the
        configuration file.

        Parameters
        ----------
        filter_length : int
            The window size for the smoothing filter, applied to the
            observed data
        xlim : int or None
            The largest SOC value that should be shown on the plot (i.e.,
            the X and Y axis limit, for a square plot); if None, defaults
            to the largest value in the inventory dataset
        alpha : float
            The alpha applied to the SOC scatterplot points (Default: 0.6)
        '''
        soc_file = self.config['data']['soil_organic_carbon']['file']
        # Load SOC pit measurements
        with h5py.File(soc_file, 'r') as hdf:
            dates = hdf[
                self.config['data']['soil_organic_carbon']['fields']['time']
            ][:]
            dates = [datetime.date(*ymd) for ymd in dates.tolist()]
            field = self.config['data']['soil_organic_carbon']['fields']['observed']
            field_pft = self.config['data']['soil_organic_carbon']['fields']['PFT']
            pft_mask = hdf[field_pft][:]
            target_soc = hdf[field][:][pft_mask == self._pft]
        print('Loading driver datasets...')
        drivers_for_gpp, _, _, _, _ = self._load_gpp_data(
            filter_length, store = soc_file, prefix = 'ISCN/',
            load_gpp = False, check_validity = False)
        drivers_for_reco, _, weights = self._load_reco_data(
            filter_length, store = soc_file, prefix = 'ISCN/',
            load_reco = False, check_validity = False)
        drivers_for_reco = [ # Convert from dict to list
            drivers_for_reco[key]
            for key in self._required_drivers['RECO']
        ]
        # Calculate GPP based on the updated parameters
        init_params = restore_bplut_flat(self.config['BPLUT'])
        params_gpp = self._get_params('GPP')
        params_reco = self._get_params('RECO')
        gpp = self.gpp(params_gpp, *[
            drivers_for_gpp[d] if d != 'fPAR'
            else np.nanmean(drivers_for_gpp[d], axis = -1)
            for d in self._required_drivers['GPP']
        ])
        # Calculate a 365-day climatology of NPP
        cue = params_reco[self._required_parameters['RECO'].index('CUE')]
        npp = gpp * cue
        # Make the time axis (currently 0) be the trailing axis
        npp_clim = climatology365(npp.swapaxes(0, 1), dates)
        # Calculate litterfall
        litter = npp_clim.sum(axis = 0) / 365
        # Calculate a 365-day climatology of Kmult
        kmult = self.k_mult(params_reco, *drivers_for_reco)
        # Make the time axis (currently 0) be the trailing axis
        kmult_clim = climatology365(kmult.swapaxes(0, 1), dates)
        # Inferred steady-state storage
        sigma = npp_clim.sum(axis = 0) / kmult_clim.sum(axis = 0)
        fmet = init_params['f_metabolic'][:,self._pft][0]
        fstr = init_params['f_structural'][:,self._pft][0]
        decay_rates = self._get_params('SOC')
        decay_rates = decay_rates[:,np.newaxis]
        # Begin user-interaction loop to manually calibrate decay rates
        prev = None
        while True:
            init_soc = soc_analytical_spinup(
                litter, kmult_clim, fmet, fstr, decay_rates)
            soc, _ = soc_numerical_spinup(
                np.stack(init_soc), litter, kmult_clim, fmet, fstr,
                decay_rates, verbose = True)
            soc = np.stack(soc).sum(axis = 0)
            _, ax = pyplot.subplots(figsize = (6,6))
            ax.plot([0, 1], [0, 1], transform = ax.transAxes,
                linestyle = 'dotted')
            if prev is not None:
                pyplot.plot(
                    target_soc / 1e3, prev / 1e3, 'o', c = 'gray',
                    alpha = alpha / 2)
            try:
                pyplot.plot(target_soc / 1e3, soc / 1e3, 'o', alpha = alpha)
            except:
                import ipdb
                ipdb.set_trace() # FIXME
            xmin, xmax = np.nanpercentile(target_soc / 1e3, (0, 100))
            print(f'-- Min/Max of Inventory SOC: {xmin.round(1), xmax.round(1)}')
            print(f'-- Min/Max of Predicted SOC: {np.nanmin(soc / 1e3).round(1), np.nanmax(soc / 1e3).round(1)}')
            pyplot.xlim(0, xmax if xlim is None else xlim)
            pyplot.ylim(0, xmax if xlim is None else xlim)
            pyplot.xlabel('Inventory SOC (kg m$^{-2}$)')
            pyplot.ylabel('Modeled Equilibrium SOC (kg m$^{-2}$)')
            pyplot.title(f'For PFT={self._pft}')
            pyplot.show()
            # Calculate correlation coefficient
            mask = np.isnan(target_soc)
            r = np.corrcoef(target_soc[~mask], soc[~mask])[0,1]
            rmse = rmsd(target_soc[~mask], soc[~mask])
            print(f'Current metabolic rate (r={r.round(3)}, RMSE={round(rmse, 1)}):')
            print('%.5f\n' % decay_rates[0])
            proposal = input('New metabolic rate [Q to quit]:\n')
            if proposal == 'Q':
                break
            value = float(proposal)
            # NOTE: The "structural" and "recalcitrant" pool decay rates
            #   here should be the actual decay rates, i.e., the "metabolic"
            #   rate scaled by fixed constants
            decay_rates = np.array([
                value, value * 0.4, value * 0.0093
            ]).reshape((3, 1))
            prev = soc.copy()
        print(f'Updated BPLUT decay rates for PFT={self._pft}')
        self.bplut.update(
            self._pft, decay_rates.ravel(),
            ['decay_rates0', 'decay_rates1', 'decay_rates2'])

Convenience class for calibrating the L4C GPP and RECO models. Meant to be used with fire.Fire(). Uses:

# Run the calibration for a specific PFT
python optimize.py tune-gpp --pft=<pft>

# Get access to the sampler (and debugger), after calibration is run
python optimize.py tune-gpp --pft=<pft> --ipdb

Static methods
def e_mult(params, tmin, vpd, smrz, ft)
Calculates E_mult, the environmental constraint multiplier on GPP, as the product of ramp functions of Tmin, VPD (reversed), and root-zone soil moisture, and a binary freeze/thaw function. (Source shown in the class listing above.)
def gpp(params, fpar, par, tmin, vpd, smrz, ft)
Calculates GPP, based on the provided BPLUT parameters, as APAR * LUE * E_mult, where APAR = fPAR * PAR.
def k_mult(params, tsoil, smsf)
Calculates K_mult, the environmental constraint multiplier on decomposition, as the product of an Arrhenius function of soil temperature and a ramp function of surface soil moisture.
def reco(params, tower_reco, tower_gpp, tsoil, smsf, q_rh, q_k)
Calculates RECO as RA + RH, where RA = (1 - CUE) * tower GPP and RH = K_mult * Cbar; Cbar is an initial guess at steady-state SOC, obtained by inverting the observed RH flux.
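As a minimal, self-contained sketch of this decomposition (dummy flux values; only the RA/RH split is shown, not the cbar() inversion):

    import numpy as np

    cue = 0.7                              # hypothetical CUE = params[0]
    tower_gpp = np.array([4.0, 6.0, 8.0])  # g C m-2 day-1
    tower_reco = np.array([1.5, 2.0, 7.0])
    ra = (1 - cue) * tower_gpp             # autotrophic respiration
    rh = tower_reco - ra                   # heterotrophic respiration
    rh = np.where(rh < 0, 0, rh)           # mask out negative RH values
    # L4C RECO is then recomposed as RA + K_mult * Cbar, where Cbar is
    # inferred from the observed RH flux via pyl4c.apps.calibration.cbar()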
Methods
def pft(self, pft)
def pft(self, pft): ''' Sets the PFT class for the next calibration step. Parameters ---------- pft : int The PFT class to use in calibration Returns ------- CLI ''' assert pft in range(1, 9), 'Unrecognized PFT class' self._pft = pft return selfSets the PFT class for the next calibration step.
Parameters
pft : int - The PFT class to use in calibration
Returns
CLI
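Because pft() returns the CLI object itself, commands can be chained, either on the command line (as in the usage at the top of this page) or from Python; a hypothetical example:

    # Equivalent to: python optimize.py pft 3 tune-gpp
    api = CalibrationAPI(config = 'my_config.yaml')
    api.pft(3).tune_gpp(filter_length = 2)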
def plot_gpp(self, driver, filter_length=2, coefs=None, xlim=None, ylim=None, alpha=0.1, marker='.')
Using the current or optimized BPLUT coefficients, plots the GPP ramp function for a given driver. NOTE: Values where APAR < 2.0 are not shown.
Parameters
driver : str - Name of the driver to plot on the horizontal axis
filter_length : int - The window size for the smoothing filter, applied to the observed data
coefs : list or tuple or numpy.ndarray - (Optional) Instead of using what's in the BPLUT, specify the exact parameters, e.g., [tmin0, tmin1]
xlim : list or tuple - (Optional) A 2-element sequence: The x-axis limits
ylim : list or tuple - (Optional) A 2-element sequence: The y-axis limits
alpha : float - (Optional) The alpha value (Default: 0.1)
marker : str - (Optional) The marker symbol (Default: ".")
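For example, to compare a proposed VPD ramp (coefficient values here are hypothetical) against the current BPLUT; note that fire requires the --coefs list to be written without spaces:

python optimize.py pft 1 plot-gpp vpd --coefs=[800,4500] --ylim=[0,4]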
def plot_reco(self,
driver,
filter_length: int = 2,
coefs=None,
q_rh=75,
q_k=50,
xlim=None,
ylim=None,
alpha=0.1,
marker='.')
Using the current or optimized BPLUT coefficients, plots the RECO ramp function for a given driver. The ramp function is shown on a plot of RH/Cbar, which is equivalent to Kmult (as Cbar is an upper quantile of the RH/Kmult distribution).
Parameters
driver : str - Name of the driver to plot on the horizontal axis
filter_length : int - The window size for the smoothing filter, applied to the observed data
coefs : list or tuple or numpy.ndarray - (Optional) Instead of using what's in the BPLUT, specify the exact parameters, e.g., [smsf0, smsf1]
q_rh : int - Additional argument to pyl4c.apps.calibration.cbar()
q_k : int - Additional argument to pyl4c.apps.calibration.cbar()
xlim : list or tuple - (Optional) A 2-element sequence: The x-axis limits
ylim : list or tuple - (Optional) A 2-element sequence: The y-axis limits
alpha : float - (Optional) The alpha value (Default: 0.1)
marker : str - (Optional) The marker symbol (Default: ".")
def score(self, model: str, filter_length: int = 2, q_rh: int = 75, q_k: int = 50)
Scores the current model (for a specific PFT) based on the parameters in the BPLUT.
Parameters
model : str - One of: "GPP" or "RECO"
filter_length : int - The window size for the smoothing filter, applied to the observed data
q_rh : int - Additional argument to pyl4c.apps.calibration.cbar() (RECO only)
q_k : int - Additional argument to pyl4c.apps.calibration.cbar() (RECO only)
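For example, assuming python-fire's flag syntax:

python optimize.py pft 3 score GPP --filter_length=2
python optimize.py pft 3 score RECO --q_rh=75 --q_k=50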
def set(self, parameter, value)
Sets the named parameter to the given value for the specified PFT class. This updates the initial parameters, affecting any subsequent optimization.
Parameters
parameter : str - Name of the parameter to be set
value : int or float - Value of the named parameter to be set
Returns
CLI
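Since set() also returns the CLI object, fire should allow a value to be tried and scored in one (hypothetical) invocation:

python optimize.py pft 1 set LUE 2.5 score GPP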
def tune_gpp(self, filter_length: int = 2, optimize: bool = True, use_nlopt: bool = True)
Run the L4C GPP calibration.
- For GPP data: Removes observations where GPP < 0 or where APAR is < 0.1 MJ m-2 day-1
Parameters
filter_length : int - The window size for the smoothing filter, applied to the observed data
optimize : bool - True to run the optimization (Default) or False if you just want the goodness-of-fit to be reported
use_nlopt : bool - True to use the nlopt library for optimization (Default: True); otherwise, uses scipy
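For example, to check goodness-of-fit without optimizing, or to switch solvers (flag syntax per python-fire):

python optimize.py pft 3 tune-gpp --optimize=False
python optimize.py pft 3 tune-gpp --use_nlopt=False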
def tune_reco(self,
filter_length: int = 2,
q_rh: int = 75,
q_k: int = 50,
optimize: bool = True,
use_nlopt: bool = True)
Optimizes RECO. The 9-km mean L4C RECO is fit to the tower-observed RECO using constrained, non-linear least-squares optimization. Considerations:
- Negative RH values (i.e., NPP > RECO) are set to zero.
Parameters
filter_length : int - The window size for the smoothing filter, applied to the observed data
q_rh : int - Additional argument to pyl4c.apps.calibration.cbar()
q_k : int - Additional argument to pyl4c.apps.calibration.cbar()
optimize : bool - False to only report parameters and their fit statistics instead of optimizing (Default: True)
use_nlopt : bool - True to use the nlopt library for optimization (Default: True); otherwise, uses scipy
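For example (quantile values are hypothetical):

python optimize.py pft 3 tune-reco --q_rh=90 --q_k=50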
def tune_soc(self, filter_length: int = 2, xlim=None, alpha=0.6)
Starts the interactive calibration procedure for the soil organic carbon (SOC) decay parameters for a given PFT. Uses any updated parameters in the attached storage:
optimization.backendin the configuration file.Parameters
filter_length:int- The window size for the smoothing filter, applied to the observed data
xlim:intorNone- The largest SOC value that should be shown on the plot (i.e., the X and Y axis limit, for a square plot); if None, defaults to the largest value in the inventory dataset
alpha:float- The alpha applied to the SOC scatterplot points (Default: 0.6)
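Note the fixed ratios inside the interactive loop: the user proposes only the "metabolic" decay rate, and the structural and recalcitrant pool rates are derived from it by constant scaling factors (0.4 and 0.0093, respectively). A minimal sketch of that mapping follows; the helper name is hypothetical, for illustration only:

import numpy as np

def decay_rates_from_metabolic(k_met: float) -> np.ndarray:
    '''
    Hypothetical helper: expands a proposed "metabolic" decay rate into
    the (3, 1) array of pool decay rates used in the loop above.
    '''
    return np.array([
        k_met,           # Metabolic pool: the proposed rate itself
        k_met * 0.4,     # Structural pool: 40% of the metabolic rate
        k_met * 0.0093   # Recalcitrant pool: ~1% of the metabolic rate
    ]).reshape((3, 1))

# For example, a proposed rate of 0.02 yields pool rates of
#   0.02, 0.008, and 0.000186, respectively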
class GenericOptimization (func, bounds, method: int = nlopt.LD_SLSQP, step_size=None, verbose=True)
class GenericOptimization(object):
    '''
    A more generic and expansive tool for optimization; includes many
    more algorithms for minimization/maximization problems, including
    sequential quadratic programming (SQP), which is the default here
    and is closest to what is performed in Matlab's `fmincon`. Despite
    the similarity to `fmincon`, SQP will tend to deviate strongly from
    the initial parameters derived via `fmincon`. This solver is SLOW
    for gradient-descent methods relative to
    `scipy.optimize.least_squares()`, because the gradient is
    calculated with a finite difference approach.

        opt = GenericOptimization(residuals, OPT_BOUNDS['gpp'],
            step_size = (0.01, 0.1, 0.1, 1, 1, 0.1, 0.1, 0.05))
        opt.solve(init_params)

    See: https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/

    Parameters
    ----------
    func : function
        Function to calculate the residuals
    bounds : list or tuple
        2-element sequence of (lower, upper) bounds where each element
        is an array
    method : int
        One of the nlopt algorithms
    step_size : list or tuple or numpy.ndarray
        Sequence of steps to take in gradient descent; not needed for
        derivative-free methods
    verbose : bool
        True to print all output to the screen
    '''
    def __init__(
            self, func, bounds, method: int = nlopt.LD_SLSQP,
            step_size = None, verbose = True):
        # https://nlopt.readthedocs.io/en/latest/NLopt_Algorithms/#slsqp
        assert isinstance(method, int), 'Did not recognize "method" argument'
        self._bounds = bounds
        self._method = method
        self._residuals = func
        self._step_size = step_size
        self._verbose = verbose

    def solve(self, init_params, ftol = 1e-8, xtol = 1e-8, maxeval = 500):
        '''
        Using the sum-of-squared errors (SSE) as the objective function,
        solves a minimization problem.

        Parameters
        ----------
        init_params : list or tuple or numpy.ndarray
            Sequence of starting parameters (or "initial guesses")
        ftol : float
            Absolute tolerance on the objective function value
        xtol : float
            Absolute tolerance on the parameter values
        maxeval : int
            Maximum number of objective function evaluations

        Returns
        -------
        numpy.ndarray
        '''
        @suppress_warnings
        def sse(x):
            return np.power(self._residuals(x), 2).sum()

        @suppress_warnings
        def objf(x, grad):
            if grad.size > 0:
                # Approximate the gradient by finite differences
                grad[...] = optimize.approx_fprime(
                    x, sse, self._step_size)
            return sse(x)

        opt = nlopt.opt(self._method, len(init_params))
        # https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/#localsubsidiary-optimization-algorithm
        if self._method == nlopt.G_MLSL_LDS:
            opt.set_local_optimizer(
                nlopt.opt(nlopt.LN_COBYLA, len(init_params)))
        opt.set_min_objective(objf)
        opt.set_lower_bounds(self._bounds[0])
        opt.set_upper_bounds(self._bounds[1])
        opt.set_ftol_abs(ftol)
        opt.set_xtol_abs(xtol)
        opt.set_maxeval(maxeval)
        if self._verbose:
            print('Solving...')
        return opt.optimize(init_params)

A more generic and expansive tool for optimization; includes many more algorithms for minimization/maximization problems, including sequential quadratic programming (SQP), which is the default here and is closest to what is performed in Matlab's `fmincon`. Despite the similarity to `fmincon`, SQP will tend to deviate strongly from the initial parameters derived via `fmincon`. This solver is SLOW for gradient-descent methods relative to `scipy.optimize.least_squares()`, because the gradient is calculated with a finite difference approach.

    opt = GenericOptimization(residuals, OPT_BOUNDS['gpp'],
        step_size = (0.01, 0.1, 0.1, 1, 1, 0.1, 0.1, 0.05))
    opt.solve(init_params)

See: https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/
Parameters
func : function
    Function to calculate the residuals
bounds : list or tuple
    2-element sequence of (lower, upper) bounds where each element is an array
method : int
    One of the nlopt algorithms, given as an integer constant (e.g., `nlopt.LD_SLSQP`)
step_size : list or tuple or numpy.ndarray
    Sequence of steps to take in gradient descent; not needed for derivative-free methods
verbose : bool
    True to print all output to the screen
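As a usage sketch (not from the module; the data, residual function, and bounds below are hypothetical), here is a two-parameter line fit using a derivative-free nlopt algorithm, which sidesteps the need for `step_size`:

import numpy as np
import nlopt

# Hypothetical data: a noisy line y = 2x + 1
x_obs = np.linspace(0, 10, 50)
y_obs = 2 * x_obs + 1 + np.random.normal(0, 0.1, size = 50)

def residuals(params):
    # Difference between observations and a linear model
    return y_obs - (params[0] * x_obs + params[1])

# (lower, upper) bounds for the slope and intercept
bounds = (np.array([-10., -10.]), np.array([10., 10.]))
# A derivative-free method (Nelder-Mead) needs no step_size
opt = GenericOptimization(residuals, bounds, method = nlopt.LN_NELDERMEAD)
fitted = opt.solve(np.array([1.0, 0.0]))
print(fitted) # Approximately [2, 1]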
Methods
def solve(self, init_params, ftol=1e-08, xtol=1e-08, maxeval=500)
def solve(self, init_params, ftol = 1e-8, xtol = 1e-8, maxeval = 500):
    '''
    Using the sum-of-squared errors (SSE) as the objective function,
    solves a minimization problem.

    Parameters
    ----------
    init_params : list or tuple or numpy.ndarray
        Sequence of starting parameters (or "initial guesses")
    ftol : float
        Absolute tolerance on the objective function value
    xtol : float
        Absolute tolerance on the parameter values
    maxeval : int
        Maximum number of objective function evaluations

    Returns
    -------
    numpy.ndarray
    '''
    @suppress_warnings
    def sse(x):
        return np.power(self._residuals(x), 2).sum()

    @suppress_warnings
    def objf(x, grad):
        if grad.size > 0:
            # Approximate the gradient by finite differences
            grad[...] = optimize.approx_fprime(
                x, sse, self._step_size)
        return sse(x)

    opt = nlopt.opt(self._method, len(init_params))
    # https://nlopt.readthedocs.io/en/latest/NLopt_Python_Reference/#localsubsidiary-optimization-algorithm
    if self._method == nlopt.G_MLSL_LDS:
        opt.set_local_optimizer(
            nlopt.opt(nlopt.LN_COBYLA, len(init_params)))
    opt.set_min_objective(objf)
    opt.set_lower_bounds(self._bounds[0])
    opt.set_upper_bounds(self._bounds[1])
    opt.set_ftol_abs(ftol)
    opt.set_xtol_abs(xtol)
    opt.set_maxeval(maxeval)
    if self._verbose:
        print('Solving...')
    return opt.optimize(init_params)

Using the sum-of-squared errors (SSE) as the objective function, solves a minimization problem.
Parameters
init_params : list or tuple or numpy.ndarray
    Sequence of starting parameters (or "initial guesses")
ftol : float
    Absolute tolerance on the objective function value
xtol : float
    Absolute tolerance on the parameter values
maxeval : int
    Maximum number of objective function evaluations
Returns
numpy.ndarray
    The optimized parameter vector
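To make the finite difference step concrete, here is a small stand-alone sketch of what the gradient branch of `solve()` computes, using `scipy.optimize.approx_fprime` on a toy SSE objective (the residual function and values below are hypothetical):

import numpy as np
from scipy import optimize

def residuals(x):
    # Hypothetical residuals with a minimum at (1, 2)
    return x - np.array([1.0, 2.0])

def sse(x):
    # Same SSE objective as in solve()
    return np.power(residuals(x), 2).sum()

# Perturb each parameter by step_size and difference the objective
step_size = np.array([1e-4, 1e-4])
grad = optimize.approx_fprime(np.array([0.0, 0.0]), sse, step_size)
print(grad) # Approximately [-2, -4], since d(SSE)/dx_i = 2 * (x_i - target_i)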