incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [03/50] [abbrv] git commit: Figured out how to get counters into BlurOutputFormat.
Date Fri, 17 May 2013 03:24:30 GMT
Figured out how to get counters into BlurOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/0184ccae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/0184ccae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/0184ccae

Branch: refs/heads/0.1.5
Commit: 0184ccaeab4a55292b5cfff908ae7e7920fe55ad
Parents: 4563dde
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu May 16 22:59:50 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu May 16 22:59:50 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/mapreduce/lib/BlurCounters.java    |    2 +-
 .../blur/mapreduce/lib/BlurOutputFormat.java       |   84 +++++++++++++-
 .../apache/blur/mapreduce/lib/CsvBlurMapper.java   |   15 ++-
 .../blur/mapreduce/lib/DefaultBlurReducer.java     |   12 ++
 .../blur/mapreduce/lib/ProgressableDirectory.java  |    7 +-
 .../blur/mapreduce/lib/CsvBlurMapperTest.java      |   36 +++++--
 6 files changed, 134 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
index 6bd3407..99372b1 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -21,6 +21,6 @@ package org.apache.blur.mapreduce.lib;
  * The enum class used for all the internal counters during map reduce jobs.
  */
 public enum BlurCounters {
-  RECORD_COUNT, FIELD_COUNT
+  RECORD_COUNT, FIELD_COUNT, ROW_COUNT
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 435d072..d462780 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -18,9 +18,11 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.log.Log;
@@ -36,6 +38,7 @@ import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
@@ -44,13 +47,16 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.NoLockFactory;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
@@ -59,9 +65,29 @@ import org.apache.thrift.transport.TIOStreamTransport;
 public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
   public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
-  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
+  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>();
+  private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>();
+
+  static void setProgressable(Progressable progressable) {
+    _progressable.set(progressable);
+  }
+
+  static Progressable getProgressable() {
+    return _progressable.get();
+  }
+
+  static void setGetCounter(GetCounter getCounter) {
+    _getCounter.set(getCounter);
+  }
+
+  static GetCounter getGetCounter() {
+    return _getCounter.get();
+  }
+
   @Override
   public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
 
@@ -181,13 +207,20 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
   static class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
 
-    private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
+    private static final Log LOG = LogFactory.getLog(BlurRecordWriter.class);
 
     private final Text _prevKey = new Text();
     private final List<Document> _documents = new ArrayList<Document>();
     private final IndexWriter _writer;
     private final BlurAnalyzer _analyzer;
     private final StringBuilder _builder = new StringBuilder();
+    private final Directory _finalDir;
+    private final Directory _localDir;
+    private final File _localPath;
+    private Counter _fieldCount;
+    private Counter _recordCount;
+    private Counter _rowCount;
+    private boolean _countersSetup = false;
 
     public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName)
         throws IOException {
@@ -195,17 +228,27 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
       Path indexPath = new Path(tableOutput, shardName);
       Path newIndex = new Path(indexPath, tmpDirName);
+      _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, newIndex),
+          BlurOutputFormat.getProgressable());
+      _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
 
       TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
       _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
       IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
-      Directory dir = new HdfsDirectory(configuration, newIndex);
-      dir.setLockFactory(NoLockFactory.getNoLockFactory());
-      _writer = new IndexWriter(dir, conf);
+
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
+      _writer = new IndexWriter(_localDir, conf);
+
     }
 
     @Override
     public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+      if (!_countersSetup) {
+        setupCounter();
+        _countersSetup = true;
+      }
       if (!_prevKey.equals(key)) {
         flush();
         _prevKey.set(key);
@@ -213,6 +256,13 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       add(value);
     }
 
