This blog post is also available in English

Einleitung

Ob Kafka für Eventsourcing, für IoT Anwendungen oder aber zur Integration von verschiedenen Systemen verwendet wird: früher oder später kann es sinnvoll sein, ein Schema für die publizierten bzw. konsumierten Daten zu verwenden. Ein Schema kann dabei helfen, Datenkonsistenz herzustellen, was insbesondere bei Organisationen mit mehreren voneinander unabhängig operierenden Teams wichtig ist. Außerdem ermöglicht es die Validierung von Daten, bevor diese in Kafka geschrieben werden. Für Entwicklungsabteilungen bieten die im Schema festgehaltenen Datenstrukturen außerdem eine klare Dokumentation.

Schema Evolution

Software lebt - und während sie wächst kommt es irgendwann vor, dass neue bisher noch nicht geteilte Daten in Kafka landen. Auch kann es vorkommen, dass ein Team das Daten über Kafka teilt, die Bezeichnung ihrer in Kafka landenden Daten verändert oder nicht mehr benötigte Daten entfernen möchte. Sobald das passiert, muss das existierende Schema angepasst und eine neue Version publiziert werden. Dieser Prozess nennt sich Schema Evolution.

Datenstrukturen verändert klingt vielleicht erstmal eher simpel, bringt aber eine Reihe von Konsequenzen mit sich. Wenn ein Team das Daten produziert anfangen möchte, Bezeichnungen von Daten zu verändern, müssen konsumierende Teams ihre Systeme anpassen, um das neue Schema verwenden zu können. Wenn gleichzeitig Bezeichnungen von Daten verändert und in der neuen Schemaversion weitere Daten hinzugefügt werden, entsteht plötzlich ein höherer Aufwand - es können nicht einfach nur die gewünschten neuen Daten bezogen werden. Wer aktualisiert den Stand eigentlich zuerst? Produzierende oder konsumierende Systeme? Wie koordiniert man das ganze?

Schema Evolution ist ein wichtiger Aspekt bei der Verwaltung von Daten, insbesondere wenn diese von verteilten Systemen verwendet werden, und sollte mit Bedacht durchgeführt werden.

Hilfsmittel

Bevor wir mögliche Vorgehensweisen und Lösungsansätze für die vorherigen Situationen erörtern, möchte ich euch zuerst einmal einige Hilfsmittel für die (De-)Serialisierung von Daten vorstellen. Apache Avro ist ein open source Datenserialisierungs Tool. Ein Schema in Avro kann unter anderem in JSON definiert werden und ist plattformunabhängig verwendbar. Avro unterstützt Datenkomprimierung, wodurch geringerer Speicherbedarf und effizienterer Datentransport möglich werden. Dynamische Typisierung ist in Avro ebenfalls möglich: per union können Daten flexibler ohne strikte Definition im Schema hinzugefügt werden. Bei Verwendung von Avro können gleichzeitig neuere und ältere Versionen des Schemas koexistieren, wodurch Systeme die neue Datenfelder nicht benötigen auch bei einer älteren Version des Schemas bleiben können.

MessagePack ist ein alternatives leichtgewichtiges Tool das für die (De-)Serialisierung von Daten verwendet werden kann. Datenstrukturen können JSON-artig deklariert werden, die Nachrichten werden dann beim Serialisieren als Bytefolgen codiert. Dadurch sind auch hier die Daten komprimiert und können effizienter transportiert werden. Außerdem ist MessagePack ebenfalls plattformunabhängig. Datenstrukturen können erweitert werden ohne Kompatibilität mit älteren Versionen zu beeinträchtigen, allerdings können Änderungen an bereits bestehenden Daten erhöhte Komplexität erzeugen, insbesondere wenn Rückwärtskompatibilität sichergestellt werden soll.

