incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixed BLUR-94.
Date Wed, 22 May 2013 02:02:45 GMT
Updated Branches:
  refs/heads/0.1.5 6302f0fdf -> 0d23eddda


Fixed BLUR-94.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/0d23eddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/0d23eddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/0d23eddd

Branch: refs/heads/0.1.5
Commit: 0d23eddda70bbee34b2e9d38240b29825455448e
Parents: 6302f0f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue May 21 22:02:31 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue May 21 22:02:31 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurOutputFormat.java       |  134 ++++++++++++++-
 .../blur/mapreduce/lib/BlurOutputFormatTest.java   |    3 +
 2 files changed, 131 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0d23eddd/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index e194285..053d41e 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -30,6 +30,8 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.LuceneVersionConstant;
 import org.apache.blur.manager.writer.TransactionRecorder;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.TableDescriptor;
@@ -63,8 +65,36 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
 
+/**
+ * {@link BlurOutputFormat} is used to index data and deliver the indexes to
+ * the proper Blur table for searching. A typical usage of this class would be
+ * as follows.<br/>
+ * <br/>
+ * 
+ * <br/>
+ * {@link Iface} client = {@link BlurClient}.getClient("controller1:40010");<br/>
+ * <br/>
+ * TableDescriptor tableDescriptor = client.describe(tableName);<br/>
+ * <br/>
+ * Job job = new Job(jobConf, "blur index");<br/>
+ * job.setJarByClass(BlurOutputFormatTest.class);<br/>
+ * job.setMapperClass(CsvBlurMapper.class);<br/>
+ * job.setInputFormatClass(TextInputFormat.class);<br/>
+ * <br/>
+ * FileInputFormat.addInputPath(job, new Path(input));<br/>
+ * CsvBlurMapper.addColumns(job, "cf1", "col");<br/>
+ * <br/>
+ * BlurOutputFormat.setupJob(job, tableDescriptor);<br/>
+ * BlurOutputFormat.setIndexLocally(job, true);<br/>
+ * BlurOutputFormat.setOptimizeInFlight(job, false);<br/>
+ * <br/>
+ * job.waitForCompletion(true);<br/>
+ * 
+ */
 public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
 
+  public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
+  public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
   public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
   public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
   public static final String BLUR_OUTPUT_PATH = "blur.output.path";
@@ -212,6 +242,70 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
     return new Path(configuration.get(BLUR_OUTPUT_PATH));
   }
 
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then when the {@link RecordWriter} closes the index is
+   * copied to the remote destination in HDFS.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Job job, boolean b) {
+    setIndexLocally(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then when the {@link RecordWriter} closes the index is
+   * copied to the remote destination in HDFS.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_INDEXLOCALLY, b);
+  }
+
+  public static boolean isIndexLocally(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_INDEXLOCALLY, true);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with the
+   * setIndexLocally.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Job job, boolean b) {
+    setOptimizeInFlight(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with the
+   * setIndexLocally.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, b);
+  }
+
+  public static boolean isOptimizeInFlight(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, true);
+  }
+
   static class BlurRecordWriter extends RecordWriter<Text, BlurMutate> {
 
     private static final Log LOG = LogFactory.getLog(BlurRecordWriter.class);
@@ -226,6 +320,9 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
     private final File _localPath;
     private final int _maxDocumentBufferSize;
     private final IndexWriterConfig _conf;
+    private final Path _newIndex;
+    private final boolean _indexLocally;
+    private final boolean _optimizeInFlight;
     private Counter _fieldCount;
     private Counter _recordCount;
     private Counter _rowCount;
@@ -239,11 +336,13 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
     private File _localTmpPath;
     private ProgressableDirectory _localTmpDir;
     private Counter _rowOverFlowCount;
-    private final Path _newIndex;
 
     public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int attemptId,
String tmpDirName)
         throws IOException {
 
+      _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
+      _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
+
       TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
       int shardCount = tableDescriptor.getShardCount();
       int shardId = attemptId % shardCount;
@@ -262,10 +361,16 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
       TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
       mergePolicy.setUseCompoundFile(false);
 
-      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
-      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
-      _writer = new IndexWriter(_localDir, _conf.clone());
+      if (_indexLocally) {
+        String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+        _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+        _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
+        _writer = new IndexWriter(_localDir, _conf.clone());
+      } else {
+        _localPath = null;
+        _localDir = null;
+        _writer = new IndexWriter(_finalDir, _conf.clone());
+      }
     }
 
     @Override
@@ -378,10 +483,26 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
       _writer.close();
       _recordRateCounter.close();
       _rowRateCounter.close();
-      copyDir();
+      if (_indexLocally) {
+        if (_optimizeInFlight) {
+          copyAndOptimizeInFlightDir();
+        } else {
+          copyDir();
+        }
+      }
       _copyRateCounter.close();
     }
 
+    private void copyAndOptimizeInFlightDir() throws IOException {
+      CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+      copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+      DirectoryReader reader = DirectoryReader.open(_localDir);
+      IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+      writer.addIndexes(reader);
+      writer.close();
+      rm(_localPath);
+    }
+
     private void copyDir() throws IOException {
       CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
       String[] fileNames = _localDir.listAll();
@@ -423,4 +544,5 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate>
{
     job.setOutputFormatClass(BlurOutputFormat.class);
     setTableDescriptor(job, tableDescriptor);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0d23eddd/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 05c4a9d..36bfba9 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -149,6 +149,8 @@ public class BlurOutputFormatTest {
     tableDescriptor.setTableUri(tableUri);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, true);
+    BlurOutputFormat.setOptimizeInFlight(job, false);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
@@ -187,6 +189,7 @@ public class BlurOutputFormatTest {
     tableDescriptor.setTableUri(tableUri);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurOutputFormat.setIndexLocally(job, false);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();


Mime
View raw message