Module pyl4c.lib.cli
Classes for building command line interface (CLI) tools, such as for bulk processing data at the command line.
Classes
class CommandLineInterface
-
Expand source code
class CommandLineInterface(object):
    '''
    A command line interface (CLI) convenience class used for creating
    Python scripts that can be invoked from the command line.

    NOTE(review): The methods below read instance attributes such as
    `_grid`, `_subset_id`, `_pieces`, `_mode` and `_field` that are never
    assigned in this class; presumably they are set by subclasses or by the
    CLI framework that wraps them -- confirm against callers.
    '''

    def __init__(self):
        pass

    def __check__(self):
        '''
        Validates whichever of the optional, underscore-prefixed CLI
        attributes are defined on this instance. Attributes that are not
        defined are skipped.

        Raises
        ------
        AssertionError
            If any defined attribute fails its check
        '''
        possible_required_keys = ('_output_path', '_output_tpl')
        for key in possible_required_keys:
            if hasattr(self, key):
                assert getattr(self, key) is not None, (
                    'You must specify %s with: --%s=""' % (key, key))

        if hasattr(self, '_field_tpl'):
            # NOTE(review): "> 0" rejects a template that *starts* with
            # "%d", unlike the ">= 0" test used for _output_tpl below;
            # left as-is because it may be intentional
            assert self._field_tpl.rfind('%d') > 0, (
                'The field argument must contain a string formatting character, e.g., "SOC/soc_pft%d_mean"')

        if hasattr(self, '_grid'):
            assert self._grid is not None, (
                'Must specify the EASE-Grid 2.0 size with --grid argument ("M01" or "M09")')

        if hasattr(self, '_mask'):
            if self._mask is not None:
                assert isinstance(self._mask, str), (
                    'Did not recognize --mask as a file path')
                assert os.path.exists(self._mask), (
                    'File not found: %s (Cannot use shortcuts like ~)' % self._mask)

        if hasattr(self, '_output_dir'):
            assert os.path.isdir(self._output_dir), (
                'Directory does not exist: %s' % self._output_dir)

        if hasattr(self, '_output_path'):
            assert os.path.exists(os.path.dirname(self._output_path)), (
                'Did not recognize output_path (Cannot use shortcuts like ~)')

        if hasattr(self, '_output_tpl'):
            assert self._output_tpl.rfind('%s') >= 0, (
                'The output_tpl argument must have one string formatting character')

        if hasattr(self, '_reference'):
            if self._reference is not None:
                assert isinstance(self._reference, str), (
                    'Did not recognize --reference as a file path')
                assert os.path.exists(self._reference), (
                    'File not found: %s (Cannot use shortcuts like ~)' % self._reference)

        if hasattr(self, '_summaries'):
            assert not isinstance(self._summaries, str), (
                'Could not interpret --summaries argument as a sequence of NumPy functions')

    @cached_property
    def __coords__(self):
        # Coordinate arrays for the configured grid, computed once
        if self._grid in ('M01', 'M09'):
            return get_ease2_coords(self._grid)
        # But if coordinate arrays are not pre-computed...
        return ease2_coords_approx(self._grid)

    @cached_property
    def __shp__(self):
        # The (rows, columns) shape of the subset, or of the full grid when
        # no subset is requested
        if self._subset_id is not None:
            x_idx, y_idx = self.__slice_idx__
            return (y_idx[1] - y_idx[0], x_idx[1] - x_idx[0])
        x_coords, y_coords = self.__coords__
        return (len(y_coords), len(x_coords))

    @cached_property
    def __slice_idx__(self):
        # The ((xmin, xmax), (ymin, ymax)) indices in pixel space
        if self._subset_id is not None:
            return get_ease2_slice_idx(
                grid = self._grid, subset_id = self._subset_id)
        return (None, None)

    def __pieces__(self):
        '''
        Generates the row chunks, one (start_row, end_row) pair per piece,
        for `self._pieces` pieces. The bounds are clamped to the last row
        of the array (or of the subset), so that the final chunk never
        extends beyond the rows counted in `__shp__`.

        Yields
        ------
        tuple
            The (start_row, end_row) indices of the next row chunk
        '''
        p = 0 # Initialize at piece 0
        start = 0 # Starting at first row, if no subsetting
        if self._subset_id is not None:
            _, y_idx = self.__slice_idx__
            start = y_idx[0] # Starting at top of subset
        num_pieces = self._pieces
        num_rows = self.__shp__[0]
        stop = start + num_rows # One past the last valid row
        step = int(np.ceil(num_rows / num_pieces))
        while p < num_pieces:
            # The rounded-up step can overshoot; never go beyond "stop"
            yield (
                min(start + (p * step), stop),
                min(start + ((p + 1) * step), stop))
            p += 1

    def gdt_to_dtype(self, gdt):
        '''
        Given, e.g., `gdal.GDT_Float32`, returns `numpy.float32`.

        Parameters
        ----------
        gdt
            One of the `gdal.GDT_*` constants in the lookup table below

        Raises
        ------
        KeyError
            If the GDAL data type is not in the lookup table
        '''
        return {
            gdal.GDT_Byte: np.uint8,
            gdal.GDT_UInt16: np.uint16,
            gdal.GDT_UInt32: np.uint32,
            gdal.GDT_Int16: np.int16,
            gdal.GDT_Int32: np.int32,
            gdal.GDT_Float32: np.float32,
            gdal.GDT_Float64: np.float64
        }[gdt]

    def lookup_dtype(self, type_string):
        '''
        Given, e.g., "float32", returns `numpy.float32`.

        Parameters
        ----------
        type_string : str
            A NumPy named type, e.g., "float32", "int16", "byte"
        '''
        return getattr(np, type_string)

    def lookup_gdt(self, type_string):
        '''
        Given, e.g., "float32", returns `gdal.GDT_Float32`.

        Parameters
        ----------
        type_string : str
            A NumPy named type, e.g., "float32", "int16", "uint16", "byte"

        Raises
        ------
        AttributeError
            If `gdal` has no constant matching the derived name
        '''
        if type_string == 'uint8':
            name = 'GDT_Byte'
        elif type_string.startswith('uint'):
            # str.title() would give "Uint16", but GDAL spells the
            # unsigned types with two capitals, e.g., GDT_UInt16
            name = 'GDT_UInt%s' % type_string[len('uint'):]
        else:
            name = 'GDT_%s' % type_string.title()
        return getattr(gdal, name)

    def infer_file_mode(self, file_paths):
        '''
        Determine what class of file we're working with, based on the file
        extension of the (first) file path.

        Parameters
        ----------
        file_paths : str or os.PathLike or sequence
            A single file path or a sequence (list, tuple) of file paths;
            for a sequence, only the first file path is examined

        Returns
        -------
        str
            One of: "hdf5", "sparse", "other"
        '''
        # Check if a list/tuple given versus a single path; testing the
        # type (not the length of the first element) also handles
        # path-like objects and single-character file names
        if isinstance(file_paths, (str, os.PathLike)):
            path = os.fspath(file_paths)
        else:
            path = os.fspath(file_paths[0])
        extension = path.split('.')[-1]
        if extension == 'h5':
            return 'hdf5'
        if extension in TYPE_MAP:
            return 'sparse'
        return 'other'

    def read_array(self, file_path, **kwargs):
        '''
        Reads in the file at the given file path and returns a
        `numpy.ndarray`. If both `start_row` and `end_row` are given (and
        not None), the work is handed to `read_chunked()` along with all
        keyword arguments.

        Parameters
        ----------
        file_path : str
            The location of a file
        start_row : int
            Index of the first row in a row chunk
        end_row : int
            Index of the final row in a row chunk
        mode : str
            The file mode to use (Default: `self._mode`)
        shp : tuple
            The shape of the raster arrays; only used by `read_chunked()`

        Returns
        -------
        numpy.ndarray

        Raises
        ------
        NotImplementedError
            If the file mode is not recognized or if a subset is requested
            in "other" mode
        '''
        if kwargs.get('start_row') is not None \
        and kwargs.get('end_row') is not None:
            return self.read_chunked(file_path, **kwargs)

        mode = self._mode
        if 'mode' in kwargs:
            mode = kwargs['mode']

        if self._subset_id is not None:
            x_idx, y_idx = self.__slice_idx__
            xmin, xmax = x_idx
            ymin, ymax = y_idx

        # If we're reading HDF5 files...
        if mode == 'hdf5':
            with h5py.File(file_path, 'r') as hdf:
                assert self._field in hdf.keys(), (
                    'Could not find the specified field name: %s' % self._field)
                # If we're not subsetting...
                if self._subset_id is None:
                    return hdf[self._field][:]
                return hdf[self._field][ymin:ymax, xmin:xmax]

        # If we're reading sparse TCF output arrays...
        elif mode in ('sparse', 'tcf'):
            # TCFArray has stronger assumptions than SparseArray; try to
            # read the file either way
            tcf = SparseArray(file_path, self._grid)
            tcf.inflate()
            if self._subset_id is None:
                return tcf.data[:]
            return tcf.data[ymin:ymax, xmin:xmax]

        # If we're reading arbitrary raster arrays...
        elif mode == 'other':
            if self._subset_id is not None:
                raise NotImplementedError('No way of compositing subsets on arbitrary raster arrays with unknown shape')
            return as_array(file_path, False)[0]

        else:
            raise NotImplementedError('File mode "%s" not recognized' % mode)

    def read_chunked(
            self, file_path, start_row, end_row, mode = None, shp = None):
        '''
        Reads in the file at the given file path, row chunk by row chunk,
        and returns a `numpy.ndarray`.

        Parameters
        ----------
        file_path : str
            The location of a file
        start_row : int
            Index of the first row in a row chunk
        end_row : int
            Index of the final row in a row chunk
        mode : str
            The file mode to use (Default: `self._mode`)
        shp : tuple
            The shape of the raster arrays; only consulted in "other" mode
            without subsetting (Default: `self.__shp__`)

        Returns
        -------
        numpy.ndarray

        Raises
        ------
        NotImplementedError
            If the file mode is not recognized
        '''
        if mode is None:
            mode = self._mode
        r0, r1 = (start_row, end_row)

        if self._subset_id is not None:
            x_idx, _ = self.__slice_idx__
            xmin, xmax = x_idx

        # If we're reading HDF5 files...
        if mode == 'hdf5':
            with h5py.File(file_path, 'r') as hdf:
                assert self._field in hdf.keys(), (
                    'Could not find the specified field name: %s' % self._field)
                # If we're not subsetting...
                if self._subset_id is None:
                    return hdf[self._field][r0:r1,:]
                return hdf[self._field][r0:r1, xmin:xmax]

        # If we're reading sparse TCF output arrays...
        elif mode in ('sparse', 'tcf'):
            # TCFArray has stronger assumptions than SparseArray; try to
            # read the file either way
            tcf = SparseArray(file_path, self._grid)
            tcf.inflate()
            if self._subset_id is None:
                return tcf.data[r0:r1,:]
            return tcf.data[r0:r1, xmin:xmax]

        # If we're reading arbitrary raster arrays...
        elif mode == 'other':
            if self._subset_id is None:
                shp = self.__shp__ if shp is None else shp
                return as_array(
                    file_path, False, (0, r0, shp[1], r1))[0]
            return as_array(file_path, False, (xmin, r0, xmax, r1))[0]

        else:
            raise NotImplementedError('File mode "%s" not recognized' % mode)

    def read_raster(self, file_path, mode = None):
        '''
        Reads in the file at the given file path and returns a
        `gdal.Dataset`.

        Parameters
        ----------
        file_path : str
            The location of a file
        mode : str
            The file mode to use (Default: `self._mode`)

        Returns
        -------
        gdal.Dataset

        Raises
        ------
        AssertionError
            If the grid is not defined when reading HDF5 or sparse arrays
        NotImplementedError
            If the file mode is not recognized
        '''
        if mode is None:
            mode = self._mode

        # If we're reading arbitrary raster arrays, the file is opened
        # as-is, so the EASE-Grid 2.0 parameters are not needed
        if mode == 'other':
            rast, _, _ = as_raster(file_path, False)
            return rast

        # For all other file modes...
        assert getattr(self, '_grid', None) is not None, (
            'Must define input --grid in order to read input HDF5 or sparse arrays')
        gt = EASE2_GRID_PARAMS[self._grid]['geotransform']
        wkt = EPSG[EASE2_GRID_PARAMS[self._grid]['epsg']]

        # If we're reading HDF5 files...
        if mode == 'hdf5':
            with h5py.File(file_path, 'r') as hdf:
                assert self._field in hdf.keys(), (
                    'Could not find the specified field name: %s' % self._field)
                arr = hdf[self._field][:]
            return array_to_raster(arr, gt, wkt)

        # If we're reading sparse TCF output arrays...
        elif mode in ('sparse', 'tcf'):
            # TCFArray has stronger assumptions than SparseArray; try to
            # read the file either way
            tcf = SparseArray(file_path, self._grid)
            tcf.inflate()
            return array_to_raster(tcf.data, gt, wkt)

        else:
            raise NotImplementedError('File mode "%s" not recognized' % mode)
