Source code for gridded.depth


import warnings

import numpy as np
from gridded.time import Time
from gridded.grids import Grid
from gridded.utilities import (get_dataset,
                               search_dataset_for_any_long_name,
                               search_dataset_for_variables_by_longname,
                               search_dataset_for_variables_by_varname,
                               merge_var_search_dicts
                               )


[docs]class DepthBase(object): _default_component_types = {'time': Time, 'grid': Grid, 'variable': None} def __init__(self, surface_index=None, bottom_index=None, **kwargs): ''' :param surface_index: array index of 'highest' level (closest to sea level) :param bottom_index: array index of 'lowest' level (closest to seafloor) :param positive_down: orientation of points coordinates (for interpolation functions) ''' self.surface_index = surface_index self.bottom_index = bottom_index
[docs] @classmethod def from_netCDF(cls, surface_index=-1, **kwargs): return cls(surface_index, **kwargs)
[docs] def interpolation_alphas(self, points, time, data_shape, extrapolate=False, _hash=None, **kwargs,): ''' Returns the weights (alphas) required for interpolation The base class implementation only supports surface values: it returns the surface index and 0.0 for the alpha This is the expected outcome for the case where all points are on the surface. ''' sz = len(points) indices = np.ma.MaskedArray(data=np.ones((sz, ), dtype=np.int64) * self.surface_index, mask=np.zeros((sz, ), dtype=bool)) alphas = np.ma.MaskedArray(data=np.zeros((sz, ), dtype=np.float64), mask=np.zeros((sz, ), dtype=bool)) return indices, alphas
@classmethod def _find_required_depth_attrs(cls, filename, dataset=None, depth_topology=None): ''' This function is the top level 'search for attributes' function. If there are any common attributes to all potential depth types, they will be sought here. This function returns a dict, which maps an attribute name to a netCDF4 Variable or numpy array object extracted from the dataset. When called from a child depth object, this function should provide all the kwargs needed to create a valid instance using the __init__. There are no universally required terms (yet) ''' df_vars = dataset.variables if dataset is not None else get_dataset(filename).variables df_vars = dict([(k.lower(), v) for k, v in df_vars.items()] ) init_args = {} dt = {} return init_args, dt @classmethod def _gen_varname(cls, filename=None, dataset=None, grid_dataset=None, names_list=None, std_names_list=None): """ Function to find the default variable names if they are not provided. :param filename: Name of file that will be searched for variables :param dataset: Existing instance of a netCDF4.Dataset :type filename: string :type dataset: netCDF.Dataset :return: List of default variable names, or None if none are found """ df = None if dataset is not None: df = dataset else: df = get_dataset(filename) if names_list is None: names_list = cls.default_names if std_names_list is None: std_names_list = cls.cf_names for n in names_list: if n in df.variables.keys(): return n for n in std_names_list: for var in df.variables.values(): if hasattr(var, 'standard_name') or hasattr(var, 'long_name'): if var.name == n: return n raise KeyError("Default names not found.")
[docs]class L_Depth(DepthBase): _def_count=0 default_terms = [('depth_levels', 'depth')] default_names = {'depth': ['depth','depth_levels']} cf_names = {'depth': 'depth'} def __init__(self, name=None, terms=None, surface_index=None, bottom_index=None, default_surface_boundary_condition='extrapolate', default_bottom_boundary_conditon='mask', **kwargs): super(L_Depth, self).__init__(**kwargs) self.name=name self.terms={} if terms is None: raise ValueError('Must provide terms for level depth coordinate') else: self.terms=terms for k, v in terms.items(): setattr(self, k, v) self.surface_index = surface_index self.bottom_index = bottom_index self.default_surface_boundary_condition = default_surface_boundary_condition self.default_bottom_boundary_condition = default_bottom_boundary_conditon
[docs] @classmethod def from_netCDF(cls, filename=None, dataset=None, name=None, topology=None, terms=None, surface_index=None, bottom_index=None, **kwargs ): df = dataset if filename is None else get_dataset(filename, dataset) if df is None: raise ValueError('No filename or dataset provided') if name is None: name = cls.__name__ + str(cls._def_count) if terms is None: terms={} for tn, tln in cls.default_terms: vname=tn #if tn not in dataset.variables.keys(): # 2023 if tn not in df.variables.keys(): vname = cls._gen_varname(filename, dataset, [tn], [tln]) #terms[tn] = dataset[vname][:] # 2023 terms[tn] = df[vname][:] if surface_index is None: surface_index = np.argmin(terms['depth_levels']) if bottom_index is None: bottom_index = np.argmax(terms['depth_levels']) # 2023-02-21 set the depth of the top layer to zero terms['depth_levels'][surface_index] = 0.0 # 2023-02-21 set the depth of the top layer to zero return cls(name=name, terms=terms, surface_index=surface_index, bottom_index=bottom_index, **kwargs)
[docs] def interpolation_alphas(self, points, time = None, data_shape=None, surface_boundary_condition=None, bottom_boundary_condition=None, *args, **kwargs): ''' Returns a pair of values. The 1st value is an array of the depth indices of all the particles. The 2nd value is an array of the interpolation alphas for the particles between their depth index and depth_index+1. If both values are None, then all particles are on the surface layer. ''' points = np.asarray(points, dtype=np.float64) points = points.reshape(-1, 3) depths = points[:, 2] surface_boundary_condition = self.default_surface_boundary_condition if surface_boundary_condition is None else surface_boundary_condition bottom_boundary_condition = self.default_bottom_boundary_condition if bottom_boundary_condition is None else bottom_boundary_condition # process remaining points that are 'above the surface' or 'below the ground' # L0 and L1 bound the entire vertical layer # It is important to get the 'right' argument correct for np.digitize # See https://numpy.org/doc/stable/reference/generated/numpy.digitize.html L0 = self.depth_levels[0] L1 = self.depth_levels[-1] idxs = np.digitize(depths, self.depth_levels, right=False if L0 < L1 else True) - 1 indices = np.ma.MaskedArray(data=idxs, mask=np.zeros((len(idxs)), dtype=bool)) alphas = np.ma.MaskedArray(data=np.empty((len(points)), dtype=np.float64) * np.nan, mask=np.zeros((len(points)), dtype=bool)) #set above surface and below seafloor alphas to allow future filtering if L0 < L1: #0, 1, 2, 3, 4, 5, 6 above_surface = indices == -1 above_alpha = 1 below_bottom = indices == (len(self.depth_levels) - 1) below_alpha = 0 else: #6, 5, 4, 3, 2, 1, 0 above_surface = indices == (len(self.depth_levels) - 1) above_alpha = 0 below_bottom = indices == -1 below_alpha = 1 alphas[above_surface] = above_alpha alphas[below_bottom] = below_alpha #set above surface and below seafloor mask if surface_boundary_condition == 'mask': alphas.mask = np.logical_or(alphas.mask, above_surface) indices.mask[above_surface] = True alphas[indices == -1] = 1 if bottom_boundary_condition == 'mask': alphas.mask = np.logical_or(alphas.mask, below_bottom) indices.mask[below_bottom] = True within_layer = np.isnan(alphas) L0 = np.take(self.depth_levels, indices[within_layer]) L1 = np.take(self.depth_levels, indices[within_layer] + 1) alphas[within_layer] = (depths[within_layer] - L0) / ( L1 - L0) if any(np.isnan(alphas)): raise ValueError('Some alphas are still unmasked and NaN. Please file a bug report') return indices, alphas
@classmethod def _find_required_depth_attrs(cls, filename, dataset=None, topology=None): # THESE ARE ACTUALLY ALL OPTIONAL. This should be migrated when optional # attributes are dealt with # Get superset attributes gf_vars = dataset.variables if dataset is not None else get_dataset(filename).variables gf_vars = dict([(k.lower(), v) for k, v in gf_vars.items()] ) init_args, gt = super(L_Depth, cls)._find_required_depth_attrs(filename, dataset=dataset, topology=topology) return init_args, gt
[docs]class S_Depth(DepthBase): ''' Represents the ocean s-coordinates as implemented by ROMS, It may or may not be useful for other systems. ''' _def_count = 0 default_names = {'Cs_r': ['Cs_r'], 'Cs_w': ['Cs_w'], 's_rho': ['s_rho'], 's_w': ['s_w'], 'hc': ['hc'], 'bathymetry': ['h'], 'zeta': ['zeta'] } # NOTE: I don't think these are CFnames :-( cf_names = {'Cs_r': ['S-coordinate stretching curves at RHO-points'], 'Cs_w': ['S-coordinate stretching curves at W-points'], 's_rho': ['S-coordinate at RHO-points'], 's_w': ['S-coordinate at W-points'], 'hc': ['S-coordinate parameter, critical depth'], 'bathymetry': ['bathymetry at RHO-points', 'Final bathymetry at RHO-points'], 'zeta': ['free-surface'] } def __init__(self, name=None, time=None, grid=None, bathymetry=None, zeta=None, terms=None, vtransform=2, positive_down=True, surface_boundary_condition='extrapolate', bottom_boundary_condition='mask', **kwargs): ''' :param name: Human readable name :type name: string :param time: time axis of the object :type time: gridded.time.Time or derivative :param grid: x/y grid representation :type grid: gridded.grids.GridBase or derivative :param bathymetry: variable object representing seafloor :type bathymetry: gridded.variable.Variable or derivative :param zeta: variable object representing free-surface :type zeta: gridded.variable.Variable or derivative :param terms: remaining terms in dictionary layout :type terms: dictionary of string key to numeric value See S_Depth.default_names, sans bathymetry and zeta :param vtransform: S-coordinate transform type. 1 = Old, 2 = New :type vtransform: int (default 2) :param positive_down: Flag for interpreting depth values as positive or negative This applies to any coordinates passed into the various functions, NOT the values of the S-Coordinates that this object represents :type positive_down: boolean :param surface_boundary_condition: Determines how to handle points above the surface :type surface_boundary_condition: string ('extrapolate' or 'mask') :param bottom_boundary_condition: Determines how to handle points below the seafloor :type bottom_boundary_condition: string ('extrapolate' or 'mask') ''' super(S_Depth, self).__init__(**kwargs) self.name = name self.time = time self.grid = grid self.bathymetry = bathymetry self.zeta = zeta self.terms = {} self.vtransform = vtransform self.positive_down = positive_down if terms is None: raise ValueError('Must provide terms for sigma coordinate') else: self.terms=terms for k, v in terms.items(): setattr(self, k, v) if self.surface_index is None: self.surface_index = self.num_w_levels - 1 if self.bottom_index is None: self.bottom_index = 0 # self.rho_coordinates = self.compute_coordinates('rho') # self.w_coordinates = self.compute_coordinates('w') self.default_surface_boundary_condition = 'extrapolate' self.default_bottom_boundary_condition = 'mask'
[docs] @classmethod def from_netCDF(cls, filename=None, varnames=None, grid_topology=None, name=None, # units=None, time=None, # time_origin=None, grid=None, dataset=None, data_file=None, grid_file=None, # load_all=False, bathymetry=None, zeta=None, terms=None, # fill_value=0, vtransform=2, positive_down=True, **kwargs ): ''' :param filename: A string or ordered list of string of netCDF filename(s) :type filename: str or list[str] :param varnames: Direct mapping of component name to netCDF variable name. Use this if auto detection fails. Partial definition is allowable. Unspecified terms will use the value in `.default_names`:: {'Cs_r': 'Cs_r', 'Cs_w': Cs_w', 's_rho': 's_rho'), 's_w': 's_w', 'bathymetry': 'h', 'hc': 'hc'), 'zeta': 'zeta') } :type varnames: dict :param name: Human-readable name for this object :type name: str :param time: Time dimension (for zeta) :type time: gridded.time.Time or subclass :param grid: X-Y dmension (for bathymetry & zeta) :type grid: subclass of gridded.grids.GridBase ''' Grid = cls._default_component_types['grid'] Time = cls._default_component_types['time'] Variable = cls._default_component_types['variable'] if filename is not None: data_file = filename grid_file = filename ds = None dg = None if dataset is None: if grid_file == data_file: ds = dg = get_dataset(grid_file) else: ds = get_dataset(data_file) dg = get_dataset(grid_file) else: if grid_file is not None: dg = get_dataset(grid_file) else: dg = dataset ds = dataset if grid is None: grid = Grid.from_netCDF(grid_file, dataset=dg, grid_topology=grid_topology) if name is None: name = cls.__name__ + str(cls._def_count) cls._def_count += 1 varnames = cls.default_names.copy() # Do a comprehensive search for netCDF4 Variables all at once vn_search = search_dataset_for_variables_by_varname(ds, varnames) ds_search = search_dataset_for_variables_by_longname(ds, cls.cf_names) varnames = merge_var_search_dicts(ds_search, vn_search) if ds != dg: dg_search = search_dataset_for_variables_by_longname(dg, cls.cf_names) varnames = merge_var_search_dicts(varnames, dg_search) if bathymetry is None: bathy_var = varnames.get('bathymetry', None) if bathy_var is None: err = 'Unable to locate bathymetry in data file' if dg: raise ValueError(err + ' or grid file') raise ValueError(err) bathymetry = Variable.from_netCDF(dataset=bathy_var._grp, varname=bathy_var.name, grid=grid, name='bathymetry', ) if zeta is None: zeta_var = varnames.get('zeta', None) if zeta_var is None: warn = 'Unable to locate zeta in data file' if dg: warnings.warn(warn + ' or grid file.') warn += ' Generating constant (0) zeta.' warnings.warn(err) zeta = Variable.constant(0) else: zeta = Variable.from_netCDF(dataset=zeta_var._grp, varname=zeta_var.name, grid=grid, name='zeta') if time is None: time = zeta.time if terms is None: terms = {} for term, tvar in varnames.items(): if term in ['bathymetry', 'zeta']: # skip these because they're done separately... continue terms[term] = tvar[:] if vtransform is None: vtransform = 2 #default for ROMS # messing about trying to detect this. return cls(name=name, time=time, grid=grid, bathymetry=bathymetry, zeta=zeta, terms=terms, vtransform=vtransform, positive_down=positive_down, **kwargs)
@property def num_w_levels(self): return len(self.s_w) @property def num_r_levels(self): return len(self.s_rho) def __len__(self): return self.num_w_levels
[docs] def compute_coordinates(self, rho_or_w): """ Uses zeta, bathymetry, and the s-coordinate terms to produce the time-dependent coordinate variable. :return: numpy array of s-coordinates """ if rho_or_w == 'rho': s_c = self.s_rho Cs = self.Cs_r elif rho_or_w == 'w': s_c = self.s_w Cs = self.Cs_w else: raise ValueError('invalid rho_or_w argument (must be "rho" or "w")') if len(self.zeta.data.shape) == 2: #need this check to workaround a 'constant' zeta object zeta = np.zeros((len(self.time.data), ) + self.bathymetry.data.shape) else: zeta = self.zeta.data[:] h = self.bathymetry.data[:] hc = self.hc hCs = h * Cs[:,np.newaxis, np.newaxis] s_coord = None if self.vtransform == 1: S = (hc * s_c)[:, np.newaxis, np.newaxis] + hCs - (hc * Cs)[:, np.newaxis, np.newaxis] s_coord = -(S + zeta[:, np.newaxis, :, :] * (1 + S / h)) elif self.vtransform == 2: S = ((hc * s_c)[:, np.newaxis, np.newaxis] + hCs) / (hc + h) s_coord = -(zeta[:, np.newaxis, :, :] + (zeta[:, np.newaxis, :, :] + h) * S) return s_coord
[docs] def get_transect(self, points, time, rho_or_w='w', _hash=None, **kwargs): ''' :param points: array of points to interpolate to :type points: numpy array of shape (n, 3) :param time: time to interpolate to :type time: datetime.datetime :param rho_or_w: 'rho' or 'w' to interpolate to rho or w points :type rho_or_w: string :return: numpy array of shape (n, num_w_levels) of interpolated values ''' s_c = self.s_rho if rho_or_w == 'rho' else self.s_w C_s = self.Cs_r if rho_or_w == 'rho' else self.Cs_w h = self.bathymetry.at(points, time, unmask=False, _hash=_hash, **kwargs) zeta = self.zeta.at(points, time, unmask=False, _hash=_hash, **kwargs) hc = self.hc hCs = h * C_s[np.newaxis, :] if self.vtransform == 1: S = (hc* s_c) + hCs - (hc * C_s)[np.newaxis, :] s_coord = -(S + zeta * (1 + S / h)) elif self.vtransform == 2: S = ((hc * s_c) + hCs) / (hc + h) s_coord = -(zeta + (zeta + h) * S) return s_coord
def _L_Depth_given_bathymetry_t1(self, bathy, zeta, lvl, rho_or_w): """ computes the depth (positive up) of a level given the bathymetry and zeta. Produces an output array same shape as bathy and zeta Output is always 'positive down' This implements transform 1 (https://www.myroms.org/wiki/Vertical_S-coordinate) """ if rho_or_w == 'rho': s = self.s_rho[lvl] Cs = self.Cs_r[lvl] elif rho_or_w == 'w': s = self.s_w[lvl] Cs = self.Cs_w[lvl] else: raise ValueError('invalid rho_or_w argument (must be "rho" or "w")') hc = self.hc S = hc * s + (bathy - hc) * Cs return -(S + zeta * (1 + S / bathy)) def _L_Depth_given_bathymetry_t2(self, bathy, zeta, lvl, rho_or_w): """ computes the depth (positive up) of a level given the bathymetry and zeta. Produces an output array same shape as bathy and zeta Output is always 'positive down' This implements transform 2 (https://www.myroms.org/wiki/Vertical_S-coordinate) """ if rho_or_w == 'rho': s = self.s_rho[lvl] Cs = self.Cs_r[lvl] elif rho_or_w == 'w': s = self.s_w[lvl] Cs = self.Cs_w[lvl] else: raise ValueError('invalid rho_or_w argument (must be "rho" or "w")') hc = self.hc S = (hc * s + bathy * Cs) / (hc + bathy) return -(zeta + (zeta + bathy) * S)
[docs] def interpolation_alphas(self, points, time, data_shape, surface_boundary_condition=None, bottom_boundary_condition=None, _hash=None, extrapolate=False, **kwargs): ''' Returns a pair of arrays. The 1st array is of the depth indices of all the points. The 2nd value is an array of the interpolation alphas for the points between their depth index and depth_index+1. If a depth is between coordinate N and N+1, the index will be N. If a depth is exactly on a coordinate, the index will be N. Any points that are 'off grid' will be subject to the boundary conditions 'mask' or 'extrapolate'. 'mask' will mask the index and the alpha of the point. 'extrapolate' will set the index to the surface or bottom index, and the alpha to 0 or 1 depending on the orientation of the layers ''' depths = points[:, 2] surface_boundary_condition = (self.default_surface_boundary_condition if surface_boundary_condition is None else surface_boundary_condition) bottom_boundary_condition = (self.default_bottom_boundary_condition if bottom_boundary_condition is None else bottom_boundary_condition) rho_or_w = surface_index = None # parameter necessary for ldgb if data_shape[0] == self.num_w_levels: # data assumed to be on w points rho_or_w = 'w' surface_index = self.surface_index elif data_shape[0] == self.num_r_levels: # data assumed to be on rho points rho_or_w = 'rho' surface_index = self.surface_index-1 else: raise ValueError('Cannot get depth interpolation alphas for data shape specified; does not fit r or w depth axis') # ldbg = 'level depth given bathymetry'. This is the depth of the layer # at each x/y point. if self.vtransform == 2: ldgb = self._L_Depth_given_bathymetry_t2 elif self.vtransform == 1: ldgb = self._L_Depth_given_bathymetry_t1 else: raise ValueError('invalid vtransform attribute on depth object') zetas = self.zeta.at(points, time, _hash=_hash, unmask=False, extrapolate=extrapolate).reshape(-1) bathy = self.bathymetry.at(points, time, _hash=_hash, unmask=False, extrapolate=extrapolate).reshape(-1) surface_depth = ldgb(bathy, zetas, surface_index, rho_or_w) indices = np.ma.MaskedArray(data=-np.ones((len(points)), dtype=np.int64), mask=np.zeros((len(points)), dtype=bool)) alphas = np.ma.MaskedArray(data=np.empty((len(points)), dtype=np.float64) * np.nan, mask=np.zeros((len(points)), dtype=bool)) if not np.any(depths > surface_depth): # nothing is within or below the mesh, so apply surface boundary condition # to all points and return if surface_boundary_condition == 'extrapolate': indices[:] = surface_index alphas[:] = 0 else: indices = np.ma.masked_less(indices, 0) alphas = np.ma.masked_invalid(alphas, 0) return indices, alphas # use np.digitize to bin the depths into the layers transects = self.get_transect(points, time, rho_or_w=rho_or_w, _hash=_hash, extrapolate=extrapolate) vf = np.vectorize(np.digitize, signature='(),(n)->()', excluded=['right']) indices = vf(depths, transects, right=True) - 1 # transect mask is True where the point is outside the grid horizontally # so it must be reapplied indices = np.ma.array(indices, mask=transects.mask[:,0]) alphas.mask = transects.mask[:,0] # set above surface and below seafloor alphas to allow future filtering alphas[indices == surface_index] = 0 if surface_boundary_condition == 'mask': alphas.mask = np.logical_or(alphas.mask, indices == surface_index) indices.mask[indices == surface_index] = True alphas[indices == -1] = 1 if bottom_boundary_condition == 'mask': alphas.mask = np.logical_or(alphas.mask, indices == -1) indices.mask[indices == -1] = True # compute the remaining alphas, which should be for points within the depth interval L0 = np.take(transects, indices) L1 = np.take(transects, indices + 1) within_layer = np.isnan(alphas) #remaining alphas would still have nan at this point alphas[within_layer] = (depths[within_layer] - L0[within_layer]) / (L1[within_layer] - L0[within_layer]) if any(np.isnan(alphas)): raise ValueError('Some alphas are still unmasked and NaN. Please file a bug report') return indices, alphas
[docs] def get_section(self, time, coord='w', x_coord=None, y_coord=None, vtransform=2): ''' Returns a section of the z-level space in time. All s-levels are returned in the data. By providing a x_coord or y_coord you can get cross sections in the direction specified, or both for the level depths at a single point. ''' if coord not in ['w', 'rho']: raise ValueError('Can only specify "w" or "rho" for coord kwarg') ldgb = None z_shp = None if vtransform == 2: ldgb = self._L_Depth_given_bathymetry_t2 else: ldgb = self._L_Depth_given_bathymetry_t1 if coord == 'w': z_shp = (self.num_w_levels, self.zeta.data.shape[-2], self.zeta.data.shape[-1]) if coord == 'rho': z_shp = (self.num_r_levels, self.zeta.data.shape[-2], self.zeta.data.shape[-1]) time_idx = self.time.index_of(time) time_alpha = self.time.interp_alpha(time) z0 = self.zeta.data[time_idx] if time_idx == len(self.time.data) - 1 or time_idx == 0: zeta = self.zeta.data[time_idx] else: z1 = self.zeta.data[time_idx + 1] zeta = (z1 - z0)*(1-time_alpha) + z0 bathy = self.bathymetry.data[:] z_data=np.empty(z_shp) for lvl in range(0,len(z_data)): z_data[lvl] = ldgb(bathy, zeta, lvl, coord) return z_data
[docs]class Depth(object): ''' Factory class that generates depth objects. Also handles common loading and parsing operations ''' ld_names = ['level', 'levels', 'L_Depth', 'depth_levels' 'depth_level'] sd_names = ['sigma'] surf_names = ['surface', 'surface only', 'surf', 'none'] def __init__(self): raise NotImplementedError("Depth is not meant to be instantiated. " "Please use the 'from_netCDF' or 'surface_only' function")
[docs] @staticmethod def surface_only(*args, **kwargs): ''' If instantiated directly, this will always return a DepthBase It is assumed index -1 is the surface index ''' return DepthBase(*args,**kwargs)
[docs] @staticmethod def from_netCDF(filename=None, dataset=None, data_file=None, grid_file=None, depth_type=None, varname=None, topology=None, _default_types=(('level', L_Depth), ('sigma', S_Depth), ('surface', DepthBase)), **kwargs): ''' :param filename: File containing a depth :type filename: string or list of string :param dataset: Takes precedence over filename, if provided. :type dataset: netCDF4.Dataset :param depth_type: Must be provided if autodetection is not possible. See Depth.ld_names, Depth.sd_names, and Depth.surf_names for the expected values for this argument :type depth_type: string :returns: Instance of L_Depth or S_Depth ''' if filename is not None: data_file = filename grid_file = filename ds = None dg = None if dataset is None: if grid_file == data_file: ds = dg = get_dataset(grid_file) else: ds = get_dataset(data_file) dg = get_dataset(grid_file) else: if grid_file is not None: dg = get_dataset(grid_file) else: dg = dataset ds = dataset df = dataset if filename is None else get_dataset(filename, dataset) if df is None: raise ValueError('No filename or dataset provided') cls = depth_type if (depth_type is None or isinstance(depth_type, str) or not issubclass(depth_type, DepthBase)): cls = Depth._get_depth_type(df, depth_type, topology, _default_types) return cls.from_netCDF(filename=filename, dataset=dataset, grid_file=grid_file, data_file=data_file, topology=topology, **kwargs)
# @staticmethod # def depth_type_of_var(dataset, varname): # ''' # Given a varname or netCDF variable and dataset, try to determine the # depth type from dimensions, attributes, or other metadata information, # and return a grid_type and topology # ''' # var = dataset[varname] @staticmethod def _get_depth_type(dataset, depth_type=None, topology=None, _default_types=None): if _default_types is None: _default_types = dict() else: _default_types = dict(_default_types) S_Depth = _default_types.get('sigma', None) L_Depth = _default_types.get('level', None) Surface_Depth = _default_types.get('surface', None) if depth_type is not None: if depth_type.lower() in Depth.ld_names: return L_Depth elif depth_type.lower() in Depth.sd_names: return S_Depth elif depth_type.lower() in Depth.surf_names: return Surface_Depth else: raise ValueError('Specified depth_type not recognized/supported') if topology is not None: if ('faces' in topology.keys() or topology.get('depth_type', 'notype').lower() in ld_names): return L_Depth elif topology.get('depth_type', 'notype').lower() in Depth.sd_names: return S_Depth else: return Surface_Depth else: t1 = search_dataset_for_any_long_name(dataset, S_Depth.cf_names) if t1: return S_Depth else: try: L_Depth.from_netCDF(dataset=dataset) return L_Depth except: warnings.warn('''Unable to automatically determine depth system so reverting to surface-only mode. Please verify the index of the surface is correct.''', RuntimeWarning) return Surface_Depth