Server-sent events in .NET with Akka

The starting point

In my first small .NET project, two microservices (reservation and lending) at the Container Bootcamp had to communicate via server-sent events. Server-sent events have the advantage of not needing a central messaging component (such as ActiveMQ or Kafka). They are part of the HTML5 specification and use HTTP as the transport protocol.

The search for libraries

However, I couldn’t find a fitting library, so it was time to take matters into my own hands. I discovered a first aspect of my eventual solution in this blog post. It describes a self-implemented IActionResult and shows how to write strings to the HTTP response in .NET. In a previous Scala project, I had already implemented manual HTTP connection handling in Akka.

As a fan of Akka, I checked whether an Akka implementation for .NET exists. To my surprise, Akka for .NET was available. Now, I only needed to combine the two.

The solution

Akka is based on the concept of actors. My idea was to implement an actor that holds all open connections and another one representing the actual connection.

Initialization of actors

To create the necessary actors, an actor system is required. It is created in Startup.cs, together with the actor that will take over the connection handling later on. Since all actor references have the same type, IActorRef, wrapper classes are used to tell the actors apart for dependency injection.

Startup.cs

...
var system = ActorSystem.Create("reservieren", config);
...
var eventConnectionHolder = system.ActorOf(ConnectionHolder.props(), "event-connection-holder");
...
pServices.AddTransient(typeof(IEventConnectionHolderActorRef), pServiceProvider => new EventConnectionHolderActorRefActorRef(eventConnectionHolder));
...
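The wrapper classes themselves are not shown in the excerpt. A minimal sketch of what such a wrapper might look like, assuming it only stores the reference and exposes it through the GetActorRef() method that PushActorStreamResult calls later (the class bodies are an assumption, not the original code):

```csharp
using Akka.Actor;

// Marker interface: gives the dependency injection container a distinct type
// to resolve, although every actor reference is just an IActorRef.
public interface IEventConnectionHolderActorRef
{
    IActorRef GetActorRef();
}

// Assumed implementation: a thin wrapper around the actor reference.
// The class name is taken from the Startup.cs excerpt.
public sealed class EventConnectionHolderActorRefActorRef : IEventConnectionHolderActorRef
{
    private readonly IActorRef _actorRef;

    public EventConnectionHolderActorRefActorRef(IActorRef actorRef) => _actorRef = actorRef;

    public IActorRef GetActorRef() => _actorRef;
}
```

With one interface per actor, a constructor can request exactly the actor it needs instead of an ambiguous IActorRef.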

The controller

The Controller receives the ConnectionHolder via dependency injection. It also provides an /events endpoint to which the clients can connect later.

When the endpoint is called, a new instance of PushActorStreamResult is created. It receives the ConnectionHolder and the Content-Type of the response as parameters.

ReservierenController.cs

...
public IActionResult Events() => new PushActorStreamResult(_connectionHolderActorRef, "text/event-stream");
...

PushActorStreamResult

As PushActorStreamResult implements the interface IActionResult, you get access to the context and the response stream in the method ExecuteResultAsync. The method also sets the content type of the HTTP response. The ConnectionHolder then receives a message that a new connection has been opened. This message carries the response stream and the CancellationToken.

In the examples I looked at or tried out, the connection was closed at the end of the stream. The client would have to open a new connection over and over again and pick up the latest events. This would imply a certain time lag. It was important to me to keep this time delay as short as possible under normal circumstances.

This is where the Ask comes into play. An Ask in Akka returns a Task, which fits the return type of ExecuteResultAsync. Usually, an answer is expected within a certain period of time, so a timeout is passed when Ask is called. In this case, the timeout is deliberately omitted and the ConnectionHolder never replies. The Ask therefore returns a task that normally does not complete, which keeps the connection open.

PushActorStreamResult.cs

...
public Task ExecuteResultAsync(ActionContext pContext)
{
    var stream = pContext.HttpContext.Response.Body;
    pContext.HttpContext.Response.GetTypedHeaders().ContentType = new MediaTypeHeaderValue(_contentType);

    return _connectionHolderActorRef.GetActorRef()
                                    .Ask(new ConnectionHolder.NewConnection(
                                             stream, pContext.HttpContext.RequestAborted));
}
...

The ConnectionHolder

In Akka, an actor can create additional actors, which sit below the creating actor in the hierarchy. The ConnectionHolder creates such child actors, with each child actor representing a single connection.

ConnectionHolder.cs

...
protected override void OnReceive(object pMessage)
{
    switch (pMessage)
    {
        case NewConnection c:
            Context.ActorOf(ConnectionHandler.props(c.Stream, c.CancellationToken));
            break;
    }
}
...
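The excerpt leaves out the NewConnection message and the props() factory. The following sketch shows how they might be declared, assuming an immutable message class with the Stream and CancellationToken properties used above. The member bodies are assumptions, and the message handling is replaced by a placeholder:

```csharp
using System.IO;
using System.Threading;
using Akka.Actor;

public class ConnectionHolder : UntypedActor
{
    // Immutable message announcing a new client connection.
    public sealed class NewConnection
    {
        public NewConnection(Stream stream, CancellationToken cancellationToken)
        {
            Stream = stream;
            CancellationToken = cancellationToken;
        }

        public Stream Stream { get; }
        public CancellationToken CancellationToken { get; }
    }

    // Factory for the Props passed to ActorOf in Startup.cs
    // (lower-case name as in the article).
    public static Props props() => Props.Create(() => new ConnectionHolder());

    protected override void OnReceive(object pMessage)
    {
        // Placeholder: the real handler creates one child per NewConnection.
        Unhandled(pMessage);
    }
}
```

Keeping the message immutable matters here because it crosses from the ASP.NET request thread into the actor system.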

The ConnectionHandler

Compared to the ConnectionHolder, the ConnectionHandler is much more extensive and has several tasks:

  • Reading the Akka persistence events
  • Writing the server-sent events to the response stream
  • Time-controlled sending of the heartbeat to keep the connection open
  • Time-controlled checks whether the connection still exists

Event store

When you work with events, you need an event store. It makes it possible to resend all events at any time, or only the subset of events starting at a certain ID. For the latter, the client has to set Last-Event-ID in the request header.

In this project, Akka Persistence is used as the event store, backed by a PostgreSQL database. The events are read via Akka Persistence Query. You can, however, also use other databases, such as MongoDB.
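Reading events through Akka Persistence Query could look roughly like the sketch below. It assumes the SQL read journal from Akka.Persistence.Query.Sql is configured for the PostgreSQL plugin and that all events of interest belong to a single persistence ID. The class EventStoreReader and its parameters are hypothetical and not part of the original project:

```csharp
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Streams;

public static class EventStoreReader
{
    // Streams the events of one persistent actor to `receiver`, starting at
    // `fromSequenceNr`. The query is live, so events persisted later are
    // delivered as well.
    public static void StreamEvents(
        ActorSystem system,
        IMaterializer materializer,
        string persistenceId,
        long fromSequenceNr,
        IActorRef receiver)
    {
        var readJournal = PersistenceQuery.Get(system)
            .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);

        readJournal
            .EventsByPersistenceId(persistenceId, fromSequenceNr, long.MaxValue)
            .RunForeach(envelope => receiver.Tell(envelope), materializer);
    }
}
```

The receiver gets one EventEnvelope per event. Its SequenceNr would be a natural candidate for the ID of the server-sent event.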

Time-controlled transmission of the heartbeat to keep the connection open

To signal to the client that the connection still exists, a :heartbeat is sent at periodic intervals as a server-sent event comment. A comment is ignored by the client and only keeps the connection open.
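On the wire, a comment is a line that starts with a colon, and every block ends with a blank line. The helper below is a small sketch of that format. The class name ServerSentEventFormat and the choice of fields are illustrative and not taken from the original project:

```csharp
using System.Text;

public static class ServerSentEventFormat
{
    // A line starting with ':' is a comment; the blank line ends the block.
    public static string Comment(string text) => $":{text}\n\n";

    // Builds one event. Each line of the payload gets its own "data:" field,
    // as the format requires for multi-line data.
    public static string Event(string id, string eventType, string data)
    {
        var builder = new StringBuilder();
        builder.Append("id: ").Append(id).Append('\n');
        builder.Append("event: ").Append(eventType).Append('\n');
        foreach (var line in data.Split('\n'))
        {
            builder.Append("data: ").Append(line).Append('\n');
        }
        builder.Append('\n'); // blank line terminates the event
        return builder.ToString();
    }
}
```

ServerSentEventFormat.Comment("heartbeat") produces exactly the :heartbeat line described above, followed by the terminating blank line.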

The heartbeat is triggered by the Akka scheduler, using ScheduleTellOnce. After a heartbeat has been sent, the next one is scheduled. This ensures that a new heartbeat is not sent until the current one has been successfully sent.
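The rescheduling pattern might be sketched as follows. HeartbeatActor is a hypothetical, stripped-down actor that only sends heartbeats. The real ConnectionHandler does more, and the asynchronous write via ReceiveAsync is an assumption of this sketch:

```csharp
using System;
using System.IO;
using System.Text;
using Akka.Actor;

public sealed class HeartbeatActor : ReceiveActor
{
    // Private trigger message the actor schedules for itself.
    private sealed class SendHeartbeat
    {
        public static readonly SendHeartbeat Instance = new SendHeartbeat();
    }

    private static readonly byte[] Heartbeat = Encoding.UTF8.GetBytes(":heartbeat\n\n");

    private readonly TimeSpan _interval;
    private readonly IScheduler _scheduler;
    private readonly IActorRef _self;

    public HeartbeatActor(Stream stream, TimeSpan interval)
    {
        _interval = interval;
        // Captured up front so they can be used after an await.
        _scheduler = Context.System.Scheduler;
        _self = Self;

        ReceiveAsync<SendHeartbeat>(async _ =>
        {
            await stream.WriteAsync(Heartbeat, 0, Heartbeat.Length);
            await stream.FlushAsync();
            ScheduleNext(); // only after the write has completed
        });

        ScheduleNext();
    }

    // One-shot timer: delivers SendHeartbeat to this actor after the interval.
    private void ScheduleNext() =>
        _scheduler.ScheduleTellOnce(_interval, _self, SendHeartbeat.Instance, _self);
}
```

Compared with a repeating timer, the one-shot variant cannot pile up heartbeats when a write to a slow client takes longer than the interval.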

Time-controlled check whether the connection still exists

Checking whether the connection is still open works according to the same scheduler principle as the heartbeat. This is where the CancellationToken mentioned above comes in: the method ConnectionIsClosed is called periodically and uses the token to check whether the connection is still open. If the connection was closed by the client, the actor is stopped via a PoisonPill and thus the connection is also closed on the server side.
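A sketch of this check, again reduced to a hypothetical actor of its own (ConnectionWatcher). Only the method name ConnectionIsClosed comes from the article, and its body here is an assumption:

```csharp
using System;
using System.Threading;
using Akka.Actor;

public sealed class ConnectionWatcher : ReceiveActor
{
    // Private trigger message the actor schedules for itself.
    private sealed class CheckConnection
    {
        public static readonly CheckConnection Instance = new CheckConnection();
    }

    private readonly CancellationToken _requestAborted;
    private readonly TimeSpan _interval;

    public ConnectionWatcher(CancellationToken requestAborted, TimeSpan interval)
    {
        _requestAborted = requestAborted;
        _interval = interval;

        Receive<CheckConnection>(_ =>
        {
            if (ConnectionIsClosed())
            {
                // The actor stops when it processes the pill from its mailbox.
                Self.Tell(PoisonPill.Instance);
            }
            else
            {
                ScheduleNextCheck();
            }
        });

        ScheduleNextCheck();
    }

    // HttpContext.RequestAborted is cancelled when the client disconnects.
    private bool ConnectionIsClosed() => _requestAborted.IsCancellationRequested;

    private void ScheduleNextCheck() =>
        Context.System.Scheduler.ScheduleTellOnce(_interval, Self, CheckConnection.Instance, Self);
}
```

Polling the token keeps all state changes inside the actor's message loop. Registering a callback on the token would also be possible, but that callback would run outside the actor.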

The client side

Unlike on the server side, several client libraries for server-sent events exist. In this project, I used the 3ventic/EvtSource library.

The bigger picture

In the meantime, this type of implementation has also been used in production in various projects and has proven its worth.

Depending on the requirements, the ConnectionHolder can take on more tasks. For example, an event could be sent to the ConnectionHolder, which then distributes it to the individual ConnectionHandlers.
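Such a distribution could be sketched like this. The actor BroadcastingConnectionHolder and the Publish message are hypothetical and only illustrate forwarding a message to all child actors:

```csharp
using Akka.Actor;

public sealed class BroadcastingConnectionHolder : UntypedActor
{
    // Hypothetical message carrying one event for all connected clients.
    public sealed class Publish
    {
        public Publish(string payload) => Payload = payload;

        public string Payload { get; }
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case Publish publish:
                // Every child represents one open connection.
                foreach (var child in Context.GetChildren())
                {
                    child.Tell(publish);
                }
                break;
            default:
                Unhandled(message);
                break;
        }
    }
}
```

Because the children are created by the holder itself, no separate registry of connections is needed. Stopped children drop out of Context.GetChildren() automatically.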

The interested reader will have noticed that the request header Last-Event-ID is not taken into account in the example above. This is left to the reader as an exercise.
