So I've made it work, but I don't "get it" yet.

I have no idea why my DIY server works when I set the environment variables on the machine that kicks off Pig (the "master"), while in EMR it doesn't.  I recompiled ConfigHelper and CassandraStorage with tons of debugging, and in EMR I can see the Hadoop Configuration object get the proper values on the master node, but those values do NOT propagate to the task threads.

The other part that was driving me nuts is something that could be made more user friendly.  The issue: I started by trying to set cassandra.thrift.address, cassandra.thrift.port, and cassandra.partitioner.class in mapred-site.xml, and it didn't work.  After even more painful debugging, I noticed that the only time Cassandra sets the input/output versions of those settings (and those input/output-specific versions are the only ones actually used!) is when it maps the system environment variables.  So, having cassandra.thrift.address in mapred-site.xml does NOTHING; I needed cassandra.output.thrift.address to be set.  It would be much nicer if each get{Input/Output}XYZ method fell back to getXYZ when the input/output-specific setting is empty/null.  E.g., in getOutputThriftAddress(), if that setting is null, it would have been nice if the method returned getThriftAddress().  My problem went away when I put the full cross product in the XML, e.g. cassandra.input.thrift.address and cassandra.output.thrift.address (and port, and partitioner).
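For reference, the full cross product that made it work looked roughly like this in mapred-site.xml (host/port/partitioner values are placeholders for my cluster's actual settings):

```xml
<!-- mapred-site.xml: both the input.* and output.* variants are required;
     the generic cassandra.thrift.* keys alone are ignored. Values are placeholders. -->
<property>
  <name>cassandra.input.thrift.address</name>
  <value>10.0.0.1</value>
</property>
<property>
  <name>cassandra.output.thrift.address</name>
  <value>10.0.0.1</value>
</property>
<property>
  <name>cassandra.input.thrift.port</name>
  <value>9160</value>
</property>
<property>
  <name>cassandra.output.thrift.port</name>
  <value>9160</value>
</property>
<property>
  <name>cassandra.input.partitioner.class</name>
  <value>org.apache.cassandra.dht.RandomPartitioner</value>
</property>
<property>
  <name>cassandra.output.partitioner.class</name>
  <value>org.apache.cassandra.dht.RandomPartitioner</value>
</property>
```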

I still want to know why the old easy way (of setting the 3 system variables on the pig starter box, and having the config flow into the task trackers) doesn't work!
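The fallback behavior I'm wishing for above could be sketched like this (a plain Map stands in for Hadoop's Configuration, and the helper name is illustrative, not the actual ConfigHelper API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested fallback: prefer the input/output-specific key,
// and fall back to the generic key if the specific one is unset.
// A HashMap stands in for org.apache.hadoop.conf.Configuration here.
public class ConfigFallbackSketch {
    static String getWithFallback(Map<String, String> conf,
                                  String specificKey, String genericKey) {
        String value = conf.get(specificKey);
        return (value != null && !value.isEmpty()) ? value : conf.get(genericKey);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("cassandra.thrift.address", "10.0.0.1"); // only the generic key is set

        // A getOutputThriftAddress() built this way would fall back to getThriftAddress():
        String out = getWithFallback(conf,
                "cassandra.output.thrift.address", "cassandra.thrift.address");
        System.out.println(out); // prints 10.0.0.1
    }
}
```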


On Fri, Jan 4, 2013 at 9:04 AM, William Oberman <> wrote:
On all tasktrackers, I see: PIG_OUTPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set
        at org.apache.cassandra.hadoop.pig.CassandraStorage.setStoreLocation(
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.setLocation(
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setUpContext(
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.getCommitters(
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.<init>(
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat.getOutputCommitter(
        at org.apache.hadoop.mapred.Task.initialize(
        at org.apache.hadoop.mapred.Child$
        at Method)
        at org.apache.hadoop.mapred.Child.main(

On Thu, Jan 3, 2013 at 10:45 PM, aaron morton <> wrote:
Instead, I get an error from CassandraStorage that the initial address isn't set (on the slave, the master is ok). 
Can you post the full error ?

Aaron Morton
Freelance Cassandra Developer
New Zealand


On 4/01/2013, at 11:15 AM, William Oberman <> wrote:

Anyone ever try to read or write directly between EMR <-> Cassandra?  

I'm running various Cassandra resources in EC2, so the "physical connection" part is pretty easy using security groups.  But I'm having some configuration issues.  I have managed to get Cassandra + Hadoop working in the past using a DIY Hadoop cluster, and looking at the configurations in the two environments (EMR vs. DIY), I'm not sure what's different that is causing my failures...  I should probably note I'm using the Pig integration of Cassandra.

Versions: Hadoop 1.0.3, Pig 0.10, Cassandra 1.1.7.

I'm 99% sure I have the classpaths working (because I didn't at first, and now EMR can find and instantiate CassandraStorage on the master and slaves).  What isn't working is the environment variables.  In my DIY cluster, all I needed to do was:
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
And the task trackers somehow magically picked up the values (I never questioned how/why).  But in EMR, they do not.  Instead, I get an error from CassandraStorage that the initial address isn't set (on the slaves; the master is ok).
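One workaround I haven't verified on EMR, but which might sidestep the environment-variable question entirely: set the properties from the Pig script itself, since Pig's SET statement passes unrecognized keys through to the Hadoop job configuration. A sketch (values are placeholders):

```pig
-- Hypothetical workaround: push the settings into the job conf from the
-- Pig script, rather than relying on PIG_* environment variables.
SET cassandra.input.thrift.address '10.0.0.1';
SET cassandra.output.thrift.address '10.0.0.1';
SET cassandra.input.thrift.port '9160';
SET cassandra.output.thrift.port '9160';
SET cassandra.input.partitioner.class 'org.apache.cassandra.dht.RandomPartitioner';
SET cassandra.output.partitioner.class 'org.apache.cassandra.dht.RandomPartitioner';
```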

My DIY cluster used CDH3, which was Hadoop 0.20.something.  So maybe the problem is a different version of Hadoop?

Looking at the CassandraStorage class, I realize I have no idea how it ever worked, since it only seems to look at system environment variables.  Those values are then set on the Job's Configuration object.  I don't know how that part of Hadoop works, though... do values set on the Job on the master get propagated to the task threads?  I do know that on my DIY cluster, I do NOT set those environment variables on the slaves...
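My (hedged) understanding of the propagation question: values written into the job Configuration before submit are serialized into job.xml and shipped to every task, while environment variables on the submitting box are not shipped at all. A toy round-trip, with java.util.Properties standing in for the job conf:

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Properties;

// Toy model of job-conf propagation (Properties stands in for job.xml):
// anything written into the conf before submit survives serialization and
// is visible to the task; the submitter's environment variables do not ship.
public class ConfPropagationSketch {
    public static void main(String[] args) throws Exception {
        // "Master" side: copy an env-derived value into the conf at submit time.
        Properties submitConf = new Properties();
        submitConf.setProperty("cassandra.output.thrift.address", "10.0.0.1");

        // Hadoop serializes the conf (job.xml) and ships it to the task trackers.
        StringWriter wire = new StringWriter();
        submitConf.store(wire, "job.xml stand-in");

        // "Task" side: deserialize and read. Only conf values made the trip;
        // System.getenv("PIG_OUTPUT_INITIAL_ADDRESS") in the task JVM would be
        // null unless the tasktracker environment itself sets it.
        Properties taskConf = new Properties();
        taskConf.load(new StringReader(wire.toString()));
        System.out.println(taskConf.getProperty("cassandra.output.thrift.address"));
        // prints 10.0.0.1
    }
}
```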