Transform vceregen renewable generation profiles #3898

Merged
106 commits merged on Oct 19, 2024
The diff below shows changes from 10 of the 106 commits.

Commits (106)
9fadfff
Add source metadata for vceregen
aesharpe Sep 30, 2024
4036479
Add profiles to vceregen dataset name
aesharpe Sep 30, 2024
d8b992e
Remove blank line in description
aesharpe Sep 30, 2024
cac5509
Add blank Data Source template for vceregen
aesharpe Sep 30, 2024
a44a399
Add links to download docs section
aesharpe Sep 30, 2024
b8aa4e4
Add availability section
aesharpe Sep 30, 2024
65ddf74
Add respondents section
aesharpe Sep 30, 2024
d8071ed
Add original data section
aesharpe Sep 30, 2024
82be4d4
Stash WIP of extraction
e-belfer Oct 1, 2024
e044e93
Extract VCE tables to raw dask dfs
e-belfer Oct 2, 2024
57ad9a7
Clean up warnings and restore EIA 176
e-belfer Oct 2, 2024
b922328
Revert to pandas concatenation
e-belfer Oct 2, 2024
53934f2
Add latlonfips
e-belfer Oct 2, 2024
3c8cab6
Add blank transform module for vceregen
aesharpe Oct 3, 2024
9f7814b
Fill out the basic vceregen transforms
aesharpe Oct 4, 2024
600b3e2
Add underscores back to function names
aesharpe Oct 5, 2024
e6242f2
Update time col calculation
aesharpe Oct 7, 2024
130f1f0
Update docstrings and comments to reflect new time cols
aesharpe Oct 7, 2024
339d791
Change merge to concat
aesharpe Oct 7, 2024
3677f78
Remove dask, coerce dtypes on read-in
e-belfer Oct 8, 2024
c870b63
override load_column_maps behavior
e-belfer Oct 8, 2024
4a4511d
Merge branch 'main' into extract-vceregen
e-belfer Oct 8, 2024
a91d073
Update addition of county and state name fields
aesharpe Oct 8, 2024
076b113
Merge branch 'extract-vceregen' into transform-vceregen
aesharpe Oct 8, 2024
0b3fa45
Add vceregen to init files and metadata so that it will run on dagste…
aesharpe Oct 8, 2024
79c6016
Add resource metadata for vcregen
aesharpe Oct 8, 2024
54fd155
Clean county strings more
aesharpe Oct 9, 2024
6d14dae
Add release notes
aesharpe Oct 9, 2024
deae1e2
Add function to validate state_county_names and improve performance o…
aesharpe Oct 9, 2024
63d2666
make for loops into dict comp, update loggers, and improve regex
aesharpe Oct 9, 2024
24f4fb5
Add asset checks and remove inline checks
aesharpe Oct 9, 2024
da272f5
Change hour_utc to datetime_utc
aesharpe Oct 10, 2024
7bc1741
Remove incorrect docstring
aesharpe Oct 10, 2024
ae85f64
Update dataset and field metadata
aesharpe Oct 10, 2024
d669c74
Rename county col to county_or_subregion
aesharpe Oct 10, 2024
140a181
Merge branch 'transform-vceregen' into vceregen-docs
aesharpe Oct 11, 2024
7dbe5d2
Update data_source docs page
aesharpe Oct 11, 2024
98fd118
change axis=1 to axis=columns
aesharpe Oct 11, 2024
b6b5e6c
Merge branch 'main' into extract-vceregen
e-belfer Oct 11, 2024
291ba7d
Update DOI to sandbox and temporarily xfail DOI test
e-belfer Oct 11, 2024
44f3ae8
Merge branch 'extract-vceregen' into transform-vceregen
aesharpe Oct 11, 2024
8e6d88a
Change county_or_subregion to county_or_lake_name
aesharpe Oct 14, 2024
6a49f69
Change county_or_subregion to county_or_lake_name
aesharpe Oct 14, 2024
1f3666c
Merge branch 'transform-vceregen' of https://github.com/catalyst-coop…
aesharpe Oct 14, 2024
7319e7f
Update docs to explain solar cap fac
aesharpe Oct 14, 2024
08d7341
Merge branch 'main' into transform-vceregen
aesharpe Oct 15, 2024
3eaebe6
Update regen to rare
e-belfer Oct 16, 2024
b324123
Merge branch 'main' into extract-vceregen
e-belfer Oct 16, 2024
120451d
Merge branch 'extract-vceregen' of https://github.com/catalyst-cooper…
e-belfer Oct 16, 2024
5b98e60
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 16, 2024
9f6204e
Merge branch 'main' into extract-vceregen
e-belfer Oct 16, 2024
77d47a4
Update gsutil in zenodo-cache-sync
e-belfer Oct 16, 2024
adfff81
Merge branch 'extract-vceregen' of https://github.com/catalyst-cooper…
e-belfer Oct 16, 2024
ece9dab
Merge branch 'extract-vceregen' into transform-vceregen
e-belfer Oct 16, 2024
0e3792d
Merge branch 'extract-vceregen' into transform-vceregen
aesharpe Oct 16, 2024
93e4487
Merge branch 'transform-vceregen' of https://github.com/catalyst-coop…
aesharpe Oct 16, 2024
7e3c926
Rename vceregen to vcerare
aesharpe Oct 16, 2024
6dea332
Add back user project
e-belfer Oct 16, 2024
7554a36
Update project path
e-belfer Oct 16, 2024
9ada9f5
Update project to billing project
e-belfer Oct 16, 2024
4158afd
Update dockerfile to replace gsutil with gcloud storage
e-belfer Oct 16, 2024
069c246
Merge branch 'extract-vceregen' into transform-vceregen
e-belfer Oct 16, 2024
2ba9b16
Update docs/release_notes.rst
aesharpe Oct 16, 2024
e0f6524
Update docs/release_notes.rst
aesharpe Oct 16, 2024
f793776
Update docs/templates/vcerare_child.rst.jinja
aesharpe Oct 16, 2024
d41c44d
First batch of little docs fixes
aesharpe Oct 16, 2024
98c2f69
Restructure _combine_city_county_records function
aesharpe Oct 17, 2024
d7b59d5
Add link to zenodo archive to data source page
aesharpe Oct 17, 2024
178f0fb
Clarify 1 vs. 100 in data source page
aesharpe Oct 17, 2024
7a1ebe1
Spread out comments in the _prep_lat_long_fips_df function
aesharpe Oct 17, 2024
782e925
Update docstring for _prep_lat_long_fips_df
aesharpe Oct 17, 2024
58aa99f
Switch order of add_time_cols and make_cap_frac functions
aesharpe Oct 17, 2024
e494482
Update _combine_city_county_records and move assertion to asset checks
aesharpe Oct 17, 2024
c2f3f75
Change all().all() to any().any()
aesharpe Oct 17, 2024
ccaa4ae
Add validations to merges
aesharpe Oct 17, 2024
3913006
Resolve merge conflicts with main
aesharpe Oct 17, 2024
865756a
docs cleanup tidbits
aesharpe Oct 17, 2024
c838e63
Turn _combine_city_county_records function into _drop_city_records an…
aesharpe Oct 17, 2024
05376e8
Make fips columns categorical and narrow scope of regex
aesharpe Oct 17, 2024
ef1a243
data source docs updates
aesharpe Oct 17, 2024
78fe904
Add downloadable docs to vcerare data source and fix data source file…
aesharpe Oct 18, 2024
6becc52
Remove 1.34 from field description for capacity_factor_solar_pv
aesharpe Oct 18, 2024
c2d16ae
Add some logs and a function to null county_id_fips values from lakes…
aesharpe Oct 18, 2024
0d13365
Update solar_pv metadata
aesharpe Oct 18, 2024
6cde307
Update solar_pv metadata
aesharpe Oct 18, 2024
f356336
Merge branch 'main' into transform-vceregen
aesharpe Oct 18, 2024
d03eab3
Rename RARE dataset in the release notes
aesharpe Oct 18, 2024
73b70d9
Add issue number to release notes
aesharpe Oct 18, 2024
15e0f40
Merge branch 'transform-vceregen' of https://github.com/catalyst-coop…
aesharpe Oct 18, 2024
ff23cbe
Update field description for county_or_lake_name
aesharpe Oct 18, 2024
de63b12
Update docstring for transform module
aesharpe Oct 18, 2024
710ead0
Make all references to FIPS uppercase in notes and comments
aesharpe Oct 18, 2024
69b4f71
Correct inline comment in _null_non_county_fips_rows
aesharpe Oct 18, 2024
71af223
Fix asset check
aesharpe Oct 18, 2024
d1074f8
Minor late-night PR fixes
zaneselvans Oct 18, 2024
ae5fd8c
Log during VCE RARE asset checks to see what's slow.
zaneselvans Oct 18, 2024
06db957
Add simple notebook for processing vcerare data
aesharpe Oct 18, 2024
e6904ac
Re-enable Zenodo DOI validation unit test.
zaneselvans Oct 18, 2024
6516d7c
Update docs to use gcloud storage not gsutil
zaneselvans Oct 18, 2024
6f64a29
Try to reduce memory use & concurrency for VCE RARE dataset
zaneselvans Oct 18, 2024
a40b1f3
Retry policy for VCE + highmem use for VCE asset check.
zaneselvans Oct 18, 2024
077a47f
Bump VM RAM and remove very-high memory tag & retry
zaneselvans Oct 18, 2024
71256e4
Bump vCPUs to 16
zaneselvans Oct 18, 2024
cd78e56
Add fancy charts to notebook
aesharpe Oct 18, 2024
81297c9
Merge branch 'transform-vceregen' of https://github.com/catalyst-coop…
aesharpe Oct 18, 2024
bd13830
Add link to VCE data in nightly build outputs. Other docs tweaks.
zaneselvans Oct 19, 2024
Files changed
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
@@ -59,6 +59,7 @@
"raw_gridpathratoolkit": [pudl.extract.gridpathratoolkit],
"raw_phmsagas": [pudl.extract.phmsagas],
"raw_nrelatb": [pudl.extract.nrelatb],
"raw_vceregen": [pudl.extract.vceregen],
}


1 change: 1 addition & 0 deletions src/pudl/extract/__init__.py
@@ -26,5 +26,6 @@
gridpathratoolkit,
nrelatb,
phmsagas,
vceregen,
xbrl,
)
106 changes: 81 additions & 25 deletions src/pudl/extract/extractor.py
@@ -5,6 +5,7 @@
from collections import defaultdict
from typing import Any

import dask.dataframe as dd
import pandas as pd
from dagster import (
AssetsDefinition,
@@ -13,6 +14,7 @@
DynamicOutput,
In,
OpDefinition,
Out,
TypeCheckContext,
graph_asset,
op,
@@ -22,9 +24,63 @@

StrInt = str | int
PartitionSelection = list[StrInt] | tuple[StrInt] | StrInt
DataframeType = pd.DataFrame | dd.DataFrame

logger = pudl.logging_helpers.get_logger(__name__)

# Define some custom dagster data types
# 2024-03-27: Dagster can't automatically convert union types within
# parametrized types; we have to write our own custom DagsterType for now.


def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool:
if not isinstance(x, dict):
return False
for key, value in x.items():
if not isinstance(key, str):
return False
if not isinstance(value, str | int):
return False
return True


dagster_dict_str_strint = DagsterType(
name="dict[str, str | int]", type_check_fn=_is_dict_str_strint
)


def _is_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool:
if not isinstance(x, dict):
return False
for key, value in x.items():
if not isinstance(key, str):
return False
if not isinstance(value, DataframeType):
return False
return True


dataframe_dagster_type = DagsterType(
name="DataFrame Type Check", type_check_fn=_is_dict_str_dataframe
)


def _is_list_dict_str_dataframe(_context: TypeCheckContext, x: Any) -> bool:
if not isinstance(x, list):
return False
for item in x:
for key, value in item.items():
if not isinstance(key, str):
return False
if not isinstance(value, DataframeType):
return False
return True


list_dataframe_dagster_type = DagsterType(
name="List DataFrame Type Check", type_check_fn=_is_list_dict_str_dataframe
)


class GenericMetadata:
"""Load generic metadata from Python package data.
@@ -197,7 +253,7 @@ def validate(self, df: pd.DataFrame, page: str, **partition: PartitionSelection)
f"\n{missing_raw_cols}"
)

def process_final_page(self, df: pd.DataFrame, page: str) -> pd.DataFrame:
def process_final_page(self, df: DataframeType, page: str) -> DataframeType:
"""Final processing stage applied to a page DataFrame."""
return df

@@ -214,7 +270,7 @@ def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame:

return self.process_final_page(df, page)

def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]:
def extract(self, **partitions: PartitionSelection) -> dict[str, DataframeType]:
"""Extracts dataframes.

Returns dict where keys are page names and values are
@@ -243,6 +299,7 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]:
current_page_dfs = [
pd.DataFrame(),
]

for partition in pudl.helpers.iterate_multivalue_dict(**partitions):
# we are going to skip
if self.source_filename(page, **partition) == "-1":
@@ -262,8 +319,12 @@ def extract(self, **partitions: PartitionSelection) -> dict[str, pd.DataFrame]:
return all_page_dfs


@op(tags={"memory-use": "high"})
def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataFrame]:
@op(
tags={"memory-use": "high"},
ins={"paged_dfs": In(dagster_type=list[dataframe_dagster_type])},
out=Out(dagster_type=dataframe_dagster_type),
)
def concat_pages(paged_dfs: list[dict[str, DataframeType]]) -> dict[str, DataframeType]:
"""Concatenate similar pages of data from different years into single dataframes.

Transform a list of dictionaries of dataframes into a single dictionary of
@@ -284,39 +345,33 @@ def concat_pages(paged_dfs: list[dict[str, pd.DataFrame]]) -> dict[str, pd.DataF
A dictionary of DataFrames keyed by page name, where the DataFrame contains that
page's data from all extracted years concatenated together.
"""
# Figure out what's in each dataframe.
dtypes = [type(item) for dictionary in paged_dfs for item in dictionary.values()]

# Transform the list of dictionaries of dataframes into a dictionary of lists of
# dataframes, in which all dataframes in each list represent different instances of
# the same page of data from different years

all_data = defaultdict(list)
for dfs in paged_dfs:
for page in dfs:
all_data[page].append(dfs[page])

# concatenate the dataframes in each list in the dictionary into a single dataframe
for page in all_data:
all_data[page] = pd.concat(all_data[page]).reset_index(drop=True)
if all(x == pd.DataFrame for x in dtypes): # If all dfs are pandas dfs
logger.warn("Concatenating pandas dataframes.")
for page in all_data:
all_data[page] = pd.concat(all_data[page]).reset_index(drop=True)
elif all(x == dd.DataFrame for x in dtypes): # If all dfs are dask dfs
logger.warn("Concatenating pandas dataframes.")
for page in all_data:
all_data[page] = dd.concat(all_data[page])
else:
raise AssertionError(f"Concatenation not supported for dtypes: {dtypes}")

return all_data


def _is_dict_str_strint(_context: TypeCheckContext, x: Any) -> bool:
if not isinstance(x, dict):
return False
for key, value in x.items():
if not isinstance(key, str):
return False
if not isinstance(value, str | int):
return False
return True


# 2024-03-27: Dagster can't automatically convert union types within
# parametrized types; we have to write our own custom DagsterType for now.
dagster_dict_str_strint = DagsterType(
name="dict[str, str | int]", type_check_fn=_is_dict_str_strint
)


def partition_extractor_factory(
extractor_cls: type[GenericExtractor], name: str
) -> OpDefinition:
@@ -331,10 +386,11 @@ def partition_extractor_factory(
required_resource_keys={"datastore"},
name=f"extract_single_{name}_partition",
ins={"part_dict": In(dagster_type=dagster_dict_str_strint)},
out=Out(dagster_type=dataframe_dagster_type),
)
def extract_single_partition(
context, part_dict: dict[str, str | int]
) -> dict[str, pd.DataFrame]:
) -> dict[str, DataframeType]:
"""A function that extracts a year of spreadsheet data from an Excel file.

This function will be decorated with a Dagster op and returned.
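Aside (not part of the diff): the custom DagsterType pattern introduced above can be hard to read straight out of a diff. The following minimal sketch shows the same idea — a type_check_fn-backed DagsterType attached to an op's input and output — using hypothetical names (_is_dict_str_df, dict_str_df_type, passthrough); it is an illustration of the pattern, not PUDL code.

from typing import Any

import pandas as pd
from dagster import DagsterType, In, Out, TypeCheckContext, op


def _is_dict_str_df(_context: TypeCheckContext, x: Any) -> bool:
    """Return True if x maps string keys to pandas DataFrames."""
    return isinstance(x, dict) and all(
        isinstance(k, str) and isinstance(v, pd.DataFrame) for k, v in x.items()
    )


# A DagsterType wraps the check function so Dagster can validate values at runtime.
dict_str_df_type = DagsterType(
    name="dict[str, pd.DataFrame]", type_check_fn=_is_dict_str_df
)


@op(
    ins={"paged_dfs": In(dagster_type=dict_str_df_type)},
    out=Out(dagster_type=dict_str_df_type),
)
def passthrough(paged_dfs: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
    """Dagster runs the type check on this op's input and output when it executes."""
    return paged_dfs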
183 changes: 183 additions & 0 deletions src/pudl/extract/vceregen.py
@@ -0,0 +1,183 @@
"""Extract VCE renewable generation profile data from CSVs.

This dataset has 1,000s of columns, so we don't want to manually specify a rename on
import because we'll pivot these to a column. We adapt the standard extraction
infrastructure to simply read in the data.

Each annual zip folder contains a folder with three files:
Wind_Power_140m_Offshore_county.csv
Wind_Power_100m_Onshore_county.csv
Fixed_SolarPV_Lat_UPV_county.csv

The drive also contains one more file: RA_county_lat_long_FIPS_table.csv. This file is
not partitioned, so we always read it in regardless of the partitions configured for the
run.
"""

from io import BytesIO

import pandas as pd
from dagster import AssetsDefinition, Output, asset

from pudl import logging_helpers
from pudl.extract.csv import CsvExtractor
from pudl.extract.extractor import GenericMetadata, PartitionSelection, raw_df_factory

logger = logging_helpers.get_logger(__name__)

VCEREGEN_PAGES = [
"offshore_wind_power_140m",
"onshore_wind_power_100m",
"fixed_solar_pv_lat_upv",
]


class VCEMetadata(GenericMetadata):
"""Special metadata class for VCE renewable generation profiles."""

def __init__(self, *args, **kwargs):
"""Initialize the module.

Args:
ds (:class:datastore.Datastore): Initialized datastore.
"""
super().__init__(*args, **kwargs)
self._file_name = self._load_csv(self._pkg, "file_map.csv")

def get_all_pages(self) -> list[str]:
"""Hard code the page names, which usually are pulled from column rename spreadsheets."""
return VCEREGEN_PAGES

def get_file_name(self, page, **partition):
"""Returns file name of given partition and page."""
return self._file_name.loc[page, str(self._get_partition_selection(partition))]


class Extractor(CsvExtractor):
"""Extractor for VCE renewable generation profiles."""

def __init__(self, *args, **kwargs):
"""Initialize the module.

Args:
ds (:class:datastore.Datastore): Initialized datastore.
"""
self.METADATA = VCEMetadata("vceregen")
super().__init__(*args, **kwargs)

def get_column_map(self, page, **partition):
"""Return empty dictionary, we don't rename these files."""
return {}

def source_filename(self, page: str, **partition: PartitionSelection) -> str:
"""Produce the CSV file name as it will appear in the archive.

The files are nested in an additional folder with the year name inside of the
zipfile, so we add a prefix folder based on the yearly partition to the source
filename.

Args:
page: pudl name for the dataset contents, eg "boiler_generator_assn" or
"coal_stocks"
partition: partition to load. Examples:
{'year': 2009}
{'year_month': '2020-08'}

Returns:
string name of the CSV file
"""
return f"{partition['year']}/{self._metadata.get_file_name(page, **partition)}"

def load_source(self, page: str, **partition: PartitionSelection) -> pd.DataFrame:
"""Produce the dataframe object for the given partition.

Args:
page: pudl name for the dataset contents, eg "boiler_generator_assn" or
"data"
partition: partition to load. Examples:
{'year': 2009}
{'year_month': '2020-08'}

Returns:
pd.DataFrame instance containing CSV data
"""
with (
self.ds.get_zipfile_resource(self._dataset_name, **partition) as zf,
):
# # Get path to zipfile
# zippath = zf.filename
# Get list of file names in the zipfile
files = zf.namelist()
# Get the particular file of interest
file = next(
(x for x in files if self.source_filename(page, **partition) in x), None
)
# Read the CSV into a pandas dataframe
df = pd.read_csv(BytesIO(zf.read(file)), **self.READ_CSV_KWARGS)

return df

def process_raw(
self, df: pd.DataFrame, page: str, **partition: PartitionSelection
) -> pd.DataFrame:
"""Append report year to df to distinguish data from other years."""
self.cols_added.append("report_year")
selection = self._metadata._get_partition_selection(partition)
return df.assign(report_year=selection)

def validate(
self, df: pd.DataFrame, page: str, **partition: PartitionSelection
) -> pd.DataFrame:
"""Skip this step, as we aren't renaming any columns."""
return df

def combine(self, dfs: list[pd.DataFrame], page: str) -> pd.DataFrame:
"""Concatenate dataframes into one, take any special steps for processing final page."""
# dfs = [dd.from_pandas(df, npartitions=2) for df in dfs]
# df = dd.concat(dfs)
# # TODO: Confirm that using pandas is preferable. Otherwise revert to this code.
df = pd.concat(dfs, sort=True, ignore_index=True)

return self.process_final_page(df, page)


raw_vceregen__all_dfs = raw_df_factory(Extractor, name="vceregen")


def raw_vceregen_asset_factory(part: str) -> AssetsDefinition:
"""An asset factory for VCE hourly renewable generation profiles."""
asset_kwargs = {
"name": f"raw_vceregen__{part}",
"required_resource_keys": {"datastore", "dataset_settings"},
"compute_kind": "Python",
}

@asset(**asset_kwargs)
def _extract(context, raw_vceregen__all_dfs):
"""Extract raw GridPath RA Toolkit renewable energy generation profiles.

Args:
context: dagster keyword that provides access to resources and config.
"""
return Output(value=raw_vceregen__all_dfs[part])

return _extract


raw_vceregen_assets = [raw_vceregen_asset_factory(part) for part in VCEREGEN_PAGES]


@asset(required_resource_keys={"datastore", "dataset_settings"})
def raw_vcegen__lat_lon_fips(context) -> pd.DataFrame:
"""Extract lat/lon to FIPS and county mapping CSV.

This dataframe is static, so it has a distinct partition from the other datasets and
its extraction is controlled by a boolean in the ETL run.
"""
ds = context.resources.datastore
partition_settings = context.resources.dataset_settings.vceregen
if partition_settings.fips:
return pd.read_csv(
BytesIO(ds.get_unique_resource("vceregen", fips=partition_settings.fips))
)
return pd.DataFrame() # TODO: What makes sense here?
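Aside (not part of the diff): VCEMetadata loads a package-data file_map.csv and get_file_name looks up a page/partition pair in it. The real file_map.csv is not shown in this PR, so the contents below are assumed purely for illustration; the filenames come from the module docstring above and the years are placeholders.

from io import StringIO

import pandas as pd

# Hypothetical file_map.csv contents -- the real mapping ships as package data
# and does not appear in this diff.
_file_map_csv = """page,2019,2020
offshore_wind_power_140m,Wind_Power_140m_Offshore_county.csv,Wind_Power_140m_Offshore_county.csv
onshore_wind_power_100m,Wind_Power_100m_Onshore_county.csv,Wind_Power_100m_Onshore_county.csv
fixed_solar_pv_lat_upv,Fixed_SolarPV_Lat_UPV_county.csv,Fixed_SolarPV_Lat_UPV_county.csv
"""

file_name_map = pd.read_csv(StringIO(_file_map_csv), index_col="page")

# Mirrors the get_file_name lookup; source_filename then prefixes the year
# folder that nests the CSV inside each annual zipfile.
page, year = "fixed_solar_pv_lat_upv", 2020
csv_name = file_name_map.loc[page, str(year)]
print(f"{year}/{csv_name}")  # -> 2020/Fixed_SolarPV_Lat_UPV_county.csv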