Accumulo Ingest using Apache Pig

In this post, I’ll go over how to use Apache Pig to ingest data into Accumulo.  These concepts are also applicable to other NoSQL data stores.

Pig is a great tool for creating ad-hoc queries over data stored in Hadoop, and a simple entry point for people in an organization who aren’t ready to write their own MapReduce jobs or otherwise delve more deeply into the world of Hadoop.  A while ago, I was trying to import a bunch of CSV data into Accumulo, and I thought it would be a great time to learn Pig!  I quickly realized, though, that Accumulo’s extra structure, like that of other NoSQL data stores, was not as easily wrangled in Pig as flat files such as my CSVs.

The main sticking point between Accumulo (and other flexibly structured key-value data stores) and more rigidly structured data is that a row in a file such as a CSV likely has many values, whereas a key in Accumulo maps to exactly one.  To get around this, each of the values needs to be placed in its own ‘row’, so to speak.  In Accumulo’s case, this means an entry with a row identifier, a column family, a column qualifier, a column visibility, and finally the value.

Now, this might sound easy, but once you try to do it in Pig you can quickly find yourself a bit stuck.  I assure you, though, there’s a way to do it that is rather painless!

The first thing to do in your Pig script is to register all the jars you’ll need.  The following will seem like a lot, but I’ll explain them all in just a bit.

register /path/to/accumulo-core-1.4.2.jar
register /path/to/cloudtrace-1.4.2.jar
register /path/to/libthrift-0.6.1.jar
register /path/to/zookeeper-3.3.1.jar
register /path/to/accumulo-pig-1.4.2.jar
register /usr/lib/pig/contrib/piggybank/java/piggybank.jar

The first one, accumulo-core, should be self-explanatory.  The next several, cloudtrace, libthrift, and zookeeper, are requirements for talking to Accumulo.  Next is the Accumulo module for Pig, which adds storage implementations for Accumulo in Pig.  Last is the piggybank, which I’ve imported here for reading CSV data.  If you’re loading data some other way, you might not need it.

Now that all the required jars are loaded, we can load our CSV!

DATA = LOAD '$file' USING org.apache.pig.piggybank.storage.CSVLoader() 
    AS (symbol:chararray, lasttime, lastprice, lastchange);

In my example, I’m using some stock data as the input.  Each row contains a stock symbol, followed by the last time, price, and change in price observed in the data.
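For illustration, the input rows might look something like this (the symbols are real tickers, but the values are made up):

```
GOOG,16:00,543.30,-2.40
AAPL,16:00,449.76,1.13
MSFT,16:00,28.01,-0.02
```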

Now is where all the hard work is done.  Again, what we want is a row identifier combined with a value.  To distinguish the values, we’ll also need some additional data, such as the column qualifier.  To do this, we’ll generate new intermediate results for each column from the CSV, containing our row identifier (the stock symbol), the column qualifier (i.e. the column’s name), and the actual value.

TimeData = FOREACH DATA GENERATE symbol, 'lasttime' AS cq, lasttime AS val;
PriceData = FOREACH DATA GENERATE symbol, 'lastprice' AS cq, lastprice AS val;
ChangeData = FOREACH DATA GENERATE symbol, 'lastchange' AS cq, lastchange AS val;

After pivoting the data out of its pure row format, we join it all back together.  Each ‘row’ of the result, called ColData, is now a symbol, a name indicating which column the data came from, and the value for that row-column intersection.

ColData = UNION ONSCHEMA TimeData, PriceData, ChangeData;
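Conceptually, ColData now holds one tuple per row-column intersection, shaped like this (these values are made up):

```
(AAPL, lasttime, 16:00)
(AAPL, lastprice, 449.76)
(AAPL, lastchange, 1.13)
```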

Just before storing the data in Accumulo, there’s one last step to take. If we just imported one point of data for each stock symbol, we would be more-or-less done, however we want to be able to import data again and again! If we just imported price data for a few stocks, it wouldn’t be big data. So, we’ll add a timestamp to each row identifier to make it unique. We also need to add a few pieces of information. Pig won’t be able to place the data into Accumulo without a column family, or without a column visibility, so we’ll add a value for each of those that will be the same for every value we’re inserting.

AccumuloData = FOREACH ColData GENERATE CONCAT(symbol, '-$datetime') 
    AS row_id, 'pricepoint' AS cf, cq, 'everyone' AS cv, val;

The labels for the values are not important, but the ordering is! The Accumulo module for Pig will read the first value in the result as the row identifier, the second as the column family, etc.
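As a sketch of the end result, a scan of the table would show entries shaped roughly like this for each imported symbol (both the timestamp suffix and the values here are invented):

```
AAPL-20130104160000 pricepoint:lasttime [everyone]      16:00
AAPL-20130104160000 pricepoint:lastprice [everyone]     449.76
AAPL-20130104160000 pricepoint:lastchange [everyone]    1.13
```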

Finally, we’re ready to store the data! This is a quick step, but the syntax is a bit difficult at first. Basically, the URL says: “store this data in Accumulo, in the table I specified, in the instance I gave you, with the supplied credentials, and use the specified ZooKeeper address to find all the instance-specific information.”

STORE AccumuloData into 
    'accumulo://$table?instance=$instance&user=$user&password=$password&zookeepers=$zookeeper'
    using org.apache.accumulo.pig.AccumuloStorage;

That might seem like a lot, but it could be worse! I passed all of those pieces of information into my script using parameters, but you can hard-code them in if you so desire. For an example without any of those passed in as parameters:

STORE AccumuloData into 
    'accumulo://stocks?instance=my_instance&user=my_user&password=my_pass&zookeepers=zoo.local'
     using org.apache.accumulo.pig.AccumuloStorage;

And that’s it! If you’d like a full copy of the script, see below. I hope this helps!

register /path/to/accumulo-core-1.4.2.jar
register /path/to/cloudtrace-1.4.2.jar
register /path/to/libthrift-0.6.1.jar
register /path/to/zookeeper-3.3.1.jar
register /path/to/accumulo-pig-1.4.2.jar
register /usr/lib/pig/contrib/piggybank/java/piggybank.jar
 
DATA = LOAD '$file' USING org.apache.pig.piggybank.storage.CSVLoader() 
    AS (symbol:chararray, lasttime, lastprice, lastchange);
 
TimeData = FOREACH DATA GENERATE symbol, 'lasttime' AS cq, lasttime AS val;
PriceData = FOREACH DATA GENERATE symbol, 'lastprice' AS cq, lastprice AS val;
ChangeData = FOREACH DATA GENERATE symbol, 'lastchange' AS cq, lastchange AS val;
 
ColData = UNION ONSCHEMA TimeData, PriceData, ChangeData;
 
AccumuloData = FOREACH ColData GENERATE CONCAT(symbol, '-$datetime') 
    AS row_id, 'pricepoint' AS cf, cq, 'everyone' AS cv, val;
 
STORE AccumuloData into 
    'accumulo://$table?instance=$instance&user=$user&password=$password&zookeepers=$zookeeper'
    using org.apache.accumulo.pig.AccumuloStorage;

C++ Templates with Virtual Functions?

C++ offers two forms of polymorphism: runtime polymorphism through virtual functions and their overrides, and compile-time polymorphism through templates.  There are cases where it makes the most sense to have both at once.  You can’t do this, though.  C++ expressly forbids virtual template member functions, because the compiler cannot know every instantiation of a template when it lays out a class’s virtual table.  Luckily, C++ offers a way around this: a design technique called Policy-Based Design.  In this article, I’ll cover why you might need it, and how it works.

I was recently presented with a case where I needed a class that performed common operations on a variety of different types, so I implemented it as a template.

template<typename Type>
class Loader {
public:
    Loader(Type* const value, const std::string refName);
    std::string getRefName() const;
    Type* const getType() const;
private:
    Type* value;
    std::string name;
};

This is a normal thing in C++, no big deal. I then realized though that there were a number of operations that needed to be specific for different implementations. Now, normally I might handle this by specializing the template like below:

template<>
std::string Loader<IrrationalNumber>::getRefName() const {
    return "Irrational Number " + name;
}

But I needed more than just specialization.  What I really needed was an interface that I could extend, and then implement the extension. The only way to do this is to have some sort of virtual function in the template that could be inherited or overridden.  But C++ forbids this, and for good reason.

There is hope though.  In 2001, a gentleman by the name of Andrei Alexandrescu wrote about a new paradigm called Policy-Based Design.  The essentials are that you write your base / interface template with an extra template parameter for the implementer of the interface.  Then, for any method you want to act as part of the implemented interface, you write a call on the implementer to a method of that signature.

template<typename Type, typename Implementer>
class Loader {
public:
    Loader(Type* const value, const std::string refName);
    std::string getRefName() const;
    Type* const getType() const;
    Type* const load(int itemNum) {
        return impl.load(itemNum);
    }
private:
    Implementer impl;
    Type* value;
    std::string name;
};

How in the world does this work? The body of a template member function is not fully checked until the template is instantiated.  When the compiler sees impl.load(itemNum), it defers the lookup; only when Loader is instantiated with a concrete Implementer does it look on that type for a load method whose signature is close enough that, at least with a conversion, it can complete the call.  If it finds one, it substitutes that call in; if not, it throws an error.  In the example above, that means any Implementer must provide a load method taking an integer and returning a pointer to the first template parameter’s type.

This paradigm gives a lot of extra flexibility in class design in C++.  For example, you can have multiple extensions of the interface provided by the base class.  Another example is selecting the specific implementation to use at compile time.  You don’t need this paradigm to do that, but it does make it easier.

/* LoaderConfig.h */
 
typedef ImaginaryNumberFileSystemImpl ImaginaryNumberLoaderImpl;
typedef IrrationalNumberFileSystemImpl IrrationalNumberLoaderImpl;
 
/* ImaginaryNumberLoader.h */
 
typedef Loader<ImaginaryNumber, ImaginaryNumberLoaderImpl> ImaginaryNumberLoader;

With this, I can have configurations for different builds (say, for different systems) just by substituting different versions of the LoaderConfig.h header file. I could easily have another version that uses a sockets implementation for my loaders.
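As a sketch of that alternative, the swapped-in header might read as follows (the socket implementation class names here are hypothetical):

```cpp
/* LoaderConfig.h -- sockets build; these Impl class names are invented */

typedef ImaginaryNumberSocketImpl ImaginaryNumberLoaderImpl;
typedef IrrationalNumberSocketImpl IrrationalNumberLoaderImpl;
```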

As I’ve said before, this paradigm offers extra flexibility, and a lot of extra power.  You might not realize everything it allows you to do at first, but over time, with this in your toolbox, you can start to do some pretty amazing things in C++ that developers in other languages can only dream about.