  Data Management / DM-21419

The definition of which timestamp the EFD uses should be done by the SAL Kafka producer

    Details

    • Story Points:
      0
    • Sprint:
      TSSW Sprint - Sep 16 - Sep 28, TSSW Sprint - Sep 29 - Oct 13, TSSW Sprint - Oct 14 - Oct 27
    • Team:
      Telescope and Site

      Description

      In the future, we'll record EFD data in multiple stores, such as InfluxDB, Oracle, and Parquet.

      Kafka replicates data using connectors. Right now, the decision of which timestamp is used as the InfluxDB time is made in the connector configuration, by a KCQL (Kafka Connect Query Language) query:

      INSERT INTO mytopic SELECT * FROM mytopic WITHTIMESTAMP private_sndStamp
      

      A similar configuration would be needed in the Oracle connector to define which timestamp field should be indexed.
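      Because the timestamp choice lives in each connector's query, every connector configuration has to repeat it for every topic. The Python sketch below generates one WITHTIMESTAMP query per topic for a given timestamp field. The helper name build_timestamp_queries is hypothetical, and joining several statements with ";" in a single connector setting is an assumption, not something this ticket states.

```python
from typing import Iterable


def build_timestamp_queries(topics: Iterable[str], timestamp_field: str) -> str:
    """Return one WITHTIMESTAMP query per topic, all using ``timestamp_field``.

    Each query writes every field of a topic (SELECT *) to an InfluxDB
    measurement named after the topic, and uses ``timestamp_field`` as the
    InfluxDB time.
    """
    queries = [
        f"INSERT INTO {topic} SELECT * FROM {topic} WITHTIMESTAMP {timestamp_field}"
        for topic in topics
    ]
    # Assumption: the connector accepts several statements separated by ";".
    return ";".join(queries)


# Example: the query from the description, for a single topic.
print(build_timestamp_queries(["mytopic"], "private_sndStamp"))
# prints INSERT INTO mytopic SELECT * FROM mytopic WITHTIMESTAMP private_sndStamp
```

      Switching the EFD to a different timestamp field then means regenerating and redeploying this configuration for each connector and each EFD deployment, which is the duplication this ticket wants to avoid.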

      To avoid making this decision in different connectors, and potentially in different deployments of the EFD, the choice of which timestamp to use in the EFD should be made in the SAL Kafka producer instead.

      There are different ways to accomplish this.

      The easiest one is to create a new field in the Avro schema, called, for example, private_efdStamp, which is a copy of whichever timestamp field we decide to use. Initially, that can be hardcoded to use private_sndStamp.
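      A minimal sketch of this first option, assuming schemas and records are handled as plain Python dicts that mirror the Avro JSON. add_efd_stamp_field and add_efd_stamp_value are hypothetical helpers, not part of ts_salkafka: the schema gains a double field named private_efdStamp, and each record copies its value from private_sndStamp by default.

```python
import copy
from typing import Any, Dict

EFD_STAMP = "private_efdStamp"


def add_efd_stamp_field(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of an Avro record schema with a double field private_efdStamp appended."""
    new_schema = copy.deepcopy(schema)  # leave the caller's schema untouched
    new_schema["fields"].append({"name": EFD_STAMP, "type": "double"})
    return new_schema


def add_efd_stamp_value(
    record: Dict[str, Any], source: str = "private_sndStamp"
) -> Dict[str, Any]:
    """Return a copy of ``record`` whose private_efdStamp duplicates ``source``.

    The default hardcodes private_sndStamp, as the description suggests.
    """
    new_record = dict(record)
    new_record[EFD_STAMP] = record[source]
    return new_record
```

      Because the source field is a single parameter, switching to another timestamp later would be a producer change, versioned with the code, rather than a connector change.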

      The other way is to use Avro aliases:

      https://avro.apache.org/docs/1.8.1/spec.html#Aliases

      which create an alternate name, private_efdStamp, for the timestamp field we want to use.

      NOTE: we assume aliases work the way we expect, but that needs to be tested. In the query above, SELECT * FROM mytopic ensures that all fields in mytopic are inserted as fields in InfluxDB with their original names; replacing private_sndStamp with private_efdStamp in the WITHTIMESTAMP clause would then use the alternate name to select the right timestamp as the InfluxDB timestamp.
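      For the alias option, the producer only needs to annotate the chosen field in the schema it registers. The sketch below uses a hypothetical helper, add_field_alias, on plain-dict schemas to add private_efdStamp to a field's aliases list. Note that the Avro specification describes aliases as something an implementation may optionally use during schema resolution, by rewriting the writer's schema with aliases from the reader's schema, so whether a given connector honors an alias declared on the writer's schema is exactly what the NOTE above says must be tested.

```python
import copy
from typing import Any, Dict


def add_field_alias(
    schema: Dict[str, Any], field_name: str, alias: str = "private_efdStamp"
) -> Dict[str, Any]:
    """Return a copy of an Avro record schema where ``field_name`` also lists ``alias``."""
    new_schema = copy.deepcopy(schema)  # leave the caller's schema untouched
    for field in new_schema["fields"]:
        if field["name"] == field_name:
            aliases = field.setdefault("aliases", [])
            if alias not in aliases:  # adding the same alias twice is a no-op
                aliases.append(alias)
            return new_schema
    raise KeyError(f"no field named {field_name!r} in schema")
```

      Applied to private_sndStamp, the resulting field is {"name": "private_sndStamp", "type": "double", "aliases": ["private_efdStamp"]}.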

      Either way, the different connectors always use private_efdStamp.

      Another benefit of doing this in the SAL Kafka producer code or in the Avro schema is that changes to the code or to the Avro schema are versioned, while changes in the connector configuration are not.

      This ticket is not blocking anything at the moment, but having private_efdStamp would make the EFD configuration more reliable.


            Activity

            Russell Owen (rowen) added a comment:

            I will postpone merging this ticket until we are sure we need it. I'll leave the pull request as is.

            Angelo Fausti (afausti) added a comment:

            I've set up a test environment for the EFD running version 1.2.3 of the InfluxDB Sink connector, which implements ticket DM-21391.

            The lsst.sal.ScriptQueue.logevent_heartbeat topic was used for this test. The following commands set up a SAL test environment:

            $ docker pull lsstts/develop-env:salobj4_b38
            $ docker run -it lsstts/develop-env:salobj4_b38
             
            #
            # Loading LSST Stack
            #
            # Loading sal environment
            SAL development environment is configured
            <<< Vortex OpenSplice HDE Release 6.9.181127OSS For x86_64.linux, Date 2018-11-28 >>>
            LSST middleware toolset environment v3.10.0 is configured
            #
            # Setting up sal, salobj and scriptqueue
            [saluser@b337e6a3b5dc ~]$ git clone https://github.com/lsst-ts/ts_salkafka.git
            [saluser@b337e6a3b5dc ts_salkafka]$ git checkout tickets/DM-21419
            [saluser@b337e6a3b5dc ts_salkafka]$ setup -r .
             
            [saluser@b337e6a3b5dc bin]$ ./run_salkafka_producer.py --broker efd0.lsst.codes:9094 --registry https://registry-efd.lsst.codes/ ScriptQueue
            Making Kafka client session
            Making avro schema registry.
            Creating producers
            Creating Kafka topics for ScriptQueue if not already present.
            Creating SAL/Kafka topic producers for ScriptQueue.
            Waiting for producers to start
            Read historical data in 5.38 sec
            Running
            

            In another terminal session in the same container, we started the ScriptQueue:

            [saluser@b337e6a3b5dc ~]$ run_script_queue.py 1
            
            

            SAL Kafka successfully created a new version of the Avro schema with the alias:

            // https://registry-efd.lsst.codes/subjects/lsst.sal.Test.logevent_heartbeat-value/versions/3/schema
            ...
            {"name":"private_sndStamp","type":"double","aliases":["private_efdStamp"]}
            ...
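            The registered schema can be checked through the Schema Registry REST endpoint used in the URL above (/subjects/<subject>/versions/<version>/schema), which returns the schema itself. A sketch using only the Python standard library; fetch_schema and field_aliases are hypothetical helpers, and the registry host from this test may no longer be reachable.

```python
import json
import urllib.request
from typing import Any, Dict, List


def fetch_schema(registry_url: str, subject: str, version: int) -> Dict[str, Any]:
    """Download one registered schema version and parse it as JSON."""
    url = f"{registry_url.rstrip('/')}/subjects/{subject}/versions/{version}/schema"
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


def field_aliases(schema: Dict[str, Any], field_name: str) -> List[str]:
    """Return the aliases declared on ``field_name`` (an empty list if none)."""
    for field in schema["fields"]:
        if field["name"] == field_name:
            return list(field.get("aliases", []))
    raise KeyError(f"no field named {field_name!r} in schema")


# Example (needs network access to the registry):
# schema = fetch_schema("https://registry-efd.lsst.codes/",
#                       "lsst.sal.Test.logevent_heartbeat-value", 3)
# print(field_aliases(schema, "private_sndStamp"))
```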
            

            I first tested the new --timestamp option of the Kafka Connect manager, configuring the InfluxDB Sink connector to use private_sndStamp as the InfluxDB timestamp.

            ./connect_manager create influxdb-sink --influxdb_url https://influxdb-efd.lsst.codes  --database efd --tasks 1 --filter lsst.sal.* --auto-update --check-interval 15 --timestamp private_sndStamp
            

            That worked, which means InfluxDB can use the timestamps created by SAL (Unix timestamps in seconds, as doubles with millisecond precision); see the attached screenshot.
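            For reference, InfluxDB's write API interprets timestamps as integer nanoseconds by default, so a SAL timestamp has to be scaled before it can serve as the InfluxDB time. The sketch below shows one way to do that conversion (sal_seconds_to_influx_ns is a hypothetical helper); the ticket does not say how the InfluxDB Sink connector converts the value internally.

```python
def sal_seconds_to_influx_ns(timestamp: float) -> int:
    """Convert a SAL timestamp (Unix seconds as a double) to integer nanoseconds.

    The value is rounded to whole microseconds first, then scaled to
    nanoseconds, so float noise below a microsecond is discarded.
    """
    microseconds = round(timestamp * 1_000_000)  # round() on a float returns an int
    return microseconds * 1_000


print(sal_seconds_to_influx_ns(1.5))  # prints 1500000000
```

            Rounding to microseconds first is a design choice: for present-day Unix times the spacing between adjacent doubles is about 0.24 microseconds, so digits below a microsecond are mostly float rounding noise.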

            However, the InfluxDB Sink connector does not understand Avro aliases. When configuring with

            ./connect_manager create influxdb-sink --influxdb_url https://influxdb-efd.lsst.codes  --database efd --tasks 1 --filter lsst.sal.* --auto-update --check-interval 15 --timestamp private_efdStamp
            

            we see this error:

            Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Couldn't find field 'private_efdStamp' in the schema:private_kafkaStamp,ScriptQueueID,private_revCode,private_sndStamp,private_rcvStamp,private_seqNum,private_origin,private_host,heartbeat,priority
            

            This will be addressed in a separate ticket, DM-21884.
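            The lookup a sink would need in order to honor the alias is small: accept the configured name if it is a field name, otherwise look for a field that lists it among its aliases. This is a hypothetical Python sketch of that lookup (resolve_field and extract_timestamp are illustrative names), not the fix planned in DM-21884.

```python
from typing import Any, Dict


def resolve_field(schema: Dict[str, Any], name: str) -> str:
    """Map a configured field name to the actual field name in an Avro record schema.

    A direct name match wins; otherwise a field that lists ``name`` among its
    aliases is accepted. Raises KeyError when neither matches.
    """
    for field in schema["fields"]:
        if field["name"] == name:
            return field["name"]
    for field in schema["fields"]:
        if name in field.get("aliases", []):
            return field["name"]
    raise KeyError(f"Couldn't find field {name!r} in the schema")


def extract_timestamp(
    schema: Dict[str, Any], record: Dict[str, Any], configured_name: str
) -> float:
    """Read the timestamp from a decoded record using the name from the configuration."""
    return record[resolve_field(schema, configured_name)]
```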

            Despite the error above, SAL Kafka's implementation of Avro aliases worked fine, and this ticket can be merged.

            Russell Owen (rowen) added a comment:

            Pull request: https://github.com/lsst-ts/ts_salkafka/pull/4

            Please give this a try.

            Russell Owen (rowen) added a comment (edited):

            For the record: I am willing to use aliases, if that works, and I will produce a ticket branch with an alias so we can try it. But I am not at all convinced we should bother. private_sndStamp is the only field that makes any sense to use in the long run. The main reason I see to make this configurable is if we wanted to use private_rcvStamp in the short term in order to reliably get TAI. But Patrick Ingraham prefers that we use private_sndStamp now and fix the 37-second error as soon as practical.

            In other words: my personal preference would be to not merge this work.


              People

              • Assignee:
                Russell Owen (rowen)
              • Reporter:
                Angelo Fausti (afausti)
              • Reviewers:
                Angelo Fausti
              • Watchers:
                Angelo Fausti, Frossie Economou, Patrick Ingraham, Russell Owen, Simon Krughoff
              • Votes:
                0
