Dynamic predictive high-performance analytics

Features


HAMR™ is a distributed software system for analyzing large amounts of data in memory. HAMR differs from similar systems (e.g. Hadoop MapReduce and Spark) in that it is fundamentally a streaming engine. HAMR seeks to reduce data as early as possible and to push data out of the system as quickly as possible, thereby minimizing the memory footprint of the data. This allows the user to process larger data sets on smaller systems. It also has a significant performance impact: reducing the memory footprint early and often means there is less data to process, freeing processor resources for more computation.

A HAMR workflow takes data from an external data source, processes it into key/value pairs, manipulates it as needed, and writes the results out to an external data sink. It is a data-parallel system for big data, both structured and unstructured. It does not require data to be moved into HDFS, although it can read input from and write output to HDFS. It can work with both batch and streaming data, and with diverse data formats, in the same workflow.

A HAMR workflow defines a directed acyclic graph whose nodes are Flowlets™. The edges of the graph are the connections that bind the Flowlets and carry key/value pairs between them.
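
To make the graph structure concrete, here is a minimal sketch of a workflow modeled as a directed graph of named flowlets, with a check that the graph is acyclic. The class WorkflowGraphSketch and its methods are hypothetical and are not taken from the HAMR API; the flowlet names are placeholders.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a workflow graph; none of these names come from the HAMR API.
class WorkflowGraphSketch {
    // Flowlet name -> names of the flowlets it sends key/value pairs to.
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    void addFlowlet(String name) {
        edges.putIfAbsent(name, new ArrayList<>());
    }

    // Adds a directed edge from one flowlet to another.
    void bind(String from, String to) {
        addFlowlet(from);
        addFlowlet(to);
        edges.get(from).add(to);
    }

    // Kahn's algorithm: returns a topological order, or throws if the graph has a cycle.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String node : edges.keySet()) {
            inDegree.put(node, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }

        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.remove();
            order.add(node);
            for (String t : edges.get(node)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("workflow graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        WorkflowGraphSketch g = new WorkflowGraphSketch();
        g.bind("reader", "parser");
        g.bind("parser", "counter");
        g.bind("counter", "writer");
        System.out.println(g.topologicalOrder()); // [reader, parser, counter, writer]
    }
}
```

A topological order exists exactly when the graph has no cycle, which is why the same routine serves as the acyclicity check.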

Figure 1. Directed Acyclic Graph

HAMR executes on a cluster of interconnected compute nodes. Each node has a symmetric view of data sources such as file systems, databases, and message queues. When a HAMR application executes, it accesses these data sources to read and write data. The physical interconnection between nodes is irrelevant to this view of the system: all nodes are assumed to be fully interconnected. Each compute node contains a number of processor cores. HAMR is not currently aware of memory hierarchies and views processor cores as a homogeneous set of resources, regardless of the physical chip that contains the cores. The following figure shows an example cluster consisting of four fully interconnected nodes with symmetric access to a single data source for reading and writing data. Each node has four processor cores.

Figure 2. Example Cluster

Application developers write a HAMR workflow according to the HAMR Application Programming Interface (API). Execution of a workflow is carried out on two types of nodes: master and slave. The master node is where the HAMR main routine, written by the application developer, is executed. The user submits the application to the master node through a client. On the client, the main routine sets up the workflow and then calls execute on the workflow object. Once execute is called, a workflow is copied to each of the slave nodes where it is executed to completion, at which point control is transferred back to the master node.

Workflows operate on key/value pairs. This is the data format HAMR requires, and it is what allows HAMR to achieve data parallelism.

Each actor in a workflow is called a flowlet. Flowlets represent a single stage of parallel processing and operate on key/value pairs. Flowlets are connected to each other by binding their ports and slots together to create directed edges over which key/value pairs can be sent and received. A connection between a port and a slot is called a binding. There are two models under which key/value pairs can be transmitted through a workflow. Push ports and slots are the normal transmission mode, in which key/value pairs are pushed through the system. In the push model, flowlets receive key/value pairs, operate on them, and may push them further down the workflow. Pull ports and slots are used to retrieve a single key/value pair and provide random access (with restrictions) to data storage. In the pull model, flowlets request a value for a provided key.
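
The difference between the two models can be sketched with two small interfaces. Everything here (PushSlot, PullSlot, PushPort, and the run method) is a hypothetical illustration and not the HAMR API: a push slot is handed a key/value pair by the sender, while a pull slot is asked for the value of a given key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interfaces illustrating the two transmission models; not the HAMR API.
class PortsAndSlotsSketch {
    // Push model: the upstream side hands a key/value pair to the downstream slot.
    interface PushSlot<K, V> {
        void accept(K key, V value);
    }

    // Pull model: the caller supplies a key and gets back the value stored for it.
    interface PullSlot<K, V> {
        V pull(K key);
    }

    // A port is the sending end of a binding; bind() attaches it to one slot.
    static final class PushPort<K, V> {
        private PushSlot<K, V> bound;

        void bind(PushSlot<K, V> slot) {
            this.bound = slot;
        }

        void push(K key, V value) {
            bound.accept(key, value);
        }
    }

    // Looks up each word's id through a pull slot and pushes (id, word) downstream.
    static List<String> run(List<String> words, Map<String, Integer> idTable) {
        List<String> sink = new ArrayList<>();
        PullSlot<String, Integer> ids = idTable::get;
        PushPort<Integer, String> out = new PushPort<>();
        out.bind((id, word) -> sink.add(id + "=" + word));
        for (String w : words) {
            Integer id = ids.pull(w);
            if (id != null) {
                out.push(id, w); // a flowlet may also push nothing for an input
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        table.put("alpha", 1);
        table.put("beta", 2);
        System.out.println(run(Arrays.asList("alpha", "gamma", "beta"), table)); // [1=alpha, 2=beta]
    }
}
```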

Figure 3. Ports and Slots

Flowlets are subdivided into a number of partitions. Each partition represents a serial processor that executes the defined behavior of the corresponding flowlet, usually on a subset of the pushed key/value pairs. Partitions are assigned to and execute on a slave. A flowlet has a partitioner that defines the mapping of key/value pairs to partitions destined for the flowlet. When key/value pairs are pushed into a flowlet, the HAMR network layer uses the destination flowlet’s partitioner to determine which partition the key/value pair maps to. It looks up the slave that owns that partition, and sends the key/value pair to that slave.
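
The routing step described above can be sketched in a few lines. This is an illustration under stated assumptions, not HAMR's implementation: the class and method names are hypothetical, the partitioner is assumed to be a plain hash partitioner, and partitions are assumed to be assigned to slaves in equal contiguous blocks.

```java
// Hypothetical sketch; the names and the block ownership rule are assumptions, not the HAMR API.
class PartitionRoutingSketch {
    // Maps a key to a partition index in [0, partitionCount) using its hash code.
    // floorMod is used because hashCode() may be negative.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Assumed ownership rule: slave s owns partitions [s * perSlave, (s + 1) * perSlave).
    static int slaveFor(int partition, int perSlave) {
        return partition / perSlave;
    }

    public static void main(String[] args) {
        int partitions = 8;
        int perSlave = 2; // four slaves with two partitions each
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            int p = partitionFor(key, partitions);
            System.out.println(key + " -> partition " + p + " on slave " + slaveFor(p, perSlave));
        }
    }
}
```

Because the mapping depends only on the key, every pair with the same key reaches the same partition, whichever slave pushed it.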

Figure 4. Partitions

Every flowlet has a partitioner assigned to it that defines the mapping of incoming keys to partition numbers, usually through a hash function. The flowlet also has a partition count that defines the parallelism of the flowlet. However, it is the flow type of the binding between push ports that defines how the partitioner is used. With the global flow type, the hash of the key/value pair (provided by the destination flowlet’s partitioner) is taken modulo the total number of destination partitions. This means that a binding with a global flow type has access to all destination partitions.

The local flow type takes the hash of the key/value pair modulo the number of partitions on the local slave, so the pair stays on the slave that pushed it.

The broadcast flow type ignores the partitioner and pushes the key/value pair to all destination partitions.

The synchronous flow type completely bypasses the partitioning mechanism, pushing the key/value pair to the co-indexed partition of the destination flowlet, within the stack frame of the flowlet that pushed the key/value pair. Co-indexed partitions are mapped to the same slave. The synchronous flow type is a performance optimization that allows modular flowlets to be bound together without the performance penalty of network communication between them. It should be used whenever re-partitioning is not necessary. A good example is the binding between a flowlet that reads lines of text from a file and a flowlet that parses the lines of text into appropriate key/value pairs. There is no need to partition key/value pairs between these two flowlets, so communication between them should be synchronous for performance reasons. The following figure illustrates the four different flow types.

Figure 5. Flow-types

Pull ports and slots are always bound synchronously.
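
The four flow types differ only in how a destination partition is chosen, which the following sketch tries to capture. The enum and method are hypothetical names, not the HAMR API. The sketch assumes that every slave owns the same number of partitions in contiguous blocks, and that the source and destination flowlets have the same partition count for the synchronous case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of destination selection; names and partition layout are assumptions.
class FlowTypeSketch {
    enum FlowType { GLOBAL, LOCAL, BROADCAST, SYNCHRONOUS }

    /**
     * Returns the destination partition indices for one key/value pair.
     * Assumes slave s owns partitions [s * perSlave, (s + 1) * perSlave).
     */
    static List<Integer> destinations(FlowType type, int keyHash, int sourcePartition,
                                      int slaveCount, int perSlave) {
        int total = slaveCount * perSlave;
        List<Integer> result = new ArrayList<>();
        switch (type) {
            case GLOBAL: {
                // Hash modulo all destination partitions: any partition is reachable.
                result.add(Math.floorMod(keyHash, total));
                break;
            }
            case LOCAL: {
                // Hash modulo the partitions on the sending slave: the pair stays local.
                int localSlave = sourcePartition / perSlave;
                result.add(localSlave * perSlave + Math.floorMod(keyHash, perSlave));
                break;
            }
            case BROADCAST: {
                // Partitioner ignored: every destination partition receives the pair.
                for (int p = 0; p < total; p++) {
                    result.add(p);
                }
                break;
            }
            case SYNCHRONOUS: {
                // Partitioner bypassed: same index as the pushing partition.
                result.add(sourcePartition);
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int hash = "apple".hashCode();
        // Source partition 5 in a cluster of 4 slaves with 2 partitions each.
        for (FlowType t : FlowType.values()) {
            System.out.println(t + " -> " + destinations(t, hash, 5, 4, 2));
        }
    }
}
```

With a key hash of 11, source partition 5, four slaves, and two partitions per slave, the global type selects partition 3 (11 mod 8), the local type selects partition 5 (slave 2's block starts at 4, plus 11 mod 2), broadcast selects all eight partitions, and synchronous selects partition 5.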

HAMR tracks dependencies between flowlets and uses this information to transition flowlets and their partitions through a simple state machine. A partition is complete when all upstream flowlets are complete and the partition has processed all of its key/value pairs. A flowlet is complete when all of its partitions are complete. Flowlets and partitions have several callbacks that HAMR executes to notify the flowlet or partition of a state event. For example, when all upstream flowlets are complete, all of the partitions of the connected flowlet are notified. Flowlets at the beginning of the graph are called initial flowlets and immediately transition to the upstream complete state. Flowlets at the end of the graph are called terminal flowlets.
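
The completion rules can be expressed as a small state model. The classes and method names below are hypothetical and only mirror the rules stated in the text; HAMR's actual callbacks are not shown in this document.

```java
// Hypothetical sketch of completion tracking; names are illustrative, not HAMR's.
class CompletionSketch {
    static final class Partition {
        private int pending;              // key/value pairs received but not yet processed
        private boolean upstreamComplete; // set once every upstream flowlet has completed

        void receive() {
            pending++;
        }

        void processOne() {
            if (pending > 0) {
                pending--;
            }
        }

        void onUpstreamComplete() {
            upstreamComplete = true;
        }

        // Complete only when no more input can arrive and nothing is left to process.
        boolean isComplete() {
            return upstreamComplete && pending == 0;
        }
    }

    static final class Flowlet {
        final Partition[] partitions;

        Flowlet(int partitionCount, boolean initial) {
            partitions = new Partition[partitionCount];
            for (int i = 0; i < partitionCount; i++) {
                partitions[i] = new Partition();
                // Initial flowlets have no upstream, so they start upstream-complete.
                if (initial) {
                    partitions[i].onUpstreamComplete();
                }
            }
        }

        // Callback-style notification delivered to every partition of the flowlet.
        void notifyUpstreamComplete() {
            for (Partition p : partitions) {
                p.onUpstreamComplete();
            }
        }

        // A flowlet is complete when all of its partitions are complete.
        boolean isComplete() {
            for (Partition p : partitions) {
                if (!p.isComplete()) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Flowlet source = new Flowlet(2, true);
        Flowlet sink = new Flowlet(2, false);
        sink.partitions[0].receive();
        System.out.println(source.isComplete()); // true
        sink.notifyUpstreamComplete();
        System.out.println(sink.isComplete());   // false: one pair still pending
        sink.partitions[0].processOne();
        System.out.println(sink.isComplete());   // true
    }
}
```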

In practice developers rarely work at the flowlet level of abstraction.

There are several pre-defined types of flowlets that provide useful functionality. At the top of the flowlet hierarchy are resource, transform, and key/value store flowlets. A resource flowlet reads data from an external data source or writes data to an external data sink. When reading, it is responsible for converting the data from an external format, such as a string, into key/value pairs; when writing, it is responsible for converting key/value pairs into an external format. A transform flowlet receives a key/value pair, applies a user-defined function (UDF) to the pair, and produces zero or more key/value pairs. A key/value store flowlet is a type of distributed data structure for storing key/value pairs. Flowlets come in several other forms, and can be completely user defined, but these three make up the top of the flowlet hierarchy.
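
As an illustration of the transform contract (one pair in, zero or more pairs out), here is a minimal sketch. The transform helper and the splitWords function are hypothetical; the real HAMR transform flowlet API is not shown in this document.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch; HAMR's real transform flowlet API is not shown in the text.
class TransformSketch {
    // Applies a UDF to each input pair and collects the zero or more pairs it emits.
    static <K, V, K2, V2> List<Map.Entry<K2, V2>> transform(
            List<Map.Entry<K, V>> input,
            BiFunction<K, V, List<Map.Entry<K2, V2>>> udf) {
        List<Map.Entry<K2, V2>> output = new ArrayList<>();
        for (Map.Entry<K, V> pair : input) {
            output.addAll(udf.apply(pair.getKey(), pair.getValue()));
        }
        return output;
    }

    // Example UDF: splits a line of text into (word, 1) pairs; a blank line emits nothing.
    static List<Map.Entry<String, Integer>> splitWords(Long lineNumber, String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> lines = new ArrayList<>();
        lines.add(new SimpleEntry<>(0L, "to be or not"));
        lines.add(new SimpleEntry<>(1L, "   "));
        List<Map.Entry<String, Integer>> out = transform(lines, TransformSketch::splitWords);
        System.out.println(out); // [to=1, be=1, or=1, not=1]
    }
}
```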

A component flowlet represents a higher level of abstraction, such as a complete algorithm or a specific type of data storage. Component flowlets are composed of one or more resource, transform, or key/value store (KVS) flowlets.

When key/value pairs are pushed, they are serialized into a sequence of bytes using a HAMR serializer. HAMR supports the serialization of all primitive types, arrays, Strings, and a few other classes by default, and serialization can be extended with user-defined types. The serialized key/value pairs are written to a buffer owned by the thread that is currently executing the partition. This buffer is called an aggregator, and it stores many key/value pairs in order to mitigate the impact of network overhead. When an aggregator becomes full, it flushes its data to the network. An aggregator is full when the buffer size is reached, or when a specified amount of time has passed since the first key/value pair was placed into the aggregator’s buffer. In this way HAMR networking supports both batch and streaming models and allows throughput to be traded off against latency. When an aggregator flushes its buffer, it first sorts all of the key/value pairs in place according to their destination, so that all key/value pairs destined for the same slave are sequential in the aggregator’s buffer. Each sequence of key/value pairs is then copied into a bin and sent over the network to the appropriate destination.
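
The aggregator behavior described above (fill until a size or time limit is hit, sort by destination, split into bins) can be sketched as follows. This is an illustrative model under assumptions, not HAMR's code: the class, its fields, and the explicit clock parameter are hypothetical, and records are kept in a list rather than sorted in place inside a byte buffer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an aggregator; sizes, names, and the clock parameter are assumptions.
class AggregatorSketch {
    static final class Record {
        final int destinationSlave;
        final byte[] payload; // stands in for a serialized key/value pair

        Record(int destinationSlave, byte[] payload) {
            this.destinationSlave = destinationSlave;
            this.payload = payload;
        }
    }

    private final int capacityBytes;
    private final long maxAgeMillis;
    private final List<Record> buffer = new ArrayList<>();
    private int bufferedBytes;
    private long firstArrivalMillis;

    AggregatorSketch(int capacityBytes, long maxAgeMillis) {
        this.capacityBytes = capacityBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Buffers one record; the age timer starts with the first record after a flush.
    void add(Record r, long nowMillis) {
        if (buffer.isEmpty()) {
            firstArrivalMillis = nowMillis;
        }
        buffer.add(r);
        bufferedBytes += r.payload.length;
    }

    // Full when the byte budget is reached or the oldest buffered pair has waited too long.
    boolean isFull(long nowMillis) {
        if (buffer.isEmpty()) {
            return false;
        }
        return bufferedBytes >= capacityBytes
                || nowMillis - firstArrivalMillis >= maxAgeMillis;
    }

    // Sorts buffered records by destination, then copies each run into one bin per slave.
    Map<Integer, List<Record>> flush() {
        buffer.sort(Comparator.comparingInt(r -> r.destinationSlave));
        Map<Integer, List<Record>> bins = new LinkedHashMap<>();
        for (Record r : buffer) {
            bins.computeIfAbsent(r.destinationSlave, k -> new ArrayList<>()).add(r);
        }
        buffer.clear();
        bufferedBytes = 0;
        return bins;
    }

    public static void main(String[] args) {
        AggregatorSketch agg = new AggregatorSketch(8, 50);
        agg.add(new Record(2, new byte[3]), 0);
        agg.add(new Record(0, new byte[3]), 10);
        System.out.println(agg.isFull(20));       // false: 6 bytes, 20 ms old
        agg.add(new Record(2, new byte[3]), 30);
        System.out.println(agg.isFull(30));       // true: 9 bytes >= 8
        System.out.println(agg.flush().keySet()); // [0, 2]
    }
}
```

A small capacity or a short maximum age favors latency (streaming), while larger values favor throughput (batch), which is the tradeoff the paragraph describes.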