Amazon Kinesis vs Amazon MSK: The Complete Guide for Stream Processing on AWS

Introduction

When it comes to real-time stream processing on AWS, developers face a critical choice: Amazon Kinesis or Amazon MSK (Managed Streaming for Apache Kafka)? Both services handle high-throughput data streams, but they're fundamentally different in philosophy, architecture, and use cases.

This comprehensive guide covers everything you need to know—from architectural differences to production best practices—helping you make the right choice for your streaming workloads.

Table of Contents

  1. Key Differentiators: Kinesis vs MSK
  2. Architectural Differences
  3. Producers and Consumers
  4. Checkpointing and Acknowledgment
  5. Error Handling
  6. Performance Tuning
  7. Best Practices
  8. Decision Framework

Part 1: Key Differentiators

When to Choose Amazon Kinesis

AWS-Native Simplicity

Kinesis is AWS's fully managed, opinionated streaming service. Think of it as the "serverless" approach to stream processing—you focus on your data, AWS handles everything else.

Best for:

  • Tight AWS Integration: Native connections to Lambda, Firehose, DynamoDB, and more
  • Quick Time-to-Market: Setup takes minutes, not hours
  • Limited Kafka Expertise: No need to understand Kafka internals
  • Predictable Workloads: Clear pricing model based on shard-hours

Ideal Use Cases:

✅ Real-time analytics dashboards
✅ Log aggregation to S3/Redshift
✅ IoT telemetry collection
✅ Clickstream analysis
✅ CloudWatch Logs processing

When to Choose Amazon MSK

Kafka Ecosystem Power

MSK provides managed Kafka clusters with full access to the Kafka ecosystem. It is available as two cluster types, Provisioned and Serverless, plus MSK Connect for running managed Kafka Connect connectors.

Best for:

  • Kafka Compatibility: Need Kafka Connect, Kafka Streams, or KSQL
  • Migration Scenarios: Moving existing Kafka workloads to AWS
  • Advanced Control: Custom configurations, long retention, exactly-once semantics
  • High Throughput: Sustained throughput over 10 MB/s per stream
  • Multi-cloud Strategy: Standard Kafka APIs work everywhere

Ideal Use Cases:

✅ Database change data capture (CDC) with Debezium
✅ Event-driven microservices architectures
✅ Complex stream processing with Kafka Streams
✅ Multi-datacenter replication
✅ Event sourcing with long retention
✅ Real-time data pipelines with Kafka Connect

Quick Comparison

Factor | Kinesis | MSK Serverless | MSK Provisioned
Setup Time | 5 minutes | 5 minutes | 30-60 minutes
Learning Curve | Low | Low-Medium | Medium-High
Ops Overhead | Minimal | Minimal | Moderate
Throughput/Partition | 1 MB/s write, 2 MB/s read per shard (Provisioned/On-demand) | 5 MB/s write, 10 MB/s read per partition (cluster max: 200 MB/s write, 400 MB/s read) | 10+ MB/s per partition (scales with broker type and provisioned storage throughput)
Max Retention | 365 days | 365 days | Unlimited (with tiered storage)
API Standard | AWS proprietary | Apache Kafka | Apache Kafka
Ecosystem | AWS services | Kafka ecosystem | Full Kafka ecosystem
Pricing Model | Shard-hour + PUT units | Pay per throughput | Broker-hour + storage
Scaling | Manual/Auto | Automatic | Manual/Automatic

Part 2: Architectural Deep Dive

Understanding the architectural differences is crucial for making the right choice.

Data Organization: Shards vs Partitions

Kinesis Architecture

Stream: myOrdersStream
  ├── Shard 1 ────→ [Sequence: 001, 002, 003...]
  ├── Shard 2 ────→ [Sequence: 001, 002, 003...]
  └── Shard 3 ────→ [Sequence: 001, 002, 003...]

Key Points:
- Shard = capacity unit (1 MB/s write, 2 MB/s read)
- Fixed throughput per shard
- Shards are AWS-managed infrastructure
- Partition key determines shard routing
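To make the last point concrete, here is a minimal boto3 sketch (stream and key names are illustrative) of how a partition key maps to a shard: Kinesis takes the MD5 hash of the key and routes the record to the shard whose hash key range contains that value.

import hashlib

import boto3

kinesis = boto3.client('kinesis')

def shard_for_key(stream_name, partition_key):
    """Return the shard whose hash key range contains the MD5 hash of the key."""
    hash_key = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    for shard in kinesis.list_shards(StreamName=stream_name)['Shards']:
        rng = shard['HashKeyRange']
        if int(rng['StartingHashKey']) <= hash_key <= int(rng['EndingHashKey']):
            return shard['ShardId']
    return None

print(shard_for_key('myOrdersStream', 'user-123'))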

MSK (Kafka) Architecture

Topic: orders
  ├── Partition 0 ────→ [Offset: 0, 1, 2...]
  ├── Partition 1 ────→ [Offset: 0, 1, 2...]
  └── Partition 2 ────→ [Offset: 0, 1, 2...]

Brokers (Provisioned Mode):
  Broker 1: Partitions [0, 3, 6]
  Broker 2: Partitions [1, 4, 7]
  Broker 3: Partitions [2, 5, 8]

Key Points:
- Partition = logical ordering unit
- Throughput determined by broker resources (Provisioned) or auto-scales (Serverless)
- Partitions distributed across brokers
- Configurable replication

Critical Difference:

  • Kinesis shards are capacity containers
  • Kafka partitions are ordering guarantees with flexible throughput

Consumer Models

Kinesis: Shared vs Enhanced Fan-Out

Standard (Shared) Model:

Stream [Shard 1, Shard 2]
         │
    ┌────┴────┐
    │         │
App A       App B  
    └─────────┘
       │
   2 MB/s total (shared)

Enhanced Fan-Out (EFO):

Stream [Shard 1, Shard 2]
    │
    ├─────→ App A: 2 MB/s dedicated
    ├─────→ App B: 2 MB/s dedicated
    └─────→ App C: 2 MB/s dedicated

Cost: 
- $0.015/consumer/shard/hour
- $0.015/GB data retrieved
Latency: ~70ms (vs ~200ms standard)

MSK: Consumer Groups

Topic [P0, P1, P2, P3]
         │
    ┌────┴────┐
    │         │
Consumer Group: "analytics"
├── Consumer 1 → P0, P1
└── Consumer 2 → P2, P3

Consumer Group: "backup"  
└── Consumer 1 → P0, P1, P2, P3

Key Benefits:
- Parallel processing within group
- Independent consumer groups
- Automatic rebalancing
- Flexible scaling
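For comparison, a minimal consumer-group example using kafka-python (broker addresses, topic, and group name are placeholders): run two copies with the same group_id and the topic's partitions are split between them, while a second group_id re-reads the whole topic independently.

from kafka import KafkaConsumer

# Broker addresses are placeholders for your MSK bootstrap servers.
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['b-1.msk:9092', 'b-2.msk:9092'],
    group_id='analytics',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
)

for message in consumer:
    print(message.partition, message.offset, message.value)
    consumer.commit()  # commit offsets manually after processing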

Scaling Mechanisms

Kinesis: Shard-Based Scaling

# Calculate required shards
import math

import boto3

kinesis = boto3.client('kinesis')

def calculate_shards(write_mb_s, read_mb_s, consumers):
    write_shards = math.ceil(write_mb_s / 1.0)
    read_shards = math.ceil(read_mb_s * consumers / 2.0)
    return max(write_shards, read_shards)

# Provisioned Mode: Manual scaling
kinesis.update_shard_count(
    StreamName='myStream',
    TargetShardCount=10,
    ScalingType='UNIFORM_SCALING'
)

# On-Demand Mode: Auto-scales based on throughput
kinesis.create_stream(
    StreamName='autoScaleStream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
# On-Demand characteristics:
# - Starts with 4 MB/s of write capacity and adds shards automatically
# - Default account limit: 200 MB/s write (can be increased)
# - Scales to double the previous observed peak
# - Automatically adds/removes shards

MSK Serverless: Automatic Scaling

# MSK Serverless (GA 2022)
msk.create_cluster_v2(
    ClusterName='serverless-cluster',
    Serverless={
        'VpcConfigs': [{
            'SubnetIds': subnet_ids,
            'SecurityGroupIds': sg_ids
        }],
        'ClientAuthentication': {
            'Sasl': {'Iam': {'Enabled': True}}
        }
    }
)

# Characteristics:
# - Automatically scales 1-200 MB/s
# - Pay per GB ingress/egress
# - No broker management
# - Compatible with Kafka APIs
# - Limited configuration options

MSK Provisioned: Broker + Partition Scaling

# Add partitions (instant, no data movement)
kafka-topics.sh --alter \
  --bootstrap-server broker:9092 \
  --topic orders \
  --partitions 20

# ⚠️ WARNING: Adding partitions breaks ordering for existing keys
# Existing messages stay in old partitions
# Use consistent hashing: partition = hash(key) % total_partitions

# Add brokers (requires cluster expansion)
aws kafka update-broker-count \
  --cluster-arn arn:aws:kafka:... \
  --current-version v1 \
  --target-number-of-broker-nodes 6

Key Difference:

  • Kinesis: Data reshards (takes 15-30 minutes)
  • MSK: New partitions, no resharding (instant but affects key distribution)

Storage and Retention

Kinesis: Time-Based, Abstracted

# Retention: 24 hours to 365 days
kinesis.increase_stream_retention_period(
    StreamName='myStream',
    RetentionPeriodHours=168  # 7 days
)

Characteristics:
- Always 3x replication (hidden)
- Pay for shard-hours, not storage
- Automatic cleanup after retention
- Maximum 365 days

MSK: Configurable, Visible Storage

# Time-based retention (Serverless & Provisioned)
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name orders \
  --add-config retention.ms=604800000  # 7 days

# Size-based retention
--add-config retention.bytes=10737418240  # 10 GB

# Log compaction (infinite retention)
--add-config cleanup.policy=compact

# NEW: MSK Tiered Storage (Provisioned only)
# Unlimited retention at 90%+ cost savings
aws kafka update-storage \
  --cluster-arn arn:aws:kafka:... \
  --storage-mode TIERED

# Hot tier: Primary brokers (low latency)
# Cold tier: S3 (cost-effective, higher latency)

Part 3: Producers and Consumers Ecosystem

Kinesis Producers

AWS Services (Native)

Direct Integration:
  CloudWatch Logs    → Subscription Filters → Kinesis
  AWS IoT Core       → IoT Rules          → Kinesis
  EventBridge        → Targets            → Kinesis
  DMS                → Kinesis Target     → Kinesis
  API Gateway        → Direct PUT         → Kinesis
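As a sketch of the first integration, this is roughly how a CloudWatch Logs subscription filter is pointed at a stream with boto3; the log group, stream ARN, and IAM role below are placeholders, and the role must allow CloudWatch Logs to call kinesis:PutRecord.

import boto3

logs = boto3.client('logs')

# Names and ARNs are placeholders.
logs.put_subscription_filter(
    logGroupName='/aws/app/production',
    filterName='to-kinesis',
    filterPattern='',  # empty pattern forwards every log event
    destinationArn='arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    roleArn='arn:aws:iam::123456789012:role/CWLtoKinesisRole',
)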

Application SDKs

# AWS SDK (boto3)
import boto3

kinesis = boto3.client('kinesis')
response = kinesis.put_record(
    StreamName='myStream',
    Data=json.dumps({'user': 'john', 'action': 'login'}),
    PartitionKey='user-123'
)

# Batch for better performance
kinesis.put_records(
    StreamName='myStream',
    Records=[
        {'Data': data1, 'PartitionKey': 'key1'},
        {'Data': data2, 'PartitionKey': 'key2'}
    ]
)
// Kinesis Producer Library (KPL) - High Performance
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
    .setAggregationEnabled(true)
    .setCollectionMaxCount(500)
    .setRecordMaxBufferedTime(100);

KinesisProducer producer = new KinesisProducer(config);
producer.addUserRecord("myStream", "partitionKey", data);

// Achieves 5-10x better throughput than raw SDK

Kinesis Consumers

AWS Services

Lambda:
  Trigger: Kinesis Stream
  Batch Size: 100-10000
  Parallelization: Up to 10 batches per shard
  Use Case: Real-time processing, triggers

Kinesis Data Firehose:
  Destinations: S3, Redshift, OpenSearch, HTTP, Snowflake
  Transformation: Lambda-based
  Use Case: Data lake ingestion

Amazon Managed Service for Apache Flink (formerly Kinesis Data Analytics):
  Flink: Complex stream processing
  SQL: Legacy KDA SQL applications (deprecated)
  Use Case: Real-time analytics

EMR/Glue:
  Spark Streaming integration
  Use Case: Big data processing
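The Lambda trigger above is an event source mapping under the hood. A hedged boto3 sketch (function name and stream ARN are placeholders) showing the main knobs discussed later in this article: batch size, parallelization factor, bisect-on-error, and retry limits.

import boto3

lambda_client = boto3.client('lambda')

# Function name and stream ARN are placeholders.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/myStream',
    FunctionName='process-orders',
    StartingPosition='LATEST',
    BatchSize=500,
    MaximumBatchingWindowInSeconds=5,
    ParallelizationFactor=10,        # up to 10 concurrent batches per shard
    BisectBatchOnFunctionError=True,
    MaximumRetryAttempts=3,
)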

Application SDKs

// Kinesis Client Library (KCL)
// Note: KCL 3.x is available with further performance improvements

// KCL 2.x (stable, widely used)
ConfigsBuilder configsBuilder = new ConfigsBuilder(
    streamName, applicationName, 
    kinesisClient, dynamoDBClient, cloudWatchClient,
    workerId, recordProcessorFactory
);

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
);

new Thread(scheduler).start();

// KCL 3.x improvements:
// - Better performance
// - Reduced DynamoDB costs (on-demand billing by default)
// - Breaking changes from 2.x (multilang support removed)

MSK Producers

AWS Services

AWS DMS:
  Target: Kafka (MSK)
  Use Case: Database CDC

Lambda:
  Library: kafka-python, node-rdkafka
  Use Case: Event-driven producers

EventBridge → Lambda → MSK:
  Pattern for AWS event routing

Kafka Connect (Source Connectors)

Popular Connectors:
  Debezium:
    - MySQL, PostgreSQL, Oracle, SQL Server
    - MongoDB, Cassandra
    Use Case: Database CDC

  JDBC Source:
    - Any JDBC database
    Use Case: Bulk imports

  S3 Source:
    Use Case: File ingestion

  Salesforce Source:
    Use Case: CRM data integration

  MSK Connect:
    - Fully managed Kafka Connect
    - Deploy connectors without infrastructure
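For a flavor of what a source connector looks like in practice, here is an illustrative Debezium MySQL configuration posted to a Kafka Connect REST endpoint. Property names follow Debezium 2.x; the endpoint URL, hostnames, and credentials are placeholders, and MSK Connect accepts the same key/value pairs through its own connector API.

import requests

# Connect REST endpoint, hostnames, and credentials are placeholders.
connector = {
    "name": "orders-cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql.internal",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "secret",
        "database.server.id": "184054",
        "topic.prefix": "ordersdb",
        "table.include.list": "ordersdb.orders",
        "schema.history.internal.kafka.bootstrap.servers": "b-1.msk:9092",
        "schema.history.internal.kafka.topic": "schema-changes.ordersdb",
    },
}

requests.post("http://connect:8083/connectors", json=connector, timeout=10)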

Native Kafka Clients

// Java Producer
Properties props = new Properties();
props.put("bootstrap.servers", "b-1.msk:9092,b-2.msk:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "key", value));
# Python Producer
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['b-1.msk:9092', 'b-2.msk:9092']
)
producer.send('topic', b'message')

MSK Consumers

AWS Services

Lambda:
  Event Source: MSK (Serverless & Provisioned)
  Batch Size: Configurable
  Use Case: Serverless processing

Glue:
  Streaming ETL from MSK
  Use Case: Data transformations

EMR:
  Spark Streaming, Flink
  Use Case: Big data analytics

Kafka Connect (Sink Connectors)

Popular Sinks:
  S3 Sink:
    Format: Parquet, Avro, JSON
    Partitioning: Time-based, field-based

  JDBC Sink:
    Targets: PostgreSQL, MySQL, etc.
    Use Case: Database updates

  Elasticsearch Sink:
    Use Case: Search indexing

  Snowflake Sink:
    Use Case: Data warehousing

Stream Processing Frameworks

// Kafka Streams
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");

stream
    .filter((key, value) -> value.contains("error"))
    .mapValues(value -> value.toUpperCase())
    .to("output-topic");

// Apache Flink (works with both MSK Serverless & Provisioned)
StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", schema, properties))
    .map(value -> process(value));

stream.addSink(new FlinkKafkaProducer<>("output-topic", schema, properties));
-- KSQL (SQL on Kafka streams)
CREATE STREAM clicks_stream (
    user_id VARCHAR,
    page VARCHAR,
    timestamp BIGINT
) WITH (
    KAFKA_TOPIC='clicks',
    VALUE_FORMAT='JSON'
);

SELECT user_id, COUNT(*) AS click_count
FROM clicks_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY user_id
EMIT CHANGES;

Part 4: Checkpointing and Acknowledgment

Understanding how data is acknowledged and progress is tracked is critical for reliability.

Write Acknowledgment (Producer → Cluster)

Kinesis: Always Strongly Consistent

# Write path
response = kinesis.put_record(
    StreamName='myStream',
    Data=data,
    PartitionKey='key'
)

# Behind the scenes:
# 1. Data written to leader shard
# 2. Synchronously replicated to 2 other AZs (always 3x)
# 3. Acknowledgment after all 3 writes complete
# 4. Returns sequence number

print(response['SequenceNumber'])
# '49590338271490256608559692538361571095921575989136588898'

Guarantees:
- Always 3-way replication
- Acknowledgment only after all replicas written
- No configuration options
- ~70-200ms latency

MSK: Configurable Durability

// acks=0: Fire and forget (fastest, risky)
props.put("acks", "0");
// Producer doesn't wait, no guarantee

// acks=1: Leader only (balanced)
props.put("acks", "1");
// Wait for leader write, ~5-10ms

// acks=all: All ISR (safest)
props.put("acks", "all");
props.put("min.insync.replicas", "2");  // Topic config
// Wait for 2+ replicas, ~15-50ms

Performance Impact:
┌────────┬────────────┬─────────┬────────────┐
│ acks   │ Throughput │ Latency │ Durability │
├────────┼────────────┼─────────┼────────────┤
│ 0      │ 150K msg/s │ 1ms     │ Low        │
│ 1      │ 100K msg/s │ 5ms     │ Medium     │
│ all    │ 80K msg/s  │ 15ms    │ High       │
└────────┴────────────┴─────────┴────────────┘

Idempotent Producer (Prevent Duplicates)

// Kafka only - prevents duplicates on retry
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// With idempotence:
// - Automatic sequence numbers
// - Deduplication on broker
// - Exactly-once delivery to topic

Read Checkpointing (Consumer Progress)

Kinesis: DynamoDB-Based

// KCL automatically creates DynamoDB table
// Table structure:
┌──────────────┬────────────┬──────────────┐
│ ShardId      │ Checkpoint │ LeaseOwner   │
├──────────────┼────────────┼──────────────┤
│ shardId-001  │ 495903...  │ worker-1     │
│ shardId-002  │ 495904...  │ worker-2     │
└──────────────┴────────────┴──────────────┘

// In your record processor
public void processRecords(ProcessRecordsInput input) {
    for (KinesisClientRecord record : input.records()) {
        processRecord(record);
    }

    // Checkpoint (writes to DynamoDB)
    input.checkpointer().checkpoint();
}

Cost Consideration (KCL 2.x):
- Each checkpoint = 1 DynamoDB write
- 10 shards × 20 checkpoints/min = 200 WCU/min
- Plus lease renewals ≈ 200 WCU/min
- Total ≈ 400 writes/min ≈ 7 WCU steady state

KCL 3.x Improvement:
- Uses DynamoDB On-Demand billing by default
- Reduced checkpoint costs
- Better DynamoDB efficiency

Checkpoint Strategies

// ❌ BAD: Checkpoint every record (expensive)
for (KinesisClientRecord record : records) {
    processRecord(record);
    checkpointer.checkpoint(record);  // Too many DynamoDB writes
}

// ✅ GOOD: Time-based checkpointing
private long lastCheckpoint = System.currentTimeMillis();
private static final long INTERVAL = 60000;  // 1 minute

public void processRecords(ProcessRecordsInput input) {
    for (KinesisClientRecord record : input.records()) {
        processRecord(record);
    }

    if (System.currentTimeMillis() - lastCheckpoint > INTERVAL) {
        input.checkpointer().checkpoint();
        lastCheckpoint = System.currentTimeMillis();
    }
}

// ✅ GOOD: Count-based checkpointing
private int recordCount = 0;

for (KinesisClientRecord record : records) {
    processRecord(record);
    if (++recordCount % 1000 == 0) {
        checkpointer.checkpoint();
    }
}

// ✅ BEST: Handle resharding properly
@Override
public void shardEnded(ShardEndedInput input) {
    // Checkpoint before shard closes
    input.checkpointer().checkpoint();
    // KCL automatically handles parent/child shard relationships
}

MSK: Internal Topic Storage

# Kafka stores offsets in special topic: __consumer_offsets
# 50 partitions (default), compacted for efficiency

Offset Storage:
┌─────────────┬───────────┬───────────┬────────┐
│ Group       │ Topic     │ Partition │ Offset │
├─────────────┼───────────┼───────────┼────────┤
│ analytics   │ orders    │ 0         │ 12345  │
│ analytics   │ orders    │ 1         │ 67890  │
│ backup      │ orders    │ 0         │ 12340  │
└─────────────┴───────────┴───────────┴────────┘
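Committed offsets can be read back programmatically to compute consumer lag, for example with kafka-python's admin client (broker addresses and group name are placeholders); lag is simply the partition's end offset minus the committed offset.

from kafka import KafkaAdminClient, KafkaConsumer

BOOTSTRAP = ['b-1.msk:9092']  # placeholder brokers

admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP)
committed = admin.list_consumer_group_offsets('analytics')  # {TopicPartition: OffsetAndMetadata}

consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP)
end_offsets = consumer.end_offsets(list(committed.keys()))

for tp, meta in committed.items():
    lag = end_offsets[tp] - meta.offset
    print(f"{tp.topic}[{tp.partition}] committed={meta.offset} lag={lag}")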

Kafka Commit Strategies

// 1. Auto-commit (simplest)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// Commits every 5 seconds automatically
// Risk: At-least-once (may reprocess on crash)

// 2. Manual sync commit (safest)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    processRecords(records);

    consumer.commitSync();  // Blocks until committed
}

// 3. Manual async commit (fastest)
processRecords(records);
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});

// 4. Hybrid approach (best practice)
try {
    while (true) {
        processRecords(consumer.poll(100));
        consumer.commitAsync();  // Fast path
    }
} finally {
    consumer.commitSync();  // Ensure final commit
    consumer.close();
}

Exactly-Once Semantics

// Kinesis: Application-level only
public void processRecord(Record record) {
    String messageId = record.sequenceNumber();

    if (dedupeTable.exists(messageId)) {
        return;  // Already processed
    }

    database.beginTransaction();
    try {
        processData(record.data());
        dedupeTable.insert(messageId);
        database.commit();
    } catch (Exception e) {
        database.rollback();
    }
}

// MSK: Native transactional support
Properties props = new Properties();
props.put("transactional.id", "my-transaction-" + instanceId);
props.put("enable.idempotence", "true");

producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();  // Atomic
} catch (Exception e) {
    producer.abortTransaction();
}

// Kafka Streams exactly-once
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
          StreamsConfig.EXACTLY_ONCE_V2);
// Read → Process → Write is atomic

Part 5: Error Handling Strategies

Kinesis Producer Errors

Handling Throughput Exceeded

import time
import random

from botocore.exceptions import ClientError

def put_with_exponential_backoff(kinesis, stream, data, key, max_retries=5):
    """
    Most common error: ProvisionedThroughputExceededException
    Solution: Exponential backoff with jitter
    """
    for attempt in range(max_retries):
        try:
            return kinesis.put_record(
                StreamName=stream,
                Data=data,
                PartitionKey=key
            )
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt == max_retries - 1:
                    send_to_dlq(data, key)
                    raise

                # Exponential backoff with jitter
                delay = min((2 ** attempt) + random.uniform(0, 1), 10)
                time.sleep(delay)
            else:
                # Non-retriable error
                send_to_dlq(data, key, str(e))
                raise

Batch Error Handling

def put_records_with_retry(kinesis, stream, records):
    """Handle partial batch failures"""
    to_retry = records
    attempt = 0

    while to_retry and attempt < 5:
        response = kinesis.put_records(
            StreamName=stream,
            Records=[{'Data': r['data'], 'PartitionKey': r['key']} 
                     for r in to_retry]
        )

        if response['FailedRecordCount'] > 0:
            # Extract failed records
            to_retry = [
                to_retry[i] 
                for i, result in enumerate(response['Records'])
                if 'ErrorCode' in result
            ]
            attempt += 1
            time.sleep(2 ** attempt)
        else:
            break

    # Send remaining failures to DLQ
    for record in to_retry:
        send_to_dlq(record)

Kinesis Consumer Errors

KCL Error Handling Pattern

public class RobustRecordProcessor implements ShardRecordProcessor {

    @Override
    public void processRecords(ProcessRecordsInput input) {
        List<KinesisClientRecord> failedRecords = new ArrayList<>();

        for (KinesisClientRecord record : input.records()) {
            try {
                processRecord(record);
            } catch (RetryableException e) {
                if (!retryWithBackoff(record, 3)) {
                    failedRecords.add(record);
                }
            } catch (NonRetryableException e) {
                sendToDLQ(record, e);
            }
        }

        // Handle persistent failures
        for (KinesisClientRecord record : failedRecords) {
            sendToDLQ(record, new Exception("Max retries exceeded"));
        }

        // Checkpoint after handling
        try {
            input.checkpointer().checkpoint();
        } catch (Exception e) {
            log.error("Checkpoint failed", e);
        }
    }

    private void sendToDLQ(KinesisClientRecord record, Exception error) {
        Map<String, Object> dlqRecord = Map.of(
            "originalData", record.data(),
            "sequenceNumber", record.sequenceNumber(),
            "error", error.getMessage(),
            "timestamp", Instant.now()
        );

        try {
            // Primary: Send to DLQ stream
            dlqKinesis.putRecord(
                dlqStreamName,
                serialize(dlqRecord),
                record.partitionKey()
            );
        } catch (Exception e) {
            // Fallback: Write to S3
            s3.putObject(dlqBucket, generateKey(), serialize(dlqRecord));
        }
    }
}

Lambda Consumer Error Handling

import base64
import json
import logging

logger = logging.getLogger()

def lambda_handler(event, context):
    """
    Use batch item failures for partial success
    """
    failed_record_ids = []

    for record in event['Records']:
        try:
            payload = base64.b64decode(record['kinesis']['data'])
            process_data(json.loads(payload))
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            failed_record_ids.append({
                'itemIdentifier': record['kinesis']['sequenceNumber']
            })

    # Lambda retries only failed records
    return {
        'batchItemFailures': failed_record_ids
    }

# CloudFormation configuration
MyFunction:
  Type: AWS::Serverless::Function
  Properties:
    Events:
      Stream:
        Type: Kinesis
        Properties:
          Stream: !GetAtt MyStream.Arn
          MaximumRetryAttempts: 3
          BisectBatchOnFunctionError: true  # Split bad batches
          DestinationConfig:
            OnFailure:
              Type: SQS
              Destination: !GetAtt DLQ.Arn

MSK Producer Errors

Comprehensive Error Handling

public class ResilientKafkaProducer {
    private final KafkaProducer<String, byte[]> producer;
    private final KafkaProducer<String, byte[]> dlqProducer;

    public void sendWithErrorHandling(String topic, String key, byte[] value) {
        ProducerRecord<String, byte[]> record = 
            new ProducerRecord<>(topic, key, value);

        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                handleError(record, exception);
            } else {
                metrics.recordSuccess(metadata);
            }
        });
    }

    private void handleError(ProducerRecord<String, byte[]> record, 
                            Exception exception) {
        if (exception instanceof RetriableException) {
            // Already retried per config, send to retry queue
            retryQueue.offer(record);
        } else if (exception instanceof RecordTooLargeException) {
            // Split message or send to alternative storage
            handleOversizedMessage(record);
        } else {
            // Non-retriable, send to DLQ
            sendToDLQ(record, exception);
        }
    }

    private void sendToDLQ(ProducerRecord<String, byte[]> record, 
                          Exception exception) {
        Map<String, Object> dlqData = Map.of(
            "originalTopic", record.topic(),
            "originalKey", record.key(),
            "originalValue", record.value(),
            "error", exception.getMessage(),
            "timestamp", Instant.now()
        );

        ProducerRecord<String, byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".DLQ",
            record.key(),
            serialize(dlqData)
        );

        dlqProducer.send(dlqRecord, (metadata, e) -> {
            if (e != null) {
                // DLQ failed, fallback to S3
                s3Client.putObject(dlqBucket, generateKey(), serialize(dlqData));
            }
        });
    }
}

Idempotent Configuration

// Prevent duplicates on retry
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);

// With idempotence enabled:
// ✅ Automatic deduplication
// ✅ Ordering preserved
// ✅ No duplicate messages on retry

MSK Consumer Errors

Robust Consumer Pattern

public class ResilientKafkaConsumer {
    private final KafkaConsumer<String, byte[]> consumer;

    public void consume() {
        consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit before losing partitions
                consumer.commitSync();
                cleanupPartitions(partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                initializePartitions(partitions);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = 
                    consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, byte[]> record : records) {
                    processWithRetry(record);
                }

                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // Rebalance occurred, partitions reassigned
                    log.warn("Commit failed, rebalance in progress");
                }
            }
        } catch (WakeupException e) {
            // Shutdown initiated
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    private void processWithRetry(ConsumerRecord<String, byte[]> record) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            try {
                processRecord(record);
                return;
            } catch (RetryableException e) {
                if (i == maxRetries - 1) {
                    sendToDLQ(record, e);
                } else {
                    sleep(1000 * (i + 1));
                }
            } catch (NonRetryableException e) {
                sendToDLQ(record, e);
                return;
            }
        }
    }
}

Dead Letter Topic Pattern

private void sendToDLQ(ConsumerRecord<String, byte[]> record, Exception error) {
    Map<String, Object> dlqData = new HashMap<>();
    dlqData.put("originalTopic", record.topic());
    dlqData.put("originalPartition", record.partition());
    dlqData.put("originalOffset", record.offset());
    dlqData.put("originalKey", record.key());
    dlqData.put("originalValue", record.value());
    dlqData.put("errorMessage", error.getMessage());
    dlqData.put("errorStackTrace", getStackTrace(error));
    dlqData.put("timestamp", Instant.now());

    ProducerRecord<String, byte[]> dlqRecord = new ProducerRecord<>(
        record.topic() + ".DLQ",
        record.key(),
        serialize(dlqData)
    );

    // Add headers for filtering
    dlqRecord.headers().add("error.class", 
        error.getClass().getName().getBytes());
    dlqRecord.headers().add("error.time", 
        String.valueOf(System.currentTimeMillis()).getBytes());

    dlqProducer.send(dlqRecord);
}

Part 6: Performance Tuning

Kinesis Performance Optimization

Producer Optimization

# ❌ BAD: Single puts (slow, expensive)
for record in records:
    kinesis.put_record(
        StreamName='myStream',
        Data=record,
        PartitionKey=get_key(record)
    )
# Result: ~100 records/sec, high cost

# ✅ GOOD: Batch puts
batch = []
for record in records:
    batch.append({'Data': record, 'PartitionKey': get_key(record)})
    if len(batch) >= 500:
        kinesis.put_records(StreamName='myStream', Records=batch)
        batch = []
# Result: 500-1000 records/sec

# ✅ BEST: KPL with aggregation (Java; the KPL is a Java library)
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
    .setAggregationEnabled(true)
    .setCollectionMaxCount(500)
    .setRecordMaxBufferedTime(100);

KinesisProducer producer = new KinesisProducer(config);
// Result: 1000-5000 records/sec

Performance Comparison

Method | Throughput | Latency | Cost Efficiency
PutRecord | 100 rec/s | 50ms | ★☆☆☆☆
PutRecords | 1K rec/s | 100ms | ★★★☆☆
KPL | 5K rec/s | 150ms | ★★★★★

Consumer Optimization

// Enhanced Fan-Out vs Standard

// Standard (shared): consumers poll with GetRecords and share 2 MB/s per shard
// Cost: Included in shard cost
// Latency: ~200ms

// Enhanced Fan-Out: register the consumer for a dedicated 2 MB/s per shard
kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='efo-consumer'
)

// Or let the KCL register and use it via FanOutConfig
RetrievalConfig efoConfig = configsBuilder
    .retrievalConfig()
    .retrievalSpecificConfig(
        new FanOutConfig(kinesisClient)
            .streamName(streamName)
            .applicationName(applicationName)
    );
// Cost: $0.015/consumer/shard/hour + $0.015/GB retrieved
// Latency: ~70ms
// When: Multiple consumers or low-latency requirements

Checkpoint Optimization

// Checkpoint frequency vs cost (KCL 2.x)

// Every record: ~$100/month for 10 shards (DON'T DO THIS)
for (KinesisClientRecord record : records) {
    processRecord(record);
    checkpointer.checkpoint(record);
}

// Every minute: ~$5/month for 10 shards (RECOMMENDED)
if (System.currentTimeMillis() - lastCheckpoint > 60000) {
    checkpointer.checkpoint();
}

// Every 1000 records: Variable cost (ALTERNATIVE)
if (++recordCount % 1000 == 0) {
    checkpointer.checkpoint();
}

// KCL 3.x: Lower costs with On-Demand DynamoDB billing by default

Scaling Strategy

# Calculate optimal shard count
def calculate_shards(write_mb_s, read_mb_s, consumers, use_efo):
    write_shards = math.ceil(write_mb_s / 1.0)

    if use_efo:
        # Each consumer gets dedicated 2 MB/s
        read_shards = math.ceil(read_mb_s / 2.0)
    else:
        # All consumers share 2 MB/s
        read_shards = math.ceil(read_mb_s * consumers / 2.0)

    # Add 20-30% buffer for growth
    return math.ceil(max(write_shards, read_shards) * 1.3)

# Auto-scaling (Provisioned mode): Kinesis Data Streams is not a native
# Application Auto Scaling target, so the usual pattern is a CloudWatch
# alarm (e.g., on IncomingBytes or WriteProvisionedThroughputExceeded)
# triggering a Lambda that calls UpdateShardCount
kinesis.update_shard_count(
    StreamName='myStream',
    TargetShardCount=new_shard_count,  # computed by the scaling Lambda
    ScalingType='UNIFORM_SCALING'
)

# Or use On-Demand for variable loads
kinesis.create_stream(
    StreamName='variable-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)
# Starts with 4 MB/s of write capacity, then scales with traffic
# Default limit: 200 MB/s (can request increase)
# Automatically adds/removes shards

Partition Key Best Practices

# ✅ GOOD: High-cardinality keys
import hashlib
import uuid

# Option 1: Hash user ID for even distribution
partition_key = hashlib.md5(user_id.encode()).hexdigest()

# Option 2: Combine attributes for better distribution
partition_key = f"{user_id}-{timestamp % 100}"

# Option 3: Random for maximum distribution (loses ordering)
partition_key = str(uuid.uuid4())

# ❌ BAD: Low-cardinality keys create hot shards
partition_key = "constant"  # All to one shard
partition_key = region  # Only 3-4 values
partition_key = status  # "active", "inactive" only

MSK Performance Optimization

Broker Sizing Guide

# Throughput guide per instance type
BROKER_CAPACITY = {
    'kafka.m5.large': {
        'throughput_mb_s': 50,
        'partitions': 500,
        'cost_per_hour': 0.21
    },
    'kafka.m5.xlarge': {
        'throughput_mb_s': 100,
        'partitions': 1000,
        'cost_per_hour': 0.42
    },
    'kafka.m5.2xlarge': {
        'throughput_mb_s': 200,
        'partitions': 2000,
        'cost_per_hour': 0.84
    },
    'kafka.m5.4xlarge': {
        'throughput_mb_s': 400,
        'partitions': 4000,
        'cost_per_hour': 1.68
    }
}

def calculate_broker_size(target_throughput, partition_count):
    for instance, capacity in BROKER_CAPACITY.items():
        brokers_needed = max(
            3,  # Minimum for HA
            math.ceil(target_throughput / capacity['throughput_mb_s']),
            math.ceil(partition_count / capacity['partitions'])
        )

        if brokers_needed <= 6:  # Reasonable cluster size
            return {
                'instance_type': instance,
                'broker_count': brokers_needed,
                'monthly_cost': brokers_needed * capacity['cost_per_hour'] * 730
            }

MSK Serverless Performance

# MSK Serverless auto-scales but has characteristics:
MSK_SERVERLESS = {
    'throughput': {
        'min': '1 MB/s',
        'max': '200 MB/s',
        'scaling': 'automatic'
    },
    'retention': {
        'max': '365 days'
    },
    'pricing': {
        'ingress': '$0.10/GB',
        'egress': '$0.05/GB',
        'storage': '$0.10/GB-month'
    },
    'best_for': [
        'Variable workloads',
        'Development/testing',
        'Unpredictable traffic',
        'Cost optimization for < 40 MB/s'
    ]
}
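A rough break-even sketch using the per-GB prices above and the broker prices from the sizing table; real pricing varies by region and omits some charges, so treat this as an estimate only.

HOURS = 730

def serverless_monthly(avg_write_mb_s, avg_read_mb_s, storage_gb):
    """Rough MSK Serverless estimate using the $/GB figures quoted above."""
    ingress_gb = avg_write_mb_s / 1000 * 3600 * HOURS
    egress_gb = avg_read_mb_s / 1000 * 3600 * HOURS
    return ingress_gb * 0.10 + egress_gb * 0.05 + storage_gb * 0.10

def provisioned_monthly(broker_count=3, instance_cost=0.42, storage_gb_per_broker=1000):
    """Rough MSK Provisioned estimate: broker-hours plus EBS storage."""
    return broker_count * instance_cost * HOURS + broker_count * storage_gb_per_broker * 0.10

# Example: 2 MB/s in, 4 MB/s out, 500 GB retained
print(serverless_monthly(2, 4, 500))  # ≈ $1,101
print(provisioned_monthly())          # ≈ $1,220

With these example numbers the break-even sits at only a few MB/s of sustained traffic, which is why Serverless tends to pay off mainly for spiky or low-average workloads.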

Producer Tuning

// High-throughput configuration
Properties props = new Properties();

// Batching (critical for performance)
props.put("batch.size", 65536);  // 64 KB
props.put("linger.ms", 10);  // Wait 10ms for more records

// Compression (2-4x improvement)
props.put("compression.type", "lz4");  // Fast
// Or "zstd" for better compression (more CPU)

// Buffer memory
props.put("buffer.memory", 67108864);  // 64 MB

// In-flight requests
props.put("max.in.flight.requests.per.connection", 5);

// Durability vs speed tradeoff
props.put("acks", "1");  // Fast: leader only
// Or "all" for safety with idempotence
props.put("enable.idempotence", "true");

Performance Results:
┌──────────────────┬─────────────┬─────────┐
│ Configuration    │ Throughput  │ Latency │
├──────────────────┼─────────────┼─────────┤
│ Default          │ 10K msg/s   │ 10ms    │
│ + Batching       │ 50K msg/s   │ 20ms    │
│ + Compression    │ 100K msg/s  │ 15ms    │
│ + acks=1         │ 150K msg/s  │ 5ms     │
└──────────────────┴─────────────┴─────────┘

Consumer Tuning

Properties props = new Properties();

// Fetch configuration (critical for throughput)
props.put("fetch.min.bytes", 50000);  // 50 KB minimum
props.put("fetch.max.wait.ms", 500);  // Max wait time
props.put("max.partition.fetch.bytes", 1048576);  // 1 MB per partition
props.put("fetch.max.bytes", 52428800);  // 50 MB per fetch

// Poll configuration
props.put("max.poll.records", 5000);  // More per poll

// Session timeout
props.put("session.timeout.ms", 30000);
props.put("max.poll.interval.ms", 300000);  // 5 minutes
// Ensure: max.poll.interval.ms > (max.poll.records × processing_time)

// Partition assignment
props.put("partition.assignment.strategy", 
    "org.apache.kafka.clients.consumer.StickyAssignor");
// Minimizes rebalancing disruption

Tuning Results:
┌──────────────────┬─────────────┬─────────┐
│ Configuration    │ Throughput  │ Latency │
├──────────────────┼─────────────┼─────────┤
│ Default          │ 50K msg/s   │ 100ms   │
│ + Large fetch    │ 200K msg/s  │ 200ms   │
│ + High poll recs │ 500K msg/s  │ 300ms   │
└──────────────────┴─────────────┴─────────┘

Parallel Processing

// Scale consumers to match partitions
// 12 partitions → up to 12 consumers for max parallelism

// Multiple consumer instances (recommended)
public static void main(String[] args) {
    int numConsumers = 4;
    ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

    for (int i = 0; i < numConsumers; i++) {
        executor.submit(new ConsumerWorker(i));
    }
}

// Kafka automatically distributes partitions
// 12 partitions ÷ 4 consumers = 3 partitions each

// Alternative: Single consumer, multi-threaded processing
ExecutorService processingPool = Executors.newFixedThreadPool(10);

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    List<Future<?>> futures = new ArrayList<>();

    for (ConsumerRecord<String, byte[]> record : records) {
        futures.add(processingPool.submit(() -> processRecord(record)));
    }

    // Wait for all processing
    for (Future<?> future : futures) {
        future.get();
    }

    consumer.commitSync();
}

Topic Configuration

# Partition count calculation
# partitions = max(
#   target_throughput / producer_throughput_per_partition,
#   target_throughput / consumer_throughput_per_partition
# )

# Example: 100 MB/s target
# Producer: 10 MB/s per partition
# Consumer: 5 MB/s per partition
# Result: max(100/10, 100/5) = 20 partitions

kafka-topics.sh --create \
  --bootstrap-server broker:9092 \
  --topic high-volume \
  --partitions 20 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config compression.type=lz4 \
  --config retention.ms=604800000  # 7 days

# Guidelines:
# - Start with: partitions = 2-3x broker count
# - Max: ~4000 partitions per broker
# - Use powers of 2 for easier scaling

# ⚠️ WARNING: Adding partitions later breaks ordering
# Existing keys may route to different partitions
# Plan partition count carefully upfront
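The partition-count formula from the comments above, as a small helper; the per-partition throughput figures are the example numbers from this section, not hard limits.

import math

def partitions_needed(target_mb_s, producer_mb_s_per_partition=10,
                      consumer_mb_s_per_partition=5, broker_count=3):
    """Partition count = max of producer- and consumer-driven requirements,
    with a floor of 2x the broker count per the guidelines above."""
    by_producer = math.ceil(target_mb_s / producer_mb_s_per_partition)
    by_consumer = math.ceil(target_mb_s / consumer_mb_s_per_partition)
    floor = 2 * broker_count
    return max(by_producer, by_consumer, floor)

print(partitions_needed(100))  # 20, matching the worked example above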

Storage Optimization

# Use compression to reduce storage
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name my-topic \
  --add-config compression.type=lz4

# Compression ratios:
# - lz4: 2-3x (fast)
# - zstd: 4-6x (better compression)
# - gzip: 3-4x (slower)

# Log compaction for state topics
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name user-state \
  --add-config cleanup.policy=compact

# Tiered storage (MSK Provisioned only)
# Unlimited retention with 90%+ cost savings
aws kafka update-storage \
  --cluster-arn arn:aws:kafka:region:account:cluster/name/uuid \
  --storage-mode TIERED

# Hot tier: Primary brokers (low latency)
# Cold tier: S3 (cost-effective, higher latency)

Monitoring for Performance

Kinesis Key Metrics

KINESIS_METRICS = {
    'IncomingBytes': {
        'threshold': 800000,  # 80% of 1 MB/s per shard
        'action': 'Scale up shards'
    },
    'GetRecords.IteratorAgeMilliseconds': {
        'threshold': 300000,  # 5 minutes
        'action': 'Add consumers or EFO'
    },
    'WriteProvisionedThroughputExceeded': {
        'threshold': 1,
        'action': 'Immediate scaling needed'
    },
    'ReadProvisionedThroughputExceeded': {
        'threshold': 1,
        'action': 'Enable EFO or add shards'
    },
    'PutRecord.Success': {
        'threshold': 95,  # Percent
        'action': 'Investigate errors if below'
    }
}
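As an example of acting on the table above, a boto3 sketch that alarms on iterator age; the stream name, alarm name, and SNS topic are placeholders.

import boto3

cloudwatch = boto3.client('cloudwatch')

# Stream name and SNS topic ARN are placeholders.
cloudwatch.put_metric_alarm(
    AlarmName='kinesis-iterator-age-myStream',
    Namespace='AWS/Kinesis',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Dimensions=[{'Name': 'StreamName', 'Value': 'myStream'}],
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=5,
    Threshold=300000,  # 5 minutes, per the table above
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:oncall'],
)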

MSK Key Metrics

MSK_METRICS = {
    'CpuUser': {
        'threshold': 70,  # Percent
        'action': 'Scale up brokers or optimize consumers'
    },
    'NetworkProcessorAvgIdlePercent': {
        'threshold': 30,  # Below 30% = overloaded
        'action': 'Add brokers or upgrade instance type'
    },
    'UnderReplicatedPartitions': {
        'threshold': 0,  # Should always be 0
        'action': 'Critical: investigate immediately'
    },
    'OfflinePartitionsCount': {
        'threshold': 0,  # Should always be 0
        'action': 'Critical: data unavailable'
    },
    'ConsumerLag': {
        'threshold': 10000,  # Messages
        'action': 'Add consumers or optimize processing'
    }
}

Part 7: Production Best Practices

Kinesis Best Practices

1. Stream Design

# ✅ DO: Calculate shards with buffer
def calculate_shards_with_buffer(write_mb_s, read_mb_s, consumers):
    base_shards = calculate_shards(write_mb_s, read_mb_s, consumers)
    return math.ceil(base_shards * 1.3)  # 30% buffer for growth

# ✅ DO: Use high-cardinality partition keys
import hashlib

partition_key = hashlib.md5(user_id.encode()).hexdigest()
# or
partition_key = f"{user_id}-{timestamp % 100}"

# ❌ DON'T: Use constant or low-cardinality keys
partition_key = "constant"  # Creates hot shards
partition_key = region  # Only 3-4 values

# ✅ DO: Handle resharding in consumers
@Override
public void shardEnded(ShardEndedInput input) {
    input.checkpointer().checkpoint();
    // KCL automatically handles parent/child relationships
}

2. Producer Best Practices

// ✅ DO: Use KPL for high throughput
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
    .setAggregationEnabled(true)
    .setCollectionMaxCount(500)
    .setRecordMaxBufferedTime(100)
    .setMaxConnections(24);

// ✅ DO: Implement exponential backoff
// ✅ DO: Send failures to DLQ
// ✅ DO: Monitor shard-level metrics

// ❌ DON'T: Use single PutRecord for high volume
// ❌ DON'T: Ignore ProvisionedThroughputExceeded errors

3. Consumer Best Practices

// ✅ DO: Use Enhanced Fan-Out for multiple consumers
RetrievalConfig efoConfig = configsBuilder
    .retrievalConfig()
    .retrievalSpecificConfig(new FanOutConfig(kinesisClient));

// ✅ DO: Checkpoint periodically (not per record)
if (System.currentTimeMillis() - lastCheckpoint > 60000) {
    checkpointer.checkpoint();
}

// ✅ DO: Consider KCL 3.x for reduced DynamoDB costs
// - On-Demand billing by default
// - Better performance
// - Note: Breaking changes from KCL 2.x

// ❌ DON'T: Checkpoint every record (expensive)
// ❌ DON'T: Process longer than shard iterator expiration

4. Operations

# ✅ DO: Choose right mode for your workload
# Provisioned: Predictable, steady throughput
# On-Demand: Variable, unpredictable loads (up to 200 MB/s)

# Provisioned: scale with a CloudWatch alarm + Lambda calling
# UpdateShardCount (there is no native Application Auto Scaling target)
kinesis.update_shard_count(
    StreamName='myStream',
    TargetShardCount=new_shard_count,
    ScalingType='UNIFORM_SCALING'
)

# On-Demand mode
kinesis.create_stream(
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)

# ✅ DO: Set appropriate retention
kinesis.increase_stream_retention_period(
    StreamName='audit-stream',
    RetentionPeriodHours=168  # 7 days
)

# ✅ DO: Use VPC endpoints to reduce costs and latency
ec2.create_vpc_endpoint(
    ServiceName='com.amazonaws.region.kinesis-streams',
    VpcEndpointType='Interface'
)

# ❌ DON'T: Use maximum retention for all streams
# ❌ DON'T: Under-provision for cost savings (leads to data loss)

5. Security

# ✅ DO: Enable encryption
kinesis.start_stream_encryption(
    StreamName='sensitive-stream',
    EncryptionType='KMS',
    KeyId='arn:aws:kms:...'
)

# ✅ DO: Use VPC endpoints for private connectivity
ec2.create_vpc_endpoint(
    ServiceName='com.amazonaws.region.kinesis-streams',
    VpcEndpointType='Interface',
    SubnetIds=private_subnets
)

# ✅ DO: Apply least privilege IAM policies
{
    "Effect": "Allow",
    "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
    "Resource": "arn:aws:kinesis:region:account:stream/myStream"
}

# ✅ DO: Enable CloudTrail for audit logging

MSK Best Practices

1. Cluster Design

# ✅ DO: Deploy 3+ brokers across 3 AZs
MSKCluster:
  BrokerNodeGroupInfo:
    InstanceType: kafka.m5.xlarge
    ClientSubnets:
      - subnet-az1
      - subnet-az2
      - subnet-az3
    BrokerAZDistribution: DEFAULT  # Even distribution

# ✅ DO: Right-size partitions (plan ahead!)
# Formula: partitions = 2-3x broker count minimum
# Consider: target throughput ÷ throughput per partition

# 9 partitions = 3 brokers × 3
kafka-topics.sh --create \
  --partitions 9 \
  --replication-factor 3 \
  --config min.insync.replicas=2

# ⚠️ WARNING: Adding partitions breaks key ordering
# Existing messages stay in old partitions
# New messages with same key may go to different partition

# ❌ DON'T: Create too many partitions (overhead)
# ❌ DON'T: Use single-AZ deployment (no HA)

2. Deployment Mode Selection

# Decision tree:
Serverless:
  Use when:
    - Variable/unpredictable workload
    - < 40 MB/s average throughput
    - Want zero operational overhead
    - Development/testing
  Limitations:
    - Max 365 day retention
    - Limited Kafka configurations
    - No tiered storage

Provisioned:
  Use when:
    - Sustained high throughput
    - Need custom configurations
    - Require > 365 day retention
    - Use tiered storage
    - Need specific Kafka versions
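The same decision tree as a tiny helper; the thresholds mirror this article's guidance rather than hard service limits.

def pick_msk_mode(avg_mb_s, retention_days, needs_custom_config=False,
                  needs_tiered_storage=False):
    """Encode the Serverless-vs-Provisioned decision tree above.
    Thresholds follow the article's guidance, not hard service limits."""
    if (retention_days > 365 or needs_custom_config
            or needs_tiered_storage or avg_mb_s > 40):
        return 'MSK Provisioned'
    return 'MSK Serverless'

print(pick_msk_mode(avg_mb_s=15, retention_days=7))    # MSK Serverless
print(pick_msk_mode(avg_mb_s=80, retention_days=400))  # MSK Provisioned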

3. Producer Best Practices

// ✅ DO: Enable idempotence for reliability
Properties props = new Properties();
props.put("enable.idempotence", "true");
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// ✅ DO: Optimize for throughput
props.put("batch.size", 65536);  // 64 KB
props.put("linger.ms", 10);
props.put("compression.type", "lz4");

// ✅ DO: Use async sends with callbacks
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        handleError(record, exception);
    }
});

// ❌ DON'T: Use acks=0 in production
// ❌ DON'T: Use synchronous sends (slow)

4. Consumer Best Practices

// ✅ DO: Scale consumers to partition count
// 12 partitions → up to 12 consumers for best parallelism

// ✅ DO: Use manual offset commits
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    processRecords(records);
    consumer.commitSync();
}

// ✅ DO: Handle rebalances gracefully
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();  // Commit before losing partitions
    }
});

// ✅ DO: Implement proper error handling with DLQ
for (ConsumerRecord<String, byte[]> record : records) {
    try {
        processRecord(record);
    } catch (RetryableException e) {
        retryWithBackoff(record);
    } catch (NonRetryableException e) {
        sendToDLQ(record, e);
    }
}

// ❌ DON'T: Create more consumers than partitions
// ❌ DON'T: Block in poll loop (causes rebalancing)

5. Operations

# ✅ DO: Monitor critical metrics
CRITICAL_METRICS = [
    'CpuUser',  # < 70%
    'UnderReplicatedPartitions',  # = 0
    'OfflinePartitionsCount',  # = 0
    'NetworkProcessorAvgIdlePercent'  # > 30%
]

# ✅ DO: Set up comprehensive alarms
cloudwatch.put_metric_alarm(
    AlarmName='msk-under-replicated',
    Namespace='AWS/Kafka',
    MetricName='UnderReplicatedPartitions',
    Dimensions=[{'Name': 'Cluster Name', 'Value': 'my-cluster'}],
    Statistic='Maximum',
    Period=60,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator='GreaterThanThreshold'
)

# ✅ DO: Plan for capacity with headroom
TARGET_UTILIZATION = {
    'cpu': 60,     # 40% headroom
    'network': 70,  # 30% headroom
    'storage': 75   # 25% headroom
}

# ✅ DO: Use tiered storage for long retention
aws kafka update-storage \
  --cluster-arn arn:... \
  --storage-mode TIERED
# 90%+ cost savings for cold data

# ❌ DON'T: Make multiple changes simultaneously
# ❌ DON'T: Skip capacity planning

6. Security

# ✅ DO: Enable authentication
MSKCluster:
  ClientAuthentication:
    Sasl:
      Iam:
        Enabled: true  # Easiest with AWS services
      Scram:
        Enabled: true  # For non-AWS clients
    Tls:
      CertificateAuthorityArnList:
        - !Ref PrivateCA

# ✅ DO: Enable encryption
EncryptionInfo:
  EncryptionAtRest:
    DataVolumeKMSKeyId: !Ref KMSKey
  EncryptionInTransit:
    ClientBroker: TLS
    InCluster: true

# ✅ DO: Use VPC private connectivity
# MSK supports multi-VPC access via PrivateLink
ConnectivityInfo:
  VpcConnectivity:
    ClientAuthentication:
      Sasl:
        Iam:
          Enabled: true

# ✅ DO: Use security groups restrictively
SecurityGroupIngress:
  - IpProtocol: tcp
    FromPort: 9094  # TLS only
    ToPort: 9094
    SourceSecurityGroupId: !Ref AppSecurityGroup

# ❌ DON'T: Use PLAINTEXT in production
# ❌ DON'T: Allow 0.0.0.0/0 access

7. Topic Configuration

# ✅ DO: Set appropriate retention
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name production-topic \
  --add-config retention.ms=604800000  # 7 days

# ✅ DO: Use log compaction for state topics
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name user-state \
  --add-config cleanup.policy=compact

# ✅ DO: Configure min.insync.replicas
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name critical-topic \
  --add-config min.insync.replicas=2

# ✅ DO: Plan partition count carefully upfront
# Adding partitions later breaks ordering guarantees

# ❌ DON'T: Use same config for all topics
# ❌ DON'T: Set retention longer than needed (costs)

8. Multi-Region/Multi-VPC Patterns

# Multi-VPC Connectivity:
Options:
  VPC Peering:
    Use: Simple, low latency
    Limitation: Same region only

  Transit Gateway:
    Use: Hub-and-spoke, multiple VPCs
    Cost: ~$0.02/GB processed plus hourly attachment charges

  PrivateLink (MSK Multi-VPC):
    Use: Managed, secure cross-VPC/account
    Cost: $0.01/GB + $0.01/hour per ENI

# Multi-Region:
Active-Passive:
  Tool: MirrorMaker 2.0
  Use: Disaster recovery

Active-Active:
  Tool: Custom dual-write or Confluent Cluster Linking
  Use: Global applications
  Complexity: High (conflict resolution needed)

Part 8: Making the Right Choice

Enhanced Decision Matrix

Use this framework to choose between Kinesis, MSK Serverless, and MSK Provisioned:

Choose Kinesis if you answer "Yes" to 3+ questions:

Questions:
  - Is this your first streaming project?
  - Do you primarily use AWS services?
  - Is operational simplicity more important than flexibility?
  - Is your throughput < 10 MB/s per stream?
  - Do you have limited Kafka expertise on the team?
  - Are you building a real-time analytics pipeline?
  - Do you need Lambda integration?
  - Is predictable pricing important?
  - Do you need < 365 day retention?

Choose MSK Serverless if you answer "Yes" to 3+ questions:

Questions:
  - Do you need Kafka APIs but not operational overhead?
  - Is throughput variable/unpredictable (< 40 MB/s avg)?
  - Are you migrating from Kafka but want simplicity?
  - Do you want pay-per-use pricing?
  - Is this for development/testing environments?
  - Do you need Kafka Connect/Streams?
  - Can you work within 365-day retention limit?
  - Do you want automatic scaling?

Choose MSK Provisioned if you answer "Yes" to 3+ questions:

Questions:
  - Are you migrating from existing Kafka?
  - Do you need sustained throughput > 40 MB/s?
  - Is retention > 1 year required?
  - Do you have Kafka expertise in-house?
  - Are you building event-driven microservices?
  - Do you need exactly-once semantics?
  - Do you require custom Kafka configurations?
  - Is this part of a multi-cloud strategy?
  - Do you need tiered storage?
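If you want to apply the three checklists mechanically, here is a small, purely illustrative scoring helper implementing the "3+ yes answers" rule; ties can be broken with the flow chart below.

def recommend(kinesis_yes, serverless_yes, provisioned_yes):
    """Apply the '3+ yes answers' rule from the checklists above.
    Ties are returned together so the flow chart below can break them."""
    scores = {
        'Kinesis': kinesis_yes,
        'MSK Serverless': serverless_yes,
        'MSK Provisioned': provisioned_yes,
    }
    qualified = {name: score for name, score in scores.items() if score >= 3}
    if not qualified:
        return ['No clear winner - revisit requirements']
    best = max(qualified.values())
    return [name for name, score in qualified.items() if score == best]

print(recommend(kinesis_yes=5, serverless_yes=2, provisioned_yes=3))  # ['Kinesis']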

Decision Flow Chart

START: Need stream processing on AWS
│
├─ Need Kafka APIs?
│  ├─ NO → Kinesis
│  └─ YES → Need operational control?
│      ├─ NO → MSK Serverless
│      └─ YES → Need > 365 day retention or > 200 MB/s?
│          ├─ NO → MSK Serverless
│          └─ YES → MSK Provisioned

Throughput-Based Guidance:
  < 10 MB/s:     Kinesis (simplest) or MSK Serverless
  10-40 MB/s:    Kinesis On-Demand or MSK Serverless
  40-200 MB/s:   MSK Serverless or MSK Provisioned
  > 200 MB/s:    MSK Provisioned

Cost-Based Guidance:
  Kinesis:
    - Predictable: Provisioned mode
    - Variable: On-Demand mode

  MSK Serverless:
    - Pay per GB (good for < 40 MB/s average)

  MSK Provisioned:
    - Fixed broker costs (better for high sustained throughput)

Cost Comparison (Corrected)

Kinesis Pricing Example

# Scenario: 5 MB/s write, 10 MB/s read, 2 consumers

# PROVISIONED MODE
# Shards needed: max(5 write, 10×2/2 = 10 read) = 10 shards
shards = 10
hours_per_month = 730

# Shard cost
shard_cost = shards * 0.015 * hours_per_month  # $109.50

# PUT cost (corrected calculation)
# 5 MB/s = 5,120 KB/s
# Each PUT unit = 25 KB
# PUT units per second = 5,120 / 25 = 204.8
# PUT units per month = 204.8 * 3600 * 730 = 538,214,400
# Cost per 1M PUT units = $0.014
put_cost = (538.2 * 0.014)  # $7.53

# Enhanced Fan-Out (if needed for 2 consumers)
efo_cost = shards * 2 * 0.015 * hours_per_month  # $219.00
# Plus data retrieval
data_retrieved_gb = 10 * 0.001 * 3600 * 730  # 26,280 GB
efo_data_cost = data_retrieved_gb * 0.015  # $394.20

total_provisioned = shard_cost + put_cost + efo_cost + efo_data_cost
# $109.50 + $7.53 + $219.00 + $394.20 = $730.23/month

# ON-DEMAND MODE
# Automatically scales, pay per GB ingested
data_ingested_gb = 5 * 0.001 * 3600 * 730  # 13,140 GB
ondemand_write_cost = data_ingested_gb * 0.04  # $525.60

# Data retrieval cost (shared mode, no EFO)
data_read_gb = 10 * 0.001 * 3600 * 730  # 26,280 GB
ondemand_read_cost = data_read_gb * 0.013  # $341.64

total_ondemand = ondemand_write_cost + ondemand_read_cost
# $525.60 + $341.64 = $867.24/month
# (plus a fixed per-stream-hour charge for On-Demand streams, omitted here)

# Comparison:
# Provisioned (with EFO): $730.23/month
# On-Demand: $867.24/month
# Provisioned is cheaper for sustained 5 MB/s

MSK Pricing Example

# Scenario: 3 brokers (kafka.m5.xlarge), 1TB storage each

# MSK PROVISIONED
broker_cost = 3 * 0.42 * 730  # $919.80
storage_cost = 3 * 1000 * 0.10  # $300.00
total_msk_provisioned = broker_cost + storage_cost  # $1,219.80/month

# MSK SERVERLESS (for same 5 MB/s workload)
# Ingress: 5 MB/s × 3600s × 730h = 13,140 GB
ingress_cost = 13140 * 0.10  # $1,314.00

# Egress: 10 MB/s × 3600s × 730h = 26,280 GB
egress_cost = 26280 * 0.05  # $1,314.00

# Storage (assuming 1TB average)
storage_serverless = 1000 * 0.10  # $100.00

total_msk_serverless = ingress_cost + egress_cost + storage_serverless
# $1,314.00 + $1,314.00 + $100.00 = $2,728.00/month
# (plus per-cluster-hour and per-partition-hour charges, omitted here)

# For this workload:
# Kinesis Provisioned: $730/month (cheapest for this pattern)
# MSK Provisioned: $1,220/month
# MSK Serverless: $2,728/month (expensive for sustained high throughput)

# MSK Serverless is cheaper for:
# - Low sustained throughput (break-even vs this 3-broker example is roughly 2 MB/s write)
# - Variable workloads (spikes with low average)
# - Dev/test environments
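
To see where that crossover sits, the same unit prices can be turned into a quick break-even sketch. The prices and the 3-broker m5.xlarge baseline are the illustrative figures from the examples above, not current AWS pricing.

HOURS = 730

def msk_serverless_monthly(write_mb_s, read_mb_s, storage_gb=1000):
    # Ingress + egress + storage, using the example unit prices above
    gb_in = write_mb_s * 3600 * HOURS / 1000
    gb_out = read_mb_s * 3600 * HOURS / 1000
    return gb_in * 0.10 + gb_out * 0.05 + storage_gb * 0.10

def msk_provisioned_monthly(brokers=3, storage_gb_per_broker=1000):
    # Fixed broker-hours + EBS storage, using the example unit prices above
    return brokers * 0.42 * HOURS + brokers * storage_gb_per_broker * 0.10

for mb_s in (1, 2, 5, 10):
    serverless = msk_serverless_monthly(mb_s, mb_s * 2)
    provisioned = msk_provisioned_monthly()
    print(f"{mb_s} MB/s write: serverless ~${serverless:,.0f} vs provisioned ~${provisioned:,.0f}")

With these figures the 3-broker cluster wins above roughly 2 MB/s of sustained write; a smaller broker type would shift the break-even lower still.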

Migration Considerations

Kinesis to MSK

# When to migrate:
triggers = [
    "Need Kafka ecosystem tools (Connect, Streams, KSQL)",
    "Outgrowing Kinesis limits (365 day retention)",
    "Cost optimization for sustained high throughput (> 20 MB/s)",
    "Multi-cloud requirements",
    "Advanced processing with exactly-once semantics",
    "Need custom Kafka configurations"
]

# Migration approach:
steps = [
    "1. Choose MSK deployment (Serverless vs Provisioned)",
    "2. Set up MSK cluster with appropriate sizing",
    "3. Dual-write (Kinesis + MSK) - validate consistency",
    "4. Migrate consumers to MSK incrementally",
    "5. Validate data integrity and performance",
    "6. Cutover producers to MSK",
    "7. Deprecate Kinesis stream after retention period"
]

# Tools:
# - AWS Lambda for dual-write orchestration
# - Lambda consumer on the Kinesis stream producing to MSK
# - Custom application with both SDKs
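
One way to implement the dual-write in step 3 is a thin wrapper that publishes every event to both services. The sketch below assumes boto3 and kafka-python; the stream, topic, and broker names are placeholders.

# Illustrative dual-write during a Kinesis -> MSK migration (step 3 above)
import json
import boto3
from kafka import KafkaProducer   # pip install kafka-python

kinesis = boto3.client("kinesis")
producer = KafkaProducer(
    bootstrap_servers="b-1.my-msk.example:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",
)

def dual_write(event: dict, key: str) -> None:
    # Existing path: keep writing to Kinesis until cutover
    kinesis.put_record(
        StreamName="orders-stream",
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=key,
    )
    # New path: same event, same key, to MSK for consistency validation
    producer.send("orders", key=key.encode("utf-8"), value=event)

dual_write({"order_id": "123", "total": 42.5}, key="123")
producer.flush()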

MSK to Kinesis

# When to migrate:
triggers = [
    "Reducing operational overhead",
    "Simplifying architecture",
    "Throughput decreased significantly (< 10 MB/s)",
    "Team lacks Kafka expertise",
    "Want deeper AWS integration (Lambda, Firehose)",
    "Don't need > 365 day retention",
    "Cost optimization for variable loads"
]

# Migration approach:
steps = [
    "1. Create Kinesis streams with appropriate shards",
    "2. Set up MirrorMaker 2.0 (Kafka → Kinesis)",
    "    - Or custom application with both SDKs",
    "3. Migrate consumers to Kinesis incrementally",
    "4. Validate data and monitor lag",
    "5. Cutover producers to Kinesis",
    "6. Decommission MSK cluster after retention"
]

# Considerations:
# - Kinesis max 1 MB record size vs Kafka's configurable limit
# - Partition key strategy may need adjustment
# - No exactly-once semantics in Kinesis
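
If you don't want to run Kafka Connect, the bridge in step 2 can be a small consumer that replays topics into Kinesis. The sketch below assumes kafka-python and boto3, uses placeholder names, and enforces the ~1 MB Kinesis record limit noted above.

# Illustrative MSK -> Kinesis bridge consumer
import boto3
from kafka import KafkaConsumer   # pip install kafka-python

MAX_KINESIS_RECORD = 1_000_000  # stay under the ~1 MB per-record limit

kinesis = boto3.client("kinesis")
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="b-1.my-msk.example:9092",
    group_id="kinesis-bridge",
    enable_auto_commit=False,
)

for message in consumer:
    if len(message.value) > MAX_KINESIS_RECORD:
        # Route oversized records to S3 or a dead-letter topic instead
        continue
    kinesis.put_record(
        StreamName="orders-stream",
        Data=message.value,
        PartitionKey=(message.key or b"default").decode("utf-8"),
    )
    consumer.commit()  # commit only after the record is safely in Kinesis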

Production Readiness Checklist

Kinesis Production Checklist

Infrastructure:
  - [ ] Calculated appropriate shard count with 30% buffer
  - [ ] Chose deployment mode (Provisioned vs On-Demand)
  - [ ] Enabled Enhanced Fan-Out (if multiple consumers)
  - [ ] Configured auto-scaling (if Provisioned mode)
  - [ ] Set appropriate retention period (24h-365d)
  - [ ] Enabled encryption with KMS
  - [ ] Set up VPC endpoints for private connectivity

Code:
  - [ ] Implemented exponential backoff for retries (see the sketch after this checklist)
  - [ ] Created dead letter queue/stream for failed records
  - [ ] Optimized checkpoint frequency (time or count-based)
  - [ ] Used KPL for high throughput or batch APIs
  - [ ] Implemented proper error handling
  - [ ] Used high-cardinality partition keys
  - [ ] Handled resharding in consumers (shardEnded callback)

Operations:
  - [ ] Set up CloudWatch alarms:
        - IncomingBytes (80% threshold)
        - WriteProvisionedThroughputExceeded
        - GetRecords.IteratorAgeMilliseconds
        - PutRecord.Success rate
  - [ ] Created monitoring dashboard
  - [ ] Documented partition key strategy
  - [ ] Configured DynamoDB capacity (for KCL)
  - [ ] Tested failure scenarios
  - [ ] Load tested at 2x expected peak

Security:
  - [ ] Applied least privilege IAM policies
  - [ ] Enabled CloudTrail logging
  - [ ] Reviewed network security groups
  - [ ] Tested VPC endpoint connectivity
  - [ ] Enabled KMS encryption for sensitive data
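
A minimal sketch of the retry and dead-letter items above (batch PutRecords, exponential backoff, DLQ stream). The stream names and retry limits are placeholders; production code would also handle a failed write to the DLQ itself.

import time
import boto3

kinesis = boto3.client("kinesis")

def put_with_backoff(records, stream="orders-stream", dlq_stream="orders-dlq", max_attempts=5):
    """records: list of {'Data': bytes, 'PartitionKey': str} entries."""
    pending = records
    for attempt in range(max_attempts):
        resp = kinesis.put_records(StreamName=stream, Records=pending)
        if resp["FailedRecordCount"] == 0:
            return
        # Keep only the records that failed (throttled or internal error)
        pending = [rec for rec, result in zip(pending, resp["Records"])
                   if "ErrorCode" in result]
        time.sleep(min(2 ** attempt, 30))  # exponential backoff, capped at 30s
    # Give up: push leftovers to a dead-letter stream for later inspection
    kinesis.put_records(StreamName=dlq_stream, Records=pending)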

MSK Serverless Production Checklist

Infrastructure:
  - [ ] Deployed in private subnets across 3 AZs
  - [ ] Calculated expected throughput (< 200 MB/s)
  - [ ] Set appropriate retention (< 365 days)
  - [ ] Enabled IAM authentication

Configuration:
  - [ ] Created topics with appropriate partition count
  - [ ] Set min.insync.replicas=2 for critical topics
  - [ ] Configured compression (lz4 or zstd)
  - [ ] Set appropriate retention policy per topic

Code:
  - [ ] Enabled idempotent producers (see the producer sketch after this checklist)
  - [ ] Implemented manual offset commits
  - [ ] Set up consumer rebalance listeners
  - [ ] Created dead letter topic pattern
  - [ ] Proper error handling with retries
  - [ ] Optimized batch.size and linger.ms

Operations:
  - [ ] Set up CloudWatch alarms:
        - Cluster metrics available
        - ConsumerLag monitoring
  - [ ] Created monitoring dashboard
  - [ ] Documented consumer group strategy
  - [ ] Load tested at expected peak

Security:
  - [ ] Enabled IAM authentication
  - [ ] Enabled encryption in transit (TLS)
  - [ ] Configured security groups (restrictive)
  - [ ] Applied least privilege IAM policies
  - [ ] Set up VPC Flow Logs
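
A sketch of the producer-side items above using confluent-kafka. MSK Serverless additionally requires IAM (SASL/OAUTHBEARER) authentication, which is omitted here for brevity; the bootstrap address and topic are placeholders.

from confluent_kafka import Producer   # pip install confluent-kafka

producer = Producer({
    "bootstrap.servers": "boot-xxxx.kafka-serverless.us-east-1.amazonaws.com:9098",
    "enable.idempotence": True,      # prevents duplicates from producer retries
    "acks": "all",                   # wait for min.insync.replicas
    "compression.type": "lz4",       # reduces serverless ingress/egress charges
    "linger.ms": 10,                 # small batching window
    "batch.size": 131072,            # 128 KB batches
})

def on_delivery(err, msg):
    if err is not None:
        # Hand off to a dead-letter topic or a local retry queue here
        print(f"delivery failed: {err}")

producer.produce("orders", key=b"order-123", value=b'{"order_id": "123"}', on_delivery=on_delivery)
producer.flush()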

MSK Provisioned Production Checklist

Infrastructure:
  - [ ] Deployed 3+ brokers across 3 AZs
  - [ ] Right-sized broker instances (capacity planning)
  - [ ] Calculated partition count (plan for growth)
  - [ ] Configured proper storage (EBS with appropriate IOPS)
  - [ ] Set up VPC with private subnets
  - [ ] Considered tiered storage for long retention

Configuration:
  - [ ] Configured min.insync.replicas=2
  - [ ] Set appropriate topic retention
  - [ ] Enabled compression
  - [ ] Configured log segments properly
  - [ ] Set up log compaction (if needed)

Code:
  - [ ] Enabled idempotent producers
  - [ ] Implemented manual offset commits
  - [ ] Set up consumer rebalance listeners
  - [ ] Created dead letter topic pattern
  - [ ] Proper error handling with retries
  - [ ] Optimized producer/consumer configs

Operations:
  - [ ] Set up comprehensive CloudWatch alarms (example alarm after this checklist):
        - CpuUser (< 70%)
        - NetworkProcessorAvgIdlePercent (> 30%)
        - UnderReplicatedPartitions (= 0)
        - OfflinePartitionsCount (= 0)
        - ConsumerLag
  - [ ] Configured monitoring dashboard
  - [ ] Documented consumer group strategy
  - [ ] Created disaster recovery plan
  - [ ] Capacity planning with 40% headroom
  - [ ] Tested broker failure scenarios
  - [ ] Load tested at 2x expected peak

Security:
  - [ ] Enabled authentication (IAM and/or SASL/SCRAM)
  - [ ] Enabled encryption (at rest and in transit)
  - [ ] Configured security groups (restrictive)
  - [ ] Set up VPC Flow Logs
  - [ ] Applied least privilege IAM policies
  - [ ] Considered multi-VPC connectivity (if needed)
  - [ ] Set up AWS PrivateLink (for cross-VPC access)
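
As an example of the alarm items above, the boto3 call below creates an UnderReplicatedPartitions alarm for one broker. The cluster name, broker ID, and SNS topic are placeholders; verify the dimension names against the metrics your cluster actually emits in the AWS/Kafka namespace.

import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="msk-under-replicated-partitions-broker-1",
    Namespace="AWS/Kafka",
    MetricName="UnderReplicatedPartitions",
    Dimensions=[
        {"Name": "Cluster Name", "Value": "my-msk-cluster"},
        {"Name": "Broker ID", "Value": "1"},
    ],
    Statistic="Maximum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",  # alarm whenever any partition is under-replicated
    TreatMissingData="breaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:msk-alerts"],
)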

Conclusion

Key Takeaways

Amazon Kinesis is the right choice when you:

  • Want AWS-native simplicity with minimal operations
  • Need tight integration with Lambda, Firehose, and other AWS services
  • Have moderate, predictable throughput requirements (< 10 MB/s)
  • Value quick time-to-market over flexibility
  • Have limited Kafka expertise
  • Can work within 365-day retention limits

Amazon MSK Serverless is the right choice when you:

  • Need Kafka APIs without operational overhead
  • Have variable/unpredictable workloads (< 40 MB/s average)
  • Want automatic scaling with pay-per-use pricing
  • Need Kafka ecosystem tools (Connect, Streams)
  • Can work within 365-day retention limits
  • Want zero cluster management

Amazon MSK Provisioned is the right choice when you:

  • Need the full Kafka ecosystem (Connect, Streams, KSQL)
  • Require advanced features like exactly-once semantics
  • Have sustained high throughput (> 40 MB/s)
  • Need retention > 365 days or unlimited (with tiered storage)
  • Are migrating existing Kafka workloads
  • Need multi-cloud or hybrid capabilities
  • Require custom Kafka configurations

Final Recommendations

  1. Start Simple: If unsure, start with Kinesis for pure AWS workloads or MSK Serverless if you need Kafka APIs. Both offer minimal operational overhead. You can always migrate to MSK Provisioned later if you outgrow them.

  2. Think Long-Term: Consider your team's expertise, future requirements, and growth trajectory. MSK Provisioned requires more upfront investment but offers maximum flexibility.

  3. Consider MSK Serverless: Often overlooked, MSK Serverless bridges the gap between Kinesis simplicity and Kafka capabilities. It's perfect for:

    • Variable workloads
    • Development/testing
    • Medium throughput with Kafka requirements

  4. Don't Optimize Prematurely: All three options can handle significant scale. Focus on getting your streaming pipeline working correctly first, then optimize for performance and cost.

  5. Monitor Continuously: Regardless of choice, implement comprehensive monitoring from day one. Stream processing issues compound quickly if undetected.

  6. Test Thoroughly: Test error scenarios, failover, partition rebalancing, and performance under load in non-production environments before going live.

  7. Plan Partition Count: For MSK, plan partition count carefully upfront. Adding partitions later breaks ordering guarantees for existing keys.

Quick Reference Guide

Choose Kinesis when:
  Throughput: < 10 MB/s
  Retention: < 365 days
  Team: Limited streaming expertise
  Priority: Time to market, AWS integration
  Cost: Predictable, shard-based pricing

Choose MSK Serverless when:
  Throughput: Variable, < 40 MB/s average
  Retention: < 365 days
  Team: Kafka knowledge but want less ops
  Priority: Kafka APIs + auto-scaling
  Cost: Pay-per-use (good for variable loads)

Choose MSK Provisioned when:
  Throughput: > 40 MB/s sustained
  Retention: > 365 days or unlimited
  Team: Strong Kafka expertise
  Priority: Control, customization, ecosystem
  Cost: Fixed (better for sustained high throughput)

Cost Optimization Tips

Kinesis:
  - Use On-Demand for variable workloads
  - Enable EFO only when needed (adds cost)
  - Checkpoint every 60 seconds, not per record (see the sketch after these tips)
  - Use KPL aggregation for better PUT efficiency
  - Right-size shards (don't over-provision)

MSK Serverless:
  - Best for < 40 MB/s average throughput
  - Enable compression (lz4) to reduce ingress/egress costs
  - Use for dev/test environments
  - Consider for variable production workloads

MSK Provisioned:
  - Better economics for > 40 MB/s sustained
  - Use tiered storage for long retention (90% savings)
  - Enable compression (reduces storage costs)
  - Right-size brokers (don't over-provision)
  - Use Graviton instances for 20% cost savings
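
The checkpointing tip in the Kinesis list above comes down to throttling how often you call the checkpointer. A framework-agnostic sketch, assuming your consumer library exposes a checkpoint(sequence_number) call such as the KCL's:

import time

CHECKPOINT_INTERVAL_S = 60

class TimedCheckpointer:
    """Checkpoint at most once per interval instead of once per record."""

    def __init__(self, interval_s: float = CHECKPOINT_INTERVAL_S):
        self.interval_s = interval_s
        self.last_checkpoint = 0.0

    def maybe_checkpoint(self, checkpointer, sequence_number: str) -> None:
        now = time.monotonic()
        if now - self.last_checkpoint >= self.interval_s:
            checkpointer.checkpoint(sequence_number)  # one lease-table write per interval
            self.last_checkpoint = now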

Next Steps

  1. Prototype: Build a simple proof-of-concept with your preferred option(s)
  2. Benchmark: Test with your actual data patterns and throughput
  3. Calculate TCO: Include operational overhead, not just AWS costs
  4. Review Team Skills: Assess current expertise and training needs
  5. Start Small: Begin with one use case, prove it, then expand
  6. Document: Create runbooks for common operations
  7. Monitor: Set up comprehensive monitoring and alerting


About This Guide: This comprehensive guide covers Amazon Kinesis and Amazon MSK (Serverless and Provisioned). AWS services evolve rapidly—always consult official documentation for the latest features and pricing.

Found this guide helpful? Share it with your team! For questions, corrections, or suggestions, please reach out through GitHub Issues or the comments section.
