Server-sent Events in .NET mit Akka

Die Ausgangslage

In meinem ersten kleinen .NET-Projekt wurde das Senden von Server-sent Events für die Kommunikation zwischen zwei Microservices (Reservieren und Ausleihen) im Container Bootcamp benötigt. Der Vorteil, den Server-sent Events mitbringen, ist, dass keine zentrale Message-Komponente wie (ActiveMq, Kafka, …) benötigt wird. Server-sent Events sind Bestandteil der HTML5 Spezifikation und verwenden HTTP als Transport-Protokoll.

Die Suche nach Bibliotheken

Nachdem - wie oben beschrieben - so gut wie keine Bibliothek zu finden war, hieß es, sich die Finger schmutzig machen. Einen ersten Teilaspekt der späteren Lösung lieferte mir dieser Blogpost. Hier war zu sehen, wie ein selbst implementiertes IActionResult aussehen muss und wie in .NET Strings in die HTTP-Response geschrieben werden. In einem früheren Scala Projekt hatte ich bereits einmal eine manuelle Behandlung der HTTP-Verbindungen in Akka implementiert. Als Freund von Akka schaute ich, ob eine Akka-Implementierung für .NET existiert. Zu meiner Überraschung war Akka für .NET verfügbar. Jetzt galt es, beides miteinander zu verbinden.

Die Lösung

Akka basiert auf dem Konzept von Aktoren. Die Idee war nun, einen Aktor zu implementieren, der alle offenen Verbindungen hält, und einen weiteren, der die eigentliche Verbindung repräsentiert.

Initialisierung der Aktoren

Um die nötigen Aktoren erzeugen zu können, wird zuerst ein ActorSystem benötigt. Dieses wird in der Startup.cs erzeugt, ebenso wie der Aktor, der das spätere Connection-Handling übernimmt. Da alle Aktoren vom gleichen Typ IActorRef sind, werden Wrapper-Klassen zur Unterscheidung bei der Dependency-Injection verwendet.

Startup.cs

...
var system = ActorSystem.Create("reservieren", config);
...
var eventConnectionHolder = system.ActorOf(ConnectionHolder.props(), "event-connection-holder");
...
pServices.AddTransient(typeof(IEventConnectionHolderActorRef), pServiceProvider => new EventConnectionHolderActorRefActorRef(eventConnectionHolder));
...

Der Controller

Der Controller bekommt via Dependency-Injection den ConnectionHolder übergeben. Er stellt zudem einen Endpunkt /events bereit, auf den sich die Clients später verbinden können.

Wird der Endpunkt nun aufgerufen, wird eine neue Instanz von PushActorStreamResult erzeugt und als Parameter werden der ConnectionHolder sowie der Rückgabe- Content-Type übergeben.

ReservierenController.cs

...
public IActionResult Events() => new PushActorStreamResult(_connectionHolderActorRef, "text/event-stream");
...
PushActorStreamResult

Dadurch, dass PushActorStreamResult das Interface IActionResult implementiert, bekommt man in der Methode ExecuteResultAsync über den Kontext Zugriff auf den Response-Stream. Gleichzeitig wird bereits der Content-Type für die HTTP-Antwort gesetzt. Der ConnectionHolder bekommt die Nachricht, dass eine neue Verbindung geöffnet worden ist. In dieser Nachricht werden der Response-Stream und das CancelationToken mit übergeben.

In den Beispielen, die ich mir anschaute oder ausprobierte, war es so, dass die Verbindung nach Ende des Streams geschlossen wurde. Der Client müsste dann zeitgesteuert immer wieder eine Verbindung öffnen und die neuen Events abholen. Dies würde einen gewissen Zeitversatz bedeuten. Mir war es wichtig, diesen Zeitversatz unter normalen Umständen so gering wie möglich zu halten.

Hier kommt das Ask zum Tragen. Ein Ask in der Akka-Bibliothek gibt einen Task zurück, den gleichen Rückgabe-Typ, den auch ExecuteResultAsync besitzt. Normalerweise erwartet man die Antwort innerhalb eines gewissen Zeitraumes, so dass beim Aufruf des Ask ein Timeout übergeben wird. In diesem speziellen Fall ist der Timeout bewusst nicht gesetzt: So gibt das Ask einen Task zurück, der sich normalerweise erst einmal nicht beendet und so die Verbindung offen hält.

PushActorStreamResult.cs

...
 public Task ExecuteResultAsync(ActionContext pContext)
         {
             var stream = pContext.HttpContext.Response.Body;
             pContext.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue(_contentType);
 
             return _connectionHolderActorRef.GetActorRef()
                                             .Ask(new ConnectionHolder.NewConnection(
                                                          stream, pContext.HttpContext.RequestAborted));
         }
 ....