Apache Thrift ist eine weitere Alternative für die Datenserialisierung und baut auf einer eigenen Interface Description Language (IDL) auf. Hier werden Datenstrukturen und Service Schnittstellen in einer „thrift“-Datei definiert. Beim Kompilieren mit dem thrift-Compiler entsteht Code, der es Clients und Servern erlaubt, mittels Remote Procedure Calls (RPC) miteinander zu kommunizieren. Thrift ist ebenfalls Cross-Language kompatibel und unterstützt eine Reihe von Programmiersprachen. Interoperabilität wird gewährleistet indem die Server und Clients - die untereinander Nachrichten austauschen - eine gemeinsame thrift Schnittstelle verwenden. Auch bei Thrift können Änderungen von Datenfeldern Herausforderungen mit sich bringen, wenn Rückwärtskompatibilität gewährleistet werden soll.

Protocol Buffers ist ein weiteres beliebtes Tool das zur Serialisierung von Daten verwendet werden kann. Datenstrukturen werden hier in einer .proto-Datei definiert und können auch dazu verwendet werden, um Code in verschiedenen Programmiersprachen zu generieren mit dem eure definierten Entitäten manipuliert werden können. Protobuf unterstützt Nachrichtengrößen bis zu einigen wenigen MB und bietet von sich aus keine Komprimierung an. Die bereitgestellte Dokumentation ist umfangreich, beispielsweise gibt es neben Best Practices für zu verwendende Datentypen auch Tutorials und Referenzen für verschiedene Programmiersprachen. Ein Style Guide für den Aufbau der .proto-Files wird ebenfalls bereitgestellt.

Avro & Kafka, Tooling

In auf Kafka aufbauenden Systemen scheint Avro meiner Erfahrung nach eine beliebte Wahl zu sein und daher schauen wir da mal tiefer rein. Avro bringt Serializer und einen Deserializer mit, die direkt von KafkaProducern bzw. KafkaConsumern verwendet werden können. Außerdem lassen sich die (De-)Serializer auch nahtlos in Kafka Connectoren einbinden. Bei der Verwendung von Kafka Streams kann Avro ebenfalls für die Verarbeitung von Datenstreams verwendet werden.

Avro stellt auch CLI tooling zur Verfügung. Mit diesen avro-tools können beispielsweiße per fromjson Datenstrukturen aus JSON in das Avro-Format konvertiert werden. Mit getmeta können aus einer solchen Avro-Datei Metadaten extrahiert werden. Avro benutzt eine eigene IDL in der Datenstrukturen in einem einfach lesbaren Format deklariert werden können. Per compile kann aus den IDL Dateien Code für eure Anwendung generiert werden.

Ein Produkt könnte in Avros IDL beispielsweiße so beschrieben werden:

record Produkt {
  string name;
  union { null, string } beschreibung; //optionales Feld
  boolean verfuegbarkeit = true;
}

In Kombination mit Avro wird häufig ein Schema Register verwendet um Schemainformationen zentral zu speichern und zu verwalten. Confluent Schema Registry ist ein bekanntes Register das hierfür verwendet werden und auch die Schemakompatibilität validieren kann. AWS Glue Schema Registry ist ein alternatives Schema Register.

