Source code for silicone.utils

import datetime as dt
import logging
import os.path

import numpy as np
import pandas as pd
import pyam
import scipy.interpolate
from openscm_units import ScmUnitRegistry
from pint.errors import DimensionalityError

logger = logging.getLogger(__name__)

# initialise our own registry to avoid conflicts
_ur = ScmUnitRegistry()
_ur.add_standards()

"""
Utils contains a number of helpful functions that don't belong elsewhere.
"""


def find_matching_scenarios(
    options_df,
    to_compare_df,
    variable_follower,
    variable_leaders,
    classify_scenarios,
    classify_models=["*"],
    return_all_info=False,
    use_change_not_abs=False,
):
    """
    Groups scenarios and models into different classifications and uses those to
    work out which group contains a trendline most similar to the data. These
    combinations may group several models/scenarios together by means of wild
    cards. Most similar means having the smallest total squared distance between
    the to_compare_df value of variable_follower and the variable_follower values
    interpolated in options_df at the variable_leaders points in to_compare_df,
    i.e. assuming errors only exist in variable_follower. In the event of a tie
    between different scenario/model classifications, it returns the
    scenario/model combination that occurs earlier in the input lists, looping
    through scenarios first.

    Parameters
    ----------
    options_df : :obj:`pyam.IamDataFrame`
        The dataframe containing the data for each scenario and model

    to_compare_df : :obj:`pyam.IamDataFrame`
        The dataframe we wish to find the scenario group closest to. May contain
        one or more scenarios; we minimise the least squared errors for all the
        data collectively.

    variable_follower : str
        The variable we want to interpolate and compare to the value in
        to_compare_df

    variable_leaders : list[str]
        The variable(s) we want to use to construct the interpolation
        (e.g. ``["Emissions|CO2"]``). In the event that there are multiple, we
        interpolate with each one separately and minimise the sum of the squared
        errors.

    classify_scenarios : list[str]
        The names of scenarios or groups of scenarios that are possible matches.
        This may have "\\*"s to represent wild cards, hence multiple scenarios
        will have all their data combined to make the interpolator.

    classify_models : list[str]
        The names of models or groups of models that are possible matches.
        This may have "\\*"s to represent wild cards, hence multiple models will
        have all their data combined to make the interpolator.

    return_all_info : bool
        If True, instead of simply returning the strings specifying the closest
        scenario/model match, we return all scenario/model combinations in order
        of preference, along with the total squared distance quantifying the
        closeness.

    use_change_not_abs : bool
        If True, the code looks for the trend with the closest *derivatives*
        rather than the closest absolute value, i.e. the closest trend allowing
        for an offset. This requires data from more than one time.

    Returns
    -------
    if return_all_info == False:

        (string, string)
            Strings specifying the model (first) and scenario (second)
            classifications that best match the data.

    if return_all_info == True:

        list[((string, string), float)]
            All (model, scenario) classification pairs, ordered from best to
            worst match, each paired with its measure of closeness.

    Raises
    ------
    ValueError
        Not all required timepoints are present in the database we crunched, we
        have `{dates we have}` but you passed in `{dates we need}`.
    """
    assert all(
        x in options_df.variable for x in [variable_follower] + variable_leaders
    ), "Not all required data is present in compared series"
    assert len(variable_leaders) == 1, "This is only calibrated to work with one leader"
    time_col = options_df.time_col
    assert (
        to_compare_df.time_col == time_col
    ), "The time column in the data to classify does not match the cruncher"
    times_needed = set(to_compare_df.data[time_col])
    if any(x not in options_df.data[time_col].values for x in times_needed):
        raise ValueError(
            "Not all required timepoints are present in the database we "
            "crunched, we have `{}` but you passed in {}.".format(
                list(set(options_df.data[time_col])),
                list(set(to_compare_df.data[time_col])),
            )
        )
    assert (
        len(times_needed) > 1 or use_change_not_abs == False
    ), "We need data from multiple times in order to calculate a difference."
    if to_compare_df.data.empty:
        print("The database being compared is empty")
        return None

    scen_model_rating = {}
    to_compare_db = _make_wide_db(to_compare_df)
    if use_change_not_abs:
        # Set all values to 0 at time 0 to remove any offset
        _remove_t0_from_wide_db(times_needed, to_compare_db)
    to_compare_db = to_compare_db.reset_index()
    for scenario in classify_scenarios:
        for model in classify_models:
            scenario_db = options_df.filter(
                scenario=scenario,
                model=model,
                variable=variable_leaders + [variable_follower],
            )
            if scenario_db.data.empty:
                scen_model_rating[model, scenario] = np.inf
                logger.warning(
                    "Data with scenario {} and model {} not found in data".format(
                        scenario, model
                    )
                )
                continue
            wide_db = _make_wide_db(scenario_db)
            if use_change_not_abs:
                # Set all values to 0 at time 0
                _remove_t0_from_wide_db(times_needed, wide_db)
            squared_dif = 0
            for leader in variable_leaders:
                all_interps = _make_interpolator(
                    variable_follower, leader, wide_db, time_col
                )
                for row in to_compare_db.iterrows():
                    squared_dif += (
                        row[1][variable_follower]
                        - all_interps[row[1][time_col]](row[1][leader])
                    ) ** 2
            scen_model_rating[model, scenario] = squared_dif
    ordered_scen = sorted(scen_model_rating.items(), key=lambda item: item[1])
    if return_all_info:
        return ordered_scen
    return ordered_scen[0][0]
def _remove_t0_from_wide_db(times_needed, _db):
    """
    This function finds the first set of values for each model and scenario and
    subtracts them from all values to remove the offset.
    """
    for model, scenario in set(
        zip(_db.index.get_level_values("model"), _db.index.get_level_values("scenario"))
    ):
        offset = _db.loc[model, scenario, min(times_needed)].copy().values.squeeze()
        for time in times_needed:
            _db.loc[model, scenario, time] = _db.loc[model, scenario, time] - offset


def _f_factory(ys):
    # We want a generic function to return constants where required
    def f(x):
        return ys[0] * np.ones(np.size(x))

    return f


def _make_interpolator(
    variable_follower, variable_leader, wide_db, time_col, interpkind="linear"
):
    """
    Constructs a linear interpolator for variable_follower as a function of (one)
    variable_leader for each timestep in the data.

    Parameters
    ----------
    variable_follower : str
        The variable we want to interpolate and compare to the value in
        variable_leader, i.e. the y variable.

    variable_leader : str
        The variable we want to use to construct the interpolation
        (e.g. ``"Emissions|CO2"``), i.e. the x variable.

    wide_db : pd.DataFrame
        The dataframe of every year in wide format, with both x and y values

    time_col : str
        The name of the time column in wide_db

    interpkind : str
        The style of interpolation. By default linear, but can also be any value
        accepted as the "kind" option in scipy.interpolate.interp1d, or
        "PchipInterpolator", in which case scipy.interpolate.PchipInterpolator is
        used. Care should be taken if using non-default interp1d options, as they
        can be uneven or exhibit "ringing".
    """
    derived_relationships = {}
    for db_time, dbtdf in wide_db.groupby(time_col):
        xs = dbtdf[variable_leader].values.squeeze()
        ys = dbtdf[variable_follower].values.squeeze()
        if xs.shape != ys.shape:
            raise NotImplementedError(
                "Having more than one `variable_leaders` is not yet implemented"
            )
        if not xs.shape:
            # 0D-array, so we can return a single value
            xs = np.array([xs])
            ys = np.array([ys])
        # Ensure that any duplicates are replaced by their average value
        xs_pandas = pd.Series(xs)
        for x_dup in xs_pandas[xs_pandas.duplicated()]:
            inds = np.asarray(xs == x_dup).nonzero()[0]
            ys[inds[0]] = ys[inds].mean()
            xs = np.delete(xs, inds[1:])
            ys = np.delete(ys, inds[1:])
        xs, ys = map(np.array, zip(*sorted(zip(xs, ys))))
        if xs.shape == (1,):
            # If there is only one point, we must always return that point
            derived_relationships[db_time] = _f_factory(ys)
        else:
            if interpkind != "PchipInterpolator":
                derived_relationships[db_time] = scipy.interpolate.interp1d(
                    xs,
                    ys,
                    bounds_error=False,
                    fill_value=(ys[0], ys[-1]),
                    assume_sorted=True,
                    kind=interpkind,
                )
            else:
                # We must add boundaries to the spline to prevent extrapolating
                # larger values
                xs = np.concatenate([xs[:1] - 1.0, xs, xs[-1:] + 1.0])
                ys = np.append(np.append(ys[0], ys), ys[-1])
                derived_relationships[db_time] = scipy.interpolate.PchipInterpolator(
                    xs, ys
                )
    return derived_relationships


def _make_wide_db(use_db):
    """
    Converts an IamDataFrame into a pandas DataFrame that describes the timeseries
    of variables in index-labelled values.
    """
    idx = ["model", "scenario", use_db.time_col]
    assert (
        use_db.data.groupby(idx + ["variable"]).count().max().max() <= 1
    ), "The table contains multiple entries with the same model and scenario"
    use_db = use_db.pivot_table(index=idx, columns="variable", aggfunc="sum")
    # make sure we don't have empty strings floating around (pyam bug?)
    use_db = use_db.applymap(lambda x: np.nan if isinstance(x, str) else x)
    use_db = use_db.dropna(axis=0)
    return use_db


def _get_unit_of_variable(df, variable, multiple_units="raise"):
    """
    Get the unit of a variable in ``df``

    Parameters
    ----------
    variable : str
        String to use to filter variables

    multiple_units : str
        If ``"raise"``, check that the variable only has one unit and raise an
        ``AssertionError`` if it has more than one unit.

    Returns
    -------
    list
        List of units for the variable

    Raises
    ------
    AssertionError
        ``multiple_units=="raise"`` and the filter results in more than one unit
    """
    units = df.filter(variable=variable).data["unit"].unique()
    if multiple_units == "raise":
        if len(units) > 1:
            raise AssertionError("`{}` has multiple units".format(variable))
        return units
    return units
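# A sketch of how the private interpolation helpers fit together (assumes `db` is
# a pyam.IamDataFrame containing "Emissions|CO2" and "Emissions|CH4" for a single
# region): _make_wide_db pivots the data so each variable becomes a column indexed
# by (model, scenario, time), and _make_interpolator then returns one callable per
# timestep mapping leader (x) values to follower (y) values. Variable names and
# the example CO2 value are illustrative only.
#
#     wide = _make_wide_db(db)
#     interps = _make_interpolator(
#         "Emissions|CH4", "Emissions|CO2", wide, db.time_col
#     )
#     ch4_estimate = interps[2050](np.array([38000.0]))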
def return_cases_which_consistently_split(
    df, aggregate, components, how_close=None, metric_name="AR5GWP100"
):
    """
    Returns model-scenario tuples which correctly split up the aggregate into the
    various components. Components may contain wildcard "\\*"s to match several
    variables.

    Parameters
    ----------
    df : :obj:`pyam.IamDataFrame`
        The input dataframe.

    aggregate : str
        Name of the variable that should split into the others

    components : list[str]
        List of the variable names whose sum should equal the aggregate value (if
        expressed in common units).

    how_close : dict
        This is a dictionary of numpy.isclose options specifying how exact the
        match must be for the case to be included as passing. By default we
        specify a relative tolerance of 1% ('rtol': 1e-2). The syntax for this can
        be found in the numpy documentation.

    metric_name : str
        The name of the conversion metric to use. This will usually be
        AR<4/5/6>GWP100.

    Returns
    -------
    list[(str, str, str)]
        List of consistent (model name, scenario name, region name) tuples.
    """
    if not how_close:
        how_close = {"equal_nan": True, "rtol": 1e-02}
    valid_model_scenario = []
    df = convert_units_to_MtCO2_equiv(
        df.filter(variable=[aggregate] + components), metric_name
    )
    combinations = df.data[["model", "scenario", "region"]].drop_duplicates()
    for ind in range(len(combinations)):
        model, scenario, region = combinations.iloc[ind]
        model_df = df.filter(model=model, scenario=scenario, region=region)
        # The following will often release a warning for empty data
        logging.getLogger("pyam.core").setLevel(logging.CRITICAL)
        to_split_df = model_df.filter(variable=aggregate)
        logging.getLogger("pyam.core").setLevel(logging.WARNING)
        if to_split_df.data.empty:
            continue
        sum_all = model_df.data.groupby(model_df.time_col).agg("sum")
        sum_to_split = to_split_df.data.groupby(model_df.time_col).agg("sum")
        # sum_all includes both the aggregate and its components, so a consistent
        # split means it equals twice the aggregate
        if all(
            [
                np.isclose(
                    sum_all.loc[time, "value"],
                    sum_to_split.loc[time, "value"] * 2,
                    **how_close,
                )
                for time in sum_to_split.index
            ]
        ):
            valid_model_scenario.append((model, scenario, region))
    return valid_model_scenario
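# Hedged usage sketch (the variable names are illustrative): find which
# model/scenario/region combinations report an aggregate consistent with the sum
# of its components, using the default 1% relative tolerance.
#
#     consistent_cases = return_cases_which_consistently_split(
#         df,
#         aggregate="Emissions|Kyoto Gases",
#         components=["Emissions|CO2", "Emissions|CH4", "Emissions|N2O"],
#     )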
def convert_units_to_MtCO2_equiv(df, metric_name="AR5GWP100"):
    """
    Converts the units of gases reported in kt into Mt CO2 equivalent per year

    Uses GWP100 values from either (by default) AR5 or AR4 IPCC reports.

    Parameters
    ----------
    df : :obj:`pyam.IamDataFrame`
        The input dataframe whose units need to be converted.

    metric_name : str
        The name of the conversion metric to use. This will usually be
        AR<4/5/6>GWP100.

    Returns
    -------
    :obj:`pyam.IamDataFrame`
        The input data with units converted.
    """
    # Check whether anything needs converting
    convert_to_str = "Mt CO2-equiv/yr"
    convert_to_str_clean = "Mt CO2/yr"
    if df["unit"].isin([convert_to_str, convert_to_str_clean]).all():
        return df
    to_convert_df = df.copy()
    to_convert_var = (
        to_convert_df.filter(unit=[convert_to_str, convert_to_str_clean], keep=False)
        .data[["variable", "unit"]]
        .drop_duplicates()
    )
    to_convert_units = to_convert_var["unit"]
    to_convert_units_clean = {
        unit: unit.replace("-equiv", "").replace("equiv", "")
        for unit in to_convert_units
    }
    conversion_factors = {}
    not_found = []
    with _ur.context(metric_name):
        for unit in to_convert_units:
            if unit in conversion_factors:
                continue
            clean_unit = to_convert_units_clean[unit]
            try:
                conversion_factors[unit] = (
                    _ur(clean_unit).to(convert_to_str_clean).magnitude
                )
            except DimensionalityError:
                raise ValueError(
                    "Cannot convert from {} (cleaned is: {}) "
                    "to {} (cleaned is: {})".format(
                        unit, clean_unit, convert_to_str, convert_to_str_clean
                    )
                )
    assert not not_found, "Not all units can be converted. We lack {}".format(not_found)
    for unit in to_convert_units:
        to_convert_df.convert_unit(
            current=unit,
            to=convert_to_str,
            factor=conversion_factors[unit],
            inplace=True,
        )
    return to_convert_df
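# Usage sketch (assumes `df` is a pyam.IamDataFrame whose non-CO2 gases are
# reported in units such as "Mt CH4/yr"): convert every unit to
# "Mt CO2-equiv/yr", here using AR4 GWP100 values instead of the AR5 default.
#
#     df_co2e = convert_units_to_MtCO2_equiv(df, metric_name="AR4GWP100")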
def download_or_load_sr15(filename, valid_model_ids="*"):
    """
    Load SR1.5 data; if it isn't there, download it.

    Parameters
    ----------
    filename : str
        Filename in which to look for/save the data

    valid_model_ids : str
        Models to return from data

    Returns
    -------
    :obj:`pyam.IamDataFrame`
        The loaded data
    """
    if not os.path.isfile(filename):
        get_sr15_scenarios(filename, valid_model_ids)
    return pyam.IamDataFrame(filename).filter(model=valid_model_ids)
def get_sr15_scenarios(output_file, valid_model_ids):
    """
    Collects world-level data from the IIASA database for the named models and
    saves them to a given location.

    Parameters
    ----------
    output_file : str
        File name and location for data to be saved

    valid_model_ids : list[str]
        Names of models that are to be fetched.
    """
    conn = pyam.iiasa.Connection("IXSE_SR15")
    variables_to_fetch = ["Emissions*"]
    for model in valid_model_ids:
        print("Fetching data for {}".format(model))
        for variable in variables_to_fetch:
            print("Fetching {}".format(variable))
            var_df = conn.query(
                model=model, variable=variable, region="World", timeslice=None
            )
            try:
                df.append(var_df, inplace=True)
            except NameError:
                df = pyam.IamDataFrame(var_df)

    print("Writing to {}".format(output_file))
    df.to_csv(output_file)
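# Usage sketch (the file path and model name are illustrative): load the SR1.5
# World-level emissions data from a local file, downloading it from the IIASA
# "IXSE_SR15" database first if the file does not yet exist.
#
#     sr15_db = download_or_load_sr15(
#         "sr15_scenarios.csv", valid_model_ids=["MESSAGE-GLOBIOM 1.0"]
#     )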
def _adjust_time_style_to_match(in_df, target_df):
    if in_df.time_col != target_df.time_col:
        in_df = in_df.timeseries()
        if target_df.time_col == "time":
            target_df_year_map = {v.year: v for v in target_df.timeseries().columns}
            in_df.columns = in_df.columns.map(
                lambda x: target_df_year_map[x]
                if x in target_df_year_map
                else dt.datetime(x, 1, 1)
            )
        else:
            in_df.columns = in_df.columns.map(lambda x: x.year)
        return pyam.IamDataFrame(in_df)
    return in_df


def _remove_equivs(string_to_fix):
    """
    Removes the substring "-equiv" from strings. For use in unit conversion.

    Parameters
    ----------
    string_to_fix : str
        The string to strip of "-equiv".

    Returns
    -------
    str
    """
    return string_to_fix.replace("-equiv", "")


def _construct_consistent_values(aggregate_name, components, db_to_generate):
    """
    Calculates the sum of the components and creates an IamDataFrame with this
    value under variable type `aggregate_name`.

    Parameters
    ----------
    aggregate_name : str
        The name of the aggregate variable.

    components : [str]
        List of the names of the variables to be summed.

    db_to_generate : :obj:`pyam.IamDataFrame`
        Input data from which to construct consistent values.

    Returns
    -------
    :obj:`pyam.IamDataFrame`
        Consistently calculated aggregate data.
    """
    assert (
        aggregate_name not in db_to_generate.variable
    ), "We already have a variable of this name"
    relevant_db = db_to_generate.filter(variable=components)
    units = relevant_db.data["unit"].drop_duplicates().sort_values()
    unit_equivs = units.map(lambda x: x.replace("-equiv", "")).drop_duplicates()
    if len(unit_equivs) == 0:
        raise ValueError(
            "Attempting to construct a consistent {} but none of the components "
            "are present".format(aggregate_name)
        )
    elif len(unit_equivs) > 1:
        raise ValueError(
            "Too many units found to make a consistent {}".format(aggregate_name)
        )
    use = (
        relevant_db.data.groupby(["model", "scenario", "region", relevant_db.time_col])
        .agg("sum")
        .reset_index()
    )
    # Units are sorted in alphabetical order so we choose the first to get -equiv
    use["unit"] = units.iloc[0]
    use["variable"] = aggregate_name
    return pyam.IamDataFrame(use)


def _make_weighting_series(df, weights):
    """
    Makes a complete list of weights for all scenarios from a dictionary of only
    specific weights.

    Parameters
    ----------
    df : :obj:`pd.DataFrame`
        The timeseries with the full set of models and scenarios whose weights
        should be returned

    weights : Dict{(str, str) : float}
        The dictionary, mapping the (model, scenario) tuple onto the weight
        (relative to a weight of 1 for the default). This does not have to include
        all scenarios in df, but cannot include scenarios not in df.

    Returns
    -------
    :obj:`pd.Series`
        A series with index corresponding to the timeseries of the dataframe and
        values corresponding to the weights.
    """
    all_mod_scen = [(i[0], i[1]) for i in df.index.drop_duplicates()]
    if any([key not in all_mod_scen for key in weights.keys()]):
        raise ValueError(
            "Not all the weighting values are found in the database. We "
            "lack {}".format([key for key in weights.keys() if key not in all_mod_scen])
        )
    result = pd.Series(np.ones(len(df)), index=df.index)
    for (key, val) in weights.items():
        result[key[0], key[1]] = val
    return result
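# Sketch of how the weighting helper is used (model and scenario names are
# hypothetical): given a wide timeseries `ts` (e.g. from
# pyam.IamDataFrame.timeseries()) whose index starts with the model and scenario
# levels, up-weight one scenario relative to the default weight of 1 for all
# other rows.
#
#     weights = _make_weighting_series(
#         ts, {("MESSAGE-GLOBIOM 1.0", "SSP2-19"): 3.0}
#     )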