Producer

What is a Producer

The Producer is a program that continuously generates records and sends them to a target system. To improve throughput, it collects records in a buffer and sends them in batches. Whether the failure happens in the producer program or in the buffer, we must be able to restart the program and retry the operation without losing data. Retries should be graceful and follow an exponential backoff strategy. Once a batch has been sent successfully, its records can be removed from the buffer to free space. This step is called a “commit”.
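To make the buffer and commit ideas concrete, here is a minimal sketch of a file-backed buffer. It assumes records are JSON-serializable dicts and that the buffer appends each record to a log file before anything is sent, so a restarted program can pick up uncommitted records. `WalBuffer` and its methods are hypothetical names for illustration; this is not unistream's `FileBuffer`.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


class WalBuffer:
    """Hypothetical file-backed buffer; not unistream's FileBuffer."""

    def __init__(self, path_wal: Path, max_records: int):
        self.path_wal = path_wal
        self.max_records = max_records
        self.path_wal.touch(exist_ok=True)  # reopening keeps existing records

    def put(self, record: Dict) -> None:
        # Append to disk first, so the record survives a producer crash.
        with self.path_wal.open("a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

    def pending(self) -> List[Dict]:
        # Everything still in the log has not been committed yet.
        lines = self.path_wal.read_text(encoding="utf-8").splitlines()
        return [json.loads(line) for line in lines if line]

    def should_emit(self) -> bool:
        return len(self.pending()) >= self.max_records

    def next_batch(self) -> List[Dict]:
        return self.pending()[: self.max_records]

    def commit(self, n: int) -> None:
        # Call only after the target system accepted the batch:
        # rewrite the log without the first n (now delivered) records.
        remaining = self.pending()[n:]
        text = "".join(json.dumps(r) + "\n" for r in remaining)
        self.path_wal.write_text(text, encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "buffer.log"
    buf = WalBuffer(path, max_records=3)
    for i in range(1, 5):
        buf.put({"id": str(i)})
    buf = WalBuffer(path, max_records=3)  # "restart": reopen the same log
    sent_ids: List[str] = []
    if buf.should_emit():
        batch = buf.next_batch()
        sent_ids = [r["id"] for r in batch]  # pretend the send succeeded
        buf.commit(len(batch))
    left_over = [r["id"] for r in buf.pending()]

print(sent_ids, left_over)  # ['1', '2', '3'] ['4']
```

Rewriting the whole file in `commit` keeps the sketch short, but a crash in the middle of that write could lose records; a sturdier version would write the remaining records to a temporary file and swap it in with `os.replace`.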

[Figure: producer] [Figure: event loop]

Error Handling

Exponential backoff means that we wait longer and longer between retries, and stop retrying after a certain number of failures. The combination of wait intervals and maximum retry count is called a “Schedule”.
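A schedule can be sketched as a list of wait intervals that doubles each time, with the list length acting as the maximum retry count. The intervals `[1, 2, 4]` below match the `exp_backoff=[1, 2, 4]` passed to `RetryConfig` in the examples on this page, but whether unistream interprets that list exactly this way is an assumption. `exp_backoff_schedule` and `call_with_retry` are hypothetical helpers; `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, Sequence, TypeVar

R = TypeVar("R")


def exp_backoff_schedule(base: float, max_retries: int) -> List[float]:
    """Wait intervals base, 2*base, 4*base, ... one per retry."""
    return [base * 2**i for i in range(max_retries)]


def call_with_retry(
    func: Callable[[], R],
    schedule: Sequence[float],
    sleep: Callable[[float], None] = time.sleep,
) -> R:
    """Call func; after each failure wait the next interval and try again.

    Once every interval is used up, one final attempt is made and its
    exception propagates to the caller (who then applies skip or raise).
    """
    for wait in schedule:
        try:
            return func()
        except Exception:
            sleep(wait)
    return func()


attempts: List[int] = []


def flaky_send() -> str:
    # Fails twice, then succeeds.
    attempts.append(len(attempts))
    if len(attempts) < 3:
        raise RuntimeError("send failed")
    return "ok"


waits: List[float] = []
result = call_with_retry(flaky_send, exp_backoff_schedule(1, 3), sleep=waits.append)
print(exp_backoff_schedule(1, 3), result, waits)  # [1, 2, 4] ok [1, 2]

# A call that never succeeds uses every interval, then gives up.
exhausted_waits: List[float] = []
try:
    call_with_retry(lambda: 1 / 0, [1, 2, 4], sleep=exhausted_waits.append)
except ZeroDivisionError:
    print("gave up after waits", exhausted_waits)  # gave up after waits [1, 2, 4]
```

Production systems often add random jitter to each interval so that many clients recovering at once do not retry in lockstep; it is left out here to keep the output deterministic.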

When the max retry count is reached, we have two options:

  1. skip: persist the context data of this batch for debugging or a later retry, then move on to the next batch.

  2. raise: raise the error immediately and stop the program.
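The two options can be sketched as a single handler that runs once the schedule is used up. This assumes batches are lists of JSON-serializable dicts and that "context data" means the records plus the last error, saved as one JSON line per failed batch. `on_retries_exhausted` and the `failed_batches.jsonl` file name are hypothetical, not part of unistream.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def on_retries_exhausted(
    batch: List[Dict], error: Exception, mode: str, path_failed: Path
) -> None:
    """Apply the 'skip' or 'raise' policy once the max retry count is reached."""
    if mode == "skip":
        # Keep enough context to debug or replay this batch later.
        with path_failed.open("a", encoding="utf-8") as f:
            f.write(json.dumps({"records": batch, "error": repr(error)}) + "\n")
        return  # the caller can now drop the batch and move on
    if mode == "raise":
        raise error
    raise ValueError(f"unknown mode: {mode!r}")


with tempfile.TemporaryDirectory() as tmp:
    path_failed = Path(tmp) / "failed_batches.jsonl"
    on_retries_exhausted(
        [{"id": "1"}, {"id": "2"}], RuntimeError("send failed"), "skip", path_failed
    )
    saved = [
        json.loads(line)
        for line in path_failed.read_text(encoding="utf-8").splitlines()
    ]

stopped = False
try:
    on_retries_exhausted([{"id": "3"}], RuntimeError("send failed"), "raise", Path("unused"))
except RuntimeError:
    stopped = True

print(saved[0]["records"], stopped)  # [{'id': '1'}, {'id': '2'}] True
```

"skip" favors availability (the stream keeps flowing and failed batches can be replayed from the saved file), while "raise" favors strictness (nothing is silently set aside).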

[Figure: error handling]

SimpleProducer Example

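Below is a sample usage of SimpleProducer, a simple producer that appends data to a target file on your local machine. It is intended for demos and testing. To exercise the retry logic, MyProducer.send fails at random 50% of the time, and the buffer sends a batch once it holds 3 records.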

simple_producer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path

from unistream.api import (
    exc,
    DataClassRecord,
    FileBuffer,
    RetryConfig,
    SimpleProducer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(DataClassRecord):
    value: int = dataclasses.field(default_factory=rand_value)


@dataclasses.dataclass
class MyProducer(SimpleProducer):
    def send(self, records: T.List[MyRecord]):
        # Fail 50% of the time to exercise the retry logic.
        if random.randint(1, 100) <= 50:
            raise exc.SendError("randomly failed due to send error")
        super().send(records)


# Start each run from a clean demo directory.
dir_demo = Path(__file__).absolute().parent.joinpath("simple_producer_demo")
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)

path_log = dir_demo / "simple_producer_buffer.log"  # buffer's write-ahead log
path_client_target = dir_demo / "simple_producer_history.log"  # target file


def make_producer() -> MyProducer:
    producer = MyProducer.new(
        buffer=FileBuffer.new(
            record_class=MyRecord,
            path_wal=path_log,
            max_records=3,  # send a batch once 3 records are buffered
        ),
        retry_config=RetryConfig(
            exp_backoff=[1, 2, 4],
        ),
        path_sink=path_client_target,
    )
    return producer


producer = make_producer()

# --- test 1 ---
n = 15
for i in range(1, 1 + n):
    time.sleep(1)
    # Simulate a crash: with a 30% chance, discard the producer and create
    # a new one on the same buffer file, as a restarted program would.
    if random.randint(1, 100) <= 30:
        producer = make_producer()
    producer.put(DataClassRecord(id=str(i)), verbose=True)

# --- test 2 ---
# producer.retry_config.exp_backoff = [0.1, 0.2, 0.4]
# n = 1000
# for i in range(1, 1 + n):
#     time.sleep(0.001)
#     producer.put(DataClassRecord(id=str(i)), verbose=True)
simple_producer.py Output
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "1", "create_at": "2024-01-07T07:31:41.482432+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "2", "create_at": "2024-01-07T07:31:42.486702+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "3", "create_at": "2024-01-07T07:31:43.487467+00:00"}
📤 📤 send records: ['1', '2', '3']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "4", "create_at": "2024-01-07T07:31:44.493604+00:00"}
📤 📤 send records: ['1', '2', '3']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "5", "create_at": "2024-01-07T07:31:45.494650+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "6", "create_at": "2024-01-07T07:31:46.500991+00:00"}
📤 📤 send records: ['4', '5', '6']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "7", "create_at": "2024-01-07T07:31:47.506891+00:00"}
📤 📤 send records: ['4', '5', '6']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "8", "create_at": "2024-01-07T07:31:48.513853+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "9", "create_at": "2024-01-07T07:31:49.519912+00:00"}
📤 📤 send records: ['7', '8', '9']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "10", "create_at": "2024-01-07T07:31:50.524332+00:00"}
📤 📤 send records: ['7', '8', '9']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "11", "create_at": "2024-01-07T07:31:51.532262+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "12", "create_at": "2024-01-07T07:31:52.540772+00:00"}
📤 📤 send records: ['10', '11', '12']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "13", "create_at": "2024-01-07T07:31:53.548328+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "14", "create_at": "2024-01-07T07:31:54.557038+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "15", "create_at": "2024-01-07T07:31:55.562171+00:00"}
📤 📤 send records: ['13', '14', '15']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
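Reading the output above, a failed send does not block the program in a sleep. For example, after the batch ['1', '2', '3'] fails on the third put, the same batch is sent again on the next put about one second later, which lines up with the first interval of exp_backoff=[1, 2, 4]. The sketch below models that put-driven retry under our own assumptions: `PutDrivenRetry` and its fields are hypothetical, not SimpleProducer's internals, and handling an exhausted schedule (skip or raise) is left out for brevity. An injected clock replaces real time so the run is deterministic.

```python
from typing import Callable, List, Optional, Sequence


class PutDrivenRetry:
    """Hypothetical sketch: retries happen inside later put() calls."""

    def __init__(
        self,
        send: Callable[[List[str]], None],
        max_records: int,
        schedule: Sequence[float],
        clock: Callable[[], float],
    ):
        self.send = send
        self.max_records = max_records
        self.schedule = list(schedule)
        self.clock = clock
        self.buffer: List[str] = []
        self.failures = 0
        self.last_failure: Optional[float] = None

    def _due(self) -> bool:
        # After the n-th consecutive failure, wait schedule[n-1] seconds.
        if self.last_failure is None:
            return True
        wait = self.schedule[min(self.failures, len(self.schedule)) - 1]
        return self.clock() - self.last_failure >= wait

    def put(self, record_id: str) -> str:
        self.buffer.append(record_id)
        if len(self.buffer) < self.max_records or not self._due():
            return "not emitted"
        batch = self.buffer[: self.max_records]
        try:
            self.send(batch)
        except Exception:
            self.failures += 1
            self.last_failure = self.clock()
            return "failed"
        del self.buffer[: self.max_records]  # commit
        self.failures, self.last_failure = 0, None
        return "succeeded"


now = [0.0]
outcomes = iter([False, True])  # first send fails, the retry succeeds
sent: List[List[str]] = []


def flaky_send(batch: List[str]) -> None:
    if not next(outcomes):
        raise RuntimeError("randomly failed due to send error")
    sent.append(list(batch))


p = PutDrivenRetry(flaky_send, max_records=3, schedule=[1, 2, 4], clock=lambda: now[0])
log = []
for i in range(1, 5):
    log.append(p.put(str(i)))
    now[0] += 1.0  # one second between puts, as in the demo

print(log)  # ['not emitted', 'not emitted', 'failed', 'succeeded']
print(sent)  # [['1', '2', '3']]
```

Retrying on the next put keeps the producer loop responsive: incoming records are still accepted into the buffer while a failed batch waits out its backoff interval.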

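AWS CloudWatch Logs Producer

This example sends records to an AWS CloudWatch Logs log stream using AwsCloudWatchLogsProducer. It creates the log group and log stream first, and reuses the randomly failing send override from the SimpleProducer example.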

aws_cloudwatch_logs_producer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.vendor.aws_cloudwatch_logs_insights_query import (
    create_log_group,
    create_log_stream,
)
from unistream.api import (
    exc,
    DataClassRecord,
    FileBuffer,
    RetryConfig,
    AwsCloudWatchLogsProducer,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(DataClassRecord):
    value: int = dataclasses.field(default_factory=rand_value)


@dataclasses.dataclass
class MyProducer(AwsCloudWatchLogsProducer):
    def send(self, records: T.List[MyRecord]):
        # Fail 50% of the time to exercise the retry logic.
        if random.randint(1, 100) <= 50:
            raise exc.SendError("randomly failed due to send error")
        super().send(records)


# Start each run from a clean demo directory.
dir_demo = (
    Path(__file__).absolute().parent.joinpath("aws_cloudwatch_logs_producer_demo")
)
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)

path_log = dir_demo / "aws_cloudwatch_logs_producer_buffer.log"  # buffer's write-ahead log
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")  # use your own AWS profile
log_group_name = "aws_cloudwatch_logs_producer"
log_stream_name = "stream1"
# Create the target log group and log stream for this demo.
create_log_group(bsm.cloudwatchlogs_client, log_group_name)
create_log_stream(bsm.cloudwatchlogs_client, log_group_name, log_stream_name)


def make_producer() -> MyProducer:
    producer = MyProducer.new(
        buffer=FileBuffer.new(
            record_class=MyRecord,
            path_wal=path_log,
            max_records=3,  # send a batch once 3 records are buffered
        ),
        retry_config=RetryConfig(
            exp_backoff=[1, 2, 4],
        ),
        bsm=bsm,
        log_group_name=log_group_name,
        log_stream_name=log_stream_name,
    )
    return producer


producer = make_producer()

# --- test 1 ---
n = 15
for i in range(1, 1 + n):
    time.sleep(1)
    # Simulate a crash: with a 30% chance, discard the producer and create
    # a new one on the same buffer file, as a restarted program would.
    if random.randint(1, 100) <= 30:
        producer = make_producer()
    producer.put(DataClassRecord(id=str(i)), verbose=True)
aws_cloudwatch_logs_producer.py Output
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "1", "create_at": "2024-01-07T07:53:21.589807+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "2", "create_at": "2024-01-07T07:53:22.591715+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "3", "create_at": "2024-01-07T07:53:23.592865+00:00"}
📤 📤 send records: ['1', '2', '3']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.12 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "4", "create_at": "2024-01-07T07:53:24.718601+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "5", "create_at": "2024-01-07T07:53:25.723828+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "6", "create_at": "2024-01-07T07:53:26.729785+00:00"}
📤 📤 send records: ['4', '5', '6']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "7", "create_at": "2024-01-07T07:53:27.762263+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "8", "create_at": "2024-01-07T07:53:28.766919+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "9", "create_at": "2024-01-07T07:53:29.772756+00:00"}
📤 📤 send records: ['7', '8', '9']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.06 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "10", "create_at": "2024-01-07T07:53:30.841143+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "11", "create_at": "2024-01-07T07:53:31.846897+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "12", "create_at": "2024-01-07T07:53:32.854608+00:00"}
📤 📤 send records: ['10', '11', '12']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "13", "create_at": "2024-01-07T07:53:33.894938+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "14", "create_at": "2024-01-07T07:53:34.899529+00:00"}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "15", "create_at": "2024-01-07T07:53:35.906959+00:00"}
📤 📤 send records: ['13', '14', '15']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
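For a sense of what a CloudWatch Logs send involves: boto3's logs client exposes `put_log_events`, which takes `logEvents` as a list of `{"timestamp": <epoch milliseconds>, "message": <str>}` items that must be in chronological order. The sketch below shows one way to map records shaped like those in the output above onto that call. It is our illustration, not AwsCloudWatchLogsProducer's actual implementation; `to_log_events`, `send_to_cloudwatch`, and `FakeLogsClient` are hypothetical names, and the fake client stands in for `bsm.cloudwatchlogs_client` so the example runs offline.

```python
import json
from datetime import datetime
from typing import Any, Dict, List


def to_log_events(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Turn records into CloudWatch Logs events (epoch ms + message)."""
    events = []
    for record in records:
        created = datetime.fromisoformat(record["create_at"])
        events.append(
            {
                "timestamp": int(created.timestamp() * 1000),
                "message": json.dumps(record),
            }
        )
    # put_log_events expects events in chronological order.
    return sorted(events, key=lambda e: e["timestamp"])


def send_to_cloudwatch(logs_client, group: str, stream: str, records) -> None:
    logs_client.put_log_events(
        logGroupName=group,
        logStreamName=stream,
        logEvents=to_log_events(records),
    )


class FakeLogsClient:
    """Stand-in for a boto3 "logs" client so the sketch runs offline."""

    def __init__(self):
        self.calls: List[Dict[str, Any]] = []

    def put_log_events(self, **kwargs):
        self.calls.append(kwargs)
        return {}


records = [
    {"id": "2", "create_at": "2024-01-07T07:53:22.591715+00:00"},
    {"id": "1", "create_at": "2024-01-07T07:53:21.589807+00:00"},
]
client = FakeLogsClient()
send_to_cloudwatch(client, "aws_cloudwatch_logs_producer", "stream1", records)
sent = client.calls[0]["logEvents"]
print([json.loads(e["message"])["id"] for e in sent])  # ['1', '2']
```

Using the record's own `create_at` as the event timestamp keeps the log time equal to when the record was created, not when a retried batch finally got through.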

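AWS Kinesis Stream Producer

This example sends records to an existing AWS Kinesis data stream using AwsKinesisStreamProducer, with MyRecord based on KinesisRecord. Unlike the CloudWatch example, the script does not create its target, so the stream must already exist.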

aws_kinesis_producer.py
# -*- coding: utf-8 -*-

import typing as T
import time
import random
import shutil
import dataclasses
from pathlib import Path
from boto_session_manager import BotoSesManager

from unistream.api import (
    FileBuffer,
    RetryConfig,
    KinesisRecord,
    AwsKinesisStreamProducer,
    exc,
)


def rand_value() -> int:
    return random.randint(1, 100)


@dataclasses.dataclass
class MyRecord(KinesisRecord):
    value: int = dataclasses.field(default_factory=rand_value)


@dataclasses.dataclass
class MyProducer(AwsKinesisStreamProducer):
    def send(self, records: T.List[MyRecord]):
        # Fail 50% of the time to exercise the retry logic.
        if random.randint(1, 100) <= 50:
            raise exc.SendError("randomly failed due to send error")
        super().send(records)


# Start each run from a clean demo directory.
dir_demo = Path(__file__).absolute().parent.joinpath("aws_kinesis_stream_producer_demo")
shutil.rmtree(dir_demo, ignore_errors=True)
dir_demo.mkdir(exist_ok=True)

path_log = dir_demo / "aws_kinesis_stream_producer_buffer.log"  # buffer's write-ahead log
bsm = BotoSesManager(profile_name="awshsh_app_dev_us_east_1")  # use your own AWS profile
stream_name = "aws_kinesis_producer_test"  # this stream must already exist


def make_producer() -> MyProducer:
    producer = MyProducer.new(
        buffer=FileBuffer.new(
            record_class=MyRecord,
            path_wal=path_log,
            max_records=3,  # send a batch once 3 records are buffered
        ),
        retry_config=RetryConfig(
            exp_backoff=[1, 2, 4],
        ),
        bsm=bsm,
        stream_name=stream_name,
    )
    return producer


producer = make_producer()

# --- test 1 ---
n = 15
for i in range(1, 1 + n):
    time.sleep(1)
    # Simulate a crash: with a 30% chance, discard the producer and create
    # a new one on the same buffer file, as a restarted program would.
    if random.randint(1, 100) <= 30:
        producer = make_producer()
    producer.put(MyRecord(id=str(i)), verbose=True)

# --- test 2 ---
# n = 1000
# for i in range(1, 1 + n):
#     time.sleep(1)
#     producer.put(MyRecord(id=f"id_{i}"), verbose=True)
aws_kinesis_producer.py Output
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "1", "create_at": "2024-01-07T08:00:25.458576+00:00", "value": 90}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "2", "create_at": "2024-01-07T08:00:26.462578+00:00", "value": 83}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "3", "create_at": "2024-01-07T08:00:27.468298+00:00", "value": 25}
📤 📤 send records: ['1', '2', '3']
Found credentials in shared credentials file: ~/.aws/credentials
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.19 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "4", "create_at": "2024-01-07T08:00:28.663244+00:00", "value": 48}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "5", "create_at": "2024-01-07T08:00:29.666372+00:00", "value": 47}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "6", "create_at": "2024-01-07T08:00:30.673782+00:00", "value": 74}
📤 📤 send records: ['4', '5', '6']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "7", "create_at": "2024-01-07T08:00:31.712087+00:00", "value": 51}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "8", "create_at": "2024-01-07T08:00:32.716550+00:00", "value": 33}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "9", "create_at": "2024-01-07T08:00:33.719337+00:00", "value": 83}
📤 📤 send records: ['7', '8', '9']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.02 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "10", "create_at": "2024-01-07T08:00:34.747290+00:00", "value": 68}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "11", "create_at": "2024-01-07T08:00:35.752213+00:00", "value": 93}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.01 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "12", "create_at": "2024-01-07T08:00:36.759224+00:00", "value": 66}
📤 📤 send records: ['10', '11', '12']
📤 🔴 failed, error: SendError('randomly failed due to send error')
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "13", "create_at": "2024-01-07T08:00:37.765174+00:00", "value": 89}
📤 📤 send records: ['10', '11', '12']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.03 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "14", "create_at": "2024-01-07T08:00:38.794387+00:00", "value": 50}
📤 🚫 we should not emit
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.00 sec -----------------------------+
+----- ⏱ 📤 Start 'put record' -------------------------------------------------+
📤
📤 record = {"id": "15", "create_at": "2024-01-07T08:00:39.799690+00:00", "value": 56}
📤 📤 send records: ['13', '14', '15']
📤 🟢 succeeded
📤
+----- ⏰ ✅ 📤 End 'put record', elapsed = 0.02 sec -----------------------------+
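One detail matters when writing or overriding `send` for Kinesis: boto3's `put_records` call can succeed as a whole while rejecting individual records, reporting them through `FailedRecordCount` and a per-entry `ErrorCode` instead of raising. The sketch below turns partial failure into an exception so the batch stays in the buffer and goes through the retry schedule. `send_to_kinesis`, `PartialSendError`, and `FakeKinesisClient` are hypothetical, and using the record `id` as the partition key is our choice; raising the library's `exc.SendError`, as the examples' `MyProducer.send` does, is another option.

```python
import json
from typing import Any, Dict, List


class PartialSendError(Exception):
    """Hypothetical: some records in the batch were rejected by Kinesis."""

    def __init__(self, failed: List[Dict[str, Any]]):
        super().__init__(f"{len(failed)} record(s) failed")
        self.failed = failed


def send_to_kinesis(kinesis_client, stream_name: str, records: List[Dict[str, Any]]) -> None:
    response = kinesis_client.put_records(
        StreamName=stream_name,
        Records=[
            {"Data": json.dumps(r).encode("utf-8"), "PartitionKey": r["id"]}
            for r in records
        ],
    )
    # put_records does not raise on per-record failures; check the count.
    if response.get("FailedRecordCount", 0):
        failed = [
            record
            for record, result in zip(records, response["Records"])
            if "ErrorCode" in result
        ]
        raise PartialSendError(failed)


class FakeKinesisClient:
    """Stand-in for a boto3 "kinesis" client; rejects ids listed in `reject`."""

    def __init__(self, reject: List[str]):
        self.reject = set(reject)

    def put_records(self, StreamName: str, Records: List[Dict[str, Any]]):
        results = []
        for rec in Records:
            if rec["PartitionKey"] in self.reject:
                results.append(
                    {
                        "ErrorCode": "ProvisionedThroughputExceededException",
                        "ErrorMessage": "Rate exceeded",
                    }
                )
            else:
                results.append({"SequenceNumber": "1", "ShardId": "shardId-000000000000"})
        failed = sum("ErrorCode" in r for r in results)
        return {"FailedRecordCount": failed, "Records": results}


records = [{"id": "13"}, {"id": "14"}, {"id": "15"}]
send_to_kinesis(FakeKinesisClient(reject=[]), "aws_kinesis_producer_test", records)

failed_ids: List[str] = []
try:
    send_to_kinesis(FakeKinesisClient(reject=["14"]), "aws_kinesis_producer_test", records)
except PartialSendError as e:
    failed_ids = [r["id"] for r in e.failed]

print(failed_ids)  # ['14']
```

If the producer then retries the whole batch, records that Kinesis already accepted are written again, so downstream consumers should tolerate duplicates (for example by de-duplicating on `id`).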