Consumer#

What is a Consumer#

The Consumer is a program that continuously pulls records from a stream system and processes the data, either sequentially or in parallel. Typically, we need a ‘pointer’ that tells the consumer where to start pulling data. In Kafka, it is called an ‘offset’; in AWS Kinesis Data Streams, a ‘shard iterator’; and in Pulsar, a ‘message id’.
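For instance, using boto3 directly against Kinesis, the pointer is a shard iterator that is obtained once and then advanced on every pull. A minimal sketch (the stream and shard names are placeholders):

import boto3

kinesis = boto3.client("kinesis")

# Obtain the initial "pointer" for one shard of the stream.
res = kinesis.get_shard_iterator(
    StreamName="my_stream",  # placeholder
    ShardId="shardId-000000000000",  # placeholder
    ShardIteratorType="TRIM_HORIZON",  # start from the oldest available record
)
shard_iterator = res["ShardIterator"]

while True:
    res = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
    for record in res["Records"]:
        print(record["Data"])  # process the raw bytes payload
    # Advance the pointer; the next pull starts where this one ended.
    shard_iterator = res["NextShardIterator"]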

In the previous document, we introduced the concept of the Checkpoint. A consumer program is essentially built around the checkpoint: it updates the processing status before and after executing the processing logic, handles errors appropriately, and persists the checkpoint data to the storage backend every time it changes.
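Put together, the consumer loop looks roughly like the sketch below. The checkpoint methods and the pull / process callables are illustrative stand-ins, not the exact unistream API:

def consume_forever(checkpoint, pull, process):
    # Illustrative loop: `pull` fetches a batch starting at a pointer,
    # `process` handles one record; both are hypothetical callables.
    while True:
        records, next_pointer = pull(checkpoint.start_pointer)
        for record in records:
            checkpoint.mark_in_progress(record)  # update status *before* processing
            checkpoint.persist()                 # persist on every change
            try:
                process(record)
                checkpoint.mark_succeeded(record)  # ... and *after*
            except Exception:
                checkpoint.mark_failed(record)     # handled by retry / DLQ logic
            checkpoint.persist()
        checkpoint.start_pointer = next_pointer
        checkpoint.persist()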

What is a Dead-Letter Queue (DLQ)#

Some records may still fail after multiple retries. Typically, we want data processing to keep flowing rather than be blocked by a single bad record. In business-critical applications, it is common practice to route failed records to a dedicated location, often a message queue or another stream system, so they can be debugged and reprocessed later.

In certain use cases, it is critical to process records strictly in order: if a preceding record fails, we must not process subsequent records. In such scenarios, we should halt processing and trigger a notification for immediate investigation. In either case, a Dead-Letter Queue (DLQ) serves as an additional fault-tolerance layer for business-critical use cases.
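Both policies fit in a few lines. In the sketch below, MAX_RETRIES, send_to_dlq, and notify are hypothetical names used only for illustration:

MAX_RETRIES = 3

def handle(record, process, send_to_dlq, notify, strict_order: bool):
    # Retry the processing logic a few times before giving up.
    for _ in range(MAX_RETRIES):
        try:
            return process(record)
        except Exception as e:
            last_error = e
    if strict_order:
        # Ordered use case: alert a human and stop the consumer.
        notify(record, last_error)
        raise last_error
    # Unordered use case: park the record in the DLQ and move on.
    send_to_dlq(record)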

[Figure: dlq.png — the Dead-Letter Queue (DLQ)]

Simple Consumer Example#

Below is a sample consumer that reads data from the stream written by the producer example.

aws_kinesis_consumer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    KinesisRecord,
    KinesisStreamShard,
    DynamoDBS3CheckPoint,
    AwsKinesisStreamConsumer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


class RandomError(Exception):
    pass


@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    path_target: Path = dataclasses.field()
    path_dlq: Path = dataclasses.field()

    def process_record(self, record: MyRecord) -> str:
        # simulate a 50% chance that the processing logic fails
        s = record.serialize()
        if random.randint(1, 100) <= 50:
            print(f"❌ {s}")
            raise RandomError(f"random error at record_id = {record.id}")
        else:
            with self.path_target.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ {s}")
        return s

    def process_failed_record(self, record: MyRecord) -> str:
        # writing to the DLQ never fails in this demo
        # (random.randint(1, 100) is always >= 1)
        s = record.serialize()
        if random.randint(1, 100) <= 0:
            print(f"❌ DLQ: {s}")
            raise RandomError(f"{s}")
        else:
            with self.path_dlq.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ DLQ: {s}")
        return s


dir_here = Path(__file__).absolute().parent
dir_demo = dir_here.joinpath("aws_kinesis_stream_consumer_demo")
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")

stream_name = "aws_kinesis_producer_test"
s3_bucket = "awshsh-app-dev-us-east-1-data"
s3_key = "projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
dynamodb_table = "dynamodb_s3_checkpoint"
dynamodb_pk_name = "id"
dynamodb_pk_value = "s3://awshsh-app-dev-us-east-1-data/projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
res = bsm.kinesis_client.list_shards(StreamName=stream_name)
shard_id = KinesisStreamShard.from_list_shards_response(res)[0].ShardId
consumer_id = f"{stream_name}-{shard_id}"
path_checkpoint = dir_demo.joinpath(f"{consumer_id}.checkpoint.json")
path_records = dir_demo.joinpath(f"{consumer_id}.records.json")
path_target = dir_demo.joinpath(f"{consumer_id}.target.json")
path_dlq = dir_demo.joinpath(f"{consumer_id}.dlq.json")

res = bsm.kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)
shard_iterator = res["ShardIterator"]

# reset local data and the remote checkpoint
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)
bsm.dynamodb_client.delete_item(
    TableName=dynamodb_table,
    Key={dynamodb_pk_name: {"S": dynamodb_pk_value}},
)

checkpoint = DynamoDBS3CheckPoint.load(
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    dynamodb_table=dynamodb_table,
    dynamodb_pk_name=dynamodb_pk_name,
    dynamodb_pk_value=dynamodb_pk_value,
    bsm=bsm,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)

consumer = MyConsumer.new(
    record_class=MyRecord,
    consumer_id=consumer_id,
    checkpoint=checkpoint,
    bsm=bsm,
    stream_name=stream_name,
    shard_id=shard_id,
    limit=3,
    delay=1,
    additional_kwargs=dict(
        path_target=path_target,
        path_dlq=path_dlq,
    ),
)

# --- method 1: use the built-in consumer loop ---
consumer.run(verbose=True)

# --- method 2: drive process_batch() manually ---
def run():
    i = 0
    while True:
        i += 1
        print(f"--- {i} th pull ---")
        consumer.process_batch(verbose=True)
        if consumer.delay:
            time.sleep(consumer.delay)


# run()
aws_kinesis_consumer.py Output
--- 1 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 2 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 3 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 4 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.06 sec --------------------------+
--- 5 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 6 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "1", "create_at": "2024-01-09T06:24:06.759449+00:00", "value": 72}
✅ {"id": "1", "create_at": "2024-01-09T06:24:06.759449+00:00", "value": 72}
❌ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
❌ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
✅ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
✅ {"id": "3", "create_at": "2024-01-09T06:24:08.770662+00:00", "value": 35}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.05 sec --------------------------+
--- 7 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "4", "create_at": "2024-01-09T06:24:09.775362+00:00", "value": 89}
✅ {"id": "4", "create_at": "2024-01-09T06:24:09.775362+00:00", "value": 89}
❌ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
❌ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
✅ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
✅ {"id": "6", "create_at": "2024-01-09T06:24:11.994688+00:00", "value": 49}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.04 sec --------------------------+
--- 8 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "7", "create_at": "2024-01-09T06:24:13.025437+00:00", "value": 70}
✅ {"id": "8", "create_at": "2024-01-09T06:24:14.031405+00:00", "value": 32}
✅ {"id": "9", "create_at": "2024-01-09T06:24:15.036961+00:00", "value": 45}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 9 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "10", "create_at": "2024-01-09T06:24:16.043874+00:00", "value": 40}
❌ {"id": "11", "create_at": "2024-01-09T06:24:17.084737+00:00", "value": 6}
✅ {"id": "11", "create_at": "2024-01-09T06:24:17.084737+00:00", "value": 6}
❌ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
❌ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
✅ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.07 sec --------------------------+
--- 10 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
✅ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
✅ {"id": "14", "create_at": "2024-01-09T06:24:20.138912+00:00", "value": 7}
❌ {"id": "15", "create_at": "2024-01-09T06:24:21.144013+00:00", "value": 55}
✅ {"id": "15", "create_at": "2024-01-09T06:24:21.144013+00:00", "value": 55}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 4.15 sec --------------------------+
--- 11 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "16", "create_at": "2024-01-09T06:24:22.204526+00:00", "value": 57}
✅ {"id": "16", "create_at": "2024-01-09T06:24:22.204526+00:00", "value": 57}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
✅ DLQ: {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
✅ {"id": "18", "create_at": "2024-01-09T06:24:24.218099+00:00", "value": 92}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 4.06 sec --------------------------+

AWS Kinesis Producer Consumer Example#

This example demonstrates how to produce and consume messages from an AWS Kinesis Data Stream.

The producer:

  • buffers records and sends them in batches

  • persists buffered records (a write-ahead log)

  • retries with exponential backoff

  • recovers from the last failure

The consumer:

  • retries automatically with exponential backoff

  • persists the checkpoint after each success

  • recovers from the last success

  • sends permanently failed records to a dead-letter queue

Note

This consumer uses a local file as the DLQ, which is for POC purposes only. In production, you should use AWS SQS or another AWS Kinesis stream as the DLQ (the checkpoint in this example already uses the production-grade DynamoDB + S3 backend).
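For example, swapping the local-file DLQ for SQS only requires changing process_failed_record into a send_message call. A minimal sketch with boto3 (the queue URL is a placeholder):

import boto3
import dataclasses

from unistream.api import AwsKinesisStreamConsumer

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq"  # placeholder


@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    def process_failed_record(self, record) -> str:
        # Park the permanently failed record in an SQS queue acting as the DLQ.
        s = record.serialize()
        sqs.send_message(QueueUrl=queue_url, MessageBody=s)
        return s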

aws_kinesis_consumer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    KinesisRecord,
    KinesisStreamShard,
    DynamoDBS3CheckPoint,
    AwsKinesisStreamConsumer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


class RandomError(Exception):
    pass


@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    path_target: Path = dataclasses.field()
    path_dlq: Path = dataclasses.field()

    def process_record(self, record: MyRecord) -> str:
        # simulate a 50% chance that the processing logic fails
        s = record.serialize()
        if random.randint(1, 100) <= 50:
            print(f"❌ {s}")
            raise RandomError(f"random error at record_id = {record.id}")
        else:
            with self.path_target.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ {s}")
        return s

    def process_failed_record(self, record: MyRecord) -> str:
        # writing to the DLQ never fails in this demo
        # (random.randint(1, 100) is always >= 1)
        s = record.serialize()
        if random.randint(1, 100) <= 0:
            print(f"❌ DLQ: {s}")
            raise RandomError(f"{s}")
        else:
            with self.path_dlq.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ DLQ: {s}")
        return s


dir_here = Path(__file__).absolute().parent
dir_demo = dir_here.joinpath("aws_kinesis_stream_consumer_demo")
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")

stream_name = "aws_kinesis_producer_test"
s3_bucket = "awshsh-app-dev-us-east-1-data"
s3_key = "projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
dynamodb_table = "dynamodb_s3_checkpoint"
dynamodb_pk_name = "id"
dynamodb_pk_value = "s3://awshsh-app-dev-us-east-1-data/projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
res = bsm.kinesis_client.list_shards(StreamName=stream_name)
shard_id = KinesisStreamShard.from_list_shards_response(res)[0].ShardId
consumer_id = f"{stream_name}-{shard_id}"
path_checkpoint = dir_demo.joinpath(f"{consumer_id}.checkpoint.json")
path_records = dir_demo.joinpath(f"{consumer_id}.records.json")
path_target = dir_demo.joinpath(f"{consumer_id}.target.json")
path_dlq = dir_demo.joinpath(f"{consumer_id}.dlq.json")

res = bsm.kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)
shard_iterator = res["ShardIterator"]

# reset local data and the remote checkpoint
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)
bsm.dynamodb_client.delete_item(
    TableName=dynamodb_table,
    Key={dynamodb_pk_name: {"S": dynamodb_pk_value}},
)

checkpoint = DynamoDBS3CheckPoint.load(
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    dynamodb_table=dynamodb_table,
    dynamodb_pk_name=dynamodb_pk_name,
    dynamodb_pk_value=dynamodb_pk_value,
    bsm=bsm,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)

consumer = MyConsumer.new(
    record_class=MyRecord,
    consumer_id=consumer_id,
    checkpoint=checkpoint,
    bsm=bsm,
    stream_name=stream_name,
    shard_id=shard_id,
    limit=3,
    delay=1,
    additional_kwargs=dict(
        path_target=path_target,
        path_dlq=path_dlq,
    ),
)

# --- method 1: use the built-in consumer loop ---
consumer.run(verbose=True)

# --- method 2: drive process_batch() manually ---
def run():
    i = 0
    while True:
        i += 1
        print(f"--- {i} th pull ---")
        consumer.process_batch(verbose=True)
        if consumer.delay:
            time.sleep(consumer.delay)


# run()
aws_kinesis_producer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    FileBuffer,
    RetryConfig,
    KinesisRecord,
    AwsKinesisStreamProducer,
    exc,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


@dataclasses.dataclass
class MyProducer(AwsKinesisStreamProducer):
    def send(self, records: T.List[MyRecord]):
        # simulate a 50% chance that sending a batch fails
        if random.randint(1, 100) <= 50:
            raise exc.SendError("randomly failed due to send error")
        super().send(records)


dir_demo = Path(__file__).absolute().parent.joinpath("aws_kinesis_stream_producer_demo")
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)

path_log = dir_demo / "aws_kinesis_stream_producer_buffer.log"
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
stream_name = "aws_kinesis_producer_test"


def make_producer() -> MyProducer:
    producer = MyProducer.new(
        buffer=FileBuffer.new(
            record_class=MyRecord,
            path_wal=path_log,  # write-ahead log that persists buffered records
            max_records=3,  # flush the buffer every 3 records
        ),
        retry_config=RetryConfig(
            exp_backoff=[1, 2, 4],  # wait 1, 2, then 4 seconds between retries
        ),
        bsm=bsm,
        stream_name=stream_name,
    )
    return producer


producer = make_producer()

# --- test 1 ---
# n = 15
# for i in range(1, 1 + n):
#     time.sleep(1)
#     # Simulate the producer process being terminated and restarted
#     # (30% chance) by creating a new producer object.
#     if random.randint(1, 100) <= 30:
#         producer = make_producer()
#     producer.put(MyRecord(id=str(i)), verbose=True)

# --- test 2 ---
n = 1000
for i in range(1, 1 + n):
    time.sleep(1)
    producer.put(MyRecord(id=f"id_{i}"), verbose=True)
aws_kinesis_consumer.py Output
--- 1 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 2 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 3 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.04 sec --------------------------+
--- 4 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 5 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "1", "create_at": "2024-01-09T05:50:53.158510+00:00", "value": 26}
❌ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
❌ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
✅ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
✅ {"id": "3", "create_at": "2024-01-09T05:50:55.171156+00:00", "value": 29}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 2.05 sec --------------------------+
--- 6 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "4", "create_at": "2024-01-09T05:50:56.342718+00:00", "value": 26}
✅ {"id": "5", "create_at": "2024-01-09T05:50:57.348946+00:00", "value": 62}
✅ {"id": "6", "create_at": "2024-01-09T05:50:58.353603+00:00", "value": 38}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.04 sec --------------------------+
--- 7 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 8 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 9 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "7", "create_at": "2024-01-09T05:50:59.383881+00:00", "value": 79}
✅ {"id": "8", "create_at": "2024-01-09T05:51:00.390543+00:00", "value": 23}
✅ {"id": "9", "create_at": "2024-01-09T05:51:01.396514+00:00", "value": 61}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 10 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.02 sec --------------------------+
--- 11 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 12 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "10", "create_at": "2024-01-09T05:51:02.421727+00:00", "value": 46}
✅ {"id": "10", "create_at": "2024-01-09T05:51:02.421727+00:00", "value": 46}
❌ {"id": "11", "create_at": "2024-01-09T05:51:03.427567+00:00", "value": 33}
✅ {"id": "11", "create_at": "2024-01-09T05:51:03.427567+00:00", "value": 33}
✅ {"id": "12", "create_at": "2024-01-09T05:51:04.433538+00:00", "value": 23}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 2.06 sec --------------------------+
--- 13 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
✅ DLQ: {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
✅ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "15", "create_at": "2024-01-09T05:51:07.484951+00:00", "value": 51}
✅ {"id": "15", "create_at": "2024-01-09T05:51:07.484951+00:00", "value": 51}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 7.09 sec --------------------------+
--- 14 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "16", "create_at": "2024-01-09T05:51:08.523287+00:00", "value": 11}
✅ {"id": "17", "create_at": "2024-01-09T05:51:09.529575+00:00", "value": 72}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
✅ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.18 sec --------------------------+
aws_kinesis_producer.py Output
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "1", "create_at": "2024-01-09T05:52:44.322991+00:00", "value": 43}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "2", "create_at": "2024-01-09T05:52:45.325225+00:00", "value": 84}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "3", "create_at": "2024-01-09T05:52:46.328910+00:00", "value": 88}
📤 📤 send records: ['1', '2', '3']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.17 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "4", "create_at": "2024-01-09T05:52:47.507636+00:00", "value": 48}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "5", "create_at": "2024-01-09T05:52:48.514062+00:00", "value": 58}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "6", "create_at": "2024-01-09T05:52:49.520373+00:00", "value": 60}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "7", "create_at": "2024-01-09T05:52:50.525767+00:00", "value": 29}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "8", "create_at": "2024-01-09T05:52:51.532027+00:00", "value": 73}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "9", "create_at": "2024-01-09T05:52:52.539793+00:00", "value": 100}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "10", "create_at": "2024-01-09T05:52:53.548661+00:00", "value": 31}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "11", "create_at": "2024-01-09T05:52:54.555044+00:00", "value": 65}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "12", "create_at": "2024-01-09T05:52:55.561778+00:00", "value": 34}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "13", "create_at": "2024-01-09T05:52:56.570101+00:00", "value": 44}
📤 📤 send records: ['4', '5', '6']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.11 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "14", "create_at": "2024-01-09T05:52:57.688880+00:00", "value": 32}
📤 📤 send records: ['7', '8', '9']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "15", "create_at": "2024-01-09T05:52:58.727742+00:00", "value": 71}
📤 📤 send records: ['10', '11', '12']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "16", "create_at": "2024-01-09T05:52:59.731922+00:00", "value": 22}
📤 📤 send records: ['10', '11', '12']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "17", "create_at": "2024-01-09T05:53:00.765578+00:00", "value": 5}
📤 📤 send records: ['13', '14', '15']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "18", "create_at": "2024-01-09T05:53:01.770917+00:00", "value": 44}
📤 📤 send records: ['13', '14', '15']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "19", "create_at": "2024-01-09T05:53:02.801051+00:00", "value": 8}
📤 📤 send records: ['16', '17', '18']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "20", "create_at": "2024-01-09T05:53:03.841379+00:00", "value": 86}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "21", "create_at": "2024-01-09T05:53:04.847012+00:00", "value": 49}
📤 📤 send records: ['19', '20', '21']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+