A command line interface (CLI) convenience class used for creating Python scripts that can be invoked from the command line.
Subclasses
Methods
def gdt_to_dtype(self, gdt)
-
Expand source code
def gdt_to_dtype(self, gdt):
    '''
    Given, e.g., `gdal.GDT_Float32`, returns `numpy.float32`. The unsigned
    types are included so that the result of `lookup_gdt("uint8")`, i.e.,
    `gdal.GDT_Byte`, can be converted back to a NumPy type.

    Parameters
    ----------
    gdt
        One of the `gdal.GDT_*` constants in the lookup table below

    Raises
    ------
    KeyError
        If the GDAL data type is not in the lookup table
    '''
    return {
        gdal.GDT_Byte: np.uint8,
        gdal.GDT_UInt16: np.uint16,
        gdal.GDT_UInt32: np.uint32,
        gdal.GDT_Int16: np.int16,
        gdal.GDT_Int32: np.int32,
        gdal.GDT_Float32: np.float32,
        gdal.GDT_Float64: np.float64
    }[gdt]
def infer_file_mode(self, file_paths)
-
Expand source code
def infer_file_mode(self, file_paths):
    '''
    Determine what class of file we're working with, based on the file
    extension of the (first) file path.

    Parameters
    ----------
    file_paths : str or os.PathLike or sequence
        A single file path or a sequence (list, tuple) of file paths; for
        a sequence, only the first file path is examined

    Returns
    -------
    str
        One of: "hdf5", "sparse", "other"
    '''
    # Check if a list/tuple given versus a single path; testing the type
    # (not the length of the first element) also handles path-like objects
    # and single-character file names
    if isinstance(file_paths, (str, os.PathLike)):
        path = os.fspath(file_paths)
    else:
        path = os.fspath(file_paths[0])
    extension = path.split('.')[-1]
    if extension == 'h5':
        return 'hdf5'
    if extension in TYPE_MAP:
        return 'sparse'
    return 'other'
