From 1561fbd2996219c15025cdc0ef7b559202ae236e Mon Sep 17 00:00:00 2001 From: Hazem Salama Date: Mon, 23 Nov 2020 21:57:33 -0500 Subject: [PATCH 1/3] Implemented base functions in oracledb Target --- luigi/contrib/oracledb.py | 151 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 luigi/contrib/oracledb.py diff --git a/luigi/contrib/oracledb.py b/luigi/contrib/oracledb.py new file mode 100644 index 0000000000..770c16c32f --- /dev/null +++ b/luigi/contrib/oracledb.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2012-2015 Spotify AB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This module makes use of the f string formatting syntax, which requires Python 3.6 or higher + + +import logging + +import luigi + +logger = logging.getLogger('luigi-interface') + +try: + import cx_Oracle +except ImportError: + logger.warning("Loading oracledb module without the python package cx_Oracle. \ + This will crash at runtime if Oracle Server functionality is used.") + + +class OracleTarget(luigi.Target): + """ + Target for a resource in Microsoft SQL Server. + This module is primarily derived from mysqldb.py. Much of MSSqlTarget, + MySqlTarget and PostgresTarget are similar enough to potentially add a + RDBMSTarget abstract base class to rdbms.py that these classes could be + derived from. + """ + + marker_table = luigi.configuration.get_config().get('oraclesql', 'marker_table', 'luigi_targets') + + def __init__(self, host, database, user, password, table, update_id, **cnx_kwargs): + """ + Initializes an OracleTarget instance. + + :param host: Oracle server address. Possibly a host:port string. + :type host: str + :param database: database name. + :type database: str + :param user: database user + :type user: str + :param password: password for specified user. + :type password: str + :param update_id: an identifier for this data set. + :type update_id: str + """ + if ':' in host: + self.host, self.port = host.split(':') + self.port = int(self.port) + else: + self.host = host + self.port = 1521 + self.database = database + self.user = user + self.password = password + self.table = table + self.update_id = update_id + self.cnx_kwargs = cnx_kwargs + + def touch(self, connection=None): + """ + Mark this update as complete. + + IMPORTANT, If the marker table doesn't exist, + the connection transaction will be aborted and the connection reset. + Then the marker table will be created. + """ + self.create_marker_table() + + if connection is None: + connection = self.connect() + + cursor = connection.cursor() + sql = f"MERGE INTO {self.marker_table} d " \ + f"USING (SELECT '{self.update_id}' UPDATE_ID, '{self.table}' TARGET_TABLE, SYSDATE INSERTED from dual) s " \ + f"ON (d.UPDATE_ID = s.UPDATE_ID) " \ + f"WHEN MATCHED THEN UPDATE SET d.TARGET_TABLE = s.TARGET_TABLE, d.INSERTED = s.INSERTED " \ + f"WHEN NOT MATCHED THEN INSERT (UPDATE_ID, TARGET_TABLE) VALUES (s.UPDATE_ID, s.TARGET_TABLE)" + cursor.execute(sql) + connection.commit() + + # make sure update is properly marked + assert self.exists(connection) + + def exists(self, connection=None): + if connection is None: + connection = self.connect() + try: + # TODO: User cursor as a context manager + # with connection.cursor() as cursor: + # for row in cursor.execute("select * from MyTable"): + # print(row) + cursor = connection.cursor() + cursor.execute(f"SELECT NULL FROM {self.marker_table} WHERE update_id = '{self.update_id}'") + cursor.fetchall() + row_count = cursor.rowcount + + except cx_Oracle.DatabaseError as e: + error, = e.args # Returns cx_Oracle._Error object + # Table does not exists code + if error.code == 942: + row_count = 0 + else: + raise + return row_count > 0 + + def connect(self): + """ + Create a SQL Server connection and return a connection object + """ + connection_string = f"{self.user}/{self.password}@{self.host}:{self.port}/{self.database}" + connection = cx_Oracle.connect(connection_string) + return connection + + def create_marker_table(self): + """ + Create marker table if it doesn't exist. + Use a separate connection since the transaction might have to be reset. + """ + print('WE GOT THIS FAR') + connection = self.connect() + try: + cursor = connection.cursor() + cursor.execute(f"CREATE TABLE {self.marker_table}" + f"(ID NUMBER GENERATED ALWAYS AS IDENTITY," + f"UPDATE_ID VARCHAR2(128) NOT NULL," + f"TARGET_TABLE VARCHAR2(128)," + f"INSERTED DATE DEFAULT SYSDATE NOT NULL," + f"CONSTRAINT LUIGI_UPDATE_ID_PK PRIMARY KEY (UPDATE_ID))" + ) + except cx_Oracle.DatabaseError as e: + error, = e.args # Returns cx_Oracle._Error object + # Table already exists code + if error.code == 955: + pass + else: + raise + connection.close() From 407bea7cc6bed041773365b328ccdad69784bdf3 Mon Sep 17 00:00:00 2001 From: Hazem Salama Date: Tue, 24 Nov 2020 00:06:24 -0500 Subject: [PATCH 2/3] Tidy up code --- luigi/contrib/oracledb.py | 69 ++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/luigi/contrib/oracledb.py b/luigi/contrib/oracledb.py index 770c16c32f..8ee7c8ac7f 100644 --- a/luigi/contrib/oracledb.py +++ b/luigi/contrib/oracledb.py @@ -17,9 +17,7 @@ # This module makes use of the f string formatting syntax, which requires Python 3.6 or higher - import logging - import luigi logger = logging.getLogger('luigi-interface') @@ -33,21 +31,22 @@ class OracleTarget(luigi.Target): """ - Target for a resource in Microsoft SQL Server. - This module is primarily derived from mysqldb.py. Much of MSSqlTarget, - MySqlTarget and PostgresTarget are similar enough to potentially add a + Target for a resource in Oracle Server. + This module is primarily derived from mysqldb.py. Much of OracleTarget, MSSqlTarget, + MySqlTarget, and PostgresTarget are similar enough to potentially add a RDBMSTarget abstract base class to rdbms.py that these classes could be derived from. """ - marker_table = luigi.configuration.get_config().get('oraclesql', 'marker_table', 'luigi_targets') - def __init__(self, host, database, user, password, table, update_id, **cnx_kwargs): + def __init__(self, host, port, database, user, password, table, update_id, **cnx_kwargs): """ Initializes an OracleTarget instance. :param host: Oracle server address. Possibly a host:port string. :type host: str + :param port: Oracle server port number + :type host: int :param database: database name. :type database: str :param user: database user @@ -62,7 +61,7 @@ def __init__(self, host, database, user, password, table, update_id, **cnx_kwarg self.port = int(self.port) else: self.host = host - self.port = 1521 + self.port = '1521' # Default Oracle Server Port self.database = database self.user = user self.password = password @@ -83,14 +82,14 @@ def touch(self, connection=None): if connection is None: connection = self.connect() - cursor = connection.cursor() - sql = f"MERGE INTO {self.marker_table} d " \ - f"USING (SELECT '{self.update_id}' UPDATE_ID, '{self.table}' TARGET_TABLE, SYSDATE INSERTED from dual) s " \ - f"ON (d.UPDATE_ID = s.UPDATE_ID) " \ - f"WHEN MATCHED THEN UPDATE SET d.TARGET_TABLE = s.TARGET_TABLE, d.INSERTED = s.INSERTED " \ - f"WHEN NOT MATCHED THEN INSERT (UPDATE_ID, TARGET_TABLE) VALUES (s.UPDATE_ID, s.TARGET_TABLE)" - cursor.execute(sql) - connection.commit() + with connection.cursor() as cursor: + sql = f"MERGE INTO {self.marker_table} d " \ + f"USING (SELECT '{self.update_id}' UPDATE_ID, '{self.table}' TARGET_TABLE, SYSDATE INSERTED from dual) s " \ + f"ON (d.UPDATE_ID = s.UPDATE_ID) " \ + f"WHEN MATCHED THEN UPDATE SET d.TARGET_TABLE = s.TARGET_TABLE, d.INSERTED = s.INSERTED " \ + f"WHEN NOT MATCHED THEN INSERT (UPDATE_ID, TARGET_TABLE) VALUES (s.UPDATE_ID, s.TARGET_TABLE)" + cursor.execute(sql) + connection.commit() # make sure update is properly marked assert self.exists(connection) @@ -99,17 +98,14 @@ def exists(self, connection=None): if connection is None: connection = self.connect() try: - # TODO: User cursor as a context manager - # with connection.cursor() as cursor: - # for row in cursor.execute("select * from MyTable"): - # print(row) - cursor = connection.cursor() - cursor.execute(f"SELECT NULL FROM {self.marker_table} WHERE update_id = '{self.update_id}'") - cursor.fetchall() - row_count = cursor.rowcount - + with connection.cursor() as cursor: + cursor.execute( + f"SELECT NULL FROM {self.marker_table} WHERE UPPER(update_id) = UPPER('{self.update_id}')") + cursor.fetchall() + row_count = cursor.rowcount + except cx_Oracle.DatabaseError as e: - error, = e.args # Returns cx_Oracle._Error object + error, = e.args # Returns cx_Oracle._Error object # Table does not exists code if error.code == 942: row_count = 0 @@ -119,7 +115,7 @@ def exists(self, connection=None): def connect(self): """ - Create a SQL Server connection and return a connection object + Create an Oracle Server connection and return a connection object """ connection_string = f"{self.user}/{self.password}@{self.host}:{self.port}/{self.database}" connection = cx_Oracle.connect(connection_string) @@ -130,17 +126,16 @@ def create_marker_table(self): Create marker table if it doesn't exist. Use a separate connection since the transaction might have to be reset. """ - print('WE GOT THIS FAR') connection = self.connect() try: - cursor = connection.cursor() - cursor.execute(f"CREATE TABLE {self.marker_table}" - f"(ID NUMBER GENERATED ALWAYS AS IDENTITY," - f"UPDATE_ID VARCHAR2(128) NOT NULL," - f"TARGET_TABLE VARCHAR2(128)," - f"INSERTED DATE DEFAULT SYSDATE NOT NULL," - f"CONSTRAINT LUIGI_UPDATE_ID_PK PRIMARY KEY (UPDATE_ID))" - ) + with connection.cursor() as cursor: + cursor.execute(f"CREATE TABLE {self.marker_table}" + f"(ID NUMBER GENERATED ALWAYS AS IDENTITY," + f"UPDATE_ID VARCHAR2(128) NOT NULL," + f"TARGET_TABLE VARCHAR2(128)," + f"INSERTED DATE DEFAULT SYSDATE NOT NULL," + f"CONSTRAINT LUIGI_UPDATE_ID_PK PRIMARY KEY (UPDATE_ID))" + ) except cx_Oracle.DatabaseError as e: error, = e.args # Returns cx_Oracle._Error object # Table already exists code @@ -148,4 +143,4 @@ def create_marker_table(self): pass else: raise - connection.close() + connection.close() \ No newline at end of file From 8adffd1a582904225e6aa91dc21f9cbb78f9e9dd Mon Sep 17 00:00:00 2001 From: Hazem Salama Date: Thu, 10 Dec 2020 09:44:57 -0500 Subject: [PATCH 3/3] Added the OracleQuery task --- luigi/contrib/oracledb.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/oracledb.py b/luigi/contrib/oracledb.py index 8ee7c8ac7f..ec9f551e5f 100644 --- a/luigi/contrib/oracledb.py +++ b/luigi/contrib/oracledb.py @@ -18,7 +18,9 @@ # This module makes use of the f string formatting syntax, which requires Python 3.6 or higher import logging + import luigi +from luigi.contrib import rdbms logger = logging.getLogger('luigi-interface') @@ -143,4 +145,36 @@ def create_marker_table(self): pass else: raise - connection.close() \ No newline at end of file + connection.close() + + +class OracleQuery(rdbms.Query): + """ + Template task for querying an Oracle compatible database + + Usage: + Subclass and override the required `host`, `database`, `user`, `password`, `table`, and `query` attributes. + Optionally one can override the `autocommit` attribute to put the connection for the query in autocommit mode. + + Override the `run` method if your use case requires some action with the query result. + + Task instances require a dynamic `update_id`, e.g. via parameter(s), otherwise the query will only execute once + + To customize the query signature as recorded in the database marker table, override the `update_id` property. + """ + + def output(self): + """ + Returns an OracleTarget to record execution in a marker table + + Normally you don't override this. + """ + return OracleTarget( + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + table=self.table, + update_id=self.update_id + )