Data Management / DM-19655

Proof of concept for using the Confluent JDBC connector (Oracle Sink) in the EFD

    Details

    • Type: Improvement
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      I'm working with Christopher Stephens to test the Confluent JDBC connector in the EFD. See https://sqr-034.lsst.io for the overall design.

      We've added the JDBC connector plugin to our Kafka Connect docker image and deployed it at the lab in Tucson.
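
      For reference, this is roughly how the plugin gets into the image (a minimal sketch, assuming the Confluent base image and Confluent Hub; the versions, jar name, and paths are illustrative, not necessarily our exact Dockerfile):

        FROM confluentinc/cp-kafka-connect:5.3.1
        # Install the JDBC connector plugin from Confluent Hub
        RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.3.1
        # The Oracle JDBC driver is not bundled with the connector and must be
        # copied into the plugin's lib directory
        COPY ojdbc8.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/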

      We managed to open the Oracle port 1521 for this test and establish a connection from 140.252.32.142 to lsst-oradb.ncsa.illinois.edu (141.142.181.46).

      The connector was able to create the table, map the Avro schema to Oracle data types, and write the messages cached on Kafka.

      We explored a few options for the connector configuration.
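
      For context, a sketch of the sink connector configuration under test (the values are illustrative, and the actual topic name and credentials varied between runs; all of these keys are standard JDBC Sink connector options):

        {
          "name": "oracle-sink",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "lsst.sal.ScriptQueue.logevent_heartbeat",
            "connection.url": "jdbc:oracle:thin:@lsst-oradb.ncsa.illinois.edu:1521/kafka_efd",
            "connection.user": "kafka_efd",
            "connection.password": "<passwd>",
            "auto.create": "true",
            "insert.mode": "insert",
            "table.name.format": "logevent_heartbeat",
            "pk.mode": "none"
          }
        }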

            Activity

            afausti Angelo Fausti added a comment -

            Nice, some progress here. I have fixed the connection URL to:

             "connection.url": "jdbc:oracle:thin:@lsst-oradb.ncsa.illinois.edu:1521/kafka_efd"
            

            and now I see:

            [2019-10-11 23:36:27,749] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
            [2019-10-11 23:36:28,845] INFO Checking Oracle dialect for existence of table "lsst"."sal" (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:36:29,297] INFO Using Oracle dialect table "lsst"."sal" absent (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:36:29,303] INFO Creating table with sql: CREATE TABLE "lsst"."sal" (
            "private_host" NUMBER(19,0) NOT NULL,
            "ScriptQueueID" NUMBER(19,0) NOT NULL,
            "private_origin" NUMBER(19,0) NOT NULL,
            "private_kafkaStamp" BINARY_DOUBLE NOT NULL,
            "private_sndStamp" BINARY_DOUBLE NOT NULL,
            "private_seqNum" NUMBER(19,0) NOT NULL,
            "heartbeat" NUMBER(1,0) NOT NULL,
            "private_revCode" CLOB NOT NULL,
            "priority" NUMBER(19,0) NOT NULL,
            "private_rcvStamp" BINARY_DOUBLE NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
            [2019-10-11 23:36:29,407] WARN Create failed, will attempt amend if table already exists (io.confluent.connect.jdbc.sink.DbStructure)
            java.sql.SQLSyntaxErrorException: ORA-01918: user 'lsst' does not exist
             
            	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:494)
            	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:446)
            	at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1052)
            	at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:537)
            	at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:255)
            	at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:610)
            	at oracle.jdbc.driver.T4CStatement.doOall8(T4CStatement.java:213)
            	at oracle.jdbc.driver.T4CStatement.doOall8(T4CStatement.java:37)
            	at oracle.jdbc.driver.T4CStatement.executeForRows(T4CStatement.java:887)
            	at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1136)
            	at oracle.jdbc.driver.OracleStatement.executeUpdateInternal(OracleStatement.java:1678)
            	at oracle.jdbc.driver.OracleStatement.executeLargeUpdate(OracleStatement.java:1643)
            	at oracle.jdbc.driver.OracleStatement.executeUpdate(OracleStatement.java:1630)
            	at oracle.jdbc.driver.OracleStatementWrapper.executeUpdate(OracleStatementWrapper.java:282)
            	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1064)
            	at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:92)
            	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:60)
            	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
            	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
            	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
            	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            	at java.lang.Thread.run(Thread.java:748)
            Caused by: Error : 1918, Position : 0, Sql = CREATE TABLE "lsst"."sal" (
            "private_host" NUMBER(19,0) NOT NULL,
            "ScriptQueueID" NUMBER(19,0) NOT NULL,
            "private_origin" NUMBER(19,0) NOT NULL,
            "private_kafkaStamp" BINARY_DOUBLE NOT NULL,
            "private_sndStamp" BINARY_DOUBLE NOT NULL,
            "private_seqNum" NUMBER(19,0) NOT NULL,
            "heartbeat" NUMBER(1,0) NOT NULL,
            "private_revCode" CLOB NOT NULL,
            "priority" NUMBER(19,0) NOT NULL,
            "private_rcvStamp" BINARY_DOUBLE NOT NULL), OriginalSql = CREATE TABLE "lsst"."sal" (
            "private_host" NUMBER(19,0) NOT NULL,
            "ScriptQueueID" NUMBER(19,0) NOT NULL,
            "private_origin" NUMBER(19,0) NOT NULL,
            "private_kafkaStamp" BINARY_DOUBLE NOT NULL,
            "private_sndStamp" BINARY_DOUBLE NOT NULL,
            "private_seqNum" NUMBER(19,0) NOT NULL,
            "heartbeat" NUMBER(1,0) NOT NULL,
            "private_revCode" CLOB NOT NULL,
            "priority" NUMBER(19,0) NOT NULL,
            "private_rcvStamp" BINARY_DOUBLE NOT NULL), Error Msg = ORA-01918: user 'lsst' does not exist
             
            	at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:498)
            	... 30 more
            [2019-10-11 23:36:30,582] INFO Refreshing metadata for table "lsst"."sal" to null (io.confluent.connect.jdbc.util.TableDefinitions)
            [2019-10-11 23:36:30,583] ERROR WorkerSinkTask{id=oracle-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
            java.lang.NullPointerException
            	at io.confluent.connect.jdbc.util.TableDefinitions.refresh(TableDefinitions.java:85)
            	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:64)
            	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
            	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
            	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
            	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            	at java.lang.Thread.run(Thread.java:748)
            [2019-10-11 23:36:30,583] ERROR WorkerSinkTask{id=oracle-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
            org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
            	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
            	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
            	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            	at java.lang.Thread.run(Thread.java:748)
            Caused by: java.lang.NullPointerException
            	at io.confluent.connect.jdbc.util.TableDefinitions.refresh(TableDefinitions.java:85)
            	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:64)
            	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
            	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
            	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
            	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
            	... 10 more
            [2019-10-11 23:36:30,584] ERROR WorkerSinkTask{id=oracle-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
            [2019-10-11 23:36:30,584] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
            [2019-10-11 23:36:30,584] INFO Closing connection #1 to Oracle (io.confluent.connect.jdbc.util.CachedConnectionProvider)
            
            

            afausti Angelo Fausti added a comment - edited

            Formatting the table name differently in the connector configuration did the trick ("." is not allowed in Oracle table names):

            "table.name.format":"logevent_heartbeat"
            

            The table was successfully created in the Oracle database:

            [2019-10-11 23:41:06,885] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
            [2019-10-11 23:41:08,028] INFO Checking Oracle dialect for existence of table "logevent_heartbeat" (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:41:08,242] INFO Using Oracle dialect table "logevent_heartbeat" absent (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:41:08,242] INFO Creating table with sql: CREATE TABLE "logevent_heartbeat" (
            "private_host" NUMBER(19,0) NOT NULL,
            "ScriptQueueID" NUMBER(19,0) NOT NULL,
            "private_origin" NUMBER(19,0) NOT NULL,
            "private_kafkaStamp" BINARY_DOUBLE NOT NULL,
            "private_sndStamp" BINARY_DOUBLE NOT NULL,
            "private_seqNum" NUMBER(19,0) NOT NULL,
            "heartbeat" NUMBER(1,0) NOT NULL,
            "private_revCode" CLOB NOT NULL,
            "priority" NUMBER(19,0) NOT NULL,
            "private_rcvStamp" BINARY_DOUBLE NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
            [2019-10-11 23:41:08,403] INFO Checking Oracle dialect for existence of table "logevent_heartbeat" (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:41:08,493] INFO Using Oracle dialect table "logevent_heartbeat" present (io.confluent.connect.jdbc.dialect.OracleDatabaseDialect)
            [2019-10-11 23:41:09,483] INFO Setting metadata for table "logevent_heartbeat" to Table{name='"logevent_heartbeat"', columns=[Column{'private_sndStamp', isPrimaryKey=false, allowsNull=false, sqlType=BINARY_DOUBLE}, Column{'heartbeat', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}, Column{'private_host', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}, Column{'ScriptQueueID', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}, Column{'private_kafkaStamp', isPrimaryKey=false, allowsNull=false, sqlType=BINARY_DOUBLE}, Column{'private_revCode', isPrimaryKey=false, allowsNull=false, sqlType=CLOB}, Column{'private_origin', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}, Column{'private_rcvStamp', isPrimaryKey=false, allowsNull=false, sqlType=BINARY_DOUBLE}, Column{'private_seqNum', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}, Column{'priority', isPrimaryKey=false, allowsNull=false, sqlType=NUMBER}]} (io.confluent.connect.jdbc.util.TableDefinitions)
            

            $ ./sqlplus kafka_efd/<passwd>@lsst-oradb.ncsa.illinois.edu:1521/kafka_efd
             
            SQL*Plus: Release 18.0.0.0.0 Production on Fri Oct 11 16:43:57 2019
            Version 18.1.0.0.0
             
            Copyright (c) 1982, 2018, Oracle.  All rights reserved.
             
            Last Successful login time: Fri Oct 11 2019 16:41:07 -07:00
             
            Connected to:
            Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production
            Version 19.4.0.0.0
             
            SQL> SELECT table_name FROM user_tables;
             
            TABLE_NAME
            --------------------------------------------------------------------------------
            logevent_heartbeat
             
            SQL>
            

            afausti Angelo Fausti added a comment - edited

            The data type mapping from Avro to Oracle is documented at:

            https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html#auto-creation-and-auto-evoluton

            Note that all columns are NOT NULL by default.
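
            To make the mapping concrete, here is a minimal Avro schema sketch whose fields would produce the column types seen in the CREATE TABLE log above (inferred from the logs for illustration; this is not the actual SAL schema):

             {
               "type": "record",
               "name": "logevent_heartbeat",
               "fields": [
                 {"name": "private_sndStamp", "type": "double"},
                 {"name": "private_seqNum", "type": "long"},
                 {"name": "heartbeat", "type": "boolean"},
                 {"name": "private_revCode", "type": "string"}
               ]
             }

            Here double maps to BINARY_DOUBLE, long to NUMBER(19,0), boolean to NUMBER(1,0), and string to CLOB; non-optional Avro fields become NOT NULL columns.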

            The connector configuration uses "pk.mode": "none" by default, so no primary key is created.

            We can use "pk.mode": "record_key" with "pk.fields": "private_sndStamp,private_rcvStamp", for example; see:

            https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html#sink-pk-config-options
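
            For example, a configuration fragment (a hedged sketch: per the Confluent docs, "record_key" takes pk.fields from the message key, while fields like private_sndStamp and private_rcvStamp live in the record value, so "record_value" is the mode that would apply to them):

             "pk.mode": "record_value",
             "pk.fields": "private_sndStamp,private_rcvStamp"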

            Note that timestamps are not mapped to the Oracle TIMESTAMP data type but to BINARY_DOUBLE, because the SAL convention is to represent timestamps as double-precision Unix timestamps. We should investigate performance when joining large tables on these columns; we expect joins to be based on timestamps (see https://sqr-034.lsst.io).
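
            If TIMESTAMP semantics are ever needed on the Oracle side, the epoch doubles can be converted at query time, for instance (a sketch; NUMTODSINTERVAL is standard Oracle, and the column and row choice are just an example):

             SQL> SELECT TIMESTAMP '1970-01-01 00:00:00'
                    + NUMTODSINTERVAL("private_sndStamp", 'SECOND') AS snd_time
                  FROM "logevent_heartbeat" WHERE ROWNUM = 1;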

            We should try formatting table names in uppercase (or case-insensitively) to avoid the double quoting when querying.
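
            One option to try (untested; Oracle folds unquoted identifiers to uppercase, so an uppercase table name could then be queried without quotes):

             "table.name.format": "LOGEVENT_HEARTBEAT"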

            SQL> DESCRIBE "logevent_heartbeat";
             Name                                      Null?    Type
             ----------------------------------------- -------- ----------------------------
             private_host                              NOT NULL NUMBER(19)
             ScriptQueueID                             NOT NULL NUMBER(19)
             private_origin                            NOT NULL NUMBER(19)
             private_kafkaStamp                        NOT NULL BINARY_DOUBLE
             private_sndStamp                          NOT NULL BINARY_DOUBLE
             private_seqNum                            NOT NULL NUMBER(19)
             heartbeat                                 NOT NULL NUMBER(1)
             private_revCode                           NOT NULL CLOB
             priority                                  NOT NULL NUMBER(19)
             private_rcvStamp                          NOT NULL BINARY_DOUBLE
            

            SQL> SELECT * FROM "logevent_heartbeat" WHERE ROWNUM = 1;
             
            private_host ScriptQueueID private_origin private_kafkaStamp private_sndStamp
            ------------ ------------- -------------- ------------------ ----------------
            private_seqNum	heartbeat
            -------------- ----------
            private_revCode
            --------------------------------------------------------------------------------
              priority private_rcvStamp
            ---------- ----------------
              1793986198		 1	    32536	  1.571E+009	   1.571E+009
            	153790		0
            4fcf8575
            	 0	 1.571E+009
            

            In this test the connector was able to consume all the messages cached on Kafka and write them to Oracle.

            SQL> select COUNT(*) FROM "logevent_heartbeat";
             
              COUNT(*)
            ----------
                 23669
            

            afausti Angelo Fausti added a comment - edited

            Hi Christopher Stephens, I've added more information to this ticket and set you as reviewer. I am pretty happy with this POC for the Oracle Sink connector; let me know if there's any other aspect we should test before closing it.

            cs2018 Christopher Stephens added a comment -

            I think we've accomplished the intent of this task. Any additional work should go under a separate task/story.

              People

              • Assignee: afausti Angelo Fausti
              • Reporter: afausti Angelo Fausti
              • Reviewers: Christopher Stephens
              • Watchers: Angelo Fausti, Christopher Pond, Christopher Stephens, Michelle Butler
