Skip to content

Commit

Permalink
Fix pylint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
lmello committed Feb 7, 2024
1 parent 6069684 commit 9693db0
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 34 deletions.
160 changes: 132 additions & 28 deletions src/opencost_parquet_exporter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import requests
import json
import os
"""OpenCost parquet exporter.
This module exports data from OpenCost API to parquet format, making it suitable
for further analysis or storage in data warehouses.
"""
import sys
from datetime import datetime, timedelta
import os
import pandas as pd
import boto3
import requests
import botocore.exceptions as boto_exceptions


def get_config(
Expand All @@ -15,9 +20,43 @@ def get_config(
file_key_prefix=None,
aggregate_by=None,
step=None):
"""
Get configuration for the parquet exporter based on either provided
parameters or environment variables.
Parameters:
- hostname (str): Hostname for the OpenCost service,
defaults to the 'OPENCOST_PARQUET_SVC_HOSTNAME' environment variable,
or 'localhost' if the environment variable is not set.
- port (int): Port number for the OpenCost service,
defaults to the 'OPENCOST_PARQUET_SVC_PORT' environment variable,
or 9003 if the environment variable is not set.
- window_start (str): Start datetime window for fetching data, in ISO format,
defaults to the 'OPENCOST_PARQUET_WINDOW_START' environment variable,
or yesterday's date at 00:00:00 if not set.
- window_end (str): End datetime window for fetching data, in ISO format,
defaults to the 'OPENCOST_PARQUET_WINDOW_END' environment variable,
or yesterday's date at 23:59:59 if not set.
- s3_bucket (str): S3 bucket name to upload the parquet file,
defaults to the 'OPENCOST_PARQUET_S3_BUCKET' environment variable.
- file_key_prefix (str): Prefix for file keys within the S3 bucket or local filesystem,
defaults to the 'OPENCOST_PARQUET_FILE_KEY_PREFIX' environment
variable, or '/tmp/' if not set.
- aggregate_by (str): Criteria for aggregating data, separated by commas,
defaults to the 'OPENCOST_PARQUET_AGGREGATE' environment variable,
or 'namespace,pod,container' if not set.
- step (str): Granularity for the data aggregation,
defaults to the 'OPENCOST_PARQUET_STEP' environment variable,
or '1h' if not set.
Returns:
- dict: Configuration dictionary with keys for 'url', 'params', 's3_bucket',
'file_key_prefix', 'data_types', 'ignored_alloc_keys', and 'rename_columns_config'.
"""
config = {}

# If function was called passing parameters the default value is ignored and environment variable is also ignored.
# If function was called passing parameters the default value is ignored and environment
# variable is also ignored.
# This is done, so passing parameters have precedence to environment variables.
if hostname is None:
hostname = os.environ.get('OPENCOST_PARQUET_SVC_HOSTNAME', 'localhost')
Expand All @@ -38,15 +77,15 @@ def get_config(

if s3_bucket is not None:
config['s3_bucket'] = s3_bucket
config['url'] = "http://{}:{}/allocation/compute".format(hostname, port)
config['url'] = f"http://{hostname}:{port}/allocation/compute"
config['file_key_prefix'] = file_key_prefix
# If window is not specified assume we want yesterday data.
if window_start is None or window_end is None:
yesterday = datetime.strftime(
datetime.now() - timedelta(1), '%Y-%m-%d')
window_start = yesterday+'T00:00:00Z'
window_end = yesterday+'T23:59:59Z'
window = '{},{}'.format(window_start, window_end)
window = f"{window_start},{window_end}"
config['params'] = (
("window", window),
("aggregate", aggregate_by),
Expand Down Expand Up @@ -117,26 +156,48 @@ def get_config(


def request_data(config):
"""
Request data from the OpenCost service using the provided configuration.
Parameters:
- config (dict): Configuration dictionary with necessary URL and parameters for the API request.
Returns:
- dict or None: The response from the OpenCost API parsed as a dictionary, or None if an error
occurs.
"""
url, params = config['url'], config['params']
try:
response = requests.get(
url,
params=params,
# 15 seconds connect timeout
# No read timeout, in case it takes a long
timeout=(15,None)
)
response.raise_for_status()
if 'application/json' in response.headers['content-type']:
response_object = response.json()['data']
return response_object
else:
print(f"Invalid content type: {response.headers['content-type']}")
return None
print(f"Invalid content type: {response.headers['content-type']}")
return None
except (requests.exceptions.RequestException, requests.exceptions.Timeout,
requests.exceptions.TooManyRedirects, ValueError) as err:
requests.exceptions.TooManyRedirects, ValueError, KeyError) as err:
print(f"Request error: {err}")
return None


def process_result(result, config):
"""
Process raw results from the OpenCost API data request.
Parameters:
- result (dict): Raw response data from the OpenCost API.
- config (dict): Configuration dictionary with data types and other processing options.
Returns:
- DataFrame or None: Processed data as a Pandas DataFrame, or None if an error occurs.
"""
for split in result:
# Remove entry for unmounted pv's .
# this break the table schema in athena
Expand All @@ -151,38 +212,81 @@ def process_result(result, config):
processed_data.rename(
columns=config['rename_columns_config'], inplace=True)
processed_data = processed_data.astype(config['data_types'])
except Exception as err:
print(f"Error during panda dataframe processing {str(err)}")
except pd.errors.EmptyDataError as err:
print(f"No data: {err}")
return None
except pd.errors.ParserError as err:
print(f"Error parsing data: {err}")
return None
except pd.errors.MergeError as err:
print(f"Data merge error: {err}")
return None
except ValueError as err:
print(f"Value error: {err}")
return None
except KeyError as err:
print(f"Key error: {err}")
return None
return processed_data


def save_result(processed_result, config):
"""
Save the processed result either to the local filesystem or an S3 bucket
in parquet file format.
Parameters:
- processed_result (DataFrame): The processed data to save.
- config (dict): Configuration dictionary including keys for the S3 bucket,
file key prefix, and others.
Returns:
- None
"""
file_name = 'k8s_opencost.parquet'
window = datetime.strptime(config['window_start'], "%Y-%m-%dT%H:%M:%SZ")
parquet_prefix = '{}/year={}/month={}/day={}'.format(
config['file_key_prefix'], window.year, window.month, window.day)
parquet_prefix = f"{config['file_key_prefix']}/year={window.year}"\
f"/month={window.month}/day={window.day}"
try:
if config.get('s3_bucket', None):
uri = 's3://{}/{}/{}'.format(config['s3_bucket'],
parquet_prefix, filename)
uri = f"s3://{config['s3_bucket']}/{parquet_prefix}/{file_name}"
else:
uri = 'file://{}/{}'.format(parquet_prefix, file_name)
uri = f"file://{parquet_prefix}/{file_name}"
path = '/'+parquet_prefix
os.makedirs(path, 0o750, exist_ok=True)
processed_result.to_parquet(uri)
except Exception as err:
print(f"Error {str(err)}")

except pd.errors.EmptyDataError as ede:
print(f"Error: No data to save, the DataFrame is empty.{ede}")
except KeyError as ke:
print(f"Missing configuration key: {ke}")
except ValueError as ve:
print(f"Error parsing date format: {ve}")
except FileNotFoundError as fnfe:
print(f"File or directory not found: {fnfe}")
except PermissionError as pe:
print(f"Permission error: {pe}")
except boto_exceptions.NoCredentialsError:
print("Error: No AWS credentials found to access S3")
except boto_exceptions.PartialCredentialsError:
print("Error: Incomplete AWS credentials provided for accessing S3")
except boto_exceptions.ClientError as ce:
print(f"AWS Client Error: {ce}")

def main():
try:
config = get_config()
result = request_data(config)
processed_data = process_result(result, config)
save_result(processed_data, config)
except Exception as err:
print(f"Unexpected error {str(err)}")
"""
Main function to execute the workflow of fetching, processing, and saving data
for yesterday.
"""
config = get_config()
result = request_data(config)
if result is None:
print("Result is None. Aborting execution")
sys.exit(1)
processed_data = process_result(result, config)
if processed_data is None:
print("Processed data is None, aborting execution.")
sys.exit(1)
save_result(processed_data, config)


if __name__ == "__main__":
Expand Down
15 changes: 9 additions & 6 deletions src/test_opencost_parquet_exporter.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
""" Test cases for opencost-parquet-exporter."""
import unittest
import requests
import os
from datetime import datetime, timedelta
from unittest.mock import patch, MagicMock
import os
import requests
from opencost_parquet_exporter import get_config, request_data

class TestGetConfig(unittest.TestCase):
"""Test cases for get_config method"""
def test_get_config_with_env_vars(self):
"""Test get_config returns correct configurations based on environment variables."""
with patch.dict(os.environ, {
Expand All @@ -30,7 +32,7 @@ def test_get_config_defaults(self):
datetime.now() - timedelta(1), '%Y-%m-%d')
window_start = yesterday+'T00:00:00Z'
window_end = yesterday+'T23:59:59Z'
window = '{},{}'.format(window_start, window_end)
window = f"{window_start},{window_end}"

config = get_config()
self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute')
Expand All @@ -48,7 +50,7 @@ def test_get_config_no_window_start(self):
datetime.now() - timedelta(1), '%Y-%m-%d')
window_start = yesterday+'T00:00:00Z'
window_end = yesterday+'T23:59:59Z'
window = '{},{}'.format(window_start, window_end)
window = f"{window_start},{window_end}"

config = get_config()
self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute')
Expand All @@ -65,7 +67,7 @@ def test_get_config_no_window_end(self):
datetime.now() - timedelta(1), '%Y-%m-%d')
window_start = yesterday+'T00:00:00Z'
window_end = yesterday+'T23:59:59Z'
window = '{},{}'.format(window_start, window_end)
window = f"{window_start},{window_end}"

config = get_config()
self.assertEqual(config['url'], 'http://localhost:9003/allocation/compute')
Expand All @@ -74,6 +76,7 @@ def test_get_config_no_window_end(self):
self.assertEqual(config['params'][0][1], window)

class TestRequestData(unittest.TestCase):
""" Test request_data method """
@patch('opencost_parquet_exporter.requests.get')
def test_request_data_success(self, mock_get):
"""Test request_data successfully retrieves data when response is OK."""
Expand All @@ -93,7 +96,7 @@ def test_request_data_success(self, mock_get):
self.assertEqual(data, [{'key': 'value'}])

@patch('opencost_parquet_exporter.requests.get')
def test_request_data_success(self, mock_get):
def test_request_data_wrong_content_type(self, mock_get):
"""Test request_data successfully retrieves data when response is OK."""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
Expand Down

0 comments on commit 9693db0

Please sign in to comment.