Overview

uPS is a scalable and elastic distributed content-based publish/subscribe system. Filtering is based on the evaluation of regular expressions. Read this blog-entry for more background information.
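
The core idea behind regular-expression filtering can be illustrated with a short, self-contained sketch. This is plain Python, not the uPS API; the subscription and publication shapes are made up for illustration:

    import re

    # Subscriptions map a subscriber id to a compiled regular expression.
    subscriptions = {
        "alice": re.compile(r"stock:ACME price:[89][0-9]"),  # ACME trading at 80-99
        "bob": re.compile(r"weather:.*storm"),
    }

    def match(publication):
        """Return the ids of all subscribers whose regex matches the publication."""
        return [sid for sid, regex in subscriptions.items() if regex.search(publication)]

    print(match("stock:ACME price:92"))           # ['alice']
    print(match("weather:heavy storm warning"))   # ['bob']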

Architecture

The architecture of uPS is heavily based on StreamHUB, a research prototype published at DEBS 2013. The general idea is to divide the many computational steps involved in Pub/Sub into independent sets of processes. For example, uPS might be used in a setting where there are many subscriptions to be matched but only few of them produce a match. In that case we want to allocate most of the available resources to matching, while notification delivery needs only a few. This is only possible if the processing steps are loosely coupled instead of being hardwired together within a single monolith.

In contrast to P2P-inspired Pub/Sub systems, StreamHUB and uPS are designed to be executed in a datacenter. The advantage is that routing of publications becomes predictable, i.e. we do not need to maintain the complex set of forwarding tables that connects the individual brokers in a P2P Pub/Sub system. With that, message latencies become predictable as well.

However, there is one important difference to the original prototype described in the paper: uPS does not partition its subscriptions across the nodes; it partitions publications instead. This has the advantage that the nodes holding subscriptions can be replicated easily, and at the same time these replicas improve performance since the publication workload is partitioned among them.
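
A toy sketch of this design choice (illustrative Python, not uPS code; all names are made up): every matcher replica holds the complete subscription table, and incoming publications are simply spread across the replicas.

    import itertools
    import re

    # Every replica holds the full subscription table (replicated state),
    # so any replica can answer any publication on its own.
    class Matcher:
        def __init__(self, name, subscriptions):
            self.name = name
            self.subscriptions = subscriptions  # {subscriber_id: compiled regex}

        def match(self, publication):
            return [sid for sid, rx in self.subscriptions.items() if rx.search(publication)]

    subscriptions = {"alice": re.compile("error"), "bob": re.compile("warning")}
    replicas = [Matcher(f"matcher-{i}", subscriptions) for i in range(3)]

    # Publications (not subscriptions) are partitioned: a simple round-robin
    # spreads the matching work over the replicas.
    dispatch = itertools.cycle(replicas)

    for pub in ["error in module A", "warning: disk almost full", "all good"]:
        replica = next(dispatch)
        print(replica.name, "->", replica.match(pub))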

Main Components

  • Web: The web component is essentially a small HTTP server which offers an HTTP interface to publish or subscribe to the system. Each subscription spawns a new process. (A hypothetical client sketch follows this list.)
  • Access Points (AP): At the moment, Access Points do nothing but route messages. However, a possible use-case is pre-processing of messages.
  • Matcher (M): Matchers hold the information of which subscriber holds which subscription. When they receive a publication, they try to match it against all registered subscriptions and send all matching subscriber-ids to the EPs. All matchers share the same state; since they use kvmnesia, the state is replicated in RAM on every node. Write accesses are transactional but reads are dirty. This is why adding matchers has a negative impact on the performance of subscription storing but a positive impact on publication matching.
  • Exit Point (EP): Exit Points hold the information on the process-id(s) of the web-component instance(s) through which the corresponding subscriber is connected to the system. Since they also use kvmnesia, adding more Exit Points improves message-dissemination performance (a delivery only needs to read the table) but decreases subscriber-storing performance (as storing is a transactional write to all Exit Points).
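
As a rough idea of how a client might talk to the web component, here is a hypothetical sketch using Python's standard library. The host, port, paths and payload format are assumptions and have to be adapted to the actual HTTP interface:

    import urllib.request

    BASE = "http://localhost:8080"   # assumed address of a web-component instance

    def post(path, body):
        # Send a plain POST request and return the response body as text.
        req = urllib.request.Request(BASE + path, data=body.encode(), method="POST")
        with urllib.request.urlopen(req) as resp:
            return resp.read().decode()

    # Register a subscription (a regular expression) and publish a message.
    # Both endpoints are illustrative; consult the web component for the real ones.
    post("/subscribe", "subscriber=alice&regex=stock:ACME.*")
    post("/publish", "stock:ACME price:92")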

(Architecture diagram: arch.png)

Running

You will need Erlang 17.1 and rebar installed on your computer. Furthermore, your computer (and any machine you want to run this on) needs an SSH server, and it is advisable to use public-key authentication in order to avoid endless password prompts.

Once this is done, you can execute "run_e2e_tests.sh", which will deploy uPS on your local machine and run some test cases. You can also run "run_benchmark.sh", which works like the end-to-end test but additionally deploys a subscriber that makes 1000 subscriptions and a publisher that publishes 10 publications/s.

You can adapt these bash scripts to your needs (for example, remove the sample subscriber and sample publisher from "run_benchmark.sh" and you have a simple deployment script). The most important part of these scripts is the line "cp cluster_config_local.py cluster_config.py". The file "cluster_config.py" is loaded whenever fabric commands are used to deploy the cluster. In this case we put a single "localhost" everywhere. The nice thing, though, is that you can add any number of machines to those lists and thus scale to any kind of workload.
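
For orientation, a local single-machine configuration could look roughly like the following. The exact variable names in cluster_config.py may differ; treat this as a hedged sketch:

    # cluster_config.py -- hypothetical local setup; the real file may use
    # different variable names. Each list can hold any number of hosts.
    web_hosts          = ["localhost"]
    access_point_hosts = ["localhost"]
    matcher_hosts      = ["localhost"]
    exit_point_hosts   = ["localhost"]

    # Scaling out is then just a matter of listing more machines, e.g.:
    # matcher_hosts = ["node1.example.com", "node2.example.com", "node3.example.com"]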

Elasticity

Elasticity describes the possibility to add more computing nodes at runtime, which is essential whenever you are dealing with dynamic workloads (e.g. diurnal request patterns). Adding a component once the system is running is straightforward for the components that do not share any state (i.e. web and Access Points): you can just add more by executing "fab -H your-new-host@example.com start:stage=access_point". The only thing you need to make sure is that all hosts know each other from the beginning by providing appropriate ".hosts.erlang" files. If you create them yourself on your machines, be sure to remove "fab -R workers upload_cluster" from your script, as it would overwrite those files with all nodes specified in "cluster_config.py".
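
For reference, a ".hosts.erlang" file is simply a list of host names written as Erlang terms, one quoted atom per line, each terminated by a period (the standard format read by Erlang's net_adm). The hosts below are placeholders:

    'localhost'.
    'node1.example.com'.
    'node2.example.com'.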

Adding an additional node to a stateful component (i.e. Matcher and Exit Point) is slightly more complicated: "fab -H your-new-host@example.com start:stage=matcher,mode=slave,nthreads=1,master_node=`python get_master.py matcher`". As you can see, the tricky part is the last config parameter, which tells the new node which master to connect to. This can be any matcher that is already running; you can either let the python script determine one (as above) or provide one manually.
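
The helper script is not shown here, but conceptually it only needs to pick one already-running host of the requested stage from cluster_config.py. A minimal, hypothetical sketch (assuming the variable names from the configuration sketch above):

    # get_master.py -- hypothetical sketch: print one running host of the requested
    # stage so it can be used as master_node for a newly started slave.
    import sys
    import cluster_config

    STAGES = {
        "matcher": cluster_config.matcher_hosts,
        "exit_point": cluster_config.exit_point_hosts,
    }

    if __name__ == "__main__":
        stage = sys.argv[1]        # e.g. "matcher"
        print(STAGES[stage][0])    # any already-running node of that stage will do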