Schemahersteller registrieren und versionieren das Schema im Register. Ab dann kann das Schema verteilt oder [verlinkt](https://docs.confluent.io/platform/current/schema-registry/schema-linking-cp.html#what-is-schema-linking) werden. Producer und Consumer können dieses Schema dann beziehen und benutzen es, um ihre Daten zu (de-)serialisieren. Kafka kann die ankommenden Daten gegen das von der Registry bereitgestellte Schema validieren.
Schemahersteller registrieren und versionieren das Schema im Register. Ab dann kann das Schema verteilt oder verlinkt werden. Producer und Consumer können dieses Schema dann beziehen und benutzen es, um ihre Daten zu (de-)serialisieren. Kafka kann die ankommenden Daten gegen das von der Registry bereitgestellte Schema validieren.

Vorgehensweisen

Sobald ein Schema weiterentwickelt wird, d.h. eine neue Version dessen entsteht, existieren zwei Versionen dieses Schemas. Grundsätzlich ist das bei der Entwicklung von verteilten System durchaus eine erwünschte Situation, denn sonst müssten alle verschiedenen Systeme die auf dem Schema aufbauen zeitgleich aktualisiert werden. Das wäre ein schwer zu koordinierender Aufwand und das Fehlerpotential ist hoch.

Bevor aber eine neue Schema Version veröffentlicht werden kann, müssen wir uns Gedanken über die Kompatibilität der verschiedenen Versionen machen. Schließlich könnten verschiedene Systeme eine voneinander unterschiedliche Version des Schemas verwenden. Eine neue Schemaversion kann rückwärtskompatibel sein. In dem Fall sollten Systeme, die auf der aktuelleren Version aufbauen, in der Lage sein, Nachrichten zu lesen, die von Systemen mit älterer Version in Kafka geschrieben werden. Damit die neue Version rückwärtskompatibel ist, dürfen nur Felder gelöscht werden und Felder die neu hinzukommen müssen optional sein. Bei einer derartigen Schema Evolution müssen die Consumer als erstes auf die neue Schema Version aktualisiert werden. Wenn ein Schema BACKWARD kompatibel ist, überprüft die Confluent Schema Registry ob die letzte Schema Version mit der aktuellen kompatibel ist. Bei BACKWARD_TRANSITIVE werden alle Vorgängerversionen auf Kompatibilität geprüft.

Bei einem FORWARD kompatiblen Schema dürfen neue Felder hinzugefügt werden. Es dürfen allerdings nur optionale Felder aus dem Schema gelöscht werden. Wenn eine neue vorwärtskompatible Schema Version entwickelt wird, müssen die Producer zuerst auf diese Version aktualisiert werden. Bei FORWARD_TRANSITIVE werden alle Vorgängerversionen auf Kompatibilität geprüft.

Neue Schemata bei denen nur optionale Felder hinzugefügt oder gelöscht werden sind vorwärts- und rückwärtskompatibel. Der Kompatibilitätstyp hierbei ist FULL, wenn nur die direkte Vorgängerversion oder FULL_TRANSITIVE, wenn alle Vorgänger auf Kompatibilität geprüft werden sollen. Bei solchen Schema Evolutionen ist die Reihenfolge der Aktualisierung der Producer und Consumer nicht wichtig und braucht nicht gesteuert werden.

Bei NONE ist die Kompatibilitätsprüfung der Registry ausgeschaltet und es sind alle Änderungsarten erlaubt. Allerdings kann der Koordinierungsaufwand in diesem Fall höher sein. Die Aktualisierung der Producer und Consumer sollte mit Bedacht orchestriert und muss in manchen Fällen möglichst zeitgleich durchgeführt werden. Eine solche Evolution sollte außerdem besonders gut überwacht und getestet werden.

Diese Tabelle fasst die verschiedenen Kompatibilitätsmodi zusammen und stammt von https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#summary
Diese Tabelle fasst die verschiedenen Kompatibilitätsmodi zusammen und stammt von https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#summary

Best Practices

Sofern sichergestellt ist, dass optionale Felder von den beteiligten Systemen korrekt benutzt werden, können diese relativ frei gelöscht bzw. hinzugefügt werden. Grundsätzlich können Umbenennungen von bestehenden Feldern problematisch sein und sind eher nicht empfehlenswert. Falls das aus irgendwelchen Gründen dennoch zwingend notwendig ist, dann kann das etwas leichter koordinierbar über zwei Schema Evolutionen durchgeführt werden. In der ersten Evolution kann zunächst ein neues Feld angelegt werden anstatt wie ursprünglich geplant, die Feldbezeichnung zu verändern. Gleichzeitig wird das alte Feld als deprecated dokumentiert bzw. kommuniziert. Sobald alle Konsumenten das neue Feld verwenden, kann in einer folgenden Evolution das veraltete Feld entfernt werden, wodurch effektiv das Feld umbenannt ist.

Es empfiehlt sich das Löschen von bestehenden Feldern separat von dem Hinzufügen von neuen Feldern durchzuführen. Dadurch kann das Schema leichter weiterentwickelt werden, denn Consumer und Producer können so unabhängig voneinander aktualisiert werden. Sofern kein Schema Register verwendet wird, sollte eine manuelle Versionskontrolle der Schemata durchgeführt werden. So können Änderungen besser nachverfolgt und notfalls ältere Versionen wiederhergestellt werden. Außerdem empfiehlt es sich in dem Fall eigene Kompatibilitätstests durchzuführen.

Um eine reibungslose Schema Evolution durchzuführen sollten anstehende Änderungen sorgfältig geplant und kommuniziert bzw. dokumentiert werden. In einer Event-Driven Architektur kann es sich auch lohnen, die asynchronen Schnittstellen über Tools wie AsyncAPI zu dokumentieren. Es ist empfehlenswert bewusst eine API Governance auszuarbeiten, anstelle diesen Prozess vollkommen frei ablaufen zu lassen. In dem Zuge können Richtlinien ausgearbeitet und notwendige Kommunikationenswege beschlossen werden, damit die unterschiedlichen Teams auf vertrauten Abläufen aufbauen und nach gemeinsam definierten Regeln arbeiten können.

Schema Evolution anhand eines Beispiels

Angenommen wir entwickeln eine verteilte E-Commerce Webanwendung zum Vertrieb von Bauelementen wie Stützen, Treppen, und Fassadenelementen aus Holz und Beton. Eines der Systeme möchte anfangen, Produkte in Kafka zu schreiben. Wir benutzen Avro und verzichten für das Beispiel auf ein Schema Register. Am Anfang möchte das produzierende Team die Produkte in Form von ID, Namen, Preis, Material, und optionaler - da nicht immer vorhandener - Beschreibung des Produkts mit anderen Systemen teilen. Ferner werden Attribute bereitgestellt, die beispielsweise die Dimensionen oder Lastdurchleitung der Stützen, oder andere bautechnisch relevante Informationen der Produkte beinhalten können.

Eine Schema Definition für Produkte könnte so aussehen:

{
  "type": "record",
  "name": "Produkt",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "preis", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "material", "type": "string"},
    {"name": "attribute", "type": {
      "type": "map",
      "values": "string"
    }},
    {"name": "beschreibung", "type": ["string", "null"]}
  ],
  "version": "1"
}

