# Create code to read and write SAL topics over Kafka.


#### Details

• Type: Story
• Status: Done
• Resolution: Done
• Fix Version/s: None
• Component/s:
• Labels:
• Story Points:
4
• Sprint:
TSSW Sprint - Jan 03 - Jan 17, TSSW Sprint - Jan 17 - Jan 31
• Team:
Telescope and Site
• Urgent?:
No

#### Description

Create prototype code to generate dataclasses, pydantic classes and Avro schemas from XML or IDL, for the Kafka experiment.

Use XML because it is easier to parse, and because if we use Kafka we have no further use for the OMG IDL files.

Make it a separate package that is as self-contained as possible (it will need ts_xml and some standard Kafka libraries). As a prototype it does not need unit tests. The main products are:

• Code to parse our XML files and generate Avro schemas, Python dataclasses, and pydantic data models.
• Code to validate topic data (at least check array and string lengths and reject unknown fields; Kafka can do the rest).
• Bin scripts to read and write SAL topics using Kafka and report transmission rate (messages/second) and latency statistics.

#### Activity

Russell Owen added a comment - - edited

Many thanks to Angelo Fausti for his help with this.

The package includes:

• ComponentInfo, TopicInfo, and FieldInfo classes, which parse ts_xml and can be used to produce Avro schemas, python dataclasses, and pydantic data models and perform the basic validation mentioned in the description.
• bin/read_kafka_data.py, which can be used to read a set of topics for one SAL component. When run with -t it reports performance numbers. When run without -t it prints each received message. It also retrieves one item of historical data (per index for an indexed component) and reports basic information about that.
• bin/write_kafka_data.py, which can be used to write one topic (or to just register a schema if run with -n 0). It always reports messages/second if -n > 0.
• Kafka readme.txt: a file giving instructions for running in a Docker image.
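The basic validation mentioned above (array lengths, string lengths, unknown fields) can be sketched as follows. This is a minimal illustration, not the package's implementation: `FieldSpec` is a hypothetical stand-in for `FieldInfo`, and its attributes and the `validate` function are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical stand-in for the package's FieldInfo."""

    name: str
    count: int = 1  # array length; 1 means a scalar field
    max_len: int = 0  # maximum string length; 0 means no limit


def validate(data, specs):
    """Raise ValueError if data has unknown fields or bad lengths."""
    known = {spec.name: spec for spec in specs}
    unknown = set(data) - set(known)
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    for name, value in data.items():
        spec = known[name]
        if spec.count > 1:
            # Array field: the length must match exactly.
            if len(value) != spec.count:
                raise ValueError(
                    f"{name}: expected {spec.count} items, got {len(value)}"
                )
        elif isinstance(value, str) and spec.max_len:
            # Bounded string field: reject strings that are too long.
            if len(value) > spec.max_len:
                raise ValueError(
                    f"{name}: string longer than {spec.max_len} characters"
                )
```

A check like this does no type coercion and builds no model objects, so it does far less work per message than constructing a full pydantic model.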

Code: https://github.com/r-owen/kafkaprototype

I also tweaked ts_salobj's tests/test_speed.py to report the speed of writing MTM1M3 forceActuatorData (one of the largest topics I could find) using temporary branch tickets/DM-33071. I don't feel it is necessary to review this branch. I don't intend to merge it because that would require the ts_salobj tests to have the MTM1M3 IDL file.

Russell Owen added a comment - - edited

Here are my timings as per the Kafka readme, using kafka-aggregator.

The defaults:

• Write: wait for an ack from Kafka (safest) and make a pydantic model (for full validation before handing the data over to Kafka, which does some validation of its own).
• Read and create a pydantic model.

    read_kafka.py MTM1M3 tel_forceActuatorData -n 2000 -t &
    # wait for it to start, then...
    write_kafka.py MTM1M3 tel_forceActuatorData -n 2000

Write and read with default options:
Wrote 650.6 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='pydantic')
Read 652.3 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='pydantic', time=True, topic='tel_forceActuatorData')
Delay mean = 0.005, stdev = 0.001, min = 0.002, max = 0.017 seconds
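
For reference, numbers of this kind (messages/second plus delay mean, stdev, min and max) could be computed along the following lines. This is only a sketch under stated assumptions: the function name `summarize` is hypothetical, each message is assumed to carry its send time, the writer and reader clocks are assumed to agree (for example, both running on the same host), and the rate is taken over the interval from the first send to the last receive, which may differ from how the scripts define it.

```python
import statistics


def summarize(send_times, receive_times):
    """Return rate (messages/second) and delay statistics (seconds).

    send_times and receive_times are parallel lists of timestamps
    in seconds, one pair per message, in the order received.
    """
    delays = [recv - sent for sent, recv in zip(send_times, receive_times)]
    elapsed = receive_times[-1] - send_times[0]
    return {
        "rate": len(receive_times) / elapsed,
        "mean": statistics.mean(delays),
        "stdev": statistics.stdev(delays),
        "min": min(delays),
        "max": max(delays),
    }
```

A reader that keeps up with the writer shows a small, steady delay. A reader that falls behind shows a delay that grows over the run, so the mean, stdev and max all increase.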

This is rather slow. Try the custom validator in the writer to see if it speeds up writing:

Write with --validation=custom, read with defaults:
Wrote 1473.5 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='custom')
Read 813.0 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='pydantic', time=True, topic='tel_forceActuatorData')
Delay mean = 0.580, stdev = 0.323, min = 0.012, max = 1.121 seconds

The fast write speed shows that the custom validator is much faster than using pydantic.
The slow read speed and large delay both suggest that reading into pydantic is expensive, so the reader was falling behind. So try without pydantic in the reader:

Write with --validation=custom, read with --postprocess=simple_namespace:
Wrote 1337.9 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='custom')
Read 1345.6 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='simple_namespace', time=True, topic='tel_forceActuatorData')
Delay mean = 0.003, stdev = 0.001, min = 0.002, max = 0.012 seconds

This looks quite reasonable and is a configuration we could easily live with.
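
To illustrate why this kind of post-processing is cheap, here is a minimal sketch of wrapping a decoded message in a `types.SimpleNamespace`. It assumes that `--postprocess=simple_namespace` does something along these lines; the function name `to_namespace` and the example message are hypothetical.

```python
from types import SimpleNamespace


def to_namespace(message):
    """Wrap a decoded message dict so fields read as attributes."""
    # No type checks or coercion happen here, unlike building a
    # pydantic model, so the per-message cost is very small.
    return SimpleNamespace(**message)


# Hypothetical decoded message, shaped like a deserialized Avro record.
data = to_namespace({"timestamp": 12.5, "xForce": [0.1, 0.2, 0.3]})
```

The resulting object offers attribute access such as `data.xForce`, but it does not validate anything, so any checking has to happen on the writer side.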

Now try writing and reading with no ack, no extra validation (beyond what Kafka gives, which does not check array lengths, string lengths, or extra fields) and no post-processing. This is just to show how fast Kafka can go; I would hate to operate in this mode:

Write with --validation=none --nowait_ack, read with --postprocess=simple_namespace:
Wrote 2755.8 messages/second: Namespace(component='MTM1M3', nowait_ack=True, number=2000, topic='tel_forceActuatorData', validation='none')
Read 2046.1 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='simple_namespace', time=True, topic='tel_forceActuatorData')
Delay mean = 0.156, stdev = 0.072, min = 0.012, max = 0.269 seconds

By comparison, I tweaked ts_salobj's tests/test_speed.py to measure DDS speeds for the same topic (the standard version runs with two smaller topics). I get:
Read 1580 samples/second (1000 samples); lost 52 samples
Wrote 1597 samples/second (1000 samples)

• The Kafka tests never lost data (even when not waiting for an ack from the Kafka broker, and even when writing 10,000 messages, not shown above).
• We should repeat these tests using the JVM that Dave Mills found. I hope that this will speed up waiting for ack in the writer and reduce the delays.
• Using pydantic models is too slow. Fortunately we have good alternatives.
Tiago Ribeiro added a comment -

Pretty impressive progress accomplished with this ticket. I am also impressed with Kafka's performance, even though it is using the regular openjdk JVM.

One thing I would suggest is that the repository https://github.com/r-owen/kafkaprototype be moved into the lsst-ts organization.

Russell Owen added a comment -

Thanks. I moved the package, as requested.


#### People

Assignee:
Russell Owen
Reporter:
Russell Owen
Reviewers:
Tiago Ribeiro
Watchers:
Dave Mills, Russell Owen, Tiago Ribeiro