Mastering Your EDA: Event Ordering and Smooth Schema Evolution
Event ordering plays a critical role in Event-Driven Architectures (EDAs) when defining business events for a specific subject. If you want to achieve ordering on a Kafka-based system, publishing events to the same topic partition is recommended, so you can create a data stream that reflects the event lifecycle of that subject. But how do you set this up?
In this technical deep dive, we'll explore best practices for setting up event ordering in Kafka. We'll also discuss some common challenges encountered in real-world projects and how to address them effectively. Let's get started.
First steps and questions
We'll begin by assuming the use of schemas and serialization frameworks within a Kafka cluster that includes a schema registry. However, remember that these concepts can be applied to other scenarios too.
To make the discussion more concrete, let's define a set of events that would occur in an order fulfillment setting, for example for a webshop. These events will occur on a topic in the order_fulfillment bounded context and handle the orders subject.
- Order Confirmed
- Order Shipped
- Order Delivered
Now, we face a crucial decision regarding data contracts: should we define a single or multiple schemas within the topic? In Kafka terminology, this translates to the selection of an appropriate subject naming strategy. Let's explore both options in more detail.
Defining a single schema
In this case, our goal is to accommodate multiple event types on a single topic. This requires a single schema that can validate all these event types. The burden then falls on the chosen serialisation technology to handle this complexity effectively.
Fortunately, most modern serialisation frameworks, like Protocol Buffers (also known as Protobuf) and Apache Avro are well-equipped to manage this task. As the examples below show, Protobuf uses a oneof structure, while Avro uses union.
Protocol Buffers
message OrderEvent {
required uint64 timestamp = 1;
required Order order = 2;
oneof event {
OrderConfirmed confirmed = 10;
OrderShipped shipped = 11;
OrderDelivered delivered = 12;
}
}
AVRO
record OrderEvent {
long timestamp;
Order order;
union {
OrderConfirmed,
OrderDelivered,
OrderShipped
} event;
}
Once the schema is defined, we need to establish a method for enforcing and managing its usage. Kafka's Schema Registry plays a crucial role here. The schema is linked to a subject, which in turn is linked to the actual topic. In Schema Registry terminology, this corresponds to the default strategy, TopicNamingStrategy. The subject for the topic's value is constructed following a specific format: <topic_name>_value.
Now, let's consider the implications for schema evolution. As discussed in a previous blog post, enforcing the strictest strategy (FULL_TRANSITIVE) can be quite restrictive. Let's try a common modification that affects the oneof structure.
Adding events in Protocol Buffers
When we add the OrderPaid event in Protocol Buffers, the new schema looks as follows:
message OrderEvent {
required uint64 timestamp = 1;
required Order order = 2;
oneof event {
OrderConfirmed confirmed = 10;
OrderShipped shipped = 11;
OrderDelivered delivered = 12;
OrderPaid paid = 13;
}
}
However, when we try to register this schema, the following happens:
org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "proto.singleSchema.order.events-value", details: [{errorType:"ONEOF_FIELD_REMOVED", description:"The old schema is missing a oneof field at path '#/OrderEvent/event/13' in the new schema"},
{compatibility: 'FULL_TRANSITIVE'}]; error code: 409
Changes to a oneof structure in Protocol Buffers are considered breaking changes for forward and full compatibility. However, changing the compatibility level to backward_transitive allows registration and publishing of events with the new schema.
Now, it's time to ask ourselves an important question: what does this mean for consumers? The good news is that consumers can still process the events, but they will encounter unknown fields for the newly added OrderPaid property. In these cases, you will have to implement logic to either ignore these unknown fields or handle them appropriately.
Here's an example of the consumer properties for a scenario where the old schema (without "OrderPaid") is used to consume events:
public static Properties getPropertiesSingleSchema(String bootstrapServers, String schemaRegistryUrl) {
Properties props = new Properties(4);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
props.put("schema.registry.url", schemaRegistryUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()); props.put("value.subject.name.strategy", TopicNameStrategy.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class.getName());
props.put("specific.protobuf.value.type", OrderEvent.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
This code snippet shows the consumer itself:
private static void consumeSingleSchemaProtoEvents(){
KafkaConsumer<String, OrderEvent> consumer = new KafkaConsumer<>(ConsumeProto.getPropertiesSingleSchema("localhost:9092", "http://localhost:8081"));
consumer.subscribe(List.of(PROTO_SS_ORDER_EVENTS_TOPIC));
consumer.poll(Duration.ofSeconds(2)).forEach(rec -> {
System.out.printf("----------------%nOffset: [%s]%nEvent: [%s]%nKey:[%s]%nValue: [%s]%n %n",
rec.offset(),
rec.value().getEventCase(),
rec.key(),
rec.value().toString());
});
}
As expected, the consumer encounters an unknown field (EVENT_NOT_SET) for the newly added property.
Offset: [4]
Event: [EVENT_NOT_SET]
Key: [DweTB9rQ5sPjPXsMQjHD]
Value: [
timestamp: 1712231832
order {
id: "DweTB9rQ5sPjPXsMQjHD"
}
13: { }
]
In summary, Protocol Buffers/Protobuf handles forward compatibility through its internal mechanisms. If you use Protobuf's oneof structure within your schema definitions, it's crucial to set the compatibility level to either BACKWARD or BACKWARD_TRANSITIVE to ensure successful schema evolution. Choosing FULL or FORWARD compatibility will effectively lock down any changes to the oneof structure, potentially rendering the schema unusable.
Keep in mind that this strategy comes with some limitations. Adding required fields will also be considered breaking changes. However, adding optional fields is allowed, but it will result in an opaque property within the payload for consumers using older schema versions. These consumers can see the unknown field but lack the information to understand it.
Adding events in Apache Avro
When we add the OrderPaid event in Avro, the new schema looks as follows:
record OrderEvent {
long timestamp;
Order order;
union {
OrderConfirmed,
OrderDelivered,
OrderShipped,
OrderPaid,
} event;
}
Just like with Protocol Buffers, when we try to register the schema with FULL_TRANSITIVE compatibility results in an error:
org.apache.kafka.common.errors.InvalidConfigurationException:
Schema being registered is incompatible with an earlier schema for subject "avro.singleSchema.order.events-value",
details: [{errorType:'MISSING_UNION_BRANCH', description:'The old schema is missing a type inside a union field at path '/fields/2/type/3' in the new schema',
additionalInfo:'reader union lacking writer type: RECORD'},
{compatibility: 'FULL_TRANSITIVE'}]; error code: 409
This happens because Avro also treats changes to union fields as breaking for forward compatibility. Setting this to BACKWARD_TRANSITIVE instead solves the error:
The error might be solved, but can we still consume the event without the forward compatibility? Let's try:
KafkaConsumer<String, eu.cymo.kafkaSerializationEvolution.OrderEvent> consumer = new KafkaConsumer<> (ConsumeAvro.getPropertiesSingleSchema("localhost:9092", "http://localhost:8081"));
consumer.subscribe(List.of(AVRO_SS_ORDER_EVENTS_TOPIC)); consumer.poll(Duration.ofSeconds(2)).forEach(rec -> {
System.out.printf("----------------%nOffset: [%s]%nEvent: [%s]%nKey:
[%s]%nValue: [%s]%n %n",
rec.offset(),
rec.value().getEvent().getClass(),
rec.key(), rec.value().toString());
});
Depending on the content of the events, you may receive a different response.
- If you do have an empty event structure, it will default to that one (record OrderConfirmed{ }), and consuming it yields the following result:
Offset: [4]
Event: [class eu.cymo.kafkaSerializationEvolution.event.OrderConfirmed]
Key: [pO18e2pTe0m5UnyOOymQ]
Value: [{
"timestamp": 1712256251,
"order": {
"id": "pO18e2pTe0m5UnyOOymQ"
},
"event": {}
}]
However, this is a plain lie. The event simply did not occur. Depending on what it defaults to, this can be potentially dangerous.
2. If you do not have an empty event structure that it can default to, the deserialisation will simply fail.
ERROR org.apache.kafka.clients.consumer.internals.CompletedFetch - [Consumer clientId=consumer-8af41c8f-e3f5-4b14-8b87-2c4b48f6ee0a-1, groupId=8af41c8f-e3f5-4b14-8b87-2c4b48f6ee0a] Deserializers with error: Deserializers{keyDeserializer=org.apache.kafka.common.serialization.String
Deserializer@4dbad37,
valueDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer@7b4 acdc2}
This is where Avro differs from Protocol Buffers:
- Instead of defaulting to an unknown state, it defaults to a compatible type of the existing members of the union or fails entirely.
- If you have multiple empty payloads, it will default to the first one as per the default Avro spec.
By itself, this behaviour makes it unusable in its current form. However, we can make it work using a minor tweak.
record Unknown {}
...
record OrderEvent {
long timestamp;
Order order;
union {
Unknown,
OrderConfirmed,
OrderDelivered,
OrderShipped,
OrderPaid
} event;
}
Adding the empty Unknown record make sit default to Unknown every time, which makes it predictable to use.
Offset: [4]
Event: [class eu.cymo.kafkaSerializationEvolution.event.Unknown]
Key: [auzEa4LqDqiJwso6k7UC]
Value: [{
"timestamp": 1712257605,
"order": {
"id": "auzEa4LqDqiJwso6k7UC"
},
"event": {}
}]
Defining multiple schemas
We've explored the challenges associated with a single schema for multiple event types, but you could also use a different schema for each event type within the same topic. This approach still guarantees event ordering, but each event can be defined as its own subject, which in turn can evolve independently from each other.
In a Kafka schema registry setup, this aligns with the TopicRecordNamingStrategy. This strategy constructs the subject by combining the topic name with the fully qualified domain name (FQDN) of the event record.
In our order fulfilment example, this would translate to four distinct subjects, one for each event type:
Consequently, schema changes for one event (e.g., adding OrderPaid) would only impact its corresponding subject and schema, leaving the other event schemas unaffected.
Evolving schemas
At this point, you may be worried about the evolution of the order schema. What would happen if we evolved it at a different rate for those events?
Let's try this out, starting with FULL_TRANSITIVE compatibility. We begin by adding an optional field to the order structure:
message Order {
required string id = 1;
optional string category = 2;
}
If we publish with the new structure with a single event, this works fine. A new schema version will be added for both order and the event that was published, while the others will not be affected.
We can also still produce using the old order schema for the remaining three events.
However, a problem can occur when you share the order structure with producers that want to evolve it differently.
For example, consider a scenario where we've added category to version 1 to create a version 2. If another service is still using version 1 and wants to add a field, it will first have to conform to version 2 before it can create version 3.
We highly recommend not sharing common structures across bounded contexts, since they could lead to the challenges we discussed. Within a bounded context you typically want to align on these common structures, so communication and data contracts are clear. Defining them as separate subjects makes it easier to keep these common structures on the same evolution path across the various schemas.
Avro and schema reuse
Avro also lets you reuse schemas through imports, but it doesn't automatically create separate subjects for imported schemas. Because the order structure is included within each individual event definition, these definitions could diverge over time, creating inconsistencies if you don't control them centrally.
The example below shows what happens when one evolution of the order added a category, while the other added a type.
Should you use a single schema or multiple schemas?
Single schema considerations
Many analytics tools and data ingestion platforms require using a single schema per topic. This makes third-party integration much easier, especially for setting up data structures within or schemas within the consuming applications. Examples of this include Apache Flink, Databricks, many Confluent Cloud connectors, ClickHouse, and other streaming database ingesters.
A single schema is also easier to consume with custom code. Consumers can use semantic type definitions within their code to handle record structures effectively. Serialization frameworks offer well-defined mechanisms for processing these structures. However, it's easy to solve with some libraries if the language does not cover it out of the box.
The main downside to using a single schema is that event evolution becomes harder. You'll have to carefully consider how it impacts other events in your schema. However, it's manageable. We recommend using the Single Writer Principle, which states that for any item of data, or resource, that item of data should be owned by a single execution context for all mutations.
Multiple schemas considerations
Using multiple schemas per topic offers several advantages. Handling is a lot more lightweight: because each event schema only defines a single event type, they are less complex. It is also easier to add or remove events. Finally, schemas can evolve independently: a non-breaking change to one schema will have no effect on the others.
On the other hand, multiple schemas are less straightforward to consume. Consumers will need to handle a wider range of schemas, which may require additional logic to identify the event type before deserialization. This often involves an initial step into a generic payload format, followed by extraction of the specific event data structure.
As we mentioned above, using multiple schemas per topic is also not supported by (most) third-party tools. Finally, there is also a risk of common structures diverging within the same bounded context, if they are not used carefully.
Making the right choice
Unfortunately, there is no clear-cut answer. Ultimately, the optimal approach depends heavily on your specific environment and use cases. Carefully consider factors like third-party tool integrations, schema evolution needs, and the complexity of your event types. If you're unsure which approach best suits your project, or if you require assistance with schema design and implementation, Cymo is here to help. Our team of experts can guide you through the decision-making process and assist you in crafting a robust and scalable Event-Driven Architecture.
Need some help deciding? Contact us!Written byBryan De Smaele