Source code for silicone.multiple_infillers.decompose_collection_with_time_dep_ratio

"""
Uses the 'time-dependent ratio' database cruncher designed for constructing an
aggregate variable and breaking this mix into its constituents.
"""

import pyam

from silicone.database_crunchers import TimeDepRatio
from silicone.utils import convert_units_to_MtCO2_equiv


class DecomposeCollectionTimeDepRatio:
    """
    Constructs an aggregate variable and uses the 'time-dependent ratio' technique to
    calculate what this predicts for our database.
    """

    def __init__(self, db):
        """
        Initialises the database to use for infilling.

        Parameters
        ----------
        db : IamDataFrame
            The database for infilling.
        """
        self._db = db.copy()

    def _construct_consistent_values(self, aggregate_name, components, db_to_generate):
        """
        Calculates the sum of the components and creates an IamDataFrame with this
        value under variable type `aggregate_name`.

        Parameters
        ----------
        aggregate_name : str
            The name of the aggregate variable.

        components : [str]
            List of the names of the variables to be summed.

        db_to_generate : :obj:`pyam.IamDataFrame`
            Input data from which to construct consistent values.

        Returns
        -------
        :obj:`pyam.IamDataFrame`
            Consistently calculated aggregate data.
        """
        assert (
            aggregate_name not in db_to_generate.variable
        ), "We already have a variable of this name"
        relevant_db = db_to_generate.filter(variable=components)
        units = relevant_db.data["unit"].drop_duplicates().sort_values()
        unit_equivs = units.map(lambda x: x.replace("-equiv", "")).drop_duplicates()
        if len(unit_equivs) == 0:
            raise ValueError(
                "Attempting to construct a consistent {} but none of the components "
                "are present".format(aggregate_name)
            )
        elif len(unit_equivs) > 1:
            raise ValueError(
                "Too many units found to make a consistent {}".format(aggregate_name)
            )
        use = (
            relevant_db.data.groupby(
                ["model", "scenario", "region", relevant_db.time_col]
            )
            .agg("sum")
            .reset_index()
        )
        # Units are sorted in alphabetical order, so we choose the first to get the
        # "-equiv" version if present.
        use["unit"] = units.iloc[0]
        use["variable"] = aggregate_name
        for col in relevant_db.extra_cols:
            use[col] = ""
        return pyam.IamDataFrame(use)

    def _set_of_units_without_equiv(self, df):
        """
        Returns the set of units in the dataframe, with any "-equiv" suffix removed.

        Parameters
        ----------
        df : :obj:`pyam.IamDataFrame`
            The dataframe whose units we want.

        Returns
        -------
        set(str)
            The set of units from the dataframe with "-equiv" removed.
        """
        return set(df.data["unit"].map(lambda x: x.replace("-equiv", "")))
    def infill_components(
        self,
        aggregate,
        components,
        to_infill_df,
        metric_name="AR5GWP100",
        only_consistent_cases=True,
    ):
        """
        Derive the relationship between the component variables and their sum, then
        use this to deconstruct the sum.

        Parameters
        ----------
        aggregate : str
            The variable for which we want to calculate timeseries (e.g.
            ``"Emissions|CO2"``). Unlike in most crunchers, we do not expect the
            database to already contain this data.

        components : list[str]
            The variables whose sum should be equal to the timeseries of the aggregate
            (e.g. ``["Emissions|CO2|AFOLU", "Emissions|CO2|Energy"]``).

        to_infill_df : :obj:`pyam.IamDataFrame`
            The dataframe that already contains the ``aggregate`` variable, but needs
            the ``components`` to be infilled.

        metric_name : str
            The name of the conversion metric to use. This will usually be
            AR<4/5/6>GWP100.

        only_consistent_cases : bool
            Do we want to use only model/scenario combinations where the aggregate and
            all components have data at all times? This reduces the risk of
            inconsistencies or unevenness in the results, but may reduce the amount of
            available data.

        Returns
        -------
        :obj:`pyam.IamDataFrame`
            The infilled data resulting from the calculation.

        Raises
        ------
        ValueError
            There is no data for ``variable_leaders`` or ``variable_follower`` in the
            database.
        """
        assert (
            aggregate in to_infill_df.variable
        ), "The database to infill does not have the aggregate variable"
        assert all(
            y not in components for y in to_infill_df.variable
        ), "The database to infill already has some component variables"
        assert len(to_infill_df.data.columns) == len(self._db.data.columns) and all(
            to_infill_df.data.columns == self._db.data.columns
        ), (
            "The database and to_infill_db fed into this have inconsistent columns, "
            "which will prevent adding the data together properly."
        )
        self._filtered_db = self._db.filter(
            variable=components, region=to_infill_df.region
        )
        if self._filtered_db.empty:
            raise ValueError(
                "Attempting to construct a consistent {} but none of the components "
                "are present in region {}".format(aggregate, to_infill_df.region)
            )
        if only_consistent_cases:
            # Remove model/scenario cases with nans at any of the required times.
            consistent_cases = (
                self._filtered_db.filter(
                    **{
                        to_infill_df.time_col: to_infill_df[
                            to_infill_df.time_col
                        ].unique()
                    }
                )
                .timeseries()
                .dropna()
            )
            self._filtered_db = pyam.IamDataFrame(consistent_cases)
        # We only want to reference cases where all the required components are found
        combinations = self._filtered_db.data[
            ["model", "scenario", "region"]
        ].drop_duplicates()
        for ind in range(len(combinations)):
            model, scenario, region = combinations.iloc[ind]
            found_vars = self._filtered_db.filter(
                model=model, scenario=scenario, region=region
            ).variable
            if any(comp not in found_vars for comp in components):
                self._filtered_db.filter(
                    model=model, scenario=scenario, keep=False, inplace=True
                )
        if len(self._set_of_units_without_equiv(self._filtered_db)) > 1:
            db_to_generate = convert_units_to_MtCO2_equiv(
                self._filtered_db, metric_name=metric_name
            )
        else:
            db_to_generate = self._filtered_db
        consistent_composite = self._construct_consistent_values(
            aggregate, components, db_to_generate
        )
        self._filtered_db.append(consistent_composite, inplace=True)
        cruncher = TimeDepRatio(self._filtered_db)
        if self._set_of_units_without_equiv(
            to_infill_df.filter(variable=aggregate)
        ) != self._set_of_units_without_equiv(consistent_composite):
            raise ValueError(
                "The units of the aggregate variable are inconsistent between the "
                "input and constructed data. We input {} and constructed {}.".format(
                    self._set_of_units_without_equiv(to_infill_df),
                    self._set_of_units_without_equiv(consistent_composite),
                )
            )
        for leader in components:
            to_add = cruncher.derive_relationship(
                leader, [aggregate], only_consistent_cases=False
            )(to_infill_df)
            try:
                df_to_append.append(to_add, inplace=True)
            except NameError:
                df_to_append = to_add
        return df_to_append
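
A minimal usage sketch follows (not part of the module source). It assumes an illustrative infiller database ``db`` in which one scenario reports both CO2 components, and a target scenario ``to_infill`` that reports only the aggregate; the model names, scenario names and values are made up, and the import path is assumed to be ``silicone.multiple_infillers``.

    import pandas as pd
    import pyam

    from silicone.multiple_infillers import DecomposeCollectionTimeDepRatio

    # Illustrative infiller database: one scenario reporting both CO2 components
    # (all numbers are hypothetical).
    db = pyam.IamDataFrame(pd.DataFrame(
        [
            ["model_a", "scen_a", "World", "Emissions|CO2|AFOLU", "Mt CO2/yr", 2020, 4000.0],
            ["model_a", "scen_a", "World", "Emissions|CO2|AFOLU", "Mt CO2/yr", 2050, 2000.0],
            ["model_a", "scen_a", "World", "Emissions|CO2|Energy", "Mt CO2/yr", 2020, 30000.0],
            ["model_a", "scen_a", "World", "Emissions|CO2|Energy", "Mt CO2/yr", 2050, 10000.0],
        ],
        columns=["model", "scenario", "region", "variable", "unit", "year", "value"],
    ))

    # Scenario to infill: reports only the aggregate at the same times.
    to_infill = pyam.IamDataFrame(pd.DataFrame(
        [
            ["model_b", "scen_b", "World", "Emissions|CO2", "Mt CO2/yr", 2020, 35000.0],
            ["model_b", "scen_b", "World", "Emissions|CO2", "Mt CO2/yr", 2050, 9000.0],
        ],
        columns=["model", "scenario", "region", "variable", "unit", "year", "value"],
    ))

    infiller = DecomposeCollectionTimeDepRatio(db)
    components = infiller.infill_components(
        "Emissions|CO2",
        ["Emissions|CO2|AFOLU", "Emissions|CO2|Energy"],
        to_infill,
    )
    print(components.timeseries())

The returned IamDataFrame contains only the newly derived component timeseries for the infilled scenario, which can then be appended to ``to_infill``.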