Details
-
Type:
Story
-
Status: To Do
-
Resolution: Unresolved
-
Fix Version/s: None
-
Component/s: None
-
Labels:None
-
Story Points:1.4
-
Epic Link:
-
Team:SQuaRE
-
Urgent?:No
Description
When you start a second Faust worker (see kafka-aggregator README for instructions) the partitions get reassigned to the new worker but then the content of the the tumbling-window table has a mixture of objects and serialized objects values.
Example of output showhing the problem, look at the [2020-04-09 13:42:18,048] [25811] [WARNING] log entry.
[2020-04-09 13:41:48,035] [25811] [WARNING] [<TestTopic: time=1586464903.0109172, value=0.27039396820780215>, <TestTopic: time=1586464903.1126258, value=0.3731078082421537>, <TestTopic: time=1586464903.217427, value=0.5743990514685997>, <TestTopic: time=1586464903.319746, value=0.8719019467903101>, <TestTopic: time=1586464903.421516, value=0.5201865678987446>, <TestTopic: time=1586464903.774977, value=0.7573924423533871>]
|
[2020-04-09 13:41:48,041] [25811] [INFO] Processed 6 messages on window (1586464903, 1586464903.9).
|
[2020-04-09 13:41:48,041] [25811] [WARNING] [<TestTopic: time=1586464904.0118551, value=0.8079311512902113>, <TestTopic: time=1586464904.842612, value=0.523761301728725>, <TestTopic: time=1586464904.430518, value=0.8365539117164306>, <TestTopic: time=1586464904.635787, value=0.3151506857287941>, <TestTopic: time=1586464904.944146, value=0.9911582606349283>]
|
[2020-04-09 13:41:48,042] [25811] [INFO] Processed 5 messages on window (1586464904, 1586464904.9).
|
[2020-04-09 13:42:11,936] [25811] [WARNING] Heartbeat failed for group kafkaaggregator because it is rebalancing
|
[2020-04-09 13:42:11,938] [25811] [INFO] Revoking previously assigned partitions
|
┌Topic Partition Set────────────────────────┬────────────┐
|
│ topic │ partitions │
|
├───────────────────────────────────────────┼────────────┤
|
│ kafkaaggregator-__assignor-__leader │ {0} │
|
│ kafkaaggregator-tumbling-window-changelog │ {0-3} │
|
│ test-topic │ {2-3} │
|
└───────────────────────────────────────────┴────────────┘ for group kafkaaggregator
|
[2020-04-09 13:42:11,981] [25811] [INFO] (Re-)joining group kafkaaggregator
|
[2020-04-09 13:42:11,993] [25811] [INFO] Joined group 'kafkaaggregator' (generation 15) with member_id faust-1.10.4-cf02bed8-d48e-4771-a285-641554152e20
|
[2020-04-09 13:42:11,993] [25811] [INFO] Elected group leader -- performing partition assignments using faust
|
[2020-04-09 13:42:12,012] [25811] [INFO] Successfully synced group kafkaaggregator with generation 15
|
[2020-04-09 13:42:12,014] [25811] [INFO] Setting newly assigned partitions
|
┌Topic Partition Set────────────────────────┬────────────┐
|
│ topic │ partitions │
|
├───────────────────────────────────────────┼────────────┤
|
│ kafkaaggregator-__assignor-__leader │ {0} │
|
│ kafkaaggregator-tumbling-window-changelog │ {0-3} │
|
│ test-topic │ {0-3} │
|
└───────────────────────────────────────────┴────────────┘ for group kafkaaggregator
|
[2020-04-09 13:42:12,040] [25811] [INFO] [^---Recovery]: Highwater for active changelog partitions:
|
┌Highwater - Active─────────────────────────┬───────────┬───────────┐
|
│ topic │ partition │ highwater │
|
├───────────────────────────────────────────┼───────────┼───────────┤
|
│ kafkaaggregator-tumbling-window-changelog │ 0 │ 4319 │
|
│ 〃 │ 1 │ 4290 │
|
│ 〃 │ 2 │ 4382 │
|
│ 〃 │ 3 │ 4500 │
|
└───────────────────────────────────────────┴───────────┴───────────┘
|
[2020-04-09 13:42:12,071] [25811] [INFO] [^---Recovery]: active offsets at start of reading:
|
┌Reading Starts At - Active─────────────────┬───────────┬────────┐
|
│ topic │ partition │ offset │
|
├───────────────────────────────────────────┼───────────┼────────┤
|
│ kafkaaggregator-tumbling-window-changelog │ 0 │ 3885 │
|
│ 〃 │ 1 │ 3875 │
|
│ 〃 │ 2 │ 3922 │
|
│ 〃 │ 3 │ 4080 │
|
└───────────────────────────────────────────┴───────────┴────────┘
|
[2020-04-09 13:42:12,074] [25811] [INFO] [^---Recovery]: standby offsets at start of reading:
|
┌Reading Starts At - Standby────────────────┬───────────┬────────┐
|
│ topic │ partition │ offset │
|
├───────────────────────────────────────────┼───────────┼────────┤
|
│ kafkaaggregator-tumbling-window-changelog │ 0 │ 4319 │
|
│ 〃 │ 1 │ 4290 │
|
└───────────────────────────────────────────┴───────────┴────────┘
|
[2020-04-09 13:42:12,075] [25811] [INFO] [^---Recovery]: Restoring state from changelog topics...
|
[2020-04-09 13:42:12,076] [25811] [INFO] [^---Recovery]: Resuming flow...
|
[2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3922 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=2), resetting offset
|
[2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3875 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=1), resetting offset
|
[2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3885 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=0), resetting offset
|
[2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 4080 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=3), resetting offset
|
[2020-04-09 13:42:12,520] [25811] [INFO] [^---Recovery]: Done reading from changelog topics
|
[2020-04-09 13:42:12,521] [25811] [INFO] [^---Recovery]: Recovery complete
|
[2020-04-09 13:42:12,626] [25811] [INFO] [^---Recovery]: Restore complete!
|
[2020-04-09 13:42:12,626] [25811] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
|
[2020-04-09 13:42:12,755] [25811] [INFO] [^---Recovery]: Worker ready
|
[2020-04-09 13:42:18,048] [25811] [WARNING] [{'time': 1586464931.081015, 'value': 0.7900335424046105, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, {'time': 1586464931.1845498, 'value': 0.09116006785441977, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, {'time': 1586464931.287586, 'value': 0.14069390523446101, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, <TestTopic: time=1586464931.5978189, value=0.19064550123995094>, <TestTopic: time=1586464931.495284, value=0.47868479275733844>, <TestTopic: time=1586464931.804087, value=0.5280674131458294>, <TestTopic: time=1586464931.910399, value=0.08680588873308903>]
|
[2020-04-09 13:42:18,050] [25811] [ERROR] [^---Table: tumbling-window]: Crashed reason=AttributeError("'dict' object has no attribute 'value'")
|
Traceback (most recent call last):
|
File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
|
await task
|
File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/mode/services.py", line 459, in _and_transition
|
return await fun(self, *args, **kwargs)
|
File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 354, in _clean_data
|
self._del_old_keys()
|
File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 370, in _del_old_keys
|
self.on_window_close(key, value)
|
File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 378, in on_window_close
|
self._on_window_close(key, value)
|
File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 92, in process_window
|
aggregated_message = aggregate(window_timestamp, value)
|
File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 54, in aggregate
|
values = [message.value for message in messages]
|
File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 54, in <listcomp>
|
values = [message.value for message in messages]
|
AttributeError: 'dict' object has no attribute 'value'
|
[2020-04-09 13:42:18,114] [25811] [INFO] LeaveGroup request succeeded
|
|