incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [6/6] git commit: Adding javadocs and merging BlurRecordWriter into the BlurOutputFormat.
Date Wed, 15 May 2013 18:13:37 GMT
Adding javadocs and merging BlurRecordWriter into the BlurOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/6a9fde03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/6a9fde03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/6a9fde03

Branch: refs/heads/0.1.5
Commit: 6a9fde0372d15ba385643f7f8b2543e89edc5bea
Parents: 98f7e32
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed May 15 14:12:46 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed May 15 14:12:46 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputFormat.java       |  149 ++++++++++++++-
 .../blur/mapreduce/lib/BlurRecordWriter.java       |  115 -----------
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |   31 +++-
 .../blur/mapreduce/lib/BlurRecordWriterTest.java   |   87 ---------
 4 files changed, 167 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a9fde03/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index b36cc19..9557fa2 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -16,10 +16,26 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.manager.writer.TransactionRecorder;
+import org.apache.blur.mapreduce.BlurColumn;
 import org.apache.blur.mapreduce.BlurMutate;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -31,14 +47,27 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
 
 public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
+  public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
   private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
 
   @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
   }
 
   @Override
@@ -70,7 +99,7 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
   private OutputCommitter getDoNothing() {
     return new OutputCommitter() {
-      
+
       @Override
       public void commitJob(JobContext jobContext) throws IOException {
       }
@@ -85,31 +114,64 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
       @Override
       public void setupTask(TaskAttemptContext taskContext) throws IOException {
-        
+
       }
-      
+
       @Override
       public void setupJob(JobContext jobContext) throws IOException {
-        
+
       }
-      
+
       @Override
       public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
         return false;
       }
-      
+
       @Override
       public void commitTask(TaskAttemptContext taskContext) throws IOException {
-        
+
       }
-      
+
       @Override
       public void abortTask(TaskAttemptContext taskContext) throws IOException {
-        
+
       }
     };
   }
 
+  public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
+    String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
+    if (tableDesStr == null) {
+      return null;
+    }
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(tableDesStr.getBytes());
+    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    TableDescriptor descriptor = new TableDescriptor();
+    try {
+      descriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    return descriptor;
+  }
+
+  public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
+    setOutputPath(job, new Path(tableDescriptor.getTableUri()));
+  }
+
   public static void setOutputPath(Job job, Path path) {
     Configuration configuration = job.getConfiguration();
     configuration.set(BLUR_OUTPUT_PATH, path.toString());
@@ -120,4 +182,73 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     return new Path(configuration.get(BLUR_OUTPUT_PATH));
   }
 
