Skip to content

Commit

Permalink
Merge branch '1.9.latest' into behavior_for_external_path
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Oct 11, 2024
2 parents 887bf96 + 0e821b0 commit 7c8d5b7
Show file tree
Hide file tree
Showing 10 changed files with 737 additions and 18 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
- Add "use_info_schema_for_columns" behavior flag to turn on use of information_schema to get column info where possible. This may have more latency but will not truncate complex data types the way that 'describe' can. ([808](https://github.com/databricks/dbt-databricks/pull/808))
- Add support for table_format: iceberg. This uses UniForm under the hood to provide iceberg compatibility for tables or incrementals. ([815](https://github.com/databricks/dbt-databricks/pull/815))
- Add `include_full_name_in_path` config boolean for external locations. This writes tables to {location_root}/{catalog}/{schema}/{table} ([823](https://github.com/databricks/dbt-databricks/pull/823))
- Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))

### Under the Hood

- Fix places where we were not properly closing cursors, and other test warnings ([713](https://github.com/databricks/dbt-databricks/pull/713))
- Upgrade databricks-sql-connector dependency to 3.4.0 ([790](https://github.com/databricks/dbt-databricks/pull/790))

## dbt-databricks 1.8.7 (TBD)
## dbt-databricks 1.8.7 (October 10, 2024)

### Features

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version: str = "1.8.6"
version: str = "1.8.7"
135 changes: 123 additions & 12 deletions dbt/adapters/databricks/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
import re
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Set

Expand Down Expand Up @@ -41,6 +43,11 @@ def post(
) -> Response:
return self.session.post(f"{self.prefix}{suffix}", json=json, params=params)

def put(
    self, suffix: str = "", json: Optional[Any] = None, params: Optional[Dict[str, Any]] = None
) -> Response:
    """Issue an HTTP PUT to ``{prefix}{suffix}`` and return the raw response.

    Thin convenience wrapper around the underlying session's ``put``; the
    caller is responsible for inspecting the response status.
    """
    target = f"{self.prefix}{suffix}"
    return self.session.put(target, json=json, params=params)


class DatabricksApi(ABC):
def __init__(self, session: Session, host: str, api: str):
Expand Down Expand Up @@ -142,20 +149,38 @@ def get_folder(self, _: str, schema: str) -> str:
return f"/Shared/dbt_python_models/{schema}/"


# Switch to this as part of 2.0.0 release
class UserFolderApi(DatabricksApi, FolderApi):
class CurrUserApi(DatabricksApi):
    """Client for the current-user SCIM endpoint.

    Fetches the authenticated identity once via ``/Me`` and caches it, and
    offers a heuristic to distinguish service principals from human users.
    """

    def __init__(self, session: Session, host: str):
        super().__init__(session, host, "/api/2.0/preview/scim/v2")
        # Cached username; empty string means "not fetched yet".
        self._user = ""

    def get_username(self) -> str:
        """Return the current user's name, hitting the API only on first call.

        Raises DbtRuntimeError if the SCIM endpoint does not answer 200.
        """
        if not self._user:
            response = self.session.get("/Me")
            if response.status_code != 200:
                raise DbtRuntimeError(f"Error getting current user.\n {response.content!r}")
            self._user = response.json()["userName"]
        return self._user

    def is_service_principal(self, username: str) -> bool:
        """True when *username* is shaped like a service-principal UUID."""
        # Service principals are identified by a UUID rather than an email.
        matcher = re.compile(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
            re.IGNORECASE,
        )
        return matcher.match(username) is not None


# Switch to this as part of 2.0.0 release
class UserFolderApi(DatabricksApi, FolderApi):
    """Resolves a per-user workspace folder for uploaded python models."""

    def __init__(self, session: Session, host: str, user_api: CurrUserApi):
        super().__init__(session, host, "/api/2.0/preview/scim/v2")
        # Username lookup is delegated so the SCIM call is made (and cached)
        # in exactly one place.
        self.user_api = user_api

    def get_folder(self, catalog: str, schema: str) -> str:
        """Return the user-scoped folder path for *catalog*/*schema*."""
        path = "/Users/{}/dbt_python_models/{}/{}/".format(
            self.user_api.get_username(), catalog, schema
        )
        logger.debug(f"Using python model folder '{path}'")
        return path
Expand Down Expand Up @@ -302,9 +327,11 @@ class JobRunsApi(PollableApi):
def __init__(self, session: Session, host: str, polling_interval: int, timeout: int):
super().__init__(session, host, "/api/2.1/jobs/runs", polling_interval, timeout)

def submit(self, run_name: str, job_spec: Dict[str, Any]) -> str:
def submit(
self, run_name: str, job_spec: Dict[str, Any], **additional_job_settings: Dict[str, Any]
) -> str:
submit_response = self.session.post(
"/submit", json={"run_name": run_name, "tasks": [job_spec]}
"/submit", json={"run_name": run_name, "tasks": [job_spec], **additional_job_settings}
)
if submit_response.status_code != 200:
raise DbtRuntimeError(f"Error creating python run.\n {submit_response.content!r}")
Expand Down Expand Up @@ -357,6 +384,87 @@ def cancel(self, run_id: str) -> None:
raise DbtRuntimeError(f"Cancel run {run_id} failed.\n {response.content!r}")


class JobPermissionsApi(DatabricksApi):
    """Read and replace access-control lists on Databricks jobs."""

    def __init__(self, session: Session, host: str):
        super().__init__(session, host, "/api/2.0/permissions/jobs")

    def put(self, job_id: str, access_control_list: List[Dict[str, Any]]) -> None:
        """Replace the job's permissions with *access_control_list*.

        Raises DbtRuntimeError when the API does not answer 200.
        """
        request_body = {"access_control_list": access_control_list}

        response = self.session.put(f"/{job_id}", json=request_body)

        # Check the status BEFORE decoding the body for the debug log: a
        # failed call may return a non-JSON body, and calling .json() on it
        # would raise a decode error that masks the real failure.
        if response.status_code != 200:
            raise DbtRuntimeError(f"Error updating Databricks workflow.\n {response.content!r}")

        logger.debug(f"Workflow permissions update response={response.json()}")

    def get(self, job_id: str) -> Dict[str, Any]:
        """Fetch the job's current permissions as the raw API payload.

        Raises DbtRuntimeError when the API does not answer 200.
        """
        response = self.session.get(f"/{job_id}")

        if response.status_code != 200:
            raise DbtRuntimeError(
                f"Error fetching Databricks workflow permissions.\n {response.content!r}"
            )

        return response.json()


class WorkflowJobApi(DatabricksApi):
    """Create, update, search, and trigger Databricks workflows (Jobs 2.1 API)."""

    def __init__(self, session: Session, host: str):
        super().__init__(session, host, "/api/2.1/jobs")

    def search_by_name(self, job_name: str) -> List[Dict[str, Any]]:
        """Return the jobs matching *job_name*, or an empty list if none."""
        # NOTE(review): the name filter is sent as a JSON body on a GET;
        # confirm the endpoint honors that rather than expecting a `name`
        # query parameter.
        response = self.session.get("/list", json={"name": job_name})

        if response.status_code != 200:
            raise DbtRuntimeError(f"Error fetching job by name.\n {response.content!r}")

        return response.json().get("jobs", [])

    def create(self, job_spec: Dict[str, Any]) -> str:
        """Create a workflow from *job_spec* and return the new job_id."""
        response = self.session.post("/create", json=job_spec)

        if response.status_code != 200:
            raise DbtRuntimeError(f"Error creating Workflow.\n {response.content!r}")

        new_job_id = response.json()["job_id"]
        logger.info(f"New workflow created with job id {new_job_id}")
        return new_job_id

    def update_job_settings(self, job_id: str, job_spec: Dict[str, Any]) -> None:
        """Overwrite the job's settings with *job_spec* via ``jobs/reset``."""
        payload = {
            "job_id": job_id,
            "new_settings": job_spec,
        }
        logger.debug(f"Job settings: {payload}")

        response = self.session.post("/reset", json=payload)
        if response.status_code != 200:
            raise DbtRuntimeError(f"Error updating Workflow.\n {response.content!r}")

        logger.debug(f"Workflow update response={response.json()}")

    def run(self, job_id: str, enable_queueing: bool = True) -> str:
        """Trigger a run of job *job_id* and return the resulting run_id."""
        response = self.session.post(
            "/run-now",
            json={"job_id": job_id, "queue": {"enabled": enable_queueing}},
        )

        if response.status_code != 200:
            raise DbtRuntimeError(f"Error triggering run for workflow.\n {response.content!r}")

        trigger_result = response.json()
        logger.info(f"Workflow trigger response={trigger_result}")
        return trigger_result["run_id"]


class DatabricksApiClient:
def __init__(
self,
Expand All @@ -368,13 +476,16 @@ def __init__(
):
self.clusters = ClusterApi(session, host)
self.command_contexts = CommandContextApi(session, host, self.clusters)
self.curr_user = CurrUserApi(session, host)
if use_user_folder:
self.folders: FolderApi = UserFolderApi(session, host)
self.folders: FolderApi = UserFolderApi(session, host, self.curr_user)
else:
self.folders = SharedFolderApi()
self.workspace = WorkspaceApi(session, host, self.folders)
self.commands = CommandApi(session, host, polling_interval, timeout)
self.job_runs = JobRunsApi(session, host, polling_interval, timeout)
self.workflows = WorkflowJobApi(session, host)
self.workflow_permissions = JobPermissionsApi(session, host)

@staticmethod
def create(
Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from dbt.adapters.databricks.python_models.python_submissions import (
ServerlessClusterPythonJobHelper,
)
from dbt.adapters.databricks.python_models.python_submissions import (
WorkflowPythonJobHelper,
)
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.relation import DatabricksRelationType
from dbt.adapters.databricks.relation import KEY_TABLE_PROVIDER
Expand Down Expand Up @@ -655,6 +658,7 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
"job_cluster": JobClusterPythonJobHelper,
"all_purpose_cluster": AllPurposeClusterPythonJobHelper,
"serverless_cluster": ServerlessClusterPythonJobHelper,
"workflow_job": WorkflowPythonJobHelper,
}

@available
Expand Down
Loading

0 comments on commit 7c8d5b7

Please sign in to comment.