Consumer#
What is a Consumer#
The Consumer is a program that continuously pulls records from a stream system and processes the data, either sequentially or in parallel. Typically, we need a ‘pointer’ that tells the consumer where to start pulling data. In Kafka, it is called an ‘offset’; in AWS Kinesis Data Streams, a ‘shard iterator’; and in Pulsar, a ‘message id’.
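Regardless of the backend, the pull loop has the same shape; only the pointer type differs. Below is a minimal sketch — the `pull` function and the pointer strings are hypothetical placeholders, not part of any real client library:

```python
import typing as T


def pull(pointer: str, limit: int) -> T.Tuple[T.List[dict], str]:
    # Hypothetical stream API: a real implementation would call
    # Kafka / Kinesis / Pulsar here, using that backend's pointer type.
    records = [{"id": str(i)} for i in range(limit)]
    return records, "pointer-after-batch"


pointer = "initial-pointer"  # offset / shard iterator / message id
records, pointer = pull(pointer, limit=3)
print([r["id"] for r in records])  # ['0', '1', '2']
```

Each pull returns both a batch of records and the pointer to resume from, which is exactly the piece of state the checkpoint (below) must persist.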
In the previous document, we introduced the concept of Checkpoint. A consumer program essentially leverages the checkpoint: it updates the processing status before and after executing the processing logic, handles errors appropriately, and persists the checkpoint data to the storage backend every time it changes.
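The pattern can be sketched as follows. The `Checkpoint` class here is a simplified stand-in for illustration, not the library's actual checkpoint API:

```python
import json
import dataclasses
from pathlib import Path


@dataclasses.dataclass
class Checkpoint:
    # Simplified stand-in: tracks the pointer and persists it on every change.
    path: Path
    pointer: str

    def update(self, pointer: str):
        self.pointer = pointer
        self.path.write_text(json.dumps({"pointer": self.pointer}))


def process_batch(checkpoint: Checkpoint, records: list, next_pointer: str):
    for record in records:
        pass  # processing logic goes here, with error handling / retries
    # Persist only after the whole batch succeeded, so that a crash
    # restarts from the last successfully processed pointer.
    checkpoint.update(next_pointer)


checkpoint = Checkpoint(path=Path("checkpoint.json"), pointer="0")
process_batch(checkpoint, records=[{"id": "1"}], next_pointer="1")
print(checkpoint.pointer)  # 1
```

A file is used as the storage backend here only to keep the sketch self-contained; the examples below use DynamoDB + S3 instead.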
What is Dead-Letter-Queue (DLQ)#
Some records may still fail after multiple retries. Typically, we want data processing to continue smoothly rather than be blocked. In business-critical applications, it is common practice to route failed records to a dedicated location, often a message queue or another stream system, so they can be debugged and reprocessed later.
In certain use cases, records must be processed strictly in order. If a record fails, we must not skip it and process subsequent records; instead, we should halt processing and trigger a notification for immediate investigation. In either case, a Dead-Letter Queue (DLQ) serves as an additional fault-tolerance layer for business-critical use cases.
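The retry-then-DLQ flow described above can be sketched like this (the record shape, `MAX_RETRIES`, and the in-memory `dlq` list are illustrative placeholders; a real DLQ would be SQS or another stream):

```python
import typing as T

MAX_RETRIES = 3
dlq: T.List[dict] = []  # stands in for SQS / another stream


def send_to_dlq(record: dict):
    dlq.append(record)


def process(record: dict):
    if record.get("bad"):
        raise ValueError(f"cannot process {record['id']}")


def handle(record: dict, ordered: bool) -> bool:
    """Return True if processed, False if routed to the DLQ."""
    for _ in range(MAX_RETRIES):
        try:
            process(record)
            return True
        except ValueError:
            continue
    if ordered:
        # Strict ordering: never skip ahead; halt and alert instead.
        raise RuntimeError(f"halt: record {record['id']} failed")
    send_to_dlq(record)  # unblock the stream; reprocess later
    return False


handle({"id": "1"}, ordered=False)               # returns True
handle({"id": "2", "bad": True}, ordered=False)  # returns False, record goes to DLQ
```

The `ordered` flag captures the two policies above: route-and-continue for throughput-oriented pipelines, halt-and-alert for strictly ordered ones.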
Simple Consumer Example#
Below is a sample usage of SimpleConsumer, a simple consumer that reads data from the output of SimpleProducer.
aws_kinesis_consumer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    KinesisRecord,
    KinesisStreamShard,
    DynamoDBS3CheckPoint,
    AwsKinesisStreamConsumer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


class RandomError(Exception):
    pass


@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    path_target: Path = dataclasses.field()
    path_dlq: Path = dataclasses.field()

    def process_record(self, record: MyRecord) -> str:
        s = record.serialize()
        if random.randint(1, 100) <= 50:
            print(f"❌ {s}")
            raise RandomError(f"random error at record_id = {record.id}")
        else:
            with self.path_target.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ {s}")
            return s

    def process_failed_record(self, record: MyRecord) -> str:
        s = record.serialize()
        if random.randint(1, 100) <= 0:
            print(f"❌ DLQ:{s}")
            raise RandomError(f"{s}")
        else:
            with self.path_dlq.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ DLQ: {s}")
            return s


dir_here = Path(__file__).absolute().parent
dir_demo = dir_here.joinpath("aws_kinesis_stream_consumer_demo")
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")

stream_name = "aws_kinesis_producer_test"
s3_bucket = "awshsh-app-dev-us-east-1-data"
s3_key = "projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
dynamodb_table = "dynamodb_s3_checkpoint"
dynamodb_pk_name = "id"
dynamodb_pk_value = "s3://awshsh-app-dev-us-east-1-data/projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
res = bsm.kinesis_client.list_shards(StreamName=stream_name)
shard_id = KinesisStreamShard.from_list_shards_response(res)[0].ShardId
consumer_id = f"{stream_name}-{shard_id}"
path_checkpoint = dir_demo.joinpath(f"{consumer_id}.checkpoint.json")
path_records = dir_demo.joinpath(f"{consumer_id}.records.json")
path_target = dir_demo.joinpath(f"{consumer_id}.target.json")
path_dlq = dir_demo.joinpath(f"{consumer_id}.dlq.json")

res = bsm.kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)
shard_iterator = res["ShardIterator"]

# reset data and checkpoint
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)
bsm.dynamodb_client.delete_item(
    TableName=dynamodb_table,
    Key={dynamodb_pk_name: {"S": dynamodb_pk_value}},
)

checkpoint = DynamoDBS3CheckPoint.load(
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    dynamodb_table=dynamodb_table,
    dynamodb_pk_name=dynamodb_pk_name,
    dynamodb_pk_value=dynamodb_pk_value,
    bsm=bsm,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)

consumer = MyConsumer.new(
    record_class=MyRecord,
    consumer_id=consumer_id,
    checkpoint=checkpoint,
    bsm=bsm,
    stream_name=stream_name,
    shard_id=shard_id,
    limit=3,
    delay=1,
    additional_kwargs=dict(
        path_target=path_target,
        path_dlq=path_dlq,
    ),
)

# --- method 1 ---
consumer.run(verbose=True)


# --- method 2 ---
def run():
    i = 0
    while 1:
        i += 1
        print(f"--- {i} th pull ---")
        consumer.process_batch(verbose=True)
        if consumer.delay:
            time.sleep(consumer.delay)


# run()
aws_kinesis_consumer.py Output
--- 1 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 2 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 3 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 4 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.06 sec --------------------------+
--- 5 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 6 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "1", "create_at": "2024-01-09T06:24:06.759449+00:00", "value": 72}
✅ {"id": "1", "create_at": "2024-01-09T06:24:06.759449+00:00", "value": 72}
❌ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
❌ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
✅ {"id": "2", "create_at": "2024-01-09T06:24:07.764857+00:00", "value": 16}
✅ {"id": "3", "create_at": "2024-01-09T06:24:08.770662+00:00", "value": 35}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.05 sec --------------------------+
--- 7 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "4", "create_at": "2024-01-09T06:24:09.775362+00:00", "value": 89}
✅ {"id": "4", "create_at": "2024-01-09T06:24:09.775362+00:00", "value": 89}
❌ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
❌ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
✅ {"id": "5", "create_at": "2024-01-09T06:24:10.991460+00:00", "value": 7}
✅ {"id": "6", "create_at": "2024-01-09T06:24:11.994688+00:00", "value": 49}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.04 sec --------------------------+
--- 8 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "7", "create_at": "2024-01-09T06:24:13.025437+00:00", "value": 70}
✅ {"id": "8", "create_at": "2024-01-09T06:24:14.031405+00:00", "value": 32}
✅ {"id": "9", "create_at": "2024-01-09T06:24:15.036961+00:00", "value": 45}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 9 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "10", "create_at": "2024-01-09T06:24:16.043874+00:00", "value": 40}
❌ {"id": "11", "create_at": "2024-01-09T06:24:17.084737+00:00", "value": 6}
✅ {"id": "11", "create_at": "2024-01-09T06:24:17.084737+00:00", "value": 6}
❌ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
❌ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
✅ {"id": "12", "create_at": "2024-01-09T06:24:18.091867+00:00", "value": 87}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.07 sec --------------------------+
--- 10 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
❌ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
✅ {"id": "13", "create_at": "2024-01-09T06:24:19.100240+00:00", "value": 26}
✅ {"id": "14", "create_at": "2024-01-09T06:24:20.138912+00:00", "value": 7}
❌ {"id": "15", "create_at": "2024-01-09T06:24:21.144013+00:00", "value": 55}
✅ {"id": "15", "create_at": "2024-01-09T06:24:21.144013+00:00", "value": 55}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 4.15 sec --------------------------+
--- 11 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "16", "create_at": "2024-01-09T06:24:22.204526+00:00", "value": 57}
✅ {"id": "16", "create_at": "2024-01-09T06:24:22.204526+00:00", "value": 57}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
❌ {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
✅ DLQ: {"id": "17", "create_at": "2024-01-09T06:24:23.211185+00:00", "value": 31}
✅ {"id": "18", "create_at": "2024-01-09T06:24:24.218099+00:00", "value": 92}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 4.06 sec --------------------------+
AWS Kinesis Producer Consumer Example#
This example demonstrates how to produce and consume messages from an AWS Kinesis data stream.
The producer:
buffers records and sends them in batches
persists the buffered records
retries with exponential backoff
recovers from the last failure
The consumer:
retries automatically with exponential backoff
persists the checkpoint after each success
recovers from the last success
sends failed records to a dead-letter queue
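The exponential backoff behavior listed above can be sketched as follows. The function names are illustrative (not the library's API); the `[1, 2, 4]` schedule matches the `exp_backoff` value used in the producer code below:

```python
import time


def backoff_schedule(base: float = 1.0, factor: float = 2.0, attempts: int = 3):
    # Build an exponential backoff schedule, e.g. [1.0, 2.0, 4.0]:
    # wait 1s after the 1st failure, 2s after the 2nd, 4s after the 3rd.
    return [base * factor**i for i in range(attempts)]


def should_attempt(last_failure_at: float, failures: int, schedule) -> bool:
    # Only retry once the backoff window for the current failure count
    # has elapsed; give up (route to the DLQ) when the schedule is exhausted.
    if failures == 0:
        return True
    if failures > len(schedule):
        return False  # retries exhausted
    return time.time() - last_failure_at >= schedule[failures - 1]


print(backoff_schedule())  # [1.0, 2.0, 4.0]
```

Between failures, records that arrive during a backoff window are simply held (the producer output below shows this as "on hold due to exponential backoff").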
Note
This consumer uses a local file as the DLQ, which is for POC only. In production, you should use DynamoDB + S3 for the checkpoint (as this example does) and AWS SQS or another AWS Kinesis stream for the DLQ.
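Following the note, a production `process_failed_record` could forward the failed record to SQS instead of a local file. A minimal sketch using boto3 (the queue URL is a placeholder; `send_message` is the real SQS client call):

```python
def send_to_sqs_dlq(record_json: str, queue_url: str, sqs_client=None):
    # Forward a serialized failed record to an SQS queue acting as the DLQ.
    # `queue_url` is a placeholder; pass your real queue URL in production.
    if sqs_client is None:
        import boto3  # deferred import so the sketch runs without AWS access
        sqs_client = boto3.client("sqs")
    return sqs_client.send_message(QueueUrl=queue_url, MessageBody=record_json)
```

Injecting the client as a parameter also makes the hook easy to unit-test with a fake.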
aws_kinesis_consumer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    KinesisRecord,
    KinesisStreamShard,
    DynamoDBS3CheckPoint,
    AwsKinesisStreamConsumer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


class RandomError(Exception):
    pass


@dataclasses.dataclass
class MyConsumer(AwsKinesisStreamConsumer):
    path_target: Path = dataclasses.field()
    path_dlq: Path = dataclasses.field()

    def process_record(self, record: MyRecord) -> str:
        s = record.serialize()
        if random.randint(1, 100) <= 50:
            print(f"❌ {s}")
            raise RandomError(f"random error at record_id = {record.id}")
        else:
            with self.path_target.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ {s}")
            return s

    def process_failed_record(self, record: MyRecord) -> str:
        s = record.serialize()
        if random.randint(1, 100) <= 0:
            print(f"❌ DLQ:{s}")
            raise RandomError(f"{s}")
        else:
            with self.path_dlq.open("a") as f:
                f.write(f"{s}\n")
            print(f"✅ DLQ: {s}")
            return s


dir_here = Path(__file__).absolute().parent
dir_demo = dir_here.joinpath("aws_kinesis_stream_consumer_demo")
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")

stream_name = "aws_kinesis_producer_test"
s3_bucket = "awshsh-app-dev-us-east-1-data"
s3_key = "projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
dynamodb_table = "dynamodb_s3_checkpoint"
dynamodb_pk_name = "id"
dynamodb_pk_value = "s3://awshsh-app-dev-us-east-1-data/projects/unistream/aws_kinesis_stream_consumer_demo/checkpoint.json"
res = bsm.kinesis_client.list_shards(StreamName=stream_name)
shard_id = KinesisStreamShard.from_list_shards_response(res)[0].ShardId
consumer_id = f"{stream_name}-{shard_id}"
path_checkpoint = dir_demo.joinpath(f"{consumer_id}.checkpoint.json")
path_records = dir_demo.joinpath(f"{consumer_id}.records.json")
path_target = dir_demo.joinpath(f"{consumer_id}.target.json")
path_dlq = dir_demo.joinpath(f"{consumer_id}.dlq.json")

res = bsm.kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)
shard_iterator = res["ShardIterator"]

# reset data and checkpoint
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)
bsm.dynamodb_client.delete_item(
    TableName=dynamodb_table,
    Key={dynamodb_pk_name: {"S": dynamodb_pk_value}},
)

checkpoint = DynamoDBS3CheckPoint.load(
    s3_bucket=s3_bucket,
    s3_key=s3_key,
    dynamodb_table=dynamodb_table,
    dynamodb_pk_name=dynamodb_pk_name,
    dynamodb_pk_value=dynamodb_pk_value,
    bsm=bsm,
    initial_pointer=shard_iterator,
    start_pointer=shard_iterator,
)

consumer = MyConsumer.new(
    record_class=MyRecord,
    consumer_id=consumer_id,
    checkpoint=checkpoint,
    bsm=bsm,
    stream_name=stream_name,
    shard_id=shard_id,
    limit=3,
    delay=1,
    additional_kwargs=dict(
        path_target=path_target,
        path_dlq=path_dlq,
    ),
)

# --- method 1 ---
consumer.run(verbose=True)


# --- method 2 ---
def run():
    i = 0
    while 1:
        i += 1
        print(f"--- {i} th pull ---")
        consumer.process_batch(verbose=True)
        if consumer.delay:
            time.sleep(consumer.delay)


# run()
aws_kinesis_producer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    FileBuffer,
    RetryConfig,
    KinesisRecord,
    AwsKinesisStreamProducer,
    exc,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


@dataclasses.dataclass
class MyProducer(AwsKinesisStreamProducer):
    def send(self, records: T.List[MyRecord]):
        if random.randint(1, 100) <= 50:
            raise exc.SendError("randomly failed due to send error")
        super().send(records)


dir_demo = Path(__file__).absolute().parent.joinpath("aws_kinesis_stream_producer_demo")
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)

path_log = dir_demo / "aws_kinesis_stream_producer_buffer.log"
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")
stream_name = "aws_kinesis_producer_test"


def make_producer() -> MyProducer:
    producer = MyProducer.new(
        buffer=FileBuffer.new(
            record_class=MyRecord,
            path_wal=path_log,
            max_records=3,
        ),
        retry_config=RetryConfig(
            exp_backoff=[1, 2, 4],
        ),
        bsm=bsm,
        stream_name=stream_name,
    )
    return producer


producer = make_producer()

# --- test 1 ---
# n = 15
# for i in range(1, 1 + n):
#     time.sleep(1)
#     # The producer program can be terminated with a 30% chance.
#     # We create a new producer object to simulate that.
#     if random.randint(1, 100) <= 30:
#         producer = make_producer()
#     producer.put(MyRecord(id=str(i)), verbose=True)

# --- test 2 ---
n = 1000
for i in range(1, 1 + n):
    time.sleep(1)
    producer.put(MyRecord(id=f"id_{i}"), verbose=True)
aws_kinesis_consumer.py Output
--- 1 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 2 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 3 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.04 sec --------------------------+
--- 4 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 5 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "1", "create_at": "2024-01-09T05:50:53.158510+00:00", "value": 26}
❌ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
❌ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
✅ {"id": "2", "create_at": "2024-01-09T05:50:54.164330+00:00", "value": 87}
✅ {"id": "3", "create_at": "2024-01-09T05:50:55.171156+00:00", "value": 29}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 2.05 sec --------------------------+
--- 6 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "4", "create_at": "2024-01-09T05:50:56.342718+00:00", "value": 26}
✅ {"id": "5", "create_at": "2024-01-09T05:50:57.348946+00:00", "value": 62}
✅ {"id": "6", "create_at": "2024-01-09T05:50:58.353603+00:00", "value": 38}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.04 sec --------------------------+
--- 7 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 8 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 9 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "7", "create_at": "2024-01-09T05:50:59.383881+00:00", "value": 79}
✅ {"id": "8", "create_at": "2024-01-09T05:51:00.390543+00:00", "value": 23}
✅ {"id": "9", "create_at": "2024-01-09T05:51:01.396514+00:00", "value": 61}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 10 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.02 sec --------------------------+
--- 11 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 0.03 sec --------------------------+
--- 12 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "10", "create_at": "2024-01-09T05:51:02.421727+00:00", "value": 46}
✅ {"id": "10", "create_at": "2024-01-09T05:51:02.421727+00:00", "value": 46}
❌ {"id": "11", "create_at": "2024-01-09T05:51:03.427567+00:00", "value": 33}
✅ {"id": "11", "create_at": "2024-01-09T05:51:03.427567+00:00", "value": 33}
✅ {"id": "12", "create_at": "2024-01-09T05:51:04.433538+00:00", "value": 23}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 2.06 sec --------------------------+
--- 13 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
✅ DLQ: {"id": "13", "create_at": "2024-01-09T05:51:05.472854+00:00", "value": 55}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
✅ {"id": "14", "create_at": "2024-01-09T05:51:06.477555+00:00", "value": 1}
❌ {"id": "15", "create_at": "2024-01-09T05:51:07.484951+00:00", "value": 51}
✅ {"id": "15", "create_at": "2024-01-09T05:51:07.484951+00:00", "value": 51}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 7.09 sec --------------------------+
--- 14 th pull ---
+----- ⏱ ⏳ Start 'process batch' ----------------------------------------------+
⏳
✅ {"id": "16", "create_at": "2024-01-09T05:51:08.523287+00:00", "value": 11}
✅ {"id": "17", "create_at": "2024-01-09T05:51:09.529575+00:00", "value": 72}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
❌ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
✅ {"id": "18", "create_at": "2024-01-09T05:51:10.537244+00:00", "value": 46}
⏳
+----- ⏰ ✅ ⏳ End 'process batch', elapsed = 3.18 sec --------------------------+
aws_kinesis_producer.py Output
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "1", "create_at": "2024-01-09T05:52:44.322991+00:00", "value": 43}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "2", "create_at": "2024-01-09T05:52:45.325225+00:00", "value": 84}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "3", "create_at": "2024-01-09T05:52:46.328910+00:00", "value": 88}
📤 📤 send records: ['1', '2', '3']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.17 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "4", "create_at": "2024-01-09T05:52:47.507636+00:00", "value": 48}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "5", "create_at": "2024-01-09T05:52:48.514062+00:00", "value": 58}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "6", "create_at": "2024-01-09T05:52:49.520373+00:00", "value": 60}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "7", "create_at": "2024-01-09T05:52:50.525767+00:00", "value": 29}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "8", "create_at": "2024-01-09T05:52:51.532027+00:00", "value": 73}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "9", "create_at": "2024-01-09T05:52:52.539793+00:00", "value": 100}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "10", "create_at": "2024-01-09T05:52:53.548661+00:00", "value": 31}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "11", "create_at": "2024-01-09T05:52:54.555044+00:00", "value": 65}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "12", "create_at": "2024-01-09T05:52:55.561778+00:00", "value": 34}
📤 🚫 on hold due to exponential backoff
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "13", "create_at": "2024-01-09T05:52:56.570101+00:00", "value": 44}
📤 📤 send records: ['4', '5', '6']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.11 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "14", "create_at": "2024-01-09T05:52:57.688880+00:00", "value": 32}
📤 📤 send records: ['7', '8', '9']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "15", "create_at": "2024-01-09T05:52:58.727742+00:00", "value": 71}
📤 📤 send records: ['10', '11', '12']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "16", "create_at": "2024-01-09T05:52:59.731922+00:00", "value": 22}
📤 📤 send records: ['10', '11', '12']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "17", "create_at": "2024-01-09T05:53:00.765578+00:00", "value": 5}
📤 📤 send records: ['13', '14', '15']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "18", "create_at": "2024-01-09T05:53:01.770917+00:00", "value": 44}
📤 📤 send records: ['13', '14', '15']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "19", "create_at": "2024-01-09T05:53:02.801051+00:00", "value": 8}
📤 📤 send records: ['16', '17', '18']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "20", "create_at": "2024-01-09T05:53:03.841379+00:00", "value": 86}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "21", "create_at": "2024-01-09T05:53:04.847012+00:00", "value": 49}
📤 📤 send records: ['19', '20', '21']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.04 sec -----------------------------+