Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minio integration: write intermediate data to minio, instead of storing #79

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
894a6bb
changes for activation to support etl
supradeep2819 Apr 17, 2024
d845bdc
Merge branch 'dev' of github.com:valmi-io/valmi-activation into dev
rajvarkala Apr 17, 2024
e06dc9e
Merge pull request #70 from valmi-io/main
supradeep2819 Apr 26, 2024
5d31e97
fix: add missing import to connector lib
gane5hvarma Apr 26, 2024
ba79b23
Merge branch 'dev' of github.com:valmi-io/valmi-activation into dev
rajvarkala Apr 28, 2024
80cd1fc
feat: add normalization and dbt step into dag (#71)
gane5hvarma Apr 30, 2024
a4999b8
feature: Added endpoint to retrive last run status
supradeep2819 May 2, 2024
ececb3a
feature: Added endpoint for getting last run status
supradeep2819 May 2, 2024
f82d3b9
Merge pull request #72 from valmi-io/latest_run_status
supradeep2819 May 2, 2024
e06bbc1
feat: Handled missed schema for last run status
supradeep2819 May 3, 2024
d758da6
fix: retriving last run ID in descending order
supradeep2819 May 3, 2024
a1ce573
fix: retriving last run ID in descending order
supradeep2819 May 3, 2024
682460e
Merge pull request #73 from valmi-io/latest_run_status
supradeep2819 May 3, 2024
1b56fcb
"feat: Enhance source postgres configuration to provide specific tabl…
supradeep2819 May 10, 2024
a909e5c
Merge pull request #74 from valmi-io/latest_run_status
supradeep2819 May 10, 2024
fb0d957
Merge branch 'dev' of github.com:valmi-io/valmi-activation into dev
rajvarkala May 10, 2024
f69dcd8
feat: changes needed for destination postgres to work
gane5hvarma May 14, 2024
374bf85
feat: Adding checkpointing to etl sources and destinations
rajvarkala May 15, 2024
b22b4a2
version up
rajvarkala May 15, 2024
d307b97
feat: for etl, batching source records only on seeing state messages
rajvarkala May 15, 2024
e61e1a4
feat: generating catalog when query is present postgres config
supradeep2819 May 16, 2024
79654bb
feat: multiple state messages stored for etl sources
rajvarkala May 16, 2024
dd210cf
Merge branch 'dev' of github.com:valmi-io/valmi-activation into dev
rajvarkala May 16, 2024
172195f
Fix finalizer step which got stuck because of transformation (#75)
gane5hvarma May 17, 2024
bad286b
feat: rm print statements and update version
gane5hvarma May 20, 2024
dea7d8d
feat: Add endpoint to fetch latest successful sync
supradeep2819 May 23, 2024
5ff5f39
feat: renamed timestamp with run_end_at in latest_run_status endpoint
supradeep2819 May 23, 2024
3197240
feat: Used schemas instead of dict in sync service
supradeep2819 May 28, 2024
5db346c
Merge pull request #76 from valmi-io/feat.last_successful_sync
supradeep2819 May 28, 2024
c3d109e
bug: metrics fix for etl
rajvarkala May 29, 2024
3bd3a8a
feat: source-postgres changes to support filtered queries
supradeep2819 Jun 6, 2024
0701a1a
feat: changes for source-postgres to support filter queries
supradeep2819 Jun 6, 2024
7894aff
Merge pull request #77 from valmi-io/feat.source_postgres
supradeep2819 Jun 6, 2024
2f89aa9
minio integration: write intermediate data to minio, instead of stori…
saurav-malani Jun 17, 2024
34233fa
minio integration: write intermediate data to minio, instead of stori…
saurav-malani Jun 17, 2024
54f2ebc
removed gzip part
saurav-malani Jun 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ CONNECTOR_RUN_CONFIG:
records_per_metric: 100
POSTGRES:
records_per_metric: 100
SHOPIFY:
chunk_size: 300
4 changes: 2 additions & 2 deletions packages/valmi-connector-lib/.vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
"[python]": {
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.fixAll": true
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
}
}
}
6 changes: 3 additions & 3 deletions packages/valmi-connector-lib/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "valmi-connector-lib"
version = "0.1.113"
version = "0.1.159"
description = ""
authors = ["Rajashekar Varkala <[email protected]>"]
readme = "README.md"
Expand All @@ -9,7 +9,7 @@ packages = [{include = "valmi_connector_lib"}]
[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.30.0"
pydantic = "1.9.2"
pydantic = "^1.9.2"
valmi-airbyte-cdk = "^0.30.3"

[tool.poetry.group.dev.dependencies]
Expand Down Expand Up @@ -49,7 +49,7 @@ max-complexity = 10
max-line-length = 120

[build-system]
requires = ["poetry-core"]
requires = ["poetry-core", "cython<3.0"]
build-backend = "poetry.core.masonry.api"

# Example configuration for Black.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from typing import Any, Dict

from valmi_connector_lib.common.logs import SingletonLogWriter, TimeAndChunkEndFlushPolicy
from valmi_connector_lib.common.samples import SampleWriter
from valmi_connector_lib.destination_wrapper.engine import CONNECTOR_STRING

from .proc_stdout_handler import ProcStdoutHandlerThread
Expand All @@ -53,7 +54,7 @@
def get_airbyte_command():
entrypoint_str = os.environ["VALMI_ENTRYPOINT"]
entrypoint = entrypoint_str.split(" ")

airbyte_command = sys.argv[3]
for i, arg in enumerate(sys.argv[1:]):
if i >= len(entrypoint):
Expand Down Expand Up @@ -165,30 +166,32 @@ def main():
engine.connector_state.run_time_args["sync_id"],
engine.connector_state.run_time_args["run_id"],
CONNECTOR_STRING)