Es gab bisher noch kein Schema für Produkte, daher kann das Schema problemlos eingeführt werden.

Ein anderes bereits existierendes System, das eine allgemeine „Suche“ innerhalb der Webanwendung ermöglicht, fängt nun an die Produkte zu konsumieren und in einem neuen Produkt-Suche Feature anzuzeigen. Zusätzlich baut das Sucheteam in dem neuen Feature einen Filter ein in dem der Preis eingegrenzt und die Materialien der gesuchten Bauelemente eingeschränkt werden können.

Wir befinden uns in turbulenten Zeiten und es passieren gleichzeitig mehrere Dinge, die Einfluss auf das Unternehmen und damit auch die Webanwendung haben können. Die Marktbegleiter des fiktiven Unternehmens wachsen, insbesondere bei Holzbauteilen - deren Produkte scheinen in diesem Segment einfach besser zu sein. Die aktuelle wirtschaftliche Lage ist ohnehin nicht besonders gut. Daher entscheidet die strategische Abteilung des Unternehmens aufgrund der Gegebenheiten sich dazu, das Unternehmen auf Betonelemente zu spezialisieren und die Produktion und den Vertrieb von Holzelementen zeitnah vollständig einzustellen. Bisher wurden die Produkte auf dem Webshop nur vermarktet. Die Nachfrage nach eigenen Betonelementen allerdings ist immens und die Verkaufsabteilung kommt nicht mehr hinterher: viel zu viele E-Mails und die Telefone sind auch ständig besetzt! Es wird dringend ein Kauf Feature für die E-Commerce Anwendung benötigt um die Verkaufsabteilung zu entlasten. Ferner stellt das Produktteam der Webanwendung fest, dass die anderen Teams die „Beschreibung“ der Produkte gar nicht benötigen: die Produkte werden anscheinend erfolgreich durch die Suche innerhalb des Namens und der Attributslisten und teilweise direkt über den Kategoriefilter gefunden. Ausser vom Produktteam wird die Beschreibung von keinem anderen Team benötigt.

