accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [4/5] git commit: ACCUMULO-1783 Update the setStoreLocation method and tests.
Date Thu, 07 Nov 2013 05:24:06 GMT
ACCUMULO-1783 Update the setStoreLocation method and tests.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/904604db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/904604db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/904604db

Branch: refs/heads/ACCUMULO-1783
Commit: 904604db28adad26bc0ba02e7c78db15db6424e8
Parents: 9b398d4
Author: Josh Elser <elserj@apache.org>
Authored: Tue Nov 5 20:05:08 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Nov 5 20:05:08 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 39 +++++++++++++-------
 .../pig/AbstractAccumuloStorageTest.java        | 18 ++++-----
 2 files changed, 34 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index da4a51b..9473753 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -94,7 +94,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   int maxWriteThreads = 10;
   long maxMutationBufferSize = 10 * 1000 * 1000;
   int maxLatency = 10 * 1000;
-
+  
   protected LoadStoreCaster caster;
   protected ResourceSchema schema;
   protected String contextSignature = null;
@@ -206,8 +206,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     
     int sequence = AccumuloInputFormat.nextSequence();
     
-    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured", false)) {
-      throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
+    // TODO Something more.. "certain".
+    if (conf.getBoolean(AccumuloInputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+      LOG.warn("InputFormat already configured for " + sequence);
+      return;
+      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
     
     AccumuloInputFormat.setInputInfo(conf, sequence, user, password.getBytes(), table, authorizations);
@@ -248,7 +251,8 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   public void setStoreFuncUDFContextSignature(String signature) {
     this.contextSignature = signature;
     
-  }  
+  }
+  
   /**
    * Returns UDFProperties based on <code>contextSignature</code>.
    */
@@ -264,14 +268,22 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     conf = job.getConfiguration();
     setLocationFromUri(location);
     
-    if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) {
-      AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table);
-      AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers);
-      AccumuloOutputFormat.setMaxLatency(conf, maxLatency);
-      AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize);
-      AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads);
-      configureOutputFormat(conf);
+    int sequence = AccumuloOutputFormat.nextSequence();
+    
+    // TODO Something more.. "certain".
+    if (conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured." + sequence, false)) {
+      LOG.warn("OutputFormat already configured for " + sequence);
+      return;
+      // throw new RuntimeException("Was provided sequence number which was already configured: " + sequence);
     }
+    
+    AccumuloOutputFormat.setOutputInfo(conf, sequence, user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(conf, sequence, inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(conf, sequence, maxLatency);
+    AccumuloOutputFormat.setMaxMutationBufferSize(conf, sequence, maxMutationBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(conf, sequence, maxWriteThreads);
+    
+    configureOutputFormat(conf);
   }
   
   @SuppressWarnings("rawtypes")
@@ -298,7 +310,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
   }
   
   public void cleanupOnFailure(String failure, Job job) {}
-
+  
   public void cleanupOnSuccess(String location, Job job) {}
   
   @Override
@@ -310,7 +322,6 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     schema = s;
     getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema));
   }
-
   
   protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException {
     Object o = tuple.get(i);
@@ -341,7 +352,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF
     
   }
   
-  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws
IOException { 
+  protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws
IOException {
     Object o = tuple.get(i);
     byte type = schemaToType(o, i, fieldSchemas);
     

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/904604db/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 5f4ecf2..e9f0297 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -77,20 +77,20 @@ public class AbstractAccumuloStorageTest {
     return expected;
   }
   
-  public Job getExpectedStoreJob(String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
+  public Job getExpectedStoreJob(int sequence, String inst, String zookeepers, String user, String password, String table, long maxWriteBufferSize, int writeThreads,
       int maxWriteLatencyMS) throws IOException {
     Job expected = new Job();
     Configuration expectedConf = expected.getConfiguration();
-    AccumuloOutputFormat.setOutputInfo(expectedConf, user, password.getBytes(), true, table);
-    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, inst, zookeepers);
-    AccumuloOutputFormat.setMaxLatency(expectedConf, maxWriteLatencyMS);
-    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, maxWriteBufferSize);
-    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, writeThreads);
+    AccumuloOutputFormat.setOutputInfo(expectedConf, sequence,user, password.getBytes(), true, table);
+    AccumuloOutputFormat.setZooKeeperInstance(expectedConf, sequence,inst, zookeepers);
+    AccumuloOutputFormat.setMaxLatency(expectedConf, sequence,maxWriteLatencyMS);
+    AccumuloOutputFormat.setMaxMutationBufferSize(expectedConf, sequence,maxWriteBufferSize);
+    AccumuloOutputFormat.setMaxWriteThreads(expectedConf, sequence,writeThreads);
     
     return expected;
   }
   
-  public Job getDefaultExpectedStoreJob() throws IOException {
+  public Job getDefaultExpectedStoreJob(int sequence) throws IOException {
     String inst = "myinstance";
     String zookeepers = "127.0.0.1:2181";
     String user = "root";
@@ -100,7 +100,7 @@ public class AbstractAccumuloStorageTest {
     int writeThreads = 7;
     int maxWriteLatencyMS = 30000;
     
-    Job expected = getExpectedStoreJob(inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
+    Job expected = getExpectedStoreJob(sequence, inst, zookeepers, user, password, table, maxWriteBufferSize, writeThreads, maxWriteLatencyMS);
     return expected;
   }
   
@@ -150,7 +150,7 @@ public class AbstractAccumuloStorageTest {
     s.setStoreLocation(getDefaultStoreLocation(), actual);
     Configuration actualConf = actual.getConfiguration();
     
-    Job expected = getDefaultExpectedStoreJob();
+    Job expected = getDefaultExpectedStoreJob(1);
     Configuration expectedConf = expected.getConfiguration();
     
     TestUtils.assertConfigurationsEqual(expectedConf, actualConf);


Mime
View raw message