  Data Management / DM-23973

Deploy the EFD replicator on the LSP integration cluster

    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We'll deploy and test the replicator connector at NCSA. It is configured to read from the Tucson test stand cluster (source) and write to the LSP integration cluster (destination). Both Kafka topics and schemas are replicated.

      That's also an opportunity to test the kafka-connect-manager support for the Replicator connector and to add the deployment configuration to Argo CD.

      At the moment, the Summit EFD is not operational due to the Observatory shutdown. When it is back, we'll configure the connector on the LSP integration cluster to replicate data from the Summit EFD and, once that is verified, proceed with the deployment of the replicator connector on stable (DM-23974).

            Activity

            afausti Angelo Fausti added a comment - edited

            The Replicator connector was successfully deployed on int; here's the configuration produced by the kafka-connect-manager:

            {
                "config": {
                    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
                    "dest.kafka.bootstrap.servers": "cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092",
                    "header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
                    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
                    "name": "replicator",
                    "schema.registry.topic": "_schemas",
                    "schema.registry.url": "https://lsst-schema-registry-int-efd.ncsa.illinois.edu/",
                    "schema.subject.translator.class": "io.confluent.connect.replicator.schemas.DefaultSubjectTranslator",
                    "src.kafka.bootstrap.servers": "kafka-0-tucson-teststand-efd.lsst.codes:31090",
                    "tasks.max": "1",
                    "topic.poll.interval.ms": "300000",
                    "topic.regex": "lsst.sal.*",
                    "topic.rename.format": "tts.${topic}",
                    "topic.whitelist": "_schemas",
                    "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter"
                },
                "name": "replicator",
                "tasks": [],
                "type": "source"
            }
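
            For reference, the registered connector config and its runtime status can be checked through the Kafka Connect REST API. A minimal sketch, assuming the Connect REST service is reachable at cp-helm-charts-cp-kafka-connect:8083 (the service name is illustrative and depends on the Helm release):

            # retrieve the connector configuration registered with Connect
            curl -s http://cp-helm-charts-cp-kafka-connect:8083/connectors/replicator
            # check that the connector and its task are RUNNING
            curl -s http://cp-helm-charts-cp-kafka-connect:8083/connectors/replicator/status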
            

            afausti Angelo Fausti added a comment - edited

            It is configured to replicate topics and schemas from the Tucson test stand EFD. On the first attempt to run the replicator connector, we could not establish a connection to the Kafka brokers in Tucson because of the VPN:

            [2020-03-25 19:07:24,388] WARN [AdminClient clientId=adminclient-8] Connection to node -1 (kafka-0-tucson-teststand-efd.lsst.codes/140.252.32.141:31090) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
            

            I confirmed that the broker is up and that I can connect to it from the NOAO VPN network:

            kafkacat -L -b kafka-0-tucson-teststand-efd.lsst.codes:31090
            

            A ticket was opened with Networking in Tucson to fix that (IHS-3680).
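
            In the meantime, reachability from the NCSA side can be tested by probing the broker port from one of the Kafka Connect pods. A minimal sketch, assuming a hypothetical pod name cp-helm-charts-cp-kafka-connect-0 and that nc is available in the container image:

            # test TCP connectivity from the Connect pod to the Tucson broker
            kubectl exec -it cp-helm-charts-cp-kafka-connect-0 -- nc -vz kafka-0-tucson-teststand-efd.lsst.codes 31090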

            afausti Angelo Fausti added a comment - edited

            Also, as part of the replicator configuration, the Schema Registry in the destination cluster must be configured to allow mode changes, mode.mutability=true. That was added to the deployment configuration for the cp-helm-charts.

            The Schema Registry must be changed to IMPORT mode to migrate schemas from the source cluster to the destination cluster. This is currently done manually; a Helm post-install hook could be used to automate it:

            curl -X PUT -H "Content-Type: application/json" "https://lsst-schema-registry-int-efd.ncsa.illinois.edu/mode" --data '{"mode": "IMPORT"}'
            

            NOTE: to apply this configuration, the Schema Registry in the destination cluster must be empty.
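
            The current mode can be verified, and the registry returned to normal operation once the schema migration completes, through the same endpoint. A sketch, assuming READWRITE is the desired final state:

            # verify the current mode of the Schema Registry
            curl https://lsst-schema-registry-int-efd.ncsa.illinois.edu/mode
            # restore normal operation after the schema migration completes
            curl -X PUT -H "Content-Type: application/json" "https://lsst-schema-registry-int-efd.ncsa.illinois.edu/mode" --data '{"mode": "READWRITE"}'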

            SQR-034 also has documentation on how to configure the replicator connector for topic replication and schema migration; see https://sqr-034.lsst.io/#appendix-a-configuring-the-kafka-connect-replicator-source-connector

            afausti Angelo Fausti added a comment - edited

            The replicator connector seems to be working fine. After creating the topics and schemas at the destination cluster, the initialization step checks the topic offsets between the source and destination clusters; that step takes only ~10 min for 1259 topics. Throughput also improved after the filesystem maintenance: around 1 MB/s, compared to 100 KB/s last Friday. Here are some fragments from the logs:

            [2020-03-30 22:16:45,183] INFO [Worker clientId=connect-1, groupId=cp-helm-charts] Starting connector replicator 
            [2020-03-30 22:16:45,183] INFO ConnectorConfig values:
             
            (configs)
             
            [2020-03-30 22:16:45,184] INFO Instantiated connector replicator with version 5.3.1 of type class io.confluent.connect.replicator.ReplicatorSourceConnector (org.apache.kafka.connect
            [2020-03-30 22:17:25,386] INFO Registering schema translator for topic _schemas (io.confluent.connect.replicator.ReplicatorSourceTask)
             
            (here's when schema migration happens)
             
            [2020-03-30 22:17:25,402] INFO Requesting metadata refresh after 1215 new topics were added (io.confluent.connect.replicator.util.ReplicatorAdminClient)
             
            (then it checks for topic offsets between the source and destination clusters)
             
            [2020-03-30 22:17:26,385] INFO [Consumer clientId=replicator-0, groupId=replicator] Found no committed offset for partition lsst.sal.ATAOS.logevent_appliedSettingsMatchStart-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
            [2020-03-30 22:17:26,385] INFO [Consumer clientId=replicator-0, groupId=replicator] Seeking to EARLIEST offset of partition lsst.sal.ATAOS.logevent_appliedSettingsMatchStart-0 (org.
             
            (and it is finally ready to replicate messages; that took about ~10 min for 1259 topics)
             
            [2020-03-30 22:28:10,364] INFO Started kafka replicator task replicator-0 replicating partitions
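
            To spot-check that messages are actually flowing, one can consume a single message from a replicated topic at the destination. A sketch using kafkacat, with the topic name derived from the tts.${topic} rename format and a partition seen in the logs above:

            # read one message from the beginning of a replicated topic at the destination
            kafkacat -C -b cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092 -t tts.lsst.sal.ATAOS.logevent_appliedSettingsMatchStart -o beginning -c 1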
            

            afausti Angelo Fausti added a comment - edited

            [2020-03-30 23:34:15,028] INFO WorkerSourceTask{id=replicator-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
            [2020-03-30 23:34:32,988] INFO WorkerSourceTask{id=replicator-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
            [2020-03-30 23:34:32,989] INFO WorkerSourceTask{id=replicator-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
            [2020-03-30 23:35:14,954] INFO [Consumer clientId=replicator-0, groupId=replicator] Setting offset for partition _schemas-0 to the committed offset FetchPosition{offset=1128, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=140.252.32.141:31091 (id: 1 rack: null), epoch=314}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
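
            As a final check, the topics replicated to the destination cluster can be listed and counted. A sketch with kafkacat; the grep pattern assumes the usual metadata output format:

            # count the tts.-prefixed topics now present at the destination
            kafkacat -L -b cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092 | grep -c 'topic "tts.'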
            


              People

              • Assignee: Angelo Fausti (afausti)
              • Reporter: Angelo Fausti (afausti)
              • Watchers: Angelo Fausti
              • Votes: 0
