incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding a combiner pattern to the blur input format.
Date Wed, 10 Jun 2015 19:20:46 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master b724bdf4a -> 13f3414b6


Adding a combiner pattern to the blur input format.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/13f3414b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/13f3414b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/13f3414b

Branch: refs/heads/master
Commit: 13f3414b6c7c95f3aa262c733dea18b0200c93b6
Parents: b724bdf
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Jun 10 15:20:49 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Jun 10 15:20:49 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurInputFormat.java     | 124 +++++++++++++++++--
 .../lib/GenericRecordReaderCollection.java      |  92 ++++++++++++++
 .../blur/mapreduce/lib/BlurInputFormatTest.java |   8 ++
 3 files changed, 216 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/13f3414b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index ea6d5d4..f52842a 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -21,6 +21,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -62,6 +64,8 @@ import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 
 public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
+  private static final String BLUR_INPUT_FORMAT_MAX_MAPS = "blur.input.format.max.maps";
+
   private static final String BLUR_INPUTFORMAT_FILE_CACHE_PATH = "blur.inputformat.file.cache.path";
 
   private static final Log LOG = LogFactory.getLog(BlurInputFormat.class);
@@ -80,7 +84,19 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     } else {
       splits = getSplits(configuration, dirs);
     }
-    return toList(splits);
+    return toList(getMaxNumberOfMaps(configuration), splits);
+  }
+
+  /**
+   * Returns the configured upper bound on the number of map tasks (combined
+   * input splits) this input format produces. Defaults to
+   * {@link Integer#MAX_VALUE}, which leaves every discovered split in its own
+   * combined split.
+   *
+   * @param configuration the job configuration to read from.
+   * @return the value of {@code blur.input.format.max.maps}.
+   */
+  public static int getMaxNumberOfMaps(Configuration configuration) {
+    return configuration.getInt(BLUR_INPUT_FORMAT_MAX_MAPS, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Caps the number of map tasks: discovered splits are packed into at most
+   * {@code maxNumberOfMaps} combined splits.
+   *
+   * @param configuration the job configuration to update.
+   * @param maxNumberOfMaps the maximum number of map tasks; must be at least 1.
+   * @throws IllegalArgumentException if {@code maxNumberOfMaps} is less than 1,
+   *           because split packing needs at least one combined split to put
+   *           data into.
+   */
+  public static void setMaxNumberOfMaps(Configuration configuration, int maxNumberOfMaps) {
+    if (maxNumberOfMaps < 1) {
+      throw new IllegalArgumentException("maxNumberOfMaps must be at least 1, got [" + maxNumberOfMaps + "]");
+    }
+    configuration.setInt(BLUR_INPUT_FORMAT_MAX_MAPS, maxNumberOfMaps);
+  }
+
+  /**
+   * Convenience overload that applies
+   * {@link #setMaxNumberOfMaps(Configuration, int)} to the job's configuration.
+   *
+   * @throws IllegalArgumentException if {@code maxNumberOfMaps} is less than 1.
+   */
+  public static void setMaxNumberOfMaps(Job job, int maxNumberOfMaps) {
+    setMaxNumberOfMaps(job.getConfiguration(), maxNumberOfMaps);
   }
 
   private static List<BlurInputSplit> getSplitsFromCommand(Configuration configuration,
Path[] dirs) throws IOException {
@@ -168,14 +184,106 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     return configuration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION) != null;
   }
 
-  private List<InputSplit> toList(List<BlurInputSplit> splits) {
+  /**
+   * Packs the discovered splits into at most {@code maxSplits} combined splits.
+   * The first {@code maxSplits} splits each start a new collection. Every later
+   * split is appended to the collection with the smallest total length so far,
+   * which keeps the combined splits roughly balanced.
+   *
+   * @param maxSplits upper bound on the number of combined splits. Values below 1
+   *          are treated as "no limit"; otherwise there would be no collection
+   *          to add splits to.
+   * @param splits the discovered Blur splits.
+   * @return one {@link BlurInputSplitColletion} per map task.
+   */
+  private List<InputSplit> toList(int maxSplits, List<BlurInputSplit> splits) {
+    int limit = maxSplits < 1 ? Integer.MAX_VALUE : maxSplits;
+    List<BlurInputSplitColletion> collections = new ArrayList<BlurInputSplitColletion>();
+    for (BlurInputSplit blurInputSplit : splits) {
+      BlurInputSplitColletion blurInputSplitColletion;
+      if (collections.size() < limit) {
+        blurInputSplitColletion = new BlurInputSplitColletion();
+        collections.add(blurInputSplitColletion);
+      } else {
+        // limit >= 1 here, so collections is never empty.
+        blurInputSplitColletion = findSmallest(collections);
+      }
+      blurInputSplitColletion.add(blurInputSplit);
+    }
+
     List<InputSplit> inputSplits = new ArrayList<InputSplit>();
-    for (BlurInputSplit inputSplit : splits) {
+    for (BlurInputSplitColletion inputSplit : collections) {
       inputSplits.add(inputSplit);
     }
     return inputSplits;
   }
 
+  /**
+   * Returns the collection with the smallest total length; on ties, the one that
+   * appears first. This is a single linear scan rather than a sort, because it
+   * runs once per split assigned.
+   *
+   * @param collections a non-empty list of collections; it is not reordered.
+   * @return the smallest collection.
+   */
+  private BlurInputSplitColletion findSmallest(List<BlurInputSplitColletion> collections) {
+    BlurInputSplitColletion smallest = collections.get(0);
+    for (BlurInputSplitColletion candidate : collections) {
+      if (candidate.getLength() < smallest.getLength()) {
+        smallest = candidate;
+      }
+    }
+    return smallest;
+  }
+
+  /**
+   * An {@link InputSplit} that bundles several {@link BlurInputSplit}s so that a
+   * single map task processes all of them. Its length is the sum of the bundled
+   * splits' lengths, and it reports no preferred locations.
+   */
+  public static class BlurInputSplitColletion extends InputSplit implements Writable {
+
+    private List<BlurInputSplit> _splits = new ArrayList<BlurInputSplit>();
+    private long _length;
+
+    /** Creates an empty collection; populate it with {@link #add} or {@link #readFields}. */
+    public BlurInputSplitColletion() {
+
+    }
+
+    /**
+     * Creates a collection over a copy of {@code splits}. The length is computed
+     * from the splits so that {@link #getLength()} is also correct for instances
+     * built this way.
+     */
+    public BlurInputSplitColletion(List<BlurInputSplit> splits) {
+      _splits = new ArrayList<BlurInputSplit>(splits);
+      _length = totalLength(_splits);
+    }
+
+    /** Appends a split and adds its length to the running total. */
+    public void add(BlurInputSplit blurInputSplit) {
+      _splits.add(blurInputSplit);
+      _length += blurInputSplit.getLength();
+    }
+
+    /** Returns the summed length of the bundled splits (or the value given to {@link #setLength}). */
+    @Override
+    public long getLength() {
+      return _length;
+    }
+
+    /** Returns an empty array: no location hints are computed for combined splits. */
+    @Override
+    public String[] getLocations() {
+      return new String[] {};
+    }
+
+    /** Returns the bundled splits; this is the live internal list, not a copy. */
+    public List<BlurInputSplit> getSplits() {
+      return _splits;
+    }
+
+    /**
+     * Replaces the bundled splits with a copy of {@code splits} and recomputes the
+     * length. Copying keeps {@link #readFields} able to clear the list even if the
+     * caller passed an unmodifiable one.
+     */
+    public void setSplits(List<BlurInputSplit> splits) {
+      _splits = new ArrayList<BlurInputSplit>(splits);
+      _length = totalLength(_splits);
+    }
+
+    /** Overrides the reported length; the value is not checked against the splits. */
+    public void setLength(long length) {
+      _length = length;
+    }
+
+    /** Writes the length, then the split count, then each split in order. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(_length);
+      out.writeInt(_splits.size());
+      for (BlurInputSplit split : _splits) {
+        split.write(out);
+      }
+    }
+
+    /** Reads the format produced by {@link #write}, replacing any current contents. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      _splits.clear();
+      _length = in.readLong();
+      int size = in.readInt();
+      for (int i = 0; i < size; i++) {
+        BlurInputSplit blurInputSplit = new BlurInputSplit();
+        blurInputSplit.readFields(in);
+        _splits.add(blurInputSplit);
+      }
+    }
+
+    // Sums the lengths of the given splits.
+    private static long totalLength(List<BlurInputSplit> splits) {
+      long total = 0;
+      for (BlurInputSplit split : splits) {
+        total += split.getLength();
+      }
+      return total;
+    }
+
+  }
+
   public static List<BlurInputSplit> getSplits(Configuration configuration, Path[]
dirs) throws IOException {
     int threads = configuration.getInt(BLUR_INPUT_FORMAT_DISCOVERY_THREADS, 10);
     ExecutorService service = Executors.newFixedThreadPool(threads);
@@ -334,13 +442,13 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   @Override
   public RecordReader<Text, TableBlurRecord> createRecordReader(InputSplit split, TaskAttemptContext
context)
       throws IOException, InterruptedException {
-    final GenericRecordReader genericRecordReader = new GenericRecordReader();
-    genericRecordReader.initialize((BlurInputSplit) split, context.getConfiguration());
+    final GenericRecordReaderCollection genericRecordReader = new GenericRecordReaderCollection();
+    genericRecordReader.initialize((BlurInputSplitColletion) split, context.getConfiguration());
     return new RecordReader<Text, TableBlurRecord>() {
 
       @Override
       public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
-        genericRecordReader.initialize((BlurInputSplit) split, context.getConfiguration());
+        genericRecordReader.initialize((BlurInputSplitColletion) split, context.getConfiguration());
       }
 
       @Override
@@ -400,12 +508,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     }
 
+    /**
+     * Returns {@code _fileLength} as this split's length. Declared without the
+     * checked IOException, which lets BlurInputSplitColletion.add sum split
+     * lengths without a try/catch.
+     */
     @Override
-    public long getLength() throws IOException {
+    public long getLength() {
       return _fileLength;
     }
 
+    /** Returns an empty array: no preferred hosts are computed yet (see TODO). */
     @Override
-    public String[] getLocations() throws IOException {
+    public String[] getLocations() {
       // @TODO create locations for fdt file
       return new String[] {};
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/13f3414b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReaderCollection.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReaderCollection.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReaderCollection.java
new file mode 100644
index 0000000..b1617f0
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReaderCollection.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.mapreduce.lib.BlurInputFormat.BlurInputSplit;
+import org.apache.blur.mapreduce.lib.BlurInputFormat.BlurInputSplitColletion;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+public class GenericRecordReaderCollection {
+
+  private Configuration _configuration;
+  private boolean _setup;
+  private List<BlurInputSplit> _splits;
+  private boolean _complete;
+  private GenericRecordReader _reader;
+  private int _currentSplit = 0;
+
+  public void initialize(BlurInputSplitColletion split, Configuration configuration) {
+    _configuration = configuration;
+    if (_setup) {
+      return;
+    }
+    _setup = true;
+    _splits = split.getSplits();
+  }
+
+  public boolean nextKeyValue() throws IOException {
+    if (_complete) {
+      return false;
+    }
+    while (true) {
+      if (_reader == null) {
+        // setup first reader
+        _reader = new GenericRecordReader();
+        _reader.initialize(_splits.get(_currentSplit), _configuration);
+        _currentSplit++;
+      }
+      boolean nextKeyValue = _reader.nextKeyValue();
+      if (nextKeyValue) {
+        return true;
+      } else {
+        _reader.close();
+        if (_currentSplit < _splits.size()) {
+          _reader = new GenericRecordReader();
+          _reader.initialize(_splits.get(_currentSplit), _configuration);
+          _currentSplit++;
+        } else {
+          _reader = null;
+          _complete = true;
+          return false;
+        }
+      }
+    }
+  }
+
+  public Text getCurrentKey() throws IOException {
+    return _reader.getCurrentKey();
+  }
+
+  public TableBlurRecord getCurrentValue() throws IOException {
+    return _reader.getCurrentValue();
+  }
+
+  public float getProgress() {
+    return _currentSplit / (float) _splits.size();
+  }
+
+  public void close() throws IOException {
+    if (_reader != null) {
+      _reader.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/13f3414b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
index 209a1fa..c60c92c 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -50,6 +50,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -169,12 +172,17 @@ public class BlurInputFormatTest {
       BlurInputFormat.setLocalCachePath(job, fileCache);
     }
 
+    BlurInputFormat.setMaxNumberOfMaps(job, 1);
     BlurInputFormat.setZooKeeperConnectionStr(job, miniCluster.getZkConnectionString());
     BlurInputFormat.addTable(job, tableDescriptor, snapshot);
     FileOutputFormat.setOutputPath(job, output);
 
     try {
       assertTrue(job.waitForCompletion(true));
+      Counters counters = job.getCounters();
+      CounterGroup counterGroup = counters.getGroup("org.apache.hadoop.mapreduce.JobCounter");
+      Counter counter = counterGroup.findCounter("TOTAL_LAUNCHED_MAPS");
+      assertEquals(1, counter.getValue());
     } finally {
       client.removeSnapshot(tableName, snapshot);
     }


Mime
View raw message