accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/5] git commit: ACCUMULO-1783 Use the sequence id to ensure AIF and AOF don't clobber one another.
Date Thu, 07 Nov 2013 05:24:04 GMT
ACCUMULO-1783 Use the sequence id to ensure AIF and AOF don't clobber
one another.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/d72e1cb5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/d72e1cb5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/d72e1cb5

Branch: refs/heads/ACCUMULO-1783
Commit: d72e1cb56b1c16bd09bff28d2d72163063781d63
Parents: dd21269
Author: Josh Elser <elserj@apache.org>
Authored: Tue Nov 5 17:08:08 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Nov 5 17:08:08 2013 -0500

----------------------------------------------------------------------
 pom.xml                                         | 17 +++++++--
 .../accumulo/pig/AbstractAccumuloStorage.java   | 36 +++++++++++---------
 .../apache/accumulo/pig/AccumuloStorage.java    |  7 ++--
 .../accumulo/pig/AccumuloWholeRowStorage.java   |  5 +--
 .../pig/AbstractAccumuloStorageTest.java        | 23 ++++++++-----
 .../pig/AccumuloWholeRowStorageTest.java        | 14 ++++++--
 6 files changed, 69 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 963dab6..0c82c39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,14 @@
   				<target>1.6</target>
   			</configuration>
   		</plugin>
+  		<plugin>
+			<artifactId>maven-surefire-plugin</artifactId>
+			<version>2.16</version>
+			<configuration>
+				<argLine>-Xmx4g</argLine>
+                <redirectTestOutputToFile>true</redirectTestOutputToFile>
+			</configuration>
+		</plugin>
   	</plugins>
   </build>
   
@@ -50,9 +58,14 @@
       <version>1.2.1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>1.2.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
-      <version>1.4.4</version>
+      <version>1.4.5-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>joda-time</groupId>
@@ -73,7 +86,7 @@
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-minicluster</artifactId>
-      <version>1.4.4</version>
+      <version>1.4.5-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 37efe84..da4a51b 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -204,25 +204,29 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-      AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      if (columnFamilyColumnQualifierPairs.size() > 0) {
-        LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-        AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
-      }
-      
-      Collection<Range> ranges = Collections.singleton(new Range(start, end));
-      
-      LOG.info("Scanning Accumulo for " + ranges);
-      
-      AccumuloInputFormat.setRanges(conf, ranges);
-      
-      configureInputFormat(conf);
+    int sequence = AccumuloInputFormat.nextSequence();
+    
+    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
+      throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
+    
+    AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+    if (columnFamilyColumnQualifierPairs.size() > 0) {
+      LOG.info("columns: " + columnFamilyColumnQualifierPairs);
+      AccumuloInputFormat.fetchColumns(conf, sequence, columnFamilyColumnQualifierPairs);
+    }
+    
+    Collection<Range> ranges = Collections.singleton(new Range(start, end));
+    
+    LOG.info("Scanning Accumulo for " + ranges);
+    
+    AccumuloInputFormat.setRanges(conf, sequence, ranges);
+    
+    configureInputFormat(conf, sequence);
   }
   
-  protected void configureInputFormat(Configuration conf) {
+  protected void configureInputFormat(Configuration conf, int sequence) {
     
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index dcfd888..bd43dce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -141,8 +141,9 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
     return map;
   }
   
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  @Override
+  protected void configureInputFormat(Configuration conf, int sequence) {
+    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override
@@ -229,7 +230,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
    */
   protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) {
     if (null == columnDef && null == columnName) {
-      // TODO Emit a counter here somehow?
+      // TODO Emit a counter here somehow? org.apache.pig.tools.pigstats.PigStatusReporter
       log.warn("Was provided no name or definition for column. Ignoring value");
       return;
     }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index af3ee01..499558f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -83,8 +83,9 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
     return tuple;
   }
   
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  @Override
+  protected void configureInputFormat(Configuration conf, int sequence) {
+    AccumuloInputFormat.addIterator(conf, sequence, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 1b5b81a..5f4ecf2 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -33,25 +33,32 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.data.Tuple;
+import org.junit.Before;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
   
-  public Job getExpectedLoadJob(String inst, String zookeepers, String user, String password, String table, String start, String end,
+  @Before
+  public void setup() {
+    AccumuloInputFormat.resetCounters();
+    AccumuloOutputFormat.resetCounters();
+  }
+  
+  public Job getExpectedLoadJob(int sequence, String inst, String zookeepers, String user, String password, String table, String start, String end,
       Authorizations authorizations, List<Pair<Text,Text>> columnFamilyColumnQualifierPairs) throws IOException {
     Collection<Range> ranges = new LinkedList<Range>();
     ranges.add(new Range(start, end));
     
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
-    AccumuloInputFormat.setRanges(expectedConf, ranges);
+    AccumuloInputFormat.setInputInfo(expectedConf, sequence, user, password.getBytes(), table, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expectedConf, sequence, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expectedConf, sequence, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expectedConf, sequence, ranges);
     return expected;
   }
   
-  public Job getDefaultExpectedLoadJob() throws IOException {
+  public Job getDefaultExpectedLoadJob(int sequence) throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -66,7 +73,7 @@ public class AbstractAccumuloStorageTest {
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col2"), new Text("cq2")));
     columnFamilyColumnQualifierPairs.add(new Pair<Text,Text>(new Text("col3"), null));
     
-    Job expected = getExpectedLoadJob(inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
+    Job expected = getExpectedLoadJob(sequence, inst, zookeepers, user, password, table, start, end, authorizations, columnFamilyColumnQualifierPairs);
     return expected;
   }
   
@@ -129,7 +136,7 @@ public class AbstractAccumuloStorageTest {
     s.setLocation(getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedLoadJob();
+    Job expected = getDefaultExpectedLoadJob(1);
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/d72e1cb5/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
index 690d86c..3a0ab85 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloWholeRowStorageTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -42,9 +43,16 @@ import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.junit.Before;
 import org.junit.Test;
 
 public class AccumuloWholeRowStorageTest {
+
+  @Before
+  public void setup() {
+    AccumuloInputFormat.resetCounters();
+    AccumuloOutputFormat.resetCounters();
+  }
   
   @Test
   public void testConfiguration() throws IOException {
@@ -56,9 +64,11 @@ public class AccumuloWholeRowStorageTest {
     s.setLocation(test.getDefaultLoadLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = test.getDefaultExpectedLoadJob();
+    final int sequence = 1;
+    
+    Job expected = test.getDefaultExpectedLoadJob(sequence);
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.addIterator(expectedConf, new IteratorSetting(50, WholeRowIterator.class));
+    AccumuloInputFormat.addIterator(expectedConf, sequence, new IteratorSetting(50, WholeRowIterator.class));
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);
   }


Mime
View raw message