Es ergeben sich daraus mehrere Änderungen für die Anwendung:

Anforderungen A1. und A2. haben vergleichbar hohe Priorität: die Möglichkeit, Produkte online zu kaufen, entlastet das Verkaufsteam. Entfernung sämtlicher Bauelemente aus Holz hat zur Folge, dass die Produktion davon schneller eingestellt und damit die Fokussierung des Unternehmens auf Beton schneller durchgeführt werden kann. Bei begrenzten Ressourcen müsste das Entwicklungsteam koordiniert werden um zu priorisieren, welche Änderungen zuerst durchgeführt werden. Bei ausreichenden Ressourcen könnten alle Teams gleichzeitig an den Features arbeiten. Für gewöhnlich wird eines der Teams schneller fertig und es ergibt sich dadurch eine natürliche Reihenfolge in der die Features in Produktion gehen können.

Für unser Beispielszenario gehen wir mal einfach davon aus, das sämtliche Features gleichzeitig fertig werden. Für diesen Fall wäre es ratsam, dass die Entscheidungsebenen der Organisation im voraus eine Präferenz gewählt und an die Entwicklungsteams kommuniziert haben. Für unser Beispiel gehen wir weiter davon aus, dass bisher keine Präferenz gefunden worden ist und der Informationsfluss zwischen Entwicklungsteam und Entscheidungsebenen langsam ist. In diesem Fall sollten die Entwicklungsteams so schnell wie möglich untereinander eine Entscheidung treffen, denn sowohl die Entlastung des Verkaufsteams als auch die Spezialisierung sind von immens hoher Priorität.

Neben der Einführung eines neuen Systems, das den Warenkorb und Online Kauf ermöglicht, und diversen Änderungen zur Entfernung der Holzbauteilen, muss auch das Schema angepasst werden. Damit die neuen gewünschten Anforderungen möglich sind, werden unter anderem folgende Änderungen am Schema benötigt:

S3 ist für die kritischeren Anforderungen A1 und A2 nicht wichtig und hat damit keine hohe Priorität. Aber unsere Datenstruktur wird kleiner und unser Nachrichtenaustausch wird effizienter. Deshalb möchten wir das ebenfalls zeitnah machen - eventuell können wir auf eine separate Schema Evolution nur für das Entfernen des optionalen Felds verzichten.

Wenn wir uns an die Best Practices erinnern, fällt schnell auf, dass wir zwei Schema Änderungen haben die voneinander getrennt durchgeführt werden sollten: S1 und S2. Außerdem kann S3 in Kombination mit S1 durchgeführt werden.

Die Schema Änderung für S1+S3 ist FORWARD kompatibel, das heißt Producer müssten zuerst auf das neue Schema angepasst werden. Änderung S2 ist BACKWARD kompatibel, das heißt die Consumer müssen zuerst auf die neue Version aktualisiert werden.

In dem bisher beschriebenen Szenario sind die Entwicklungsteams frei in der Entscheidung der Evolutionsreihenfolge. Unter der Annahme, dass alle Teams gleich schnell in der - in Folge der Schema Evolution - stattfindenden Aktualisierung der Producer und Consumer sind, kann die Reihenfolge vollkommen willkürlich entschieden werden. Im Grunde könnte man auch Anhand der Anzahl der anzupassenden Consumer und Producer entscheiden. Sofern Consumer schneller angepasst werden können, sollte beispielsweise zuerst die Schema Änderung S2 durchgeführt werden. In diesem Fall verschwindet „Material“ schneller aus der Anwendung und das Unternehmen kann die Produktion von Holzbauteilen einstellen.

In unserem Fall haben wir nur einen Producer und mehrere Consumer. Daher fangen wir mit der vorwärtskompatiblen Anpassung an. Bei Verwendung eines Schema Registers kann dazu als Kompatibilitätstyp FORWARD gewählt werden. Im ersten Schritt können wir jetzt also die „Beschreibung“ entfernen und die „Verfügbarkeit“ hinzufügen:

