accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/2] git commit: ACCUMULO-1783 Update code to work with upstream 1.5 changes.
Date Sun, 24 Nov 2013 05:53:30 GMT
Updated Branches:
  refs/heads/ACCUMULO-1783-1.5 [created] cb720e850


ACCUMULO-1783 Update code to work with upstream 1.5 changes.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/0ce6fb34
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/0ce6fb34
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/0ce6fb34

Branch: refs/heads/ACCUMULO-1783-1.5
Commit: 0ce6fb34a852050643aae5ad5feb5c21c4b6b94e
Parents: 170229a
Author: Josh Elser <elserj@apache.org>
Authored: Fri Nov 22 18:00:15 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Nov 22 18:00:15 2013 -0500

----------------------------------------------------------------------
 pom.xml                                         | 11 ++-
 .../accumulo/pig/AbstractAccumuloStorage.java   | 79 ++++++++++++--------
 .../apache/accumulo/pig/AccumuloStorage.java    |  7 +-
 .../accumulo/pig/AccumuloWholeRowStorage.java   |  6 +-
 .../pig/AbstractAccumuloStorageTest.java        | 48 ++++++++----
 5 files changed, 98 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0c82c39..587bf1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.accumulo</groupId>
   <artifactId>accumulo-pig</artifactId>
-  <version>1.4.4-SNAPSHOT</version>
+  <version>1.5.1-SNAPSHOT</version>
   
   <build>
   	<plugins>
@@ -65,7 +65,7 @@
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
-      <version>1.4.5-SNAPSHOT</version>
+      <version>1.5.1-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>joda-time</groupId>
@@ -78,6 +78,11 @@
       <version>15.0</version>
     </dependency>
     <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.16</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.4.5</version>
@@ -86,7 +91,7 @@
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-minicluster</artifactId>
-      <version>1.4.5-SNAPSHOT</version>
+      <version>1.5.1-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index a829d4a..890abf3 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -26,10 +26,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.Properties;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -77,7 +81,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   private static final String COLON = ":", COMMA = ",";
   private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName();
 
-  private Configuration conf;
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
 
@@ -104,6 +107,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
 
   public AbstractAccumuloStorage() {}
 
+  protected Map<String,String> getInputFormatEntries(Configuration conf) {
+    return getEntries(conf, INPUT_PREFIX);
+  }
+
   @Override
   public Tuple getNext() throws IOException {
     try {
@@ -202,10 +209,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     return writer;
   }
 
-  protected Map<String,String> getInputFormatEntries(Configuration conf) {
-    return getEntries(conf, INPUT_PREFIX);
-  }
-
   protected Map<String,String> getEntries(Configuration conf, String prefix) {
     Map<String,String> entries = new HashMap<String,String>();
 
@@ -221,44 +224,50 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
 
   @Override
   public void setLocation(String location, Job job) throws IOException {
-    conf = job.getConfiguration();
     setLocationFromUri(location);
 
-    Map<String,String> entries = getInputFormatEntries(conf);
-
+    Map<String,String> entries = getInputFormatEntries(job.getConfiguration());
     for (String key : entries.keySet()) {
-      conf.unset(key);
+      job.getConfiguration().unset(key);
     }
 
-    AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers);
+    try {
+      AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password));
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
     if (columnFamilyColumnQualifierPairs.size() > 0) {
       LOG.info("columns: " + columnFamilyColumnQualifierPairs);
-      AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs);
+      AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs);
     }
 
     Collection<Range> ranges = Collections.singleton(new Range(start, end));
 
     LOG.info("Scanning Accumulo for " + ranges + " for table " + table);
 
-    AccumuloInputFormat.setRanges(conf, ranges);
+    AccumuloInputFormat.setRanges(job, ranges);
 
-    configureInputFormat(conf);
+    configureInputFormat(job);
   }
 
   /**
-   * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo.
+   * Method to allow specific implementations to add more elements to the Job for reading data from Accumulo.
    * 
-   * @param conf
+   * @param job
    */
-  protected void configureInputFormat(Configuration conf) {}
+  protected void configureInputFormat(Job job) {}
 
   /**
-   * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo.
+   * Method to allow specific implementations to add more elements to the Job for writing data to Accumulo.
    * 
-   * @param conf
+   * @param job
    */
