Commit
Merge pull request #2804 from catalyst-cooperative/nightly-build-2023-08-22

Merge dev into main for 2023-08-22
zaneselvans authored Aug 22, 2023
2 parents a484746 + 3b983b9 commit bfcf495
Showing 15 changed files with 147 additions and 102 deletions.
10 changes: 5 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
@@ -64,18 +64,18 @@ What data is available?

PUDL currently integrates data from:

* `EIA Form 860 <https://www.eia.gov/electricity/data/eia860/>`__: 2001-2021
* `EIA Form 860m <https://www.eia.gov/electricity/data/eia860m/>`__: 2022-06
* `EIA Form 861 <https://www.eia.gov/electricity/data/eia861/>`__: 2001-2021
* `EIA Form 923 <https://www.eia.gov/electricity/data/eia923/>`__: 2001-2021
* `EIA Form 860 <https://www.eia.gov/electricity/data/eia860/>`__: 2001-2022
* `EIA Form 860m <https://www.eia.gov/electricity/data/eia860m/>`__: 2023-06
* `EIA Form 861 <https://www.eia.gov/electricity/data/eia861/>`__: 2001-2022
* `EIA Form 923 <https://www.eia.gov/electricity/data/eia923/>`__: 2001-2022
* `EPA Continuous Emissions Monitoring System (CEMS) <https://campd.epa.gov/>`__: 1995-2021
* `FERC Form 1 <https://www.ferc.gov/industries-data/electric/general-information/electric-industry-forms/form-1-electric-utility-annual>`__: 1994-2021
* `FERC Form 714 <https://www.ferc.gov/industries-data/electric/general-information/electric-industry-forms/form-no-714-annual-electric/data>`__: 2006-2020
* `US Census Demographic Profile 1 Geodatabase <https://www.census.gov/geographies/mapping-files/2010/geo/tiger-data.html>`__: 2010

Thanks to support from the `Alfred P. Sloan Foundation Energy & Environment
Program <https://sloan.org/programs/research/energy-and-environment>`__, from
2021 to 2023 we will be integrating the following data as well:
2021 to 2024 we will be integrating the following data as well:

* `EIA Form 176 <https://www.eia.gov/dnav/ng/TblDefs/NG_DataSources.html#s176>`__
(The Annual Report of Natural Gas Supply and Disposition)
2 changes: 2 additions & 0 deletions docs/release_notes.rst
@@ -71,6 +71,8 @@ Data Coverage

* Updated :doc:`data_sources/eia860` to include early release data from 2022.
* Updated :doc:`data_sources/eia923` to include early release data from 2022.
* Updated :doc:`data_sources/epacems` to switch from the old FTP server to the new
CAMPD API, and to include 2022 data.
* New :ref:`epacamd_eia` crosswalk version v0.3, see issue :issue:`2317` and PR
:pr:`2316`. EPA's updates add manual matches and exclusions focusing on operating
units with a generator ID as of 2018.
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -24,7 +24,7 @@ dependencies = [
"coloredlogs>=14.0,<15.1", # Dagster requires 14.0
"dagster-webserver>=1.4,<1.5", # 1.2.2 is first version to support Python 3.11
"dagster>=1.4,<1.5", # 1.2.2 is first version to support Python 3.11
"dask>=2021.8,<2023.8.1",
"dask>=2021.8,<2023.8.2",
"datapackage>=1.11,<1.16", # Transition datastore to use frictionless.
"fsspec>=2021.7,<2023.6.1", # For caching datastore on GCS
"gcsfs>=2021.7,<2023.6.1", # For caching datastore on GCS
@@ -128,7 +128,7 @@ doc = [
"sphinx-autoapi>=1.8,<2.2",
"sphinx-issues>=1.2,<3.1",
"sphinx-reredirects",
"sphinx>=4,!=5.1.0,<7.2",
"sphinx>=4,!=5.1.0,<7.3",
"sphinxcontrib_bibtex>=2.4,<2.6",
]
test = [
@@ -153,7 +153,7 @@ test = [
"pytest>=6.2,<7.5",
"responses>=0.14,<0.24",
"rstcheck[sphinx]>=5.0,<6.2",
"tox>=4.0,<4.9",
"tox>=4.0,<4.10",
]
datasette = [
"datasette>=0.60,<0.65",
3 changes: 2 additions & 1 deletion src/pudl/etl/epacems_assets.py
@@ -67,7 +67,8 @@ def process_single_year(
for state in epacems_settings.states:
logger.info(f"Processing EPA CEMS hourly data for {year}-{state}")
df = pudl.extract.epacems.extract(year=year, state=state, ds=ds)
df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia)
if not df.empty: # If state-year combination has data
df = pudl.transform.epacems.transform(df, epacamd_eia, plants_entity_eia)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Write to a directory of partitioned parquet files
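The new guard means an empty extract result skips the transform step but can still flow into the parquet write. A minimal sketch of the pattern, using a hypothetical stand-in for the real `pudl.transform.epacems.transform`:

```python
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for pudl.transform.epacems.transform."""
    # The real transform assumes rows exist; an empty frame could break it.
    return df.assign(gross_load_mw=df["gross_load_mw"].astype(float))

# An empty state-year partition comes back as a zero-row dataframe:
df = pd.DataFrame({"plant_id_epa": pd.Series(dtype="int64"),
                   "gross_load_mw": pd.Series(dtype="float64")})
if not df.empty:  # only transform partitions that actually contain data
    df = transform(df)
# Either way, df can still be converted to an Arrow table and written out.
```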
176 changes: 98 additions & 78 deletions src/pudl/extract/epacems.py
@@ -1,72 +1,74 @@
"""Retrieve data from EPA CEMS hourly zipped CSVs.
Prior to August 2023, this data was retrieved from an FTP server; it is now retrieved
from the CEMS API. The format of the files has changed from monthly CSVs for each state
to one CSV per state per year, and the column names have changed as well. The column
name mappings were determined by reading the CEMS API documentation.
Presently, this module is where the CEMS columns are renamed and dropped. Any columns in
the IGNORE_COLS set are excluded from the final output. All of these columns are
calculable rates, measurement flags, or descriptors (like facility name) that can be
accessed by merging this data with the EIA860 plants entity table. We also remove the
`FACILITY_ID` field because it is internal to the EPA's business accounting database and
`UNIT_ID` field because it's a unique (calculable) identifier for plant_id and
emissions_unit_id (previously `UNITID`) groupings. It took a minute to verify the
difference between the `UNITID` and `UNIT_ID` fields, but correspondence with the EPA's
CAMD team cleared this up.
`FACILITY_ID` field because it is internal to the EPA's business accounting database.
Pre-transform, the `plant_id_epa` field is a close but not perfect indicator for
`plant_id_eia`. In the raw data it's called `ORISPL_CODE` but that's not entirely
accurate. The epacamd_eia crosswalk shows the mapping between `ORISPL_CODE` as it
appears in CEMS and the `plant_id_eia` field used in EIA data. Hence, we've called it
`plant_id_epa` until it gets transformed into `plant_id_eia` during the transform
process with help from the crosswalk.
`plant_id_eia`. In the raw data it's called `Facility ID` (ORISPL code) but that's not
entirely accurate. The epacamd_eia crosswalk will show that the mapping between
`Facility ID` as it appears in CEMS and the `plant_id_eia` field used in EIA data.
Hence, we've called it `plant_id_epa` until it gets transformed into `plant_id_eia`
during the transform process with help from the crosswalk.
"""
from pathlib import Path
from typing import NamedTuple
from zipfile import ZipFile

import pandas as pd

import pudl.logging_helpers
from pudl.metadata.classes import Resource
from pudl.workspace.datastore import Datastore

logger = pudl.logging_helpers.get_logger(__name__)

# EPA CEMS constants #####
RENAME_DICT = {
"STATE": "state",
"FACILITY_NAME": "plant_name", # Not reading from CSV
"ORISPL_CODE": "plant_id_epa", # Not quite the same as plant_id_eia
"UNITID": "emissions_unit_id_epa",
########################################################################################
# EPA CEMS constants for API CSV files #####

API_RENAME_DICT = {
"State": "state",
"Facility Name": "plant_name", # Not reading from CSV
"Facility ID": "plant_id_epa", # unique facility id for internal EPA database management (ORIS code)
"Unit ID": "emissions_unit_id_epa",
"Associated Stacks": "associated_stacks",
# These op_date, op_hour, and op_time variables get converted to
# operating_date, operating_datetime and operating_time_interval in
# transform/epacems.py
"OP_DATE": "op_date",
"OP_HOUR": "op_hour",
"OP_TIME": "operating_time_hours",
"GLOAD (MW)": "gross_load_mw",
"GLOAD": "gross_load_mw",
"SLOAD (1000 lbs)": "steam_load_1000_lbs",
"SLOAD (1000lb/hr)": "steam_load_1000_lbs",
"SLOAD": "steam_load_1000_lbs",
"SO2_MASS (lbs)": "so2_mass_lbs",
"SO2_MASS": "so2_mass_lbs",
"SO2_MASS_MEASURE_FLG": "so2_mass_measurement_code",
"SO2_RATE (lbs/mmBtu)": "so2_rate_lbs_mmbtu", # Not reading from CSV
"SO2_RATE": "so2_rate_lbs_mmbtu", # Not reading from CSV
"SO2_RATE_MEASURE_FLG": "so2_rate_measure_flg", # Not reading from CSV
"NOX_RATE (lbs/mmBtu)": "nox_rate_lbs_mmbtu",
"NOX_RATE": "nox_rate_lbs_mmbtu", # Not reading from CSV
"NOX_RATE_MEASURE_FLG": "nox_rate_measurement_code", # Not reading from CSV
"NOX_MASS (lbs)": "nox_mass_lbs",
"NOX_MASS": "nox_mass_lbs",
"NOX_MASS_MEASURE_FLG": "nox_mass_measurement_code",
"CO2_MASS (tons)": "co2_mass_tons",
"CO2_MASS": "co2_mass_tons",
"CO2_MASS_MEASURE_FLG": "co2_mass_measurement_code",
"CO2_RATE (tons/mmBtu)": "co2_rate_tons_mmbtu", # Not reading from CSV
"CO2_RATE": "co2_rate_tons_mmbtu", # Not reading from CSV
"CO2_RATE_MEASURE_FLG": "co2_rate_measure_flg", # Not reading from CSV
"HEAT_INPUT (mmBtu)": "heat_content_mmbtu",
"HEAT_INPUT": "heat_content_mmbtu",
"FAC_ID": "facility_id", # unique facility id for internal EPA database management
"UNIT_ID": "unit_id_what", # unique unit id for internal EPA database management
"Date": "op_date",
"Hour": "op_hour",
"Operating Time": "operating_time_hours",
"Gross Load (MW)": "gross_load_mw",
"Steam Load (1000 lb/hr)": "steam_load_1000_lbs",
"SO2 Mass (lbs)": "so2_mass_lbs",
"SO2 Mass Measure Indicator": "so2_mass_measurement_code",
"SO2 Rate (lbs/mmBtu)": "so2_rate_lbs_mmbtu", # Not reading from CSV
"SO2 Rate Measure Indicator": "so2_rate_measure_flg", # Not reading from CSV
"NOx Rate (lbs/mmBtu)": "nox_rate_lbs_mmbtu", # Not reading from CSV
"NOx Rate Measure Indicator": "nox_rate_measurement_code", # Not reading from CSV
"NOx Mass (lbs)": "nox_mass_lbs",
"NOx Mass Measure Indicator": "nox_mass_measurement_code",
"CO2 Mass (short tons)": "co2_mass_tons",
"CO2 Mass Measure Indicator": "co2_mass_measurement_code",
"CO2 Rate (short tons/mmBtu)": "co2_rate_tons_mmbtu", # Not reading from CSV
"CO2 Rate Measure Indicator": "co2_rate_measure_flg", # Not reading from CSV
"Heat Input (mmBtu)": "heat_content_mmbtu",
"Heat Input Measure Indicator": "heat_content_measure_flg",
"Primary Fuel Type": "primary_fuel_type",
"Secondary Fuel Type": "secondary_fuel_type",
"Unit Type": "unit_type",
"SO2 Controls": "so2_controls",
"NOx Controls": "nox_controls",
"PM Controls": "pm_controls",
"Hg Controls": "hg_controls",
"Program Code": "program_code",
}
"""Dict: A dictionary containing EPA CEMS column names (keys) and replacement names to
use when reading those columns into PUDL (values).
@@ -75,19 +77,22 @@
"""

# Any column that exactly matches one of these won't be read
IGNORE_COLS = {
"FACILITY_NAME",
"SO2_RATE (lbs/mmBtu)",
"SO2_RATE",
"SO2_RATE_MEASURE_FLG",
"CO2_RATE (tons/mmBtu)",
"CO2_RATE",
"CO2_RATE_MEASURE_FLG",
"NOX_RATE_MEASURE_FLG",
"NOX_RATE",
"NOX_RATE (lbs/mmBtu)",
"FAC_ID",
"UNIT_ID",
API_IGNORE_COLS = {
"Facility Name",
"SO2 Rate (lbs/mmBtu)",
"SO2 Rate Measure Indicator",
"CO2 Rate (tons/mmBtu)",
"CO2 Rate Measure Indicator",
"NOx Rate (lbs/mmBtu)",
"NOx Rate Measure Indicator",
"Primary Fuel Type",
"Secondary Fuel Type",
"Unit Type",
"SO2 Controls",
"NOx Controls",
"PM Controls",
"Hg Controls",
"Program Code",
}
"""Set: The set of EPA CEMS columns to ignore when reading data."""
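Together, the two constants drive the CSV read: columns in the ignore set are skipped at parse time via `usecols`, and the surviving columns are renamed. A sketch with made-up CSV content (the dictionaries here are abridged versions of the constants above, assuming standard pandas semantics):

```python
import io

import pandas as pd

API_RENAME_DICT = {"State": "state", "Gross Load (MW)": "gross_load_mw"}  # abridged
API_IGNORE_COLS = {"Facility Name"}  # abridged

csv_text = "State,Facility Name,Gross Load (MW)\nTX,Example Plant,42.0\n"
df = pd.read_csv(
    io.StringIO(csv_text),
    index_col=False,
    # Ignored columns are never parsed, which saves memory on big files:
    usecols=lambda col: col not in API_IGNORE_COLS,
).rename(columns=API_RENAME_DICT)
# df columns are now ["state", "gross_load_mw"]
```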

@@ -106,9 +111,9 @@ def get_filters(self):
"""Returns filters for retrieving given partition resource from Datastore."""
return dict(year=self.year, state=self.state.lower())

def get_monthly_file(self, month: int) -> Path:
"""Returns the filename (without suffix) that contains the monthly data."""
return Path(f"{self.year}{self.state.lower()}{month:02}")
def get_annual_file(self) -> Path:
"""Return the name of the CSV file that holds annual hourly data."""
return Path(f"epacems-{self.year}-{self.state.lower()}.csv")


class EpaCemsDatastore:
@@ -125,21 +130,28 @@ def __init__(self, datastore: Datastore):
self.datastore = datastore

def get_data_frame(self, partition: EpaCemsPartition) -> pd.DataFrame:
"""Constructs dataframe holding data for a given (year, state) partition."""
"""Constructs dataframe from a zipfile for a given (year, state) partition."""
archive = self.datastore.get_zipfile_resource(
"epacems", **partition.get_filters()
)
dfs = []
for month in range(1, 13):
mf = partition.get_monthly_file(month)
with archive.open(str(mf.with_suffix(".zip")), "r") as mzip:
with ZipFile(mzip, "r").open(
str(mf.with_suffix(".csv")), "r"
) as csv_file:
dfs.append(self._csv_to_dataframe(csv_file))
return pd.concat(dfs, sort=True, copy=False, ignore_index=True)

def _csv_to_dataframe(self, csv_file) -> pd.DataFrame:

# Get names of files in zip file
files = self.datastore.get_zipfile_file_names(archive)

# If archive has one csv file in it, this is a yearly CSV (archived after 08/23)
# and this CSV does not need to be concatenated.
if len(files) == 1 and files[0].endswith(".csv"):
with archive.open(str(partition.get_annual_file()), "r") as csv_file:
df = self._csv_to_dataframe(
csv_file, ignore_cols=API_IGNORE_COLS, rename_dict=API_RENAME_DICT
)
return df
else:
raise AssertionError(f"Unexpected archive format. Found files: {files}.")

def _csv_to_dataframe(
self, csv_file: Path, ignore_cols: set[str], rename_dict: dict[str, str]
) -> pd.DataFrame:
"""Convert a CEMS csv file into a :class:`pandas.DataFrame`.
Args:
@@ -151,9 +163,9 @@ def _csv_to_dataframe(self, csv_file) -> pd.DataFrame:
return pd.read_csv(
csv_file,
index_col=False,
usecols=lambda col: col not in IGNORE_COLS,
usecols=lambda col: col not in ignore_cols,
low_memory=False,
).rename(columns=RENAME_DICT)
).rename(columns=rename_dict)


def extract(year: int, state: str, ds: Datastore):
@@ -169,4 +181,12 @@ def extract(year: int, state: str, ds: Datastore):
ds = EpaCemsDatastore(ds)
partition = EpaCemsPartition(state=state, year=year)
# We have to assign the reporting year for partitioning purposes
return ds.get_data_frame(partition).assign(year=year)
try:
df = ds.get_data_frame(partition).assign(year=year)
except KeyError: # If no state-year combination found, return empty df.
logger.warning(
f"No data found for {state} in {year}. Returning empty dataframe."
)
res = Resource.from_id("hourly_emissions_epacems")
df = res.format_df(pd.DataFrame())
return df
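The `try/except KeyError` fallback can be illustrated with a hypothetical datastore: a missing state-year partition yields an empty dataframe instead of an exception. (The real code derives the empty frame's schema from the `hourly_emissions_epacems` resource rather than a hard-coded column list.)

```python
import pandas as pd

class FakeDatastore:
    """Hypothetical stand-in for EpaCemsDatastore."""

    def __init__(self, partitions: dict):
        self.partitions = partitions

    def get_data_frame(self, year: int, state: str) -> pd.DataFrame:
        # Raises KeyError for a (year, state) combination with no data.
        return self.partitions[(year, state)]

def extract(year: int, state: str, ds: FakeDatastore) -> pd.DataFrame:
    try:
        return ds.get_data_frame(year, state).assign(year=year)
    except KeyError:
        # No data for this state-year: return an empty, well-formed frame.
        return pd.DataFrame(columns=["plant_id_epa", "gross_load_mw", "year"])

ds = FakeDatastore(
    {(2022, "TX"): pd.DataFrame({"plant_id_epa": [3], "gross_load_mw": [1.5]})}
)
ok = extract(2022, "TX", ds)     # partition exists
empty = extract(2022, "PR", ds)  # partition missing -> empty frame
```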
4 changes: 2 additions & 2 deletions src/pudl/metadata/dfs.py
@@ -300,7 +300,7 @@
"""
subdivision_code,subdivision_name,country_code,country_name,subdivision_type,timezone_approx,state_id_fips,division_name_us_census,division_code_us_census,region_name_us_census,is_epacems_state
AB,Alberta,CAN,Canada,province,America/Edmonton,,,,,0
AK,Alaska,USA,United States of America,state,America/Anchorage,"02",Pacific Noncontiguous,PCN,West,0
AK,Alaska,USA,United States of America,state,America/Anchorage,"02",Pacific Noncontiguous,PCN,West,1
AL,Alabama,USA,United States of America,state,America/Chicago,"01",East South Central,ESC,South,1
AR,Arkansas,USA,United States of America,state,America/Chicago,"05",West South Central,WSC,South,1
AS,American Samoa,USA,United States of America,outlying_area,Pacific/Pago_Pago,"60",,,,0
@@ -351,7 +351,7 @@
OR,Oregon,USA,United States of America,state,America/Los_Angeles,"41",Pacific Contiguous,PCC,West,1
PA,Pennsylvania,USA,United States of America,state,America/New_York,"42",Middle Atlantic,MAT,Northeast,1
PE,Prince Edwards Island,CAN,Canada,province,America/Halifax,,,,,0
PR,Puerto Rico,USA,United States of America,outlying_area,America/Puerto_Rico,"72",,,,0
PR,Puerto Rico,USA,United States of America,outlying_area,America/Puerto_Rico,"72",,,,1
QC,Quebec,CAN,Canada,province,America/Montreal,,,,,0
RI,Rhode Island,USA,United States of America,state,America/New_York,"44",New England,NEW,Northeast,1
SC,South Carolina,USA,United States of America,state,America/New_York,"45",South Atlantic,SAT,South,1
2 changes: 1 addition & 1 deletion src/pudl/metadata/sources.py
@@ -279,7 +279,7 @@
},
"field_namespace": "epacems",
"working_partitions": {
"years": sorted(set(range(1995, 2022))),
"years": sorted(set(range(1995, 2023))),
"states": sorted(EPACEMS_STATES),
},
"contributors": [
4 changes: 3 additions & 1 deletion src/pudl/package_data/epacems/additional_epacems_plants.csv
@@ -126,6 +126,8 @@ plant_id_eia,plant_name_eia,last_date,state,latitude,longitude,fill_data_source_
880107,SPMT Marcus Hook Industrial Complex,2017-12-31,PA,39.8076,-75.4239,EPA CAMD web query
880108,Grain Processing Corporation,2018-12-31,IN,38.6552,-87.1814,EPA CAMD web query
880109,"Pratt Paper (OH), LLC",2020-10-13,OH,40.5379994,-84.1909398,Not found in EPA CAMD Avg OH Lat
55098,Frontera Energy Center,2016-01-01,TX,26.208000,-98.399200,In CEMS in 2019 but missing from EIA since 2016
55098,Frontera Energy Center,2016-01-01,TX,26.208,-98.3992,In CEMS in 2019 but missing from EIA since 2016
55120,SRW Cogen LP,2014-01-01,TX,30.054478,-93.757435,In CEMS in 2019 but missing from EIA since 2014
55248,Tait,2018-01-01,OH,39.727679,-84.209489,In CEMS in 2021 but missing from EIA since 2018
880110,Holston Army Ammunition Plant,2022-09-28,TN,36.5493,-82.6342,EPA CAMD web query
880102,"AES Puerto Rico, LP",2015-09-30,PR,17.9477,-66.154,EPA CAMD web query
6 changes: 3 additions & 3 deletions src/pudl/package_data/settings/etl_fast.yml
@@ -29,8 +29,8 @@ ferc_to_sqlite_settings:
name: pudl-fast
title: PUDL Fast ETL
description: >
FERC 1 and EIA 860/923 from 2020 (output to SQLite) plus
EPA CEMS hourly emissions data from 2020 (output to Parquet).
FERC 1 data from 2020 and 2021, EIA 860/923 from 2020 and 2022 (output to SQLite) plus
EPA CEMS hourly emissions data from 2020 and 2022 (output to Parquet).
version: 0.1.0
datasets:
ferc1:
@@ -76,4 +76,4 @@ datasets:
# so if you're loading CEMS data for a particular year, you should
# also load the EIA 860 data for that year if possible
states: [ID, ME]
years: [2019, 2020, 2021]
years: [2020, 2022]
1 change: 1 addition & 0 deletions src/pudl/package_data/settings/etl_full.yml
@@ -298,4 +298,5 @@ datasets:
2019,
2020,
2021,
2022,
]
2 changes: 1 addition & 1 deletion src/pudl/transform/epacems.py
@@ -96,7 +96,7 @@ def convert_to_utc(df: pd.DataFrame, plant_utc_offset: pd.DataFrame) -> pd.DataF
# `parse_dates=True`, is >10x faster.
# Read the date as a datetime, so all the dates are midnight
op_datetime_naive=lambda x: pd.to_datetime(
x.op_date, format=r"%m-%d-%Y", exact=True, cache=True
x.op_date, format=r"%Y-%m-%d", exact=True, cache=True
)
+ pd.to_timedelta(x.op_hour, unit="h") # Add the hour
).merge(
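The one-line fix reflects the API's ISO-style dates (`2022-01-01`) versus the FTP files' `01-01-2022`. In isolation, the naive-datetime construction (parse the date at midnight, then add the hour as a timedelta) behaves like this, with column names as in the transform:

```python
import pandas as pd

df = pd.DataFrame({"op_date": ["2022-01-01", "2022-01-01"], "op_hour": [0, 5]})
df = df.assign(
    # All parsed dates land on midnight; the hour is added as an offset.
    op_datetime_naive=lambda x: pd.to_datetime(
        x.op_date, format=r"%Y-%m-%d", exact=True, cache=True
    )
    + pd.to_timedelta(x.op_hour, unit="h")
)
```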