Accumulo Ingest Using Apache Pig

In this post, I’ll go into how to utilize Apache Pig to ingest data into Accumulo. These concepts are also applicable to other NoSQL data stores.

Pig is a great tool for creating ad-hoc queries over data stored in Hadoop, and a simple entry point for people in an organization who aren’t ready to write their own MapReduce jobs or otherwise delve deeper into the Hadoop ecosystem. A while ago, I was trying to import a bunch of CSV data into Accumulo, and I thought this would be a great time to learn Pig! I quickly realized, though, that Accumulo’s extra structure, like that of other NoSQL data stores, was not as easily wrangled in Pig as flat files such as my CSVs.

The main sticking point is the mismatch between rigidly structured data and flexibly structured, key-value data stores like Accumulo: a row in a file such as a CSV typically holds many values, whereas a key-value entry in Accumulo holds only one. To get around this, each of the values needs to be placed in its own ‘row’, so to speak. In Accumulo’s case, that means an entry with a row identifier, a column family, a column qualifier, a column visibility, and finally the value.

Now, this might sound easy, but once you try to do it in Pig you can quickly find yourself stuck. I assure you, though, there’s a way to do it that is rather painless!

The first thing to do in your Pig script is to register all the jars you’ll need. The following will seem like a lot, but I’ll explain them all in just a bit.

register /path/to/accumulo-core-1.4.2.jar
register /path/to/cloudtrace-1.4.2.jar
register /path/to/libthrift-0.6.1.jar
register /path/to/zookeeper-3.3.1.jar
register /path/to/accumulo-pig-1.4.2.jar
register /usr/lib/pig/contrib/piggybank/java/piggybank.jar

The first one, accumulo-core, should be self-explanatory. The next several, cloudtrace, libthrift, and zookeeper, are dependencies for talking to Accumulo. Next is the Accumulo module for Pig, which adds storage implementations for Accumulo. Last is the piggybank, which I’ve included here for loading CSV data; if you’re loading data some other way, you might not need it.

Now that all the required jars are loaded, we can load our CSV!

DATA = LOAD '$file' USING org.apache.pig.piggybank.storage.CSVLoader() 
    AS (symbol:chararray, lasttime, lastprice, lastchange);

In my example, I’m using some stock data as the input. Each row contains a stock symbol, followed by the last time, price, and change in price observed in the data.
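For a concrete picture, a couple of input lines might look like this (the symbols and numbers here are made up purely for illustration):

```
AAPL,16:00,450.00,-1.25
GOOG,16:00,720.50,+3.10
```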

Now comes the hard work. Again, what we want is a row identifier paired with a single value. To distinguish the values, we also need some additional data, namely the column qualifier. To do this, we generate a new intermediate relation for each column of the CSV, containing our row identifier (the stock symbol), the column qualifier (i.e. the column’s name), and the actual value.

TimeData = FOREACH DATA GENERATE symbol, 'lasttime' AS cq, lasttime AS val;
PriceData = FOREACH DATA GENERATE symbol, 'lastprice' AS cq, lastprice AS val;
ChangeData = FOREACH DATA GENERATE symbol, 'lastchange' AS cq, lastchange AS val;

After pivoting the data out of its pure row format, we union it all back together. Each ‘row’ of the result, called ColData, is now a symbol, a name indicating which column the data came from, and the value for that row-column intersection.

ColData = UNION ONSCHEMA TimeData, PriceData, ChangeData;
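To make the pivot concrete, after the UNION the ColData relation for a single hypothetical input row (say, `AAPL,16:00,450.00,-1.25`) would contain tuples along these lines:

```
(AAPL, lasttime,   16:00)
(AAPL, lastprice,  450.00)
(AAPL, lastchange, -1.25)
```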

Just before storing the data in Accumulo, there’s one last step. If we were only ever importing one data point per stock symbol, we would be more or less done, but we want to be able to import data again and again! If we only imported price data for a few stocks once, it wouldn’t be big data. So, we append a timestamp to each row identifier to make it unique. We also need to supply a few more pieces of information: Pig can’t place the data into Accumulo without a column family or a column visibility, so we add a value for each of those that will be the same for every entry we’re inserting.

AccumuloData = FOREACH ColData GENERATE CONCAT(symbol, '-$datetime') 
    AS row_id, 'pricepoint' AS cf, cq, 'everyone' AS cv, val;

The labels on the values are not important, but the ordering is! The Accumulo module for Pig reads the first value in each tuple as the row identifier, the second as the column family, and so on.
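For illustration, once the data is stored, scanning the table from the Accumulo shell should show entries shaped roughly like this (the row and value here are hypothetical, following the shell’s `row family:qualifier [visibility] value` display format):

```
AAPL-20130101 pricepoint:lastprice [everyone]    450.00
```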

Finally, we’re ready to store the data! This is a quick step, but the syntax is a bit difficult at first. Basically, the URL says “store this data in Accumulo, in the table I specified, in the instance I gave you, with the supplied credentials, and use the specified ZooKeeper address to find all the instance-specific information.”

STORE AccumuloData INTO
    'accumulo://$table?instance=$instance&user=$user&password=$password&zookeepers=$zookeeper'
    USING org.apache.accumulo.pig.AccumuloStorage;

That might seem like a lot, but it could be worse! I passed all of those pieces of information into my script as parameters, but you can hard-code them if you so desire. Here’s an example without any of them passed in as parameters:

STORE AccumuloData INTO
    'accumulo://stocks?instance=my_instance&user=my_user&password=my_pass&zookeepers=zoo.local'
    USING org.apache.accumulo.pig.AccumuloStorage;
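If you do go the parameter route, values are supplied on the Pig command line with -param. A sketch of an invocation might look like this (the script name and all of the values below are just placeholders):

```
pig -param file=/data/stocks.csv \
    -param datetime=20130101 \
    -param table=stocks \
    -param instance=my_instance \
    -param user=my_user \
    -param password=my_pass \
    -param zookeeper=zoo.local \
    ingest.pig
```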

And that’s it! If you’d like a full copy of the script, see below. I hope this helps!

register /path/to/accumulo-core-1.4.2.jar
register /path/to/cloudtrace-1.4.2.jar
register /path/to/libthrift-0.6.1.jar
register /path/to/zookeeper-3.3.1.jar
register /path/to/accumulo-pig-1.4.2.jar
register /usr/lib/pig/contrib/piggybank/java/piggybank.jar
 
DATA = LOAD '$file' USING org.apache.pig.piggybank.storage.CSVLoader() 
    AS (symbol:chararray, lasttime, lastprice, lastchange);
 
TimeData = FOREACH DATA GENERATE symbol, 'lasttime' AS cq, lasttime AS val;
PriceData = FOREACH DATA GENERATE symbol, 'lastprice' AS cq, lastprice AS val;
ChangeData = FOREACH DATA GENERATE symbol, 'lastchange' AS cq, lastchange AS val;
 
ColData = UNION ONSCHEMA TimeData, PriceData, ChangeData;
 
AccumuloData = FOREACH ColData GENERATE CONCAT(symbol, '-$datetime') 
    AS row_id, 'pricepoint' AS cf, cq, 'everyone' AS cv, val;
 
STORE AccumuloData INTO
    'accumulo://$table?instance=$instance&user=$user&password=$password&zookeepers=$zookeeper'
    USING org.apache.accumulo.pig.AccumuloStorage;