Source code for silicone.time_projectors.linear_extender

"""
Module for the time projector which extends timeseries using a linear trend
"""
import datetime
import logging
import warnings

from pyam import IamDataFrame

# Module-level logger, named after this module via ``__name__``
logger = logging.getLogger(__name__)


class LinearExtender:
    """
    Time projector which extends the timeseries of a variable using a linear trend.

    You can either specify a gradient for the line (possibly zero) or a point in
    the future.
    """

    def __init__(self, db=None):
        """
        Initialise the time projector.

        The database is only used to determine the required timeperiods to return
        and can be left out if this is specified later.

        Parameters
        ----------
        db : IamDataFrame or None
            The database to use to determine times to return
        """
        # A falsy ``db`` (e.g. ``None``) is stored as-is; otherwise we keep our
        # own copy rather than a reference to the caller's object.
        if db:
            self._db = db.copy()
        else:
            self._db = db

    def derive_relationship(
        self, variable, gradient=None, year_value=None, times=None
    ):
        """
        Derives the function to return a linear trend following from the last
        datapoint

        Exactly one of ``gradient`` and ``year_value`` must be provided.

        Parameters
        ----------
        variable : str
            The variable for which we want to calculate timeseries (e.g.
            ``"Emissions|CO2"``).

        gradient : float or None
            The gradient of the variable after its last available datapoint, in
            the emissions units per year. If not provided, year_value must be
            provided instead.

        year_value : None or tuple(int or datetime, float)
            The value of the variable at a given future time, e.g. (2050, 0) to
            extend the data to net zero in 2050. If not provided, gradient must
            be provided instead.

        times : None or list[int or datetime]
            The times to return entries at. Only required if no database was
            used during initialisation.

        Returns
        -------
        :obj:`func`
            Function which takes a :obj:`pyam.IamDataFrame` containing
            ``variable`` timeseries and returns these timeseries linearly
            extended to each of the requested times that is later than the last
            time in the input data.

        Raises
        ------
        ValueError
            ``times`` is not given and no database was provided at
            initialisation.

        ValueError
            Both or neither of ``gradient`` and ``year_value`` are provided.

        ValueError
            ``year_value`` is provided but is not a tuple of length 2.
        """
        if times is None:
            if not self._db:
                raise ValueError(
                    "This function must either be given a list of times or a "
                    "database of completed scenarios"
                )
            times = self._db[self._db.time_col].unique()
        if (gradient is not None) and (year_value is not None):
            raise ValueError("Provide only one of a year_value OR gradient")
        if (gradient is None) and (year_value is None):
            raise ValueError("Provide either a year_value OR gradient")
        # Compare against None rather than relying on truthiness: a falsy value
        # such as ``()`` must still be rejected here, otherwise the filler would
        # later fall through to the gradient branch with ``gradient=None``.
        if year_value is not None:
            if (not isinstance(year_value, tuple)) or (len(year_value) != 2):
                raise ValueError(
                    "year_value should be a tuple of the year and the value that year."
                )

        def filler(in_iamdf):
            """
            Filler function derived from :obj:`LinearExtender`.

            Parameters
            ----------
            in_iamdf : :obj:`pyam.IamDataFrame`
                Input data to fill data in

            Returns
            -------
            :obj:`pyam.IamDataFrame`
                Filled in data (without original source data)

            Raises
            ------
            ValueError
                There is no data for ``variable`` in ``in_iamdf``.

            TypeError
                The requested times cannot be compared with the entries of the
                time column of ``in_iamdf``.

            ValueError
                None of the requested times is later than the last time already
                in ``in_iamdf``.

            AssertionError
                Filtering for the last time does not give exactly one column.
            """
            target_df = in_iamdf.filter(variable=variable)
            if target_df.empty:
                error_msg = "No data for `variable` ({}) in target database".format(
                    variable
                )
                raise ValueError(error_msg)

            infiller_time_col = target_df.time_col
            # The linear trend starts from the latest time present in the input
            last_time = max(target_df.data[infiller_time_col])
            try:
                later_times = [time for time in times if time > last_time]
            except TypeError as exc:
                raise TypeError(
                    "The times requested must be in the same format as the time column "
                    "in the input database"
                ) from exc
            if not later_times:
                raise ValueError(
                    "No times requested are later than the times already in "
                    "the database"
                )
            key_timepoint_filter = {infiller_time_col: last_time}

            def get_values_in_key_timepoint(idf):
                # filter warning about empty data frame as we handle it ourselves
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    filtered = idf.filter(**key_timepoint_filter)
                idf = filtered.timeseries()
                if not idf.shape[1] == 1:
                    raise AssertionError(
                        "How did filtering for a single timepoint result in more than "
                        "one column?"
                    )
                return idf.iloc[:, 0]

            target_at_key_time = get_values_in_key_timepoint(target_df)
            output_ts = target_df.timeseries()
            # one_year required so type conversion works properly: time
            # differences are divided by it to get a plain number of years
            # (a 365-day timedelta is used when the time column is not "year")
            one_year = (
                1 if (infiller_time_col == "year") else datetime.timedelta(days=365)
            )
            if year_value is not None:
                # Gradient of the line from each value at ``last_time`` to the
                # requested (time, value) point
                gradient_to_use = (year_value[1] - target_at_key_time) / (
                    (year_value[0] - last_time) / one_year
                )
            else:
                gradient_to_use = gradient
            for time in later_times:
                output_ts[time] = target_at_key_time + gradient_to_use * (
                    (time - last_time) / one_year
                )
            # Keep only the newly calculated times. Iterate over a snapshot of
            # the columns because columns are deleted inside the loop.
            for col in list(output_ts.columns):
                if col not in later_times:
                    del output_ts[col]
            return IamDataFrame(output_ts)

        return filler