Building Scalable Event-Driven Systems with Kafka and Avro Schema Registry
Introduction
In today's data-driven world, modern applications need to process large volumes of events reliably and efficiently. At our organization, we've embraced Apache Kafka and Avro Schema Registry as foundational technologies for our event-driven architecture. This blog post shares our implementation journey, design decisions, and lessons learned while building a robust event streaming platform.
For our Kafka implementation, we leverage the excellent franz-go/kgo library - a pure Go Kafka client that provides high performance, modern features, and a clean API. This library has been instrumental in helping us build a reliable and efficient Kafka integration.
Why Kafka and Avro?
Before diving into implementation details, let's understand why we chose this technology stack:
Apache Kafka provides:
- Fault-tolerant, high-throughput event streaming
- Persistent storage of events with configurable retention
- Horizontal scalability for both producers and consumers
- Strong ordering guarantees at the partition level
- Exactly-once semantics (with proper configuration)
Avro and Schema Registry deliver:
- Compact, efficient binary serialization
- Rich data structures with nested objects
- Schema evolution capabilities
- Backward and forward compatibility
- Centralized schema management
The combination gives us a powerful foundation for building event-driven systems that can evolve over time while maintaining compatibility between producers and consumers.
Key Concepts
Before exploring our implementation, let's review some key concepts:
Kafka Fundamentals
- Topics: Named channels for publishing event streams
- Partitions: The unit of parallelism in Kafka, allowing horizontal scaling
- Producer: Publishes events to topics
- Consumer: Subscribes to topics and processes events
- Consumer Groups: Coordinate consumption across multiple consumer instances
- Offsets: Track the position of consumers within partitions
Avro and Schema Registry
- Avro Schema: JSON document defining the structure of events
- Schema Registry: Central repository for schemas
- Schema ID: Unique identifier for a registered schema
- Schema Evolution: Rules for compatible schema changes
- Serialization/Deserialization: Converting between objects and binary format
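On the wire, registry-aware serializers frame every message the same way: a zero "magic" byte, the schema ID as a big-endian 4-byte integer, then the Avro-encoded payload. A minimal sketch of that framing (the function names here are ours, not from any library):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeWireFormat prepends the Schema Registry wire-format header
// (magic byte 0x00 + big-endian 4-byte schema ID) to an Avro payload.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 5+len(payload))
	buf[0] = 0 // magic byte
	binary.BigEndian.PutUint32(buf[1:5], schemaID)
	copy(buf[5:], payload)
	return buf
}

// decodeWireFormat splits a framed message back into schema ID and payload.
func decodeWireFormat(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 || msg[0] != 0 {
		return 0, nil, errors.New("not in schema-registry wire format")
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	framed := encodeWireFormat(42, []byte("avro-bytes"))
	id, payload, _ := decodeWireFormat(framed)
	fmt.Println(id, string(payload))
}
```

This header is how a consumer knows which schema to fetch from the registry before deserializing the payload.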
Implementing a Robust Kafka Producer
Let's examine how we implemented our Kafka producer with Avro serialization.
Schema Management
The first step in our implementation was building a robust schema management system. We created a SchemaManager component responsible for:
- Registering schemas with the Schema Registry
- Caching schemas to avoid repeated registry calls
- Serializing and deserializing messages
- Creating properly formatted Kafka records
Here's how our Schema Manager is structured:
type SchemaManager struct {
Client *sr.Client // Schema Registry client
Serde *sr.Serde // Serializer/Deserializer
SchemaCache map[string]avro.Schema // Cache for parsed schemas
SubjectCache map[string]sr.SubjectSchema // Cache for registry schemas
}
A key design decision was implementing schema registration with version checking, which ensures we only register new schemas when necessary:
func RegisterSchema(ctx context.Context, topic string, schemaFile string) (int, error) {
// Read schema file
schemaText := readSchemaFile(schemaFile)
subject := topic + "-value" // Standard naming convention
// Check if schema already exists
versions := getExistingVersions(subject)
if existingID, ok := schemaAlreadyExists(versions, schemaText); ok {
// Reuse the existing schema
return existingID, nil
}
// Register new schema if needed
avroSchema := parseAvroSchema(schemaText)
schemaID := registerWithRegistry(subject, schemaText)
// Cache the schema for future use
cacheSchema(subject, avroSchema, schemaID)
return schemaID, nil
}
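Since many producer goroutines may look up the same subject at once, the caches need to be safe for concurrent use and should hit the registry at most once per subject. A minimal sketch with sync.RWMutex (the type and helper names are illustrative, not the actual SchemaManager):

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache guards schema-ID lookups so concurrent producers
// don't race on the map or register the same subject twice.
type schemaCache struct {
	mu  sync.RWMutex
	ids map[string]int // subject -> schema ID
}

func newSchemaCache() *schemaCache {
	return &schemaCache{ids: make(map[string]int)}
}

// getOrRegister returns the cached ID, or invokes register
// (e.g. a Schema Registry call) once and caches the result.
func (c *schemaCache) getOrRegister(subject string, register func() int) int {
	c.mu.RLock()
	id, ok := c.ids[subject]
	c.mu.RUnlock()
	if ok {
		return id
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if id, ok := c.ids[subject]; ok { // re-check after upgrading the lock
		return id
	}
	id = register()
	c.ids[subject] = id
	return id
}

func main() {
	cache := newSchemaCache()
	calls := 0
	for i := 0; i < 3; i++ {
		id := cache.getOrRegister("orders-value", func() int { calls++; return 7 })
		fmt.Println(id)
	}
	fmt.Println("registry calls:", calls)
}
```

The double-checked locking keeps the fast path (a cache hit) on the cheap read lock.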
Producer Configuration
Proper producer configuration is crucial for reliability and performance. Here's how we configure our Kafka producers:
func NewProducer(cfg *Config) (*kgo.Client, error) {
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.Brokers...),
kgo.DialTimeout(cfg.DialTimeout),
kgo.RetryBackoffFn(exponentialBackoff(cfg.RetryBackoffMs)),
kgo.RecordRetries(cfg.Retries),
kgo.RequiredAcks(kgo.AllISRAcks()), // Wait for all in-sync replicas
kgo.ProducerBatchCompression(kgo.SnappyCompression()),
}
return kgo.NewClient(opts...)
}
Key configuration options include:
- RequiredAcks: We use AllISRAcks to ensure durability, waiting for all in-sync replicas to acknowledge writes
- RecordRetries: Configures how many times to retry failed produces
- RetryBackoff: Implements exponential backoff for retries
- BatchCompression: Reduces network bandwidth and storage requirements
Serialization and Production
The core of our producer implementation combines schema management with message production:
func Produce(ctx context.Context, topic string, key string, event interface{}) error {
// Get schema for topic
subject := topic + "-value"
schema := getSchemaFromRegistry(subject)
// Serialize the event
serializedData := serializeWithSchema(schema, event)
// Create Kafka record
record := &kgo.Record{
Topic: topic,
Key: []byte(key),
Value: serializedData,
Headers: []kgo.RecordHeader{
{Key: "trace_id", Value: []byte(getTraceID(ctx))},
},
}
// Send asynchronously with callback
resultCh := make(chan error, 1)
producer.Produce(ctx, record, func(r *kgo.Record, err error) {
if err != nil {
log.Error("delivery failed", "error", err)
resultCh <- err
return
}
log.Info("message delivered",
"topic", r.Topic,
"partition", r.Partition,
"offset", r.Offset)
resultCh <- nil
})
// Wait for result
return <-resultCh
}
Our implementation uses asynchronous production with callbacks, which provides better throughput while still giving us synchronous error handling in the client code.
Advanced Kafka Configuration
Beyond basic producer setup, we've implemented several advanced patterns for reliability and performance.
Handling Delivery Guarantees
Kafka offers different delivery guarantees depending on configuration. We've chosen the strongest guarantees for our critical data flows:
- Producer Acks: Set to all to ensure messages are durably stored
- Idempotent Producers: Prevent duplicate messages due to retries
- Transactional Messages: For scenarios requiring atomic multi-record operations
// For transactional producers
kgo.TransactionalID("unique-producer-id"),
kgo.ProducerBatchMaxBytes(2*1024*1024), // 2MB batch size
Partitioning Strategies
Partition selection is critical for workload distribution and maintaining event ordering. We use several strategies depending on the use case:
- Key-Based Partitioning: Events with the same key go to the same partition
- Custom Partitioning: For specific workload distribution requirements
// Custom partition strategy
kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil))
Key-based partitioning is particularly important for us when processing related events that must maintain order, such as a sequence of operations on the same resource.
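Conceptually, key-based partitioning reduces to hashing the key modulo the partition count, which is why equal keys always land on the same partition. The sketch below illustrates the idea with FNV-1a; Kafka's default producers actually use murmur2, so real partition assignments will differ:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey illustrates key-based partitioning: the same key
// always maps to the same partition, preserving per-key ordering.
// Real Kafka clients use murmur2; FNV-1a here is only illustrative.
func partitionForKey(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, key := range []string{"resource-1", "resource-2", "resource-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionForKey(key, 12))
	}
}
```

Note the corollary: changing a topic's partition count changes the key-to-partition mapping, so ordering guarantees only hold within one partitioning epoch.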
Implementing Avro Schemas
Avro schemas form the contract between our producers and consumers. Here's an example of a typical event schema:
{
"type": "record",
"name": "Event",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "string"},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "type", "type": {"type": "enum", "name": "EventType", "symbols": ["CREATED", "UPDATED", "DELETED"]}},
{"name": "resource_id", "type": "string"},
{"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}}
]
}
The Go struct representing this schema is annotated with Avro tags for serialization:
type Event struct {
ID string `avro:"id"`
Timestamp int64 `avro:"timestamp"`
Type string `avro:"type"`
ResourceID string `avro:"resource_id"`
Metadata map[string]string `avro:"metadata"`
}
Schema Evolution
One of the most powerful features of Avro is schema evolution. We follow these rules to ensure compatibility:
- Adding Fields: Always include default values for new fields
- Removing Fields: Only remove fields that have defaults
- Changing Types: Only widen types (int → long)
- Renaming Fields: Use aliases for field renames
For example, to add a new optional field:
{"name": "priority", "type": ["null", "int"], "default": null}
Consuming Kafka Messages with Avro
On the consumer side, we implement a similar pattern for deserialization:
func Consume(ctx context.Context, topic string, handler func(Event) error) error {
// Configure consumer
consumer := configureConsumer(topic)
// Start polling loop
for {
fetches := consumer.PollFetches(ctx)
if fetches.IsClientClosed() {
return nil
}
// Surface per-partition fetch errors
fetches.EachError(func(topic string, partition int32, err error) {
log.Error("fetch failed", "topic", topic, "partition", partition, "error", err)
})
// Process all fetched records
for _, record := range fetches.Records() {
// Deserialize using Avro schema
event, err := deserializeEvent(record.Value)
if err != nil {
log.Error("failed to deserialize", "error", err)
continue
}
// Process the event
if err := handler(event); err != nil {
log.Error("failed to process", "error", err)
// Handle error based on strategy
}
}
// Commit offsets after successful processing
consumer.CommitRecords(ctx, fetches.Records()...)
}
}
Consumer Group Configuration
We use consumer groups to distribute workload across multiple instances:
func configureConsumer(topic string) *kgo.Client {
client, err := kgo.NewClient(
kgo.SeedBrokers(brokers...),
kgo.ConsumerGroup("my-service-group"),
kgo.ConsumeTopics(topic),
kgo.DisableAutoCommit(), // Commit explicitly after processing
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), // Start from beginning if no offset
kgo.BlockRebalanceOnPoll(), // Prevent rebalances during processing
)
if err != nil {
log.Fatal("failed to create consumer", "error", err)
}
return client
}
Key consumer configurations include:
- ConsumerGroup: Enables coordinated consumption across multiple instances
- DisableAutoCommit: We commit offsets explicitly after processing rather than auto-committing
- ConsumeResetOffset: Controls where consumption starts when no committed offset is found
- BlockRebalanceOnPoll: Prevents rebalances during processing
Scaling Considerations
Our Kafka implementation is designed to scale horizontally. Here are key considerations:
Producer Scaling
- Connection Pooling: We use a shared producer client to avoid connection explosion
- Batching: Our producers batch messages for efficiency
- Async Production: Allows high throughput without blocking
Consumer Scaling
- Partition Count: We size topic partitions based on expected consumer count
- Consumer Groups: Allow horizontal scaling of processing
- Rebalance Strategy: We use cooperative rebalancing to minimize disruption
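For the partition-count bullet above, a back-of-envelope formula we find useful is to take the larger of the producer- and consumer-implied partition counts and add headroom. All throughput numbers below are hypothetical:

```go
package main

import (
	"fmt"
	"math"
)

// partitionsNeeded sizes a topic so neither producers nor consumers
// become the bottleneck: take the larger of the two partition counts
// implied by per-partition throughput, then multiply in headroom
// for growth (repartitioning later breaks key ordering).
func partitionsNeeded(targetMBps, producerMBpsPerPartition, consumerMBpsPerPartition, headroom float64) int {
	p := math.Ceil(targetMBps / producerMBpsPerPartition)
	c := math.Ceil(targetMBps / consumerMBpsPerPartition)
	return int(math.Ceil(math.Max(p, c) * headroom))
}

func main() {
	// Hypothetical numbers: 100 MB/s target, 10 MB/s per partition on the
	// produce side, 5 MB/s per partition on the consume side, 1.5x headroom.
	fmt.Println(partitionsNeeded(100, 10, 5, 1.5)) // consumers dominate: ceil(20 * 1.5) = 30
}
```

Since a consumer group can run at most one consumer per partition, the consumer-side term also bounds how far processing can scale out.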
Kafka Cluster Scaling
- Topic Replication: We use a replication factor of 3 for durability
- Broker Sizing: Properly sized brokers based on throughput and retention
- Monitoring: Comprehensive metrics for scaling decisions
Operational Patterns
Beyond the core implementation, we've developed several operational patterns:
Observability
We add trace IDs to message headers, enabling distributed tracing across our event pipeline:
record.Headers = append(record.Headers, kgo.RecordHeader{
Key: "trace_id",
Value: []byte(getTraceID(ctx)),
})
This allows us to correlate logs across services and track event flow through the system.
Error Handling
We implement sophisticated error handling for different failure scenarios:
- Transient Failures: Automatic retries with backoff
- Schema Errors: Alert and manual intervention
- Dead-Letter Queues: Separate topics for unprocessable messages
Schema Registry Management
We've implemented tooling to manage schema evolution, including:
- Schema Validation: CI/CD checks for compatibility
- Schema Versioning: Tracking schema versions in source control
- Schema Deployment: Coordinated deployment of schemas and code
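The compatibility check in CI can be a single call to Schema Registry's REST API (POST /compatibility/subjects/{subject}/versions/latest). A sketch using only the standard library; it assumes the default JSON response shape and omits authentication:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// checkCompatibility asks Schema Registry whether newSchema is
// compatible with the latest registered version of subject.
func checkCompatibility(registryURL, subject, newSchema string) (bool, error) {
	body, err := json.Marshal(map[string]string{"schema": newSchema})
	if err != nil {
		return false, err
	}
	url := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", registryURL, subject)
	resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(body))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	var result struct {
		IsCompatible bool `json:"is_compatible"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, err
	}
	return result.IsCompatible, nil
}

func main() {
	ok, err := checkCompatibility("http://127.0.0.1:1", "events-value", `"string"`)
	fmt.Println(ok, err) // expect a connection error when no registry is running
}
```

Failing the pipeline when is_compatible is false catches breaking schema changes before they reach any producer.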
Performance Tuning
Optimizing Kafka performance requires tuning at multiple levels:
Producer Tuning
// Producer performance tuning
kgo.ProducerBatchMaxBytes(1024*1024), // 1MB batches
kgo.MaxBufferedRecords(10000), // Cap records buffered in the client
kgo.ProducerLinger(10*time.Millisecond), // Wait briefly for batches to fill
kgo.ProducerBatchCompression(kgo.SnappyCompression()),
Consumer Tuning
// Consumer performance tuning
kgo.FetchMaxBytes(10*1024*1024), // 10MB max fetch
kgo.FetchMaxWait(100*time.Millisecond), // Max wait time for fetch
kgo.FetchMinBytes(1024), // Minimum bytes to fetch
Broker Tuning
Though we don't manage the Kafka cluster directly in this example, broker tuning is essential:
# Broker performance settings
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000
Lessons Learned
Implementing Kafka with Avro has taught us several valuable lessons:
- Schema First Development: Define schemas before implementing producers/consumers
- Compatibility Testing: Test schema evolution rigorously
- Monitoring Is Critical: Implement comprehensive monitoring for Kafka metrics
- Proper Error Handling: Design for failure from the beginning
- Thoughtful Partitioning: Partition strategy impacts performance and ordering
Conclusion
Apache Kafka with Avro Schema Registry provides a powerful foundation for scalable, event-driven systems. By implementing robust producers and consumers with proper configuration and error handling, we've built a reliable event streaming platform that forms the backbone of our distributed architecture.
The combination of Kafka's durability and scalability with Avro's efficient serialization and schema evolution capabilities gives us a system that can grow with our business needs while maintaining backward compatibility.
As with any technology, the implementation details matter. Careful attention to configuration, error handling, and operational patterns has allowed us to build a system that not only works well today but can evolve to meet future requirements.
Technology Credits
Our implementation relies on several outstanding open-source libraries:
- franz-go/kgo: A pure Go Kafka client that provides excellent performance and a modern API design. The kgo library offers a clean, idiomatic Go interface to Kafka with powerful features like record batching, automatic retries, and flexible configuration. We chose kgo for its performance characteristics, thorough documentation, and active maintenance.
- hamba/avro: A fast Avro codec for Go that supports schema evolution and efficient serialization/deserialization. This library provides the foundation for our Avro implementation.
- franz-go/schema-registry: A Schema Registry client that integrates seamlessly with franz-go/kgo, making it easy to manage Avro schemas for our Kafka topics.
Special thanks to the maintainers of these libraries for creating such high-quality tools for the Go ecosystem.
This article represents our experiences implementing Kafka and Avro Schema Registry in a production environment. Your specific requirements may differ, and we encourage careful testing and validation of any implementation before deploying to production.