Der ConnectionHolder

Akka bietet die Möglichkeit, dass ein Aktor weitere Aktoren erzeugen kann, die in der Hierarchie unter dem erzeugenden Aktor angesiedelt sind. Der ConnectionHolder erzeugt weitere Kind-Aktoren, wobei ein Kind-Aktor eine einzelne Verbindung repräsentiert.

ConnectionHolder.cs

...
 protected override void OnReceive(object pMessage)
        {
            switch (pMessage)
            {
                case NewConnection c:
                    Context.ActorOf(ConnectionHandler.props(c.Stream, c.CancellationToken));
                    break;
            }
        }
...

Der ConnectionHandler

Der ConnectionHandler ist im Gegensatz zum ConnectionHolder deutlich umfangreicher und hat verschiedene Aufgaben:

  • Lesen der Akka-Persistence Events
  • Schreiben der Server-sent Events in den Response-Stream
  • Zeitgesteuertes Senden des Heartbeats, um die Verbindung offen zu halten.
  • Zeitgesteuertes Überprüfen, ob die Verbindung noch besteht.
Event-Store

Arbeitet man mit Events, wird ein Store benötigt, um die Events zu Speichern. So können jederzeit alle Events neu gesendet werden oder nur eine Teilmenge ab einer bestimmten Id. Hierzu wird Last-Event-ID im Request-Header gesetzt.

In diesem Projekt wird Akka-Persistence als Event-Store verwendet. Über Akka-Persistence-Query wird eine Verbindung mit dem Event-Store aufgebaut. Als Datenbank wird in diesem Projekt eine PostgreSQL verwendet. Es können jedoch auch andere Datenbanken, wie z.B. MongoDB, verwendet werden.

Schreiben der Server-sent Events in den Response-Stream

Wird ein neues Event in den Event-Store geschrieben, wird dieses automatisch über Akka-Persistence-Query gelesen. Der ConnectionHandler schickt das Event als Akka-Nachricht an sich selbst und schreibt es dann in den Response-Stream der Verbindung.

Zeitgesteuertes Senden des Heartbeats, um die Verbindung offen zu halten.

Um dem Client zu signalisieren, dass die Verbindung noch besteht, wird in regelmäßigen Zeitabständen ein :heartbeat als Server-sent Event Kommentar gesendet. Ein Kommentar wird vom Client ignoriert und hält lediglich die Verbindung offen.

Der Heartbeat wird über einen Akka-Scheduler erzeugt. Es wird hierbei ein scheduleTellOnce verwendet. Nach dem Senden eines Heartbeats wird ein neuer Scheduler für den nächsten Heartbeat gestartet. Somit ist sichergestellt, dass erst dann ein neuer Hearbeat gesendet wird, wenn der aktuelle erfolgreich gesendet wurde.

Zeitgesteuertes Überprüfen, ob die Verbindung noch besteht.

Das Überprüfen, ob die Verbindung noch geöffnet ist, funktioniert nach dem gleichen Scheduler-Prinzip wie beim Heartbeat. In der aufgerufenen Funktion ConnectionIsClosed kommt das weiter oben erwähnte CancelationToken zum Tragen. Dieses wird regelmäßig aufgerufen, um zu überprüfen, ob die Verbindung noch geöffnet ist. Wurde die Verbindung vom Client geschlossen, wird der Aktor über eine PoisonPill beendet und damit auch serverseitig die Verbindung geschlossen.

Die Client-Seite

Im Gegensatz zur Server-Seite gibt es zum Konsumieren von Server-sent Events verschiedene Bibliotheken. Hier kam die Bibliothek von 3ventic/EvtSource im Projekt zum Einsatz.

Über den Tellerrand

Mittlerweile ist diese Art der Implementierung in verschiedenen Projekten auch produktiv zum Einsatz gekommen und hat sich dabei bewährt.

Je nach Anforderung kann der ConnectionHolder mehr Aufgaben übernehmen. So könnte zum Beispiel das zu sendende Event an den ConnectionHolder gesendet werden und dieser verteilt es dann an die einzelnen ConnectionHandler.

Der geneigte Leser wird festgestellt haben, dass im oben gezeigten Beispiel der Request-Header Last-Event-ID nicht berücksichtigt wird. Dieses einzubauen, sei dem Leser als Übung überlassen.

Links

TAGS

Kommentare

Um die Kommentare zu sehen, bitte unserer Cookie Vereinbarung zustimmen. Mehr lesen