
Overview of Memoria

There is a long-term trend for applications to become more computationally intensive. Traditionally, most applications have been built around databases, which are IO-bound: a small number of operations performed on a large amount of data. There have also been compute-intensive scientific and HPC applications, performing a large number of operations on a small amount of data (iterative and more complex algorithms). Today, compute-intensive tasks like Artificial Intelligence and Machine Learning are merging with traditional IO-intensive database technologies like SQL processing, bringing new qualities to both worlds.

This trend manifests itself in many different ways; below are just a few of them:

  1. Databases become more decentralized or federated. There is a tendency to integrate independently evolving data stores into a single unified data space.
  2. Databases become more distributed. Individual databases grow and eventually don't fit into a single machine. New applications try to be distributed, decentralized, and elastic from the beginning.
  3. Data becomes more structured (diverse and connected). Not just tabular, but graph-like, tree-like, and beyond what good old data structures (GODS) look like. Neural network weights are also data structures and can be hybridized with, say, trees for faster operations, much like databases speed up queries with indexes.
  4. Computations become more heterogeneous. General-purpose CPUs are not energy-efficient on many specialized tasks; specialized processors like GPUs, ASICs and neural processors are more energy-efficient in their narrow domains. The data storage format differs from the data processing format, so multiple format transformation steps are required when data moves between processors.
  5. Computers become more dense. Having 64 out-of-order cores in a single commodity processor is possible now. To utilize this horsepower, applications have to be designed in a special way.
  6. Networks become faster. Terabit Ethernet is on the way, and having a 10GbE network or direct cross-device connection (RDMA) in the cloud is not uncommon now. Asynchronous IO is needed to utilize this throughput efficiently, but software stacks are generally not ready for that.
  7. Storage devices become faster: NVMe SSDs with millions of IOPS, NVRAM devices. Asynchronous IO, again, is needed for storage devices too, not only for fast networking.

Catching up with this trend is not an easy thing; software is not that soft these days. Fixing a software bug is fast, but developing an entirely new ecosystem is slow. Utilizing high-throughput network and storage devices requires exactly that: special asynchronous APIs have been proposed for OS kernels, implying "green" threads and actor-like software designs. The same is true for heterogeneous computations: though much has been done here, compilers and virtual machines are only halfway to this goal, and vendor lock-in (think CUDA) dominates the area.

There is no simple solution to this complex problem. Memoria focuses on the "core data" aspects: storage, transformation and publishing of dynamic long-term data. Instead of providing yet another vertically integrated Storage/ETL solution, Memoria provides low-level building blocks common to a great variety of practical and theoretical problems. A bird's-eye view reveals three main subsystems:

  1. Advanced Dynamic Data Structures Framework
  2. Persistent Layer
  3. Storage Layer

More on this below.

Advanced Dynamic Data Structures Framework

A data structure is dynamic if it can be modified in constant or logarithmic time in its size. Otherwise, if each modification requires linear or worse time, the data structure is considered static. For example, std::vector<T> is dynamic for the push_back() operation and static for the insert() operation. Relational tables in OLTP databases are dynamic, while in analytical databases (e.g., in Druid) they are static. This distinction is important because dynamic data structures are write-optimized (O(log N) for all CRUD operations), and static data structures are read-optimized (O(1) reads, but O(N) updates). From a theoretical perspective, we can't have both O(1) reads and O(1) updates for the vector data structure, which is the backbone of many other, more complex data structures.
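To make the distinction concrete, here is a minimal sketch using plain std::vector (standard C++ only, nothing Memoria-specific):

#include <cstdio>
#include <vector>

int main()
{
    std::vector<int> v;

    // Dynamic operation: push_back() is amortized O(1) per call,
    // so growing the vector at the end stays cheap.
    for (int i = 0; i < 1000; ++i) {
        v.push_back(i);
    }

    // Static operation: insert() at the front shifts every existing
    // element, costing O(N) per call -- O(N^2) for the whole loop.
    for (int i = 0; i < 1000; ++i) {
        v.insert(v.begin(), i);
    }

    std::printf("size = %zu\n", v.size());
}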

There is no strict definition of an advanced data structure or of how it differs from an ordinary one. Ordinary data structures are well-known, well-studied and widely used. They usually have relatively simple and pretty efficient implementations. For example, the B-Tree is an ordinary data structure.

Advanced data structures, by contrast, are usually application-specific, have some unique or special properties, and sometimes don't have efficient general-purpose implementations. These structures usually require nontrivial effort to find efficient task-specific implementations, or advanced professional skills to use them efficiently. A notable example of an advanced data structure is the compressed bitmap, used in analytical databases as a search index to speed up queries. Another example is the full-text search index, used in bioinformatics and information retrieval to quickly find substrings in strings (e.g., documents by keywords). There are two practically important classes of advanced data structures: compressed storage and search indexes. Search indexes can themselves be compressed, though, as in wavelet trees.

Memoria focuses on and optimizes for advanced data structures in the first place, providing conventional ones like dynamic arrays, maps, graphs and relational tables as specializations of advanced ones, not vice versa. The framework consists of:

  1. Primitive data structures: arrays, search trees, prefix sum trees, bitmaps, nested values etc.
  2. Special allocator "packing" those structures into a fixed-size memory block, like a 4K memory page.
  3. Serialization facilities to store/load such blocks to/from external memory.
  4. Meta-programming framework providing dynamic balanced search trees over those memory blocks.
  5. IO Buffer framework, similar to Apache Arrow, to forward data from the block level to the application level and beyond. Efficient Java bindings utilizing such buffers are provided by the Mariana project.

By and large, this layer of Memoria reduces data containers like dynamic arrays, sets, maps and multimaps to limited-size memory blocks, which can be stored anywhere from immediate in-memory stores to distributed hash tables:

[Figure: Storage Options in Memoria]

The following is a short list of the low-level general-purpose containers Memoria provides out of the box.

  • Map<Key, Value> - classical key-value mapping for short/fixed-size keys and values; O(log N) worst-case time for insert/delete/find/access, and amortized O(1) scan.
  • Vector<Value> - dynamic array for short/fixed-size values. Can be used in columnar storage for columns of fixed-size values.
  • Vector<Vector<Value>> - dynamic array for arbitrarily-sized values. Can be used in columnar storage for columns of generic values.
  • Map<Key, Vector<Value>> - generic key-value mapping for fixed-size keys and arbitrarily-sized values. Has almost no overhead for representing small values and can store extremely large arrays of values, up to 2^63 elements in size. More precisely, the total size of all values must fit within this limit.
  • Map<Key, Map<Key, Value>> - "wide table" row->column->value mapping (like Cassandra or HBase) for fixed-size keys and values, implemented as a single multistream balanced tree.
  • Map<Key, Map<Key, Vector<Value>>> - the same as above, but can store arbitrarily-sized arrays of values up to 2^63 elements.
  • Sequence<N> - searchable sequence over N-symbol alphabets, N = 1..256, supporting rank and select operations. When N = 1, we can create a bitmap index.

Balanced tree-based containers have several notable features.

  1. Index-based access. For almost all containers, it is possible to find an entry by its ordinal number, not just by key. Hence, it is possible to read only a certain range of rows in, say, a table (e.g., paging), or to split a table into several parts and assign those parts to a set of threads for parallel query execution (horizontal partitioning).

  2. Updates batching. For almost all containers, it is possible to batch arbitrarily many update operations into a single one, mitigating the overhead of individual operations. For example, an individual insertion into a b-tree is expensive (relative to hash tables), but insertion of a sorted list of entries is 10-100 times faster. In raw numbers, where the individual insertion rate is 100K entries/s, the batch insertion rate is 10M entries/s; see the sketch after this list.
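Memoria's actual batch API is not shown here; as a stand-in, the same principle can be sketched with std::map, where inserting pre-sorted entries with an end() hint is amortized O(1) per entry instead of O(log N):

#include <cstdio>
#include <map>
#include <string>

int main()
{
    std::map<int, std::string> index;

    // Individual insertions: each one pays the full O(log N) search cost.
    index.emplace(42, "x");
    index.emplace(7, "y");

    // Batch insertion of sorted entries: with the end() hint, every
    // insert of a new maximum key is amortized O(1). This is the kind
    // of effect behind the 10-100x batch speedup mentioned above.
    std::map<int, std::string> batch;
    for (int k = 100; k < 100000; ++k) {
        batch.emplace_hint(batch.end(), k, "value");
    }
    index.insert(batch.begin(), batch.end());

    std::printf("entries: %zu\n", index.size());
}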

Memoria is focused on low-level containers like Vector<T>. Though it is possible to implement high-level application-specific containers with rich semantics, such as row-wise or column-wise relational tables with flexible data-type statistics, those implementations are left to applications of Memoria and should be stacked on top of the low-level containers, because no single size fits all.

Persistent Data Structures

A data structure is persistent if update operations (a single one or a batch) produce a new version of the data structure. Old versions may be kept around or removed as soon as they are not in use. Read access to old versions does not interfere with write access creating new versions, so very little to no synchronization between threads is required. Persistent data structures are mostly wait-free, which makes them a good fit for concurrent environments.

Virtually every dynamic data structure can be made persistent, but not every one can be made persistent natively, in the sense that creating a new version does not result in copying the entire data structure. For example, there are efficient copy-on-write based algorithms for persistent trees without parent links (see below), but not for trees with parent or sibling links, linked lists, or graphs. Fortunately, there is a way to "stack" non-persistent data structures on top of a persistent tree; as a result, the former gains the persistence properties of the latter at the expense of O(log N) extra memory accesses. Memoria follows this way.

Persistent Trees

A persistent balanced tree is very much like an ordinary (non-persistent) tree, except that instead of updating the tree in place, we create a new tree that shares as much as possible with the "parent" tree. For example, updating a leaf results in copying the entire path from the root to the leaf into a new tree:

[Figure: Copy-on-Write tree]

Here, the balanced tree for Version 1 consists of the yellow path plus the rest of Version 0's tree excluding the yellow path. Version 2's tree consists of the blue path plus the rest of Version 1's tree excluding the blue path. For insertions and deletions of leaves, the idea is the same: we copy the modified path into the new tree and reference the rest of the old tree.
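Below is a minimal path-copying sketch for an unbalanced binary search tree (plain C++, not Memoria's actual node layout); real persistent balanced trees apply the same idea plus rebalancing:

#include <memory>

// Insertion allocates new nodes only along the root-to-leaf path and
// shares every untouched subtree with the previous version.
struct Node;
using Ptr = std::shared_ptr<const Node>;

struct Node {
    int key;
    Ptr left, right;
};

Ptr insert(const Ptr& root, int key)
{
    if (!root) {
        return std::make_shared<const Node>(Node{key, nullptr, nullptr});
    }
    if (key < root->key) {
        // Copy this node; share the untouched right subtree.
        return std::make_shared<const Node>(Node{root->key, insert(root->left, key), root->right});
    }
    if (key > root->key) {
        // Copy this node; share the untouched left subtree.
        return std::make_shared<const Node>(Node{root->key, root->left, insert(root->right, key)});
    }
    return root; // Key already present: reuse the old version as-is.
}

int main()
{
    Ptr v0 = insert(insert(insert(nullptr, 5), 2), 8); // Version 0
    Ptr v1 = insert(v0, 3); // Version 1 shares the untouched nodes of v0
    (void)v1;
}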

Atomic Commitment

Updates never touch old versions (copy-on-write). Many update operations can be combined into a single version, effectively forming an atomic snapshot.

[Figure: Copy-on-Write tree]

If a writer buffers a series of updates in thread-local memory and, on completion, publishes those updates to shared memory atomically, then we have atomic snapshot commitment. Readers will see either the finished, consistent new version of the data or nothing. So, if a version fits into the local memory of a writer thread and we never delete versions, we have a Grow-Only Set CRDT, which works well without explicit synchronization.
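A minimal sketch of such an atomic publication (standard C++20, using std::atomic<std::shared_ptr>; the Snapshot type here is a stand-in for a copy-on-write tree root):

#include <atomic>
#include <memory>

struct Snapshot {
    // In a real system this would be the root of a copy-on-write tree;
    // here it is an opaque payload standing in for one version.
    int version_id;
};

// The only shared state: the latest published snapshot.
std::atomic<std::shared_ptr<const Snapshot>> head{nullptr};

void writer_commit(int id)
{
    // Build the new version privately (copy-on-write), then publish it
    // with a single atomic store: readers see all of it or none of it.
    auto snp = std::make_shared<const Snapshot>(Snapshot{id});
    head.store(snp, std::memory_order_release);
}

std::shared_ptr<const Snapshot> reader_acquire()
{
    // Readers pin a consistent version; the writer never mutates it.
    return head.load(std::memory_order_acquire);
}

int main()
{
    writer_commit(1);
    return reader_acquire() ? 0 : 1;
}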

Reference Counting and Garbage Collection

Deletion of versions is special. To delete a version means deleting every persistent tree node that is not referenced by other versions, down to the leaves:

[Figure: Copy-on-Write tree + delete version]

Here we delete Version 0's root node, as well as the green and blue nodes, because they are shadowed by corresponding paths in other versions. To determine whether a node is still referenced by other versions, we have to maintain a reference counter for each node. On deletion of a version, we recursively decrement the nodes' reference counters and delete nodes physically once their counters reach zero.
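A minimal sketch of this reclamation pass (hypothetical node layout, with plain delete standing in for returning a block to the allocator):

#include <cstdint>

struct Node {
    std::uint64_t refs;
    Node* left;
    Node* right;
};

// Deleting a version starts at its root. A node is physically freed
// only when the last version referencing it goes away; the recursion
// then continues into the children it owned.
void release(Node* n)
{
    if (n == nullptr) return;
    if (--n->refs == 0) {
        release(n->left);
        release(n->right);
        delete n; // stands in for returning the block to the allocator
    }
}

int main()
{
    // Two version roots sharing one subtree: the shared node has refs == 2.
    Node* shared = new Node{2, nullptr, nullptr};
    Node* v0 = new Node{1, shared, nullptr};
    Node* v1 = new Node{1, shared, nullptr};

    release(v0); // frees v0's root; 'shared' survives with refs == 1
    release(v1); // frees v1's root and then 'shared'
}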

With deletions, persistent tree nodes still form a 2P-Set CRDT, so insertions and deletions do not require synchronization. Unfortunately, reference counting requires a linearizable memory model for the counters, so memory reclamation in persistent trees does not scale well. In practice this may or may not be a problem in a concurrent environment, depending on the load that memory reclamation puts on the shared memory subsystem.

Note that main-memory persistent and functional data structures usually rely on runtime-provided garbage collection to reclaim unused versions. Reference counting to track unused blocks in Memoria may be seen as a form of deterministic garbage collection.

Stacking on Top of Persistent Tree

Stacking non-persistent dynamic data structures on top of a persistent tree is straightforward. Memoria transforms high-level data containers into a key-value mapping of the form BlockID -> Data. This key-value mapping is then served through the persistent tree, where each version is a snapshot: an immutable point-in-time view of the set of low-level containers.
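A schematic of this stacking (hypothetical types, not Memoria's real interfaces), with std::map standing in for the persistent tree that serves the BlockID -> Data mapping:

#include <cstdint>
#include <map>
#include <memory>
#include <vector>

using BlockID = std::uint64_t;
using Block   = std::vector<std::uint8_t>; // a fixed-size page in practice

// Each version is an immutable BlockID -> Data mapping, served through
// the persistent tree (std::map is a stand-in here).
struct SnapshotView {
    std::shared_ptr<const std::map<BlockID, Block>> blocks;

    const Block* get(BlockID id) const
    {
        auto it = blocks->find(id);
        return it != blocks->end() ? &it->second : nullptr;
    }
};

// Any non-persistent container that reads and writes its state as
// blocks (b-tree nodes, bitmap chunks, ...) becomes persistent "for
// free" when it addresses them through a SnapshotView.
int main()
{
    auto m = std::make_shared<std::map<BlockID, Block>>();
    (*m)[1] = Block{0xDE, 0xAD};
    SnapshotView v{m};
    return v.get(1) ? 0 : 1;
}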

CoW Allocators

Snapshot History Graph

Versions in a persistent tree need not be ordered linearly. We can branch a new version from any committed version, effectively forming a tree- or graph-like structure:

[Figure: CoW History Tree]

Here, in Memoria, any path from the root snapshot to a leaf (Head) is called a branch. A data structure is fully persistent (or simply persistent) if branches can't be merged; otherwise it is confluently persistent. Memoria doesn't use confluently persistent trees under the hood. Instead, data blocks can be copied or imported from one snapshot to another, effectively providing the same function to applications.

If a writer may branch only from a single head, or there is only one linear branch in the history graph, the data structure is called partially persistent. This apparent limitation may be useful in certain scenarios (see below).

Note that at the level of containers, the merge operation is not defined in the general case. It might be obvious how to merge, say, relational tables, because they are unordered sets of rows in the context of OLTP transactions, and this type of merge can be fully automated. But it's not clear how to merge ordered data structures like arbitrary text documents.

Because of that, Memoria does not provide a complete merge operation at the level of snapshots. But it provides special facilities to define and check for write-write and read-write conflicts (conflict materialization), leaving the final decision and merge responsibility to applications, as the sketch below illustrates.
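A minimal sketch of conflict materialization between two transaction branches (hypothetical read/write sets over block IDs; Memoria's real facilities are richer):

#include <cstdint>
#include <set>

using BlockID = std::uint64_t;

// Materialized conflict sets for one transaction branch.
struct TxnSets {
    std::set<BlockID> reads;
    std::set<BlockID> writes;
};

static bool intersects(const std::set<BlockID>& a, const std::set<BlockID>& b)
{
    for (BlockID id : a) {
        if (b.count(id)) return true;
    }
    return false;
}

// Write-write or read-write overlap means the branches can't be merged
// automatically; the final decision is left to the application.
bool conflicts(const TxnSets& t1, const TxnSets& t2)
{
    return intersects(t1.writes, t2.writes)   // write-write
        || intersects(t1.reads,  t2.writes)   // read-write
        || intersects(t1.writes, t2.reads);
}

int main()
{
    TxnSets a{{1, 2}, {3}};  // reads {1,2}, writes {3}
    TxnSets b{{3},    {4}};  // reads {3},   writes {4}
    return conflicts(a, b) ? 0 : 1; // conflict: a writes block 3, b reads it
}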

Transactional Operations

Memoria is designed for compute-intensive workloads, and such workloads imply continuous data updates. These updates may be either external data input, as in streaming scenarios, or internal state changes defined by algorithms. Transactional ACID semantics is a very useful tool for mitigating the complexity of algorithms in concurrent and distributed environments.

Not all transactions are the same. In the theoretical plane, strict ACID semantics is considered not scalable, but this says little about the raw performance of a specific database system. Relaxed ACID semantics, like the Read Committed isolation level, might be relatively scalable in the practical plane. That is, practical performance may fit the requirements.

Memoria does not support transactions out of the box. It provides atomic commitment for copy-on-write based snapshots, on top of which transactions can be built, but this is not yet a transaction mechanism by itself.

Single Writer Multiple Readers

So-called Single Writer Multiple Readers, or SWMR for short, transactions can be built on top of Memoria's snapshots pretty easily. In the SWMR scheme there is only one writer at a time, but there can be multiple readers accessing already-committed data. To support SWMR transactions, we need a lock around the snapshot history branch's head to linearize all concurrent access to it:

[Figure: Transactions]

Despite being non-scalable in theory because of the lock, the SWMR transaction scheme may show pretty high practical performance in certain cases:

  1. When write transactions are short: point-like queries + point-like updates, as when moving money from one account to another.
  2. When writes do not depend on reads, as in streaming: a firehose writer ingests events into the store, while readers perform long-running analytical queries on "most recent" snapshots with point-in-time semantics.

There are two main reasons for SWMR high performance:

  1. The history lock is taken only for a short period of time, just to protect the snapshot history from concurrent modifications.
  2. Otherwise, no additional locks are involved, except possibly implicit locks in the IO and caching subsystems of the computer.

For long-running readers the cost of these locks is amortized, so they can access dynamic shared data with the efficiency of local immutable files.
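A minimal sketch of the SWMR scheme under the stated assumptions (hypothetical Snapshot type; std::atomic<std::shared_ptr> is standard C++20): a mutex linearizes writers around the branch head, while readers only pin the currently committed snapshot:

#include <atomic>
#include <memory>
#include <mutex>

struct Snapshot { int version; };

std::atomic<std::shared_ptr<const Snapshot>> head{
    std::make_shared<const Snapshot>(Snapshot{0})};
std::mutex history_lock; // linearizes writers around the branch head

// The single writer holds the lock only while producing the next version.
void write_txn()
{
    std::lock_guard<std::mutex> guard(history_lock);
    auto current = head.load(std::memory_order_acquire);
    // ... build the new snapshot copy-on-write from 'current' ...
    auto next = std::make_shared<const Snapshot>(Snapshot{current->version + 1});
    head.store(next, std::memory_order_release); // atomic commit
}

// Readers never take the history lock: they pin a committed snapshot
// and work on it as if it were an immutable local file.
int read_txn()
{
    auto snp = head.load(std::memory_order_acquire);
    return snp->version;
}

int main()
{
    write_txn();
    return read_txn() == 1 ? 0 : 1;
}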

Multiple Writer Multiple Readers

If read+write transactions may be long but don't interfere much with each other, we can execute them in separate branches. If, after completion, there are no read/write and write/write conflicts, we just merge them into a single snapshot (the new head). Conflicting transactions may simply be rolled back automatically:

[Figure: Transactions]

This is actually how MVCC-based relational DBMSs work under the hood. What we need for MWMR transactions is to somehow describe the conflicting sets and define corresponding merge operations for conflicting containers.

SWMR transactions are lightweight and performant and can be implemented on top of a linear snapshot history graph without much effort, but they must be short to keep transaction latencies low. The MWMR scheme can run multiple write transactions concurrently, but implies that snapshot merges are fast and efficient. Moreover, conflict materialization also consumes space and time, and neither is necessary for SWMR transactions.

Streaming + Batching

As has been said, the SWMR scheme is streaming-friendly, allowing a continuous stream of incoming updates and point-in-time view semantics for readers at the same time. The latter is implemented with a minimum of local and distributed locks and allows read-only data access that is almost as efficient as for immutable local files (and even more: see Asynchronous IO).

Once a snapshot is committed, it will never change again, so we can run iterative algorithms on the data without expecting it to change between iterations. At the same time, updates can be accumulated in upcoming snapshots; once readers are done with the current ones, they can switch to the most recent snapshot, picking up the latest changes. So updates can be ingested incrementally and processed as soon as they arrive and are ready.

Storage Layer

In-Memory Allocator

SWMR Allocator

MWMR Allocator

Asynchronous IO Subsystem

Types of Asynchronous IO

There is nothing special about asynchronous IO (AIO); it's just a set of APIs for performing input-output operations in a non-blocking way. Blocking IO APIs can't handle the high throughput of modern hardware efficiently, because in blocking IO each stream of IO events is represented by a CPU thread. Threads have overhead, both in memory (stack size) and CPU time (context switching and L1 cache eviction). By and large, in relative numbers, asynchronous IO can be 100 times faster than its blocking counterpart in terms of throughput and latency.

There are three main approaches to AIO:

  1. Continuation passing style or CPS.
  2. Stackful coroutines or fibers (green threads).
  3. Stackless coroutines or async/await style.

CPS is a promise/future library implemented on top of an event loop and epoll, kqueue or IOCP. CPS does not require any changes at the level of the programming language, but suffers from inversion of control: it's hard to move from one continuation to another in an interactive debugger, because from the system's perspective, a visually linear program for a human is no longer linear for the CPU.

CPS may perform pretty well in GC-based languages like Java or JavaScript where there is no stack allocation. In C++, CPS is not compatible with stack allocation because the hardware call stack is not preserved between continuations. Continuations may use dedicated stack-like data structures, but they are not as efficient as the hardware call stack. The main drawback of CPS is that it's a highly invasive programming style: everything directly or indirectly touching IO must be written in CPS style. One notable example of a CPS-based AIO framework is Seastar.
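A toy illustration of the inversion of control in CPS (plain C++; a real framework like Seastar would chain futures over an event loop instead):

#include <cstdio>
#include <functional>

// The "result" of an asynchronous read is delivered to a callback
// rather than returned, so control flow is inverted.
void async_read(int request, std::function<void(int)> on_done)
{
    // A real event loop would register the request with epoll/kqueue/IOCP;
    // here we complete immediately for illustration.
    on_done(request * 10);
}

int main()
{
    // Each step nests inside the previous continuation -- the "pyramid"
    // that makes visually linear programs non-linear for the debugger.
    async_read(1, [](int a) {
        async_read(2, [a](int b) {
            std::printf("total = %d\n", a + b);
        });
    });
}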

Stackful coroutines or fibers (green threads) are just threads managed in userland on top of the same event loop over AIO APIs as with CPS. No language-level integration (like the go operator in Go) is usually necessary, but such integration may provide a significant performance boost in certain cases. Fibers are almost non-invasive: existing thread-based application code may require next to no modifications to run in a fiber-based environment. One notable problem here is that popular rich runtime environments like the JVM do not support manual stack switching, so they can't currently host this type of coroutine. A related problem is that existing data platforms were written without fibers in mind (especially Java-based ones); integrating fiber-based IO may be relatively easy (vs CPS), but it is still a non-zero effort.
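For contrast, here is a minimal Boost.Fiber sketch (plain Boost.Fiber, nothing Memoria-specific): the code reads like ordinary blocking thread code, but suspension happens in userland:

#include <boost/fiber/all.hpp>
#include <cstdio>

int main()
{
    // A bounded channel; Boost.Fiber requires a power-of-two capacity.
    boost::fibers::buffered_channel<int> chan{8};

    // The producer looks like ordinary blocking code; push() suspends
    // the fiber (not an OS thread) when the channel is full.
    boost::fibers::fiber producer([&chan] {
        for (int i = 0; i < 4; ++i) {
            chan.push(i);
        }
        chan.close();
    });

    // The consumer is equally sequential; pop() suspends until data
    // arrives. In an AIO setting, socket reads would suspend the same way.
    boost::fibers::fiber consumer([&chan] {
        int v;
        while (chan.pop(v) == boost::fibers::channel_op_status::success) {
            std::printf("got %d\n", v);
        }
    });

    producer.join();
    consumer.join();
}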

The main limitation of fibers is that the OS can't manage a large number of hardware (memory-protected) stacks efficiently: stack creation/destruction is a pretty costly process. Software-only stack creation/destruction is cheap, but there is no hardware memory protection against stack overflow.

Note that stackful coroutines and fibers are just different APIs providing the same thing: we can implement coroutines on top of fibers or fibers on top of coroutines, depending on what the runtime provides.

Stackless coroutines are somewhere between CPS and fibers. Under the hood they are much like CPS, but now managed by the compiler through dedicated async/await keywords. Asynchronous code now looks and feels naturally sequential, as with fibers or threads, but this style is still invasive because async/await keywords spread throughout the entire source code base.

Stackless coroutines are probably the fastest way to run AIO programs because no hardware stacks are required: the compiler emulates the stack and transparently controls its boundaries. Moreover, a C++ compiler can even perform static optimizations on top of such coroutines, simplifying concurrent code.

AIO in Memoria

Memoria is focused on high-performance IO because this is necessary for most compute-intensive workloads. AIO is the way to go here because modern IO stacks like DPDK and SPDK require it. Fibers are a good choice for AIO for software compatibility reasons:

  1. Memoria is just storage. It's expected that many existing libraries will integrate with this storage into data platforms, so minimizing integration effort is highly desirable.
  2. There is a long-term trend toward adopting stackful coroutines in various software platforms. C++ and Julia support them natively, and the JVM is on its way to doing so.

For even better compatibility, Memoria supports both blocking IO and AIO, in two different modes:

  1. Compatibility mode uses threads for concurrency and memory-mapped files for IO. This mode provides pretty good disk IO and full compatibility with existing software, but may contribute a lot to tail latencies when used together with asynchronous networking.
  2. Performance mode uses Boost Fibers and wrappers around native AIO APIs: epoll + Linux AIO (for file IO), kqueue, and IOCP on Linux, macOS and Windows respectively. This setup is currently incompatible with the JVM, but compatible with C++ and Julia and, probably, with the V8 virtual machine.

Because of its high complexity and possible compatibility issues, performance mode is currently considered experimental.

Memoria and Unikernels

AIO support in mainstream operating systems is broken and incomplete. While it's possible to poll for events from the kernel, most important system calls are still blocking. Most filesystems either block on certain system calls or use thread pools behind the scenes. It's not that simple, for example, to build a web server with completely non-blocking IO, because of poor AIO support in filesystems.

Unikernels are single-application or "library" operating systems where the application runs in kernel mode; the unikernel is designed to run in a virtualized environment or on bare metal. Eliminating the kernel/user mode separation does improve raw IO throughput and latency, but usually not by enough to justify the additional complexity of running native applications in kernel mode.

Nevertheless, unikernels make sense not just for performance reasons. With full direct access to IO and hardware memory-protection facilities, we can design an entire memory architecture that fits the needs of specific algorithms and data structures, for example, fast hardware memory protection for immutable snapshots. This may finally provide new protection and stability guarantees not easily achievable with present OS API designs.

Apache Arrow Integration

Apache Arrow is very similar in spirit to Memoria but has a different focus. Arrow is focused on interoperability and zero-copy sharing of immutable columnar data between different environments. Memoria is focused on the storage and processing of dynamic advanced data structures, not just columnar data.

Both projects have the same foundation:

  1. Memory buffer management.
  2. Primitive types over memory buffers.
  3. Complex types over buffers.

Memoria uses C++ template metaprogramming internally to build complex types from simple ones. Arrow is focused on interoperability, and environments other than C++ may not have access to a C++ compiler. For complex nested types, Arrow utilizes the FlatBuffers interface compiler and hand-crafted "interoperable" memory layouts that can be accessed the same way from different programming languages.

Unlike Arrow, Memoria is not focused only on tabular/nested/columnar data, but also on more advanced cases. For example, a typical table representation in Arrow is a list of buffers with embedded columnar data for some range of rows. This is sufficient for scan-like queries, but not for point-like ones.

Memoria has a very similar (in spirit) column-wise data layout for its list of leaf nodes, but also builds a search tree on top of it for point-like queries:

[Figure: Arrow and Memoria]

Though Arrow is starting to be used not just for interoperability but also as a primary data representation, Memoria and Arrow complement each other and do not compete. Memoria's buffers follow the same (SIMD-friendly) rules for memory alignment and can be wrapped into Arrow's interfaces. Memoria even has its own Arrow-like IOBuffer subsystem, used, for example, to forward structured data from the storage layer (containers, C++) to the processing layer (C++, Java).

Mariana Java Bindings

Mariana is a Java bindings generator specially designed for Memoria's type system. A programmer can wrap certain containers into specially annotated Java interfaces, and Mariana generates the required JNI artifacts for these containers.

Below is a typical code snippet:

public class Snippet
{
    static
    {
        Mariana.init(); // Initialize native library
    }

    public static void main( String[] args ) throws Exception
    {
        // Create a new in-memory allocator to hold all data
        try (InMemoryAllocator alloc = InMemoryAllocator.create()) 
        {
            // Create a new snapshot. The data model in Mariana/Memoria is confluently
            // persistent, much like in Git. Create, remove, merge, export, and import
            // snapshots with data change sets.
            try(Snapshot snp = alloc.master().branch()) 
            {
                // Create a simple map String -> String
                try(MapSS map = snp.create(MapSS.class))
                {
                    map.assign("Boo", "Zoo").close();

                    // Iterate over data
                    try(MapSSIterator iter = map.begin()) {
                        while (!iter.isEnd()) {
                            System.out.println("Key: " + iter.key() + " -- Value: " + iter.value());
                            iter.next();
                        }
                    }

                    // Finish the snapshot, so no further updates are possible
                    snp.commit();

                    // Set this snapshot as master.
                    snp.setAsMaster();
                }
            }

            // Store the entire allocator's data to disk.
            alloc.store("target/myalloc.dump");
        }
    }
}

Heterogeneous Computing

Use Cases and Applications

Machine Learning

Logical Inference

Probabilistic Programming

Artificial Intelligence

Long-Term Structured Data Storage and Communication

Roadmap
