From 894a6bbb84c772ded2498dbf18115aec3216ab9a Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Wed, 17 Apr 2024 11:36:38 +0530 Subject: [PATCH 01/26] changes for activation to support etl --- config.yaml | 2 + .../valmi-connector-lib/.vscode/settings.json | 4 +- packages/valmi-connector-lib/pyproject.toml | 6 +-- .../destination_container_wrapper.py | 2 +- .../destination_wrapper/engine.py | 3 +- .../proc_stdout_event_handlers.py | 51 ++++++++++--------- .../source_container_wrapper.py | 5 ++ secrets/data.txt | 0 secrets/run.py | 3 ++ src/api/routers/syncs.py | 39 ++++++++++++-- src/metrics/metric_store.py | 22 ++------ valmi-app | 2 +- valmi-app-backend | 2 +- 13 files changed, 87 insertions(+), 54 deletions(-) create mode 100644 secrets/data.txt create mode 100644 secrets/run.py diff --git a/config.yaml b/config.yaml index 6428fee9..31a4c43b 100644 --- a/config.yaml +++ b/config.yaml @@ -70,3 +70,5 @@ CONNECTOR_RUN_CONFIG: records_per_metric: 100 POSTGRES: records_per_metric: 100 + SHOPIFY: + chunk_size: 1 diff --git a/packages/valmi-connector-lib/.vscode/settings.json b/packages/valmi-connector-lib/.vscode/settings.json index 04b84a39..995f0956 100644 --- a/packages/valmi-connector-lib/.vscode/settings.json +++ b/packages/valmi-connector-lib/.vscode/settings.json @@ -12,8 +12,8 @@ "[python]": { "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true, - "source.fixAll": true + "source.organizeImports": "explicit", + "source.fixAll": "explicit" } } } diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index d0f1200c..0e5b9cf3 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.113" +version = "0.1.138" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" @@ -9,7 +9,7 @@ packages = [{include = "valmi_connector_lib"}] [tool.poetry.dependencies] python = "^3.9" requests = "^2.30.0" -pydantic = "1.9.2" +pydantic = "^1.9.2" valmi-airbyte-cdk = "^0.30.3" [tool.poetry.group.dev.dependencies] @@ -49,7 +49,7 @@ max-complexity = 10 max-line-length = 120 [build-system] -requires = ["poetry-core"] +requires = ["poetry-core", "cython<3.0"] build-backend = "poetry.core.masonry.api" # Example configuration for Black. 
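Note on the config.yaml change above: the new SHOPIFY entry under CONNECTOR_RUN_CONFIG is resolved per connector type at run time. The syncs router (changed later in this same patch) strips the SRC_/DEST_ prefix from the connector type and looks the remainder up in CONNECTOR_RUN_CONFIG. A minimal sketch of that lookup, using a plain dict in place of the v settings object — the function name and dict literal here are illustrative assumptions, only the key layout and values come from the patch:

def resolve_run_config(settings: dict, full_connector_type: str) -> dict:
    # "SRC_SHOPIFY" / "DEST_POSTGRES" -> "SHOPIFY" / "POSTGRES"
    connector_type = "_".join(full_connector_type.split("_")[1:])
    run_config = settings.get("CONNECTOR_RUN_CONFIG") or {}
    # Connectors without an entry fall back to an empty run config
    return run_config.get(connector_type, {})

settings = {
    "CONNECTOR_RUN_CONFIG": {
        "POSTGRES": {"records_per_metric": 100},
        "SHOPIFY": {"chunk_size": 1},
    }
}
assert resolve_run_config(settings, "SRC_SHOPIFY") == {"chunk_size": 1}
assert resolve_run_config(settings, "DEST_POSTGRES") == {"records_per_metric": 100}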
diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py index f7240e18..a4f3fbad 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py @@ -54,7 +54,7 @@ def get_airbyte_command(): entrypoint_str = os.environ["VALMI_ENTRYPOINT"] entrypoint = entrypoint_str.split(" ") - + airbyte_command = sys.argv[3] for i, arg in enumerate(sys.argv[1:]): if i >= len(entrypoint): diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py index 61b7a770..130172cd 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py @@ -110,7 +110,8 @@ def __init__(self, *args, **kwargs): self.connector_state = ConnectorState(run_time_args=run_time_args) def current_run_details(self): - sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")) + sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "cf280e5c-1184-4052-b089-f9f41b25138e")) + # sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")) r = self.session_with_retries.get( f"{self.engine_url}/syncs/{sync_id}/runs/current_run_details/{CONNECTOR_STRING}", timeout=HTTP_TIMEOUT, diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py index 4a99b639..46879bfa 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py @@ -154,30 +154,33 @@ def handle(self, record) -> bool: print("Checkpoint seen") print(record) - records_delivered = record["state"]["data"]["records_delivered"] - finished = record["state"]["data"]["finished"] - commit_state = record["state"]["data"]["commit_state"] - commit_metric = record["state"]["data"]["commit_metric"] - - total_records = 0 - for k, v in records_delivered.items(): - total_records += v - - self.engine.connector_state.register_records(total_records) - - if commit_metric: - self.engine.metric_ext(records_delivered, record["state"]["data"]["chunk_id"], commit=True) - # self.engine.connector_state.register_chunk() - if commit_state: - self.engine.checkpoint(record) - if SingletonLogWriter.instance() is not None: - SingletonLogWriter.instance().data_chunk_flush_callback() - SampleWriter.data_chunk_flush_callback() - else: - if SingletonLogWriter.instance() is not None: - SingletonLogWriter.instance().check_for_flush() - - return True + if os.environ.get('MODE', 'any') == 'etl': + return True + else : + records_delivered = record["state"]["data"]["records_delivered"] + finished = record["state"]["data"]["finished"] + commit_state = record["state"]["data"]["commit_state"] + commit_metric = record["state"]["data"]["commit_metric"] + + total_records = 0 + for k, v in records_delivered.items(): + total_records += v + + self.engine.connector_state.register_records(total_records) + + if commit_metric: + 
self.engine.metric_ext(records_delivered, record["state"]["data"]["chunk_id"], commit=True) + # self.engine.connector_state.register_chunk() + if commit_state: + self.engine.checkpoint(record) + if SingletonLogWriter.instance() is not None: + SingletonLogWriter.instance().data_chunk_flush_callback() + SampleWriter.data_chunk_flush_callback() + else: + if SingletonLogWriter.instance() is not None: + SingletonLogWriter.instance().check_for_flush() + + return True class RecordHandler(DefaultHandler): diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 7c218c39..71b433b4 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -294,6 +294,11 @@ def __init__(self, *args, **kwargs): def handle(self, record): print(json.dumps(record)) + + if os.environ.get('MODE', 'any') == 'etl': + record['state']['data']['chunk_id']= self.engine.connector_state.num_chunks - 1 # Not oprating on chunk boundary -- fix + self.store_writer.write(record) + self.engine.checkpoint(record) if SingletonLogWriter.instance() is not None: SingletonLogWriter.instance().data_chunk_flush_callback() diff --git a/secrets/data.txt b/secrets/data.txt new file mode 100644 index 00000000..e69de29b diff --git a/secrets/run.py b/secrets/run.py new file mode 100644 index 00000000..221b7fbd --- /dev/null +++ b/secrets/run.py @@ -0,0 +1,3 @@ +with open("/Users/raj/valmi/valmi-activation/secrets/data.txt", "a") as f: + for i in range(5000): + f.write('{"type": "RECORD", "record": {"stream": "products", "data": {"id": 8336347070716, "title": "short sleeve t-shirt", "body_html": "comfort to wear in summer.", "vendor": "chitumalla-store", "product_type": "", "created_at": "2024-03-13T03:08:30-04:00", "handle": "short-sleeve-t-shirt", "updated_at": "2024-03-28T02:12:58-04:00", "published_at": "2024-03-13T03:08:30-04:00", "template_suffix": "", "published_scope": "global", "tags": "", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/8336347070716", "variants": [{"id": 45202861424892, "product_id": 8336347070716, "title": "Default Title", "price": 199.0, "sku": "22", "position": 2, "inventory_policy": "deny", "compare_at_price": 250.0, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "Default Title", "option2": null, "option3": null, "created_at": "2024-03-13T03:08:31-04:00", "updated_at": "2024-03-27T06:38:06-04:00", "taxable": true, "barcode": "223432344211", "grams": 0, "weight": 0.0, "weight_unit": "kg", "inventory_item_id": 47297495302396, "inventory_quantity": 18, "old_inventory_quantity": 18, "requires_shipping": true, "admin_graphql_api_id": "gid://shopify/ProductVariant/45202861424892", "image_id": null}], "options": [{"id": 10652504916220, "product_id": 8336347070716, "name": "Title", "position": 1, "values": ["Default Title"]}], "images": [{"id": 40627989840124, "alt": null, "position": 1, "product_id": 8336347070716, "created_at": "2024-03-13T03:11:27-04:00", "updated_at": "2024-03-13T03:11:28-04:00", "admin_graphql_api_id": "gid://shopify/ProductImage/40627989840124", "width": 267, "height": 148, "src": "https://cdn.shopify.com/s/files/1/0686/8726/6044/files/download.jpg?v=1710313888", "variant_ids": []}], "image": {"id": 40627989840124, "alt": null, "position": 1, "product_id": 
8336347070716, "created_at": "2024-03-13T03:11:27-04:00", "updated_at": "2024-03-13T03:11:28-04:00", "admin_graphql_api_id": "gid://shopify/ProductImage/40627989840124", "width": 267, "height": 148, "src": "https://cdn.shopify.com/s/files/1/0686/8726/6044/files/download.jpg?v=1710313888", "variant_ids": []}, "shop_url": "chitumalla-store"}, "emitted_at": 1712674949666, "rejected": false, "metric_type": "succeeded"}}') diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 4d00b35b..283aa808 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -90,6 +90,19 @@ async def get_current_run_details_for_connector_string( sync_service: SyncsService = Depends(get_syncs_service), sync_runs_service: SyncRunsService = Depends(get_sync_runs_service), ) -> SyncCurrentRunArgs: + + # Hardcoded for testing + if str(sync_id) == "cf280e5c-1184-4052-b089-f9f41b25138e": + return SyncCurrentRunArgs(**{ + "sync_id": sync_id, + "run_id": sync_id, + "chunk_size": 300, + "chunk_id": 0, + "records_per_metric": 10, + "previous_run_status": "success", + "full_refresh": False, + }) + sync_schedule = sync_service.get(sync_id) runs = sync_runs_service.get_runs(sync_id, datetime.now(), 2) previous_run = runs[1] if len(runs) > 1 else None @@ -99,10 +112,15 @@ async def get_current_run_details_for_connector_string( ) # TODO: Have to find a better way instead of so many refreshes # Get connector run config - dst_connector_type = "_".join(sync_schedule.dst_connector_type.split('_')[1:]) + connector_type = "" + if connector_string == "src": + connector_type = "_".join(sync_schedule.src_connector_type.split('_')[1:]) + else: + connector_type = "_".join(sync_schedule.dst_connector_type.split('_')[1:]) + connector_run_config = {} - if dst_connector_type in v.get("CONNECTOR_RUN_CONFIG"): - connector_run_config = v.get("CONNECTOR_RUN_CONFIG")[dst_connector_type] + if connector_type in v.get("CONNECTOR_RUN_CONFIG"): + connector_run_config = v.get("CONNECTOR_RUN_CONFIG")[connector_type] # TODO: get saved checkpoint state of the run_id & create column run_time_args in the sync_runs table to get repeatable runs @@ -134,7 +152,7 @@ async def get_current_run_details_for_connector_string( run_args["state"] = current_run.extra[connector_string]['state']['state'] return SyncCurrentRunArgs(**run_args) - + @router.get("/{sync_id}/runs/current_run_details", response_model=SyncCurrentRunArgs) async def get_current_run_details( @@ -151,6 +169,11 @@ async def synchronize_connector( run_id: UUID4, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service), ) -> ConnectorSynchronization: + + # Hardcoded for testing + if str(sync_id) == "cf280e5c-1184-4052-b089-f9f41b25138e": + return ConnectorSynchronization(abort_required=False) + run = sync_runs_service.get(run_id) abort_required = False @@ -178,6 +201,10 @@ async def state( state: Dict, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service), ) -> GenericResponse: + # Hardcoded for testing + if str(sync_id) == "cf280e5c-1184-4052-b089-f9f41b25138e": + return GenericResponse() + sync_runs_service.save_state(sync_id, run_id, connector_string, state) return GenericResponse() @@ -190,6 +217,10 @@ async def status( status: Dict, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service), ) -> GenericResponse: + # Hardcoded for testing + if str(sync_id) == "cf280e5c-1184-4052-b089-f9f41b25138e": + return GenericResponse() + sync_runs_service.save_status(sync_id, run_id, connector_string, status) return GenericResponse() diff --git 
a/src/metrics/metric_store.py b/src/metrics/metric_store.py index 481593b7..750f934f 100644 --- a/src/metrics/metric_store.py +++ b/src/metrics/metric_store.py @@ -52,27 +52,15 @@ def __new__(cls, delete_db=False, *args, **kwargs) -> object: def __init__(self, delete_db=False, *args, **kwargs) -> None: if Metrics.__initialized: return - Metrics.__initialized = True self.con = duckdb.connect(DB_NAME) - - metric_table_found = False if delete_db: self.con.execute(f"DROP TABLE IF EXISTS {METRICS_TABLE}") - else: - self.con.execute("SHOW TABLES") - tables = self.con.fetchall() - - for table in tables: - if table[0] == METRICS_TABLE: - metric_table_found = True - - if not metric_table_found: - self.con.sql( - f"CREATE TABLE {METRICS_TABLE} (sync_id VARCHAR, connector_id VARCHAR, run_id VARCHAR, \ - chunk_id BIGINT, metric_type VARCHAR,\ - count BIGINT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" - ) + self.con.sql( + f"CREATE TABLE IF NOT EXISTS {METRICS_TABLE} (sync_id VARCHAR, connector_id VARCHAR, run_id VARCHAR, \ + chunk_id BIGINT, metric_type VARCHAR,\ + count BIGINT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" + ) def clear_metrics(self, sync_id: UUID4, run_id: UUID4) -> None: try: diff --git a/valmi-app b/valmi-app index 3bd0dcd9..faba0ac4 160000 --- a/valmi-app +++ b/valmi-app @@ -1 +1 @@ -Subproject commit 3bd0dcd907beac71dc734ede5c199757f9a4538f +Subproject commit faba0ac407bdb8146fd128a4a4c677d5b0adea96 diff --git a/valmi-app-backend b/valmi-app-backend index 8a8a37b1..65f115ee 160000 --- a/valmi-app-backend +++ b/valmi-app-backend @@ -1 +1 @@ -Subproject commit 8a8a37b18743b71c98b1cc5f60707918130cf0a8 +Subproject commit 65f115ee4739ab1bc2246ddc0fe7f67508ac3e7f From 5d31e97cf56f7e7c0102f7bd329368bf44fd3724 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Fri, 26 Apr 2024 17:17:23 +0530 Subject: [PATCH 02/26] fix: add missing import to connector lib --- config.yaml | 2 +- packages/valmi-connector-lib/pyproject.toml | 2 +- .../destination_container_wrapper.py | 17 ++++++++--------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/config.yaml b/config.yaml index 31a4c43b..74ec0d58 100644 --- a/config.yaml +++ b/config.yaml @@ -71,4 +71,4 @@ CONNECTOR_RUN_CONFIG: POSTGRES: records_per_metric: 100 SHOPIFY: - chunk_size: 1 + chunk_size: 100 diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 0e5b9cf3..91a19aaa 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.138" +version = "0.1.149" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py index e87d7494..130cac2a 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py @@ -23,24 +23,23 @@ SOFTWARE. 
""" +import io import json import os -import sys import subprocess -import io +import sys from typing import Any, Dict -from valmi_connector_lib.common.logs import SingletonLogWriter, TimeAndChunkEndFlushPolicy +from valmi_connector_lib.common.logs import (SingletonLogWriter, + TimeAndChunkEndFlushPolicy) +from valmi_connector_lib.common.samples import SampleWriter from valmi_connector_lib.destination_wrapper.engine import CONNECTOR_STRING +from .proc_stdout_event_handlers import Engine, NullEngine, StoreReader from .proc_stdout_handler import ProcStdoutHandlerThread -from .proc_stdout_event_handlers import ( - Engine, - StoreReader, - NullEngine, -) -from .read_handlers import ReadCheckpointHandler, ReadDefaultHandler, ReadLogHandler, ReadRecordHandler from .proc_stdout_handler import handlers as stdout_handlers +from .read_handlers import (ReadCheckpointHandler, ReadDefaultHandler, + ReadLogHandler, ReadRecordHandler) handlers = { "LOG": ReadLogHandler, From 80cd1fcd10bb3af1d873fbc35c564c498a5a8809 Mon Sep 17 00:00:00 2001 From: Ganesh varma Date: Tue, 30 Apr 2024 16:02:37 +0530 Subject: [PATCH 03/26] feat: add normalization and dbt step into dag (#71) * feat: add normalization and dbt step into dag --- src/orchestrator/job_generator.py | 6 +- .../templates/shopify_template.jinja | 201 ++++++++++++++++++ 2 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 src/orchestrator/templates/shopify_template.jinja diff --git a/src/orchestrator/job_generator.py b/src/orchestrator/job_generator.py index 85d9617a..e71b5e81 100644 --- a/src/orchestrator/job_generator.py +++ b/src/orchestrator/job_generator.py @@ -182,8 +182,10 @@ def gen_job_file(self, dirs: dict[str, str], sync: Json[any]) -> None: file_loader = FileSystemLoader(join(dirname(__file__), "templates")) env = Environment(loader=file_loader) - template = env.get_template("job_template.jinja") - + template_name = "job_template.jinja" + if sync["source"]["credential"]["connector_type"] == "SRC_SHOPIFY": + template_name = "shopify_template.jinja" + template = env.get_template(template_name) output = template.render(sync=sync, app=v.get("APP"), prefix=SHARED_DIR) with open(join(dirs[GENERATED_DIR], f"{sync['id'].replace('-','_')}.py"), "w") as f: f.write(output) diff --git a/src/orchestrator/templates/shopify_template.jinja b/src/orchestrator/templates/shopify_template.jinja new file mode 100644 index 00000000..bc46d9cb --- /dev/null +++ b/src/orchestrator/templates/shopify_template.jinja @@ -0,0 +1,201 @@ +""" +Copyright (c) 2024 valmi.io + +Created Date: Wednesday, March 8th 2023, 11:45:42 pm +Author: Rajashekar Varkala @ valmi.io + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import os + +from dagster_docker import docker_container_op, execute_docker_container + +from dagster import DefaultScheduleStatus, ScheduleDefinition, graph, op, RetryPolicy, Backoff, Jitter +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus, JobSelector + + +@op(name="initialise_{{ sync['id'].replace('-','_') }}") +def initialise(): + pass + +source_op = docker_container_op.configured( + { + "image": "{{ sync['source']['credential']['docker_image'] }}:{{ sync['source']['credential']['docker_tag'] }}", + "command": [ + "read", + "--config", + "/tmp/config.json", + "--catalog", + "/tmp/configured_catalog.json", + ], + "container_kwargs": { # keyword args to be passed to the container. example: + "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{ sync['id'] }}-{{ sync['source']['id'] }}.json:/tmp/config.json" , "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['source']['id'] }}.json:/tmp/configured_catalog.json"], + }, + "env_vars": list({**os.environ}.keys()), + }, + name="source_op_{{ sync['id'].replace('-','_') }}", + +) +destination_op = docker_container_op.configured( + { + "image": "{{ sync['destination']['credential']['docker_image'] }}:{{ sync['destination']['credential']['docker_tag'] }}", + "command": ["write", + "--config", + "/tmp/config.json", + "--catalog", + "/tmp/configured_source_catalog.json", + "--destination_catalog", + "/tmp/configured_catalog.json"], + "container_kwargs": { # keyword args to be passed to the container. example: + "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" , "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['source']['id'] }}.json:/tmp/configured_source_catalog.json", "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/configured_catalog.json"], + }, + "env_vars": list({**os.environ}.keys()), + }, + name="destination_op_{{ sync['id'].replace('-','_') }}" +) + +@op( + name="normalization_op_{{ sync['id'].replace('-','_') }}", + retry_policy=RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ), +) +def normalization_op(context, a, b): + execute_docker_container( + context, + image = "airbyte/normalization:latest", + command = [ + "run", + "--integration-type", + "postgres", + "--config", + "/tmp/config.json", + "--catalog", + "/tmp/configured_catalog.json" + ], + container_kwargs = { # keyword args to be passed to the container. 
example: + "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" , "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['source']['id'] }}.json:/tmp/configured_source_catalog.json", "{{prefix}}/{{app}}/repo/catalog/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/configured_catalog.json"], + }, + env_vars= list({**os.environ}.keys()), + ) + +@op( + name="transformation_po_op_{{ sync['id'].replace('-','_') }}", + retry_policy=RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ), +) +def transformation_po_op(context, c) -> None: + execute_docker_container( + context, + image = "valmiio/transform-po:latest", + command = [ + "--config-file", + "/tmp/config.json", + ], + container_kwargs = { # keyword args to be passed to the container. example: + "volumes": ["{{ prefix }}/intermediate_store:{{ prefix }}/intermediate_store","{{ prefix }}/{{ app }}/repo/config/{{sync['id']}}-{{ sync['destination']['id'] }}.json:/tmp/config.json" ], + }, + env_vars= list({**os.environ}.keys()), + ) + + + + +@op(name="finalizer_{{ sync['id'].replace('-','_') }}" , retry_policy=RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + )) +def finalizer(context,d) -> None: + context.log.info("finalizer") + finalise_this_run() + + +def job(): + @graph(name="graph_{{ sync['id'].replace('-','_') }}") + def sync_graph(): + i = initialise() + a = source_op.with_retry_policy( + RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ) + )(i) + + b = destination_op.with_retry_policy( + RetryPolicy( + max_retries=100, + delay=2, # 2s + backoff=Backoff.EXPONENTIAL, + jitter=Jitter.PLUS_MINUS, + ) + )(i) + c = normalization_op(a,b) + d = transformation_po_op(c) + finalizer(d) + + return sync_graph.to_job(name="{{ sync['id'].replace('-','_') }}") + + +def schedule(): + return ScheduleDefinition( + cron_schedule="0 0 1 1 *", + job=job(), + default_status=DefaultScheduleStatus.STOPPED, + execution_timezone="US/Central", + ) + + +@run_status_sensor(name="canceled_sensor_{{ sync['id'].replace('-','_') }}", run_status=DagsterRunStatus.CANCELED, monitored_jobs = [JobSelector(location_name="valmi-repo",repository_name="valmi-repo",job_name="{{ sync['id'].replace('-','_') }}" )]) +def finalise_on_run_canceled(context: RunStatusSensorContext): + context.log.info("finalizer on run cancel") + finalise_this_run() + + +@run_status_sensor(name="failure_sensor_{{ sync['id'].replace('-','_') }}", run_status=DagsterRunStatus.FAILURE, monitored_jobs = [JobSelector(location_name="valmi-repo",repository_name="valmi_repo",job_name="{{ sync['id'].replace('-','_') }}" )] ) +def finalise_on_run_failure(context: RunStatusSensorContext): + context.log.info("finalizer on run failure") + finalise_this_run() + + +def finalise_this_run(): + activation_url = os.environ["ACTIVATION_ENGINE_URL"] + session = requests.Session() + retry = Retry(connect=5, backoff_factor=5) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + + sync_id = os.environ["DAGSTER_RUN_JOB_NAME"].replace("_","-") + response = session.get(f"{activation_url}/syncs/{sync_id}/runs/finalise_last_run") + response.raise_for_status() \ No newline at end of file From a4999b8f6a8586b127ef621373ed9f42159c6cc2 Mon Sep 17 00:00:00 2001 
From: supradeep2819 Date: Thu, 2 May 2024 18:05:06 +0530 Subject: [PATCH 04/26] feature: Added endpoint to retrive last run status --- src/api/routers/syncs.py | 9 ++++++++- src/api/services/sync_runs.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 283aa808..2141836d 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -31,6 +31,7 @@ from datetime import datetime from typing import Any, Dict, List, Optional +from api.schemas.sync_status import LastSyncStatus from fastapi import Depends from fastapi.routing import APIRouter @@ -73,7 +74,7 @@ # Create /{sync_id}/runs api counter -activation_sync_runs_api_counter = meter.create_counter("activation_sync_runs_api_counter") +activation_sync_runs_api_counter = meter.create_counter("activation_sync_runs_api_counter") @router.get("/", response_model=List[SyncSchedule]) @@ -392,3 +393,9 @@ async def get_samples( sample_handling_service.add_sample_retriever_task( sample_retriever_task=sample_retriever_task) return await sample_handling_service.read_sample_retriever_data(sample_retriever_task=sample_retriever_task) + + +@router.get("/{sync_id}/status", response_model=str) +def get_run_status(sync_id, + sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): + return sync_runs_service.last_run_status(sync_id) \ No newline at end of file diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index a7e0c7b2..572e4b36 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -23,6 +23,7 @@ SOFTWARE. """ from datetime import datetime +from api.schemas.sync_status import LastSyncStatus from pydantic import UUID4 from sqlalchemy import and_, or_ from sqlalchemy.orm import Session @@ -34,6 +35,8 @@ from metastore.models import SyncStatus from .base import BaseService from sqlalchemy.orm.attributes import flag_modified +import logging +logger = logging.getLogger(__name__) class SyncRunsService(BaseService[SyncRun, SyncRunCreate, Any]): @@ -106,3 +109,15 @@ def update_sync_run_extra_data(self, run_id, connector_string, key, value): flag_modified(sync_run, "extra") self.db_session.commit() + + def last_run_status(self,sync_id) -> str: + logger.debug("in service method") + return ( + self.db_session.query(self.model) + .filter(SyncRun.sync_id == sync_id) + .order_by(SyncRun.created_at) + .limit(1) + .first() + ).status + + \ No newline at end of file From ececb3a86cbca22e975f7f46064762f2d203ba00 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 2 May 2024 18:32:40 +0530 Subject: [PATCH 05/26] feature: Added endpoint for getting last run status --- src/api/routers/syncs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 2141836d..177e0201 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -395,7 +395,7 @@ async def get_samples( return await sample_handling_service.read_sample_retriever_data(sample_retriever_task=sample_retriever_task) -@router.get("/{sync_id}/status", response_model=str) +@router.get("/{sync_id}/last/run/status", response_model=str) def get_run_status(sync_id, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): return sync_runs_service.last_run_status(sync_id) \ No newline at end of file From e06bbc1823443ade383f5959b23d03cfd4ba4fb5 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Fri, 3 May 2024 11:56:05 +0530 Subject: [PATCH 06/26] feat: Handled missed 
schema for last run status --- src/api/schemas/sync_status.py | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 src/api/schemas/sync_status.py diff --git a/src/api/schemas/sync_status.py b/src/api/schemas/sync_status.py new file mode 100644 index 00000000..4680998b --- /dev/null +++ b/src/api/schemas/sync_status.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel +from enum import Enum + +class LastSyncStatus(BaseModel): + status: str From d758da6d4593bd66386e709d6f465c805cc2ab20 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Fri, 3 May 2024 13:00:03 +0530 Subject: [PATCH 07/26] fix: retriving last run ID in descending order --- src/api/routers/syncs.py | 4 ++-- src/api/services/sync_runs.py | 21 ++++++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 177e0201..26ba6663 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -31,7 +31,7 @@ from datetime import datetime from typing import Any, Dict, List, Optional -from api.schemas.sync_status import LastSyncStatus + from fastapi import Depends from fastapi.routing import APIRouter @@ -395,7 +395,7 @@ async def get_samples( return await sample_handling_service.read_sample_retriever_data(sample_retriever_task=sample_retriever_task) -@router.get("/{sync_id}/last/run/status", response_model=str) +@router.get("/{sync_id}/lastRunStatus", response_model=str) def get_run_status(sync_id, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): return sync_runs_service.last_run_status(sync_id) \ No newline at end of file diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index 572e4b36..edb5591b 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -23,7 +23,6 @@ SOFTWARE. 
""" from datetime import datetime -from api.schemas.sync_status import LastSyncStatus from pydantic import UUID4 from sqlalchemy import and_, or_ from sqlalchemy.orm import Session @@ -112,12 +111,16 @@ def update_sync_run_extra_data(self, run_id, connector_string, key, value): def last_run_status(self,sync_id) -> str: logger.debug("in service method") - return ( - self.db_session.query(self.model) - .filter(SyncRun.sync_id == sync_id) - .order_by(SyncRun.created_at) - .limit(1) - .first() - ).status - + try: + + return ( + self.db_session.query(self.model) + .filter(SyncRun.sync_id == sync_id) + .order_by(SyncRun.created_at.desc()) + .limit(1) + .first() + ).status + except Exception as e: + logger.error(e) + return e.message \ No newline at end of file From a1ce573ceed3950ff906db463be34fb7abbc3118 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Fri, 3 May 2024 13:56:59 +0530 Subject: [PATCH 08/26] fix: retriving last run ID in descending order --- src/api/routers/syncs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 26ba6663..ae090837 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -395,7 +395,7 @@ async def get_samples( return await sample_handling_service.read_sample_retriever_data(sample_retriever_task=sample_retriever_task) -@router.get("/{sync_id}/lastRunStatus", response_model=str) +@router.get("/{sync_id}/latestRunStatus", response_model=str) def get_run_status(sync_id, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): return sync_runs_service.last_run_status(sync_id) \ No newline at end of file From 1b56fcb2c640d354cffb6444522643a916b441d1 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Fri, 10 May 2024 13:52:16 +0530 Subject: [PATCH 09/26] "feat: Enhance source postgres configuration to provide specific table schema --- .../valmi_dbt/dbt_airbyte_adapter.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py index b68e1bbd..724b82d5 100644 --- a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py +++ b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py @@ -118,12 +118,24 @@ def discover_streams(self, logger: AirbyteLogger, config): with self.adapter.connection_named("discover-connection"): if "namespace" in config: + relations = self.adapter.list_relations(self.faldbt._config.credentials.database, schema=config["namespace"]) + if "table" in config: + # get subset of the relations + subset = [] + for relation in relations: + if relation.path.identifier.lower() == config["table"].lower() \ + and relation.path.schema.lower() == config["namespace"].lower(): + subset.append(relation) + break + return ( + False, + subset + ) return ( False, self.adapter.list_relations(self.faldbt._config.credentials.database, schema=config["namespace"]), ) - else: - return (True, self.adapter.list_schemas(self.faldbt._config.credentials.database)) + return (True, self.adapter.list_schemas(self.faldbt._config.credentials.database)) def get_columns(self, adapter: SQLAdapter, relation: BaseRelation): with self.adapter.connection_named("getcolumns-connection"): From f69dcd89e1b1aac8b8560bb45e4b3a4a020506b5 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Tue, 14 May 2024 17:34:53 +0530 Subject: [PATCH 10/26] feat: changes needed for 
destination postgres to work --- packages/valmi-connector-lib/pyproject.toml | 2 +- .../destination_wrapper/destination_container_wrapper.py | 6 ++++++ .../destination_wrapper/proc_stdout_handler.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 91a19aaa..23ab263a 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.149" +version = "0.1.151" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py index a4f3fbad..cba3bb1d 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py @@ -182,6 +182,12 @@ def main(): # create the subprocess subprocess_args = sys.argv[1:] + + if os.environ.get('MODE', 'any') == 'etl' and "--destination_catalog" in subprocess_args: + arg_idx = subprocess_args.index("--destination_catalog") + subprocess_args.remove("--destination_catalog") + subprocess_args.pop(arg_idx) + if is_state_available(): subprocess_args.append("--state") subprocess_args.append(state_file_path) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_handler.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_handler.py index e4f5e312..522ee2ec 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_handler.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_handler.py @@ -78,7 +78,7 @@ def run(self) -> None: ret_val = handlers[json_record["type"]].handle(json_record) if ret_val is False: # TODO: comes from ERROR Trace, should be handled cleanly self.proc.kill() - os._exit(0) # error is already logged with engine in the handler + os._exit(1) # error is already logged with engine in the handler # stdout finished. 
clean close self.exit_flag = True From 374bf8559e78fc53e9031be54e0407ade7fbac76 Mon Sep 17 00:00:00 2001 From: Rajashekar Varkala Date: Wed, 15 May 2024 11:43:56 +0530 Subject: [PATCH 11/26] feat: Adding checkpointing to etl sources and destinations --- .../destination_container_wrapper.py | 8 +++++--- .../destination_write_wrapper.py | 1 + .../proc_stdout_event_handlers.py | 15 ++++++++++++++- .../destination_wrapper/read_handlers.py | 7 ++++++- .../source_wrapper/source_container_wrapper.py | 4 ++-- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py index cba3bb1d..d4a05f69 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py @@ -166,7 +166,7 @@ def main(): engine.connector_state.run_time_args["sync_id"], engine.connector_state.run_time_args["run_id"], CONNECTOR_STRING) - + # initialize SampleWriter SampleWriter.get_writer_by_metric_type(store_config_str=os.environ["VALMI_INTERMEDIATE_STORE"], sync_id=engine.connector_state.run_time_args["sync_id"], @@ -183,12 +183,14 @@ def main(): # create the subprocess subprocess_args = sys.argv[1:] + # For ETL, there is no concept of destination catalog if os.environ.get('MODE', 'any') == 'etl' and "--destination_catalog" in subprocess_args: arg_idx = subprocess_args.index("--destination_catalog") subprocess_args.remove("--destination_catalog") subprocess_args.pop(arg_idx) - - if is_state_available(): + + # For ETL, internal connectors do not need any state information + if os.environ.get('MODE', 'any') != 'etl' and is_state_available(): subprocess_args.append("--state") subprocess_args.append(state_file_path) proc = subprocess.Popen(subprocess_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py index d6029b6f..b2199b91 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py @@ -59,6 +59,7 @@ def read_chunk_id_checkpoint(self): return self.previous_state['state']['data']['chunk_id'] + 1 return 1 + def start_message_handling(self, input_messages: Iterable[AirbyteMessage]) -> AirbyteMessage: counter: int = 0 counter_by_type: dict[str, int] = defaultdict(lambda: 0) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py index 46879bfa..0a270fd6 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py @@ -58,13 +58,17 @@ def __init__(self, engine: NullEngine, state: str) -> None: self.engine = engine self.connector_state: ConnectorState = self.engine.connector_state self.loaded_state = state + self.current_file_name = None store_config = json.loads(os.environ["VALMI_INTERMEDIATE_STORE"]) if 
store_config["provider"] == "local": path_name = join(store_config["local"]["directory"], self.connector_state.run_time_args["run_id"], "data") os.makedirs(path_name, exist_ok=True) self.path_name = path_name - self.last_handled_fn = self.get_file_name_from_chunk_id(self.read_chunk_id_checkpoint()) + if os.environ.get('MODE', 'any') == 'etl': + self.last_handled_fn = self.read_file_marker_from_checkpoint() + else: + self.last_handled_fn = self.get_file_name_from_chunk_id(self.read_chunk_id_checkpoint()) def read(self): while True: @@ -76,6 +80,7 @@ def read(self): if self.last_handled_fn is not None and int(fn[:-5]) <= int(self.last_handled_fn[:-5]): continue if fn.endswith(".vald"): + self.current_file_name = fn with open(join(self.path_name, fn), "r") as f: for line in f.readlines(): # print("yiedling", line) @@ -106,6 +111,14 @@ def read_chunk_id_checkpoint(self): and 'chunk_id' in self.loaded_state['state']['data']: return self.loaded_state['state']['data']['chunk_id'] return None + + def read_file_marker_from_checkpoint(self): + if self.loaded_state is not None \ + and 'state' in self.loaded_state \ + and 'data' in self.loaded_state['state'] \ + and 'file_marker' in self.loaded_state['state']['data']: + return self.loaded_state["state"]["data"]["file_marker"] + return None def get_file_name_from_chunk_id(self, chunk_id): if chunk_id is not None: diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py index 6f9e3edb..10d45aa4 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py @@ -29,6 +29,7 @@ StoreReader, StdoutWriter, ) +import os class ReadDefaultHandler: @@ -62,7 +63,11 @@ def __init__(self, *args, **kwargs): super(ReadCheckpointHandler, self).__init__(*args, **kwargs) def handle(self, record) -> bool: - # do an engine call to proceed. + # For ETL, we store the checkpoint for the reader instead of the destination stdout state, + # because state is dictated by the source. 
+ if os.environ.get('MODE', 'any') == 'etl': + record["state"]["data"]["file_marker"] = self.store_reader.current_file_name + self.engine.checkpoint(record) return True diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 8e07c970..fdc5cd80 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -296,9 +296,9 @@ def __init__(self, *args, **kwargs): def handle(self, record): print(json.dumps(record)) - + if os.environ.get('MODE', 'any') == 'etl': - record['state']['data']['chunk_id']= self.engine.connector_state.num_chunks - 1 # Not oprating on chunk boundary -- fix + # record['state']['data']['chunk_id'] = self.engine.connector_state.num_chunks - 1 # Not oprating on chunk boundary -- fix self.store_writer.write(record) self.engine.checkpoint(record) From b22b4a24de899149dd872dc7ae1508a97b64b480 Mon Sep 17 00:00:00 2001 From: Rajashekar Varkala Date: Wed, 15 May 2024 11:44:43 +0530 Subject: [PATCH 12/26] version up --- packages/valmi-connector-lib/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 23ab263a..5f24c76e 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.151" +version = "0.1.152" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" From d307b9784bab2b14b78c20071e556e4784264b53 Mon Sep 17 00:00:00 2001 From: Rajashekar Varkala Date: Wed, 15 May 2024 12:09:21 +0530 Subject: [PATCH 13/26] feat: for etl, batching source records only on seeing state messages --- packages/valmi-connector-lib/pyproject.toml | 2 +- .../source_wrapper/source_container_wrapper.py | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 5f24c76e..6f7ff2c0 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.152" +version = "0.1.153" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index fdc5cd80..bedede6b 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -127,7 +127,7 @@ def __init__(self, *args, **kwargs): def current_run_details(self): sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "cf280e5c-1184-4052-b089-f9f41b25138e")) - #sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")) + # sync_id = du(os.environ.get("DAGSTER_RUN_JOB_NAME", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")) r = self.session_with_retries.get( f"{self.engine_url}/syncs/{sync_id}/runs/current_run_details/{CONNECTOR_STRING}", timeout=HTTP_TIMEOUT, @@ -239,8 +239,15 @@ def __init__(self, engine: NullEngine) -> None: 
def write(self, record, last=False): self.records.append(record) - self.connector_state.register_record() - if self.connector_state.records_in_chunk >= self.connector_state.run_time_args["chunk_size"]: + is_state_message = record['type'] == "STATE" + if not is_state_message: + self.connector_state.register_record() + + etl_mode = os.environ.get('MODE', 'any') == 'etl' + + # For ETL the chunk flush should happen only after seeing STATE message. + if (etl_mode and is_state_message) or \ + (not etl_mode and self.connector_state.records_in_chunk >= self.connector_state.run_time_args["chunk_size"]): self.flush(last=False) self.records = [] self.engine.metric(commit=True) From e61e1a4bf7bae9bf1a1659b2d88db2f5cb58a39e Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 16 May 2024 18:59:35 +0530 Subject: [PATCH 14/26] feat: generating catalog when query is present postgres config --- .../source-postgres/source_postgres/source.py | 35 +++++++++++++++++-- .../valmi_dbt/dbt_airbyte_adapter.py | 12 +++++-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/valmi-integrations/connectors/source-postgres/source_postgres/source.py b/valmi-integrations/connectors/source-postgres/source_postgres/source.py index 725ab863..3b1731c0 100644 --- a/valmi-integrations/connectors/source-postgres/source_postgres/source.py +++ b/valmi-integrations/connectors/source-postgres/source_postgres/source.py @@ -112,6 +112,35 @@ def discover(self, logger: AirbyteLogger, config: json) -> ValmiCatalog: catalog.__setattr__("more", more) return catalog else: + if "query" in config: + streams = [] + properties = {} + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": properties + } + for column_name in result_streams.column_names: + predefined_type = { + "type": "character varying" + } + properties[column_name] = predefined_type + stream = { + "name": "dvdrental.public.shop", + "json_schema": json_schema, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], + "label": "dvdrental.public.shop", + + } + streams.append(stream) + catalog = ValmiCatalog(streams=streams) + catalog.__setattr__("type", "table") + catalog.__setattr__("more", more) + return catalog + streams = [] for row in result_streams: stream_name = str(row) @@ -141,7 +170,7 @@ def is_dbt_run_finished(self, state: Dict[str, any]): if state is None or 'state' not in state: return False return True - + def read_chunk_id_checkpoint(self, state: Dict[str, any]): if state is not None \ and 'state' in state \ @@ -188,7 +217,7 @@ def read( ), ) return - + # initialise chunk_size if "run_time_args" in config and "chunk_size" in config["run_time_args"]: chunk_size = config["run_time_args"]["chunk_size"] @@ -282,7 +311,7 @@ def generate_samples_for_ignored_data(self, faldbt, logger, sync_id, catalog) -> ), ) ''' - + def generate_sync_metrics(self, faldbt, logger, sync_id, catalog) -> Generator[AirbyteMessage, None, None]: adapter_resp, agate_table = self.dbt_adapter.execute_sql( faldbt, diff --git a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py index 724b82d5..c61a4447 100644 --- a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py +++ b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py @@ -118,13 +118,19 @@ def discover_streams(self, logger: AirbyteLogger, config): with self.adapter.connection_named("discover-connection"): if 
"namespace" in config: - relations = self.adapter.list_relations(self.faldbt._config.credentials.database, schema=config["namespace"]) + relations = self.adapter.list_relations( + self.faldbt._config.credentials.database, schema=config["namespace"]) + if "query" in config: + adapter_resp, agate_table = self.adapter.execute(config["query"], fetch=True) + # adapter_resp, agate_table = self.execute_sql(self.faldbt, logger, config["query"]) + # logger.info(agate_table) + return (False, agate_table) if "table" in config: # get subset of the relations subset = [] for relation in relations: if relation.path.identifier.lower() == config["table"].lower() \ - and relation.path.schema.lower() == config["namespace"].lower(): + and relation.path.schema.lower() == config["namespace"].lower(): subset.append(relation) break return ( @@ -229,7 +235,7 @@ def execute_dbt(self, logger: AirbyteLogger): stderr=subprocess.PIPE, stdout=subprocess.PIPE, ) - + logs = [] for line in iter(proc.stdout.readline, b''): logs.append(line) From 79654bba9be22e29d304befdd9a3d11a7056881e Mon Sep 17 00:00:00 2001 From: Rajashekar Varkala Date: Thu, 16 May 2024 19:07:56 +0530 Subject: [PATCH 15/26] feat: multiple state messages stored for etl sources --- packages/valmi-connector-lib/pyproject.toml | 2 +- .../destination_container_wrapper.py | 9 +++---- .../destination_write_wrapper.py | 8 +++---- .../destination_wrapper/engine.py | 2 +- .../proc_stdout_event_handlers.py | 16 ++++++------- .../destination_wrapper/read_handlers.py | 2 +- .../source_container_wrapper.py | 13 +++++----- src/api/routers/syncs.py | 12 ++++++---- src/api/services/sync_runs.py | 24 +++++++++++++++++-- 9 files changed, 53 insertions(+), 35 deletions(-) diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 6f7ff2c0..50e471ea 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.153" +version = "0.1.155" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py index d4a05f69..d092212f 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_container_wrapper.py @@ -173,13 +173,14 @@ def main(): run_id=engine.connector_state.run_time_args["run_id"], connector=CONNECTOR_STRING) - # initialize handler - for key in handlers.keys(): - handlers[key] = handlers[key](engine=engine, store_writer=None, stdout_writer=None) - global loaded_state store_reader = StoreReader(engine=engine, state=loaded_state) + # initialize handler + for key in handlers.keys(): + handlers[key] = handlers[key](engine=engine, store_writer=None, + stdout_writer=None, store_reader=store_reader) + # create the subprocess subprocess_args = sys.argv[1:] diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py index b2199b91..b7f24eec 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py +++ 
b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py @@ -53,13 +53,11 @@ def finalise_message_handling(self) -> HandlerResponseData: def read_chunk_id_checkpoint(self): if self.previous_state is not None \ - and 'state' in self.previous_state \ - and 'data' in self.previous_state['state'] \ - and 'chunk_id' in self.previous_state['state']['data']: - return self.previous_state['state']['data']['chunk_id'] + 1 + and 'data' in self.previous_state \ + and 'chunk_id' in self.previous_state['data']: + return self.previous_state['data']['chunk_id'] + 1 return 1 - def start_message_handling(self, input_messages: Iterable[AirbyteMessage]) -> AirbyteMessage: counter: int = 0 counter_by_type: dict[str, int] = defaultdict(lambda: 0) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py index 130172cd..90cd4020 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/engine.py @@ -199,7 +199,7 @@ def checkpoint(self, state): sync_id = self.connector_state.run_time_args["sync_id"] run_id = self.connector_state.run_time_args["run_id"] r = self.session_with_retries.post( - f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/", + f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/{os.environ.get('MODE', 'any')}", timeout=HTTP_TIMEOUT, json=state, ) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py index 0a270fd6..410a1a56 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py @@ -106,18 +106,16 @@ def read(self): def read_chunk_id_checkpoint(self): # TODO: connector_state is not being used for destination, clean it up. 
if self.loaded_state is not None \ - and 'state' in self.loaded_state \ - and 'data' in self.loaded_state['state'] \ - and 'chunk_id' in self.loaded_state['state']['data']: - return self.loaded_state['state']['data']['chunk_id'] + and 'data' in self.loaded_state \ + and 'chunk_id' in self.loaded_state['data']: + return self.loaded_state['data']['chunk_id'] return None def read_file_marker_from_checkpoint(self): if self.loaded_state is not None \ - and 'state' in self.loaded_state \ - and 'data' in self.loaded_state['state'] \ - and 'file_marker' in self.loaded_state['state']['data']: - return self.loaded_state["state"]["data"]["file_marker"] + and 'data' in self.loaded_state \ + and 'file_marker' in self.loaded_state['data']: + return self.loaded_state["data"]["file_marker"] return None def get_file_name_from_chunk_id(self, chunk_id): @@ -185,7 +183,7 @@ def handle(self, record) -> bool: self.engine.metric_ext(records_delivered, record["state"]["data"]["chunk_id"], commit=True) # self.engine.connector_state.register_chunk() if commit_state: - self.engine.checkpoint(record) + self.engine.checkpoint(record["state"]) if SingletonLogWriter.instance() is not None: SingletonLogWriter.instance().data_chunk_flush_callback() SampleWriter.data_chunk_flush_callback() diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py index 10d45aa4..74ee6923 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py @@ -67,7 +67,7 @@ def handle(self, record) -> bool: # because state is dictated by the source. if os.environ.get('MODE', 'any') == 'etl': record["state"]["data"]["file_marker"] = self.store_reader.current_file_name - self.engine.checkpoint(record) + self.engine.checkpoint(record["state"]) return True diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index bedede6b..f960671e 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -60,13 +60,12 @@ def __init__(self, run_time_args={}) -> None: self.records_in_chunk = 0 self.run_time_args = run_time_args self.total_records = 0 - + def reset_chunk_id_from_state(self, state): if state is not None \ - and 'state' in state \ - and 'data' in state['state'] \ - and 'chunk_id' in state['state']['data']: - self.num_chunks = state['state']['data']['chunk_id'] + 1 + and 'data' in state \ + and 'chunk_id' in state['data']: + self.num_chunks = state['data']['chunk_id'] + 1 else: self.num_chunks = 1 self.total_records = (self.num_chunks - 1) * self.run_time_args["chunk_size"] @@ -200,7 +199,7 @@ def checkpoint(self, state): sync_id = self.connector_state.run_time_args["sync_id"] run_id = self.connector_state.run_time_args["run_id"] r = self.session_with_retries.post( - f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/", + f"{self.engine_url}/syncs/{sync_id}/runs/{run_id}/state/{CONNECTOR_STRING}/{os.environ.get('MODE', 'any')}", timeout=HTTP_TIMEOUT, json=state, ) @@ -308,7 +307,7 @@ def handle(self, record): # record['state']['data']['chunk_id'] = self.engine.connector_state.num_chunks - 1 
# Not oprating on chunk boundary -- fix self.store_writer.write(record) - self.engine.checkpoint(record) + self.engine.checkpoint(record['state']) if SingletonLogWriter.instance() is not None: SingletonLogWriter.instance().data_chunk_flush_callback() SampleWriter.data_chunk_flush_callback() diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index ae090837..5e555d9c 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -137,7 +137,7 @@ async def get_current_run_details_for_connector_string( "records_per_metric": connector_run_config["records_per_metric"] if "records_per_metric" in connector_run_config else 10, - + "previous_run_status": "success" if previous_run is None or ("run_manager" in previous_run.extra and previous_run.extra["run_manager"]["status"]["status"] == "success") else "failure", # For first run also, previous_run_status will be success @@ -149,8 +149,9 @@ async def get_current_run_details_for_connector_string( run_args["full_refresh"] = current_run.run_time_args["full_refresh"] # Set Connector State for the run_time_args to restart the run from the checkpoint - if current_run.extra is not None and connector_string in current_run.extra and 'state' in current_run.extra[connector_string]: - run_args["state"] = current_run.extra[connector_string]['state']['state'] + if current_run.extra is not None and connector_string in current_run.extra \ + and 'state' in current_run.extra[connector_string]: + run_args["state"] = current_run.extra[connector_string]['state'] return SyncCurrentRunArgs(**run_args) @@ -194,11 +195,12 @@ async def synchronize_connector( return ConnectorSynchronization(abort_required=abort_required) -@router.post("/{sync_id}/runs/{run_id}/state/{connector_string}/", response_model=GenericResponse) +@router.post("/{sync_id}/runs/{run_id}/state/{connector_string}/{mode}", response_model=GenericResponse) async def state( sync_id: UUID4, run_id: UUID4, connector_string: str, + mode: str, state: Dict, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service), ) -> GenericResponse: @@ -206,7 +208,7 @@ async def state( if str(sync_id) == "cf280e5c-1184-4052-b089-f9f41b25138e": return GenericResponse() - sync_runs_service.save_state(sync_id, run_id, connector_string, state) + sync_runs_service.save_state(sync_id, run_id, connector_string, mode, state) return GenericResponse() diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index edb5591b..01c60c04 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -81,7 +81,7 @@ def save_status(self, sync_id, run_id, connector_string, status): self.db_session.commit() - def save_state(self, sync_id, run_id, connector_string, state): + def save_state(self, sync_id, run_id, connector_string, mode, state): sync_run = self.get(run_id) self.db_session.refresh(sync_run) @@ -89,7 +89,27 @@ def save_state(self, sync_id, run_id, connector_string, state): sync_run.extra = {} if connector_string not in sync_run.extra: sync_run.extra[connector_string] = {} - sync_run.extra[connector_string]["state"] = {"state": state} + + # PER STREAM STATE message in etl sources + if mode == "etl" and connector_string == "src": + state_to_input: List = None + if "state" in sync_run.extra[connector_string]: + state_to_input = sync_run.extra[connector_string]["state"] + if state_to_input is None: + state_to_input = [state] + else: + current_stream: str = state['stream']['stream_descriptor']['name'] + new_state = [] + for s in state_to_input: + print(s) + if 
current_stream != s['stream']['stream_descriptor']['name']: + new_state.append(s) + new_state.append(state) + state_to_input = new_state + else: + state_to_input = state + + sync_run.extra[connector_string]["state"] = state_to_input flag_modified(sync_run, "extra") self.db_session.commit() From 172195f6249cf5809f82989ff143c62c1e0eab12 Mon Sep 17 00:00:00 2001 From: Ganesh varma Date: Fri, 17 May 2024 13:49:17 +0530 Subject: [PATCH 16/26] Fix finalizer step which got stuck because of transformation (#75) --- src/orchestrator/templates/shopify_template.jinja | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orchestrator/templates/shopify_template.jinja b/src/orchestrator/templates/shopify_template.jinja index bc46d9cb..3077820f 100644 --- a/src/orchestrator/templates/shopify_template.jinja +++ b/src/orchestrator/templates/shopify_template.jinja @@ -111,7 +111,7 @@ def normalization_op(context, a, b): jitter=Jitter.PLUS_MINUS, ), ) -def transformation_po_op(context, c) -> None: +def transformation_po_op(context, c): execute_docker_container( context, image = "valmiio/transform-po:latest", From bad286bffe7c0b4a95d769ca98bab84c5b9ed933 Mon Sep 17 00:00:00 2001 From: gane5hvarma Date: Mon, 20 May 2024 16:39:15 +0530 Subject: [PATCH 17/26] feat: rm print statements and update version --- packages/valmi-connector-lib/pyproject.toml | 2 +- .../source_wrapper/source_container_wrapper.py | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index 50e471ea..fc85ec3a 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.155" +version = "0.1.156" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index f960671e..68e489e4 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -454,7 +454,6 @@ def main(): # populate run_time_args populate_run_time_args(airbyte_command, engine, config_file_path=config_file) - print("in valmi-lib-after populate run_time_args") if airbyte_command == "read": # initialize LogWriter @@ -469,7 +468,6 @@ def main(): sync_id=engine.connector_state.run_time_args["sync_id"], run_id=engine.connector_state.run_time_args["run_id"], connector=CONNECTOR_STRING) - print("in valmi-lib-after creating log and samplewriter") stdout_writer = StdoutWriter(engine) @@ -488,15 +486,11 @@ def main(): ) # check engine errors every CHUNK_SIZE records - print("in valmi-lib-getting to reading stdout") record_types = handlers.keys() for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"): # or another encoding if line.strip() == "": continue - print("in valmi-lib-begin") - print(line) - print("in valmi-lib-end") json_record = json.loads(line) if json_record["type"] not in record_types: handlers["default"].handle(json_record) From dea7d8d0ce1fa192d70f36dc70b2fc388ee2a916 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 23 May 2024 16:17:39 +0530 Subject: [PATCH 18/26] feat: Add endpoint to fetch latest successful sync --- src/api/routers/syncs.py | 12 ++++++++++- 
src/api/services/sync_runs.py | 39 ++++++++++++++++++++++++++++++++--- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index 5e555d9c..fba2f9bb 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -400,4 +400,14 @@ async def get_samples( @router.get("/{sync_id}/latestRunStatus", response_model=str) def get_run_status(sync_id, sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): - return sync_runs_service.last_run_status(sync_id) \ No newline at end of file + return sync_runs_service.last_run_status(sync_id) + +@router.get("/{sync_id}/last_successful_sync", response_model=dict) +def get_last_sync_sucess_time(sync_id, + sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): + return sync_runs_service.last_successful_sync_run(sync_id) + +@router.get("/{sync_id}/latest_sync_info", response_model=dict) +def get_latest_sync_inof(sync_id, + sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): + return sync_runs_service.latest_sync_info(sync_id) \ No newline at end of file diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index 01c60c04..55aaed7d 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -29,7 +29,7 @@ from metastore.models import SyncRun from api.schemas import SyncRunCreate -from typing import Any, List +from typing import Any, Dict, List, Optional from metastore.models import SyncStatus from .base import BaseService @@ -142,5 +142,38 @@ def last_run_status(self,sync_id) -> str: ).status except Exception as e: logger.error(e) - return e.message - \ No newline at end of file + raise e + def last_successful_sync_run(self, sync_id) -> Dict: + try: + logger.debug('here in activation below is query') + + # Query for the latest stopped sync run + result = ( + self.db_session.query(self.model) + .filter(SyncRun.sync_id == sync_id, SyncRun.status == "stopped") + .order_by(SyncRun.created_at.desc()) + .limit(1) + .first() + ) + + if result is None: + return {"found": False, "timestamp": "0000-00-00 00:00:00.000000"} + return {"found":True,"timestamp": result.run_end_at} + except Exception as e: + logger.error(e) + raise e + def latest_sync_info(self, sync_id)->Dict: + try: + result = ( + self.db_session.query(self.model) + .filter(SyncRun.sync_id == sync_id) + .order_by(SyncRun.created_at.desc()) + .limit(1) + .first() + ) + if result is None: + return {"enabled": False} + return {"enabled":True,"status":result.status,"created_at":result.created_at} + except Exception as e: + logger.error(e) + raise e \ No newline at end of file From 5ff5f397ebc5265739ca4281b90f9692be08f1e9 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 23 May 2024 16:26:00 +0530 Subject: [PATCH 19/26] feat: renamed timestamp with run_end_at in latest_run_status endpoint --- src/api/services/sync_runs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index 55aaed7d..c5bb5497 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -157,8 +157,8 @@ def last_successful_sync_run(self, sync_id) -> Dict: ) if result is None: - return {"found": False, "timestamp": "0000-00-00 00:00:00.000000"} - return {"found":True,"timestamp": result.run_end_at} + return {"found": False, "run_end_at": "0000-00-00 00:00:00.000000"} + return {"found":True,"run_end_at": result.run_end_at} except Exception as e: logger.error(e) raise e From 
3197240548a301eb75efef9b00958fcc692c3296 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Tue, 28 May 2024 11:01:29 +0530 Subject: [PATCH 20/26] feat: Used schemas instead of dict in sync service --- src/api/routers/syncs.py | 47 ++++++++++++++++++++++------------ src/api/schemas/sync.py | 10 ++++++++ src/api/schemas/sync_status.py | 5 ---- src/api/services/sync_runs.py | 31 ++++++---------------- 4 files changed, 49 insertions(+), 44 deletions(-) delete mode 100644 src/api/schemas/sync_status.py diff --git a/src/api/routers/syncs.py b/src/api/routers/syncs.py index fba2f9bb..c28e2ffd 100644 --- a/src/api/routers/syncs.py +++ b/src/api/routers/syncs.py @@ -32,11 +32,12 @@ from typing import Any, Dict, List, Optional -from fastapi import Depends - +from fastapi import Depends,Response +from fastapi.responses import JSONResponse from fastapi.routing import APIRouter from orchestrator.run_manager import SyncRunnerThread from pydantic import UUID4, Json +from api.schemas.sync import LastSuccessfulSync, LatestSyncInfo from vyper import v from metastore import models @@ -106,6 +107,7 @@ async def get_current_run_details_for_connector_string( sync_schedule = sync_service.get(sync_id) runs = sync_runs_service.get_runs(sync_id, datetime.now(), 2) + previous_run = runs[1] if len(runs) > 1 else None if previous_run is not None: sync_runs_service.db_session.refresh( @@ -122,7 +124,9 @@ async def get_current_run_details_for_connector_string( connector_run_config = {} if connector_type in v.get("CONNECTOR_RUN_CONFIG"): connector_run_config = v.get("CONNECTOR_RUN_CONFIG")[connector_type] - + print(sync_id) + print(previous_run) + print(connector_string) # TODO: get saved checkpoint state of the run_id & create column run_time_args in the sync_runs table to get repeatable runs run_args = { @@ -266,12 +270,17 @@ async def new_run( with sync_service.api_and_run_manager_mutex: sync = sync_service.get_sync(sync_id) runs = sync_runs_service.get_runs(sync_id, datetime.now(), 2) - previous_run = runs[0] if len(runs) > 1 else None + + previous_run = runs[0] if len(runs) >= 1 else None previous_run_status = previous_run.status if previous_run is not None else None + print(previous_run) + print(previous_run_status) + if sync.status == SyncConfigStatus.ACTIVE and ( previous_run is None or previous_run.status == SyncStatus.STOPPED ): + print("inside creating run") run = SyncRunCreate( run_id=uuid.uuid4(), sync_id=sync.sync_id, @@ -285,6 +294,9 @@ async def new_run( sync.last_run_id = run.run_id sync_service.update_sync_and_create_run(sync, run) + print("sync" , sync) + print("run", run) + SyncRunnerThread.refresh_db_session() return GenericResponse(success=True, message=f"Sync run started with run_id: {run.run_id}") @@ -397,17 +409,20 @@ async def get_samples( return await sample_handling_service.read_sample_retriever_data(sample_retriever_task=sample_retriever_task) -@router.get("/{sync_id}/latestRunStatus", response_model=str) -def get_run_status(sync_id, - sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): - return sync_runs_service.last_run_status(sync_id) - -@router.get("/{sync_id}/last_successful_sync", response_model=dict) +@router.get("/{sync_id}/last_successful_sync", response_model=LastSuccessfulSync) def get_last_sync_sucess_time(sync_id, - sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): - return sync_runs_service.last_successful_sync_run(sync_id) - -@router.get("/{sync_id}/latest_sync_info", response_model=dict) + sync_runs_service: SyncRunsService = 
Depends(get_sync_runs_service)) -> LastSuccessfulSync: + try: + return sync_runs_service.last_successful_sync_run(sync_id) + except Exception as e: + logger.error(f"An error occurred while fetching the last successful sync run: {e}") + return JSONResponse(status_code=500, content={"detail": "Internal Server Error"}) + +@router.get("/{sync_id}/latest_sync_info", response_model=LatestSyncInfo) def get_latest_sync_inof(sync_id, - sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)): - return sync_runs_service.latest_sync_info(sync_id) \ No newline at end of file + sync_runs_service: SyncRunsService = Depends(get_sync_runs_service)) -> LatestSyncInfo: + try: + return sync_runs_service.latest_sync_info(sync_id) + except Exception as e: + logger.error(f"An error occurred while fetching the latest sync info: {e}") + return JSONResponse(status_code=500, content={"detail": "Internal Server Error"}) \ No newline at end of file diff --git a/src/api/schemas/sync.py b/src/api/schemas/sync.py index 2d4ae7a4..aa32d695 100644 --- a/src/api/schemas/sync.py +++ b/src/api/schemas/sync.py @@ -34,3 +34,13 @@ class SyncCurrentRunArgs(BaseModel): class Config: extra = Extra.allow + + +class LastSuccessfulSync(BaseModel): + found: bool + run_end_at: Optional[datetime] = None + +class LatestSyncInfo(BaseModel): + found: bool + status: Optional[str] = None + created_at: Optional[datetime] = None \ No newline at end of file diff --git a/src/api/schemas/sync_status.py b/src/api/schemas/sync_status.py deleted file mode 100644 index 4680998b..00000000 --- a/src/api/schemas/sync_status.py +++ /dev/null @@ -1,5 +0,0 @@ -from pydantic import BaseModel -from enum import Enum - -class LastSyncStatus(BaseModel): - status: str diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index c5bb5497..8baad6d6 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -32,6 +32,7 @@ from typing import Any, Dict, List, Optional from metastore.models import SyncStatus +from api.schemas.sync import LastSuccessfulSync, LatestSyncInfo from .base import BaseService from sqlalchemy.orm.attributes import flag_modified import logging @@ -129,21 +130,7 @@ def update_sync_run_extra_data(self, run_id, connector_string, key, value): self.db_session.commit() - def last_run_status(self,sync_id) -> str: - logger.debug("in service method") - try: - - return ( - self.db_session.query(self.model) - .filter(SyncRun.sync_id == sync_id) - .order_by(SyncRun.created_at.desc()) - .limit(1) - .first() - ).status - except Exception as e: - logger.error(e) - raise e - def last_successful_sync_run(self, sync_id) -> Dict: + def last_successful_sync_run(self, sync_id) -> LastSuccessfulSync: try: logger.debug('here in activation below is query') @@ -157,12 +144,11 @@ def last_successful_sync_run(self, sync_id) -> Dict: ) if result is None: - return {"found": False, "run_end_at": "0000-00-00 00:00:00.000000"} - return {"found":True,"run_end_at": result.run_end_at} + return LastSuccessfulSync(found=False) + return LastSuccessfulSync(found=True,run_end_at = result.run_end_at) except Exception as e: logger.error(e) - raise e - def latest_sync_info(self, sync_id)->Dict: + def latest_sync_info(self, sync_id)->LatestSyncInfo: try: result = ( self.db_session.query(self.model) @@ -172,8 +158,7 @@ def latest_sync_info(self, sync_id)->Dict: .first() ) if result is None: - return {"enabled": False} - return {"enabled":True,"status":result.status,"created_at":result.created_at} + return LatestSyncInfo(found=False) + 
return LatestSyncInfo(found=True,status=result.status,created_at=result.created_at) except Exception as e: - logger.error(e) - raise e \ No newline at end of file + logger.error(e) \ No newline at end of file From c3d109e25e099605780dcb813b5eebd53645452b Mon Sep 17 00:00:00 2001 From: Rajashekar Varkala Date: Wed, 29 May 2024 23:10:33 +0530 Subject: [PATCH 21/26] bug: metrics fix for etl --- config.yaml | 2 +- packages/valmi-connector-lib/pyproject.toml | 2 +- .../destination_write_wrapper.py | 6 +++--- .../proc_stdout_event_handlers.py | 12 ++++++------ .../destination_wrapper/read_handlers.py | 3 ++- .../source_wrapper/source_container_wrapper.py | 17 +++++++++++------ src/api/services/sync_runs.py | 14 +++++++++++--- 7 files changed, 35 insertions(+), 21 deletions(-) diff --git a/config.yaml b/config.yaml index 31a4c43b..3eb88055 100644 --- a/config.yaml +++ b/config.yaml @@ -71,4 +71,4 @@ CONNECTOR_RUN_CONFIG: POSTGRES: records_per_metric: 100 SHOPIFY: - chunk_size: 1 + chunk_size: 300 diff --git a/packages/valmi-connector-lib/pyproject.toml b/packages/valmi-connector-lib/pyproject.toml index fc85ec3a..28af3493 100644 --- a/packages/valmi-connector-lib/pyproject.toml +++ b/packages/valmi-connector-lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valmi-connector-lib" -version = "0.1.156" +version = "0.1.159" description = "" authors = ["Rajashekar Varkala "] readme = "README.md" diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py index b7f24eec..a9c236f4 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/destination_write_wrapper.py @@ -53,9 +53,9 @@ def finalise_message_handling(self) -> HandlerResponseData: def read_chunk_id_checkpoint(self): if self.previous_state is not None \ - and 'data' in self.previous_state \ - and 'chunk_id' in self.previous_state['data']: - return self.previous_state['data']['chunk_id'] + 1 + and '_valmi_meta' in self.previous_state \ + and 'chunk_id' in self.previous_state['_valmi_meta']: + return self.previous_state['_valmi_meta']['chunk_id'] + 1 return 1 def start_message_handling(self, input_messages: Iterable[AirbyteMessage]) -> AirbyteMessage: diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py index 410a1a56..5da1db29 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/proc_stdout_event_handlers.py @@ -106,16 +106,16 @@ def read(self): def read_chunk_id_checkpoint(self): # TODO: connector_state is not being used for destination, clean it up. 
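# --- Illustrative sketch (not part of the patch) ----------------------------------
# The hunks below move wrapper bookkeeping out of state["data"] (which belongs to the
# connector's own cursor) into a dedicated "_valmi_meta" key. A checkpointed source
# state is then assumed to look roughly like this; the stream name, cursor value and
# file name are invented for illustration:
example_state = {
    "stream": {"stream_descriptor": {"name": "products"}},        # per-stream state
    "data": {"updated_at": "2024-05-29T00:00:00Z"},               # connector-owned cursor, untouched
    "_valmi_meta": {"chunk_id": 3, "file_marker": "3.vald"},      # wrapper-owned metadata
}
# Reading it back mirrors the guarded lookups patched in below:
chunk_id = example_state.get("_valmi_meta", {}).get("chunk_id")            # -> 3
file_marker = example_state.get("_valmi_meta", {}).get("file_marker")      # -> "3.vald"
# -----------------------------------------------------------------------------------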
if self.loaded_state is not None \ - and 'data' in self.loaded_state \ - and 'chunk_id' in self.loaded_state['data']: - return self.loaded_state['data']['chunk_id'] + and '_valmi_meta' in self.loaded_state \ + and 'chunk_id' in self.loaded_state['_valmi_meta']: + return self.loaded_state['_valmi_meta']['chunk_id'] return None def read_file_marker_from_checkpoint(self): if self.loaded_state is not None \ - and 'data' in self.loaded_state \ - and 'file_marker' in self.loaded_state['data']: - return self.loaded_state["data"]["file_marker"] + and '_valmi_meta' in self.loaded_state \ + and 'file_marker' in self.loaded_state['_valmi_meta']: + return self.loaded_state["_valmi_meta"]["file_marker"] return None def get_file_name_from_chunk_id(self, chunk_id): diff --git a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py index 74ee6923..6ac680c5 100644 --- a/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/destination_wrapper/read_handlers.py @@ -66,7 +66,8 @@ def handle(self, record) -> bool: # For ETL, we store the checkpoint for the reader instead of the destination stdout state, # because state is dictated by the source. if os.environ.get('MODE', 'any') == 'etl': - record["state"]["data"]["file_marker"] = self.store_reader.current_file_name + _valmi_meta = {"file_marker": self.store_reader.current_file_name} + record["state"]["_valmi_meta"] = _valmi_meta self.engine.checkpoint(record["state"]) return True diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 68e489e4..01924dac 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -63,9 +63,9 @@ def __init__(self, run_time_args={}) -> None: def reset_chunk_id_from_state(self, state): if state is not None \ - and 'data' in state \ - and 'chunk_id' in state['data']: - self.num_chunks = state['data']['chunk_id'] + 1 + and '_valmi_meta' in state \ + and 'chunk_id' in state['_valmi_meta']: + self.num_chunks = state['_valmi_meta']['chunk_id'] + 1 else: self.num_chunks = 1 self.total_records = (self.num_chunks - 1) * self.run_time_args["chunk_size"] @@ -304,7 +304,8 @@ def handle(self, record): print(json.dumps(record)) if os.environ.get('MODE', 'any') == 'etl': - # record['state']['data']['chunk_id'] = self.engine.connector_state.num_chunks - 1 # Not oprating on chunk boundary -- fix + _valmi_meta = {"chunk_id": self.engine.connector_state.num_chunks - 1} + record['state']['_valmi_meta'] = _valmi_meta self.store_writer.write(record) self.engine.checkpoint(record['state']) @@ -323,7 +324,7 @@ def handle(self, record): if not isinstance(record, ValmiFinalisedRecordMessage): record["record"]["rejected"] = False record["record"]["metric_type"] = "succeeded" - # + # if not record["record"]["rejected"]: self.store_writer.write(record) @@ -409,7 +410,11 @@ def populate_run_time_args(airbyte_command, engine, config_file_path): engine.connector_state.reset_chunk_id_from_state(loaded_state) print("num_chunks", engine.connector_state.num_chunks) with open(state_file_path, "w") as f: - f.write(json.dumps(run_time_args['state'])) + etl_mode = 
os.environ.get('MODE', 'any') == 'etl' + state = loaded_state + if etl_mode: + state = loaded_state["global"] + f.write(json.dumps(state)) def sync_engine_for_error(proc: subprocess, engine: NullEngine): diff --git a/src/api/services/sync_runs.py b/src/api/services/sync_runs.py index 8baad6d6..7ac1ad5c 100644 --- a/src/api/services/sync_runs.py +++ b/src/api/services/sync_runs.py @@ -94,19 +94,27 @@ def save_state(self, sync_id, run_id, connector_string, mode, state): # PER STREAM STATE message in etl sources if mode == "etl" and connector_string == "src": state_to_input: List = None + _valmi_meta = None + if "_valmi_meta" in state: + _valmi_meta = state['_valmi_meta'] + del state['_valmi_meta'] + if "state" in sync_run.extra[connector_string]: state_to_input = sync_run.extra[connector_string]["state"] + if state_to_input is None: - state_to_input = [state] + state_to_input = {} + state_to_input["global"] = [state] else: current_stream: str = state['stream']['stream_descriptor']['name'] new_state = [] - for s in state_to_input: + for s in state_to_input["global"]: print(s) if current_stream != s['stream']['stream_descriptor']['name']: new_state.append(s) new_state.append(state) - state_to_input = new_state + state_to_input["global"] = new_state + state_to_input["_valmi_meta"] = _valmi_meta else: state_to_input = state From 3bd3a8ae4cd5215cc995112475f26bcea83cb305 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 6 Jun 2024 12:12:14 +0530 Subject: [PATCH 22/26] feat: source-postgres changes to support filtered queries --- .../connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py | 1 + .../connectors/source-postgres/valmi_dbt/dbt_project.jinja | 1 + 2 files changed, 2 insertions(+) diff --git a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py index c61a4447..b4d053d7 100644 --- a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py +++ b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_airbyte_adapter.py @@ -183,6 +183,7 @@ def generate_project_yml(self, logger: AirbyteLogger, config: json, catalog: Con "columns": f"[{col_arr_str}]", "id_key": catalog.streams[0].id_key, "name": self.get_table_name(catalog.streams[0].stream.name), + "query": config["query"], "previous_run_status": previous_run_status, "destination_sync_mode": catalog.streams[0].destination_sync_mode.value, } diff --git a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja index 9698f9b8..7662e748 100644 --- a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja +++ b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja @@ -28,6 +28,7 @@ models: vars: source_table: "{{args['name']}}" + query: {{ args['query'] | replace('\n', ' ') }} init: "init_{{args['sync_id']}}" finalized_snapshot: "finalized_snapshot_{{args['sync_id']}}" stg_snapshot: "stg_snapshot_{{args['sync_id']}}" From 0701a1aaf71422910e88d3e5474b2a2ebd8ce623 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 6 Jun 2024 12:23:54 +0530 Subject: [PATCH 23/26] feat: changes for source-postgres to support filter queries --- .../connectors/source-postgres/valmi_dbt/dbt_project.jinja | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja 
b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja index 7662e748..1b3ea400 100644 --- a/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja +++ b/valmi-integrations/connectors/source-postgres/valmi_dbt/dbt_project.jinja @@ -28,7 +28,7 @@ models: vars: source_table: "{{args['name']}}" - query: {{ args['query'] | replace('\n', ' ') }} + query: {{ args['query']}} init: "init_{{args['sync_id']}}" finalized_snapshot: "finalized_snapshot_{{args['sync_id']}}" stg_snapshot: "stg_snapshot_{{args['sync_id']}}" From 2f89aa9b301f004f0d7afa88310a3a5aca238e87 Mon Sep 17 00:00:00 2001 From: saurav-malani Date: Mon, 17 Jun 2024 19:31:18 +0530 Subject: [PATCH 24/26] minio integration: write intermediate data to minio, instead of storing locally. --- .../source_container_wrapper.py | 95 +++++++++++++++---- 1 file changed, 78 insertions(+), 17 deletions(-) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 01924dac..8659daf6 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -31,9 +31,11 @@ import io from typing import Any, Dict import uuid +import gzip from pydantic import UUID4 import requests from requests.adapters import HTTPAdapter, Retry +from minio import Minio from valmi_connector_lib.common.logs import SingletonLogWriter, TimeAndChunkEndFlushPolicy from valmi_connector_lib.common.samples import SampleWriter @@ -229,24 +231,31 @@ def __init__(self, engine: NullEngine) -> None: self.engine = engine self.connector_state: ConnectorState = self.engine.connector_state store_config = json.loads(os.environ["VALMI_INTERMEDIATE_STORE"]) - if store_config["provider"] == "local": - path_name = join(store_config["local"]["directory"], self.connector_state.run_time_args["run_id"], "data") - os.makedirs(path_name, exist_ok=True) - - self.path_name = path_name - self.records = [] + self.etl_mode = os.environ.get('MODE', 'any') == 'etl' + + path_name = join(store_config["local"]["directory"], self.connector_state.run_time_args["run_id"], "data") + os.makedirs(path_name, exist_ok=True) + self.path_name = path_name + self.records = [] + # create flusher instance + if store_config["provider"] == "local": + print("storage mode: local") + self.flusher=localFlusher() + elif store_config["provider"]=="minio": + print("storage mode: cloud(minio)") + self.flusher=minioFlusher() def write(self, record, last=False): self.records.append(record) + is_state_message = record['type'] == "STATE" if not is_state_message: self.connector_state.register_record() - etl_mode = os.environ.get('MODE', 'any') == 'etl' # For ETL the chunk flush should happen only after seeing STATE message. 
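# --- Illustrative sketch (not part of the patch) ----------------------------------
# In ETL mode a chunk must end exactly on a source STATE message so that the flushed
# file and the saved checkpoint stay in lockstep; in non-ETL mode the chunk_size
# threshold alone decides. A standalone restatement of the condition used in write():
def should_flush(etl_mode: bool, is_state_message: bool, records_in_chunk: int, chunk_size: int) -> bool:
    return (etl_mode and is_state_message) or (not etl_mode and records_in_chunk >= chunk_size)
# Example with chunk_size=300 (the SHOPIFY value set in config.yaml above):
#   should_flush(True,  True,  120, 300) -> True   (ETL: flush on STATE, regardless of count)
#   should_flush(False, False, 300, 300) -> True   (non-ETL: flush on the size threshold)
# The VALMI_INTERMEDIATE_STORE payload that selects the flusher is assumed to be of the
# form {"provider": "local" | "minio", "local": {"directory": "..."}}; only "provider"
# and "local.directory" are read by this wrapper.
# -----------------------------------------------------------------------------------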
- if (etl_mode and is_state_message) or \ - (not etl_mode and self.connector_state.records_in_chunk >= self.connector_state.run_time_args["chunk_size"]): + if (self.etl_mode and is_state_message) or \ + (not self.etl_mode and self.connector_state.records_in_chunk >= self.connector_state.run_time_args["chunk_size"]): self.flush(last=False) self.records = [] self.engine.metric(commit=True) @@ -257,15 +266,63 @@ def write(self, record, last=False): def flush(self, last=False): # list_dir = sorted([f.lower() for f in os.listdir(self.path_name)], key=lambda x: int(x[:-5])) new_file_name = f"{MAGIC_NUM}.vald" if last else f"{self.engine.connector_state.num_chunks}.vald" - with open(join(self.path_name, new_file_name), "w") as f: - for record in self.records: - f.write(json.dumps(record)) - f.write("\n") - + file_path=join(self.path_name,new_file_name) + self.flusher.flush(file_path,self.records) def finalize(self): self.flush(last=True) self.engine.metric(commit=True) +class DefaultFlusher: + def __init__(self): + pass + def flush(): + pass + +class localFlusher(DefaultFlusher): + def __init__(self): + pass + def flush(self,file_path,records): + with open(file_path, "w") as f: + for record in records: + f.write(json.dumps(record)+"\n") + +class minioFlusher(DefaultFlusher): + def __init__(self): + self.minio_host = os.getenv('MINIO_HOST','localhost:9000') + self.access_key = os.getenv('MINIO_ACCESS_KEY','YOURACCESSKEY') + self.secret_key = os.getenv('MINIO_SECRET_KEY','YOURSECRETKEY') + self.secure = os.getenv('MINIO_SECURE', 'True').lower() == 'false' + + self.minio_client = Minio(self.minio_host, + access_key=self.access_key, + secret_key=self.secret_key, + secure=self.secure) + + self.bucket_name=os.getenv('BUCKET_NAME','intermediate-store') + def flush(self,file_path,records): + with open(file_path, "w") as f: + for record in records: + f.write(json.dumps(record)+"\n") + + gzip_file_path=f"{file_path}.gz" + with open(file_path, 'rb') as f_in: + with gzip.open(gzip_file_path, 'wb') as f_out: + f_out.writelines(f_in) + os.remove(file_path) + + # Extract object name (key) from file_path + object_name = os.path.basename(gzip_file_path) + + if not self.minio_client.bucket_exists(self.bucket_name): + self.minio_client.make_bucket(self.bucket_name) + + try: + self.minio_client.fput_object(self.bucket_name, object_name, gzip_file_path) + print(f"File {object_name} uploaded successfully to bucket {self.bucket_name}") + except ResponseError as err: + print(f"Error uploading file: {err}") + + class DefaultHandler: def __init__( @@ -308,7 +365,9 @@ def handle(self, record): record['state']['_valmi_meta'] = _valmi_meta self.store_writer.write(record) + # store [STATE] log as checkpoint in Engine, to restart after last sync point. 
self.engine.checkpoint(record['state']) + if SingletonLogWriter.instance() is not None: SingletonLogWriter.instance().data_chunk_flush_callback() SampleWriter.data_chunk_flush_callback() @@ -336,7 +395,8 @@ def handle(self, record): sample_writer.write(record) def finalize(self): - self.store_writer.finalize() + self.store_writer.flush(last=True) + class TraceHandler(DefaultHandler): @@ -485,13 +545,13 @@ def main(): if is_state_available(): subprocess_args.append("--state") subprocess_args.append(state_file_path) + proc = subprocess.Popen( subprocess_args, stdout=subprocess.PIPE, ) # check engine errors every CHUNK_SIZE records - record_types = handlers.keys() for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"): # or another encoding if line.strip() == "": @@ -518,7 +578,8 @@ def main(): sys.exit(return_code) else: if airbyte_command == "read": - store_writer.finalize() + # this triggers the flush of the last chunk of the sync. + handlers["RECORD"].finalize() engine.success() From 34233fa1d75a48b8c630e59ec5a0d39974b04edf Mon Sep 17 00:00:00 2001 From: saurav-malani Date: Mon, 17 Jun 2024 19:40:06 +0530 Subject: [PATCH 25/26] minio integration: write intermediate data to minio, instead of storing locally. --- .../source_wrapper/source_container_wrapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 8659daf6..2af55e44 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -395,7 +395,7 @@ def handle(self, record): sample_writer.write(record) def finalize(self): - self.store_writer.flush(last=True) + self.store_writer.finalize() From 54f2ebc397a400400f16ed80a479d11e3d194c8c Mon Sep 17 00:00:00 2001 From: saurav-malani Date: Tue, 18 Jun 2024 12:14:36 +0530 Subject: [PATCH 26/26] removed gzip part --- .../source_wrapper/source_container_wrapper.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py index 2af55e44..d1688a0f 100755 --- a/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py +++ b/packages/valmi-connector-lib/valmi_connector_lib/source_wrapper/source_container_wrapper.py @@ -31,11 +31,11 @@ import io from typing import Any, Dict import uuid -import gzip from pydantic import UUID4 import requests from requests.adapters import HTTPAdapter, Retry from minio import Minio +from minio.error import S3Error from valmi_connector_lib.common.logs import SingletonLogWriter, TimeAndChunkEndFlushPolicy from valmi_connector_lib.common.samples import SampleWriter @@ -304,20 +304,13 @@ def flush(self,file_path,records): for record in records: f.write(json.dumps(record)+"\n") - gzip_file_path=f"{file_path}.gz" - with open(file_path, 'rb') as f_in: - with gzip.open(gzip_file_path, 'wb') as f_out: - f_out.writelines(f_in) - os.remove(file_path) - - # Extract object name (key) from file_path - object_name = os.path.basename(gzip_file_path) + object_name = os.path.basename(file_path) if not self.minio_client.bucket_exists(self.bucket_name): 
self.minio_client.make_bucket(self.bucket_name) try: - self.minio_client.fput_object(self.bucket_name, object_name, gzip_file_path) + self.minio_client.fput_object(self.bucket_name, object_name, file_path) print(f"File {object_name} uploaded successfully to bucket {self.bucket_name}") except ResponseError as err: print(f"Error uploading file: {err}") @@ -395,7 +388,7 @@ def handle(self, record): sample_writer.write(record) def finalize(self): - self.store_writer.finalize() + self.store_writer.flush(last=True)
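One follow-up on the MinIO flusher introduced in patches 24-26: the final hunk imports
minio.error.S3Error, but the except clause still names ResponseError, which recent
minio-py releases (7.x) no longer provide, and the MINIO_SECURE parsing (== 'false')
looks inverted. A minimal corrected sketch of the upload path, assuming minio-py 7.x
and the same environment variables and defaults used in the patch (the file path in
the usage comment is a placeholder):

    import os
    from minio import Minio
    from minio.error import S3Error

    client = Minio(
        os.getenv("MINIO_HOST", "localhost:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "YOURACCESSKEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "YOURSECRETKEY"),
        # secure defaults to True and is disabled only when MINIO_SECURE is explicitly "false"
        secure=os.getenv("MINIO_SECURE", "true").lower() != "false",
    )

    def upload_chunk(bucket_name: str, file_path: str) -> None:
        # Upload one flushed chunk file, creating the bucket on first use.
        object_name = os.path.basename(file_path)
        if not client.bucket_exists(bucket_name):
            client.make_bucket(bucket_name)
        try:
            client.fput_object(bucket_name, object_name, file_path)
            print(f"File {object_name} uploaded to bucket {bucket_name}")
        except S3Error as err:  # minio-py 7.x raises S3Error on failed S3 operations
            print(f"Error uploading file: {err}")

    # e.g. upload_chunk(os.getenv("BUCKET_NAME", "intermediate-store"), "/tmp/intermediate/3.vald")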