Determine what class of file we're working with.
Parameters
file_paths
:str or sequence of str
Returns
str
- One of: "hdf5", "sparse", "other"
def lookup_dtype(self, type_string)
-
Expand source code
def lookup_dtype(self, type_string):
    '''
    Resolves the name of a NumPy type to the type itself; e.g., "float32"
    resolves to `numpy.float32`.

    Parameters
    ----------
    type_string : str
        The name of an attribute of the `numpy` module, e.g., "float32",
        "int16", "byte"

    Raises
    ------
    AttributeError
        If `numpy` has no attribute with that name
    '''
    numpy_type = getattr(np, type_string)
    return numpy_type
Given, e.g., "float32", returns
numpy.float32.
Parameters
type_string
:str
- A NumPy named type, e.g., "float32", "int16", "byte"
def lookup_gdt(self, type_string)
-
Expand source code
def lookup_gdt(self, type_string):
    '''
    Given, e.g., "float32", returns `gdal.GDT_Float32`.

    Parameters
    ----------
    type_string : str
        A NumPy named type, e.g., "float32", "int16", "uint16", "byte"

    Raises
    ------
    AttributeError
        If `gdal` has no constant matching the derived name
    '''
    if type_string == 'uint8':
        name = 'GDT_Byte'
    elif type_string.startswith('uint'):
        # str.title() would give "Uint16", but GDAL spells the unsigned
        # types with two capitals, e.g., GDT_UInt16
        name = 'GDT_UInt%s' % type_string[len('uint'):]
    else:
        name = 'GDT_%s' % type_string.title()
    return getattr(gdal, name)