-  protected void configureOutputFormat(Configuration conf) {}
+  protected void configureOutputFormat(Job job) {}
 
   @Override
   public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
@@ -288,20 +297,30 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   }
 
   public void setStoreLocation(String location, Job job) throws IOException {
-    conf = job.getConfiguration();
     setLocationFromUri(location);
-    
-    // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart
-    if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
-      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
+
+    // If Pig ever uses an approach like they handle inputs (load), this will fall apart.
+    // Currently, it appears that multiple stores will get new m/r jobs
+    if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) {
+      try {
+        AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password));
+      } catch (AccumuloSecurityException e) {
+        throw new IOException(e);
+      }
+
+      // AccumuloOutputFormat.setCreateTables(job, true);
+      // AccumuloOutputFormat.setDefaultTableName(job, table);
+      AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+
+      BatchWriterConfig bwConfig = new BatchWriterConfig();
+      bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+      bwConfig.setMaxMemory(maxMutationBufferSize);
+      bwConfig.setMaxWriteThreads(maxWriteThreads);
+      AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
 
       LOG.info("Writing data to " + table);
 
-      configureOutputFormat(conf);
+      configureOutputFormat(job);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 8e9cfef..742480c 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -18,15 +18,14 @@ import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
@@ -142,8 +141,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Job job) {
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
index a6db638..784904f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloWholeRowStorage.java
@@ -33,8 +33,8 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultDataBag;
@@ -84,8 +84,8 @@ public class AccumuloWholeRowStorage extends AbstractAccumuloStorage {
   }
   
   @Override
-  protected void configureInputFormat(Configuration conf) {
-    AccumuloInputFormat.addIterator(conf, new IteratorSetting(50, WholeRowIterator.class));
+  protected void configureInputFormat(Job job) {
+    AccumuloInputFormat.addIterator(job, new IteratorSetting(50, WholeRowIterator.class));
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/0ce6fb34/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 1b5b81a..21d4fc7 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -20,9 +20,13 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -33,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.data.Tuple;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class AbstractAccumuloStorageTest {
@@ -42,12 +47,19 @@ public class AbstractAccumuloStorageTest {
     Collection<Range> ranges = new LinkedList<Range>();
     ranges.add(new Range(start, end));
     
-    Job expected = new Job();
-    Configuration expectedConf = expected.getConfiguration();
-    AccumuloInputFormat.setInputInfo(expectedConf, user, password.getBytes(), table, authorizations);
-    AccumuloInputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloInputFormat.fetchColumns(expectedConf, columnFamilyColumnQualifierPairs);
-    AccumuloInputFormat.setRanges(expectedConf, ranges);
+    Job expected = new Job(new Configuration());
+    
+    try {
+      AccumuloInputFormat.setConnectorInfo(expected, user, new PasswordToken(password));
+    } catch (AccumuloSecurityException e) {
+      Assert.fail(e.getMessage());
+    }
+    AccumuloInputFormat.setInputTableName(expected, table);
+    AccumuloInputFormat.setScanAuthorizations(expected, authorizations);
+    AccumuloInputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+    AccumuloInputFormat.fetchColumns(expected, columnFamilyColumnQualifierPairs);
+    AccumuloInputFormat.setRanges(expected, ranges);
+    
     return expected;
   }
   
@@ -72,13 +84,23 @@ public class AbstractAccumuloStorageTest {
   
   public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
       int maxWriteLatencyMS) throws IOException {
-    Job expected = new Job();
-    Configuration expectedConf = expected.getConfiguration();
-    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
-    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+    
+    Job expected = new Job(new Configuration());
+    
+    try {
+      AccumuloOutputFormat.setConnectorInfo(expected, user, new PasswordToken(password));
+    } catch (AccumuloSecurityException e) {
+      Assert.fail(e.getMessage());
+    }
+    
+    AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+    
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);
+    bwConfig.setMaxMemory(maxWriteBufferSize);
+    bwConfig.setMaxWriteThreads(writeThreads);
+    
+    AccumuloOutputFormat.setBatchWriterOptions(expected, bwConfig);
     
     return expected;
   }


Mime
View raw message