This blog post is also available in German.

Introduction

Whether you’re using Kafka for Event Sourcing, for IoT applications, or for the integration of your distributed systems, you may come to a point where you want to use a schema for your shared data. A schema can support you in establishing data consistency. This is of utmost importance, especially in organizations consisting of multiple independent teams. With a schema you can also validate data before it is written into Kafka. Your schema definition also serves as concise documentation for your developers.

Schema Evolution

Similar to an organism, your software is living. As it grows and changes, the need arises to share additional data via Kafka. It may also happen that existing fields of your data have to be renamed or removed. Once you find yourself in this situation you’ll have to adapt your existing schema and publish it. This process is called schema evolution.

Changing data structures sounds easy, yet brings its own challenges. When a data-producing team wants to change a data field that is already in use, consuming teams will also have to adapt their systems to the new schema. Changing existing fields and adding new fields in the same step increases the effort further: consuming teams can’t use the new data before adapting to the other changes. Which team has to update their systems first? Producers or consumers? How to coordinate them?

Schema Evolution is an important aspect in the management of your data, especially if it is being used by distributed systems, and should be conducted diligently.

Tools

Before delving into approaches and solutions for specific questions, let’s first explore a few tools. Apache Avro is an open-source system for data serialization. Schemas in Avro can be defined in JSON and can be used cross-platform. Avro also supports data compression, supporting you in reducing your storage footprint and improving your transport efficiency. It’s possible to dynamically type your data using union. This way you can describe your data in a more flexible way without having to immediately define them strictly. Using Avro, older and newer versions of your schemas can simultaneously coexist. This way, a system does not need to consume newly provided data and can decide to keep using an older schema version.
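The coexistence of schema versions can be pictured with a minimal Python sketch. It is not Avro's implementation: the `project` helper and the field lists are hypothetical and only imitate the idea that a reader drops fields it does not know and fills in defaults for fields the writer did not send.

```python
# Simplified model of reading a record with a different schema version.
# Not the Avro library: `project` and the schemas below are illustrative assumptions.

MISSING = object()  # marks "no default declared"


def project(record, reader_fields):
    """Return the record as seen by a reader that expects `reader_fields`.

    `reader_fields` maps field name -> default value (or MISSING).
    Unknown fields in `record` are ignored; absent fields use the default.
    """
    result = {}
    for name, default in reader_fields.items():
        if name in record:
            result[name] = record[name]
        elif default is not MISSING:
            result[name] = default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return result


# An older reader that only knows `name`, and a newer one that also
# expects `availability` with a default of True.
old_reader = {"name": MISSING}
new_reader = {"name": MISSING, "availability": True}

new_record = {"name": "pillar", "availability": False}
old_record = {"name": "stairs"}

print(project(new_record, old_reader))  # the extra field is dropped
print(project(old_record, new_reader))  # the default fills the gap
```

A reader without a default for a missing field fails in this model, which is why defaults matter so much for the compatibility rules discussed later.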

MessagePack is a lightweight alternative for data serialization. Your data structures can be declared as JSON as well. Your messages will be encoded as bytes in the serialization process, so you get size benefits similar to those of Avro. MessagePack is compatible with many programming languages and usable cross-platform as well. Data structures are extensible without impairing compatibility with older versions of your schema. Changes to already existing data can result in some complexity though, especially if you need to guarantee backward compatibility.

Apache Thrift is another alternative for serialization. You can define your data structures and service interfaces in a .thrift file in Thrift’s IDL. Compiling them with the Thrift compiler generates code which your clients and servers can use to communicate via RPC. Thrift can be used cross-platform as well and supports quite a few programming languages. Interoperability is ensured because clients and servers communicate with each other based on a common Thrift interface.

Protocol Buffers (also known as Protobuf) is another popular tool for data serialization. Data structures are defined in a .proto file and can be used to generate code in several programming languages which you can use to manipulate your entities. Protobuf supports messages up to a few megabytes in size and does not include compression. The documentation provided is quite extensive. You can find best practices for using data types, tutorials and references for various programming languages. The style guide gives you some guidelines for structuring your .proto files.

Avro & Kafka, Tooling

In my experience Avro is a popular choice in projects based on Kafka, so we’ll dive deeper here. Avro gives us serializers and deserializers (SerDes), which can be used directly in our producers and consumers. The SerDes can be seamlessly integrated into Kafka Connectors as well. Using Kafka Streams you can utilize Avro to process your data streams.

Avro also includes some CLI tooling. Using avro-tools fromjson, you can convert JSON records into an Avro data file that conforms to a given schema. Using getmeta you can extract the metadata of your Avro files. Avro uses its own IDL, which can be used to describe data structures in a human-readable format. Code for your applications can be generated by compiling your IDL files via compile.

A product defined in Avro’s IDL could look like this:

record Product {
  string name;
  union { null, string } description; //optional field
  boolean availability = true;
}
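For comparison, the same record can be written as an Avro JSON schema. The sketch below builds it as a Python dictionary and prints it; the variable name `product_schema` is chosen only for illustration.

```python
import json

# JSON schema corresponding to the IDL record above.
product_schema = {
    "type": "record",
    "name": "Product",
    "fields": [
        {"name": "name", "type": "string"},
        # A union with null makes the value nullable.
        {"name": "description", "type": ["null", "string"]},
        # `= true` in IDL becomes a "default" attribute.
        {"name": "availability", "type": "boolean", "default": True},
    ],
}

print(json.dumps(product_schema, indent=2))
```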

Often a schema registry is used in combination with Avro. Schema registries can help you in storing and managing your schemas centrally. Confluent Schema Registry is a popular registry for doing this. It can also do compatibility checks for you. AWS Glue Schema Registry is an alternative you could also use.

Schema creators register and evolve their schema in the registry. The schema can be shared or [linked](https://docs.confluent.io/platform/current/schema-registry/schema-linking-cp.html#what-is-schema-linking). Producers and consumers can use this schema to serialize their data. Kafka validates incoming data against the schema.

Approaches

Once you evolve your schema, creating a new version of it, you will find yourself in a situation where two versions of your schema exist. Fundamentally this is a desirable situation in the development of distributed systems. Otherwise you’d have to simultaneously update all systems which rely on your schema. Coordinating such an effort is difficult and inherently carries a high risk of error.

You have to consider compatibility with the various versions before you can publish a new schema. Different systems might be using different versions. A newly published schema could be backward compatible. In this case systems based on the new version are able to consume messages which are produced by systems based on older versions. If you want your new version to be backward compatible, you may delete fields, but you may only add optional fields. With such a schema evolution, consumers have to be updated to the new schema version first. For the BACKWARD compatibility type, Confluent Schema Registry checks whether the new schema is compatible with the previous version. Using BACKWARD_TRANSITIVE, all previous versions are checked for compatibility.

If you want to create a FORWARD compatible schema, you may add fields, but you may only remove optional fields. With a forward compatible evolution, you need to adapt your producers to the new schema first. For the FORWARD compatibility type, only the previous version is checked; using FORWARD_TRANSITIVE, all previous versions are checked for compatibility.

Schema versions in which you only add or remove optional fields are both forward and backward compatible. The compatibility type for this case is FULL if you only want to check against the previous version, or FULL_TRANSITIVE if all previous versions shall be checked for compatibility. You do not need to adhere to any particular order when updating your producers and consumers in such an evolution.

Using NONE, the registry performs no compatibility check and all changes are allowed. The effort of coordinating such an evolution may be higher in this case. You’ll have to carefully orchestrate the update process of your producers and consumers, and in some cases you may even have to update them at the same time. You should put additional monitoring and testing effort into such an evolution.
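The rules above can be condensed into a small check. The following Python sketch is a simplification, not the registry's algorithm: it only looks at added and removed fields, treats a field as optional when it declares a default, ignores type changes, and compares against a single previous version only. The function name `compatibility` and the example fields are assumptions.

```python
# Simplified compatibility check based only on added/removed fields.
# A field counts as "optional" here if its definition has a "default" key.

UPGRADE_FIRST = {
    "FULL": "any order",
    "BACKWARD": "consumers",
    "FORWARD": "producers",
    "NONE": "coordinate producers and consumers manually",
}


def compatibility(old_fields, new_fields):
    """Classify a change from `old_fields` to `new_fields`.

    Both arguments are lists of Avro-style field dicts with a "name" key.
    """
    old = {f["name"]: f for f in old_fields}
    new = {f["name"]: f for f in new_fields}
    added = [new[n] for n in new if n not in old]
    removed = [old[n] for n in old if n not in new]

    # New readers must be able to fill in fields that old data lacks.
    backward = all("default" in f for f in added)
    # Old readers must be able to fill in fields that new data lacks.
    forward = all("default" in f for f in removed)

    if backward and forward:
        return "FULL"
    if backward:
        return "BACKWARD"
    if forward:
        return "FORWARD"
    return "NONE"


# Hypothetical versions: v2 removes an optional field and adds a mandatory one.
v1 = [{"name": "id", "type": "int"},
      {"name": "note", "type": ["null", "string"], "default": None}]
v2 = [{"name": "id", "type": "int"},
      {"name": "stock", "type": "int"}]

kind = compatibility(v1, v2)
print(kind, "-> upgrade first:", UPGRADE_FIRST[kind])
```

A check like this could run in a build pipeline before a schema is published, with the registry's own check remaining the authoritative one where a registry is in use.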

A summary table of the compatibility types and the resulting change order is available in the Confluent documentation: https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#summary

Best Practices

If all systems use them correctly, you can add or remove optional fields freely. In general, changing the names of your fields can result in problems and is not advisable. If you absolutely need to do this anyway, it’s easier to coordinate if you do it in two evolutions. Instead of changing the name of a field, create a new field with your desired name in the first evolution. At the same time you can document your old field as deprecated and communicate it as such. Once all consumers are adapted and use the new field, the deprecated one can be removed in a second evolution. This way you can effectively change the name of a field without forcing producers and consumers to orchestrate their deployments.
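During the transition period, a consumer can read the new field and fall back to the deprecated one. A minimal sketch, assuming a hypothetical rename from `name` to `title`; both field names and the helper `product_title` are made up for illustration.

```python
# Hypothetical rename of "name" to "title", carried out in two evolutions.
# While both fields may occur, consumers prefer the new one.

def product_title(record):
    """Return the title, falling back to the deprecated "name" field."""
    if "title" in record:
        return record["title"]
    return record["name"]


before_rename = {"id": 1, "name": "concrete pillar"}
during_rename = {"id": 1, "name": "concrete pillar", "title": "concrete pillar"}
after_rename = {"id": 1, "title": "concrete pillar"}

for record in (before_rename, during_rename, after_rename):
    print(product_title(record))
```

Once no record carries the deprecated field any more, the fallback branch can be deleted together with the field.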

It’s advisable to separate the addition and the removal of mandatory fields. This makes your schema evolution process easier, since consumers and producers can update independently. If no schema registry is used, you should manage the versioning of your schemas manually. This way you can track changes more easily or restore an older version if something breaks. You should also run compatibility tests on your own in this case.
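Without a registry, a simple way to notice changes is to fingerprint each schema and keep the versions yourself. The sketch below is only an illustration under stated assumptions: it hashes key-sorted JSON rather than Avro's own canonical form, keeps everything in memory, and the `SchemaStore` class is hypothetical.

```python
import hashlib
import json


def fingerprint(schema):
    """Hash a schema dict in a key-order-independent way."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class SchemaStore:
    """Keeps every registered version in memory, oldest first."""

    def __init__(self):
        self.versions = []

    def register(self, schema):
        """Return the 1-based version number, reusing it for known schemas."""
        digest = fingerprint(schema)
        for number, (known_digest, _) in enumerate(self.versions, start=1):
            if known_digest == digest:
                return number
        self.versions.append((digest, schema))
        return len(self.versions)

    def get(self, number):
        """Return the schema stored under a version number."""
        return self.versions[number - 1][1]


store = SchemaStore()
first = {"type": "record", "name": "Product",
         "fields": [{"name": "id", "type": "int"}]}
second = {"type": "record", "name": "Product",
          "fields": [{"name": "id", "type": "int"},
                     {"name": "name", "type": "string"}]}

print(store.register(first), store.register(second), store.register(first))
```

In practice the same idea is often covered by keeping schema files in version control; the fingerprint mainly helps to detect unintended changes.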

You have to plan ahead deliberately if you want to perform your schema evolution smoothly. Document your desired changes and communicate your plan clearly. In an Event-Driven Architecture you should consider documenting your asynchronous APIs with a tool like AsyncAPI. It might also be better to design your API Governance deliberately instead of just letting things happen by themselves. In the course of forming your API Governance you could map out policies and define your lines of communication. This way the various teams can build on familiar workflows and common rules.

Example of a Schema Evolution

Let’s assume we are building a distributed e-commerce web application for the distribution of construction components like pillars, stairs, and facades. The components consist of wood or concrete. One of the systems wants to start writing records into Kafka. We are using Avro and won’t be using any schema registry in this example. At the beginning, the producing team wants to share products consisting of ID, name, price, material, and an optional description. The described products will also have some kind of attributes, which can for example describe the dimensions or bearing capacity of a pillar, or any other information relevant for construction.

A schema definition for such a product could look like this:

{
  "type": "record",
  "name": "Product",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "material", "type": "string"},
    {"name": "attributes", "type": {
      "type": "map",
      "values": "string"
    }},
    {"name": "description", "type": ["null", "string"], "default": null}
  ],
  "version": "1"
}

So far we didn’t have any schema for products so the new schema can be introduced without any issues. Another existing system, which is responsible for the search features of the application, starts to consume products and displays them in their new product-search feature. The search-team also creates a filter in their new feature where users can limit the price and select the material of their desired components.

We are living in ever-changing, tumultuous times, and a few things happen simultaneously, with immediate effects on the company and some indirect effects on the application as well. The competitors of our fictitious company grow, especially in wood components. Unfortunately, it looks like their wood-based products are much better. Also, the present economic situation is not great anyway. The strategic division of the company takes a drastic step and decides to specialize in concrete. Distribution and production of wood-based components shall be stopped as soon as possible. So far, products were only marketed on the web application, not sold there. Demand for our concrete-based components is immense though, and the sales division is at its limit: far too many e-mails, and the phones are busy all the time! We urgently need a feature that enables buying on our e-commerce application to relieve our sales division. In the meantime, the web application’s product team has realized that the other teams don’t need the description: it looks like all products are successfully found via search by name and attributes, in some cases directly via filter. Also, nobody except the product team is displaying the description.

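We now have several requirements for our web application:

R1. Customers can buy products directly in the web application.
R2. Wood-based components are removed from the application.
R3. The product description is no longer shared with the other systems.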

Requirements R1 and R2 have similar priority: a feature for buying online relieves sales, and removing wood-based components reduces the time needed for the transition, empowering the company to specialize in concrete faster. Given limited resources, the development teams should be coordinated to effectively prioritize the work on the requirements. If you have sufficient resources, all teams can start on all features simultaneously. Usually one of the teams will finish its work sooner, resulting in a natural order for deploying the features into production.

For our scenario, let’s assume that work on all features is finished at exactly the same time. In this case it would be advisable that the executive level has already decided on a preference and communicated it to development. In our case we’ll assume there’s no preference, and the flow of information between development and the executive level is too slow. In such an extreme scenario the development teams will have to reach a decision among themselves. Relieving sales and transitioning into specialization are both of high priority, and waiting too long results in unnecessary costs.

Aside from creating a new system for the shopping cart and buying features, and various changes to remove wood-based components, you’ll also have to adapt the schema. To meet the requirements we need the following changes in our schema:

S1. Add the field “availability”.
S2. Remove the field “material”.
S3. Remove the optional field “description”.

S3 is not that important for the critical requirements R1 and R2 and thus has a lower priority. It does shrink our data structure and lets messages be transported more efficiently though, so we also want to do this soon. Maybe we won’t need a separate schema evolution for it.

Remembering the best practices, we know that we have two changes which should be done separately: S1 and S2. Also S3 can be done together with S1 in one evolution.

The change for S1 and S3 is FORWARD compatible, thus producers have to be updated first. The required change for S2 is BACKWARD compatible, thus consumers need to be updated first.

In our previously described scenario the development teams are free to decide the sequence of the schema evolutions. Under the assumption that the required changes to producers and consumers could be finished in exactly the same time, the update sequence can be chosen arbitrarily. Ultimately you could also decide based on the number of consumers and producers that have to be adapted. If consumers can be adapted faster, you should do the schema change for S2 first. In this case “material” will be removed from your application earlier, accelerating your company’s transition process.

In our case we only have one producer and multiple consumers, so we’ll start with the FORWARD compatible change. In the first step we can remove “description” and add “availability”:

{
  "type": "record",
  "name": "Product",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "material", "type": "string"},
    {"name": "availability", "type": "boolean"},
    {"name": "attributes", "type": {
      "type": "map",
      "values": "string"
    }}
  ],
  "version": "2"
}

After publishing the new schema, the product team’s producer has to be updated. Once this is done, the other systems can be updated to use version 2 of the schema.

In the next evolution we can remove “material”:

{
  "type": "record",
  "name": "Product",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "price", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "availability", "type": "boolean"},
    {"name": "attributes", "type": {
      "type": "map",
      "values": "string"
    }}
  ],
  "version": "3"
}

This is a backward compatible change, and when using a schema registry you could choose BACKWARD as the compatibility type. After publishing the new schema, all consumers have to be adapted to this new version. Once this is done, and thus all references to material are removed, the product team can update their systems to use the new schema version.
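To double-check what each evolution changes, the field names of the three versions can be compared. A small sketch; `diff_fields` is a hypothetical helper, and the lists simply repeat the field names from the three schemas of this example.

```python
# Field names of the three schema versions from the example.
V1 = ["id", "name", "price", "material", "attributes", "description"]
V2 = ["id", "name", "price", "material", "availability", "attributes"]
V3 = ["id", "name", "price", "availability", "attributes"]


def diff_fields(old, new):
    """Return (added, removed) field names, each sorted alphabetically."""
    added = sorted(set(new) - set(old))
    removed = sorted(set(old) - set(new))
    return added, removed


print(diff_fields(V1, V2))  # first evolution
print(diff_fields(V2, V3))  # second evolution
```

Keeping each diff this small is what allows every evolution to be matched to exactly one compatibility type.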

Monitoring & Testing

Schema evolutions should be monitored and tested. This way you can quickly identify errors and take appropriate action. Automated regression tests can be of high value as well. You may also want to check your metrics and monitoring tools while conducting a schema evolution. Kafka already produces some consumer and producer metrics by itself. If your record-send-rate or records-consumed-rate drops after your evolution, and does not return to familiar levels after a while, something may have gone wrong. Your record-error-rate shows you how many record sends fail per second.
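A comparison of rates before and after an evolution can be automated. The sketch below assumes you already export the metric values somewhere; the function `rate_dropped`, the sample numbers and the 50 % threshold are arbitrary assumptions, not recommendations.

```python
from statistics import mean


def rate_dropped(baseline, current, threshold=0.5):
    """Return True if the average current rate fell below
    `threshold` times the average baseline rate."""
    if not baseline or not current:
        raise ValueError("both samples must be non-empty")
    return mean(current) < threshold * mean(baseline)


# Hypothetical record-send-rate samples (records per second).
before_evolution = [120.0, 118.5, 121.0]
after_evolution = [40.0, 35.5, 38.0]

if rate_dropped(before_evolution, after_evolution):
    print("record-send-rate dropped noticeably, check the evolution")
```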

Producers and consumers throw exceptions in case of errors while sending or reading messages. Assuming these errors are logged sufficiently, a higher rate of exceptions after you have evolved your schema might indicate problems or gaps in your evolution process. Exceptions that may be relevant during your schema evolution include SerializationException, RecordDeserializationException and AvroTypeException.
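If the exceptions end up in log files, counting them per type gives a quick first impression. A minimal sketch, assuming plain-text log lines that contain the exception class name; the helper `count_exceptions` and the sample lines (including their message texts) are invented for illustration.

```python
from collections import Counter

# Exception names to look for in the logs.
WATCHED = (
    "RecordDeserializationException",
    "SerializationException",
    "AvroTypeException",
)


def count_exceptions(log_lines):
    """Count how many lines mention each watched exception name."""
    counts = Counter()
    for line in log_lines:
        for name in WATCHED:
            if name in line:
                counts[name] += 1
                break  # count each line at most once
    return counts


# Invented sample log lines.
sample_logs = [
    "ERROR org.apache.kafka.common.errors.SerializationException: failed to serialize",
    "INFO consumer started",
    "ERROR org.apache.kafka.common.errors.RecordDeserializationException: bad record",
    "ERROR org.apache.avro.AvroTypeException: unexpected type",
    "ERROR org.apache.kafka.common.errors.SerializationException: failed to serialize",
]

for name, count in count_exceptions(sample_logs).items():
    print(name, count)
```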

You also may utilize specialized tools for schema linting which can be used before publishing a new schema version. ajv and jsonschema are two examples which you could consider. Such tools can ensure that changes in your schema adhere to your chosen compatibility guidelines. You could integrate these tools into your CI/CD pipelines as well. Alternatively you may use a schema registry like Confluent Schema Registry - it can take care of compatibility tests and version control of your schemas.

Conclusion

When using Kafka, a schema can help you document your data structures properly. Established approaches and tools exist to assist you in conducting your schema evolution smoothly. As shown in the extreme case, your approach may sometimes be highly influenced by your given context. For a successful schema evolution you should plan ahead deliberately and communicate clearly.