
Commit

fix: calculate rates from summed totals
rouille committed Jun 25, 2024
1 parent 8f9ab43 commit f4a1253
Showing 1 changed file with 170 additions and 62 deletions.
232 changes: 170 additions & 62 deletions src/oge/output_data.py
@@ -50,21 +50,26 @@
}


def prepare_files_for_upload(years):
"""
Zips files in preparation for upload to cloud storage and Zenodo.
def prepare_files_for_upload(years: list):
"""Zips files in preparation for upload to cloud storage and Zenodo.
This should only be run when releasing a new minor or major version of the repo.
Args:
years (list): list of four-digit years indicating when the data were taken.
"""

for year in years:
zip_results_for_s3(year)
zip_data_for_zenodo(year)


def zip_results_for_s3(year):
"""
Zips results directories that contain more than a single file for hosting on an Amazon S3 bucket.
def zip_results_for_s3(year: int):
"""Zips results directories that contain more than a single file for hosting on an
Amazon S3 bucket.
Args:
year (int): a four-digit year indicating when the data were taken.
"""
os.makedirs(data_folder("s3_upload"), exist_ok=True)
for data_type in ["power_sector_data", "carbon_accounting", "plant_data"]:
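The loop body is collapsed in this view. As a rough sketch of what zipping one results directory can look like, assuming a layout such as data/results/<data_type>/<year>/ (the helper name and paths here are hypothetical, not taken from the repo):

import os
import shutil


def zip_directory_sketch(base_dir: str, data_type: str, year: int) -> str:
    # Zip one directory for upload; assumes base_dir/<data_type>/<year>/ exists.
    source = os.path.join(base_dir, data_type, str(year))
    archive_base = os.path.join(base_dir, f"{data_type}_{year}")
    # shutil.make_archive appends the ".zip" extension and returns the path.
    return shutil.make_archive(archive_base, "zip", root_dir=source)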
@@ -104,9 +109,11 @@ def zip_results_for_s3(year):
)


def zip_data_for_zenodo(year):
"""
Zips each of the four data directories for archiving on Zenodo.
def zip_data_for_zenodo(year: int):
"""Zips each of the three data directories for archiving on Zenodo.
Args:
year (int): a four-digit year indicating when the data were taken.
"""
os.makedirs(data_folder("zenodo"), exist_ok=True)
for directory in ["outputs", "results"]:
@@ -119,7 +126,19 @@ def zip_data_for_zenodo(year):
)


def output_intermediate_data(df, file_name, path_prefix, year, skip_outputs):
def output_intermediate_data(
df: pd.DataFrame, file_name: str, path_prefix: str, year: int, skip_outputs: bool
):
"""Save data frame as ZIP into the outputs directory.
Args:
df (pd.DataFrame): data frame that will be saved.
file_name (str): name of file without file extension.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
year (int): a four-digit year indicating when the data were taken.
skip_outputs (bool): whether to save data or not.
"""
column_checks.check_columns(df, file_name)
if not skip_outputs:
logger.info(f"Exporting {file_name} to data/outputs")
@@ -131,8 +150,27 @@ def output_intermediate_data(df, file_name, path_prefix, year, skip_outputs):


def output_to_results(
df, year, file_name, subfolder, path_prefix, skip_outputs, include_metric=True
df: pd.DataFrame,
year: int,
file_name: str,
subfolder: str,
path_prefix: str,
skip_outputs: bool,
include_metric=True,
):
"""Save data franme as CSV into the results directory.
Args:
df (pd.DataFrame): data frame that will be saved.
year (int): a four-digit year indicating when the data were taken.
file_name (str): name of file without file extension.
subfolder (str): name of directory following `path_prefix`.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
skip_outputs (bool): whether to save the data or not.
include_metric (bool, optional): whether to create a file with values in metric
units. Defaults to True.
"""
# Always check columns that should not be negative.
small = "small" in path_prefix
logger.info(f"Exporting {file_name} to data/results/{path_prefix}{subfolder}")
@@ -144,7 +182,7 @@ def output_to_results(

# Check for negatives after rounding
validation.test_for_negative_values(df, year, small)
# check that there are no missing values
# Check that there are no missing values
validation.test_for_missing_values(df, small)

if not skip_outputs:
@@ -159,7 +197,18 @@
)


def output_data_quality_metrics(df, file_name, path_prefix, skip_outputs):
def output_data_quality_metrics(
df: pd.DataFrame, file_name: str, path_prefix: str, skip_outputs: bool
):
"""Output data quality metrics.
Args:
df (pd.DataFrame): data frame that will be saved.
file_name (str): name of file without file extension.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
skip_outputs (bool): whether to save data or not.
"""
if not skip_outputs:
logger.info(
f"Exporting {file_name} to data/results/{path_prefix}data_quality_metrics"
@@ -174,14 +223,27 @@ def output_data_quality_metrics(df, file_name, path_prefix, skip_outputs):


def output_plant_data(
df, year, path_prefix, resolution, skip_outputs, plant_attributes
df: pd.DataFrame,
year: int,
path_prefix: str,
resolution: str,
skip_outputs: bool,
plant_attributes: pd.DataFrame,
):
"""
Helper function for plant-level output.
Output for each time granularity, and output separately for real and shaped plants
`df` contains all plant-level data, both CEMS and synthetic.
"""Helper function for plant-level output. Different output will be produced for
real and shaped plants.
Note:
plant-level does not include rates, so all aggregation is summation.
Note: plant-level does not include rates, so all aggregation is summation.
Args:
df (pd.DataFrame): plant-level data both CEMS and synthetic.
year (int): a four-digit year indicating when data were taken.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
resolution (str): temporal resolution. Either 'hourly', 'monthly' or 'annual'.
skip_outputs (bool): whether to save data or not.
plant_attributes (pd.DataFrame): the plant static attributes table.
"""
if not skip_outputs:
if resolution == "hourly":
@@ -209,7 +271,6 @@ def output_plant_data(
path_prefix,
skip_outputs,
)

elif resolution == "monthly":
# output monthly data
output_to_results(
@@ -242,17 +303,19 @@
)


def convert_results(df):
"""
Take df in US units (used throughout pipeline).
Return a df with metric units.
ASSUMPTIONS:
* Columns to convert have names of form
`co2_lb_per_mwh_produced` (mass),
`co2_lb_per_mwh_produced_for_electricity` (rate),
`fuel_consumed_mmbtu` (mass)
meaning that unit to convert is ALWAYS in numerator
def convert_results(df: pd.DataFrame) -> pd.DataFrame:
"""Convert values in data frame from US units to metric units.
Note:
Columns to convert have names of form: 'co2_lb_per_mwh_produced' (mass),
'co2_lb_per_mwh_produced_for_electricity' (rate) and 'fuel_consumed_mmbtu'
(mass), meaning that the unit to convert is always in the numerator.
Args:
df (pd.DataFrame): data frame in US units.
Returns:
pd.DataFrame: data frame in metric units.
"""
converted = df.copy(deep=True)
for column in converted.columns:
@@ -270,32 +333,43 @@ def convert_results(df):
return converted
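The collapsed loop applies the conversion the note describes. Illustratively, a factor lookup keyed on column-name fragments can follow the "unit in the numerator" convention; the fragment matching below is an assumption, though the factors themselves are standard:

LB_TO_KG = 0.45359237
MMBTU_TO_GJ = 1.05505585


def metric_factor_sketch(column: str) -> float:
    # Pounds in the numerator -> kilograms.
    if "_lb" in column:
        return LB_TO_KG
    # MMBtu in the numerator -> gigajoules.
    if "_mmbtu" in column:
        return MMBTU_TO_GJ
    # No convertible unit in the column name.
    return 1.0


# e.g. metric_factor_sketch("co2_lb_per_mwh_produced") -> 0.45359237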


def write_generated_averages(ba_fuel_data, year, path_prefix, skip_outputs):
def write_generated_averages(
ba_fuel_data: pd.DataFrame, year: int, path_prefix: str, skip_outputs: bool
):
"""Outputs generated averaged emission.
Args:
ba_fuel_data (pd.DataFrame): plant data aggregated by BA and fuel type.
year (int): a four-digit year indicating when the data were taken.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
skip_outputs (bool): whether to save data or not.
"""
if not skip_outputs:
avg_fuel_type_production = (
fuel_type_production = (
ba_fuel_data.groupby(["fuel_category"]).sum(numeric_only=True).reset_index()
)
# Add row for total before taking rates
total = avg_fuel_type_production.mean(numeric_only=True).to_frame().T
total.loc[0, "fuel_category"] = "total"
avg_fuel_type_production = pd.concat([avg_fuel_type_production, total], axis=0)

# Find rates
# Add row for total
total_production = fuel_type_production.sum(numeric_only=True).to_frame().T
total_production.loc[0, "fuel_category"] = "total"
production = pd.concat([fuel_type_production, total_production], axis=0)

# Calculate rates
for emission_type in ["_for_electricity", "_for_electricity_adjusted"]:
for emission in ["co2", "ch4", "n2o", "co2e", "nox", "so2"]:
avg_fuel_type_production[
f"generated_{emission}_rate_lb_per_mwh{emission_type}"
] = (
production[f"generated_{emission}_rate_lb_per_mwh{emission_type}"] = (
(
avg_fuel_type_production[f"{emission}_mass_lb{emission_type}"]
/ avg_fuel_type_production["net_generation_mwh"]
production[f"{emission}_mass_lb{emission_type}"]
/ production["net_generation_mwh"]
)
.replace(np.inf, np.NaN)
.replace(-np.inf, np.NaN)
.fillna(0)
)

output_intermediate_data(
avg_fuel_type_production,
production,
"annual_generation_averages_by_fuel",
path_prefix,
year,
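This hunk is the substance of the commit: the "total" row is now built with .sum() rather than .mean(), and rates are derived afterwards from the summed totals. A small example with hypothetical numbers illustrates the principle in the commit title; a fleet-wide rate must come from summed mass and generation (a generation-weighted figure), not from a simple mean of per-fuel rates:

import pandas as pd

production = pd.DataFrame(
    {
        "fuel_category": ["coal", "wind"],
        "co2_mass_lb_for_electricity": [2000.0, 0.0],
        "net_generation_mwh": [1.0, 9.0],
    }
)

# Simple mean of per-fuel rates: (2000 + 0) / 2 = 1000 lb/MWh -- misleading.
per_fuel_rate = (
    production["co2_mass_lb_for_electricity"] / production["net_generation_mwh"]
)
print(per_fuel_rate.mean())  # 1000.0

# Rate from summed totals: 2000 / 10 = 200 lb/MWh, weighted by generation.
total = production.sum(numeric_only=True)
print(total["co2_mass_lb_for_electricity"] / total["net_generation_mwh"])  # 200.0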
@@ -314,11 +388,25 @@ def write_plant_metadata(
skip_outputs: bool,
year: int,
):
"""
Outputs metadata for each subplant-month.
"""Outputs plant metadata or each subplant-month. Includes rows for subplants
aggregated to a synthetic plant, so users can see when a plant's subplants are
split across plant-level and synthetic hourly data files
Include rows for subplants aggregated to a synthetic plant, so users can see when a
plant's subplants are split across plant-level and synthetic hourly data files
Args:
plant_static_attributes (pd.DataFrame): plant static attributes table.
eia923_allocated (pd.DataFrame): allocated EIA-923 generation and fuel
consumption data.
cems (pd.DataFrame): CEMS data.
partial_cems_subplant (pd.DataFrame): subplant data for which there is partial
reporting in CEMS.
partial_cems_plant (pd.DataFrame): subplant data for which there is partial
plant data reporting in CEMS.
shaped_eia_data (pd.DataFrame): hourly generation profile derived from
monthly-level EIA data.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
skip_outputs (bool): whether to save data or not.
year (int): a four-digit year indicating when data were taken.
"""

KEY_COLUMNS = [
@@ -424,37 +512,55 @@ def write_plant_metadata(
)


def round_table(table):
"""
Round each numeric column.
All values in a column have the same rounding.
Rounding for each col is based on the median non-zero value: if < 1, sigfigs = 3, else 2 decimal places
def round_table(table: pd.DataFrame) -> pd.DataFrame:
"""Round each numeric columns. All values in a column have the same rounding.
Rounding for each column is based on the median non-zero value.
Args:
table (pd.DataFrame): table whose numeric columns will be rounded.
Raises:
ValueError: if a column cannot be rounded.
Returns:
pd.DataFrame: data frame with rounded values.
"""
decimals = {}
# Iterate through numeric columns
for c in table.select_dtypes(include=np.number).columns:
# Median of the non-zero values
val = table.loc[table[c] > 0, c].median()
if pd.isna(val): # if val is NaN, then this col has only NaN or only 0 values
# if val is NaN, then this col has only NaN or only 0 values
if pd.isna(val):
decimals[c] = 4
# >1 gets 2 decimals
elif val > 1:
decimals[c] = 2
else:
try:
decimals[c] = abs(math.floor(math.log10(val))) + 2
# Always 3 sigfigs (for median)
except ValueError:
logger.error(val)
raise Exception
raise ValueError
return table.round(decimals)
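A worked example of the rounding rule with hypothetical medians: values above 1 get two decimal places, while values at or below 1 get abs(floor(log10(median))) + 2 decimals, preserving roughly three significant figures:

import math

for median in (250.0, 1.0, 0.37, 0.00042):
    if median > 1:
        decimals = 2
    else:
        decimals = abs(math.floor(math.log10(median))) + 2
    print(f"median {median} -> {decimals} decimals")
# 250.0 -> 2, 1.0 -> 2, 0.37 -> 3 (0.370), 0.00042 -> 6 (0.000420)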


def write_power_sector_results(
ba_fuel_data, year, path_prefix, skip_outputs, include_hourly
ba_fuel_data: pd.DataFrame,
year: int,
path_prefix: str,
skip_outputs: bool,
include_hourly: bool,
):
"""
Helper function to write combined data by BA
"""Helper function to write combined data by BA
Args:
ba_fuel_data (pd.DataFrame): plant data aggregated by BA and fuel type.
year (int): a four-digit year indicating when data were taken.
path_prefix (str): name of base directory prefixing directory where data will
be saved.
skip_outputs (bool): whether to save power sector results or not.
include_hourly (bool): whether to include hourly results in addition to
monthly and yearly results.
"""

data_columns = [
@@ -491,7 +597,8 @@ def write_power_sector_results(
for ba in list(ba_fuel_data.ba_code.unique()):
if not isinstance(ba, str):
logger.warning(
f"not aggregating {sum(ba_fuel_data.ba_code.isna())} plants with numeric BA {ba}"
f"not aggregating {sum(ba_fuel_data.ba_code.isna())} plants "
f"with numeric BA {ba}"
)
continue

@@ -636,6 +743,7 @@ def add_generated_emission_rate_columns(df: pd.DataFrame) -> pd.DataFrame:
Args:
df (pd.DataFrame): data frame with emission data.
Returns:
pd.DataFrame: data frame with the additional generated emission rate columns.
"""
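The body of this function is collapsed in the diff. A plausible sketch, assuming it mirrors the mass-over-net-generation pattern used in write_generated_averages above; column names follow the file's conventions, but the actual implementation may differ:

import numpy as np
import pandas as pd


def add_generated_emission_rate_columns_sketch(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    for emission_type in ["_for_electricity", "_for_electricity_adjusted"]:
        for emission in ["co2", "ch4", "n2o", "co2e", "nox", "so2"]:
            # Rate = emitted mass over net generation; divide-by-zero -> 0.
            df[f"generated_{emission}_rate_lb_per_mwh{emission_type}"] = (
                (df[f"{emission}_mass_lb{emission_type}"] / df["net_generation_mwh"])
                .replace([np.inf, -np.inf], np.nan)
                .fillna(0)
            )
    return df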
