Source code for gridded.time

#!/usr/bin/env python

from textwrap import dedent
import logging
import netCDF4 as nc4
import numpy as np

from datetime import datetime, timedelta

from gridded.utilities import get_dataset


class OutOfTimeRangeError(ValueError):
    """
    Raised when a time is requested that falls outside the range of
    times covered by a Time object.

    Subclasses ValueError, so callers catching ValueError still work.
    """
class TimeSeriesError(ValueError):
    """
    Raised when a sequence of times is not a valid time series.

    The Time constructor raises this when the time sequence is not
    ascending, or when it contains duplicate entries.
    """
def offset_as_iso_string(offset_hours):
    """
    Returns the offset as an ISO-style string, signed and zero padded::

        -8   -> "-08:00"
        3.5  -> "+03:30"
        None -> ""

    :param offset_hours: timezone offset in (possibly fractional) hours,
                         or None for "no offset known".
    :type offset_hours: float, integer, or None

    :returns: "" if offset_hours is None, otherwise a "+HH:MM" / "-HH:MM" string.
    :rtype: str
    """
    if offset_hours is None:
        return ""

    sign = "-" if offset_hours < 0 else "+"
    # Work in whole minutes, rounded to the nearest minute: truncating the
    # fractional hours turns e.g. 7h20m (7.333...) into 19.99... -> 19 minutes.
    total_minutes = int(round(abs(offset_hours) * 60))
    hours, minutes = divmod(total_minutes, 60)
    return f"{sign}{hours:0>2}:{minutes:0>2}"
def parse_time_offset(unit_str):
    """
    Find the time offset from a CF-style time units string.

    Follows the ISO format(s):

        ('UTC', 'days since 2024-1-1T00:00:00Z'),
        ('UTC-0', 'days since 2024-1-1T00:00:00+00:00'),
        ('naive', 'days since 2024-1-1T00:00:00'),
        ('offset-7', 'days since 2024-1-1T00:00:00-7:00')

    :param unit_str: CF-style time unit string

    :returns: (offset_hours, name) -- the offset in hours and a name for it.
              Both are None if the string carries no offset, or if the
              reference time could not be parsed (a warning is logged in
              that case).
    """
    # NOTE: this uses dateutil -- which isn't otherwise required
    #       we could write it by hand instead.
    # The submodule is imported explicitly: a bare `import dateutil` does not
    # guarantee that `dateutil.parser` is available as an attribute.
    import dateutil.parser

    _, found, t_string = unit_str.partition('since')
    if not found:
        # no "since" at all: report the whole string in the warning below
        t_string = unit_str

    try:
        if not found:
            raise ValueError("no 'since' in time units string")
        dt = dateutil.parser.parse(t_string)
    except (ValueError, OverflowError):
        # dateutil's ParserError is a ValueError subclass, so this also
        # covers dateutil versions that predate ParserError.
        logging.warning(f"Couldn't parse TZ offset in time string: '{t_string}'. Setting to default")
        return None, None

    offset = dt.utcoffset()
    if offset is None:
        # naive reference time: no offset specified
        return None, None

    offset_hours = offset.total_seconds() / 3600
    name = dt.tzname()
    if name is None:
        # unnamed fixed offset: fall back to the ISO representation
        name = offset_as_iso_string(offset_hours)
    return offset_hours, name