+  
+  static class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
+
+    private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
+
+    private final Text _prevKey = new Text();
+    private final List<Document> _documents = new ArrayList<Document>();
+    private final IndexWriter _writer;
+    private final BlurAnalyzer _analyzer;
+    private final StringBuilder _builder = new StringBuilder();
+
+    public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName) throws IOException {
+      Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+      String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+      Path indexPath = new Path(tableOutput, shardName);
+      Path newIndex = new Path(indexPath,tmpDirName);
+      
+      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+      _analyzer = new BlurAnalyzer(tableDescriptor.getAnalyzerDefinition());
+      IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, _analyzer);
+      Directory dir = new HdfsDirectory(configuration, newIndex);
+      dir.setLockFactory(NoLockFactory.getNoLockFactory());
+      _writer = new IndexWriter(dir, conf);
+    }
+
+    @Override
+    public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+      if (!_prevKey.equals(key)) {
+        flush();
+        _prevKey.set(key);
+      }
+      add(value);
+    }
+
+    private void add(BlurMutate value) {
+      BlurRecord blurRecord = value.getRecord();
+      Record record = getRecord(blurRecord);
+      Document document = TransactionRecorder.convert(blurRecord.getRowId(), record, _builder, _analyzer);
+      if (_documents.size() == 0) {
+        document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      }
+      _documents.add(document);
+      LOG.error("Needs to use blur analyzer and field converter");
+    }
+
+    private Record getRecord(BlurRecord value) {
+      Record record = new Record();
+      record.setRecordId(value.getRecordId());
+      record.setFamily(value.getFamily());
+      for (BlurColumn col : value.getColumns()) {
+        record.addToColumns(new Column(col.getName(), col.getValue()));
+      }
+      return record;
+    }
+
+    private void flush() throws CorruptIndexException, IOException {
+      if (_documents.isEmpty()) {
+        return;
+      }
+      _writer.addDocuments(_documents);
+      _documents.clear();
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      flush();
+      _writer.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a9fde03/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
deleted file mode 100644
index a40fa49..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.LuceneVersionConstant;
-import org.apache.blur.manager.writer.TransactionRecorder;
-import org.apache.blur.mapreduce.BlurColumn;
-import org.apache.blur.mapreduce.BlurMutate;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.NoLockFactory;
-
-public class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
-
-  private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
-
-  private final Text _prevKey = new Text();
-  private final List<Document> _documents = new ArrayList<Document>();
-  private final IndexWriter _writer;
-  private final BlurAnalyzer _analyzer;
-  private final StringBuilder _builder = new StringBuilder();
-
-  public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId,
String tmpDirName) throws IOException {
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
-    Path indexPath = new Path(tableOutput, shardName);
-    Path newIndex = new Path(indexPath,tmpDirName);
-    _analyzer = blurAnalyzer;
-    IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION,
_analyzer);
-    Directory dir = new HdfsDirectory(configuration, newIndex);
-    dir.setLockFactory(NoLockFactory.getNoLockFactory());
-    _writer = new IndexWriter(dir, conf);
-  }
-
-  @Override
-  public void write(Text key, BlurMutate value) throws IOException, InterruptedException
{
-    if (!_prevKey.equals(key)) {
-      flush();
-      _prevKey.set(key);
-    }
-    add(value);
-  }
-
-  private void add(BlurMutate value) {
-    BlurRecord blurRecord = value.getRecord();
-    Record record = getRecord(blurRecord);
-    Document document = TransactionRecorder.convert(blurRecord.getRowId(), record, _builder,
_analyzer);
-    if (_documents.size() == 0) {
-      document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE,
Store.NO));
-    }
-    _documents.add(document);
-    LOG.error("Needs to use blur analyzer and field converter");
-  }
-
-  private Record getRecord(BlurRecord value) {
-    Record record = new Record();
-    record.setRecordId(value.getRecordId());
-    record.setFamily(value.getFamily());
-    for (BlurColumn col : value.getColumns()) {
-      record.addToColumns(new Column(col.getName(), col.getValue()));
-    }
-    return record;
-  }
-
-  private void flush() throws CorruptIndexException, IOException {
-    if (_documents.isEmpty()) {
-      return;
-    }
-    _writer.addDocuments(_documents);
-    _documents.clear();
-  }
-
-  @Override
-  public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
-    flush();
-    _writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a9fde03/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index e728d41..bb15969 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -1,5 +1,21 @@
 package org.apache.blur.mapreduce.lib;
 
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
@@ -10,6 +26,8 @@ import java.io.InputStreamReader;
 
 import org.apache.blur.mapreduce.BlurMutate;
 import org.apache.blur.mapreduce.csv.CsvBlurMapper;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,20 +74,25 @@ public class BlurOutputFormatTest {
     localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
     writeFile("in/part1", "1,1,cf1,val1");
     writeFile("in/part2", "1,2,cf1,val2");
+    
     Job job = new Job(conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setReducerClass(DefaultBlurReducer.class);
-    job.setNumReduceTasks(4);
+    job.setNumReduceTasks(1);
     job.setInputFormatClass(TrackingTextInputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(BlurMutate.class);
     job.setOutputFormatClass(BlurOutputFormat.class);
 
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    BlurOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+    
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
+    BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/6a9fde03/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurRecordWriterTest.java
b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurRecordWriterTest.java
deleted file mode 100644
index 4cd3e4c..0000000
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurRecordWriterTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.lib.BlurRecordWriter;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.lucene.index.IndexReader;
-import org.junit.Test;
-
-
-public abstract class BlurRecordWriterTest {
-
-//  @Test
-//  public void testBlurRecordWriter() throws IOException, InterruptedException {
-//    JobID jobId = new JobID();
-//    TaskID tId = new TaskID(jobId, false, 13);
-//    TaskAttemptID taskId = new TaskAttemptID(tId, 0);
-//    Configuration conf = new Configuration();
-//    String pathStr = TMPDIR, "./tmp/output-record-writer-test-newapi";
-//    rm(new File(pathStr));
-//    conf.set("mapred.output.dir", pathStr);
-//    TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
-//    BlurRecordWriter writer = new BlurRecordWriter(context);
-//
-//    Text key = new Text();
-//    BlurRecord value = new BlurRecord();
-//
-//    for (int i = 0; i < 10; i++) {
-//      String rowId = UUID.randomUUID().toString();
-//      key.set(rowId);
-//      value.setFamily("cf");
-//      value.setRowId(rowId);
-//      value.setRecordId(UUID.randomUUID().toString());
-//      value.addColumn("name", "value");
-//      writer.write(key, value);
-//    }
-//
-//    writer.close(context);
-//
-//    // assert index exists and has document
-//
-//    HdfsDirectory dir = new HdfsDirectory(new Path(pathStr, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX,
13)));
-//    assertTrue(IndexReader.indexExists(dir));
-//    IndexReader reader = IndexReader.open(dir);
-//    assertEquals(10, reader.numDocs());
-//  }
-//
-//  private void rm(File file) {
-//    if (file.isDirectory()) {
-//      for (File f : file.listFiles()) {
-//        rm(f);
-//      }
-//    }
-//    file.delete();
-//  }
-
-}


Mime
View raw message