Introduction
When it comes to real-time stream processing on AWS, developers face a critical choice: Amazon Kinesis or Amazon MSK (Managed Streaming for Apache Kafka)? Both services handle high-throughput data streams, but they're fundamentally different in philosophy, architecture, and use cases.
This comprehensive guide covers everything you need to know—from architectural differences to production best practices—helping you make the right choice for your streaming workloads.
Table of Contents
- Key Differentiators: Kinesis vs MSK
- Architectural Differences
- Producers and Consumers
- Checkpointing and Acknowledgment
- Error Handling
- Performance Tuning
- Best Practices
- Decision Framework
Part 1: Key Differentiators
When to Choose Amazon Kinesis
AWS-Native Simplicity
Kinesis is AWS's fully managed, opinionated streaming service. Think of it as the "serverless" approach to stream processing—you focus on your data, AWS handles everything else.
Best for:
- Tight AWS Integration: Native connections to Lambda, Firehose, DynamoDB, and more
- Quick Time-to-Market: Setup takes minutes, not hours
- Limited Kafka Expertise: No need to understand Kafka internals
- Predictable Workloads: Clear pricing model based on shard-hours
Ideal Use Cases:
✅ Real-time analytics dashboards
✅ Log aggregation to S3/Redshift
✅ IoT telemetry collection
✅ Clickstream analysis
✅ CloudWatch Logs processing
When to Choose Amazon MSK
Kafka Ecosystem Power
MSK provides managed Kafka clusters with full access to the Kafka ecosystem. It comes in two cluster deployment modes—Provisioned and Serverless—plus MSK Connect for managed Kafka Connect.
Best for:
- Kafka Compatibility: Need Kafka Connect, Kafka Streams, or KSQL
- Migration Scenarios: Moving existing Kafka workloads to AWS
- Advanced Control: Custom configurations, long retention, exactly-once semantics
- High Throughput: Sustained throughput over 10 MB/s per stream
- Multi-cloud Strategy: Standard Kafka APIs work everywhere
Ideal Use Cases:
✅ Database change data capture (CDC) with Debezium
✅ Event-driven microservices architectures
✅ Complex stream processing with Kafka Streams
✅ Multi-datacenter replication
✅ Event sourcing with long retention
✅ Real-time data pipelines with Kafka Connect
Quick Comparison
| Factor | Kinesis | MSK Serverless | MSK Provisioned |
|---|---|---|---|
| Setup Time | 5 minutes | 5 minutes | 30-60 minutes |
| Learning Curve | Low | Low-Medium | Medium-High |
| Ops Overhead | Minimal | Minimal | Moderate |
| Throughput/Partition | 1 MB/s write, 2 MB/s read per shard (Provisioned/On-demand) | 5 MB/s write, 10 MB/s read per partition (cluster max: 200 MB/s write, 400 MB/s read) | 10+ MB/s per partition (scales with broker type and provisioned storage throughput) |
| Max Retention | 365 days | 365 days | Unlimited (with tiered storage) |
| API Standard | AWS proprietary | Apache Kafka | Apache Kafka |
| Ecosystem | AWS services | Kafka ecosystem | Full Kafka ecosystem |
| Pricing Model | Shard-hour + PUT units | Pay per throughput | Broker-hour + storage |
| Scaling | Manual/Auto | Automatic | Manual/Automatic |
Part 2: Architectural Deep Dive
Understanding the architectural differences is crucial for making the right choice.
Data Organization: Shards vs Partitions
Kinesis Architecture
Stream: myOrdersStream
├── Shard 1 ────→ [Sequence: 001, 002, 003...]
├── Shard 2 ────→ [Sequence: 001, 002, 003...]
└── Shard 3 ────→ [Sequence: 001, 002, 003...]
Key Points:
- Shard = capacity unit (1 MB/s write, 2 MB/s read)
- Fixed throughput per shard
- Shards are AWS-managed infrastructure
- Partition key determines shard routing
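To make that routing concrete: Kinesis takes the MD5 of the partition key and maps the resulting 128-bit value into one shard's hash key range. A simplified sketch (shard metadata would come from describe_stream):
import hashlib

# shards = kinesis.describe_stream(
#     StreamName='myOrdersStream')['StreamDescription']['Shards']
def shard_for_key(partition_key, shards):
    # MD5 of the key → 128-bit integer, matched against each shard's range
    hash_value = int(hashlib.md5(partition_key.encode()).hexdigest(), 16)
    for shard in shards:
        key_range = shard['HashKeyRange']
        if int(key_range['StartingHashKey']) <= hash_value <= int(key_range['EndingHashKey']):
            return shard['ShardId']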
MSK (Kafka) Architecture
Topic: orders
├── Partition 0 ────→ [Offset: 0, 1, 2...]
├── Partition 1 ────→ [Offset: 0, 1, 2...]
└── Partition 2 ────→ [Offset: 0, 1, 2...]
Brokers (Provisioned Mode):
Broker 1: Partitions [0, 3, 6]
Broker 2: Partitions [1, 4, 7]
Broker 3: Partitions [2, 5, 8]
Key Points:
- Partition = logical ordering unit
- Throughput determined by broker resources (Provisioned) or auto-scales (Serverless)
- Partitions distributed across brokers
- Configurable replication
Critical Difference:
- Kinesis shards are capacity containers
- Kafka partitions are ordering guarantees with flexible throughput
Consumer Models
Kinesis: Shared vs Enhanced Fan-Out
Standard (Shared) Model:
Stream [Shard 1, Shard 2]
│
┌────┴────┐
│ │
App A App B
└─────────┘
│
2 MB/s per shard (shared across all apps)
Enhanced Fan-Out (EFO):
Stream [Shard 1, Shard 2]
│
├─────→ App A: 2 MB/s dedicated
├─────→ App B: 2 MB/s dedicated
└─────→ App C: 2 MB/s dedicated
Cost:
- $0.015/consumer/shard/hour
- $0.015/GB data retrieved
Latency: ~70ms (vs ~200ms standard)
MSK: Consumer Groups
Topic [P0, P1, P2, P3]
│
┌────┴────┐
│ │
Consumer Group: "analytics"
├── Consumer 1 → P0, P1
└── Consumer 2 → P2, P3
Consumer Group: "backup"
└── Consumer 1 → P0, P1, P2, P3
Key Benefits:
- Parallel processing within group
- Independent consumer groups
- Automatic rebalancing
- Flexible scaling
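A minimal sketch of an independent group in Python (kafka-python; broker addresses match the examples later in this guide, and handle() is a hypothetical processing function):
from kafka import KafkaConsumer

# Each group_id tracks its own offsets, so "analytics" and "backup"
# read the same topic independently
consumer = KafkaConsumer(
    'orders',
    group_id='analytics',  # a second process with group_id='backup' gets its own copy
    bootstrap_servers=['b-1.msk:9092', 'b-2.msk:9092'],
    enable_auto_commit=False
)
for message in consumer:
    handle(message.value)  # hypothetical handler
    consumer.commit()      # offsets stored per group in __consumer_offsets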
Scaling Mechanisms
Kinesis: Shard-Based Scaling
# Calculate required shards
import math

def calculate_shards(write_mb_s, read_mb_s, consumers):
    write_shards = math.ceil(write_mb_s / 1.0)
    read_shards = math.ceil(read_mb_s * consumers / 2.0)
    return max(write_shards, read_shards)
# Provisioned Mode: Manual scaling
kinesis.update_shard_count(
StreamName='myStream',
TargetShardCount=10,
ScalingType='UNIFORM_SCALING'
)
# On-Demand Mode: Auto-scales based on throughput
kinesis.create_stream(
StreamName='autoScaleStream',
StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
# On-Demand characteristics:
# - New streams start at 4 MB/s write capacity, then scale automatically
# - Default account limit: 200 MB/s (can be increased)
# - Doubles previous peak capacity
# - Automatically adds/removes shards
MSK Serverless: Automatic Scaling
# MSK Serverless (GA 2022)
msk.create_cluster_v2(
ClusterName='serverless-cluster',
Serverless={
'VpcConfigs': [{
'SubnetIds': subnet_ids,
'SecurityGroupIds': sg_ids
}],
'ClientAuthentication': {
'Sasl': {'Iam': {'Enabled': True}}
}
}
)
# Characteristics:
# - Automatically scales 1-200 MB/s
# - Pay per GB ingress/egress
# - No broker management
# - Compatible with Kafka APIs
# - Limited configuration options
MSK Provisioned: Broker + Partition Scaling
# Add partitions (instant, no data movement)
kafka-topics.sh --alter \
--bootstrap-server broker:9092 \
--topic orders \
--partitions 20
# ⚠️ WARNING: Adding partitions breaks ordering for existing keys
# Existing messages stay in old partitions
# Use consistent hashing: partition = hash(key) % total_partitions
# Add brokers (requires cluster expansion)
aws kafka update-broker-count \
--cluster-arn arn:aws:kafka:... \
--current-version v1 \
--target-number-of-broker-nodes 6
Key Difference:
- Kinesis: Data reshards (takes 15-30 minutes)
- MSK: New partitions, no resharding (instant but affects key distribution)
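Because a Kinesis reshard runs in the background, producers typically poll until the stream returns to ACTIVE — a small sketch:
import time

def wait_for_active(kinesis, stream_name, poll_seconds=30):
    # Stream status is UPDATING while shards split/merge
    while True:
        status = kinesis.describe_stream_summary(
            StreamName=stream_name
        )['StreamDescriptionSummary']['StreamStatus']
        if status == 'ACTIVE':
            return
        time.sleep(poll_seconds)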
Storage and Retention
Kinesis: Time-Based, Abstracted
# Retention: 24 hours to 365 days
kinesis.increase_stream_retention_period(
StreamName='myStream',
RetentionPeriodHours=168 # 7 days
)
Characteristics:
- Always 3x replication (hidden)
- Pay for shard-hours, not storage
- Automatic cleanup after retention
- Maximum 365 days
MSK: Configurable, Visible Storage
# Time-based retention (Serverless & Provisioned)
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.ms=604800000 # 7 days
# Size-based retention
--add-config retention.bytes=10737418240 # 10 GB
# Log compaction (infinite retention)
--add-config cleanup.policy=compact
# NEW: MSK Tiered Storage (Provisioned only)
# Unlimited retention at 90%+ cost savings
aws kafka update-storage \
--cluster-arn arn:aws:kafka:... \
--storage-mode TIERED
# Hot tier: Primary brokers (low latency)
# Cold tier: S3 (cost-effective, higher latency)
Part 3: Producers and Consumers Ecosystem
Kinesis Producers
AWS Services (Native)
Direct Integration:
CloudWatch Logs → Subscription Filters → Kinesis
AWS IoT Core → IoT Rules → Kinesis
EventBridge → Targets → Kinesis
DMS → Kinesis Target → Kinesis
API Gateway → Direct PUT → Kinesis
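For example, the CloudWatch Logs integration above is wired up with a subscription filter — a minimal boto3 sketch (log group name, stream ARN, and role ARN are placeholders):
import boto3

logs = boto3.client('logs')

# The role must allow kinesis:PutRecord on the destination stream
logs.put_subscription_filter(
    logGroupName='/app/production',
    filterName='to-kinesis',
    filterPattern='',  # empty pattern forwards every event
    destinationArn='arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    roleArn='arn:aws:iam::123456789012:role/CWLtoKinesisRole'
)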
Application SDKs
# AWS SDK (boto3)
import boto3
kinesis = boto3.client('kinesis')
response = kinesis.put_record(
StreamName='myStream',
Data=json.dumps({'user': 'john', 'action': 'login'}),
PartitionKey='user-123'
)
# Batch for better performance
kinesis.put_records(
StreamName='myStream',
Records=[
{'Data': data1, 'PartitionKey': 'key1'},
{'Data': data2, 'PartitionKey': 'key2'}
]
)
// Kinesis Producer Library (KPL) - High Performance
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setAggregationEnabled(true)
.setCollectionMaxCount(500)
.setRecordMaxBufferedTime(100);
KinesisProducer producer = new KinesisProducer(config);
producer.addUserRecord("myStream", "partitionKey", data);
// Achieves 5-10x better throughput than raw SDK
Kinesis Consumers
AWS Services
Lambda:
Trigger: Kinesis Stream
Batch Size: 100-10000
Parallelization: Up to 10 batches per shard
Use Case: Real-time processing, triggers (see the sketch after this list)
Kinesis Data Firehose:
Destinations: S3, Redshift, OpenSearch, HTTP, Snowflake
Transformation: Lambda-based
Use Case: Data lake ingestion
Kinesis Data Analytics:
SQL: Real-time SQL queries
Flink: Complex stream processing
Use Case: Real-time analytics
EMR/Glue:
Spark Streaming integration
Use Case: Big data processing
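The Lambda trigger settings listed above map directly onto an event source mapping — an illustrative boto3 sketch (ARN and function name are placeholders):
import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    FunctionName='my-processor',
    StartingPosition='LATEST',
    BatchSize=100,
    ParallelizationFactor=10,        # up to 10 concurrent batches per shard
    MaximumRetryAttempts=3,
    BisectBatchOnFunctionError=True  # split bad batches on error
)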
Application SDKs
// Kinesis Client Library (KCL)
// Note: KCL 3.x (2024) available with performance improvements
// KCL 2.x (stable, widely used)
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName, applicationName,
kinesisClient, dynamoDBClient, cloudWatchClient,
workerId, recordProcessorFactory
);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.retrievalConfig()
);
new Thread(scheduler).start();
// KCL 3.x improvements:
// - Better performance
// - Reduced DynamoDB costs (on-demand billing by default)
// - Breaking changes from 2.x (multilang support removed)
MSK Producers
AWS Services
AWS DMS:
Target: Kafka (MSK)
Use Case: Database CDC
Lambda:
Library: kafka-python, node-rdkafka
Use Case: Event-driven producers
EventBridge → Lambda → MSK:
Pattern for AWS event routing
Kafka Connect (Source Connectors)
Popular Connectors:
Debezium:
- MySQL, PostgreSQL, Oracle, SQL Server
- MongoDB, Cassandra
Use Case: Database CDC
JDBC Source:
- Any JDBC database
Use Case: Bulk imports
S3 Source:
Use Case: File ingestion
Salesforce Source:
Use Case: CRM data integration
MSK Connect:
- Fully managed Kafka Connect
- Deploy connectors without infrastructure
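For a flavor of what gets deployed, here's an illustrative Debezium MySQL source configuration (keys follow Debezium 2.x conventions; hostname and credentials are placeholders) that you'd pass to MSK Connect when creating the connector:
# Illustrative Debezium 2.x MySQL source config for MSK Connect
debezium_config = {
    'connector.class': 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname': 'mysql.internal',      # placeholder
    'database.port': '3306',
    'database.user': 'cdc_user',                # placeholder
    'database.password': '********',
    'database.server.id': '184054',
    'topic.prefix': 'inventory',                # Debezium 2.x topic naming
    'table.include.list': 'inventory.orders',
    'schema.history.internal.kafka.bootstrap.servers': 'b-1.msk:9092',
    'schema.history.internal.kafka.topic': 'schema-changes.inventory'
}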
Native Kafka Clients
// Java Producer
Properties props = new Properties();
props.put("bootstrap.servers", "b-1.msk:9092,b-2.msk:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "key", value));
# Python Producer
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['b-1.msk:9092', 'b-2.msk:9092']
)
producer.send('topic', b'message')
MSK Consumers
AWS Services
Lambda:
Event Source: MSK (Serverless & Provisioned)
Batch Size: Configurable
Use Case: Serverless processing
Glue:
Streaming ETL from MSK
Use Case: Data transformations
EMR:
Spark Streaming, Flink
Use Case: Big data analytics
Kafka Connect (Sink Connectors)
Popular Sinks:
S3 Sink:
Format: Parquet, Avro, JSON
Partitioning: Time-based, field-based
JDBC Sink:
Targets: PostgreSQL, MySQL, etc.
Use Case: Database updates
Elasticsearch Sink:
Use Case: Search indexing
Snowflake Sink:
Use Case: Data warehousing
Stream Processing Frameworks
// Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
stream
.filter((key, value) -> value.contains("error"))
.mapValues(value -> value.toUpperCase())
.to("output-topic");
// Apache Flink (works with both MSK Serverless & Provisioned)
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("topic", schema, properties))
.map(value -> process(value))
.addSink(new FlinkKafkaProducer<>("output-topic", schema, properties));
-- KSQL (SQL on Kafka streams)
CREATE STREAM clicks_stream (
user_id VARCHAR,
page VARCHAR,
timestamp BIGINT
) WITH (
KAFKA_TOPIC='clicks',
VALUE_FORMAT='JSON'
);
SELECT user_id, COUNT(*) as click_count
FROM clicks_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY user_id;
Part 4: Checkpointing and Acknowledgment
Understanding how data is acknowledged and progress is tracked is critical for reliability.
Write Acknowledgment (Producer → Cluster)
Kinesis: Always Strongly Consistent
# Write path
response = kinesis.put_record(
StreamName='myStream',
Data=data,
PartitionKey='key'
)
# Behind the scenes:
# 1. Data written to leader shard
# 2. Synchronously replicated to 2 other AZs (always 3x)
# 3. Acknowledgment after all 3 writes complete
# 4. Returns sequence number
print(response['SequenceNumber'])
# '49590338271490256608559692538361571095921575989136588898'
Guarantees:
✅ Always 3-way replication
✅ Acknowledgment only after all replicas written
✅ No configuration options
✅ ~70-200ms latency
MSK: Configurable Durability
// acks=0: Fire and forget (fastest, risky)
props.put("acks", "0");
// Producer doesn't wait, no guarantee
// acks=1: Leader only (balanced)
props.put("acks", "1");
// Wait for leader write, ~5-10ms
// acks=all: All ISR (safest)
props.put("acks", "all");
props.put("min.insync.replicas", "2"); // Topic config
// Wait for 2+ replicas, ~15-50ms
Performance Impact:
┌────────┬────────────┬─────────┬──────────────┐
│ acks │ Throughput │ Latency │ Durability │
├────────┼────────────┼─────────┼──────────────┤
│ 0 │ 150K msg/s │ 1ms │ Low │
│ 1 │ 100K msg/s │ 5ms │ Medium │
│ all │ 80K msg/s │ 15ms │ High │
└────────┴────────────┴─────────┴──────────────┘
Idempotent Producer (Prevent Duplicates)
// Kafka only - prevents duplicates on retry
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
// With idempotence:
// - Automatic sequence numbers
// - Deduplication on broker
// - Exactly-once delivery to topic
Read Checkpointing (Consumer Progress)
Kinesis: DynamoDB-Based
// KCL automatically creates DynamoDB table
// Table structure:
┌──────────────┬────────────┬──────────────┐
│ ShardId │ Checkpoint │ LeaseOwner │
├──────────────┼────────────┼──────────────┤
│ shardId-001 │ 495903... │ worker-1 │
│ shardId-002 │ 495904... │ worker-2 │
└──────────────┴────────────┴──────────────┘
// In your record processor
public void processRecords(ProcessRecordsInput input) {
for (KinesisClientRecord record : input.records()) {
processRecord(record);
}
// Checkpoint (writes to DynamoDB)
input.checkpointer().checkpoint();
}
Cost Consideration (KCL 2.x):
- Each checkpoint = 1 DynamoDB write
- 10 shards × 20 checkpoints/min = 200 writes/min
- Plus lease renewals ≈ 200 writes/min
- Total: ~400 writes/min ≈ 7 WCU steady state
KCL 3.x Improvement:
- Uses DynamoDB On-Demand billing by default
- Reduced checkpoint costs
- Better DynamoDB efficiency
Checkpoint Strategies
// ❌ BAD: Checkpoint every record (expensive)
for (KinesisClientRecord record : records) {
processRecord(record);
checkpointer.checkpoint(record); // Too many DynamoDB writes
}
// ✅ GOOD: Time-based checkpointing
private long lastCheckpoint = System.currentTimeMillis();
private static final long INTERVAL = 60000; // 1 minute
public void processRecords(ProcessRecordsInput input) {
for (KinesisClientRecord record : input.records()) {
processRecord(record);
}
if (System.currentTimeMillis() - lastCheckpoint > INTERVAL) {
input.checkpointer().checkpoint();
lastCheckpoint = System.currentTimeMillis();
}
}
// ✅ GOOD: Count-based checkpointing
private int recordCount = 0;
for (KinesisClientRecord record : records) {
processRecord(record);
if (++recordCount % 1000 == 0) {
checkpointer.checkpoint();
}
}
// ✅ BEST: Handle resharding properly
@Override
public void shardEnded(ShardEndedInput input) {
// Checkpoint before shard closes
input.checkpointer().checkpoint();
// KCL automatically handles parent/child shard relationships
}
MSK: Internal Topic Storage
# Kafka stores offsets in special topic: __consumer_offsets
# 50 partitions (default), compacted for efficiency
Offset Storage:
┌─────────────┬───────────┬───────────┬────────┐
│ Group │ Topic │ Partition │ Offset │
├─────────────┼───────────┼───────────┼────────┤
│ analytics │ orders │ 0 │ 12345 │
│ analytics │ orders │ 1 │ 67890 │
│ backup │ orders │ 0 │ 12340 │
└─────────────┴───────────┴───────────┴────────┘
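You can inspect those committed offsets programmatically — a sketch using kafka-python's admin client (broker address illustrative):
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['b-1.msk:9092'])
# Returns {TopicPartition: OffsetAndMetadata} for the group
offsets = admin.list_consumer_group_offsets('analytics')
for tp, meta in offsets.items():
    print(f"{tp.topic}[{tp.partition}] -> offset {meta.offset}")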
Kafka Commit Strategies
// 1. Auto-commit (simplest)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// Commits every 5 seconds automatically
// Risk: At-least-once (may reprocess on crash)
// 2. Manual sync commit (safest)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processRecords(records);
consumer.commitSync(); // Blocks until committed
}
// 3. Manual async commit (fastest)
processRecords(records);
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
// 4. Hybrid approach (best practice)
try {
while (true) {
processRecords(consumer.poll(Duration.ofMillis(100)));
consumer.commitAsync(); // Fast path
}
} finally {
consumer.commitSync(); // Ensure final commit
consumer.close();
}
Exactly-Once Semantics
// Kinesis: Application-level only
public void processRecord(Record record) {
String messageId = record.sequenceNumber();
if (dedupeTable.exists(messageId)) {
return; // Already processed
}
database.beginTransaction();
try {
processData(record.data());
dedupeTable.insert(messageId);
database.commit();
} catch (Exception e) {
database.rollback();
}
}
// MSK: Native transactional support
Properties props = new Properties();
props.put("transactional.id", "my-transaction-" + instanceId);
props.put("enable.idempotence", "true");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction(); // Atomic
} catch (Exception e) {
producer.abortTransaction();
}
// Kafka Streams exactly-once
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Read → Process → Write is atomic
Part 5: Error Handling Strategies
Kinesis Producer Errors
Handling Throughput Exceeded
import time
import random
from botocore.exceptions import ClientError
def put_with_exponential_backoff(kinesis, stream, data, key, max_retries=5):
"""
Most common error: ProvisionedThroughputExceededException
Solution: Exponential backoff with jitter
"""
for attempt in range(max_retries):
try:
return kinesis.put_record(
StreamName=stream,
Data=data,
PartitionKey=key
)
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt == max_retries - 1:
send_to_dlq(data, key)
raise
# Exponential backoff with jitter
delay = min((2 ** attempt) + random.uniform(0, 1), 10)
time.sleep(delay)
else:
# Non-retriable error
send_to_dlq(data, key, str(e))
raise
Batch Error Handling
def put_records_with_retry(kinesis, stream, records):
"""Handle partial batch failures"""
to_retry = records
attempt = 0
while to_retry and attempt < 5:
response = kinesis.put_records(
StreamName=stream,
Records=[{'Data': r['data'], 'PartitionKey': r['key']}
for r in to_retry]
)
if response['FailedRecordCount'] > 0:
# Extract failed records
to_retry = [
to_retry[i]
for i, result in enumerate(response['Records'])
if 'ErrorCode' in result
]
attempt += 1
time.sleep(2 ** attempt)
else:
break
# Send remaining failures to DLQ
for record in to_retry:
send_to_dlq(record)
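The send_to_dlq helper referenced above isn't defined in these snippets; here's a minimal sketch (shown for the dict-style records of the batch helper, with a hypothetical DLQ stream and S3 fallback bucket):
import json
import time
import boto3

kinesis = boto3.client('kinesis')
s3 = boto3.client('s3')

def send_to_dlq(record, error=None):
    """Try a dedicated DLQ stream first, fall back to S3 on failure."""
    payload = json.dumps({
        'data': record.get('data'),
        'partitionKey': record.get('key'),
        'error': str(error) if error else None,
        'timestamp': time.time()
    })
    try:
        kinesis.put_record(
            StreamName='myStream-dlq',          # placeholder DLQ stream
            Data=payload,
            PartitionKey=record.get('key', 'dlq')
        )
    except Exception:
        s3.put_object(
            Bucket='my-dlq-bucket',             # placeholder fallback bucket
            Key=f"dlq/{int(time.time() * 1000)}.json",
            Body=payload
        )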
Kinesis Consumer Errors
KCL Error Handling Pattern
public class RobustRecordProcessor implements ShardRecordProcessor {
@Override
public void processRecords(ProcessRecordsInput input) {
List<KinesisClientRecord> failedRecords = new ArrayList<>();
for (KinesisClientRecord record : input.records()) {
try {
processRecord(record);
} catch (RetryableException e) {
if (!retryWithBackoff(record, 3)) {
failedRecords.add(record);
}
} catch (NonRetryableException e) {
sendToDLQ(record, e);
}
}
// Handle persistent failures
for (KinesisClientRecord record : failedRecords) {
sendToDLQ(record, new Exception("Max retries exceeded"));
}
// Checkpoint after handling
try {
input.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Checkpoint failed", e);
}
}
private void sendToDLQ(KinesisClientRecord record, Exception error) {
Map<String, Object> dlqRecord = Map.of(
"originalData", record.data(),
"sequenceNumber", record.sequenceNumber(),
"error", error.getMessage(),
"timestamp", Instant.now()
);
try {
// Primary: Send to DLQ stream
dlqKinesis.putRecord(
dlqStreamName,
serialize(dlqRecord),
record.partitionKey()
);
} catch (Exception e) {
// Fallback: Write to S3
s3.putObject(dlqBucket, generateKey(), serialize(dlqRecord));
}
}
}
Lambda Consumer Error Handling
import base64
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
"""
Use batch item failures for partial success
"""
failed_record_ids = []
for record in event['Records']:
try:
payload = base64.b64decode(record['kinesis']['data'])
process_data(json.loads(payload))
except Exception as e:
logger.error(f"Processing failed: {e}")
failed_record_ids.append({
'itemIdentifier': record['kinesis']['sequenceNumber']
})
# Lambda retries only failed records
return {
'batchItemFailures': failed_record_ids
}
# CloudFormation configuration
MyFunction:
Type: AWS::Serverless::Function
Properties:
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt MyStream.Arn
MaximumRetryAttempts: 3
BisectBatchOnFunctionError: true # Split bad batches
DestinationConfig:
OnFailure:
Type: SQS
Destination: !GetAtt DLQ.Arn
MSK Producer Errors
Comprehensive Error Handling
public class ResilientKafkaProducer {
private final KafkaProducer<String, byte[]> producer;
private final KafkaProducer<String, byte[]> dlqProducer;
public void sendWithErrorHandling(String topic, String key, byte[] value) {
ProducerRecord<String, byte[]> record =
new ProducerRecord<>(topic, key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleError(record, exception);
} else {
metrics.recordSuccess(metadata);
}
});
}
private void handleError(ProducerRecord<String, byte[]> record,
Exception exception) {
if (exception instanceof RetriableException) {
// Already retried per config, send to retry queue
retryQueue.offer(record);
} else if (exception instanceof RecordTooLargeException) {
// Split message or send to alternative storage
handleOversizedMessage(record);
} else {
// Non-retriable, send to DLQ
sendToDLQ(record, exception);
}
}
private void sendToDLQ(ProducerRecord<String, byte[]> record,
Exception exception) {
Map<String, Object> dlqData = Map.of(
"originalTopic", record.topic(),
"originalKey", record.key(),
"originalValue", record.value(),
"error", exception.getMessage(),
"timestamp", Instant.now()
);
ProducerRecord<String, byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".DLQ",
record.key(),
serialize(dlqData)
);
dlqProducer.send(dlqRecord, (metadata, e) -> {
if (e != null) {
// DLQ failed, fallback to S3
s3Client.putObject(dlqBucket, generateKey(), serialize(dlqData));
}
});
}
}
Idempotent Configuration
// Prevent duplicates on retry
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
// With idempotence enabled:
// ✅ Automatic deduplication
// ✅ Ordering preserved
// ✅ No duplicate messages on retry
MSK Consumer Errors
Robust Consumer Pattern
public class ResilientKafkaConsumer {
private final KafkaConsumer<String, byte[]> consumer;
public void consume() {
consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Commit before losing partitions
consumer.commitSync();
cleanupPartitions(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
initializePartitions(partitions);
}
});
try {
while (true) {
ConsumerRecords<String, byte[]> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
processWithRetry(record);
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// Rebalance occurred, partitions reassigned
log.warn("Commit failed, rebalance in progress");
}
}
} catch (WakeupException e) {
// Shutdown initiated
} finally {
consumer.commitSync();
consumer.close();
}
}
private void processWithRetry(ConsumerRecord<String, byte[]> record) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
processRecord(record);
return;
} catch (RetryableException e) {
if (i == maxRetries - 1) {
sendToDLQ(record, e);
} else {
sleep(1000 * (i + 1));
}
} catch (NonRetryableException e) {
sendToDLQ(record, e);
return;
}
}
}
}
Dead Letter Topic Pattern
private void sendToDLQ(ConsumerRecord<String, byte[]> record, Exception error) {
Map<String, Object> dlqData = new HashMap<>();
dlqData.put("originalTopic", record.topic());
dlqData.put("originalPartition", record.partition());
dlqData.put("originalOffset", record.offset());
dlqData.put("originalKey", record.key());
dlqData.put("originalValue", record.value());
dlqData.put("errorMessage", error.getMessage());
dlqData.put("errorStackTrace", getStackTrace(error));
dlqData.put("timestamp", Instant.now());
ProducerRecord<String, byte[]> dlqRecord = new ProducerRecord<>(
record.topic() + ".DLQ",
record.key(),
serialize(dlqData)
);
// Add headers for filtering
dlqRecord.headers().add("error.class",
error.getClass().getName().getBytes());
dlqRecord.headers().add("error.time",
String.valueOf(System.currentTimeMillis()).getBytes());
dlqProducer.send(dlqRecord);
}
Part 6: Performance Tuning
Kinesis Performance Optimization
Producer Optimization
# ❌ BAD: Single puts (slow, expensive)
for record in records:
kinesis.put_record(
StreamName='myStream',
Data=record,
PartitionKey=get_key(record)
)
# Result: ~100 records/sec, high cost
# ✅ GOOD: Batch puts
batch = []
for record in records:
batch.append({'Data': record, 'PartitionKey': get_key(record)})
if len(batch) >= 500:
kinesis.put_records(StreamName='myStream', Records=batch)
batch = []
# Result: 500-1000 records/sec
# ✅ BEST: KPL with aggregation
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setAggregationEnabled(true)
.setCollectionMaxCount(500)
.setRecordMaxBufferedTime(100);
KinesisProducer producer = new KinesisProducer(config);
// Result: 1000-5000 records/sec
Performance Comparison
| Method | Throughput | Latency | Cost Efficiency |
|---|---|---|---|
| PutRecord | 100 rec/s | 50ms | ★☆☆☆☆ |
| PutRecords | 1K rec/s | 100ms | ★★★☆☆ |
| KPL | 5K rec/s | 150ms | ★★★★★ |
Consumer Optimization
// Enhanced Fan-Out vs Standard
// Standard: Shared 2 MB/s per shard via GetRecords polling
iterator = kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType='LATEST'
)['ShardIterator']
records = kinesis.get_records(ShardIterator=iterator)
// Cost: Included in shard cost
// Latency: ~200ms
// Enhanced Fan-Out: Dedicated 2 MB/s per consumer
RetrievalConfig efoConfig = configsBuilder
.retrievalConfig()
.retrievalSpecificConfig(
new FanOutConfig(kinesisClient)
.streamName(streamName)
.applicationName(applicationName)
);
// Cost: $0.015/consumer/shard/hour + $0.015/GB retrieved
// Latency: ~70ms
// When: Multiple consumers or low-latency requirements
Checkpoint Optimization
// Checkpoint frequency vs cost (KCL 2.x)
// Every record: ~$100/month for 10 shards (DON'T DO THIS)
for (KinesisClientRecord record : records) {
processRecord(record);
checkpointer.checkpoint(record);
}
// Every minute: ~$5/month for 10 shards (RECOMMENDED)
if (System.currentTimeMillis() - lastCheckpoint > 60000) {
checkpointer.checkpoint();
}
// Every 1000 records: Variable cost (ALTERNATIVE)
if (++recordCount % 1000 == 0) {
checkpointer.checkpoint();
}
// KCL 3.x: Lower costs with On-Demand DynamoDB billing by default
Scaling Strategy
# Calculate optimal shard count
import math

def calculate_shards(write_mb_s, read_mb_s, consumers, use_efo):
    write_shards = math.ceil(write_mb_s / 1.0)
    if use_efo:
        # Each consumer gets dedicated 2 MB/s per shard
        read_shards = math.ceil(read_mb_s / 2.0)
    else:
        # All consumers share 2 MB/s per shard
        read_shards = math.ceil(read_mb_s * consumers / 2.0)
    # Add ~30% buffer for growth
    return math.ceil(max(write_shards, read_shards) * 1.3)
# Auto-scaling setup (Provisioned mode)
# Note: Application Auto Scaling has no native Kinesis Data Streams target;
# the common pattern is a CloudWatch alarm invoking a Lambda like this:
def scale_out_handler(event, context):
    summary = kinesis.describe_stream_summary(StreamName='myStream')
    current = summary['StreamDescriptionSummary']['OpenShardCount']
    kinesis.update_shard_count(
        StreamName='myStream',
        TargetShardCount=current * 2,  # double on scale-out alarm
        ScalingType='UNIFORM_SCALING'
    )
# Or use On-Demand for variable loads
kinesis.create_stream(
StreamName='variable-stream',
StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
# New on-demand streams start at 4 MB/s write capacity
# Default limit: 200 MB/s (can request increase)
# Automatically adds/removes shards
Partition Key Best Practices
# ✅ GOOD: High-cardinality keys
import hashlib
# Option 1: Hash user ID for even distribution
partition_key = hashlib.md5(user_id.encode()).hexdigest()
# Option 2: Combine attributes for better distribution
partition_key = f"{user_id}-{timestamp % 100}"
# Option 3: Random for maximum distribution (loses ordering)
partition_key = str(uuid.uuid4())
# ❌ BAD: Low-cardinality keys create hot shards
partition_key = "constant" # All to one shard
partition_key = region # Only 3-4 values
partition_key = status # "active", "inactive" only
MSK Performance Optimization
Broker Sizing Guide
# Throughput guide per instance type
BROKER_CAPACITY = {
'kafka.m5.large': {
'throughput_mb_s': 50,
'partitions': 500,
'cost_per_hour': 0.21
},
'kafka.m5.xlarge': {
'throughput_mb_s': 100,
'partitions': 1000,
'cost_per_hour': 0.42
},
'kafka.m5.2xlarge': {
'throughput_mb_s': 200,
'partitions': 2000,
'cost_per_hour': 0.84
},
'kafka.m5.4xlarge': {
'throughput_mb_s': 400,
'partitions': 4000,
'cost_per_hour': 1.68
}
}
import math

def calculate_broker_size(target_throughput, partition_count):
for instance, capacity in BROKER_CAPACITY.items():
brokers_needed = max(
3, # Minimum for HA
math.ceil(target_throughput / capacity['throughput_mb_s']),
math.ceil(partition_count / capacity['partitions'])
)
if brokers_needed <= 6: # Reasonable cluster size
return {
'instance_type': instance,
'broker_count': brokers_needed,
'monthly_cost': brokers_needed * capacity['cost_per_hour'] * 730
}
MSK Serverless Performance
# MSK Serverless auto-scales but has characteristics:
MSK_SERVERLESS = {
'throughput': {
'min': '1 MB/s',
'max': '200 MB/s',
'scaling': 'automatic'
},
'retention': {
'max': '365 days'
},
'pricing': {
    'ingress': '$0.10/GB',
    'egress': '$0.05/GB',
    'storage': '$0.10/GB-month'
    # plus per-cluster-hour and per-partition-hour charges
},
'best_for': [
'Variable workloads',
'Development/testing',
'Unpredictable traffic',
'Cost optimization for < 40 MB/s'
]
}
Producer Tuning
// High-throughput configuration
Properties props = new Properties();
// Batching (critical for performance)
props.put("batch.size", 65536); // 64 KB
props.put("linger.ms", 10); // Wait 10ms for more records
// Compression (2-4x improvement)
props.put("compression.type", "lz4"); // Fast
// Or "zstd" for better compression (more CPU)
// Buffer memory
props.put("buffer.memory", 67108864); // 64 MB
// In-flight requests
props.put("max.in.flight.requests.per.connection", 5);
// Durability vs speed tradeoff
props.put("acks", "1"); // Fast: leader only
// Or "all" for safety with idempotence
props.put("enable.idempotence", "true");
Performance Results:
┌──────────────────┬─────────────┬─────────┐
│ Configuration │ Throughput │ Latency │
├──────────────────┼─────────────┼─────────┤
│ Default │ 10K msg/s │ 10ms │
│ + Batching │ 50K msg/s │ 20ms │
│ + Compression │ 100K msg/s │ 15ms │
│ + acks=1 │ 150K msg/s │ 5ms │
└──────────────────┴─────────────┴─────────┘
Consumer Tuning
Properties props = new Properties();
// Fetch configuration (critical for throughput)
props.put("fetch.min.bytes", 50000); // 50 KB minimum
props.put("fetch.max.wait.ms", 500); // Max wait time
props.put("max.partition.fetch.bytes", 1048576); // 1 MB per partition
props.put("fetch.max.bytes", 52428800); // 50 MB per fetch
// Poll configuration
props.put("max.poll.records", 5000); // More per poll
// Session timeout
props.put("session.timeout.ms", 30000);
props.put("max.poll.interval.ms", 300000); // 5 minutes
// Ensure: max.poll.interval.ms > (max.poll.records × processing_time)
// Partition assignment
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.StickyAssignor");
// Minimizes rebalancing disruption
Tuning Results:
┌──────────────────┬─────────────┬─────────┐
│ Configuration │ Throughput │ Latency │
├──────────────────┼─────────────┼─────────┤
│ Default │ 50K msg/s │ 100ms │
│ + Large fetch │ 200K msg/s │ 200ms │
│ + High poll recs │ 500K msg/s │ 300ms │
└──────────────────┴─────────────┴─────────┘
Parallel Processing
// Scale consumers to match partitions
// 12 partitions → up to 12 consumers for max parallelism
// Multiple consumer instances (recommended)
public static void main(String[] args) {
int numConsumers = 4;
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
for (int i = 0; i < numConsumers; i++) {
executor.submit(new ConsumerWorker(i));
}
}
// Kafka automatically distributes partitions
// 12 partitions ÷ 4 consumers = 3 partitions each
// Alternative: Single consumer, multi-threaded processing
ExecutorService processingPool = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
List<Future<?>> futures = new ArrayList<>();
for (ConsumerRecord<String, byte[]> record : records) {
futures.add(processingPool.submit(() -> processRecord(record)));
}
// Wait for all processing
for (Future<?> future : futures) {
future.get();
}
consumer.commitSync();
}
Topic Configuration
# Partition count calculation
# partitions = max(
# target_throughput / producer_throughput_per_partition,
# target_throughput / consumer_throughput_per_partition
# )
# Example: 100 MB/s target
# Producer: 10 MB/s per partition
# Consumer: 5 MB/s per partition
# Result: max(100/10, 100/5) = 20 partitions
kafka-topics.sh --create \
--bootstrap-server broker:9092 \
--topic high-volume \
--partitions 20 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config compression.type=lz4 \
--config retention.ms=604800000 # 7 days
# Guidelines:
# - Start with: partitions = 2-3x broker count
# - Max: ~4000 partitions per broker
# - Use powers of 2 for easier scaling
# ⚠️ WARNING: Adding partitions later breaks ordering
# Existing keys may route to different partitions
# Plan partition count carefully upfront
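The same formula as a small helper, using the illustrative per-partition rates from the comment above:
import math

def partitions_for(target_mb_s, producer_mb_s=10, consumer_mb_s=5):
    return max(math.ceil(target_mb_s / producer_mb_s),
               math.ceil(target_mb_s / consumer_mb_s))

partitions_for(100)  # → 20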
Storage Optimization
# Use compression to reduce storage
kafka-configs.sh --alter \
--entity-type topics \
--entity-name my-topic \
--add-config compression.type=lz4
# Compression ratios:
# - lz4: 2-3x (fast)
# - zstd: 4-6x (better compression)
# - gzip: 3-4x (slower)
# Log compaction for state topics
kafka-configs.sh --alter \
--entity-type topics \
--entity-name user-state \
--add-config cleanup.policy=compact
# Tiered storage (MSK Provisioned only)
# Unlimited retention with 90%+ cost savings
aws kafka update-storage \
--cluster-arn arn:aws:kafka:region:account:cluster/name/uuid \
--storage-mode TIERED
# Hot tier: Primary brokers (low latency)
# Cold tier: S3 (cost-effective, higher latency)
Monitoring for Performance
Kinesis Key Metrics
KINESIS_METRICS = {
'IncomingBytes': {
'threshold': 800000, # 80% of 1 MB/s per shard
'action': 'Scale up shards'
},
'GetRecords.IteratorAgeMilliseconds': {
'threshold': 300000, # 5 minutes
'action': 'Add consumers or EFO'
},
'WriteProvisionedThroughputExceeded': {
'threshold': 1,
'action': 'Immediate scaling needed'
},
'ReadProvisionedThroughputExceeded': {
'threshold': 1,
'action': 'Enable EFO or add shards'
},
'PutRecord.Success': {
'threshold': 95, # Percent
'action': 'Investigate errors if below'
}
}
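Turning the iterator-age threshold above into an actual alarm — a boto3 sketch (alarm name and SNS topic ARN are placeholders):
import boto3

cloudwatch = boto3.client('cloudwatch')

# Fire when consumers fall ~5 minutes behind
cloudwatch.put_metric_alarm(
    AlarmName='kinesis-iterator-age',
    Namespace='AWS/Kinesis',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Dimensions=[{'Name': 'StreamName', 'Value': 'myStream'}],
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=300000,
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:oncall']
)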
MSK Key Metrics
MSK_METRICS = {
'CpuUser': {
'threshold': 70, # Percent
'action': 'Scale up brokers or optimize consumers'
},
'NetworkProcessorAvgIdlePercent': {
'threshold': 30, # Below 30% = overloaded
'action': 'Add brokers or upgrade instance type'
},
'UnderReplicatedPartitions': {
'threshold': 0, # Should always be 0
'action': 'Critical: investigate immediately'
},
'OfflinePartitionsCount': {
'threshold': 0, # Should always be 0
'action': 'Critical: data unavailable'
},
'ConsumerLag': {
'threshold': 10000, # Messages
'action': 'Add consumers or optimize processing'
}
}
Part 7: Production Best Practices
Kinesis Best Practices
1. Stream Design
# ✅ DO: Calculate shards with buffer
def calculate_shards_with_buffer(write_mb_s, read_mb_s, consumers):
base_shards = calculate_shards(write_mb_s, read_mb_s, consumers)
return math.ceil(base_shards * 1.3) # 30% buffer for growth
# ✅ DO: Use high-cardinality partition keys
import hashlib
partition_key = hashlib.md5(user_id.encode()).hexdigest()
# or
partition_key = f"{user_id}-{timestamp % 100}"
# ❌ DON'T: Use constant or low-cardinality keys
partition_key = "constant" # Creates hot shards
partition_key = region # Only 3-4 values
# ✅ DO: Handle resharding in consumers
@Override
public void shardEnded(ShardEndedInput input) {
input.checkpointer().checkpoint();
// KCL automatically handles parent/child relationships
}
2. Producer Best Practices
// ✅ DO: Use KPL for high throughput
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setAggregationEnabled(true)
.setCollectionMaxCount(500)
.setRecordMaxBufferedTime(100)
.setMaxConnections(24);
// ✅ DO: Implement exponential backoff
// ✅ DO: Send failures to DLQ
// ✅ DO: Monitor shard-level metrics
// ❌ DON'T: Use single PutRecord for high volume
// ❌ DON'T: Ignore ProvisionedThroughputExceeded errors
3. Consumer Best Practices
// ✅ DO: Use Enhanced Fan-Out for multiple consumers
RetrievalConfig efoConfig = configsBuilder
.retrievalConfig()
.retrievalSpecificConfig(new FanOutConfig(kinesisClient));
// ✅ DO: Checkpoint periodically (not per record)
if (System.currentTimeMillis() - lastCheckpoint > 60000) {
checkpointer.checkpoint();
}
// ✅ DO: Consider KCL 3.x for reduced DynamoDB costs
// - On-Demand billing by default
// - Better performance
// - Note: Breaking changes from KCL 2.x
// ❌ DON'T: Checkpoint every record (expensive)
// ❌ DON'T: Process longer than shard iterator expiration
4. Operations
# ✅ DO: Choose right mode for your workload
# Provisioned: Predictable, steady throughput
# On-Demand: Variable, unpredictable loads (up to 200 MB/s)
# Provisioned: no native Application Auto Scaling target for Kinesis;
# scale via CloudWatch alarm → Lambda → UpdateShardCount
kinesis.update_shard_count(
    StreamName='myStream',
    TargetShardCount=10,
    ScalingType='UNIFORM_SCALING'
)
# On-Demand mode
kinesis.create_stream(
StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
# ✅ DO: Set appropriate retention
kinesis.increase_stream_retention_period(
StreamName='audit-stream',
RetentionPeriodHours=168 # 7 days
)
# ✅ DO: Use VPC endpoints to reduce costs and latency
ec2.create_vpc_endpoint(
ServiceName='com.amazonaws.region.kinesis-streams',
VpcEndpointType='Interface'
)
# ❌ DON'T: Use maximum retention for all streams
# ❌ DON'T: Under-provision for cost savings (leads to data loss)
5. Security
# ✅ DO: Enable encryption
kinesis.start_stream_encryption(
StreamName='sensitive-stream',
EncryptionType='KMS',
KeyId='arn:aws:kms:...'
)
# ✅ DO: Use VPC endpoints for private connectivity
ec2.create_vpc_endpoint(
ServiceName='com.amazonaws.region.kinesis-streams',
VpcEndpointType='Interface',
SubnetIds=private_subnets
)
# ✅ DO: Apply least privilege IAM policies
{
"Effect": "Allow",
"Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
"Resource": "arn:aws:kinesis:region:account:stream/myStream"
}
# ✅ DO: Enable CloudTrail for audit logging
MSK Best Practices
1. Cluster Design
# ✅ DO: Deploy 3+ brokers across 3 AZs
MSKCluster:
BrokerNodeGroupInfo:
InstanceType: kafka.m5.xlarge
ClientSubnets:
- subnet-az1
- subnet-az2
- subnet-az3
BrokerAZDistribution: DEFAULT # Even distribution
# ✅ DO: Right-size partitions (plan ahead!)
# Formula: partitions = 2-3x broker count minimum
# Consider: target throughput ÷ throughput per partition
# 9 partitions = 3 brokers × 3
kafka-topics.sh --create \
  --partitions 9 \
  --replication-factor 3 \
  --config min.insync.replicas=2
# ⚠️ WARNING: Adding partitions breaks key ordering
# Existing messages stay in old partitions
# New messages with same key may go to different partition
# ❌ DON'T: Create too many partitions (overhead)
# ❌ DON'T: Use single-AZ deployment (no HA)
2. Deployment Mode Selection
# Decision tree:
Serverless:
Use when:
- Variable/unpredictable workload
- < 40 MB/s average throughput
- Want zero operational overhead
- Development/testing
Limitations:
- Max 365 day retention
- Limited Kafka configurations
- No tiered storage
Provisioned:
Use when:
- Sustained high throughput
- Need custom configurations
- Require > 365 day retention
- Use tiered storage
- Need specific Kafka versions
3. Producer Best Practices
// ✅ DO: Enable idempotence for reliability
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
// ✅ DO: Optimize for throughput
props.put("batch.size", 65536); // 64 KB
props.put("linger.ms", 10);
props.put("compression.type", "lz4");
// ✅ DO: Use async sends with callbacks
producer.send(record, (metadata, exception) -> {
if (exception != null) {
handleError(record, exception);
}
});
// ❌ DON'T: Use acks=0 in production
// ❌ DON'T: Use synchronous sends (slow)
4. Consumer Best Practices
// ✅ DO: Scale consumers to partition count
// 12 partitions → up to 12 consumers for best parallelism
// ✅ DO: Use manual offset commits
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
processRecords(records);
consumer.commitSync();
}
// ✅ DO: Handle rebalances gracefully
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // Commit before losing partitions
}
});
// ✅ DO: Implement proper error handling with DLQ
for (ConsumerRecord<String, byte[]> record : records) {
try {
processRecord(record);
} catch (RetryableException e) {
retryWithBackoff(record);
} catch (NonRetryableException e) {
sendToDLQ(record, e);
}
}
// ❌ DON'T: Create more consumers than partitions
// ❌ DON'T: Block in poll loop (causes rebalancing)
5. Operations
# ✅ DO: Monitor critical metrics
CRITICAL_METRICS = [
'CpuUser', # < 70%
'UnderReplicatedPartitions', # = 0
'OfflinePartitionsCount', # = 0
'NetworkProcessorAvgIdlePercent' # > 30%
]
# ✅ DO: Set up comprehensive alarms
cloudwatch.put_metric_alarm(
AlarmName='msk-under-replicated',
MetricName='UnderReplicatedPartitions',
Threshold=0,
ComparisonOperator='GreaterThanThreshold'
)
# ✅ DO: Plan for capacity with headroom
TARGET_UTILIZATION = {
'cpu': 60, # 40% headroom
'network': 70, # 30% headroom
'storage': 75 # 25% headroom
}
# ✅ DO: Use tiered storage for long retention
aws kafka update-storage \
--cluster-arn arn:... \
--storage-mode TIERED
# 90%+ cost savings for cold data
# ❌ DON'T: Make multiple changes simultaneously
# ❌ DON'T: Skip capacity planning
6. Security
# ✅ DO: Enable authentication
MSKCluster:
ClientAuthentication:
Sasl:
Iam:
Enabled: true # Easiest with AWS services
Scram:
Enabled: true # For non-AWS clients
Tls:
CertificateAuthorityArnList:
- !Ref PrivateCA
# ✅ DO: Enable encryption
EncryptionInfo:
EncryptionAtRest:
DataVolumeKMSKeyId: !Ref KMSKey
EncryptionInTransit:
ClientBroker: TLS
InCluster: true
# ✅ DO: Use VPC private connectivity
# MSK supports multi-VPC access via PrivateLink
ConnectivityInfo:
VpcConnectivity:
ClientAuthentication:
Sasl:
Iam:
Enabled: true
# ✅ DO: Use security groups restrictively
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 9094 # TLS only
ToPort: 9094
SourceSecurityGroupId: !Ref AppSecurityGroup
# ❌ DON'T: Use PLAINTEXT in production
# ❌ DON'T: Allow 0.0.0.0/0 access
7. Topic Configuration
# ✅ DO: Set appropriate retention
kafka-configs.sh --alter \
--entity-type topics \
--entity-name production-topic \
--add-config retention.ms=604800000 # 7 days
# ✅ DO: Use log compaction for state topics
kafka-configs.sh --alter \
--entity-type topics \
--entity-name user-state \
--add-config cleanup.policy=compact
# ✅ DO: Configure min.insync.replicas
kafka-configs.sh --alter \
--entity-type topics \
--entity-name critical-topic \
--add-config min.insync.replicas=2
# ✅ DO: Plan partition count carefully upfront
# Adding partitions later breaks ordering guarantees
# ❌ DON'T: Use same config for all topics
# ❌ DON'T: Set retention longer than needed (costs)
8. Multi-Region/Multi-VPC Patterns
# Multi-VPC Connectivity:
Options:
VPC Peering:
Use: Simple, low latency
Limitation: Same region only
Transit Gateway:
Use: Hub-and-spoke, multiple VPCs
Cost: $0.05/GB data transfer
PrivateLink (MSK Multi-VPC):
Use: Managed, secure cross-VPC/account
Cost: $0.01/GB + $0.01/hour per ENI
# Multi-Region:
Active-Passive:
Tool: MirrorMaker 2.0
Use: Disaster recovery
Active-Active:
Tool: Custom dual-write or Confluent Cluster Linking
Use: Global applications
Complexity: High (conflict resolution needed)
Part 8: Making the Right Choice
Enhanced Decision Matrix
Use this framework to choose between Kinesis, MSK Serverless, and MSK Provisioned:
Choose Kinesis if you answer "Yes" to 3+ questions:
Questions:
- Is this your first streaming project?
- Do you primarily use AWS services?
- Is operational simplicity more important than flexibility?
- Is your throughput < 10 MB/s per stream?
- Do you have limited Kafka expertise on the team?
- Are you building a real-time analytics pipeline?
- Do you need Lambda integration?
- Is predictable pricing important?
- Do you need < 365 day retention?
Choose MSK Serverless if you answer "Yes" to 3+ questions:
Questions:
- Do you need Kafka APIs but not operational overhead?
- Is throughput variable/unpredictable (< 40 MB/s avg)?
- Are you migrating from Kafka but want simplicity?
- Do you want pay-per-use pricing?
- Is this for development/testing environments?
- Do you need Kafka Connect/Streams?
- Can you work within 365-day retention limit?
- Do you want automatic scaling?
Choose MSK Provisioned if you answer "Yes" to 3+ questions:
Questions:
- Are you migrating from existing Kafka?
- Do you need sustained throughput > 40 MB/s?
- Is retention > 1 year required?
- Do you have Kafka expertise in-house?
- Are you building event-driven microservices?
- Do you need exactly-once semantics?
- Do you require custom Kafka configurations?
- Is this part of a multi-cloud strategy?
- Do you need tiered storage?
Decision Flow Chart
START: Need stream processing on AWS
│
├─ Need Kafka APIs?
│ ├─ NO → Kinesis
│ └─ YES → Need operational control?
│ ├─ NO → MSK Serverless
│ └─ YES → Need > 365 day retention or > 200 MB/s?
│ ├─ NO → MSK Serverless
│ └─ YES → MSK Provisioned
Throughput-Based Guidance:
< 10 MB/s: Kinesis (simplest) or MSK Serverless
10-40 MB/s: Kinesis On-Demand or MSK Serverless
40-200 MB/s: MSK Serverless or MSK Provisioned
> 200 MB/s: MSK Provisioned
Cost-Based Guidance:
Kinesis:
- Predictable: Provisioned mode
- Variable: On-Demand mode
MSK Serverless:
- Pay per GB (good for < 40 MB/s average)
MSK Provisioned:
- Fixed broker costs (better for high sustained throughput)
Cost Comparison (Corrected)
Kinesis Pricing Example
# Scenario: 5 MB/s write, 10 MB/s read, 2 consumers
# PROVISIONED MODE
# Shards needed: max(5 write, 10×2/2 read) = max(5, 10) = 10 shards
shards = 10
hours_per_month = 730
# Shard cost
shard_cost = shards * 0.015 * hours_per_month # $109.50
# PUT cost (corrected calculation)
# 5 MB/s = 5,120 KB/s
# Each PUT unit = 25 KB
# PUT units per second = 5,120 / 25 = 204.8
# PUT units per month = 204.8 × 3600 × 730 ≈ 538,214,400
# Cost per 1M PUT units = $0.014
put_cost = 538.2 * 0.014  # ≈ $7.53
# Enhanced Fan-Out (if needed for 2 consumers)
efo_cost = shards * 2 * 0.015 * hours_per_month # $219.00
# Plus data retrieval
data_retrieved_gb = 10 * 0.001 * 3600 * 730 # 26,280 GB
efo_data_cost = data_retrieved_gb * 0.015 # $394.20
total_provisioned = shard_cost + put_cost + efo_cost + efo_data_cost
# $109.50 + $7.53 + $219.00 + $394.20 = $730.23/month
# ON-DEMAND MODE
# Automatically scales, pay per GB ingested
data_ingested_gb = 5 * 0.001 * 3600 * 730 # 13,140 GB
ondemand_write_cost = data_ingested_gb * 0.04 # $525.60
# Data retrieval cost (shared mode, no EFO)
data_read_gb = 10 * 0.001 * 3600 * 730 # 26,280 GB
ondemand_read_cost = data_read_gb * 0.013 # $341.64
total_ondemand = ondemand_write_cost + ondemand_read_cost
# $525.60 + $341.64 = $867.24/month
# Comparison:
# Provisioned (with EFO): $730.23/month
# On-Demand: $867.24/month
# Provisioned is cheaper for sustained 5 MB/s
MSK Pricing Example
# Scenario: 3 brokers (kafka.m5.xlarge), 1TB storage each
# MSK PROVISIONED
broker_cost = 3 * 0.42 * 730 # $919.80
storage_cost = 3 * 1000 * 0.10 # $300.00
total_msk_provisioned = broker_cost + storage_cost # $1,219.80/month
# MSK SERVERLESS (for same 5 MB/s workload)
# Ingress: 5 MB/s × 3600s × 730h = 13,140 GB
ingress_cost = 13140 * 0.10 # $1,314.00
# Egress: 10 MB/s × 3600s × 730h = 26,280 GB
egress_cost = 26280 * 0.05 # $1,314.00
# Storage (assuming 1TB average)
storage_serverless = 1000 * 0.10 # $100.00
# (excludes per-cluster-hour and per-partition-hour charges)
total_msk_serverless = ingress_cost + egress_cost + storage_serverless
# $1,314.00 + $1,314.00 + $100.00 = $2,728.00/month
# For this workload:
# Kinesis Provisioned: $730/month (cheapest for this pattern)
# MSK Provisioned: $1,220/month
# MSK Serverless: $2,728/month (expensive for sustained high throughput)
# MSK Serverless is cheaper for:
# - Low average throughput (< 5 MB/s)
# - Variable workloads (spikes with low average)
# - Dev/test environments
Migration Considerations
Kinesis to MSK
# When to migrate:
triggers = [
"Need Kafka ecosystem tools (Connect, Streams, KSQL)",
"Outgrowing Kinesis limits (365 day retention)",
"Cost optimization for sustained high throughput (> 20 MB/s)",
"Multi-cloud requirements",
"Advanced processing with exactly-once semantics",
"Need custom Kafka configurations"
]
# Migration approach:
steps = [
"1. Choose MSK deployment (Serverless vs Provisioned)",
"2. Set up MSK cluster with appropriate sizing",
"3. Dual-write (Kinesis + MSK) - validate consistency",
"4. Migrate consumers to MSK incrementally",
"5. Validate data integrity and performance",
"6. Cutover producers to MSK",
"7. Deprecate Kinesis stream after retention period"
]
# Tools:
# - AWS Lambda for dual-write orchestration
# - Kafka Connect Kinesis source connector (community-maintained)
# - Custom application with both SDKs
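A minimal dual-write sketch for step 3 (stream name, topic, and broker addresses are placeholders; Kinesis stays the source of truth until cutover):
import json
import boto3
from kafka import KafkaProducer

kinesis = boto3.client('kinesis')
kafka_producer = KafkaProducer(bootstrap_servers=['b-1.msk:9092'])

def dual_write(event, key):
    """Write to both systems during migration; compare counts downstream."""
    payload = json.dumps(event).encode()
    kinesis.put_record(StreamName='myStream', Data=payload, PartitionKey=key)
    # Same key keeps per-key ordering comparable across both systems
    kafka_producer.send('orders', key=key.encode(), value=payload)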
MSK to Kinesis
# When to migrate:
triggers = [
"Reducing operational overhead",
"Simplifying architecture",
"Throughput decreased significantly (< 10 MB/s)",
"Team lacks Kafka expertise",
"Want deeper AWS integration (Lambda, Firehose)",
"Don't need > 365 day retention",
"Cost optimization for variable loads"
]
# Migration approach:
steps = [
"1. Create Kinesis streams with appropriate shards",
"2. Set up MirrorMaker 2.0 (Kafka → Kinesis)",
" - Or custom application with both SDKs",
"3. Migrate consumers to Kinesis incrementally",
"4. Validate data and monitor lag",
"5. Cutover producers to Kinesis",
"6. Decommission MSK cluster after retention"
]
# Considerations:
# - Kinesis max 1 MB record size vs Kafka's configurable limit
# - Partition key strategy may need adjustment
# - No exactly-once semantics in Kinesis
Production Readiness Checklist
Kinesis Production Checklist
Infrastructure:
- [ ] Calculated appropriate shard count with 30% buffer
- [ ] Chose deployment mode (Provisioned vs On-Demand)
- [ ] Enabled Enhanced Fan-Out (if multiple consumers)
- [ ] Configured auto-scaling (if Provisioned mode)
- [ ] Set appropriate retention period (24h-365d)
- [ ] Enabled encryption with KMS
- [ ] Set up VPC endpoints for private connectivity
Code:
- [ ] Implemented exponential backoff for retries
- [ ] Created dead letter queue/stream for failed records
- [ ] Optimized checkpoint frequency (time or count-based)
- [ ] Used KPL for high throughput or batch APIs
- [ ] Implemented proper error handling
- [ ] Used high-cardinality partition keys
- [ ] Handled resharding in consumers (shardEnded callback)
Operations:
- [ ] Set up CloudWatch alarms:
- IncomingBytes (80% threshold)
- WriteProvisionedThroughputExceeded
- GetRecords.IteratorAgeMilliseconds
- PutRecord.Success rate
- [ ] Created monitoring dashboard
- [ ] Documented partition key strategy
- [ ] Configured DynamoDB capacity (for KCL)
- [ ] Tested failure scenarios
- [ ] Load tested at 2x expected peak
Security:
- [ ] Applied least privilege IAM policies
- [ ] Enabled CloudTrail logging
- [ ] Reviewed network security groups
- [ ] Tested VPC endpoint connectivity
- [ ] Enabled KMS encryption for sensitive data
MSK Serverless Production Checklist
Infrastructure:
- [ ] Deployed in private subnets across 3 AZs
- [ ] Calculated expected throughput (< 200 MB/s)
- [ ] Set appropriate retention (< 365 days)
- [ ] Enabled IAM authentication
Configuration:
- [ ] Created topics with appropriate partition count
- [ ] Set min.insync.replicas=2 for critical topics
- [ ] Configured compression (lz4 or zstd)
- [ ] Set appropriate retention policy per topic
Code:
- [ ] Enabled idempotent producers
- [ ] Implemented manual offset commits
- [ ] Set up consumer rebalance listeners
- [ ] Created dead letter topic pattern
- [ ] Proper error handling with retries
- [ ] Optimized batch.size and linger.ms
Operations:
- [ ] Set up CloudWatch alarms:
- Cluster metrics available
- ConsumerLag monitoring
- [ ] Created monitoring dashboard
- [ ] Documented consumer group strategy
- [ ] Load tested at expected peak
Security:
- [ ] Enabled IAM authentication
- [ ] Enabled encryption in transit (TLS)
- [ ] Configured security groups (restrictive)
- [ ] Applied least privilege IAM policies
- [ ] Set up VPC Flow Logs
MSK Provisioned Production Checklist
Infrastructure:
- [ ] Deployed 3+ brokers across 3 AZs
- [ ] Right-sized broker instances (capacity planning)
- [ ] Calculated partition count (plan for growth)
- [ ] Configured proper storage (EBS with appropriate IOPS)
- [ ] Set up VPC with private subnets
- [ ] Considered tiered storage for long retention
Configuration:
- [ ] Configured min.insync.replicas=2
- [ ] Set appropriate topic retention
- [ ] Enabled compression
- [ ] Configured log segments properly
- [ ] Set up log compaction (if needed)
Code:
- [ ] Enabled idempotent producers
- [ ] Implemented manual offset commits
- [ ] Set up consumer rebalance listeners
- [ ] Created dead letter topic pattern
- [ ] Proper error handling with retries
- [ ] Optimized producer/consumer configs
Operations:
- [ ] Set up comprehensive CloudWatch alarms:
- CpuUser (< 70%)
- NetworkProcessorAvgIdlePercent (> 30%)
- UnderReplicatedPartitions (= 0)
- OfflinePartitionsCount (= 0)
- ConsumerLag
- [ ] Configured monitoring dashboard
- [ ] Documented consumer group strategy
- [ ] Created disaster recovery plan
- [ ] Capacity planning with 40% headroom
- [ ] Tested broker failure scenarios
- [ ] Load tested at 2x expected peak
Security:
- [ ] Enabled authentication (IAM and/or SASL/SCRAM)
- [ ] Enabled encryption (at rest and in transit)
- [ ] Configured security groups (restrictive)
- [ ] Set up VPC Flow Logs
- [ ] Applied least privilege IAM policies
- [ ] Considered multi-VPC connectivity (if needed)
- [ ] Set up AWS PrivateLink (for cross-VPC access)
Conclusion
Key Takeaways
Amazon Kinesis is the right choice when you:
- Want AWS-native simplicity with minimal operations
- Need tight integration with Lambda, Firehose, and other AWS services
- Have moderate, predictable throughput requirements (< 10 MB/s)
- Value quick time-to-market over flexibility
- Have limited Kafka expertise
- Can work within 365-day retention limits
Amazon MSK Serverless is the right choice when you:
- Need Kafka APIs without operational overhead
- Have variable/unpredictable workloads (< 40 MB/s average)
- Want automatic scaling with pay-per-use pricing
- Need Kafka ecosystem tools (Connect, Streams)
- Can work within 365-day retention limits
- Want zero cluster management
Amazon MSK Provisioned is the right choice when you:
- Need the full Kafka ecosystem (Connect, Streams, KSQL)
- Require advanced features like exactly-once semantics
- Have sustained high throughput (> 40 MB/s)
- Need retention > 365 days or unlimited (with tiered storage)
- Are migrating existing Kafka workloads
- Need multi-cloud or hybrid capabilities
- Require custom Kafka configurations
Final Recommendations
Start Simple: If unsure, start with Kinesis for pure AWS workloads or MSK Serverless if you need Kafka APIs. Both offer minimal operational overhead. You can always migrate to MSK Provisioned later if you outgrow them.
Think Long-Term: Consider your team's expertise, future requirements, and growth trajectory. MSK Provisioned requires more upfront investment but offers maximum flexibility.
Consider MSK Serverless: Often overlooked, MSK Serverless bridges the gap between Kinesis simplicity and Kafka capabilities. It's perfect for:
- Variable workloads
- Development/testing
- Medium throughput with Kafka requirements
Don't Optimize Prematurely: All three options can handle significant scale. Focus on getting your streaming pipeline working correctly first, then optimize for performance and cost.
Monitor Continuously: Regardless of choice, implement comprehensive monitoring from day one. Stream processing issues compound quickly if undetected.
Test Thoroughly: Test error scenarios, failover, partition rebalancing, and performance under load in non-production environments before going live.
Plan Partition Count: For MSK, plan partition count carefully upfront. Adding partitions later breaks ordering guarantees for existing keys.
Quick Reference Guide
Choose Kinesis when:
Throughput: < 10 MB/s
Retention: < 365 days
Team: Limited streaming expertise
Priority: Time to market, AWS integration
Cost: Predictable, shard-based pricing
Choose MSK Serverless when:
Throughput: Variable, < 40 MB/s average
Retention: < 365 days
Team: Kafka knowledge but want less ops
Priority: Kafka APIs + auto-scaling
Cost: Pay-per-use (good for variable loads)
Choose MSK Provisioned when:
Throughput: > 40 MB/s sustained
Retention: > 365 days or unlimited
Team: Strong Kafka expertise
Priority: Control, customization, ecosystem
Cost: Fixed (better for sustained high throughput)
Cost Optimization Tips
Kinesis:
- Use On-Demand for variable workloads
- Enable EFO only when needed (adds cost)
- Checkpoint every 60 seconds, not per record
- Use KPL aggregation for better PUT efficiency
- Right-size shards (don't over-provision)
MSK Serverless:
- Best for < 40 MB/s average throughput
- Enable compression (lz4) to reduce ingress/egress costs
- Use for dev/test environments
- Consider for variable production workloads
MSK Provisioned:
- Better economics for > 40 MB/s sustained
- Use tiered storage for long retention (90% savings)
- Enable compression (reduces storage costs)
- Right-size brokers (don't over-provision)
- Use Graviton instances for 20% cost savings
Next Steps
- Prototype: Build a simple proof-of-concept with your preferred option(s)
- Benchmark: Test with your actual data patterns and throughput
- Calculate TCO: Include operational overhead, not just AWS costs
- Review Team Skills: Assess current expertise and training needs
- Start Small: Begin with one use case, prove it, then expand
- Document: Create runbooks for common operations
- Monitor: Set up comprehensive monitoring and alerting
About This Guide: This comprehensive guide covers Amazon Kinesis and Amazon MSK (Serverless and Provisioned). AWS services evolve rapidly—always consult official documentation for the latest features and pricing.
Found this guide helpful? Share it with your team! For questions, corrections, or suggestions, please reach out through GitHub Issues or the comments section.