Given, e.g., "float32", returns
gdal.GDT_Float32.
Parameters
type_string
:str
- A NumPy named type, e.g., "float32", "int16", "byte"
def read_array(self, file_path, **kwargs)
-
Expand source code
def read_array(self, file_path, **kwargs):
    '''
    Loads the contents of a file as a `numpy.ndarray`. When both
    `start_row` and `end_row` are supplied (and neither is None), the
    request is forwarded, with all keyword arguments, to `read_chunked()`.

    Parameters
    ----------
    file_path : str
        The location of a file
    start_row : int
        Index of the first row in a row chunk
    end_row : int
        Index of the final row in a row chunk
    mode : str
        The file mode to use; overrides `self._mode`
    shp : tuple
        The shape of the raster arrays

    Returns
    -------
    numpy.ndarray

    Raises
    ------
    NotImplementedError
        If the file mode is not recognized or if a subset is requested in
        "other" mode
    '''
    # A complete row range means the caller wants a chunked read
    row_bounds = (kwargs.get('start_row'), kwargs.get('end_row'))
    if row_bounds[0] is not None and row_bounds[1] is not None:
        return self.read_chunked(file_path, **kwargs)

    # Accepted for parity with read_chunked(); not consulted below
    shp = kwargs.get('shp')
    mode = kwargs.get('mode', self._mode)

    subsetting = self._subset_id is not None
    if subsetting:
        (xmin, xmax), (ymin, ymax) = self.__slice_idx__

    # HDF5 files: read the configured field
    if mode == 'hdf5':
        with h5py.File(file_path, 'r') as hdf:
            assert self._field in hdf.keys(), 'Could not find the specified field name: %s' % self._field
            dataset = hdf[self._field]
            if subsetting:
                return dataset[ymin:ymax, xmin:xmax]
            return dataset[:]

    # Sparse TCF output arrays
    if mode in ('sparse', 'tcf'):
        # TCFArray has stronger assumptions than SparseArray; try to read
        # the file either way
        sparse = SparseArray(file_path, self._grid)
        sparse.inflate()
        if subsetting:
            return sparse.data[ymin:ymax, xmin:xmax]
        return sparse.data[:]

    # Arbitrary raster arrays
    if mode == 'other':
        if subsetting:
            raise NotImplementedError('No way of compositing subsets on arbitrary raster arrays with unknown shape')
        return as_array(file_path, False)[0]

    raise NotImplementedError('File mode "%s" not recognized' % mode)
