Here are my timings as per the Kafka readme, using kafka-aggregator.
The defaults:
- Write: wait for an ack from Kafka (safest) and build a pydantic model, which fully validates the data before handing it over to Kafka (Kafka itself does only partial validation).
- Read and create a pydantic model.
read_kafka.py MTM1M3 tel_forceActuatorData -n 2000 -t &
# wait for it to start, then...
write_kafka.py MTM1M3 tel_forceActuatorData -n 2000
Write and read with default options:
Wrote 650.6 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='pydantic')
Read 652.3 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='pydantic', time=True, topic='tel_forceActuatorData')
Delay mean = 0.005, stdev = 0.001, min = 0.002, max = 0.017 seconds
This is rather slow. Try the custom validator in the writer to see if it speeds up writing:
Write with --validation=custom, read with defaults:
Wrote 1473.5 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='custom')
Read 813.0 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='pydantic', time=True, topic='tel_forceActuatorData')
Delay mean = 0.580, stdev = 0.323, min = 0.012, max = 1.121 seconds
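For reference, the delay figures reported in these runs could be computed along the following lines. This is a minimal sketch, not the code in read_kafka.py: it assumes each message carries a send timestamp and that the reader records a receive timestamp for it. The function name summarize_delays and its arguments are hypothetical, and the timestamps are made up.

```python
import statistics


def summarize_delays(send_times, receive_times):
    """Return (mean, stdev, min, max) of the per-message delay in seconds.

    Both arguments are hypothetical: sequences of timestamps in seconds,
    one pair per message, in the same order.
    """
    delays = [recv - sent for sent, recv in zip(send_times, receive_times)]
    return (
        statistics.mean(delays),
        statistics.stdev(delays),  # sample standard deviation; needs >= 2 values
        min(delays),
        max(delays),
    )


# Made-up timestamps, for illustration only.
sent = [0.000, 0.010, 0.020, 0.030]
received = [0.004, 0.013, 0.026, 0.033]
mean, stdev, shortest, longest = summarize_delays(sent, received)
print(
    f"Delay mean = {mean:.3f}, stdev = {stdev:.3f}, "
    f"min = {shortest:.3f}, max = {longest:.3f} seconds"
)
```

A mean delay that grows over the course of a run, as in the case above, is what one would expect when the reader cannot keep up with the writer.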
The fast write speed shows that the custom validator is much faster than using pydantic.
The slow read speed and large delay both suggest that building a pydantic model in the reader is expensive, so the reader fell behind. Next, try without pydantic in the reader:
Write with --validation=custom, read with --postprocess=simple_namespace:
Wrote 1337.9 messages/second: Namespace(component='MTM1M3', nowait_ack=False, number=2000, topic='tel_forceActuatorData', validation='custom')
Read 1345.6 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='simple_namespace', time=True, topic='tel_forceActuatorData')
Delay mean = 0.003, stdev = 0.001, min = 0.002, max = 0.012 seconds
This looks quite reasonable and is a configuration we could easily live with.
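To illustrate the general shape of this configuration, here is a minimal sketch of a hand-written validator on the write side and a plain namespace on the read side. It is not the code in kafkaprototype: the schema, its field names and the functions validate and to_namespace are all hypothetical, and the checks shown (extra fields, missing fields, array lengths, string lengths) are just the obvious candidates.

```python
import types

# Hypothetical schema: field name -> (type, length).
# For a list the length is exact, for a str it is a maximum,
# and for scalars it is None. Real topics have many more fields.
SCHEMA = {
    "timestamp": (float, None),
    "name": (str, 8),
    "forces": (list, 4),
}


def validate(data, schema=SCHEMA):
    """Raise ValueError if the dict `data` does not match `schema`."""
    extra = data.keys() - schema.keys()
    if extra:
        raise ValueError(f"extra fields: {sorted(extra)}")
    missing = schema.keys() - data.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for field, (kind, length) in schema.items():
        value = data[field]
        if not isinstance(value, kind):
            raise ValueError(f"{field}: expected {kind.__name__}")
        if kind is list and len(value) != length:
            raise ValueError(f"{field}: expected {length} items, got {len(value)}")
        if kind is str and len(value) > length:
            raise ValueError(f"{field}: longer than {length} characters")


def to_namespace(data):
    """Reader side: wrap the decoded dict so fields are attributes."""
    return types.SimpleNamespace(**data)


message = {"timestamp": 1.5, "name": "m1m3", "forces": [0.0, 0.1, 0.2, 0.3]}
validate(message)  # writer side, before handing the dict to Kafka
received = to_namespace(message)  # reader side, after decoding
print(received.name, received.forces[1])
```

Neither step builds an intermediate model object per message, which is presumably why this combination keeps up where pydantic did not.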
Now try writing and reading with no ack, no extra validation (beyond what Kafka provides, which does not check array lengths, string lengths, or extra fields) and no post-processing. This is just to show how fast Kafka can go; I would hate to operate in this mode:
Write with --validation=none --nowait_ack, read with --postprocess=simple_namespace:
Wrote 2755.8 messages/second: Namespace(component='MTM1M3', nowait_ack=True, number=2000, topic='tel_forceActuatorData', validation='none')
Read 2046.1 messages/second: Namespace(component='MTM1M3', number=2000, postprocess='simple_namespace', time=True, topic='tel_forceActuatorData')
Delay mean = 0.156, stdev = 0.072, min = 0.012, max = 0.269 seconds
By comparison, I tweaked test/read_speed.py to measure DDS speeds for the same topic (the standard version runs with two smaller topics). I get:
Read 1580 samples/second (1000 samples); lost 52 samples
Wrote 1597 samples/second (1000 samples)
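One common way to count lost samples in a test like this is to put an incrementing sequence number in each sample and look for gaps on the read side. The sketch below shows that idea only; it is an assumption, not a description of how the DDS test counts losses, and the function name count_lost is hypothetical.

```python
def count_lost(sequence_numbers):
    """Count missing samples from the gaps in received sequence numbers.

    Assumes the writer increments the number by 1 per sample and that
    samples arrive in order. Samples lost before the first or after the
    last received sample are not detected.
    """
    lost = 0
    previous = None
    for seq in sequence_numbers:
        if previous is not None and seq > previous + 1:
            lost += seq - previous - 1
        previous = seq
    return lost


# Made-up sequence: samples 3, 4, 7 and 8 never arrived.
print(count_lost([0, 1, 2, 5, 6, 9]))
```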
Comments:
- The Kafka tests never lost data (even when not waiting for an ack from the Kafka broker, and even when writing 10,000 messages, not shown above).
- We should repeat these tests using the JVM that Dave Mills found. I hope that this will speed up waiting for ack in the writer and reduce the delays.
- Using pydantic models is too slow. Fortunately we have good alternatives.
Many thanks to Angelo Fausti for his help with this.
The package includes:
Code: https://github.com/r-owen/kafkaprototype
I also tweaked ts_salobj's tests/test_speed.py to report the speed of writing MTM1M3 forceActuatorData (one of the largest topics I could find), using temporary branch tickets/DM-33071. I don't feel it is necessary to review this branch. I don't intend to merge it because that would require the ts_salobj tests to have the MTM1M3 IDL file.