class Time(object):
    """
    Representation of a time axis: an ascending, duplicate-free sequence of
    datetimes, with bounds checking, indexing and interpolation helpers.
    """

    # Used to make a singleton with the constant_time class method.
    # question: why not a ConstantTime Class?
    _const_time = None

    def __init__(self,
                 data=None,
                 filename=None,
                 varname=None,
                 tz_offset=None,
                 new_tz_offset=None,
                 tz_offset_name="",
                 origin=None,
                 displacement=None,
                 *args,
                 **kwargs):
        '''
        Representation of a time axis. Provides interpolation alphas and
        indexing.

        :param data: Ascending list of times to use. If None, a single
                     time (now, truncated to the minute) is used. The data
                     are copied, so the passed-in sequence is never modified.
        :type data: Sequence of `datetime.datetime` objects, or a Time object.

        :param filename=None: name of file data was loaded from -- informational only

        :param varname=None: name of variable in data file -- informational only

        :param tz_offset=None: Timezone offset of the data. If not provided
                               it will be None ("Naive") e.g. offset is not
                               known. Use zero for UTC.
        :type tz_offset: float or integer hours

        :param new_tz_offset: Timezone offset desired -- from the offset
                              specified by tz_offset.
        :type new_tz_offset: float or integer hours

        :param tz_offset_name="": name for the time zone offset, if desired.
                                  example: "Pacific Daylight Time"
                                  NOTE: only informational -- it's up to you
                                  to have the tz_offset correct.
        :type tz_offset_name: str.

        :param origin: shifts the time interval to begin at the time specified
        :type origin: `datetime.datetime`

        :param displacement: displacement to apply to the time data.
                             Allows shifting entire time interval into
                             future or past. A plain number is taken as hours.
        :type displacement: `datetime.timedelta`

        :raises TypeError: if the data are not compatible with datetime arithmetic.
        :raises ValueError: if new_tz_offset is given while tz_offset is None.
        :raises TimeSeriesError: if the times are not ascending or contain duplicates.
        '''
        # Always work on a private copy: the origin / timezone / displacement
        # shifts below modify the array in place, and must not leak back into
        # the caller's array or into the Time object the data came from.
        if isinstance(data, Time):
            self.data = data.data.copy()
        elif data is None:
            self.data = np.array([datetime.now().replace(second=0, microsecond=0)])
        else:
            self.data = np.array(data)

        # Quick check to ensure data is 'datetime-like' enough
        try:
            self.data += timedelta(seconds=0)
        except TypeError as err:
            raise TypeError(f"Time datatype: {self.data.dtype} not compatible with datetime.") from err

        if origin is not None:
            # shift everything so that the first time lands on origin
            diff = self.data[0] - origin
            self.data -= diff

        self.filename = filename
        self.varname = varname
        self.tz_offset_name = tz_offset_name

        # set the private attribute directly, because using the property
        # can cause the data to be shifted twice in case of loading from a
        # serialization of this object
        self._tz_offset = tz_offset

        if new_tz_offset is not None:
            if self._tz_offset is None:
                raise ValueError("You cannot specify a new_tz_offset without specifying the current offset, "
                                 "i.e. tz_offset can not be None. Use 0 for UTC.")
            # this will shift it relative to tz_offset
            self.tz_offset = new_tz_offset

        if displacement is not None:
            self.displacement = displacement

        if not self._timeseries_is_ascending(self.data):
            raise TimeSeriesError("Time sequence is not ascending")
        if self._has_duplicates(self.data):
            raise TimeSeriesError("Time sequence has duplicate entries")

        super(Time, self).__init__(*args, **kwargs)

    @staticmethod
    def locate_time_var_from_var(datavar):
        """
        Find the name of the time variable that goes with a data variable.

        Uses the variable's ``time`` attribute if it names a dimension of the
        variable's group; otherwise the first dimension, if its name
        contains 'time'.

        :param datavar: a netcdf(-like) variable

        :returns: name of the time variable, or None if none could be found
                  (including when the variable has no dimensions at all).
        """
        if hasattr(datavar, 'time') and datavar.time in datavar._grp.dimensions.keys():
            return datavar.time

        dims = datavar.dimensions
        # a dimensionless (scalar) variable has no first dimension to look at
        if dims and 'time' in dims[0]:
            return dims[0]
        return None

    @classmethod
    def from_netCDF(cls,
                    filename=None,
                    dataset=None,
                    varname=None,
                    datavar=None,
                    tz_offset=None,
                    new_tz_offset=None,
                    tz_offset_name=None,
                    origin=None,
                    displacement=None,
                    **kwargs):
        """
        Construct a Time object from a netcdf file.

        By default, this will attempt to discover the data variable to use.
        If the file is not standards conforming, you can specify the time
        variable name, or another variable, for which you want the
        corresponding times.

        :param filename=None: name of netcdf file

        :param dataset=None: netcdf dataset object
                             (only one of filename or dataset should be specified)

        :param varname=None: Name of the time variable.

        :param datavar=None: A netcdf variable or name of netcdf variable for
                             which you want the corresponding time object.
                             It will try to find the time variable that
                             corresponds to the passed in variable.

        :param tz_offset=None: Timezone offset from UTC the data are in, in hours.
                               If None: offset will be read from file, if present.
                               If no offset is specified in the file,
                               UTC (tz_offset=0) will be assumed.
                               If 'Naive', then no offset will be assigned.
        :type tz_offset: Number in hours.

        :param new_tz_offset=None: New Timezone offset from UTC desired in hours.
                                   Data will be shifted to the new offset.
                                   If tz_offset is set to Naive, then this
                                   will fail. (It can't be changed without
                                   knowing what it was to begin with)
        :type new_tz_offset: Number in hours.

        :param tz_offset_name=None: name for the time zone offset, if desired.
                                    example: "Pacific Daylight Time"
                                    If None, a name derived from the offset is used.
                                    NOTE: only informational -- it's up to you
                                    to have the tz_offset correct.
        :type tz_offset_name: str.

        :param origin: passed through to the Time constructor.
        :param displacement: passed through to the Time constructor.

        :raises TypeError: if neither varname nor datavar is given.
        """
        if varname is None and datavar is None:
            raise TypeError('you must pass in either a varname or a datavar')

        if dataset is None:
            dataset = get_dataset(filename)

        if varname is None and datavar is not None:
            if isinstance(datavar, str):
                datavar = dataset.variables[datavar]
            varname = cls.locate_time_var_from_var(datavar)
            # fixme: This seems risky -- better to raise and deal with it elsewhere.
            if varname is None:
                return cls.constant_time()

        if isinstance(varname, str):
            tvar = dataset.variables[varname]
        else:
            # NOTE(review): a non-string varname used to leave `tvar` unbound.
            # It is now taken to be the time variable object itself
            # (assumes a netCDF-variable-like object with a `name`) -- confirm.
            tvar = varname
            varname = getattr(tvar, 'name', None)

        # figure out the timezone_offset
        if isinstance(tz_offset, str) and tz_offset.lower() == 'naive':
            tz_offset = None
            name = "No Timezone Specified"
        elif tz_offset is None:
            # look in the time units attribute:
            tz_offset, name = parse_time_offset(tvar.units)
            # assuming, for netcdf files, that no specified offset is UTC (CF standard)
            if tz_offset is None:
                tz_offset = 0
                name = "UTC"
        else:
            name = offset_as_iso_string(tz_offset)

        if tz_offset_name is None:
            tz_offset_name = name

        tdata = nc4.num2date(tvar[:],
                             units=tvar.units,
                             only_use_cftime_datetimes=False,
                             only_use_python_datetimes=True)

        return cls(data=tdata,
                   filename=filename,
                   varname=varname,
                   tz_offset=tz_offset,
                   new_tz_offset=new_tz_offset,
                   tz_offset_name=tz_offset_name,
                   origin=origin,
                   displacement=displacement,
                   **kwargs
                   )

    @classmethod
    def constant_time(cls):
        """
        Returns a Time object that represents no change in time

        In practice, that's a Time object with a single datetime
        """
        # this caches a single instance of a constant time object
        # in the class, and always returns the same one (a singleton)
        if cls._const_time is None:
            cls._const_time = cls(np.array([datetime.now()]))
        return cls._const_time

    @property
    def data(self):
        """The underlying array of times."""
        return self._data

    @data.setter
    def data(self, data):
        # If it's passed in a Time object, it gets its data object
        # Fixme: Why? this seems like more magic than necessary
        if isinstance(data, self.__class__) or data.__class__ in self.__class__.__mro__:
            data = data.data
        # add check for valid datetime list?
        self._data = np.asarray(data)

    @property
    def info(self):
        """
        Provides info about this Time object
        """
        msg = """
              Time object:
                filename: {}
                varname: {}
                first timestep: {}
                final timestep: {}
                number of timesteps: {}
              """.format(self.filename,
                         self.varname,
                         self.min_time,
                         self.max_time,
                         len(self.data),
                         )
        return dedent(msg)

    def __len__(self):
        return len(self.data)

    def __iter__(self):
        return iter(self.data)

    def __eq__(self, other):
        # equal means: same class, same times and same timezone offset
        if not isinstance(other, self.__class__):
            return False
        return np.array_equal(self.data, other.data) and self.tz_offset == other.tz_offset

    def __ne__(self, other):
        return not self.__eq__(other)

    def _timeseries_is_ascending(self, ts):
        """True if ts is already in sorted (non-descending) order."""
        return np.all(np.sort(ts) == ts)

    def _has_duplicates(self, time):
        """True if time has more than one entry and any entry is repeated."""
        return len(np.unique(time)) != len(time) and len(time) != 1

    @property
    def min_time(self):
        '''
        First time in series

        :rtype: datetime.datetime
        '''
        return self.data[0]

    @property
    def max_time(self):
        '''
        Last time in series

        :rtype: datetime.datetime
        '''
        return self.data[-1]

    @property
    def tz_offset(self):
        '''
        Timezone offset of the time series

        :rtype: number
        '''
        return self._tz_offset

    @tz_offset.setter
    def tz_offset(self, offset):
        '''
        Set the timezone offset of the time series. Replaces the current
        offset by reverting the current offset and applying the new offset.

        The data are only shifted when both the current and the new offset
        are known (not None); otherwise only the stored offset changes.

        :param offset: offset to adjust for timezone, in hours.
        :type offset: float, integer hours
        '''
        if self._tz_offset is not None and offset is not None:
            # undo previous offset
            self.data -= timedelta(hours=self._tz_offset)
            # set new offset
            self.data += timedelta(hours=offset)
        self._tz_offset = offset

    @property
    def displacement(self):
        """Displacement that was applied to the data, or None if none was."""
        if not hasattr(self, '_displacement'):
            self._displacement = None
        return self._displacement

    @displacement.setter
    def displacement(self, displacement):
        if not hasattr(self, '_displacement') or self._displacement is None:
            if displacement is None:
                self._displacement = None
                return
            # a plain number is interpreted as hours
            if isinstance(displacement, (float, int)):
                displacement = timedelta(hours=displacement)
            self.data += displacement
            self._displacement = displacement
        else:
            # why not just make this an init thing, then??
            raise AttributeError('displacement is settable only once')

    def get_time_array(self):
        """
        returns a copy of the internal data array
        """
        return self.data.copy()

    def time_in_bounds(self, time):
        '''
        Checks if time provided is within the bounds represented by this object.

        :param time: time to be queried
        :type time: `datetime.datetime`

        :rtype: bool
        '''
        return not ((time < self.min_time) or (time > self.max_time))

    def valid_time(self, time):
        """
        Raises a OutOfTimeRangeError if time is not within the bounds of the timeseries

        :param time: a datetime object that you want to check.
        """
        if not self.time_in_bounds(time):
            raise OutOfTimeRangeError(f'time specified: ({time.isoformat()}) is not within the bounds of '
                                      f'({self.min_time.isoformat()} to {self.max_time.isoformat()})'
                                      )

    def index_of(self, time, extrapolate=False):
        '''
        Returns the index of the provided time with respect to the time
        intervals in the file.

        :param time: Time to be queried
        :type time: `datetime.datetime`

        :param extrapolate: whether to allow extrapolation:
                            i.e. will not raise if outside the bounds of the time data.
        :type extrapolate: bool

        :return: index of the first time in the series that is at or after
                 the specified time (the `np.searchsorted` insertion index),
                 so the bracketing interval is (index - 1, index).
                 Always 0 if the series has only one time.
                 When extrapolating past the end, this is len(data).
        :rtype: integer

        :raises OutOfTimeRangeError: if time is out of bounds, extrapolate is
                                     False and there is more than one time.
        '''
        if not (extrapolate or len(self.data) == 1):
            self.valid_time(time)
        index = np.searchsorted(self.data, time)
        if len(self.data) == 1:
            index = 0
        return index

    def interp_alpha(self, time, extrapolate=False):
        '''
        Returns interpolation alpha for the specified time

        With i = index_of(time), this is the fraction of the way from
        data[i - 1] to data[i]: alpha is the weight to give the value at the
        later time, and 1 - alpha the weight for the earlier one.

        :param time: Time to be queried
        :type time: `datetime.datetime`

        :param extrapolate: if True, 0.0 (before) or 1.0 (after) is returned.
                            if False, a ValueError (OutOfTimeRangeError) is
                            raised if outside the time series.
        :type extrapolate: bool

        :return: interpolation alpha
        :rtype: float (0 <= r <= 1)
        '''
        if (not extrapolate) and (not len(self.data) == 1):
            self.valid_time(time)

        i0 = self.index_of(time, extrapolate)
        # past the last time: all weight on the final value
        if i0 > len(self.data) - 1:
            return 1.0
        # at/before the first time (or a single-time series)
        if i0 == 0:
            return 0.0

        t0 = self.data[i0 - 1]
        t1 = self.data[i0]
        return (time - t0).total_seconds() / (t1 - t0).total_seconds()