{
  "type": "record",
  "name": "Produkt",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "preis", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "material", "type": "string"},
    {"name": "verfuegbarkeit", "type": "boolean"},
    {"name": "attribute", "type": {
      "type": "map",
      "values": "string"
    }},
  ],
  "version": "2"
}

Nach Publikation dieser neuen Schema Version muss im nächsten Schritt als erstes der Producer des Produktteams angepasst werden. Sobald das erledigt ist, können die anderen Systeme auf Version 2 des Schemas aktualisiert werden.

In der nächsten Evolution kann dann „Material“ aus dem Schema entfernt werden:

{
  "type": "record",
  "name": "Produkt",
  "namespace": "com.ecommerce",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "preis", "type": "bytes", "logicalType": "decimal", "scale": 2},
    {"name": "verfuegbarkeit", "type": "boolean"},
    {"name": "attribute", "type": {
      "type": "map",
      "values": "string"
    }},
  ],
  "version": "3"
}

Das ist eine rückwärtskompatible Anpassung und bei Verwendung eines Schema Registers kann als Kompatibilitätstyp für diesen Schritt BACKWARD gewählt werden. Sobald das Schema publiziert ist, müssen zuerst die Consumer auf die neue Version angepasst werden. Sobald alle Consumer aktualisiert sind und es somit keine Referenzen mehr auf Material gibt, kann im nächsten Schritt das Produktteam auf die neue Schema Version aktualisieren.

Monitoring & Testing

Schema Evolutionen sollten gut überwacht und getestet werden, um eventuell auftretende Fehler schnell identifizieren und entsprechende Maßnahmen treffen zu können. Automatisierte Regressionstests können diesen Prozess erheblich vereinfachen. Außerdem kann es helfen, vorhande Metriken zu überprüfen oder Monitoringtools während der Evolution zu verwenden. Kafka produziert von sich aus bereits Consumer- und Producermetriken. Beispielsweiße könnte ein sinken der record-send-rate oder der records-consumed-rate nach erfolgter Evolution Probleme signalisieren, sofern sich die Raten nicht nach einer Weile wieder auf ein übliches Niveau begeben. Ansonsten lohnt sich vielleicht auch ein Blick auf die record-error-rate um herauszufinden wieviele Schreibversuche pro Sekunde fehlschlagen.

Producer und Consumer können beim schreiben bzw. lesen Exceptions werfen - sofern diese ordentlich geloggt werden, kann ein erhöhtes Aufkommen an Exceptions nach einer erfolgten Evolution Probleme oder Lücken im Aktualisierungsprozess aufzeigen. Beispiele für Exceptions die während einer Schema Evolution relevant sein können sind SerializationException, RecordDeserializationException und AvroTypeException.

Es gibt auch spezielle Schema Linting Tools, die vor der Publikation einer neuen Schemaversion verwendet werden können. Beispiele dafür sind ajv oder jsonschema. Solche Tools können sicherstellen, dass Schemaänderungen den gewählten Kompatibilitätsrichtlinien entsprechen und können beispielsweise in die CI/CD Pipeline integriert werden. Alternativ dazu kann ein Schema Register wie z.B. Confluent Schema Registry verwendet werden. Neben bereits vorhandenen Kompatibilitätstests kümmert sich diese Registry auch um die Versionskontrolle der Schemata.

Fazit

Beim Einsatz von Kafka kann die Verwendung eines Schemas dabei helfen, die Datenstrukturen ordentlich zu dokumentieren. Es gibt eine Reihe von bewährten Ansätzen und Hilfsmitteln, die es einfacher machen können eine Schema Evolution fehlerfrei durchzuführen. Wie das Beispiel zeigt, kann das Vorgehen dabei stark vom gegebenen Kontext abhängig sein. Um eine Schema Evolution erfolgreich durchzuführen, sollte diese sorgsam geplant und kommuniziert werden.