Which timestamp the EFD uses should be defined by the SAL Kafka producer.
Use private_sndStamp as the InfluxDB timestamp
Add --timestamp option to kafka-connect-manager to select a specific field to be used as the InfluxDB timestamp.
The default (generic) behavior is still to use the current system time.
This can be done in KCQL; see https://docs.lenses.io/connectors/sink/influx.html#kcql-support
The --timestamp option was added, and the default behavior was preserved.
$ connect_manager create influxdb-sink -d efd -t 10 --dry-run mytopic1 --timestamp private_sndStamp
"connect.influx.kcql": "INSERT INTO mytopic1 SELECT * FROM mytopic1 WITHTIMESTAMP private_sndStamp",
$ connect_manager create influxdb-sink -d efd -t 10 --dry-run mytopic1
"connect.influx.kcql": "INSERT INTO mytopic1 SELECT * FROM mytopic1 WITHTIMESTAMP sys_time()",
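The two dry-run outputs above differ only in the WITHTIMESTAMP clause. A minimal sketch of that mapping follows; `build_kcql` is a hypothetical helper written for illustration, not the actual kafka-connect-manager code, and it only assumes the statement format shown in the outputs above.

```python
from typing import Optional


def build_kcql(topic: str, timestamp: Optional[str] = None) -> str:
    """Return the KCQL statement for one topic (hypothetical helper)."""
    # Without an explicit field, fall back to sys_time(), the default
    # shown in the dry-run output above.
    ts = timestamp if timestamp else "sys_time()"
    return f"INSERT INTO {topic} SELECT * FROM {topic} WITHTIMESTAMP {ts}"


if __name__ == "__main__":
    print(build_kcql("mytopic1", "private_sndStamp"))
    print(build_kcql("mytopic1"))
```

Keeping sys_time() as the fallback means existing connector configurations are unaffected unless --timestamp is given.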
java.lang.IllegalArgumentException: Invalid value for field:private_sndStamp.Value '1.3261721783120623E9' is not a valid field for the timestamp
It seems reasonable to add support for double types in the InfluxDB Sink connector.
See PR https://github.com/Landoop/stream-reactor/pull/626
With this change, the InfluxDB Sink connector accepts SAL timestamps stored as double and coerces them to Long with millisecond precision for use with InfluxDB.
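The coercion can be illustrated with a short sketch. It assumes the double holds seconds since the epoch, as the value in the error message suggests, and that sub-millisecond digits are truncated. The function name is hypothetical, and the connector itself is not written in Python, so this only shows the arithmetic.

```python
def seconds_to_millis(stamp: float) -> int:
    """Convert fractional seconds to integer milliseconds (illustrative only)."""
    # Assumption: the input is in seconds; digits below one millisecond
    # are dropped by int() truncation.
    return int(stamp * 1000)


if __name__ == "__main__":
    # Value taken from the error message above.
    print(seconds_to_millis(1.3261721783120623e9))
```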
The change was included in the 1.2.3 release of the connector https://github.com/lensesio/stream-reactor/releases.
The --timestamp option was successfully tested with version 1.2.3 of the InfluxDB Sink connector during the review of DM-21419.