Source code for silicone.database_crunchers.quantile_rolling_windows

"""
Module for the database cruncher which uses the 'rolling windows' technique.
"""

import logging

import numpy as np
import scipy.interpolate
from pyam import IamDataFrame

from ..stats import rolling_window_find_quantiles
from ..utils import _get_unit_of_variable
from .base import _DatabaseCruncher

logger = logging.getLogger(__name__)


[docs]class QuantileRollingWindows(_DatabaseCruncher): """ Database cruncher which uses the 'rolling windows' technique. This cruncher derives the relationship between two variables by performing quantile calculations between the follower timeseries and the lead timeseries. These calculations are performed at each timestep in the timeseries, independent of the other timesteps. For each timestep, the lead timeseries axis is divided into multiple evenly spaced windows (to date this is only tested on 1:1 relationships but may work with more than one lead timeseries). In each window, every data point in the database is included. However, the data points receive a weight given by .. math:: w(x, x_{\\text{window}}) = \\frac{1}{1 + (d_n)^2} where :math:`w` is the weight and :math:`d_n` is the normalised distance between the centre of the window and the data point's position on the lead timeseries axis. :math:`d_n` is calculated as .. math:: d_n = \\frac{x - x_{\\text{window}}}{f \\times (\\frac{b}{2})} where :math:`x` is the position of the data point on the lead timeseries axis, :math:`x_{\\text{window}}` is the position of the centre of the window on the lead timeseries axis, :math:`b` is the distance between window centres and :math:`f` is a decay factor which controls how much less points away from :math:`x_{\\text{window}}` are weighted. If :math:`f=1` then a point which is half the width between window centres away receives a weighting of :math:`1/2`. Lowering the value of :math:`f` cause points further from the window centre to receive less weight. With these weightings, the desired quantile of the data is then calculated. This calculation is done by sorting the data by the database's follow timeseries values (then by lead timeseries values in the case of identical follow values). From here, the weight of each point is calculated following the formula given above. We calculate the cumulative sum of weights, and then the cumulative sum up to half weights, defined by .. math:: c_{hw} = c_w - 0.5 \\times w where :math:`c_w` is the cumulative weights and :math:`w` is the raw weights. This ensures that quantiles less than half the weight of the smallest follow value return the smallest follow value and more than one minus half the weight of the largest follow value return the largest value. Without such a shift, the largest value is only returned if the quantile is 1, leading to a bias towards smaller values. With these calculations, we have determined the relationship between the follow timeseries values and the quantile i.e. cumulative sum of (normalised) weights. We can then determine arbitrary quantiles by linearly interpolating. If the option ``use_ratio`` is set to ``True``, instead of returning the absolute value of the follow at this quantile, we return the quantile of the ratio between the lead and follow data in the database, multiplied by the actual lead value of the database being infilled. By varying the quantile, this cruncher can provide ranges of the relationship between different variables. For example, it can provide the 90th percentile (i.e. high end) of the relationship between e.g. ``Emissions|CH4`` and ``Emissions|CO2`` or the 50th percentile (i.e. median) or any other arbitrary percentile/quantile choice. Note that the impact of this will strongly depend on nwindows and decay_length_factor. Using the :class:`TimeDepQuantileRollingWindows` class makes it is possible to specify a dictionary of dates to quantiles, in which case we return that quantile for that year or date. """
[docs] def derive_relationship( self, variable_follower, variable_leaders, quantile=0.5, nwindows=11, decay_length_factor=1, use_ratio=False, ): """ Derive the relationship between two variables from the database. Parameters ---------- variable_follower : str The variable for which we want to calculate timeseries (e.g. ``"Emissions|CH4"``). variable_leaders : list[str] The variable(s) we want to use in order to infer timeseries of ``variable_follower`` (e.g. ``["Emissions|CO2"]``). quantile : float The quantile to return in each window. nwindows : int The number of window centers to use when calculating the relationship between the follower and lead gases. decay_length_factor : float Parameter which controls how strongly points away from the window's centre should be weighted compared to points at the centre. Larger values give points further away increasingly less weight, smaller values give points further away increasingly more weight. use_ratio : bool If false, we use the quantile value of the weighted mean absolute value. If true, we find the quantile weighted mean ratio between lead and follow, then multiply the ratio by the input value. Returns ------- :obj:`func` Function which takes a :obj:`pyam.IamDataFrame` containing ``variable_leaders`` timeseries and returns timeseries for ``variable_follower`` based on the derived relationship between the two. Please see the source code for the exact definition (and docstring) of the returned function. Raises ------ ValueError There is no data for ``variable_leaders`` or ``variable_follower`` in the database. ValueError ``quantile`` is not between 0 and 1. ValueError ``nwindows`` is not equivalent to an integer or is not greater than 1. ValueError ``decay_length_factor`` is 0. """ self._check_follower_and_leader_in_db(variable_follower, variable_leaders) if not (0 <= quantile <= 1): error_msg = "Invalid quantile ({}), it must be in [0, 1]".format(quantile) raise ValueError(error_msg) if int(nwindows) != nwindows or nwindows < 2: error_msg = "Invalid nwindows ({}), it must be an integer > 1".format( nwindows ) raise ValueError(error_msg) nwindows = int(nwindows) if np.equal(decay_length_factor, 0): raise ValueError("decay_length_factor must not be zero") data_leader_unit = _get_unit_of_variable(self._db, variable_leaders)[0] data_follower_unit = _get_unit_of_variable(self._db, variable_follower)[0] db_time_col = self._db.time_col columns = "variable" idx = list(set(self._db.data.columns) - {columns, "value", "unit"}) wide_db = self._db.filter( variable=[variable_follower] + variable_leaders ).pivot_table(index=idx, columns=columns, aggfunc="sum") # make sure we don't have empty strings floating around (pyam bug?) wide_db = wide_db.applymap(lambda x: np.nan if isinstance(x, str) else x) wide_db = wide_db.dropna(axis=0) derived_relationships = {} for db_time, dbtdf in wide_db.groupby(db_time_col): xs = dbtdf[variable_leaders].values.squeeze() ys = dbtdf[variable_follower].values.squeeze() if xs.shape != ys.shape: raise NotImplementedError( "Having more than one `variable_leaders` is not yet implemented" ) if not xs.shape: # 0D-array, make 1D xs = np.array([xs]) ys = np.array([ys]) if use_ratio: # We want the ratio between x and y, not the actual values of y. ys = ys / xs if np.isnan(ys).any(): logger.warning( "Undefined values of ratio appear in the quantiles when " "infilling {}, setting some values to 0 (this may not affect " "results).".format(variable_follower) ) ys[np.isnan(ys)] = 0 if np.equal(max(xs), min(xs)): # We must prevent singularity behaviour if all the points are at the # same x value. cumsum_weights = np.array([(0.5 + x) / len(ys) for x in range(len(ys))]) ys.sort() def same_x_val_workaround( _, ys=ys, cumsum_weights=cumsum_weights, quantile=quantile ): if np.equal(min(ys), max(ys)): return ys[0] return scipy.interpolate.interp1d( cumsum_weights, ys, bounds_error=False, fill_value=(ys[0], ys[-1]), assume_sorted=True, )(quantile) derived_relationships[db_time] = same_x_val_workaround else: db_time_table = rolling_window_find_quantiles( xs, ys, quantile, nwindows, decay_length_factor ) derived_relationships[db_time] = scipy.interpolate.interp1d( db_time_table.index.values, db_time_table.loc[:, quantile].values.squeeze(), bounds_error=False, fill_value=( db_time_table[quantile].iloc[0], db_time_table[quantile].iloc[-1], ), ) def filler(in_iamdf): """ Filler function derived from :class:`QuantileRollingWindows`. Parameters ---------- in_iamdf : :obj:`pyam.IamDataFrame` Input data to fill data in Returns ------- :obj:`pyam.IamDataFrame` Filled in data (without original source data) Raises ------ ValueError The key db_times for filling are not in ``in_iamdf``. """ if db_time_col != in_iamdf.time_col: raise ValueError( "`in_iamdf` time column must be the same as the time column used " "to generate this filler function (`{}`)".format(db_time_col) ) var_units = _get_unit_of_variable(in_iamdf, variable_leaders) if var_units.size == 0: raise ValueError( "There is no data for {} so it cannot be infilled".format( variable_leaders ) ) var_units = var_units[0] if var_units != data_leader_unit: raise ValueError( "Units of lead variable is meant to be `{}`, found `{}`".format( data_leader_unit, var_units ) ) # check whether we have all the required timepoints or not have_all_timepoints = all( [c in derived_relationships for c in in_iamdf.timeseries()] ) if not have_all_timepoints: raise ValueError( "Not all required timepoints are present in the database we " "crunched, we crunched \n\t`{}`\nbut you passed in \n\t{}".format( list(derived_relationships.keys()), in_iamdf.timeseries().columns.tolist(), ) ) # do infilling here infilled_ts = in_iamdf.filter(variable=variable_leaders).timeseries() if use_ratio and (infilled_ts.values < 0).any(): warn_str = "Note that the lead variable {} goes negative.".format( variable_leaders ) logger.warning(warn_str) print(warn_str) for col in infilled_ts: if use_ratio: infilled_ts[col] = ( derived_relationships[col](infilled_ts[col]) * infilled_ts[col] ) else: infilled_ts[col] = derived_relationships[col](infilled_ts[col]) infilled_ts = infilled_ts.reset_index() infilled_ts["variable"] = variable_follower infilled_ts["unit"] = data_follower_unit return IamDataFrame(infilled_ts) return filler