
Commit

Merge pull request #66 from RADAR-base/dev
Initial Release of RADAR-Pipeline
Hsankesara authored Jul 20, 2023
2 parents 399f0ff + f70f931 commit 0e1f97d
Showing 50 changed files with 2,962 additions and 328 deletions.
4 changes: 2 additions & 2 deletions .flake8
@@ -1,5 +1,5 @@
[flake8]
ignore = E203, E266, E501, W503, F403, F401
ignore = E203, E266, W503, F403, F401
max-line-length = 88
max-complexity = 18
max-complexity = 15
select = B,C,E,F,W,T4,B9
44 changes: 44 additions & 0 deletions .github/workflows/lint-and-test.yml
@@ -0,0 +1,44 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: Lint and Test

on:
  push:
    branches: ['main', 'dev']
  pull_request:
    branches: ['main', 'dev']

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ['3.8', '3.9', '3.10']

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Checkout submodules
        run: git submodule update --init --recursive
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install flake8 pytest
          pip install -r requirements.txt
      - name: Install project
        run: |
          pip install -e .
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 radarpipeline/ --count --select=B,C,E,F,W,T4,B9 --ignore=E203,E266,W503,F403,F401 --show-source --statistics
          # exit-zero treats all errors as warnings.
          flake8 radarpipeline/ --count --exit-zero --max-complexity=15 --max-line-length=88 --statistics
      - name: Test with pytest
        run: |
          pytest --cov=radarpipeline tests/
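These checks can be reproduced locally by running the same two flake8 invocations and the pytest command from the repository root after `pip install -e .`; note that the test step assumes `pytest-cov` is available (presumably pulled in via requirements.txt), since it passes `--cov`.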
4 changes: 4 additions & 0 deletions .gitignore
@@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
*.DS_STORE

# C extensions
*.so
@@ -139,3 +140,6 @@ data/
features/
temp/
output/
tmp/

.DS_STORE
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -1,3 +1,3 @@
[settings]
known_third_party = git,pandas,pyspark,requests,setuptools,yaml
known_third_party = git,pandas,pyspark,pytest,requests,setuptools,yaml
profile = black
19 changes: 19 additions & 0 deletions Dockerfile
@@ -0,0 +1,19 @@
# syntax=docker/dockerfile:1

FROM openjdk:slim

COPY --from=python:3.9 / /

RUN pip install --upgrade pip

WORKDIR /radarpipeline

COPY requirements.txt requirements.txt

RUN pip install -r requirements.txt

COPY . .

RUN pip install -e .

CMD ["python", "."]
5 changes: 5 additions & 0 deletions README.md
@@ -1,6 +1,7 @@
<h1 align="center">RADAR Pipeline</h1>

<p align="center">
<a href="https://github.com/RADAR-base/radarpipeline"><img alt="GitHub branch checks state" src="https://img.shields.io/github/checks-status/RADAR-base/radarpipeline/main"></a>
<a href="https://github.com/RADAR-base/radarpipeline/issues"><img alt="GitHub issues" src="https://img.shields.io/github/issues/RADAR-base/radarpipeline"></a>
<a href="https://github.com/thepushkarp/radarpipeline/pulls"><img alt="GitHub pull requests" src="https://img.shields.io/github/issues-pr/radar-base/radarpipeline"></a>
<a href="https://github.com/RADAR-base/radarpipeline/network"><img alt="GitHub forks" src="https://img.shields.io/github/forks/RADAR-base/radarpipeline"></a>
@@ -24,6 +25,10 @@ Wiki resources:
- [Contributor Guide](https://github.com/RADAR-base/radarpipeline/wiki/Contributor-Guide)
- [Mock Pipeline](https://github.com/RADAR-base/radarpipeline/wiki/Mock-Pipeline)
- [Configuration](https://github.com/RADAR-base/radarpipeline/wiki/Configuration)
- [Data Ingestion](https://github.com/RADAR-base/radarpipeline/wiki/Data-Ingestion)
- [Setup](https://github.com/RADAR-base/radarpipeline/wiki/Setup)
- [Pipeline Core Topics](https://github.com/RADAR-base/radarpipeline/wiki/Pipeline-Core-Topics)
- [Creating Citable Analytics Pipelines](https://github.com/RADAR-base/radarpipeline/wiki/Creating-Citable-Analytics-Pipelines)

## How to run

35 changes: 29 additions & 6 deletions config.yaml
@@ -3,21 +3,44 @@ project:
  description: mock_description
  version: mock_version

input_data:
  data_location: mock
  local_directory: mockdata/mockdata
input:
  data_type: mock # could be mock, local, sftp, s3
  config:
    # In case of sftp, use the following format
    # sftp_host:
    # sftp_source_path:
    # sftp_username:
    # sftp_private_key:
    # sftp_target_path:
    # In case of s3, use the following format
    # aws_access_key_id:
    # aws_secret_access_key:
    # region_name:
    # s3_access_url:
    # bucket:
    # prefix:
    # In case of local or mock, use the following format
    source_path: mockdata/mockdata
  data_format: csv

configurations:
  df_type: 'pandas'

features:
  - location: 'https://github.com/RADAR-base-Analytics/mockfeatures'
    branch: main
    feature_groups:
      - MockFeatureGroup
    feature_names:
      - all

output_data:
  output_location: local
  local_directory: output/mockdata
output:
  output_location: local # can be local, postgres, sftp
  config:
    target_path: output/mockdata
    data_format: csv
    compress: false

spark_config:
  spark.executor.instances: 2
  spark.driver.memory: 13G
46 changes: 46 additions & 0 deletions config.yaml.template
@@ -0,0 +1,46 @@
project:
  project_name: mock_project
  description: mock_description
  version: mock_version

input:
  data_type: mock # could be mock, local, sftp, s3
  config:
    ## In case of sftp, use the following format
    # sftp_host:
    # sftp_source_path:
    # sftp_username:
    # sftp_private_key:
    # sftp_target_path:
    ## In case of s3, use the following format
    # aws_access_key_id:
    # aws_secret_access_key:
    # region_name:
    # s3_access_url:
    # bucket:
    # prefix:
    ## In case of local or mock, use the following format
    # source_path: mockdata/mockdata
  data_format: csv

configurations:
  df_type: 'pandas' # can be pandas or spark

features:
  - location: 'https://github.com/RADAR-base-Analytics/mockfeatures'
    branch: main
    feature_groups:
      - MockFeatureGroup
    feature_names:
      - all

output:
  output_location: local # can be local, postgres, sftp
  config:
    target_path: output/mockdata
    data_format: csv
    compress: false

spark_config:
  spark.executor.instances: 6
  spark.driver.memory: 10G
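As a quick sanity check, a filled-in copy of this template can be validated with the pipeline's own loader introduced in this release. A minimal sketch, assuming the package has been installed with `pip install -e .`; the config path is illustrative:

```python
# Hypothetical smoke test: read_yaml() loads the file, validates it against
# the strictyaml schema returned by get_yaml_schema(), and returns a dict.
from radarpipeline.common.utils import read_yaml

config = read_yaml("config.yaml")  # raises a YAMLError on schema violations
print(config["project"]["project_name"])  # e.g. "mock_project"
print(config["input"]["data_type"])       # e.g. "mock"
```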
File renamed without changes.
2 changes: 1 addition & 1 deletion mockdata
Submodule mockdata updated 3358 files
8 changes: 8 additions & 0 deletions pyproject.toml
@@ -12,5 +12,13 @@ exclude = '''
| buck-out
| build
| dist
| env
| venv
| .vscode
| .pytest_cache
| logs
| temp
| tmp
| output
)/
'''
37 changes: 37 additions & 0 deletions radarpipeline/common/constants.py
@@ -1,3 +1,40 @@
from pyspark.sql.types import (
    BooleanType,
    ByteType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    ShortType,
    StringType,
)

ENCODING = "utf-8"
LINESEP = "\n"
CSV_DELIMITER = ","

STRING_TYPE = StringType()
BOOLEAN_TYPE = BooleanType()
BYTE_TYPE = ByteType()
SHORT_TYPE = ShortType()
INT_TYPE = IntegerType()
LONG_TYPE = LongType()
FLOAT_TYPE = FloatType()
DOUBLE_TYPE = DoubleType()

DATA_TYPE_MAPPING = {
    "byte": BYTE_TYPE,
    "short": SHORT_TYPE,
    "int": INT_TYPE,
    "long": LONG_TYPE,
    "float": FLOAT_TYPE,
    "double": DOUBLE_TYPE,
    "string": STRING_TYPE,
    "char": STRING_TYPE,
    "enum": STRING_TYPE,
    "array": STRING_TYPE,
    "boolean": BOOLEAN_TYPE,
}

INTEGER_TYPES = set([BYTE_TYPE, SHORT_TYPE, INT_TYPE, LONG_TYPE])
FLOATING_TYPES = set([FLOAT_TYPE, DOUBLE_TYPE])
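DATA_TYPE_MAPPING translates Avro-style type names into Spark SQL types, which suggests its use when building DataFrame schemas from RADAR schema definitions. A minimal sketch of that idea; the field list here is hypothetical, not taken from the repository:

```python
from pyspark.sql.types import StructField, StructType

from radarpipeline.common.constants import DATA_TYPE_MAPPING

# Hypothetical (column, avro-type) pairs for a RADAR-style CSV file.
fields = [("key.userId", "string"), ("value.time", "double")]
schema = StructType(
    [StructField(name, DATA_TYPE_MAPPING[dtype]) for name, dtype in fields]
)
print(schema.simpleString())  # struct<key.userId:string,value.time:double>
```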
2 changes: 1 addition & 1 deletion radarpipeline/common/logger.py
@@ -21,7 +21,7 @@ def logger_init(level: int = logging.INFO) -> None:
    logger.setLevel(level)

    # File handler
    path = "log"
    path = "logs"
    year = str(datetime.now().year)
    month = str(datetime.now().month)
    filename = datetime.now().strftime("%Y%m%d") + "_logfile.log"
89 changes: 86 additions & 3 deletions radarpipeline/common/utils.py
@@ -2,10 +2,17 @@
from functools import reduce
from typing import Any, Dict, List
from urllib.parse import urlparse
import pathlib
from pathlib import Path

import pyspark.sql as ps
import requests
import yaml
from strictyaml import load, Map, Int, Str, Seq, Bool, Optional
from strictyaml import YAMLError, CommaSeparated, MapPattern

import ntpath
import posixpath

from radarpipeline.common import constants

@@ -36,10 +43,9 @@ def read_yaml(yaml_file_path: str) -> Dict[str, Any]:
        raise ValueError("Input file is not a yaml file")
    if os.stat(yaml_file_path).st_size == 0:
        raise ValueError("Input file is empty")

    schema = get_yaml_schema()
    with open(yaml_file_path, "r", encoding=constants.ENCODING) as file:
        config = yaml.load(file, Loader=yaml.FullLoader)

        config = load(file.read(), schema).data
    return config


@@ -102,6 +108,7 @@ def get_repo_name_from_url(url: str) -> str:
    str
        Name of the repository
    """
    url = url.rstrip("/")
    last_slash_index = url.rfind("/")
    last_suffix_index = url.rfind(".git")
    if last_suffix_index < 0:
@@ -128,3 +135,79 @@ def pascal_to_snake_case(s: str) -> str:
        Converted string
    """
    return "".join(["_" + i.lower() if i.isupper() else i for i in s]).lstrip("_")


def get_yaml_schema() -> Map:
    schema = Map({
        "project": Map({
            "project_name": Str(),
            Optional("description"): Str(),
            Optional("version"): Str()
        }),
        "input": Map({
            "data_type": Str(),
            "config": MapPattern(Str(), Str()),
            "data_format": Str()
        }),
        "configurations": Map({
            "df_type": Str()
        }),
        "features": Seq(Map({
            "location": Str(),
            Optional("branch", default='main'): Str(),
            "feature_groups": Seq(Str()),
            "feature_names": Seq(CommaSeparated(Str()))
        })),
        "output": Map({
            "output_location": Str(),
            "config": MapPattern(Str(), Str()),
            "data_format": Str(),
            "compress": Bool()
        }),
        Optional("spark_config"): Map({
            Optional("spark.executor.instances", default=4): Int(),
            Optional("spark.executor.cores", default=4): Int(),
            Optional("spark.executor.memory", default='10g'): Str(),
            Optional("spark.driver.memory", default='15g'): Str(),
            Optional("spark.memory.offHeap.enabled", default=True): Bool(),
            Optional("spark.memory.offHeap.size", default='20g'): Str(),
            Optional("spark.driver.maxResultSize", default='0'): Str(),
        }),
    })
    return schema


def get_absolute_path(path: str) -> str:
    """
    Returns the absolute path of the given path, resolved relative to the
    pipeline root directory

    Parameters
    ----------
    path : str
        Path to be converted to an absolute path

    Returns
    -------
    str
        Absolute path of the path
    """
    if not os.path.isabs(path):
        pipeline_dir = pathlib.Path(__file__).parent.parent.parent.resolve()
        path = os.path.join(pipeline_dir, path)
    return path


def reparent(newparent, oldpath):
    '''When copying or moving a directory structure, the oldpath must be
    re-parented. When os.path.join is used to compute the new path, a leading
    root separator in oldpath supplants the newparent, which is not what we
    want; so we make the oldpath root appear as a child of the newparent.

    :param str newparent: the new parent location for oldpath (target)
    :param str oldpath: the path being adopted by newparent (source)
    :returns: (str) resulting adoptive path
    '''
    if oldpath[0] in (posixpath.sep, ntpath.sep):
        oldpath = '.' + oldpath
    return os.path.join(newparent, oldpath)
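Two of the helpers above in action, as a minimal sketch (POSIX paths assumed; the values are illustrative):

```python
from radarpipeline.common.utils import pascal_to_snake_case, reparent

# Feature-group class names become snake_case identifiers.
assert pascal_to_snake_case("MockFeatureGroup") == "mock_feature_group"

# A leading "/" in the source path no longer supplants the new parent.
assert reparent("/tmp/features", "/repo/data") == "/tmp/features/./repo/data"
```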