#initialize SampleWriter
SampleWriter.get_writer_by_metric_type(store_config_str=os.environ["VALMI_INTERMEDIATE_STORE"],
sync_id=engine.connector_state.run_time_args["sync_id"],
run_id=engine.connector_state.run_time_args["run_id"],
connector=CONNECTOR_STRING)

# initialize handler
for key in handlers.keys():
handlers[key] = handlers[key](engine=engine, store_writer=None, stdout_writer=None)
# initialize SampleWriter
SampleWriter.get_writer_by_metric_type(store_config_str=os.environ["VALMI_INTERMEDIATE_STORE"],
sync_id=engine.connector_state.run_time_args["sync_id"],
run_id=engine.connector_state.run_time_args["run_id"],
connector=CONNECTOR_STRING)

global loaded_state
store_reader = StoreReader(engine=engine, state=loaded_state)

# initialize handler
for key in handlers.keys():
handlers[key] = handlers[key](engine=engine, store_writer=None,
stdout_writer=None, store_reader=store_reader)

# create the subprocess
subprocess_args = sys.argv[1:]

# HACK: Remove destination_catalog command line argument when working with etl destination
# For ETL, there is no concept of destination catalog
if os.environ.get('MODE', 'any') == 'etl' and "--destination_catalog" in subprocess_args:
arg_idx = subprocess_args.index("--destination_catalog")
subprocess_args.remove("--destination_catalog")
subprocess_args.pop(arg_idx)

