accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/3] git commit: ACCUMULO-1854 Fix up InputFormatBase to use the information stored on RangeInputSplit and fall back onto the Configuration.
Date Sat, 09 Nov 2013 03:17:24 GMT
ACCUMULO-1854 Fix up InputFormatBase to use the information stored on
RangeInputSplit and fall back onto the Configuration.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a9644f5b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a9644f5b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a9644f5b

Branch: refs/heads/ACCUMULO-1854-info-in-splits
Commit: a9644f5b94d74466db624fb06bd3d70fb5bf9cf9
Parents: 7c549ab
Author: Josh Elser <elserj@apache.org>
Authored: Fri Nov 8 17:47:57 2013 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Nov 8 17:47:57 2013 -0500

----------------------------------------------------------------------
 .../core/client/mapreduce/InputFormatBase.java  | 140 +++++++++++++++----
 .../core/client/mapreduce/RangeInputSplit.java  | 116 ++++++++++++---
 .../simple/filedata/ChunkInputFormatTest.java   |   4 +-
 3 files changed, 204 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a9644f5b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
index d3ebd21..0fd2630 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
@@ -826,17 +826,17 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
    */
   protected static Set<Pair<Text,Text>> getFetchedColumns(Configuration conf) {
     ArgumentChecker.notNull(conf);
-    
+
     return deserializeFetchedColumns(conf.getStrings(COLUMNS));
   }
 
   public static Set<Pair<Text,Text>> deserializeFetchedColumns(String[] serialized) {
     Set<Pair<Text,Text>> columns = new HashSet<Pair<Text,Text>>();
-    
+
     if (null == serialized) {
       return columns;
     }
-    
+
     for (String col : serialized) {
       int idx = col.indexOf(":");
       Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
@@ -1061,8 +1061,9 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     /**
      * @deprecated Use {@link #setupIterators(Configuration,Scanner)} instead
      */
-    protected void setupIterators(TaskAttemptContext attempt, Scanner scanner) throws AccumuloException {
-      setupIterators(attempt.getConfiguration(), scanner);
+    protected void setupIterators(TaskAttemptContext attempt, Scanner scanner, List<AccumuloIterator> iterators, List<AccumuloIteratorOption> options)
+        throws AccumuloException {
+      setupIterators(attempt.getConfiguration(), scanner, iterators, options);
     }
 
     /**
@@ -1074,9 +1075,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
      *          the scanner to configure
      * @throws AccumuloException
      */
-    protected void setupIterators(Configuration conf, Scanner scanner) throws AccumuloException {
-      List<AccumuloIterator> iterators = getIterators(conf);
-      List<AccumuloIteratorOption> options = getIteratorOptions(conf);
+    protected void setupIterators(Configuration conf, Scanner scanner, List<AccumuloIterator> iterators, List<AccumuloIteratorOption> options)
+        throws AccumuloException {
 
       Map<String,IteratorSetting> scanIterators = new HashMap<String,IteratorSetting>();
       for (AccumuloIterator iterator : iterators) {
@@ -1093,8 +1093,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     /**
      * @deprecated Use {@link #setupMaxVersions(Configuration,Scanner)} instead
      */
-    protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner) {
-      setupMaxVersions(attempt.getConfiguration(), scanner);
+    protected void setupMaxVersions(TaskAttemptContext attempt, Scanner scanner, int maxVersions) {
+      setupMaxVersions(attempt.getConfiguration(), scanner, maxVersions);
     }
 
     /**
@@ -1105,8 +1105,7 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
      * @param scanner
      *          the scanner to configure
      */
-    protected void setupMaxVersions(Configuration conf, Scanner scanner) {
-      int maxVersions = getMaxVersions(conf);
+    protected void setupMaxVersions(Configuration conf, Scanner scanner, int maxVersions) {
       // Check to make sure its a legit value
       if (maxVersions >= 1) {
         IteratorSetting vers = new IteratorSetting(0, "vers", VersioningIterator.class);
@@ -1123,43 +1122,119 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       split = (RangeInputSplit) inSplit;
       log.debug("Initializing input split: " + split.getRange());
       Configuration conf = attempt.getConfiguration();
-      Instance instance = getInstance(conf);
-      String user = getUsername(conf);
-      byte[] password = getPassword(conf);
-      Authorizations authorizations = getAuthorizations(conf);
+
+      Instance instance = split.getInstance();
+      if (null == instance) {
+        instance = getInstance(conf);
+      }
+
+      String user = split.getUsername();
+      if (null == user) {
+        user = getUsername(conf);
+      }
+
+      byte[] password = split.getPassword();
+      if (null == password) {
+        password = getPassword(conf);
+      }
+
+      Authorizations authorizations = split.getAuths();
+      if (null == authorizations) {
+        authorizations = getAuthorizations(conf);
+      }
+
+      String table = split.getTable();
+      if (null == table) {
+        table = getTablename(conf);
+      }
+      
+      Boolean isOffline = split.isOffline();
+      if (null == isOffline) {
+        isOffline = isOfflineScan(conf);
+      }
+
+      Boolean isIsolated = split.isIsolatedScan();
+      if (null == isIsolated) {
+        isIsolated = isIsolated(conf);
+      }
+
+      Boolean usesLocalIterators = split.usesLocalIterators();
+      if (null == usesLocalIterators) {
+        usesLocalIterators = usesLocalIterators(conf);
+      }
+
+      String rowRegex = split.getRowRegex();
+      if (null == rowRegex) {
+        rowRegex = conf.get(ROW_REGEX);
+      }
+
+      String colfRegex = split.getColfamRegex();
+      if (null == colfRegex) {
+        colfRegex = conf.get(COLUMN_FAMILY_REGEX);
+      }
+
+      String colqRegex = split.getColqualRegex();
+      if (null == colqRegex) {
+        colqRegex = conf.get(COLUMN_QUALIFIER_REGEX);
+      }
+
+      String valueRegex = split.getValueRegex();
+      if (null == valueRegex) {
+        valueRegex = conf.get(VALUE_REGEX);
+      }
+
+      Integer maxVersions = split.getMaxVersions();
+      if (null == maxVersions) {
+        maxVersions = getMaxVersions(conf);
+      }
+      
+      List<AccumuloIterator> iterators = split.getIterators();
+      if (null == iterators) {
+        iterators = getIterators(conf);
+      }
+      
+      List<AccumuloIteratorOption> options = split.getOptions();
+      if (null == options) {
+        options = getIteratorOptions(conf);
+      }
+      
+      Set<Pair<Text,Text>> columns = split.getFetchedColumns();
+      if (null == columns) {
+        columns = getFetchedColumns(conf);
+      }
 
       try {
         log.debug("Creating connector with user: " + user);
         Connector conn = instance.getConnector(user, password);
-        log.debug("Creating scanner for table: " + getTablename(conf));
+        log.debug("Creating scanner for table: " + table);
         log.debug("Authorizations are: " + authorizations);
-        if (isOfflineScan(conf)) {
-          scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance,
-              getTablename(conf)), authorizations);
+        if (isOffline) {
+          scanner = new OfflineScanner(instance, new AuthInfo(user, ByteBuffer.wrap(password), instance.getInstanceID()), Tables.getTableId(instance, table),
+              authorizations);
         } else {
-          scanner = conn.createScanner(getTablename(conf), authorizations);
+          scanner = conn.createScanner(table, authorizations);
         }
-        if (isIsolated(conf)) {
+        if (isIsolated) {
           log.info("Creating isolated scanner");
           scanner = new IsolatedScanner(scanner);
         }
-        if (usesLocalIterators(conf)) {
+        if (usesLocalIterators) {
           log.info("Using local iterators");
           scanner = new ClientSideIteratorScanner(scanner);
         }
-        setupMaxVersions(conf, scanner);
-        if (conf.get(ROW_REGEX) != null || conf.get(COLUMN_FAMILY_REGEX) != null || conf.get(COLUMN_QUALIFIER_REGEX) != null || conf.get(VALUE_REGEX) != null) {
+        setupMaxVersions(conf, scanner, maxVersions);
+        if (rowRegex != null || colfRegex != null || colqRegex != null || valueRegex != null) {
           IteratorSetting is = new IteratorSetting(50, RegExFilter.class);
-          RegExFilter.setRegexs(is, conf.get(ROW_REGEX), conf.get(COLUMN_FAMILY_REGEX), conf.get(COLUMN_QUALIFIER_REGEX), conf.get(VALUE_REGEX), false);
+          RegExFilter.setRegexs(is, rowRegex, colfRegex, colqRegex, valueRegex, false);
           scanner.addScanIterator(is);
         }
-        setupIterators(conf, scanner);
+        setupIterators(conf, scanner, iterators, options);
       } catch (Exception e) {
         throw new IOException(e);
       }
 
       // setup a scanner within the bounds of this split
-      for (Pair<Text,Text> c : getFetchedColumns(conf)) {
+      for (Pair<Text,Text> c : columns) {
         if (c.getSecond() != null) {
           log.debug("Fetching column " + c.getFirst() + ":" + c.getSecond());
           scanner.fetchColumn(c.getFirst(), c.getSecond());
@@ -1318,14 +1393,15 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
     boolean localIterators = usesLocalIterators(conf);
     boolean mockInstance = conf.getBoolean(MOCK, false);
     int maxVersions = getMaxVersions(conf);
-    String rowRegex = conf.get(ROW_REGEX), colfamRegex = conf.get(COLUMN_FAMILY_REGEX), colqualRegex = conf.get(COLUMN_QUALIFIER_REGEX), 
-        valueRegex = conf.get(VALUE_REGEX);
+    String rowRegex = conf.get(ROW_REGEX), colfamRegex = conf.get(COLUMN_FAMILY_REGEX), colqualRegex = conf.get(COLUMN_QUALIFIER_REGEX), valueRegex = conf
+        .get(VALUE_REGEX);
     Set<Pair<Text,Text>> fetchedColumns = getFetchedColumns(conf);
     Authorizations auths = getAuthorizations(conf);
     byte[] password = getPassword(conf);
     String username = getUsername(conf);
     Instance instance = getInstance(conf);
-        
+    List<AccumuloIterator> iterators = getIterators(conf);
+    List<AccumuloIteratorOption> options = getIteratorOptions(conf);
 
     if (ranges.isEmpty()) {
       ranges = new ArrayList<Range>(1);
@@ -1426,6 +1502,8 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       split.setInstanceName(instance.getInstanceName());
       split.setZooKeepers(instance.getZooKeepers());
       split.setAuths(auths);
+      split.setIterators(iterators);
+      split.setOptions(options);
     }
 
     return splits;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a9644f5b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index 4cd16b2..3caa111 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@ -20,8 +20,14 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigInteger;
+import java.util.List;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIterator;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase.AccumuloIteratorOption;
+import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
@@ -42,10 +48,12 @@ public class RangeInputSplit extends InputSplit implements Writable {
   private String table, instanceName, zooKeepers, username;
   private String rowRegex, colfamRegex, colqualRegex, valueRegex;
   private byte[] password;
-  private boolean offline = false, mockInstance = false, isolatedScan = false, localIterators = false;
-  private int maxVersions = 1;
+  private Boolean offline, mockInstance, isolatedScan, localIterators;
+  private Integer maxVersions;
   private Authorizations auths;
   private Set<Pair<Text,Text>> fetchedColumns;
+  private List<AccumuloIterator> iterators;
+  private List<AccumuloIteratorOption> options;
 
   public RangeInputSplit() {
     range = new Range();
@@ -133,12 +141,25 @@ public class RangeInputSplit extends InputSplit implements Writable {
     for (int i = 0; i < numLocs; ++i)
       locations[i] = in.readUTF();
     
-    isolatedScan = in.readBoolean();
-    offline = in.readBoolean();
-    localIterators = in.readBoolean();
-    mockInstance = in.readBoolean();
+    if (in.readBoolean()) {
+      isolatedScan = in.readBoolean();
+    }
+    
+    if (in.readBoolean()) {
+      offline = in.readBoolean();
+    }
+    
+    if (in.readBoolean()) {
+      localIterators = in.readBoolean();
+    }
+    
+    if (in.readBoolean()) {
+      mockInstance = in.readBoolean();
+    }
     
-    maxVersions = in.readInt();
+    if (in.readBoolean()) {
+      maxVersions = in.readInt();
+    }
     
     if (in.readBoolean()) {
       rowRegex = in.readUTF();
@@ -193,12 +214,30 @@ public class RangeInputSplit extends InputSplit implements Writable {
     for (int i = 0; i < locations.length; ++i)
       out.writeUTF(locations[i]);
     
-    out.writeBoolean(isIsolatedScan());
-    out.writeBoolean(isOffline());
-    out.writeBoolean(usesLocalIterators());
-    out.writeBoolean(isMockInstance());
+    out.writeBoolean(null != isolatedScan);
+    if (null != isolatedScan) {
+      out.writeBoolean(isolatedScan);
+    }
+    
+    out.writeBoolean(null != offline);
+    if (null != offline) {
+      out.writeBoolean(offline);
+    }
+    
+    out.writeBoolean(null != localIterators);
+    if (null != localIterators) {
+      out.writeBoolean(localIterators);
+    }
+    
+    out.writeBoolean(null != mockInstance);
+    if (null != mockInstance) {
+      out.writeBoolean(mockInstance);
+    }
     
-    out.writeInt(getMaxVersions());
+    out.writeBoolean(null != maxVersions);
+    if (null != maxVersions) {
+      out.writeInt(getMaxVersions());
+    }
     
     out.writeBoolean(null != rowRegex);
     if (null != rowRegex) {
@@ -261,6 +300,7 @@ public class RangeInputSplit extends InputSplit implements Writable {
     sb.append("Range: ").append(range);
     sb.append(" Locations: ").append(locations);
     sb.append(" Table: ").append(table);
+    // TODO finish building of string
     return sb.toString();
   }
 
@@ -271,6 +311,22 @@ public class RangeInputSplit extends InputSplit implements Writable {
   public void setTable(String table) {
     this.table = table;
   }
+  
+  public Instance getInstance() {
+    if (null == instanceName) {
+      return null;
+    }
+    
+    if (isMockInstance()) {  
+      return new MockInstance(getInstanceName());
+    }
+    
+    if (null == zooKeepers) {
+      return null;
+    }
+    
+    return new ZooKeeperInstance(getInstanceName(), getZooKeepers());
+  }
 
   public String getInstanceName() {
     return instanceName;
@@ -304,11 +360,11 @@ public class RangeInputSplit extends InputSplit implements Writable {
     this.password = password;
   }
 
-  public boolean isOffline() {
+  public Boolean isOffline() {
     return offline;
   }
 
-  public void setOffline(boolean offline) {
+  public void setOffline(Boolean offline) {
     this.offline = offline;
   }
 
@@ -348,27 +404,27 @@ public class RangeInputSplit extends InputSplit implements Writable {
     this.valueRegex = valueRegex;
   }
 
-  public boolean isMockInstance() {
+  public Boolean isMockInstance() {
     return mockInstance;
   }
 
-  public void setMockInstance(boolean mockInstance) {
+  public void setMockInstance(Boolean mockInstance) {
     this.mockInstance = mockInstance;
   }
 
-  public boolean isIsolatedScan() {
+  public Boolean isIsolatedScan() {
     return isolatedScan;
   }
 
-  public void setIsolatedScan(boolean isolatedScan) {
+  public void setIsolatedScan(Boolean isolatedScan) {
     this.isolatedScan = isolatedScan;
   }
 
-  public int getMaxVersions() {
+  public Integer getMaxVersions() {
     return maxVersions;
   }
 
-  public void setMaxVersions(int maxVersions) {
+  public void setMaxVersions(Integer maxVersions) {
     this.maxVersions = maxVersions;
   }
 
@@ -384,11 +440,11 @@ public class RangeInputSplit extends InputSplit implements Writable {
     this.range = range;
   }
 
-  public boolean usesLocalIterators() {
+  public Boolean usesLocalIterators() {
     return localIterators;
   }
 
-  public void setUsesLocalIterators(boolean localIterators) {
+  public void setUsesLocalIterators(Boolean localIterators) {
     this.localIterators = localIterators;
   }
 
@@ -399,4 +455,20 @@ public class RangeInputSplit extends InputSplit implements Writable {
   public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
     this.fetchedColumns = fetchedColumns;
   }
+
+  public List<AccumuloIterator> getIterators() {
+    return iterators;
+  }
+
+  public void setIterators(List<AccumuloIterator> iterators) {
+    this.iterators = iterators;
+  }
+
+  public List<AccumuloIteratorOption> getOptions() {
+    return options;
+  }
+
+  public void setOptions(List<AccumuloIteratorOption> options) {
+    this.options = options;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a9644f5b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
index c31c738..af12302 100644
--- a/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
+++ b/src/examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
@@ -30,15 +30,13 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase.RangeInputSplit;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.examples.simple.filedata.ChunkInputFormat;
-import org.apache.accumulo.examples.simple.filedata.ChunkInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;


Mime
View raw message