
erles - Erlang client for Event Store

erles is an Erlang client library for Event Store.

erles is licensed under The MIT License.
See LICENSE file for details.

Copyright © 2013 Andrii Nakryiko andrii.nakryiko@gmail.com

I'm very new to the Erlang ecosystem and this is my first Erlang project, so if
I'm doing something in a non-optimal, non-idiomatic or, even worse, plain wrong
way, please let me know and I'll try to fix it.

Supported versions of Event Store

erles relies on some recent Event Store changes (next expected versions on
writes and soft deletes) that haven't been released yet, so it currently works
only with the latest dev builds. As soon as Event Store releases its new
version, erles will stick to supporting stable versions.

erles itself is not yet production ready, so there may be some slight breaking
changes if they lead to a better client library.

Features

erles aims to provide a convenient and reliable client for working with Event
Store. Functionality-wise, it strives to be on par with, or better (and more
convenient) than, the official Event Store client.

Main features:

  • Permanent connection to node/cluster with automatic reconnects.
  • User authentication support.
  • Automatic reconnection to master node for maximum performance and least
    network overhead.
  • Batch writes to stream.
  • Explicit transaction writes.
  • Stream deletion (both soft- and hard-deletes).
  • Single event reads from streams.
  • Stream reads in forward/backward direction from usual stream or $all
    pseudo-stream.
  • Permanent catch-up subscriptions which catch up, subscribe to live events,
    and reconnect on errors/connection drops.
  • Primitive subscriptions provided by Event Store directly for those who need
    full control.
  • Metadata set/get operations which allow working with both raw and
    structured stream metadata.
  • Ping operation (just for fun and testing that Event Store is responding :).

Using erles

Installation

Download the sources from the Bitbucket repository.

To build the library and run the tests:

$ rebar get-deps
$ rebar compile
$ rebar skip_deps=true eunit

You can also add the repository as a dependency in rebar.config:

{deps, [
    ....
    {erles, ".*", {hg, "https://bitbucket.org/anakryiko/erles", {branch, "master"}}}
]}.

erles requires some standard applications to be started, so if you are working
with it in the Erlang shell, please run erles:start() first. There are
also convenience shell.cmd/shell.sh scripts that start an Erlang console with
the correct ebin paths set.

If you are building a proper Erlang release, adding erles to the applications
property of your .app file should just work.
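
As a rough illustration of the release setup, an application resource file
could list erles next to the standard applications. The application name
myapp, its callback module myapp_app, the description and the version are
placeholders invented for this sketch; only the erles entry in the
applications list reflects what is described above.

```erlang
%% myapp.app.src (hypothetical application; every name except erles is a
%% placeholder)
{application, myapp,
 [{description, "Example application that uses erles"},
  {vsn, "0.1.0"},
  {registered, []},
  %% Applications listed here are started before myapp itself.
  {applications, [kernel, stdlib, erles]},
  {mod, {myapp_app, []}}
 ]}.
```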

General conventions

Strings

Whenever a string value is expected, erles accepts both <<"binary strings">> and
"list strings". If it doesn't, that's a bug; please report it.

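If your own code needs a single representation for values that may arrive in
either form, a small normalising helper is one option. This is a minimal
sketch; the module str_sketch and the function to_binary/1 are hypothetical and
not part of erles, and nothing here describes how erles converts strings
internally.

```erlang
-module(str_sketch).
-export([to_binary/1]).

%% Hypothetical helper, not part of erles: turn either string flavour
%% into a binary so the rest of your code handles one representation.
-spec to_binary(binary() | string()) -> binary().
to_binary(Bin) when is_binary(Bin) ->
    Bin;
to_binary(Str) when is_list(Str) ->
    case unicode:characters_to_binary(Str) of
        Bin when is_binary(Bin) -> Bin;
        %% characters_to_binary/1 returns an error tuple on invalid input.
        _Invalid -> erlang:error(badarg, [Str])
    end.
```
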
Operation results

On success, every operation returns either the atom ok or a tuple whose first
element is the atom ok. On failure, most operations return a 2-tuple
{error, Reason}, where Reason is either a predefined atom or, in some rare
cases, an arbitrary Erlang term.

Common operation failure reasons:

  • retry_limit - operation reached the allowed number of retries;
  • server_timeout - no response from the server was returned within the
    allotted time;
  • {aborted, AbortReason} - operation was aborted because the erles
    connection was closed (because of AbortReason);
  • {not_authenticated, EsMsg} - the provided login/password was not
    authenticated on the server. EsMsg can provide a bit more information;
  • {bad_request, EsMsg} - Event Store received a malformed package for some
    reason (e.g., because of protocol version mismatch). EsMsg can provide
    more information.
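
The result shapes and failure reasons above lend themselves to pattern
matching. The following sketch sorts a result into a few buckets; the module
result_sketch, the function classify/1 and the bucket names (retry_later,
connection_closed, fatal) are hypothetical choices for illustration, not part
of erles, and whether a given failure is worth retrying is up to the
application.

```erlang
-module(result_sketch).
-export([classify/1]).

%% Hypothetical helper, not part of erles: sort an operation result into
%% success, a failure that may succeed later, a closed connection, or a
%% failure that needs the caller's attention.
-spec classify(term()) ->
        {ok, term()} | {retry_later, atom()} |
        {connection_closed, term()} | {fatal, term()}.
classify(ok) ->
    {ok, undefined};
classify(Res) when is_tuple(Res), tuple_size(Res) >= 2,
                   element(1, Res) =:= ok ->
    {ok, Res};
classify({error, retry_limit}) ->
    {retry_later, retry_limit};
classify({error, server_timeout}) ->
    {retry_later, server_timeout};
classify({error, {aborted, AbortReason}}) ->
    {connection_closed, AbortReason};
classify({error, Reason}) ->
    %% Covers {not_authenticated, _}, {bad_request, _} and anything else.
    {fatal, Reason}.
```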

For more detailed information on the structure and types of arguments and
results, please refer to the erles.hrl header file (which you will probably
want to include in your modules for more convenient use of erles records).

Operation options

Most erles functions exist in at least two forms:

  • short one with sensible defaults;

  • the long one, which takes a list of additional options that modify or
    enhance operation behavior.

The connection operation accepts the following options:

  • {default_auth, noauth | defauth | {Login, Pass}} (default: noauth) -
    default authentication used for each operation if no explicit override is
    provided for particular operation:

    • noauth - no authentication should be used;

    • defauth - same as noauth (when set as connection option, see
      below for meaning when used for individual operation);

    • {Login, Pass} - login/password string pair to use for authentication.

  • {max_server_ops, pos_integer()} (default: 2000) - maximum number of
    active operations on the Event Store node. Used to prevent node overload.
    All other operations are enqueued internally in the erles connection.

  • {operation_timeout, pos_integer()} (default: 7000) - timeout after which
    {error, server_timeout} will be returned for operation, if there was no
    response from Event Store.

  • {operation_retries, non_neg_integer()} (default: 10) - how many times
    operation will be retried (due to reconnection, prepare/commit/forwarding
    timeout, etc) before giving up and returning {error, retry_limit}.

  • {retry_delay, non_neg_integer()} (default: 500) - the delay before
    operation retry.

  • {connection_timeout, pos_integer()} (default: 2000) - TCP connection
    timeout.

  • {reconnection_delay, non_neg_integer()} (default: 500) - the delay
    before TCP reconnection in case of dropped connection.

  • {max_conn_retries, non_neg_integer()} (default: 10) - how many times
    erles will try to reconnect to a given destination (node or cluster).

  • {heartbeat_period, pos_integer()} (default: 500) - heartbeat period.

  • {heartbeat_timeout, pos_integer()} (default: 1000) - heartbeat timeout
    after which the connection is declared closed if no heartbeat response is
    received from the server.

  • {dns_timeout, pos_integer()} (default: 2000) - timeout of resolving DNS
    entry.

  • {gossip_timeout, pos_integer()} (default: 1000) - timeout of getting
    gossip from node.

All timeouts are in milliseconds. infinity is not accepted.
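
To make the defaults concrete, the sketch below collects them in a proplist
and merges user-supplied overrides on top. The module conn_opts_sketch and its
functions are hypothetical and not part of erles; the validation (unknown keys
and non-integer values such as infinity are rejected) only models the rules
stated above and says nothing about how erles checks options internally. For
brevity it does not distinguish pos_integer() from non_neg_integer() options.

```erlang
-module(conn_opts_sketch).
-export([defaults/0, merge/1]).

%% Documented connection defaults (timeouts in milliseconds).
defaults() ->
    [{default_auth, noauth},
     {max_server_ops, 2000},
     {operation_timeout, 7000},
     {operation_retries, 10},
     {retry_delay, 500},
     {connection_timeout, 2000},
     {reconnection_delay, 500},
     {max_conn_retries, 10},
     {heartbeat_period, 500},
     {heartbeat_timeout, 1000},
     {dns_timeout, 2000},
     {gossip_timeout, 1000}].

%% Hypothetical helper: user options override defaults; every key is
%% validated first, so a bad option raises before anything is merged.
merge(UserOpts) ->
    Defaults = defaults(),
    lists:foreach(fun({K, V}) -> validate(K, V, Defaults) end, UserOpts),
    [{K, proplists:get_value(K, UserOpts, D)} || {K, D} <- Defaults].

validate(Key, Value, Defaults) ->
    case lists:keymember(Key, 1, Defaults) of
        false ->
            erlang:error({unknown_option, Key});
        true when Key =:= default_auth ->
            ok;
        true when is_integer(Value), Value >= 0 ->
            ok;
        true ->
            %% Rejects infinity and any other non-integer value.
            erlang:error({bad_option, {Key, Value}})
    end.
```

The merged list could then be handed to connect/3, assuming the options list
is its last argument.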

Write operations (append, transactions, stream deletion, setting stream
metadata) accept the following options:

  • {auth, noauth | defauth | {Login, Pass}} (default: defauth) -
    authentication used for operation.

    • noauth - no authentication should be used;

    • defauth - use default authentication of connection (set with the
      default_auth option provided in connect/3). If no default
      authentication is set on connection level, noauth is used;

    • {Login, Pass} - login/password string pair to use for authentication.

  • {master_only, boolean()} (default: true) - specifies whether the operation
    should be performed exclusively on the master node. When the flag is set
    and the connected node is not the master, the erles connection
    automatically reconnects to the new master. For writes this avoids extra
    communication overhead and lowers latency (otherwise write operations are
    forwarded between Event Store nodes internally). For reads it gives the
    most up-to-date data from the master (otherwise reads are served locally
    by the connected Event Store node and no forwarding occurs).
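
The interplay between the per-operation auth option and the connection-level
default_auth option can be summarised in a few clauses. This is only a model
of the rules described above; the module auth_sketch and the function
effective_auth/2 are hypothetical and not part of erles, and the credentials
in the usage line are placeholders.

```erlang
-module(auth_sketch).
-export([effective_auth/2]).

-type auth() :: noauth | defauth | {term(), term()}.

%% effective_auth(OperationAuth, ConnectionDefaultAuth) returns the
%% authentication that would apply to a single operation.
-spec effective_auth(auth(), auth()) -> noauth | {term(), term()}.
effective_auth(defauth, defauth) ->
    %% defauth as a connection option means the same as noauth.
    noauth;
effective_auth(defauth, ConnDefault) ->
    %% Fall back to whatever the connection was opened with.
    ConnDefault;
effective_auth(noauth, _ConnDefault) ->
    noauth;
effective_auth({_Login, _Pass} = Creds, _ConnDefault) ->
    %% Explicit credentials always win over the connection default.
    Creds.
```

For example, auth_sketch:effective_auth(defauth, {<<"user">>, <<"pass">>})
yields the connection-level pair, while passing noauth for the operation
yields noauth regardless of the connection default.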

Read operations (single event read, stream range reads, getting stream
metadata) accept the following options:

  • auth - same as for write operations;

  • master_only - same as for write operations, see option's description
    for effect on reads;

  • {resolve, boolean()} (default: true) - indicates whether Event Store
    should resolve links emitted from projections into actual events. If the
    flag is set, any link is automatically replaced with the original linked
    event.

The subscription operation accepts the following options:

  • auth - same as for write and read operations;

  • resolve - same as for read operations;

  • {subscriber, pid()} (default: the calling process) - PID of the process
    that will receive all subscription messages.

  • {read_batch, pos_integer()} (default: 100) - the number of events read
    in a single stream read operation during the catch-up phase of a
    subscription.

Connection

All operations in erles are performed through a connection, which, once
established, stays open and keeps reconnecting if the TCP connection drops.

An erles connection is established by calling connect/2 (or connect/3). On
success, the function returns a process ID, which is used with all
operations (except subscription unsubscribe/1).

-spec connect('node', {Ip :: inet:ip_address(), TcpPort :: 0..65535})              -> {'ok', pid()} | {'error', term()};
             ('dns', {ClusterDns :: string() | binary(), ManagerPort :: 0..65535}) -> {'ok', pid()} | {'error', term()};
             ('cluster', [{Ip :: inet:ip_address(), HttpPort :: 0..65535}])        -> {'ok', pid()} | {'error', term()}.

Three connection modes are currently supported:

  • Connection to single node. IP address and TCP port of Event Store node
    should be specified:
> {ok, C} = erles:connect(node, {{127,0,0,1}, 1113}).
{ok,<0.33.0>}
  • Connection to cluster of nodes. A list of 2-tuples with IP address and
    HTTP port of each node is specified. erles will automatically determine
    through gossip protocol the best node to connect to.
> {ok, C} = erles:connect(cluster, [{{127,0,0,1}, 7777},
                                    {{127,0,0,1}, 8888},
                                    {{127,0,0,1}, 9999}]).
{ok,<0.40.0>}
  • Connection to a cluster of nodes by specifying a DNS name and a common HTTP
    port for all endpoints. erles resolves the DNS name into a list of IP
    addresses and pairs each with the provided HTTP port, forming the same list
    of endpoints used in cluster connection mode. This mode is intended to be
    used with Event Store's managers, though specifying the HTTP port of a
    cluster node is enough, provided the nodes on all machines use the same
    HTTP port. Usage:
> {ok, C} = erles:connect(dns, {<<"demo.cluster.foo.com">>, 30778}).
{ok,<0.45.0>}

To close an existing connection, just call close/1:

> erles:close(C).
ok

Writing events

Event Store provides two ways to store events, both of which are supported by
erles:

  • batch writes, where all sent events are committed immediately;

  • transactional writes. You start a transaction specifying the expected
    version of the stream you write to, then make any number of transactional
    writes of events, which Event Store won't commit until you explicitly
    commit that transaction.

Batch writes

Batch writes are done with the append/4 function (append/5 for the variant
with options):

-spec append(pid(), stream_id(), exp_ver(), [event_data()]) ->
        {'ok', stream_ver()} | {'error', write_error()}.

Where parameters (in order) are:

  1. erles connection PID (returned from connect/2 function).
  2. Non-empty stream ID (either binary or list string).
  3. Expected version of the stream we are about to write to. It is
    either the atom any (you don't care about the version of the
    stream, so no optimistic concurrency control is used and the write
    should always succeed) or an integer: -2 means the same as any, -1
    means an empty stream, and any non-negative integer specifies the
    number of the last event in the stream.
  4. List of events to append to stream, each event defined as:
-record(event_data,
        {
            event_id = erles_utils:gen_uuid()                 :: uuid(),
            event_type = erlang:error({required, event_type}) :: binary() | string(),
            data_type = raw                                   :: 'raw' | 'json',
            data = erlang:error({required, data})             :: binary(),
            metadata = <<>>                                   :: binary()
        }).

The two required fields are event_type and data. If not specified, event_id
is set to a newly generated UUID, metadata is empty and data_type is assumed
to be raw (not structured). If you are writing valid JSON as data, you should
set data_type to json, so that Event Store's projections know how to work
with the event data.

In case of success, the new expected version of the stream is returned (it
should be used with subsequent writes to the same stream).
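
The expected-version rules from parameter 3 can be modelled as a small
predicate. This sketch only restates the rules above; the actual check is
performed by Event Store, not by this code. The module exp_ver_sketch and the
function matches/2 are hypothetical, and representing an empty stream as a
last event number of -1 is an assumption of the model.

```erlang
-module(exp_ver_sketch).
-export([matches/2]).

%% matches(ExpVer, LastEventNumber) -> boolean()
%% LastEventNumber is modelled as -1 for an empty stream, otherwise the
%% number of the last event in the stream.
-spec matches('any' | integer(), integer()) -> boolean().
matches(any, _Last) ->
    true;                      % no optimistic concurrency check
matches(-2, _Last) ->
    true;                      % -2 means the same as any
matches(ExpVer, Last) when is_integer(ExpVer), ExpVer >= -1 ->
    ExpVer =:= Last.           % -1 matches only an empty stream
```

Under this model, after two events are appended to a new stream the last
event number is 1, so the next write would use 1 as its expected version.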

The write-specific failure reasons include:

  • wrong_exp_ver - provided stream expected version is wrong;

  • access_denied - you don't have enough access rights for operation, provide
    login/password credentials with enough access rights;

  • stream_deleted - you are trying to write events into the stream that was
    permanently deleted;

  • invalid_transaction - this can mean a few things, but most probably a
    wrong transaction ID (for transaction operations).

Example:

> {ok, NextExpVer} = erles:append(C, <<"stream">>, any,
                                  [#event_data{event_type="type1", data="data1"},
                                   #event_data{event_type="type2", data="data2"}]).
{ok,1}

Transactional writes

To start a transaction, use the txn_start/3 (txn_start/4) function:

-spec txn_start(pid(), stream_id(), exp_ver()) ->
        {'ok', trans_id()} | {'error', write_error()}.

All parameters have the same meaning as with append/4. In case of success, a
transaction ID is returned, which should be used with all operations within
that transaction. Failure reasons are the same as for append/4.

Example:

> {ok, Tid} = erles:txn_start(C, <<"stream">>, 1).
{ok,9679198}

To write events within a transaction, use the txn_append/3 function:

-spec txn_append(pid(), trans_id(), [event_data()]) ->
        'ok' | {'error', write_error()}.

It accepts the connection PID, the transaction ID returned from txn_start/3
and a list of events of the same structure as in append/4. When the operation
succeeds, the events are not committed yet.

Example:

> ok = erles:txn_append(C, Tid, [#event_data{event_type = <<"et1">>,
                                             data = <<"{\"num\":123}">>,
                                             data_type = json}]).
ok
> ok = erles:txn_append(C, Tid, [#event_data{event_type = <<"et2">>,
                                             data = <<1,2,3>>}]).
ok

To force events to be stored in the stream, call txn_commit/2:

-spec txn_commit(pid(), trans_id()) ->
        {'ok', stream_ver()} | {'error', write_error()}.

The result is the same as with append/4: the expected version of the stream
for subsequent writes. Failure reasons are the same as well.

Example:

> {ok, NextExpVer2} = erles:txn_commit(C, Tid).
{ok,3}

Reading events

To be written... For now, please take a look at erles_tests.erl for some
samples.

Subscriptions

To be written... For now, please take a look at erles_tests.erl for some
samples.

Working with stream metadata

To be written... For now, please take a look at erles_tests.erl for some
samples.