Frequently Asked Questions
We've been asked these questions frequently; we promise.

Do I need Hadoop to run HAMR?

No, you can run HAMR without Hadoop if you have a shared filesystem (such as NFS) in your cluster. Use HORN (HAMR's Other Resource Negotiator) to run HAMR jobs. HORN isn't recommended for multi-user environments, since it doesn't keep track of which resources are in use.

How do I read binary data?

To read binary data, you will need to implement a record reader that knows how to convert the binary data into the desired values.

The DelimitedFileRecordReader reads bytes from a file and, as a convenience, provides an implementation of the getRecordValue method for returning UTF-8 Strings from a byte buffer. That implementation is getUtf8RecordValue, and it is used by the provided Whitespace record readers.

All that is required is to extend the DelimitedFileRecordReader class and provide an implementation of getRecordValue() that converts the binary data contained in the ByteBuffer into the desired value. If you are going to return string values, then extending WhitespaceFileRecordReader and overriding getRecordValue() with your implementation should be all that is needed.

There are a few other methods that will need to be implemented if you extend DelimitedFileRecordReader, since getUtf8RecordValue() uses a char buffer for the value. Take a look at AbstractWhitespaceFileRecordReader and WhitespaceFileRecordReader for examples.
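
As a rough sketch, a reader that decodes each record as a little-endian double might look like the following. The getRecordValue() signature and the DoubleRecordReader name are assumptions for illustration, not taken from the HAMR API (assume java.nio.ByteBuffer and java.nio.ByteOrder are imported); the Whitespace readers show the real signatures:

static class DoubleRecordReader extends DelimitedFileRecordReader {
    // ASSUMPTION: getRecordValue() receives the record's bytes as a
    // ByteBuffer and returns the decoded value; verify against
    // WhitespaceFileRecordReader before relying on this signature.
    @Override
    protected Double getRecordValue(ByteBuffer record) {
        // Decode each binary record as one little-endian double.
        return record.order(ByteOrder.LITTLE_ENDIAN).getDouble();
    }
}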

Does HAMR support iterative processing?

Iterative processing is achieved by submitting multiple jobs; however, only a single job may run at any instant in time. You may reuse a given key/value store container so that any key/value pair held in the container can be referenced and/or changed from one iteration to the next.

Check out the Multijob and LogisticRegression examples to see how to reuse a container between iterations.
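
For orientation only, the pattern has roughly this shape. Everything named here (maxIterations, sharedContainer, the configureIteration helper, and the run() call) is a hypothetical placeholder, not the literal HAMR API; the Multijob example shows the real calls:

// Hypothetical sketch only -- see the Multijob example for the actual API.
for (int i = 0; i < maxIterations; i++) {
    Workflow workflow = new Workflow(); // hypothetical construction
    // Reuse the same container so this iteration can reference and update
    // the key/value pairs written by the previous iteration.
    configureIteration(workflow, sharedContainer, i); // hypothetical helper
    workflow.run(); // only one job runs at a time, so iterations are sequential
}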

How many partitions is a file split into?

Each of the configured flowlets within a given job, including ResourceReaders, Transforms, KeyValueStores, and ResourceWriters, is internally represented by a set of partitions. Each partition operates on its portion of the data independently of the other partitions.

The number of partitions a file is split into is determined by the partition count of the Flowlet that is performing the reading or writing. By default the partition count is equal to (the number of cores * the number of nodes). Each flowlet within a job can have its partition count set differently to achieve a varied level of parallelism within the job. ResourceReaders divide the file into frames, and frames are assigned to partitions, so that the file may be read in parallel. ResourceWriters write an independent file for each partition, with the extension .partXXXX appended to the file name.
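
As an illustration of varying per-flowlet parallelism, the setter below is an assumed name for illustration, not necessarily the documented HAMR call:

// Hypothetical setter name; check the Flowlet API for the actual call.
reader.setPartitionCount(8); // read the file with 8 parallel partitions
writer.setPartitionCount(4); // writes out.txt.part0000 through out.txt.part0003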

Can all of a Flowlet's slots be pull slots?

No, they cannot all be pull, because then there would be no way to trigger the apply() method: apply() is called when a key/value pair is pushed to the in() slot.

Can I use intermediate variables in my transforms?

In general this isn't a good idea. Although reliability hasn't been implemented yet, the HAMR API has been carefully designed for it. Intermediate variables are considered 'state'. What happens if a node fails? HAMR should be able to migrate the failed partitions to another node, so HAMR must know about all of the state in order to pick up at a consistent point in the job. HAMR won't know about your intermediate variable, and therefore may not be able to start where it left off.

If you are willing to forgo reliability, you could add a member variable to your own transform. For example:

static class MyTransform extends SymmetricTransform {
    // Must be atomic: every partition on this node shares the one instance.
    final AtomicInteger intermediateVariable;

    public MyTransform(Workflow root, int intermediateParam) {
        super(root);
        this.intermediateVariable = new AtomicInteger(intermediateParam);
    }

    @Override
    public void apply(String key, Integer value, Flow context) throws IOException {
        if (value > 0)
            intermediateVariable.incrementAndGet();
        context.push(key, value); // forward the pair downstream
    }
}

Notice that the member must be atomic, because it is accessed by all partitions on the node.

What is the pull port for?

Each Transform comes with a pull port for convenience; access to it is through the pull() method. Most workflows don't use it, but it's always there. You can bind this port to pull slots, giving you random access to data. Some ResourceReaders have pull slots that let you grab a single record at a particular byte offset, or a single row from a database. KeyValueStores have pull slots that let you randomly access values given a key.

Let's say you've already run some statistical analysis on a lot of text and have determined the relative frequency of terms within it. This data is stored in a ValueStore where the key, a String, represents the term, and the value, a Double, represents the previously calculated relative frequency. Now you're processing terms that are streaming into the system, and you need the relative frequency of each. The Transform receives some integer ID as the key, and a term. You can do something like this:

final Transform getFrequency = new Transform(workflow) {
    @Override
    public void apply(Integer id, String term, Flow context) throws IOException {
        // Pull the precomputed relative frequency for this term from the
        // store bound to this Transform's pull port.
        double frequency = context.pull(term);
        context.push(term, frequency);
    }
};

// Bind the Transform's pull port to the store of precomputed frequencies.
getFrequency.bindPull(frequencies);

Are there any restrictions on pulls?

Pulls must always be local! That is, a Flowlet must only pull a key that is assigned to the same partition. Usually this is handled for you by HAMR, as long as the Flowlet you are pulling from and the Flowlet you are pulling to are partitioned in the same way. For example, you cannot generally do the following unless you specifically override the Flowlet's partitioner to ensure pulls are always local.

final SymmetricTransform wontWork = new SymmetricTransform(job) {
    @Override
    public void apply(Long a, Long b, Flow context) throws IOException {
        // 'a + 1' almost certainly lives on a different partition.
        Long nextA = context.pull(a + 1); // Probably will fail
        context.push(a, nextA == null ? Long.valueOf(b) : nextA + b);
    }
};

The pull using the key 'a + 1' will likely not be local. Which partition is the key 'a + 1' assigned to? It is probably not assigned to the same partition as the key 'a'. By default, each Flowlet gets a HashPartitioner. When keys are sent, the partitioner of the destination flowlet is used. A HashPartitioner takes the hash code of the key (in this case the value of the long) and mods it by the total number of partitions, yielding a number between 0 and the number of partitions in the destination flowlet minus one. In this case the key 'a + 1' will most likely be assigned to the next partition, not the current one, so the pull will fail. In general you can work around this by sending the key 'a + 1' to another Flowlet, which can then pull the key because it will be local.
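
A sketch of that workaround, in the same style as the example above (the binding between the two transforms is omitted, and the push/pull calls follow the signatures already shown):

final SymmetricTransform reKey = new SymmetricTransform(job) {
    @Override
    public void apply(Long a, Long b, Flow context) throws IOException {
        // Re-key the pair so the destination's HashPartitioner routes it
        // to whichever partition owns 'a + 1'.
        context.push(a + 1, b);
    }
};

final SymmetricTransform localPull = new SymmetricTransform(job) {
    @Override
    public void apply(Long aPlusOne, Long b, Flow context) throws IOException {
        // This pull is local: the pair arrived here precisely because this
        // partition owns the key 'aPlusOne'.
        Long nextA = context.pull(aPlusOne);
        context.push(aPlusOne, nextA == null ? Long.valueOf(b) : nextA + b);
    }
};
// Bind reKey's output to localPull's in() slot; the binding call is
// omitted here -- see the HAMR examples for the actual API.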

Where should the temporary directory be located?

The temporary directory defined in hamr-site.xml must be on a shared file system (such as NFS) that is visible to all the nodes in the cluster. For example:

<property name='cluster.tmpdir'>/share/hamr-tmp</property>