if is_state_available():
# For ETL, internal connectors do not need any state information
if os.environ.get('MODE', 'any') != 'etl' and is_state_available():
subprocess_args.append("--state")
subprocess_args.append(state_file_path)
proc = subprocess.Popen(subprocess_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
Expand All @@ -202,8 +205,6 @@ def main():
record_types = handlers.keys()

for line in store_reader.read():
print("Reading")
print(line)
if line.strip() == "":
continue
json_record = json.loads(line)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ def finalise_message_handling(self) -> HandlerResponseData:

def read_chunk_id_checkpoint(self):
if self.previous_state is not None \
and 'state' in self.previous_state \
and 'data' in self.previous_state['state'] \
and 'chunk_id' in self.previous_state['state']['data']:
return self.previous_state['state']['data']['chunk_id'] + 1
and '_valmi_meta' in self.previous_state \
and 'chunk_id' in self.previous_state['_valmi_meta']:
return self.previous_state['_valmi_meta']['chunk_id'] + 1
return 1

def start_message_handling(self, input_messages: Iterable[AirbyteMessage]) -> AirbyteMessage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def __init__(self, *args, **kwargs):
self.connector_state = ConnectorState(run_time_args=run_time_args)

def current_run_details(self):
sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "cf280e5c-1184-4052-b089-f9f41b25138e"))
# sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
r = self.session_with_retries.get(
f"{self.engine_url}/syncs/{sync_id}/runs/current_run_details/{CONNECTOR_STRING}",
timeout=HTTP_TIMEOUT,
Expand Down Expand Up @@ -198,7 +199,7 @@ def checkpoint(self, state):
sync_id = self.connector_state.run_time_args["sync_id"]
run_id = self.connector_state.run_time_args["run_id"]
r = self.session_with_retries.post(
f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/",
f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/{os.environ.get('MODE', 'any')}",
timeout=HTTP_TIMEOUT,
json=state,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@ def __init__(self, engine: NullEngine, state: str) -> None:
self.engine = engine
self.connector_state: ConnectorState = self.engine.connector_state
self.loaded_state = state
self.current_file_name = None

store_config = json.loads(os.environ["VALMI_INTERMEDIATE_STORE"])
if store_config["provider"] == "local":
path_name = join(store_config["local"]["directory"], self.connector_state.run_time_args["run_id"], "data")
os.makedirs(path_name, exist_ok=True)
self.path_name = path_name
self.last_handled_fn = self.get_file_name_from_chunk_id(self.read_chunk_id_checkpoint())
if os.environ.get('MODE', 'any') == 'etl':
self.last_handled_fn = self.read_file_marker_from_checkpoint()
else:
self.last_handled_fn = self.get_file_name_from_chunk_id(self.read_chunk_id_checkpoint())

def read(self):
while True:
Expand All @@ -76,6 +80,7 @@ def read(self):
if self.last_handled_fn is not None and int(fn[:-5]) <= int(self.last_handled_fn[:-5]):
continue
if fn.endswith(".vald"):
self.current_file_name = fn
with open(join(self.path_name, fn), "r") as f:
for line in f.readlines():
# print("yiedling", line)
Expand All @@ -101,10 +106,16 @@ def read(self):
def read_chunk_id_checkpoint(self):
# TODO: connector_state is not being used for destination, clean it up.
if self.loaded_state is not None \
and 'state' in self.loaded_state \
and 'data' in self.loaded_state['state'] \
and 'chunk_id' in self.loaded_state['state']['data']:
return self.loaded_state['state']['data']['chunk_id']
and '_valmi_meta' in self.loaded_state \
and 'chunk_id' in self.loaded_state['_valmi_meta']:
return self.loaded_state['_valmi_meta']['chunk_id']
return None

def read_file_marker_from_checkpoint(self):
if self.loaded_state is not None \
and '_valmi_meta' in self.loaded_state \
and 'file_marker' in self.loaded_state['_valmi_meta']:
return self.loaded_state["_valmi_meta"]["file_marker"]
return None

def get_file_name_from_chunk_id(self, chunk_id):
Expand Down Expand Up @@ -154,30 +165,33 @@ def handle(self, record) -> bool:
print("Checkpoint seen")
print(record)

records_delivered = record["state"]["data"]["records_delivered"]
finished = record["state"]["data"]["finished"]
commit_state = record["state"]["data"]["commit_state"]
commit_metric = record["state"]["data"]["commit_metric"]

total_records = 0
for k, v in records_delivered.items():
total_records += v

self.engine.connector_state.register_records(total_records)

if commit_metric:
self.engine.metric_ext(records_delivered, record["state"]["data"]["chunk_id"], commit=True)
# self.engine.connector_state.register_chunk()
if commit_state:
self.engine.checkpoint(record)
if SingletonLogWriter.instance() is not None:
SingletonLogWriter.instance().data_chunk_flush_callback()
SampleWriter.data_chunk_flush_callback()
else:
if SingletonLogWriter.instance() is not None:
SingletonLogWriter.instance().check_for_flush()

return True
if os.environ.get('MODE', 'any') == 'etl':
return True
else :
records_delivered = record["state"]["data"]["records_delivered"]
finished = record["state"]["data"]["finished"]
commit_state = record["state"]["data"]["commit_state"]
commit_metric = record["state"]["data"]["commit_metric"]

total_records = 0
for k, v in records_delivered.items():
total_records += v

self.engine.connector_state.register_records(total_records)

if commit_metric:
self.engine.metric_ext(records_delivered, record["state"]["data"]["chunk_id"], commit=True)
# self.engine.connector_state.register_chunk()
if commit_state:
self.engine.checkpoint(record["state"])
if SingletonLogWriter.instance() is not None:
SingletonLogWriter.instance().data_chunk_flush_callback()
SampleWriter.data_chunk_flush_callback()
else:
if SingletonLogWriter.instance() is not None:
SingletonLogWriter.instance().check_for_flush()

return True


class RecordHandler(DefaultHandler):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run(self) -> None:
ret_val = handlers[json_record["type"]].handle(json_record)
if ret_val is False: # TODO: comes from ERROR Trace, should be handled cleanly
self.proc.kill()
os._exit(0) # error is already logged with engine in the handler
os._exit(1) # error is already logged with engine in the handler

# stdout finished. clean close
self.exit_flag = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
StoreReader,
StdoutWriter,
)
import os


class ReadDefaultHandler:
Expand Down Expand Up @@ -62,7 +63,12 @@ def __init__(self, *args, **kwargs):
super(ReadCheckpointHandler, self).__init__(*args, **kwargs)

def handle(self, record) -> bool:
# do an engine call to proceed.
# For ETL, we store the checkpoint for the reader instead of the destination stdout state,
# because state is dictated by the source.
if os.environ.get('MODE', 'any') == 'etl':
_valmi_meta = {"file_marker": self.store_reader.current_file_name}
record["state"]["_valmi_meta"] = _valmi_meta
self.engine.checkpoint(record["state"])
return True


Expand Down
Loading