
Reach Media Network REACH.ServiceBus

Created by Nathan Davis

Overview

REACH.ServiceBus is an inter-process communication (IPC) framework designed to allow easy and transparent communication between software components on the same player device (e.g. REACH.DigitalSignage.exe to RDS Modular Service Host). The impetus for this project is that currently we have multiple different IPC mechanisms/implementations in use, each with their own drawbacks and deficiencies.

Architecture

REACH.ServiceBus is a hub-and-spoke design. It has a central "server" component (REACH.ServiceBus.exe) which listens for connections to a local-only named pipe. The named pipe has security implemented in such a way that it cannot be read/written to remotely.

REACH.ServiceBus.exe acts as a service registry/discovery for clients as well as a common communication rendezvous point. REACH.ServiceBus.exe shall be referred to as the "Broker".

REACH.ServiceBus uses a message passing mechanism to communicate between components. A message can be sent in one of two modes:

  1. RequireAcknowledgment: the caller requires the recipient to acknowledge receipt of the message by sending an "Ack" or "Exc" message back. An Ack message indicates that the recipient processed the message with no error. An Exc message indicates that an error occurred delivering the message. This error could occur anywhere in the message delivery apparatus or in the recipient's handler code (business logic).
  2. FireAndForget: the caller will not wait for a reply. The recipient may send an Ack or Exc reply, but the calling client will simply ignore it.
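To make the two delivery modes concrete, here is a minimal C# sketch of how a sender might correlate Ack/Exc replies with outstanding messages. It assumes each reply carries the id of the message it answers. `DeliveryMode`, `ReplyKind` and `PendingReplies` are hypothetical names, not REACH.ServiceBus types.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical types for illustration only; not part of the REACH.ServiceBus API.
public enum DeliveryMode { RequireAcknowledgment, FireAndForget }
public enum ReplyKind { Ack, Exc }

public sealed class PendingReplies
{
    // Callers waiting for an Ack/Exc, keyed by the id of the message they sent.
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<string>> _waiting =
        new ConcurrentDictionary<Guid, TaskCompletionSource<string>>();

    // Called just before a message is written; the task completes when the reply arrives.
    public Task<string> Register(Guid messageId, DeliveryMode mode)
    {
        if (mode == DeliveryMode.FireAndForget)
        {
            // Nothing to wait for; any reply that does arrive is dropped by OnReply.
            return Task.FromResult<string>(null);
        }

        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _waiting[messageId] = tcs;
        return tcs.Task;
    }

    // Called by the read loop for every Ack/Exc frame received.
    public void OnReply(Guid inReplyTo, ReplyKind kind, string payload)
    {
        if (!_waiting.TryRemove(inReplyTo, out var tcs))
        {
            return; // unknown id or fire-and-forget message: ignore the reply
        }

        if (kind == ReplyKind.Ack)
            tcs.TrySetResult(payload);
        else
            tcs.TrySetException(new InvalidOperationException(payload));
    }
}
```

With this shape, awaiting the task returned by `Register` is what "requiring acknowledgment" means for the caller, while fire-and-forget callers get a completed task immediately.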

The following handshake takes place when a client connects to the Broker:

  1. Broker sends an "Identity" type frame. This frame contains the "ClientId" (a GUID) which uniquely identifies the client on the service bus.
  2. Client sends an "IdentityReply" type "Ack" frame to the Broker which contains the client's "CommonName". The CommonName can be configured in the client application and is distributed to other clients with any service advertisements.
  3. Broker sends a "ServiceRegistryNotification" type frame. This frame contains a listing of all the services currently advertised to the Broker by other connected clients.
  4. Client sends a null "Ack" to the Broker indicating the ServiceRegistryNotification frame was received.
  5. Client sends a "GetServerConfiguration" type frame.
  6. Broker sends a "GetServerConfigurationReply" type frame. Currently, this frame only contains the valid watchdog reset range that the Broker will allow. See the Watchdog section for more details.
  7. If the client has any services to advertise, the client will send a "ServiceAdvertisement" type frame which contains a list of services the client is offering to other clients.
  8. If the client sends a ServiceAdvertisement, the Broker will reply with a "ServiceAdvertisementReply" type frame.
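The frame order above can be summarized as a table. The following C# sketch is illustrative only: `HandshakeStep` and `Handshake.Expected` are hypothetical, and the frame type names are the ones used in the list.

```csharp
using System.Collections.Generic;

// Hypothetical summary of the connection handshake; not part of the library.
public readonly struct HandshakeStep
{
    public HandshakeStep(string sender, string frameType) { Sender = sender; FrameType = frameType; }
    public string Sender { get; }
    public string FrameType { get; }
    public override string ToString() => $"{Sender}: {FrameType}";
}

public static class Handshake
{
    public static IReadOnlyList<HandshakeStep> Expected(bool clientAdvertisesServices)
    {
        var steps = new List<HandshakeStep>
        {
            new HandshakeStep("Broker", "Identity"),                     // 1. assigns the ClientId (GUID)
            new HandshakeStep("Client", "IdentityReply (Ack)"),         // 2. carries the CommonName
            new HandshakeStep("Broker", "ServiceRegistryNotification"), // 3. current service registry
            new HandshakeStep("Client", "Ack (null)"),                  // 4. registry received
            new HandshakeStep("Client", "GetServerConfiguration"),      // 5.
            new HandshakeStep("Broker", "GetServerConfigurationReply"), // 6. watchdog reset range
        };

        if (clientAdvertisesServices)
        {
            steps.Add(new HandshakeStep("Client", "ServiceAdvertisement"));      // 7. optional
            steps.Add(new HandshakeStep("Broker", "ServiceAdvertisementReply")); // 8. only after 7
        }

        return steps;
    }
}
```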

Any time the service registry (listing of client service advertisements) changes on the Broker, the Broker will send a ServiceRegistryNotification frame to all connected clients. ServiceRegistryNotification frames are sent using the FireAndForget delivery mode.

Service Advertisements

The ServiceBusClient code is currently designed such that all the service offerings a client wants to make must be configured prior to connecting to the Broker for the first time:

static void Main(string[] args) =>
    Task.Run(async () => {
        var cts = new CancellationTokenSource();

        Console.WriteLine("Press Ctrl+C to exit . . .");

        //monitor for Ctrl+C
        Console.CancelKeyPress += (s, e) => {
            e.Cancel = true;

            cts.Cancel();
        };

        try
        {
            var sbc = new ServiceBusClient {
                Logger = _logger //Serilog ILogger instance from somewhere
            };

            //this client will provide the ILogConsumer service to other clients
            //the class TestLogConsumer is an implementation of ILogConsumer
            sbc.AdvertiseService<ILogConsumer, TestLogConsumer>();

            //connect to the Broker
            await sbc.Connect(cts.Token);
        }
        catch(Exception e)
        {
            Console.WriteLine(e);
        }

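        //wait until Ctrl+C cancels the token (a CancellationToken itself is not awaitable)
        try
        {
            await Task.Delay(Timeout.Infinite, cts.Token);
        }
        catch (OperationCanceledException)
        {
            //expected when Ctrl+C is pressed
        }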
    }).Wait();

Service Definition

Services must be defined as an interface which provider and consumer clients have access to. Here is an example service definition:

[ServiceDefinition(Version = "1.0.0.0")]
public interface ILogConsumer : IServiceBusProxy
{
    /// <summary>
    /// Processes multiple log items.
    /// </summary>
    /// <param name="logItems"></param>
    Task LogMessageBatch(LogItem[] logItems);
}

Technically, you could copy/paste this interface into both the provider and consumer applications. Messages are serialized and marshalled via JSON, so the provider and consumer need not reference the exact same assembly for the communication to work; the interface definitions must still match. That said, for the sake of everyone's sanity, let's just keep all the interface definitions in a common place!

The version property of the ServiceDefinition attribute would allow advertisement and consumption of different versions of the service.

All methods of the service MUST return a Task or Task<T>.

ILogConsumer need not implement IServiceBusProxy. This is only necessary if you want to call GetProviderDetails() to obtain provider details after client code has acquired the service.

Service Implementation

Services are then implemented as concrete classes:

[ServiceOffering(Version = "1.0.0.0", Lifestyle = ServiceLifestyle.Multiple)]
public class ConsoleLogConsumer : ILogConsumer
{
    public Task LogMessageBatch(LogItem[] logItems)
    {
        Console.WriteLine($"{Environment.NewLine}======= Begin LogMessageBatch ======={Environment.NewLine}");

        int idx = 0;
        foreach (var logItem in logItems)
        {
            Console.WriteLine($"{(++idx).ToString().PadLeft(4, '0')} {logItem}{Environment.NewLine}");
        }

        Console.WriteLine($"{Environment.NewLine}======= End LogMessageBatch ======={Environment.NewLine}");

        //nothing is awaited, so return a completed Task rather than marking the method async
        return Task.CompletedTask;
    }
}

The Version should match the Version of the interface if you want ServiceBusClient to automatically discover and wire up services for you.

The service "Lifestyle" determines whether this service offering must be the only instance of the service offered on the service bus (Singleton) or whether multiple advertisements for the same service can be made (Multiple). Singleton is useful, for example, if you want to ensure there is only one ILogConsumer available to send logs to.
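A sketch of how a registry could enforce the Lifestyle rule, assuming advertisements are matched by service name and version and that a conflicting advertisement is simply refused. `ServiceRegistrySketch` and `TryAdvertise` are hypothetical; the text does not describe where or how the real Broker enforces this.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the enum used in the examples; the registry below is hypothetical.
public enum ServiceLifestyle { Singleton, Multiple }

public sealed class ServiceRegistrySketch
{
    private readonly List<(string Service, string Version, ServiceLifestyle Lifestyle, Guid ClientId)> _ads =
        new List<(string, string, ServiceLifestyle, Guid)>();

    public bool TryAdvertise(string service, string version, ServiceLifestyle lifestyle, Guid clientId)
    {
        // Assumption: advertisements conflict only when name and version both match.
        var existing = _ads.Where(a => a.Service == service && a.Version == version).ToList();

        // A Singleton offering cannot join existing advertisements,
        // and nothing can join an existing Singleton.
        if (existing.Count > 0 &&
            (lifestyle == ServiceLifestyle.Singleton ||
             existing.Any(a => a.Lifestyle == ServiceLifestyle.Singleton)))
        {
            return false;
        }

        _ads.Add((service, version, lifestyle, clientId));
        return true;
    }

    // Called when a client disconnects (for example after its watchdog elapses).
    public void RemoveClient(Guid clientId) => _ads.RemoveAll(a => a.ClientId == clientId);
}
```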

Example Application

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Serilog;

using REACH.ServiceBus.Client;
using REACH.ServiceBus.Contracts;

namespace TestClient
{
    public enum LogItemSeverity
    {
        Dbg = 0,
        Inf = 1,
        Wrn = 2,
        Err = 3
    }

    public class LogItem
    {
        public Guid Id { get; set; } = Guid.NewGuid();
        public DateTime Timestamp { get; set; } = DateTime.UtcNow;
        public LogItemSeverity Severity { get; set; }
        public string Message { get; set; }
        public string Category { get; set; }
        public string Subcategory { get; set; }

        public override string ToString() => $"[{Severity}] {Timestamp} {Id} ({Category}/{Subcategory}) {Message}";
    }

    [ServiceDefinition(Version = "1.0.0.0")]
    public interface ILogConsumer : IServiceBusProxy
    {
        Task LogMessageBatch(LogItem[] logItems);
    }

    [ServiceOffering(Version = "1.0.0.0", Lifestyle = ServiceLifestyle.Singleton)]
    public class TestLogConsumer : ILogConsumer
    {
        public Task LogMessageBatch(LogItem[] logItems)
        {
            Console.WriteLine($"{Environment.NewLine}======= Begin LogMessageBatch ======={Environment.NewLine}");

            int idx = 0;
            foreach (var logItem in logItems)
            {
                Console.WriteLine($"{(++idx).ToString().PadLeft(4, '0')} {logItem}{Environment.NewLine}");
            }

            Console.WriteLine($"{Environment.NewLine}======= End LogMessageBatch ======={Environment.NewLine}");

            //nothing is awaited, so return a completed Task rather than marking the method async
            return Task.CompletedTask;
        }
    }

    class Program
    {
        private static CancellationTokenSource _cts;
        private static ServiceBusClient _sbc;
        private static ILogger _logger = new LoggerConfiguration()
            .MinimumLevel.Information()
            .WriteTo.Console(
                outputTemplate: "<{Level:u3}> {Timestamp:yyyy-MM-dd HH:mm:ss} {Message}{NewLine}{Exception}",
                theme: Serilog.Sinks.SystemConsole.Themes.AnsiConsoleTheme.Literate
            )
            .CreateLogger();

        static void Main(string[] _) => MainAsync().Wait();

        static async Task MainAsync() {
            var iAmConsumer = Environment.CommandLine.Contains("--be-a-consumer");
            var iAmProducer = !iAmConsumer;

            //monitor for Ctrl+C
            _cts = new CancellationTokenSource();
            Console.CancelKeyPress += (s, e) => {
                e.Cancel = true;

                _cts.Cancel();
            };

            _sbc = new ServiceBusClient { Logger = _logger };

            if (iAmConsumer)
            {
                //TestLogConsumer is Singleton, only advertise it if I'm the consumer
                _sbc.AdvertiseService<ILogConsumer, TestLogConsumer>();
            }

            //connect to the Broker
            await _sbc.Connect(_cts.Token);

            if (iAmProducer)
            {
                await ProduceMessages();
            }
            else
            {
                //we are consumer, just wait for user Ctrl+C
                _cts.Token.WaitHandle.WaitOne();
            }
        }

        private static async Task ProduceMessages() {
            //get a list of ILogConsumer instances. Should be either 0 or 1 instances.
            var logConsumers = _sbc.GetServices<ILogConsumer>();

            if (logConsumers.Any())
            {
                var logConsumer = logConsumers.First();

                _logger.Information("Remote ILogConsumer found: {ProviderDetails}", logConsumer.GetProviderDetails());

                //shovel messages until Ctrl+C is pressed
                var logItems = new List<LogItem>(500);
                var r = new Random();

                while (!_cts.IsCancellationRequested)
                {
                    //generate 500 log messages to send to the consumer
                    for (var i = 0; i < 500; ++i)
                    {
                        var severity = (LogItemSeverity)r.Next(0, 4);
                        var iterations = r.Next(5, 26);
                        var incantations = string.Join(", ", ArrayList.Repeat("All work and no play make Jack a dull boy", iterations).ToArray());

                        logItems.Add(new LogItem
                        {
                            Category = "TestClient",
                            Subcategory = "Tests",
                            Severity = severity,
                            Message = $"A log message of severity {severity}. Will repeat the magic incantation {iterations} times: {incantations}",
                        });
                    }

                    await logConsumer.LogMessageBatch(logItems.ToArray());

                    logItems.Clear();
                }
            }
            else
            {
                //no one is listening
                _logger.Error($"No remote ILogConsumer found.");
            }
        }
    }
}

Watchdog

When a client connects to the Broker, the Broker will set an initial timeout of 2 minutes. If the Broker does not receive a "WatchdogReset" frame within this timeout, it will send a "Trm" frame to the client and disconnect the client.

When the client sends a WatchdogReset frame, it must specify when the watchdog timer on the Broker should next elapse (e.g. after 15 seconds). The Broker has a configurable range of acceptable reset values. The initial implementation is between 15 seconds and 2 minutes.

If a client sends WatchdogReset that is outside the acceptable range, the Broker will reply with an Exc frame and will not reset the client's watchdog timer.

If at any time a client's watchdog timer elapses, the Broker disconnects that client and removes its service advertisements.

A client, therefore, has some control over the aggressiveness of the watchdog mechanism by choosing a more or less aggressive interval between the Broker's minimum and maximum acceptable reset values.

For example, if a client is hosting a service which needs to be very responsive to callers, it could choose lower reset values and send resets more frequently. The Broker would then detect that the client had become unresponsive sooner than it would for a client that chooses a higher interval.
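The watchdog rules above fit in a small state object. This C# sketch uses the numbers from the text (2-minute initial timeout, 15 second to 2 minute reset range); `ClientWatchdog` and its members are hypothetical, and a real Broker would also need a timer or polling loop that calls `HasElapsed`.

```csharp
using System;

// Hypothetical Broker-side watchdog state for one connected client.
public sealed class ClientWatchdog
{
    public static readonly TimeSpan InitialTimeout = TimeSpan.FromMinutes(2);
    public static readonly TimeSpan MinReset = TimeSpan.FromSeconds(15);
    public static readonly TimeSpan MaxReset = TimeSpan.FromMinutes(2);

    public DateTime Deadline { get; private set; }

    public ClientWatchdog(DateTime connectedAtUtc) => Deadline = connectedAtUtc + InitialTimeout;

    // Returns false (the Broker would reply with Exc) when the requested interval
    // is out of range; the existing deadline is left unchanged in that case.
    public bool TryReset(DateTime nowUtc, TimeSpan requested)
    {
        if (requested < MinReset || requested > MaxReset)
            return false;

        Deadline = nowUtc + requested;
        return true;
    }

    // When this returns true the Broker sends Trm, disconnects the client
    // and removes its service advertisements.
    public bool HasElapsed(DateTime nowUtc) => nowUtc >= Deadline;
}
```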

Service Definition Requirements

  • REACH.ServiceBus only proxies methods on the interface. It does not proxy properties, members, events, etc. Those are all features to implement another day.
  • Methods MUST return Task or Task<T>. This is just my arbitrary design decision. I did it this way because developers often don't realize their method may need to be async in the future when they write it. What if the method accesses direct memory today but needs to access a database server tomorrow? Well, now you get to refactor a bunch of code everywhere. If the method was async to start with, no problem.
  • Method parameters cannot be ref or out. I'm sure this could be implemented, but I'm just going to say NO. Also, ref/out does not play nicely with async (C# async methods cannot declare ref or out parameters at all), so it might not even be possible after all.
  • Return types and parameter types must be serializable to/from JSON using the fantastic Newtonsoft.Json library.
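These rules are easy to check with reflection before a service is advertised. The validator below is a hypothetical helper, not part of REACH.ServiceBus. It only inspects members declared directly on the interface (inherited interfaces would need `GetInterfaces()`), and the two sample interfaces exist only to exercise it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical pre-flight check for a service definition interface.
public static class ServiceDefinitionValidator
{
    public static IReadOnlyList<string> Validate(Type serviceInterface)
    {
        var problems = new List<string>();

        if (!serviceInterface.IsInterface)
        {
            problems.Add($"{serviceInterface.Name} is not an interface");
            return problems;
        }

        foreach (var property in serviceInterface.GetProperties())
            problems.Add($"property {property.Name} will not be proxied");

        foreach (var evt in serviceInterface.GetEvents())
            problems.Add($"event {evt.Name} will not be proxied");

        foreach (var method in serviceInterface.GetMethods())
        {
            // Property accessors and event add/remove methods were reported above.
            if (method.IsSpecialName) continue;

            var rt = method.ReturnType;
            var returnsTask = rt == typeof(Task) ||
                (rt.IsGenericType && rt.GetGenericTypeDefinition() == typeof(Task<>));
            if (!returnsTask)
                problems.Add($"{method.Name} must return Task or Task<T>");

            // ref, out (and in) parameters all surface as by-ref parameter types.
            foreach (var p in method.GetParameters())
                if (p.ParameterType.IsByRef)
                    problems.Add($"{method.Name}: parameter {p.Name} is passed by reference");
        }

        return problems;
    }
}

// Sample inputs for the validator.
public interface IGoodService
{
    Task Ping();
    Task<int> Add(int a, int b);
}

public interface IBadService
{
    int Add(int a, int b);                  // not Task-returning
    Task Parse(string text, out int value); // out parameter
}
```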

There are several serialization edge cases that cause problems when marshalling C# types to/from JSON.

Objects

REACH.ServiceBus can marshal any C# objects which can be serialized and deserialized by Newtonsoft.Json. The producer and consumer need not even reference the same assembly as long as the object definitions on both sides are compatible (e.g. string[] on one side, List<string> on the other side). That said, it is strongly encouraged that both the producer and consumer reference the exact same assembly to prevent serialization problems that might be tough to diagnose in the wild.

Numerics

JSON has a single number type, and JavaScript represents all numbers as double-precision floating point. When an integer is serialized from C# to JSON and deserialized back without a target type, Newtonsoft.Json always produces an Int64 (long). If the method signature specifies Int32, we get an InvalidCastException, because a boxed Int64 can only be unboxed as an Int64, not cast directly to Int32.

The C# Convert.ChangeType function has been used to smooth this over. Simply put, if the caller has the method signature with Int32 and the provider has the method signature with Int32, then we can be sure that no truncation is actually happening (since the producer cannot create integers that would result in truncation on the consumer side).
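The coercion step can be sketched in a few lines. `NumericMarshalling.CoerceTo` is a hypothetical name. The sketch also unwraps `Nullable<T>` targets, which `Convert.ChangeType` does not handle on its own; that handling is an assumption about what a marshaller needs, not something the text states.

```csharp
using System;

public static class NumericMarshalling
{
    // Coerces a deserialized value (e.g. a boxed Int64) to the declared parameter type.
    // Convert.ChangeType performs a checked conversion: a long that does not fit in an
    // int throws OverflowException rather than silently truncating.
    public static object CoerceTo(object value, Type parameterType)
    {
        if (value == null) return null;

        // Convert.ChangeType cannot target Nullable<T> directly, so convert to T instead.
        var target = Nullable.GetUnderlyingType(parameterType) ?? parameterType;
        return Convert.ChangeType(value, target);
    }
}
```

For comparison, `(int)(object)42L` throws InvalidCastException, while `CoerceTo(42L, typeof(int))` yields a boxed `42` of type Int32.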

Precision issues will probably arise when dealing with double/decimal/float types. Granted, these values never actually pass through JavaScript, but it would not surprise me if Newtonsoft.Json did some handling of floating point numbers to improve compatibility with JS implementations. Work done for NEO Mercury explored these issues extensively, because NEO Mercury (C#) and Signs (JS) need a way to compute compatible SHA hashes of objects to detect whether they differ. Much care was needed to ensure that numerics were hashed consistently between the two languages, and ultimately 100% fidelity could not be guaranteed (C# numerics had to be truncated so as not to exceed the precision characteristics of JS).
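The JS precision limit is easy to demonstrate for integers: an IEEE 754 double has a 53-bit significand, so integers beyond 2^53 cannot all be represented exactly. `JsPrecision` is a hypothetical helper for illustration; it is not the NEO Mercury truncation code.

```csharp
// Illustrative helper: what happens to a C# long when it is forced through a double,
// the only number type in JavaScript.
public static class JsPrecision
{
    // 2^53 - 1, the value JavaScript exposes as Number.MAX_SAFE_INTEGER.
    public const long MaxSafeInteger = 9_007_199_254_740_991;

    // Converts to double and back; exact only for |value| <= 2^53.
    public static long RoundTripThroughDouble(long value) => (long)(double)value;

    public static bool IsSafeForJs(long value) =>
        value >= -MaxSafeInteger && value <= MaxSafeInteger;
}
```

For example, `RoundTripThroughDouble(9_007_199_254_740_993)` (2^53 + 1) comes back as 9,007,199,254,740,992, because the nearest representable doubles are 2^53 and 2^53 + 2.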

Arrays/Collections

Collections and typed arrays are both serialized to arrays in JSON and then deserialize to a JArray on the receiving side. The receiver knows the exact type the JArray should be converted to from the method signature. The code performs the following conversion cascade:

JArray -> IEnumerable<object> -> T[]

If the desired type is a typed array (T[]), then we're done. If the desired type is a generic collection (List<T>, Queue<T>, etc.), then the code uses Activator.CreateInstance() to find the appropriate constructor that accepts the T[] that was already prepared:

var genericArgument = marshalTo.GenericTypeArguments.First();

Activator.CreateInstance(
    marshalTo,
    CastEnumerableToTypedArray(
        (source as JArray).Select(item => item.ToObject(genericArgument)),
        genericArgument
    )
);
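The helper `CastEnumerableToTypedArray` is not shown above. Here is a self-contained sketch of the whole cascade. It assumes the items have already been converted to the element type (the original does this with `item.ToObject(genericArgument)`) and starts from `IEnumerable<object>` rather than a `JArray` so that it needs no Newtonsoft.Json reference. The helper's signature here is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CollectionMarshalling
{
    // Builds a T[] (as System.Array) from items that are already instances of elementType.
    public static Array CastEnumerableToTypedArray(IEnumerable<object> items, Type elementType)
    {
        var source = items.ToList();
        var result = Array.CreateInstance(elementType, source.Count);
        for (var i = 0; i < source.Count; i++)
            result.SetValue(source[i], i);
        return result;
    }

    public static object MarshalTo(IEnumerable<object> items, Type marshalTo)
    {
        // Typed array: the cascade ends at T[].
        if (marshalTo.IsArray)
            return CastEnumerableToTypedArray(items, marshalTo.GetElementType());

        // Generic collection (List<T>, Queue<T>, HashSet<T>, ...): its IEnumerable<T>
        // constructor accepts the T[] built above.
        var genericArgument = marshalTo.GenericTypeArguments.First();
        var typed = CastEnumerableToTypedArray(items, genericArgument);

        // Wrap explicitly to make the single-argument intent clear. A value statically
        // typed as string[] (an object[] through array covariance) passed bare to the
        // params parameter would be spread into separate constructor arguments.
        return Activator.CreateInstance(marshalTo, new object[] { typed });
    }
}
```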

Byte Arrays (byte[])

Support for marshalling byte[] has not been written yet. JSON has no native representation for byte[], so JSON libraries each have their own convention for storing byte arrays. Most use an array of numbers ([ 2, 9, 12, 255 ]). Some implementations Base64-encode the bytes into a string.

It is just a matter of finding out how Newtonsoft.Json encodes byte arrays and writing code in REACH.Proxies.Marshalling.Json.cs to handle this case. This is on the short-list of things to fix because I like to work with byte arrays a lot.
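For reference, Newtonsoft.Json writes a byte[] as a Base64 string by default, so on an untyped receiving side the value typically arrives as a string. A sketch of the missing branch, assuming that default and handling only the plain-string case (not a JValue wrapper); `ByteArrayMarshalling.ToByteArray` is a hypothetical name and is not the code in REACH.Proxies.Marshalling.Json.cs.

```csharp
using System;

public static class ByteArrayMarshalling
{
    // Converts a deserialized value to byte[] for a parameter declared as byte[].
    public static byte[] ToByteArray(object value)
    {
        switch (value)
        {
            case null:
                return null;
            case byte[] bytes:
                return bytes;                            // already the right type
            case string base64:
                return Convert.FromBase64String(base64); // Newtonsoft.Json's default encoding
            default:
                throw new InvalidCastException($"Cannot marshal {value.GetType()} to byte[]");
        }
    }
}
```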

Configuring RPC Invocation Parameters

A client can configure various operational parameters on a per-method basis:

  • IsFireAndForget: indicates that the client will treat calls to this RPC as "fire-and-forget". The call will return immediately to application code with a value of default(T) where T is the type returned by the method.
  • OnUnhandledException: provides a delegate which is called when a fire-and-forget method call generates an exception.
  • Timeout: how long to wait for the invocation to complete. NOTE: the underlying named pipe library has a hard-coded timeout of 60 seconds, so setting the Timeout to anything near or above 60 seconds will not be effective.
  • Expiry: when the invocation should "expire". If a service provider receives an invocation that has expired, it will simply ignore the invocation.
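The Expiry and Timeout semantics can be sketched as follows. This assumes each invocation carries its creation time and that the caller raises `TimeoutException` when no reply arrives in time. `InvocationEnvelope` and `InvocationTimeout.WithTimeout` are hypothetical names; the real frame layout is not documented here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical invocation metadata.
public sealed class InvocationEnvelope
{
    public DateTime CreatedUtc { get; set; }
    public TimeSpan? Expiry { get; set; } // null means the invocation never expires

    // Provider side: an expired invocation is ignored rather than executed.
    // Expiry = TimeSpan.Zero makes every invocation "dead on arrival".
    public bool IsExpired(DateTime nowUtc) =>
        Expiry.HasValue && nowUtc >= CreatedUtc + Expiry.Value;
}

public static class InvocationTimeout
{
    // Caller side: wait for the reply, but give up with TimeoutException after `timeout`.
    public static async Task<T> WithTimeout<T>(Task<T> reply, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var delay = Task.Delay(timeout, cts.Token);
            var winner = await Task.WhenAny(reply, delay).ConfigureAwait(false);
            if (winner != reply)
                throw new TimeoutException($"No reply within {timeout}");

            cts.Cancel(); // the reply won, so release the pending timer
            return await reply.ConfigureAwait(false);
        }
    }
}
```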

The API for configuring method behavior is admittedly crappy. I have not come up with a better idea:

sbc.ConfigureServiceBehavior<ISomeService>(configurator =>
    configurator.ConfigureMethod(
        someService => nameof(someService.MethodWhichThrowsAnException),
        config => {
            config.IsFireAndForget = true;
            config.OnUnhandledException = (invocation, exception) => {
                _logger?.Warning(exception, $"fire-and-forget invocation `{invocation.Method.DeclaringType.FullName}::{invocation.Method.Name}` threw an exception");
            };
        }
    ).ConfigureMethod(
        someService => nameof(someService.DeadOnArrival),
        config => {
            //setting Expiry to "Zero" means the message will be "dead on arrival"
            //since it is immediately expired
            config.Expiry = TimeSpan.Zero;
        }
    ).ConfigureMethod(
        someService => nameof(someService.LongRunningOperationWithBigTimeout),
        config => {
            config.Timeout = TimeSpan.FromSeconds(30);
        }
    )
);

var someService = sbc.GetServices<ISomeService>().First();

//does not actually cause an exception! This is because we have configured the method
//as fire-and-forget, so the service bus client immediately returns without waiting
//for a reply (which will be an exception). Good thing we set the OnUnhandledException
//handler too!
await someService.MethodWhichThrowsAnException();

//throws System.TimeoutException because the operation takes longer than
//the default 1-second timeout
await someService.LongRunningOperationWithDefaultTimeout();

//throws System.TimeoutException because the provider ignores the expired
//invocation, so no reply ever arrives
await someService.DeadOnArrival();

//succeeds because we configured big timeout
await someService.LongRunningOperationWithBigTimeout();
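Fire-and-forget behaviour, as configured above, can be approximated with a small wrapper: the caller gets `default(T)` immediately and any fault is routed to a callback. This is a sketch only. `FireAndForget.Invoke` is hypothetical, its callback receives just the exception (the real `OnUnhandledException` also receives the invocation), and it covers `Task<T>` methods only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FireAndForget
{
    public static Task<T> Invoke<T>(Func<Task<T>> call, Action<Exception> onUnhandledException)
    {
        Task<T> pending;
        try
        {
            pending = call();
        }
        catch (Exception e) // the call failed before it returned a task
        {
            onUnhandledException?.Invoke(e);
            return Task.FromResult(default(T));
        }

        // Observe faults in the background so they reach the callback, not the caller.
        pending.ContinueWith(
            t => onUnhandledException?.Invoke(t.Exception.GetBaseException()),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);

        // The caller never waits for the real reply.
        return Task.FromResult(default(T));
    }
}
```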

Performance

A variety of performance trade-offs were considered when building REACH.ServiceBus.

ArrayPool<byte>

The named pipe library reads data into byte arrays. Each read operation uses a smallish byte array (10 KB). These byte arrays are rented from an array pool to avoid excessive allocations. Additionally, a fully assembled message frame can become quite large; the library has a hard-coded maximum buffer size of 10 MB.

Use of ArrayPool decreases the frequency of gen 2 GC sweeps at the expense of increased thread lock contention. This resulted in lower total throughput in exchange for fewer disruptive "stop the world" GC sweeps.
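The rent/return pattern looks roughly like this. The 10 KB read size comes from the text; `PooledReader` is a hypothetical example that reads any `Stream`, and unlike the real library it does not pool the assembled message.

```csharp
using System.Buffers;
using System.IO;

public static class PooledReader
{
    private const int ReadBufferSize = 10 * 1024;

    // Reads a whole stream using a single pooled read buffer.
    public static byte[] ReadAll(Stream source)
    {
        // Rent may hand back a larger array than requested; only ReadBufferSize bytes are used.
        var buffer = ArrayPool<byte>.Shared.Rent(ReadBufferSize);
        try
        {
            using (var assembled = new MemoryStream())
            {
                int read;
                while ((read = source.Read(buffer, 0, ReadBufferSize)) > 0)
                    assembled.Write(buffer, 0, read);
                return assembled.ToArray(); // not pooled in this sketch
            }
        }
        finally
        {
            // Always return the buffer, even if a read throws.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
```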

Message Queuing

The original design wrote message frames in one shot by serializing each entire frame before the write operation. This resulted in large object heap (LOH) allocations for large messages. As such, I switched to using Newtonsoft.Json's stream reading/writing facilities, which avoid LOH allocations. For this to work, the named pipe channel needs to be "locked" until the entire frame is written. Rather than using lock synchronization, I chose to enqueue frames that need to be written to the named pipe and have a separate, dedicated thread do the writing. This ensures that complete frames are written and that calling code does not block a thread (it can benefit from async/await scheduling).

This queuing mechanism resulted in a decrease in total throughput.
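The queue-plus-dedicated-writer design can be sketched with `BlockingCollection<T>`. Each queued item is a delegate that streams one frame into the pipe, so frames are never fully buffered up front and only the writer thread touches the stream; callers await a task that completes once their frame is written. `FrameWriter` is a hypothetical name, and the real implementation may use a different queue type.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public sealed class FrameWriter : IDisposable
{
    private readonly BlockingCollection<(Action<Stream> Write, TaskCompletionSource<bool> Done)> _queue =
        new BlockingCollection<(Action<Stream>, TaskCompletionSource<bool>)>();
    private readonly Stream _pipe;
    private readonly Thread _writer;

    public FrameWriter(Stream pipe)
    {
        _pipe = pipe;
        _writer = new Thread(WriteLoop) { IsBackground = true, Name = "FrameWriter" };
        _writer.Start();
    }

    // Queues a frame; the returned task completes once the frame is fully written.
    public Task EnqueueAsync(Action<Stream> writeFrame)
    {
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Add((writeFrame, done));
        return done.Task;
    }

    private void WriteLoop()
    {
        // Blocks until a frame is queued; ends after CompleteAdding once the queue is drained.
        foreach (var (writeFrame, done) in _queue.GetConsumingEnumerable())
        {
            try
            {
                writeFrame(_pipe); // e.g. stream-serialize one frame straight into the pipe
                _pipe.Flush();
                done.TrySetResult(true);
            }
            catch (Exception e)
            {
                done.TrySetException(e);
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // let the writer finish what is queued, then exit
        _writer.Join();
        _queue.Dispose();
    }
}
```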

Measurements

A 10 MB message payload took, on average, ~1,500 msec to transmit from producer to consumer. REACH.ServiceBus can sustain ~6.7 MB/s of data transfer from a single client.

A simple RPC invocation which takes an integer and returns an integer executes, on average, in 373 usec (0.373 ms). A single client saturating a connection can execute ~2,700 simple RPCs / second.
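As a quick consistency check, the figures above agree with each other:

```latex
\frac{10\ \text{MB}}{1.5\ \text{s}} \approx 6.7\ \text{MB/s}
\qquad
\frac{1\ \text{call}}{373\ \mu\text{s}} = \frac{10^{6}}{373}\ \text{calls/s} \approx 2{,}681\ \text{calls/s} \approx 2{,}700\ \text{calls/s}
```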

A consumer, Broker, producer triplet running at full saturation consumed about 65% of the CPU time on my laptop.

These figures far exceed system requirements. Simply stated, it is unlikely that any component of the system will need to produce MB/s of data or execute hundreds or thousands of RPCs per second.