Reads in the file at the given file path and returns a
numpy.ndarray.
Parameters
file_path
:str
- The location of a file
start_row
:int
- Index of the first row in a row chunk
end_row
:int
- Index of the final row in a row chunk
mode
:str
- The file mode to use
shp
:tuple
- The shape of the raster arrays
Returns
numpy.ndarray
def read_chunked(self, file_path, start_row, end_row, mode=None, shp=None)
-
Expand source code
def read_chunked(
        self, file_path, start_row, end_row, mode = None, shp = None):
    '''
    Loads one row chunk, rows `start_row` up to `end_row`, of the file at
    the given file path as a `numpy.ndarray`.

    Parameters
    ----------
    file_path : str
        The location of a file
    start_row : int
        Index of the first row in a row chunk
    end_row : int
        Index of the final row in a row chunk
    mode : str
        The file mode to use; `self._mode` is used when None
    shp : tuple
        The shape of the raster arrays; consulted only in "other" mode
        without subsetting, where `self.__shp__` is used when None

    Returns
    -------
    numpy.ndarray

    Raises
    ------
    NotImplementedError
        If the file mode is not recognized
    '''
    file_mode = self._mode if mode is None else mode
    first, last = start_row, end_row

    subsetting = self._subset_id is not None
    if subsetting:
        (xmin, xmax), _ = self.__slice_idx__

    # HDF5 files: read the configured field
    if file_mode == 'hdf5':
        with h5py.File(file_path, 'r') as hdf:
            assert self._field in hdf.keys(), 'Could not find the specified field name: %s' % self._field
            dataset = hdf[self._field]
            if subsetting:
                return dataset[first:last, xmin:xmax]
            return dataset[first:last,:]

    # Sparse TCF output arrays
    if file_mode in ('sparse', 'tcf'):
        # TCFArray has stronger assumptions than SparseArray; try to read
        # the file either way
        sparse = SparseArray(file_path, self._grid)
        sparse.inflate()
        if subsetting:
            return sparse.data[first:last, xmin:xmax]
        return sparse.data[first:last,:]

    # Arbitrary raster arrays
    if file_mode == 'other':
        if subsetting:
            window = (xmin, first, xmax, last)
        else:
            if shp is None:
                shp = self.__shp__
            window = (0, first, shp[1], last)
        return as_array(file_path, False, window)[0]

    raise NotImplementedError('File mode "%s" not recognized' % file_mode)
