I have run an integration test to verify the creation of the new private_efdStamp field in InfluxDB.
1) Add the current T&S development environment to this docker-compose.yml setup:
services:
  develop:
    image: lsstts/develop-env:develop
    hostname: develop
    container_name: develop
2) Start the services with:

$ docker-compose up -d
3) Run the ts_salkafka producer with the Test CSC:

$ docker-compose run develop

# First set up ts_salobj and ts_salkafka from the tickets/DM-30082 branches
> git clone https://github.com/lsst-ts/ts_salobj.git
> cd ts_salobj/
> git checkout tickets/DM-30082
> setup -r .

> git clone https://github.com/lsst-ts/ts_salkafka.git
> cd ts_salkafka/
> git checkout tickets/DM-30082
> setup -r .

# This additional environment variable is required
> export LSST_DDS_PARTITION_PREFIX=test

# Run the producer with --replication-factor 1 since we have only one broker in this setup
> ./bin/run_salkafka_producer.py --replication-factor 1 --broker broker:29092 --registry http://schema-registry:8081 Test &
current_tai uses current_tai_from_utc; clock_gettime(CLOCK_TAI) is off by 37.0 seconds
Making Kafka client session
Making avro schema registry.
Creating producers for ['Test']
Creating Kafka topics for Test if not already present.
Creating SAL/Kafka topic producers for Test.
Waiting for producers to start
Read historical data in 5.40 sec
At this point, we can see the new private_efdStamp field in the Avro schemas:

$ curl http://localhost:8081/subjects/lsst.sal.Test.logevent_heartbeat-value/versions/1/schema
{"type":"record","name":"logevent_heartbeat","namespace":"lsst.sal.Test","fields":[{"name":"private_efdStamp","type":"double","description":"UTC time for EFD timestamp. An integer (the number of leap seconds) different from private_sndStamp.","units":"second"},{"name":"private_kafkaStamp","type":"double","description":"TAI time at which the Kafka message was created.","units":"second"},{"name":"TestID","type":"long"}
...
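
The same check can be scripted (a minimal sketch, assuming the Schema Registry port 8081 is published on localhost as in the curl call above and that the requests package is available):

import requests

# Fetch the registered Avro schema for the heartbeat event (version 1).
url = (
    "http://localhost:8081/subjects/"
    "lsst.sal.Test.logevent_heartbeat-value/versions/1/schema"
)
schema = requests.get(url).json()

# The new field should be present in the schema.
field_names = [field["name"] for field in schema["fields"]]
assert "private_efdStamp" in field_names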
4) Produce records for the Test CSC:

$ docker-compose run develop

> export LSST_DDS_PARTITION_PREFIX=test
> run_test_csc.py 1
current_tai uses current_tai_from_utc; clock_gettime(CLOCK_TAI) is off by 37.0 seconds
config_kwargs= {'config_schema': {'$schema': 'http://json-schema.org/draft-07/schema#', '$id': 'https://github.com/lsst-ts/ts_salobj/blob/master/python/lsst/ts/salobj/config_schema.py', 'title': 'Test v1', 'description': 'Configuration for the TestCsc', 'type': 'object', 'properties': {'string0': {'type': 'string', 'default': 'default value for string0'}, 'bool0': {'type': 'boolean', 'default': True}, 'int0': {'type': 'integer', 'default': 5}, 'float0': {'type': 'number', 'default': 3.14}, 'intarr0': {'type': 'array', 'default': [-1, 1], 'items': {'type': 'integer'}}, 'multi_type': {'anyOf': [{'type': 'integer', 'minimum': 1}, {'type': 'string'}, {'type': 'null'}], 'default': None}}, 'required': ['string0', 'bool0', 'int0', 'float0', 'intarr0', 'multi_type'], 'additionalProperties': False}}
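
As a side note, the heartbeat events can also be watched directly from Python inside the develop container (a minimal sketch using the ts_salobj Remote API; the 10-second timeout is an arbitrary choice):

import asyncio

from lsst.ts import salobj


async def main() -> None:
    async with salobj.Domain() as domain:
        # Remote for the Test CSC with index 1 started above.
        remote = salobj.Remote(domain=domain, name="Test", index=1)
        await remote.start_task
        # Wait for the next heartbeat event and print its send timestamp (TAI).
        heartbeat = await remote.evt_heartbeat.next(flush=True, timeout=10)
        print(heartbeat.private_sndStamp)


asyncio.run(main())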
5) Create a database in InfluxDB:
$ docker-compose exec influxdb influx -execute "CREATE DATABASE mydb"
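
To confirm the database exists from Python instead (a minimal sketch, assuming InfluxDB's port 8086 is published on localhost and the influxdb client package is installed):

from influxdb import InfluxDBClient

# Connect to the InfluxDB instance from the docker-compose setup.
client = InfluxDBClient(host="localhost", port=8086)

# The new database should appear in the list of databases.
assert {"name": "mydb"} in client.get_list_database()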
6) Use kafka-connect-manager to create an instance of the InfluxDB Sink connector.
Note that the connector is configured to use the new private_efdStamp field as the InfluxDB time:
$ docker-compose run kafkaconnect create influxdb-sink --database mydb --timestamp private_efdStamp --topic-regex "lsst.sal.*"
Creating test_kafkaconnect_run ... done
Found 30 topics.
Validation returned 0 error(s).
Uploading influxdb-sink connector configuration...
$ docker-compose run kafkaconnect status influxdb-sink
Creating test_kafkaconnect_run ... done
{
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "name": "influxdb-sink",
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}
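
The same information is available from the Kafka Connect REST API, which makes a programmatic health check easy (a minimal sketch, assuming the connect service's port 8083 is published on localhost):

import json

import requests

# Query the Kafka Connect REST API for the connector status.
status = requests.get("http://localhost:8083/connectors/influxdb-sink/status").json()

# The connector and every task should be RUNNING.
assert status["connector"]["state"] == "RUNNING"
assert all(task["state"] == "RUNNING" for task in status["tasks"])
print(json.dumps(status, indent=2))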
7) Finally check the results in InfluxDB:
$ docker-compose exec influxdb influx -database mydb -execute 'SELECT * FROM "lsst.sal.Test.logevent_heartbeat"'
name: lsst.sal.Test.logevent_heartbeat
time                TestID heartbeat priority private_efdStamp   private_host private_identity private_kafkaStamp private_origin private_rcvStamp   private_revCode private_seqNum private_sndStamp
----                ------ --------- -------- ----------------   ------------ ---------------- ------------------ -------------- ----------------   --------------- -------------- ----------------
1623805130557000000 1      false     0        1623805130.5570297 0            Test:1           1623805167.5581822 586            1623805167.5575716 57130bcb        1              1623805167.5570297
1623805131558000000 1      false     0        1623805131.5585315 0            Test:1           1623805168.559541  586            1623805168.5589802 57130bcb        2              1623805168.5585315
1623805132559000000 1      false     0        1623805132.5592391 0            Test:1           1623805169.5603163 586            1623805169.5597157 57130bcb        3              1623805169.5592391
From these results, we can see that the InfluxDB time field corresponds to private_efdStamp with millisecond precision (the precision set in InfluxDB) and that it is 37 seconds behind private_sndStamp, the TAI timestamp from which private_efdStamp is computed.
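
That arithmetic can be verified directly from the first row of the query above (a small Python check using the values as printed):

# Values copied from the first row of the query above.
influx_time_ns = 1623805130557000000   # InfluxDB time, ns with ms precision
private_efdStamp = 1623805130.5570297  # UTC, seconds
private_sndStamp = 1623805167.5570297  # TAI, seconds

# private_efdStamp lags private_sndStamp by the current 37 leap seconds ...
assert round(private_sndStamp - private_efdStamp, 6) == 37.0

# ... and the InfluxDB time is private_efdStamp at millisecond precision.
assert influx_time_ns == int(private_efdStamp * 1000) * 10**6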
I also worked on a hang seen in Jenkins. I found and fixed a few instances of await statements that could wait forever (mostly in unit tests, but one in production code could have been a problem; I think the others are safe). Unfortunately, the main fix was to wait a bit longer before trying to abort the distributed producers. There are some subtleties to managing tasks with asyncio run_in_executor that I still don't understand; I filed DM-30755 to deal with that.
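
For the record, the pattern for those fixes is to bound each wait with a timeout, along these lines (an illustrative sketch only, not the actual code; the function name and timeout value are made up):

import asyncio


async def wait_for_producers(start_task: asyncio.Future, timeout: float = 60) -> None:
    """Illustrative only: bound an await that could otherwise hang forever."""
    try:
        await asyncio.wait_for(start_task, timeout=timeout)
    except asyncio.TimeoutError:
        raise RuntimeError(f"Producers did not start within {timeout} seconds")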
Finally I found and deleted an unused argument to ComponentProducerSubset.run_producer_subprocess.
Pull requests: