Uploaded image for project: 'Data Management'
  1. Data Management
  2. DM-16492

Configure Kafka InfluxDB Sink Connector

    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      The Kafka Confluent Platform deployment in efd-kafka-demo includes a confluent-kafka-cp-kafka-connect pod but no connectors are configured.

      The idea is to use the Kafka Connect REST API to create and configure the InfluxDB Sink Connector which will let us to consume specific kafka topics and save them InfluxDB.

      This ticket includes a new deployment of efd-kafka-demo and ick-deployment in the same cluster.

        Attachments

          Issue Links

            Activity

            Hide
            afausti Angelo Fausti added a comment - - edited

            The information here was not preserved in any repo because it didn't work, but it is worth to capture:

            Dockerfile to install the confluentinc/kafka-connect-influxdb plugin.

             
            # Builds a docker image for Kafka Connect with InfluxDB Sink Connector
             
            FROM confluentinc/cp-kafka-connect-base
             
            MAINTAINER afausti@lsst.org
             
            ENV COMPONENT=kafka-connect
             
            RUN echo "===> Installing InfluxDB Sink Connector ..."
            RUN confluent-hub install confluentinc/kafka-connect-influxdb:latest --no-prompt
            
            

            Add this to the cp-kafka-connect helm chart:

            cp-kafka-connect:
              enabled: true
              image: lsstsqre/cp-kafka-connect
              imageTag: latest
             
              pluginPath: "/usr/share/java,/usr/share/confluent-hub-components"
             
              resources: {}
            
            

            After installing the the cp-kafka-connect helm chart, you should be able to configure the connector:

            echo "Port forwarding Kafka Connect to http://localhost:8083"
             
            POD_NAME=$(kubectl get pods --namespace default -l "app=cp-kafka-connect,release=confluent-kafka" -o jsonpath="{.items[0].metadata.name}")
            kubectl --namespace default port-forward $POD_NAME 8083
            
            

            and

            echo "Configure InfluxDBSinkConnector"
             
            curl -s -X POST -H 'Content-Type: application/json' --data @connector-create.json http://localhost:8083/connectors
            
            

            where connector-create.json is:

             
            {
              "name" : "InfluxDBSinkConnector1",
              "config" : {
                "connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
                "tasks.max" : "1",
                "topics" : "helloavro",
                "influxdb.url" : "http://influxdb-influxdb.default:8086"
              }
            }
            

            a database named helloavro is created in the influxdb instance. But see the error

            [2018-11-14 21:30:35,089] ERROR WorkerSinkTask{id=InfluxDBSinkConnector1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
            org.apache.kafka.connect.errors.DataException: measurement is not a valid field name
            

            in the logs from the kafka-connect pod (attached).

            Show
            afausti Angelo Fausti added a comment - - edited The information here was not preserved in any repo because it didn't work, but it is worth to capture: Dockerfile to install the confluentinc/kafka-connect-influxdb plugin.   # Builds a docker image for Kafka Connect with InfluxDB Sink Connector   FROM confluentinc/cp-kafka-connect-base   MAINTAINER afausti@lsst.org   ENV COMPONENT=kafka-connect   RUN echo "===> Installing InfluxDB Sink Connector ..." RUN confluent-hub install confluentinc/kafka-connect-influxdb:latest --no-prompt Add this to the cp-kafka-connect helm chart: cp-kafka-connect: enabled: true image: lsstsqre/cp-kafka-connect imageTag: latest pluginPath: "/usr/share/java,/usr/share/confluent-hub-components"   resources: {} After installing the the cp-kafka-connect helm chart, you should be able to configure the connector: echo "Port forwarding Kafka Connect to http://localhost:8083"   POD_NAME=$(kubectl get pods --namespace default -l "app=cp-kafka-connect,release=confluent-kafka" -o jsonpath="{.items[0].metadata.name}") kubectl --namespace default port-forward $POD_NAME 8083 and echo "Configure InfluxDBSinkConnector"   curl -s -X POST -H 'Content-Type: application/json' --data @connector-create.json http://localhost:8083/connectors where connector-create.json is:   { "name" : "InfluxDBSinkConnector1", "config" : { "connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector", "tasks.max" : "1", "topics" : "helloavro", "influxdb.url" : "http://influxdb-influxdb.default:8086" } } a database named helloavro is created in the influxdb instance. But see the error [2018-11-14 21:30:35,089] ERROR WorkerSinkTask{id=InfluxDBSinkConnector1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask) org.apache.kafka.connect.errors.DataException: measurement is not a valid field name in the logs from the kafka-connect pod (attached).
            Hide
            afausti Angelo Fausti added a comment -

            The Landoop InfluxDB Sink Connectar was added to the `cp-kafka-connect` image and an example configuration was provided to demonstrate it using the Hello world for Avro demo.

            https://github.com/lsst-sqre/kafka-efd-demo/pull/9

            Show
            afausti Angelo Fausti added a comment - The Landoop InfluxDB Sink Connectar was added to the `cp-kafka-connect` image and an example configuration was provided to demonstrate it using the Hello world for Avro demo. https://github.com/lsst-sqre/kafka-efd-demo/pull/9
            Hide
            afausti Angelo Fausti added a comment -

            Instructions on how to deploy InfluxDB+Chronograf+Kapacitor are kept in another repository and were updated to reflect the fact that the Kafka deployment is done in the default namespace.

            https://github.com/lsst-sqre/ick-deployment/pull/6

            Show
            afausti Angelo Fausti added a comment - Instructions on how to deploy InfluxDB+Chronograf+Kapacitor are kept in another repository and were updated to reflect the fact that the Kafka deployment is done in the default namespace. https://github.com/lsst-sqre/ick-deployment/pull/6
            Hide
            afausti Angelo Fausti added a comment -

            Would you like to review the PR that adds the Landoop InfluxDB Sink Connector to the Kafka deployment?
            https://github.com/lsst-sqre/kafka-efd-demo/pull/9

            Show
            afausti Angelo Fausti added a comment - Would you like to review the PR that adds the Landoop InfluxDB Sink Connector to the Kafka deployment? https://github.com/lsst-sqre/kafka-efd-demo/pull/9
            Hide
            jsick Jonathan Sick added a comment -

            Terrific! Thoughts on PR, but no changes.

            Show
            jsick Jonathan Sick added a comment - Terrific! Thoughts on PR, but no changes.
            Hide
            afausti Angelo Fausti added a comment -

            Thanks for reviewing! more info in the PR.

            Show
            afausti Angelo Fausti added a comment - Thanks for reviewing! more info in the PR.

              People

              • Assignee:
                afausti Angelo Fausti
                Reporter:
                afausti Angelo Fausti
                Reviewers:
                Jonathan Sick
                Watchers:
                Angelo Fausti, Jonathan Sick
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Summary Panel