Reads in the file at the given file path, row chunk by row chunk, and returns a
numpy.ndarray.
Parameters
file_path
:str
- The location of a file
start_row
:int
- Index of the first row in a row chunk
end_row
:int
- Index of the final row in a row chunk
mode
:str
- The file mode to use
shp
:tuple
- The shape of the raster arrays
Returns
numpy.ndarray
def read_raster(self, file_path, mode=None)
-
Expand source code
def read_raster(self, file_path, mode = None):
    '''
    Reads in the file at the given file path and returns a `gdal.Dataset`.

    Parameters
    ----------
    file_path : str
        The location of a file
    mode : str
        The file mode to use (Default: `self._mode`)

    Returns
    -------
    gdal.Dataset

    Raises
    ------
    AssertionError
        If the grid is not defined when reading HDF5 or sparse arrays
    NotImplementedError
        If the file mode is not recognized
    '''
    if mode is None:
        mode = self._mode

    # If we're reading arbitrary raster arrays, the file is opened as-is,
    # so the EASE-Grid 2.0 parameters are not needed
    if mode == 'other':
        rast, _, _ = as_raster(file_path, False)
        return rast

    # For all other file modes...
    assert getattr(self, '_grid', None) is not None, (
        'Must define input --grid in order to read input HDF5 or sparse arrays')
    gt = EASE2_GRID_PARAMS[self._grid]['geotransform']
    wkt = EPSG[EASE2_GRID_PARAMS[self._grid]['epsg']]

    # If we're reading HDF5 files...
    if mode == 'hdf5':
        with h5py.File(file_path, 'r') as hdf:
            assert self._field in hdf.keys(), (
                'Could not find the specified field name: %s' % self._field)
            arr = hdf[self._field][:]
        return array_to_raster(arr, gt, wkt)

    # If we're reading sparse TCF output arrays...
    elif mode in ('sparse', 'tcf'):
        # TCFArray has stronger assumptions than SparseArray; try to read
        # the file either way
        tcf = SparseArray(file_path, self._grid)
        tcf.inflate()
        return array_to_raster(tcf.data, gt, wkt)

    else:
        raise NotImplementedError('File mode "%s" not recognized' % mode)
