  Data Management / DM-24473

Starting multiple Faust workers crashes the test topic example in the kafka-aggregator application.

    Details

    • Type: Story
    • Status: To Do
    • Resolution: Unresolved
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      When you start a second Faust worker (see the kafka-aggregator README for instructions), the partitions are reassigned to the new worker, but the tumbling-window table then contains a mixture of TestTopic objects and serialized (dict) values.

      Example output showing the problem; see the [2020-04-09 13:42:18,048] [25811] [WARNING] log entry.

      [2020-04-09 13:41:48,035] [25811] [WARNING] [<TestTopic: time=1586464903.0109172, value=0.27039396820780215>, <TestTopic: time=1586464903.1126258, value=0.3731078082421537>, <TestTopic: time=1586464903.217427, value=0.5743990514685997>, <TestTopic: time=1586464903.319746, value=0.8719019467903101>, <TestTopic: time=1586464903.421516, value=0.5201865678987446>, <TestTopic: time=1586464903.774977, value=0.7573924423533871>]
      [2020-04-09 13:41:48,041] [25811] [INFO] Processed 6 messages on window (1586464903, 1586464903.9).
      [2020-04-09 13:41:48,041] [25811] [WARNING] [<TestTopic: time=1586464904.0118551, value=0.8079311512902113>, <TestTopic: time=1586464904.842612, value=0.523761301728725>, <TestTopic: time=1586464904.430518, value=0.8365539117164306>, <TestTopic: time=1586464904.635787, value=0.3151506857287941>, <TestTopic: time=1586464904.944146, value=0.9911582606349283>]
      [2020-04-09 13:41:48,042] [25811] [INFO] Processed 5 messages on window (1586464904, 1586464904.9).
      [2020-04-09 13:42:11,936] [25811] [WARNING] Heartbeat failed for group kafkaaggregator because it is rebalancing
      [2020-04-09 13:42:11,938] [25811] [INFO] Revoking previously assigned partitions
      ┌Topic Partition Set────────────────────────┬────────────┐
      │ topic                                     │ partitions │
      ├───────────────────────────────────────────┼────────────┤
      │ kafkaaggregator-__assignor-__leader       │ {0}        │
      │ kafkaaggregator-tumbling-window-changelog │ {0-3}      │
      │ test-topic                                │ {2-3}      │
      └───────────────────────────────────────────┴────────────┘ for group kafkaaggregator
      [2020-04-09 13:42:11,981] [25811] [INFO] (Re-)joining group kafkaaggregator
      [2020-04-09 13:42:11,993] [25811] [INFO] Joined group 'kafkaaggregator' (generation 15) with member_id faust-1.10.4-cf02bed8-d48e-4771-a285-641554152e20
      [2020-04-09 13:42:11,993] [25811] [INFO] Elected group leader -- performing partition assignments using faust
      [2020-04-09 13:42:12,012] [25811] [INFO] Successfully synced group kafkaaggregator with generation 15
      [2020-04-09 13:42:12,014] [25811] [INFO] Setting newly assigned partitions
      ┌Topic Partition Set────────────────────────┬────────────┐
      │ topic                                     │ partitions │
      ├───────────────────────────────────────────┼────────────┤
      │ kafkaaggregator-__assignor-__leader       │ {0}        │
      │ kafkaaggregator-tumbling-window-changelog │ {0-3}      │
      │ test-topic                                │ {0-3}      │
      └───────────────────────────────────────────┴────────────┘ for group kafkaaggregator
      [2020-04-09 13:42:12,040] [25811] [INFO] [^---Recovery]: Highwater for active changelog partitions:
      ┌Highwater - Active─────────────────────────┬───────────┬───────────┐
      │ topic                                     │ partition │ highwater │
      ├───────────────────────────────────────────┼───────────┼───────────┤
      │ kafkaaggregator-tumbling-window-changelog │ 0         │ 4319      │
      │ 〃                                        │ 1         │ 4290      │
      │ 〃                                        │ 2         │ 4382      │
      │ 〃                                        │ 3         │ 4500      │
      └───────────────────────────────────────────┴───────────┴───────────┘
      [2020-04-09 13:42:12,071] [25811] [INFO] [^---Recovery]: active offsets at start of reading:
      ┌Reading Starts At - Active─────────────────┬───────────┬────────┐
      │ topic                                     │ partition │ offset │
      ├───────────────────────────────────────────┼───────────┼────────┤
      │ kafkaaggregator-tumbling-window-changelog │ 0         │ 3885   │
      │ 〃                                        │ 1         │ 3875   │
      │ 〃                                        │ 2         │ 3922   │
      │ 〃                                        │ 3         │ 4080   │
      └───────────────────────────────────────────┴───────────┴────────┘
      [2020-04-09 13:42:12,074] [25811] [INFO] [^---Recovery]: standby offsets at start of reading:
      ┌Reading Starts At - Standby────────────────┬───────────┬────────┐
      │ topic                                     │ partition │ offset │
      ├───────────────────────────────────────────┼───────────┼────────┤
      │ kafkaaggregator-tumbling-window-changelog │ 0         │ 4319   │
      │ 〃                                        │ 1         │ 4290   │
      └───────────────────────────────────────────┴───────────┴────────┘
      [2020-04-09 13:42:12,075] [25811] [INFO] [^---Recovery]: Restoring state from changelog topics...
      [2020-04-09 13:42:12,076] [25811] [INFO] [^---Recovery]: Resuming flow...
      [2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3922 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=2), resetting offset
      [2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3875 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=1), resetting offset
      [2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 3885 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=0), resetting offset
      [2020-04-09 13:42:12,104] [25811] [INFO] Fetch offset 4080 is out of range for partition TopicPartition(topic='kafkaaggregator-tumbling-window-changelog', partition=3), resetting offset
      [2020-04-09 13:42:12,520] [25811] [INFO] [^---Recovery]: Done reading from changelog topics
      [2020-04-09 13:42:12,521] [25811] [INFO] [^---Recovery]: Recovery complete
      [2020-04-09 13:42:12,626] [25811] [INFO] [^---Recovery]: Restore complete!
      [2020-04-09 13:42:12,626] [25811] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
      [2020-04-09 13:42:12,755] [25811] [INFO] [^---Recovery]: Worker ready
      [2020-04-09 13:42:18,048] [25811] [WARNING] [{'time': 1586464931.081015, 'value': 0.7900335424046105, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, {'time': 1586464931.1845498, 'value': 0.09116006785441977, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, {'time': 1586464931.287586, 'value': 0.14069390523446101, '__faust': {'ns': 'kafkaaggregator.models.TestTopic'}}, <TestTopic: time=1586464931.5978189, value=0.19064550123995094>, <TestTopic: time=1586464931.495284, value=0.47868479275733844>, <TestTopic: time=1586464931.804087, value=0.5280674131458294>, <TestTopic: time=1586464931.910399, value=0.08680588873308903>]
      [2020-04-09 13:42:18,050] [25811] [ERROR] [^---Table: tumbling-window]: Crashed reason=AttributeError("'dict' object has no attribute 'value'")
      Traceback (most recent call last):
        File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
          await task
        File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/mode/services.py", line 459, in _and_transition
          return await fun(self, *args, **kwargs)
        File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 354, in _clean_data
          self._del_old_keys()
        File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 370, in _del_old_keys
          self.on_window_close(key, value)
        File "/Users/afausti/Projects/new/kafka-aggregator/venv/lib/python3.7/site-packages/faust/tables/base.py", line 378, in on_window_close
          self._on_window_close(key, value)
        File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 92, in process_window
          aggregated_message = aggregate(window_timestamp, value)
        File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 54, in aggregate
          values = [message.value for message in messages]
        File "/Users/afausti/Projects/new/kafka-aggregator/src/kafkaaggregator/test-topic-agent.py", line 54, in <listcomp>
          values = [message.value for message in messages]
      AttributeError: 'dict' object has no attribute 'value'
      [2020-04-09 13:42:18,114] [25811] [INFO] LeaveGroup request succeeded
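
The traceback points at the list comprehension in aggregate(), which assumes every item in the window is a model object. A minimal sketch of a defensive workaround follows, assuming the serialized entries always carry 'time' and 'value' keys as in the log above. TopicRecord is a hypothetical stand-in for kafkaaggregator.models.TestTopic so the snippet runs without Faust, and as_record and mean_value are illustrative names, not part of the application. This only masks the symptom; it does not explain why the table returns dicts after the rebalance.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Any, Iterable, Mapping, Union


@dataclass
class TopicRecord:
    """Hypothetical stand-in for kafkaaggregator.models.TestTopic."""

    time: float
    value: float


def as_record(message: Union[TopicRecord, Mapping[str, Any]]) -> TopicRecord:
    """Return a record whether the table yielded an object or a dict."""
    if isinstance(message, TopicRecord):
        return message
    # Serialized form seen in the log:
    # {'time': ..., 'value': ..., '__faust': {'ns': '...TestTopic'}}
    return TopicRecord(time=message["time"], value=message["value"])


def mean_value(messages: Iterable[Union[TopicRecord, Mapping[str, Any]]]) -> float:
    """Average the 'value' field over a window with mixed item types."""
    values = [as_record(message).value for message in messages]
    return mean(values)


# A window mixing both forms, as in the 13:42:18,048 WARNING entry
# (values shortened for readability).
window = [
    {
        "time": 1586464931.081015,
        "value": 0.5,
        "__faust": {"ns": "kafkaaggregator.models.TestTopic"},
    },
    TopicRecord(time=1586464931.5978189, value=1.5),
]

result = mean_value(window)
```

With this normalization, a window that mixes both forms aggregates without raising AttributeError. The underlying question of why recovered changelog entries are not deserialized remains open.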
      
      


            People

            • Assignee: Angelo Fausti (afausti)
            • Reporter: Angelo Fausti (afausti)
