  Data Management / DM-30082

send private_efdStamp to Kafka with realtime/UTC clock


    Details

    • Type: Story
    • Status: Done
    • Resolution: Done
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None
    • Story Points:
      2
    • Epic Link:
    • Sprint:
      TSSW Sprint - May 24 - Jun 07, TSSW Sprint - Jun 07 - Jun 21
    • Team:
      Telescope and Site
    • Urgent?:
      No

      Description

      See RFC-767

      This will require adding support to ts_salobj for converting TAI unix seconds to UTC unix seconds (since AstroPy is too slow). This conversion must apply the offset as an integer number of seconds in order to satisfy the RFC.

      The existing salobj functions that convert UTC unix seconds to TAI unix seconds (tai_from_utc and tai_from_utc_unix) smear the leap second over the full day before a leap second, in order to give unambiguous results (like SOFA and AstroPy). Leave those functions alone, but carefully document how they differ from the new utc_from_tai_unix function, which does no smearing.
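
      For illustration, here is a minimal sketch (not the actual ts_salobj implementation) of the unsmeared TAI-to-UTC conversion, assuming the current 37-second TAI-UTC offset rather than the leap-second table lookup the real code would use:

      TAI_MINUS_UTC = 37  # integer leap-second offset; hard-coded here for illustration only

      def utc_from_tai_unix_sketch(tai_unix: float) -> float:
          """Return UTC unix seconds given TAI unix seconds, with no smearing."""
          return tai_unix - TAI_MINUS_UTC

      # private_sndStamp (TAI) -> private_efdStamp (UTC)
      print(utc_from_tai_unix_sketch(1623805167.5570297))  # 1623805130.5570297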

        Attachments

          Issue Links

            Activity

            Russell Owen added a comment (edited)

            I also worked on a hang seen in Jenkins. I found and fixed a few instances of await statements that could wait forever (mostly in unit tests, but one in production code that could be a problem – I think the others are safe). Unfortunately the main fix was to wait a bit longer before trying to abort the distributed producers. There are some subtleties to managing tasks with asyncio run_in_executor that I still don't understand. I filed DM-30755 to deal with that.
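
            As one pattern for avoiding such hangs, here is a minimal sketch (not the actual ts_salkafka fix; the names are hypothetical) that bounds an await with asyncio.wait_for so a stuck task raises instead of waiting forever:

            import asyncio

            async def wait_for_start(start_task, timeout: float = 60.0) -> None:
                """Wait for a producer-start task, failing loudly instead of hanging forever."""
                try:
                    await asyncio.wait_for(start_task, timeout=timeout)
                except asyncio.TimeoutError:
                    # Let the caller abort the producer cleanly instead of blocking.
                    raise RuntimeError(f"Producer did not start within {timeout} seconds") from None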

            Finally I found and deleted an unused argument to ComponentProducerSubset.run_producer_subprocess.

            Pull requests:

            https://github.com/lsst-ts/ts_salobj/pull/195
            https://github.com/lsst-ts/ts_salkafka/pull/20

            Angelo Fausti added a comment (edited)

            I have run an integration test to verify the creation of the new private_efdStamp field in InfluxDB.

            1) Add the current T&S development environment to this docker-compose.yml setup:

            services:
              develop:
                image: lsstts/develop-env:develop
                hostname: develop
                container_name: develop
            

            2) Start the services with:

            $ docker-compose up -d
            

            3) Run ts_salkafka producer with the Test CSC:

            $ docker-compose run develop
             
            # First set up ts_salobj and ts_salkafka on the tickets/DM-30082 branches
            > git clone https://github.com/lsst-ts/ts_salobj.git
            > cd ts_salobj/
            > git checkout tickets/DM-30082
            > setup -r . 
             
            > git clone https://github.com/lsst-ts/ts_salkafka.git
            > cd ts_salkafka/
            > git checkout tickets/DM-30082
            > setup -r .
             
            # This additional env variable is required
            > export LSST_DDS_PARTITION_PREFIX=test
             
            # Run the producer with --replication-factor 1 since we have only one broker in this setup.
            > ./bin/run_salkafka_producer.py --replication-factor 1 --broker  broker:29092 --registry http://schema-registry:8081 Test &
            current_tai uses current_tai_from_utc; clock_gettime(CLOCK_TAI) is off by 37.0 seconds
            Making Kafka client session
            Making avro schema registry.
            Creating producers for ['Test']
            Creating Kafka topics for Test if not already present.
            Creating SAL/Kafka topic producers for Test.
            Waiting for producers to start
            Read historical data in 5.40 sec
            

            At this point, we can see the new private_efdStamp field in the Avro schemas:

            $  curl http://localhost:8081/subjects/lsst.sal.Test.logevent_heartbeat-value/versions/1/schema
            {"type":"record","name":"logevent_heartbeat","namespace":"lsst.sal.Test","fields":[{"name":"private_efdStamp","type":"double","description":"UTC time for EFD timestamp. An integer (the number of leap seconds) different from private_sndStamp.","units":"second"},{"name":"private_kafkaStamp","type":"double","description":"TAI time at which the Kafka message was created.","units":"second"},{"name":"TestID","type":"long"}
            ...
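
            The same check can be scripted; a small sketch (assuming the schema registry is reachable on localhost:8081, as in the curl call above) that fetches the schema and verifies the new field is present:

            import requests

            url = ("http://localhost:8081/subjects/"
                   "lsst.sal.Test.logevent_heartbeat-value/versions/1/schema")
            schema = requests.get(url).json()  # the /schema endpoint returns the bare Avro schema
            field_names = [field["name"] for field in schema["fields"]]
            assert "private_efdStamp" in field_names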
            

            4) Produce records for the Test CSC:

            $ docker-compose run develop
             
            > export LSST_DDS_PARTITION_PREFIX=test
            > run_test_csc.py 1
            current_tai uses current_tai_from_utc; clock_gettime(CLOCK_TAI) is off by 37.0 seconds
            config_kwargs= {'config_schema': {'$schema': 'http://json-schema.org/draft-07/schema#', '$id': 'https://github.com/lsst-ts/ts_salobj/blob/master/python/lsst/ts/salobj/config_schema.py', 'title': 'Test v1', 'description': 'Configuration for the TestCsc', 'type': 'object', 'properties': {'string0': {'type': 'string', 'default': 'default value for string0'}, 'bool0': {'type': 'boolean', 'default': True}, 'int0': {'type': 'integer', 'default': 5}, 'float0': {'type': 'number', 'default': 3.14}, 'intarr0': {'type': 'array', 'default': [-1, 1], 'items': {'type': 'integer'}}, 'multi_type': {'anyOf': [{'type': 'integer', 'minimum': 1}, {'type': 'string'}, {'type': 'null'}], 'default': None}}, 'required': ['string0', 'bool0', 'int0', 'float0', 'intarr0', 'multi_type'], 'additionalProperties': False}}
            

            5) Create a database in InfluxDB:

            $ docker-compose exec influxdb influx -execute "CREATE DATABASE mydb"
            

            6) Use kafka-connect-manager to create an instance of the InfluxDB Sink connector.

            Note that the connector is configured to use the new private_efdStamp timestamp as the InfluxDB time:

            $ docker-compose run kafkaconnect create influxdb-sink --database mydb --timestamp private_efdStamp --topic-regex "lsst.sal.*"
             
            Creating test_kafkaconnect_run ... done
            Found 30 topics.
            Validation returned 0 error(s).
            Uploading influxdb-sink connector configuration...
             
            $ docker-compose run kafkaconnect status influxdb-sink
            Creating test_kafkaconnect_run ... done
            {
                "connector": {
                    "state": "RUNNING",
                    "worker_id": "connect:8083"
                },
                "name": "influxdb-sink",
                "tasks": [
                    {
                        "id": 0,
                        "state": "RUNNING",
                        "worker_id": "connect:8083"
                    }
                ],
                "type": "sink"
            }
            
            

            7) Finally check the results in InfluxDB:

            $ docker-compose exec influxdb influx -database mydb -execute 'SELECT * FROM "lsst.sal.Test.logevent_heartbeat"'
            name: lsst.sal.Test.logevent_heartbeat
            time                TestID heartbeat priority private_efdStamp   private_host private_identity private_kafkaStamp private_origin private_rcvStamp   private_revCode private_seqNum private_sndStamp
            ----                ------ --------- -------- ----------------   ------------ ---------------- ------------------ -------------- ----------------   --------------- -------------- ----------------
            1623805130557000000 1      false     0        1623805130.5570297 0            Test:1           1623805167.5581822 586            1623805167.5575716 57130bcb        1              1623805167.5570297
            1623805131558000000 1      false     0        1623805131.5585315 0            Test:1           1623805168.559541  586            1623805168.5589802 57130bcb        2              1623805168.5585315
            1623805132559000000 1      false     0        1623805132.5592391 0            Test:1           1623805169.5603163 586            1623805169.5597157 57130bcb        3              1623805169.5592391
            

            From these results, we can see that the InfluxDB time field corresponds to private_efdStamp with millisecond precision (the precision configured in InfluxDB) and that it is 37 seconds behind private_sndStamp, the TAI timestamp from which private_efdStamp is computed.
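
            As a quick sanity check of the first row, using the values copied from the query output above:

            private_efdStamp = 1623805130.5570297   # UTC unix seconds
            private_sndStamp = 1623805167.5570297   # TAI unix seconds
            influx_time_ns = 1623805130557000000    # InfluxDB time in nanoseconds

            assert private_sndStamp - private_efdStamp == 37.0             # integer TAI-UTC offset
            assert influx_time_ns == int(private_efdStamp * 1000) * 10**6  # millisecond precision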

            Russell Owen added a comment

            Released ts_salobj v6.4.0 and ts_salkafka v1.7.0


              People

              Assignee:
              Russell Owen
              Reporter:
              Frossie Economou
              Reviewers:
              Angelo Fausti
              Watchers:
              Angelo Fausti, Frossie Economou, Russell Owen

                Dates

                Created:
                Updated:
                Resolved:

                  Jenkins

                  No builds found.