Data Management / DM-24856

Register aggregated topic schemas to a secondary schema registry

    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      In the EFD, data replication is done in one direction, copying Kafka topics and schemas from a source cluster (e.g. base) to a destination cluster (e.g. LDF) (see the multi Data Center active-passive setup).

      The Aggregator is a component that runs on the destination cluster (e.g. LDF) and creates new schemas for the aggregated topics. In the active-passive setup, these schemas exist only at the destination cluster; we don't want them replicated back to the source cluster. If we used the same Schema Registry to register the aggregated topic schemas, the schema IDs would no longer be globally unique: IDs for schemas created at the source cluster could collide with IDs for schemas created at the destination cluster.
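
The collision described above can be illustrated with a toy model. This is only a sketch: `ToyRegistry`, `replicate`, and `colliding_ids` are hypothetical names, and the sequential ID assignment is a simplification of what a real Schema Registry does.

```python
from itertools import count


class ToyRegistry:
    """Toy stand-in for a Schema Registry (assumption: sequential IDs)."""

    def __init__(self, start=1):
        self._next_id = count(start)
        self.ids = {}  # schema text -> schema ID

    def register(self, schema):
        # Registering an already known schema returns its existing ID.
        if schema not in self.ids:
            self.ids[schema] = next(self._next_id)
        return self.ids[schema]


def replicate(source):
    """Copy schemas and IDs one way, source -> destination."""
    replica = ToyRegistry(start=max(source.ids.values(), default=0) + 1)
    replica.ids = dict(source.ids)
    return replica


def colliding_ids(source, destination):
    """Return the IDs that point to different schemas in the two registries."""
    src_by_id = {i: s for s, i in source.ids.items()}
    dst_by_id = {i: s for s, i in destination.ids.items()}
    shared = src_by_id.keys() & dst_by_id.keys()
    return sorted(i for i in shared if src_by_id[i] != dst_by_id[i])


source = ToyRegistry()
source.register("topic-a-v1")             # registered at the source
destination = replicate(source)           # one-way replication
destination.register("agg-topic-a-v1")    # Aggregator writes at the destination
source.register("topic-b-v1")             # source keeps registering new schemas
print(colliding_ids(source, destination))
```

In this model the aggregated schema and the new source schema end up with the same ID, which is the situation a separate registry for aggregated topics avoids.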

      For the Aggregator, we'll need a secondary schema registry to register the aggregated topic schemas.

      In this ticket, we'll investigate this possibility further and configure the kafka-aggregator to read schemas for the source topics from the primary Schema Registry and write schemas to the secondary one.

            Activity

            afausti Angelo Fausti added a comment - - edited

            I'll avoid the primary/secondary Schema Registry nomenclature, as it has a special meaning in a multi-datacenter deployment of Kafka.

            From the point of view of the EFD Aggregator, I'm calling the second Schema Registry the "internal Schema Registry", which is aligned with Faust nomenclature: "internal topics" are those managed by Faust, and they will have their schemas registered in the "internal Schema Registry".

            I've used docker-compose to show that it is possible to have two Schema Registries running simultaneously, and I came up with the following docker-compose configuration:

              schema-registry:
                image: confluentinc/cp-schema-registry:5.3.2
                hostname: schema-registry
                container_name: schema-registry
                depends_on:
                  - zookeeper
                  - broker
                ports:
                  - "8081:8081"
                environment:
                  SCHEMA_REGISTRY_HOST_NAME: schema-registry
                  SCHEMA_REGISTRY_MASTER_ELIGIBILITY: "true"
                  SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
             
              internal-schema-registry:
                image: confluentinc/cp-schema-registry:5.3.2
                hostname: internal-schema-registry
                container_name: internal-schema-registry
                depends_on:
                  - zookeeper
                  - broker
                ports:
                  - "28081:28081"
                environment:
                  SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
                  SCHEMA_REGISTRY_HOST_NAME: internal-schema-registry
                  SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _internal_schemas
                  SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: internal-schema-registry
                  SCHEMA_REGISTRY_MASTER_ELIGIBILITY: "true"
                  SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
            

            The rationale for this configuration:

            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081 (listeners) runs the internal-schema-registry on another port.

            SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _internal_schemas (kafkastore.topic) makes the internal Schema Registry write its schemas to a different Kafka topic. We have to be careful about this one.

            SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID: internal-schema-registry (schema_registry_group_id) makes sure we use a different consumer group ID for the internal Schema Registry.

            SCHEMA_REGISTRY_MASTER_ELIGIBILITY: "true" (master.eligibility) must be set for both Schema Registries, so that both are seen as "primary" in the cluster.
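
The rationale above can be turned into a small sanity check. This is a sketch under assumptions: `registry_conflicts` is a hypothetical helper, and the fallback values are the Schema Registry defaults as I understand them (port 8081, topic `_schemas`, group ID `schema-registry`), which should be verified against the version in use.

```python
# Assumed defaults used when a setting is absent from the environment block.
ASSUMED_DEFAULTS = {
    "SCHEMA_REGISTRY_LISTENERS": "http://0.0.0.0:8081",
    "SCHEMA_REGISTRY_KAFKASTORE_TOPIC": "_schemas",
    "SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID": "schema-registry",
}


def registry_conflicts(first_env, second_env):
    """Return the settings that both registries share but that should differ."""
    return [
        key
        for key, default in ASSUMED_DEFAULTS.items()
        if first_env.get(key, default) == second_env.get(key, default)
    ]


# Environment blocks copied from the docker-compose configuration above.
schema_registry_env = {
    "SCHEMA_REGISTRY_HOST_NAME": "schema-registry",
    "SCHEMA_REGISTRY_MASTER_ELIGIBILITY": "true",
    "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS": "broker:29092",
}
internal_schema_registry_env = {
    "SCHEMA_REGISTRY_LISTENERS": "http://0.0.0.0:28081",
    "SCHEMA_REGISTRY_HOST_NAME": "internal-schema-registry",
    "SCHEMA_REGISTRY_KAFKASTORE_TOPIC": "_internal_schemas",
    "SCHEMA_REGISTRY_SCHEMA_REGISTRY_GROUP_ID": "internal-schema-registry",
    "SCHEMA_REGISTRY_MASTER_ELIGIBILITY": "true",
    "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS": "broker:29092",
}

conflicts = registry_conflicts(schema_registry_env, internal_schema_registry_env)
print(conflicts)
```

An empty list means the two registries use distinct listeners, storage topics, and group IDs.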

            In doing that, there's the following message in the logs, but it looks like this configuration is allowed. See, for example, this issue.

            internal-schema-registry    | [2020-05-12 20:24:54,183] INFO conflict in /schema_registry_master data: {"host":"internal-schema-registry","port":28081,"master_eligibility":true,"scheme":"http","version":1} stored data: {"host":"schema-registry","port":8081,"master_eligibility":true,"scheme":"http","version":1}
            

             

            As a result, I can configure the kafka-aggregator example app to initialize the source topic and register its schema in the schema-registry while initializing the aggregated topic and registering its schema in the internal-schema-registry.

            Looks like this configuration works fine:

            schema-registry             | [2020-05-13 01:34:29,418] INFO 172.30.0.1 - - [13/May/2020:01:34:29 +0000] "POST /subjects/kafkaaggregator-src-topic-value HTTP/1.1" 200 172  12 (io.confluent.rest-utils.requests)
             
            internal-schema-registry    | [2020-05-13 01:34:30,029] INFO 172.30.0.1 - - [13/May/2020:01:34:29 +0000] "POST /compatibility/subjects/kafkaaggregator-agg-topic-value/versions/latest HTTP/1.1" 200 42  479 (io.confluent.rest-utils.requests)
             
            internal-schema-registry    | [2020-05-13 01:34:30,102] INFO 172.30.0.1 - - [13/May/2020:01:34:30 +0000] "POST /subjects/kafkaaggregator-agg-topic-value/versions HTTP/1.1" 200 8  56 (io.confluent.rest-utils.requests)
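
The last request in the log, POST /subjects/kafkaaggregator-agg-topic-value/versions, is the Schema Registry REST call that registers a schema under a subject. A minimal sketch of building such a request follows. It is not the kafka-aggregator code: `build_register_request` is a hypothetical helper, and the Avro schema is a made-up example, since the ticket does not show the real one.

```python
import json
import urllib.request

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_register_request(registry_url, subject, avro_schema):
    """Build (but do not send) the POST that registers a schema for a subject."""
    # The REST API expects the schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(avro_schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


# Hypothetical schema for an aggregated topic (field names are assumptions).
example_schema = {
    "type": "record",
    "name": "AggregatedExample",
    "fields": [
        {"name": "time", "type": "double"},
        {"name": "mean_value", "type": "double"},
    ],
}

request = build_register_request(
    "http://localhost:28081",  # internal-schema-registry port from the compose file
    "kafkaaggregator-agg-topic-value",
    example_schema,
)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it to a running registry.
```

Pointing the same helper at http://localhost:8081 would target the schema-registry used for the source topics instead.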
            

            $ curl http://localhost:8081/subjects
            ["kafkaaggregator-src-topic-value"]
            

            $ curl http://localhost:28081/subjects
            ["kafkaaggregator-agg-topic-value"]
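
The two curl checks above can also be scripted. The sketch below assumes only the GET /subjects endpoint used above; `list_subjects` and `shared_subjects` are hypothetical helpers, and the canned responses reproduce the curl output so the example runs without a live registry.

```python
import json
import urllib.request


def http_get(url):
    """Fetch a URL and return the response body as text."""
    with urllib.request.urlopen(url) as response:
        return response.read().decode("utf-8")


def list_subjects(registry_url, fetch=http_get):
    """Return the subjects registered in one Schema Registry."""
    return json.loads(fetch(f"{registry_url.rstrip('/')}/subjects"))


def shared_subjects(first_url, second_url, fetch=http_get):
    """Return the subjects that appear in both registries."""
    first = set(list_subjects(first_url, fetch))
    second = set(list_subjects(second_url, fetch))
    return sorted(first & second)


# Canned responses copied from the curl output above.
canned = {
    "http://localhost:8081/subjects": '["kafkaaggregator-src-topic-value"]',
    "http://localhost:28081/subjects": '["kafkaaggregator-agg-topic-value"]',
}

overlap = shared_subjects(
    "http://localhost:8081", "http://localhost:28081", fetch=canned.__getitem__
)
print(overlap)
```

Dropping the `fetch` argument makes the helpers query the running registries directly.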
            

            afausti Angelo Fausti added a comment - - edited

            Jonathan Sick, this PR shows that it is possible to run the aggregation example with two Schema Registries. I think this setup makes sense for our use case, as discussed in the last co-work. Let me know if this looks reasonable. Thanks!

             

             

            afausti Angelo Fausti added a comment -

            Also, SQR-040 has an updated version of the diagram we discussed last time.

            jsick Jonathan Sick added a comment -

            Nice solution. LGTM.

            Thoughts pasted over from the PR:

             

            This looks great. It's a clean solution and sidesteps any conflicts with a multi-cluster writable schema registry.

            One question: will the aggregated Kafka topics be consumed by anything other than the Kafka aggregator itself and the Connectors to Parquet object store / SQL destinations? I ask because, if there are potential third-party consumers of the aggregated Kafka topics, we might want to change the "internal Schema Registry" nomenclature to something like "aggregated EFD Schema Registry".

            No need to act on this before merging; just a thought.

            afausti Angelo Fausti added a comment -

            Nice! Thank you for the review.


              People

              • Assignee: Angelo Fausti (afausti)
              • Reporter: Angelo Fausti (afausti)
              • Reviewers: Jonathan Sick
              • Watchers: Angelo Fausti, Jonathan Sick