incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: amccu...@apache.org
Subject: [2/2] git commit: Fixing issues with the counters in BlurOutputFormat.
Date: Fri, 23 May 2014 01:45:42 GMT
Fixing issues with the counters in BlurOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2e62b7ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2e62b7ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2e62b7ff

Branch: refs/heads/apache-blur-0.2
Commit: 2e62b7fff2fba1bbbabb84485214f05f23f92d56
Parents: eed70bb
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu May 22 21:24:52 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu May 22 21:24:52 2014 -0400

----------------------------------------------------------------------
 .../mapreduce/lib/GenericBlurRecordWriter.java  | 43 ++++++------
 .../apache/blur/mapreduce/lib/NullCounter.java  | 72 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2e62b7ff/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
index e138cd5..bda61be 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -55,7 +55,6 @@ import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.NoLockFactory;
 
@@ -63,6 +62,7 @@ public class GenericBlurRecordWriter {
 
   private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
   private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+  private static final Counter NULL_COUNTER = new NullCounter();
 
   private final Text _prevKey = new Text();
   private final Map<String, List<Field>> _documents = new TreeMap<String, List<Field>>();
@@ -77,38 +77,39 @@ public class GenericBlurRecordWriter {
   private final Path _newIndex;
   private final boolean _indexLocally;
   private final boolean _optimizeInFlight;
-  private Counter _columnCount;
-  private Counter _fieldCount;
-  private Counter _recordCount;
-  private Counter _rowCount;
-  private Counter _recordDuplicateCount;
-  private Counter _rowOverFlowCount;
-  private Counter _rowDeleteCount;
-  private RateCounter _recordRateCounter;
-  private RateCounter _rowRateCounter;
-  private RateCounter _copyRateCounter;
+  private Counter _columnCount = NULL_COUNTER;
+  private Counter _fieldCount = NULL_COUNTER;
+  private Counter _recordCount = NULL_COUNTER;
+  private Counter _rowCount = NULL_COUNTER;
+  private Counter _recordDuplicateCount = NULL_COUNTER;
+  private Counter _rowOverFlowCount = NULL_COUNTER;
+  private Counter _rowDeleteCount = NULL_COUNTER;
+  private RateCounter _recordRateCounter = new RateCounter(NULL_COUNTER);
+  private RateCounter _rowRateCounter = new RateCounter(NULL_COUNTER);
+  private RateCounter _copyRateCounter = new RateCounter(NULL_COUNTER);
   private boolean _countersSetup = false;
   private IndexWriter _localTmpWriter;
   private boolean _usingLocalTmpindex;
   private File _localTmpPath;
   private ProgressableDirectory _localTmpDir;
   private String _deletedRowId;
+  private Configuration _configuration;
 
   public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
+    _configuration = configuration;
+    _indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
+    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
 
-    _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
-    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
-
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
     int shardCount = tableDescriptor.getShardCount();
     int shardId = attemptId % shardCount;
 
-    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(_configuration);
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
     String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
     Path indexPath = new Path(tableOutput, shardName);
     _newIndex = new Path(indexPath, tmpDirName);
-    _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex), getProgressable());
+    _finalDir = new ProgressableDirectory(new HdfsDirectory(_configuration, _newIndex), getProgressable());
     _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
 
     TableContext tableContext = TableContext.create(tableDescriptor);
@@ -127,7 +128,8 @@ public class GenericBlurRecordWriter {
     if (_indexLocally) {
       String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
       _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), getProgressable());
+      HdfsDirectory directory = new HdfsDirectory(_configuration, new Path(_localPath.toURI()));
+      _localDir = new ProgressableDirectory(directory, getProgressable());
       _writer = new IndexWriter(_localDir, _conf.clone());
     } else {
       _localPath = null;
@@ -212,7 +214,8 @@ public class GenericBlurRecordWriter {
     if (_localTmpWriter == null) {
       String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
       _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
+      HdfsDirectory directory = new HdfsDirectory(_configuration, new Path(_localTmpPath.toURI()));
+      _localTmpDir = new ProgressableDirectory(directory, BlurOutputFormat.getProgressable());
       _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
       // The local tmp writer has merging disabled so the first document in is
       // going to be doc 0.

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2e62b7ff/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
new file mode 100644
index 0000000..4b180fd
--- /dev/null
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/NullCounter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+public class NullCounter implements Counter {
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+
+  }
+
+  @Override
+  public void setDisplayName(String displayName) {
+
+  }
+
+  @Override
+  public String getName() {
+    return null;
+  }
+
+  @Override
+  public String getDisplayName() {
+    return null;
+  }
+
+  @Override
+  public long getValue() {
+    return 0;
+  }
+
+  @Override
+  public void setValue(long value) {
+
+  }
+
+  @Override
+  public void increment(long incr) {
+
+  }
+
+  @Override
+  public Counter getUnderlyingCounter() {
+    return null;
+  }
+
+}


Mime
View raw message