Reads in the file at the given file path and returns a
gdal.Dataset.
Parameters
file_path
:str
- The location of a file
mode
:str
- The file mode to use
Returns
gdal.Dataset
class ProgressBar (total, prefix='', suffix='', decimals=0, length=30, fill='|', verbose=True)
-
Expand source code
class ProgressBar(object):
    '''
    An animated progress bar for printing progress to the screen, as with
    a command line interface. Used as a context manager around a loop,
    e.g.:

        with ProgressBar(len(things), "Working...") as progress:
            for i, each in enumerate(things):
                ...
                progress.update(i)

    Parameters
    ----------
    total : int
        The total number of loop iterations
    prefix : str
        (Optional) The text to print before the progress bar (Default: "")
    suffix : str
        (Optional) The text to print at the end of the progress bar
        (Default: "")
    decimals : int
        (Optional) Number of decimal places for the progress bar's percent
        (Default: 0)
    length : int
        (Optional) The length of the progress bar, in characters
        (Default: 30)
    fill : str
        (Optional) The character to display for the filled portion of the
        bar (Default: "|")
    verbose : bool
        True to display the bar (Default: True); set to False if the
        `ProgressBar` should not be displayed
    '''

    def __init__(
            self, total, prefix = '', suffix = '', decimals = 0,
            length = 30, fill = '|', verbose = True):
        self._decimals = decimals
        self._fill = fill
        self._length = length
        self._prefix = prefix
        self._suffix = suffix
        self._total = total
        # Some CLIs might have a "verbose" mode, in which progress *should*
        # be printed to the screen; when not in "verbose," we still enter
        # and exit the ProgressBar context; so, we return a "dummy" bar
        # that cannot print to screen when not in "verbose" mode
        self._verbose = verbose

    def __enter__(self):
        # Check for "verbose" mode *before* drawing anything, so that a
        # non-verbose bar prints nothing at all
        if not self._verbose:
            return self.dummy() # Return a dummy bar
        self.update(0) # Initialize the bar
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._verbose:
            self.update(self._total) # Finalize the bar

    @property
    def prefix(self):
        '''
        The text printed before the progress bar.
        '''
        return self._prefix

    @prefix.setter
    def prefix(self, new_prefix):
        self._prefix = new_prefix

    def dummy(self):
        '''
        Returns a dummy instance of ProgressBar that prints nothing; used
        when not in "verbose" mode, i.e., when `verbose = False`.

        Returns
        -------
        ProgressBar
        '''
        dummy = ProgressBar(0)
        # A function stored on the *instance* is not bound, so it receives
        # no "self" argument; accept any arguments and do nothing
        setattr(dummy, 'update', lambda *args, **kwargs: None)
        return dummy

    def update(self, iteration):
        '''
        When called in a loop, creates a progress bar in the terminal.
        The bar is drawn from the settings given to the constructor
        (total, prefix, suffix, decimals, length, fill). When `iteration`
        equals the total, the bar is cleared, leaving only the prefix, and
        a new line is started. NOTE: Adapted from this example:

            https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console

        Parameters
        ----------
        iteration : int
            The current iteration
        '''
        total = self._total
        if total:
            percent_done = 100 * (iteration / float(total))
            filled_len = int(self._length * iteration // total)
        else:
            # Nothing to iterate over: avoid dividing by zero and treat
            # the (empty) task as complete
            percent_done = 100.0
            filled_len = self._length
        percent = ("{0:." + str(self._decimals) + "f}").format(percent_done)
        d = 1 if self._decimals > 0 else 0 # Space for decimal point
        bar = self._fill * filled_len + '-' * (self._length - filled_len)
        # Print new line on complete
        if iteration == total:
            # Clear the bar, leave the prefix
            print('\r%s%s' % (self._prefix, ''.rjust(self._length + 10)))
        else:
            print('\r%s [%s] %s%% %s' % (
                self._prefix, bar,
                percent.rjust(self._decimals + d + 3), self._suffix
            ), end = '\r')
An animated progress bar for printing progress to the screen, as with a command line interface. Used as a context manager around a loop, e.g.:
with ProgressBar(len(things), "Working...") as progress: for i, each in enumerate(things): ... progress.update(i)
Parameters
total
:int
- The total number of loop iterations
prefix
:str
- (Optional) The text to print before the progress bar (Default: "")
suffix
:str
- (Optional) The text to print at the end of the progress bar (Default: "")
decimals
:int
- (Optional) Number of decimal places for the progress bar's percent (Default: 0)
length
:int
- (Optional) The length of the progress bar, in characters (Default: 30)
fill
:str
- (Optional) The character to display for the filled portion of the bar (Default: "|")
verbose
:bool
- True to display the bar (Default: True); set to False if the
ProgressBar
should not be displayed
Instance variables
prop prefix
-
Expand source code
@property
def prefix(self):
    '''
    The current prefix text, as stored in the `_prefix` attribute.
    '''
    current_prefix = self._prefix
    return current_prefix
Methods
def dummy(self)
-
Expand source code
def dummy(self): ''' Returns a dummy instance of ProgressBar that prints nothing; used when not in "verbose" mode (the default). ''' dummy = ProgressBar(0) setattr(dummy, 'update', lambda s, i: None) return dummy
Returns a dummy instance of ProgressBar that prints nothing; used when not in "verbose" mode (that is, when verbose=False; the default is verbose=True).
def update(self, iteration)
-
Expand source code
def update(self, iteration):
    '''
    When called in a loop, creates a progress bar in the terminal. The bar
    is drawn from the instance's `_total`, `_prefix`, `_suffix`,
    `_decimals`, `_length` and `_fill` attributes. When `iteration` equals
    the total, the bar is cleared, leaving only the prefix, and a new line
    is started. NOTE: Adapted from this example:

        https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console

    Parameters
    ----------
    iteration : int
        The current iteration
    '''
    total = self._total
    if total:
        percent_done = 100 * (iteration / float(total))
        filled_len = int(self._length * iteration // total)
    else:
        # Nothing to iterate over: avoid dividing by zero and treat the
        # (empty) task as complete
        percent_done = 100.0
        filled_len = self._length
    percent = ("{0:." + str(self._decimals) + "f}").format(percent_done)
    d = 1 if self._decimals > 0 else 0 # Space for decimal point
    bar = self._fill * filled_len + '-' * (self._length - filled_len)
    # Print new line on complete
    if iteration == total:
        # Clear the bar, leave the prefix
        print('\r%s%s' % (self._prefix, ''.rjust(self._length + 10)))
    else:
        print('\r%s [%s] %s%% %s' % (
            self._prefix, bar,
            percent.rjust(self._decimals + d + 3), self._suffix
        ), end = '\r')
When called in a loop, creates a progress bar in the terminal. NOTE: Adapted from this example:
<https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console>
Parameters
iteration
:int
- The current iteration
total
:int
- The total iterations
prefix
:str
- (Optional) Prefix string
suffix
:str
- (Optional) Suffix string
decimals
:int
- (Optional) Positive number of decimals in percent complete
length
:int
- (Optional) Character length of bar
fill
:str
- (Optional) Bar fill character