+    private void setupCounter() {
+      GetCounter getCounter = BlurOutputFormat.getGetCounter();
+      _fieldCount = getCounter.getCounter(BlurCounters.FIELD_COUNT);
+      _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+      _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+    }
+
     private void add(BlurMutate value) {
       BlurRecord blurRecord = value.getRecord();
       Record record = getRecord(blurRecord);
@@ -246,6 +296,28 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     public void close(TaskAttemptContext context) throws IOException, InterruptedException {
       flush();
       _writer.close();
+      copyDir();
+    }
+
+    private void copyDir() throws IOException {
+      String[] fileNames = _localDir.listAll();
+      for (String fileName : fileNames) {
+        LOG.info("Copying [{0}]", fileName);
+        _localDir.copy(_finalDir, fileName, fileName, IOContext.DEFAULT);
+      }
+      rm(_localPath);
+    }
+
+    private void rm(File file) {
+      if (!file.exists()) {
+        return;
+      }
+      if (file.isDirectory()) {
+        for (File f : file.listFiles()) {
+          rm(f);
+        }
+      }
+      file.delete();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
index 36474fd..e5e73fb 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
@@ -283,9 +284,13 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   }
 
   private Path getCurrentFile(Context context) throws IOException {
-    FileSplit inputSplit = (FileSplit) context.getInputSplit();
-    Path path = inputSplit.getPath();
-    return path.makeQualified(path.getFileSystem(context.getConfiguration()));
+    InputSplit split = context.getInputSplit();
+    if (split != null) {
+      FileSplit inputSplit = (FileSplit) split;
+      Path path = inputSplit.getPath();
+      return path.makeQualified(path.getFileSystem(context.getConfiguration()));
+    }
+    return null;
   }
 
   @Override
@@ -334,6 +339,10 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
     context.progress();
   }
 
+  public void setFamilyFromPath(String familyFromPath) {
+    this.familyFromPath = familyFromPath;
+  }
+
   private String getColumnNames(List<String> columnNames) {
     StringBuilder builder = new StringBuilder();
     for (String c : columnNames) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
index 826ba46..016fb51 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -19,6 +19,7 @@ package org.apache.blur.mapreduce.lib;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
 
 /**
@@ -58,6 +59,17 @@ import org.apache.hadoop.mapreduce.Reducer;
 public class DefaultBlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
 
   @Override
+  protected void setup(final Context context) throws IOException, InterruptedException {
+    BlurOutputFormat.setProgressable(context);
+    BlurOutputFormat.setGetCounter(new GetCounter() {
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return context.getCounter(counterName);
+      }
+    });
+  }
+
+  @Override
   protected void reduce(Text key, Iterable<BlurMutate> values, Context context) throws IOException,
       InterruptedException {
     for (BlurMutate value : values) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
index 3ca6a35..6a96166 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -42,7 +42,12 @@ public class ProgressableDirectory extends Directory {
 
   public ProgressableDirectory(Directory directory, Progressable progressable) {
     _directory = directory;
-    _progressable = progressable;
+    _progressable = progressable == null ? new Progressable() {
+      @Override
+      public void progress() {
+
+      }
+    } : progressable;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0184ccae/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
index 7633762..2e28f6c 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.blur.mapreduce.lib.CsvBlurMapper;
 import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
@@ -29,24 +30,37 @@ import org.junit.Test;
 
 public class CsvBlurMapperTest {
 
-  MapDriver<LongWritable, Text, Text, BlurMutate> mapDriver;
+  private MapDriver<LongWritable, Text, Text, BlurMutate> _mapDriver;
+  private CsvBlurMapper _mapper;
 
   @Before
   public void setUp() throws IOException {
-    CsvBlurMapper mapper = new CsvBlurMapper();
-    mapDriver = MapDriver.newMapDriver(mapper);
-    Configuration configuration = mapDriver.getConfiguration();
+    _mapper = new CsvBlurMapper();
+    _mapDriver = MapDriver.newMapDriver(_mapper);
+  }
+
+  @Test
+  public void testMapperWithFamilyInData() {
+    Configuration configuration = _mapDriver.getConfiguration();
     CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
   }
 
   @Test
-  public void testMapper() {
-    mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
-    mapDriver.withOutput(
-        new Text("rowid1"),
-        new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1").addColumn("col1", "value1").addColumn("col2",
-            "value2"));
-    mapDriver.runTest();
+  public void testMapperFamilyPerPath() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setFamilyNotInFile(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
   }
 
 }
\ No newline at end of file


Mime
View raw message