incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding counters in the bluroutputformat.
Date Fri, 17 May 2013 18:16:09 GMT
Updated Branches:
  refs/heads/0.1.5 8b85fcfc5 -> 5d49585c9


Adding counters in the bluroutputformat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/5d49585c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/5d49585c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/5d49585c

Branch: refs/heads/0.1.5
Commit: 5d49585c996fdbec3d3c6aacd8d1cda3baa22f20
Parents: 8b85fcf
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri May 17 14:15:24 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri May 17 14:15:24 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/mapreduce/lib/BlurCounters.java    |    2 +-
 .../blur/mapreduce/lib/BlurOutputFormat.java       |  104 +++++++------
 .../blur/mapreduce/lib/CopyRateDirectory.java      |  124 +++++++++++++++
 .../org/apache/blur/mapreduce/lib/RateCounter.java |   64 ++++++++
 4 files changed, 248 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5d49585c/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
index 99372b1..efcd645 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
@@ -21,6 +21,6 @@ package org.apache.blur.mapreduce.lib;
  * The enum class used for all the internal counters during map reduce jobs.
  */
 public enum BlurCounters {
-  RECORD_COUNT, FIELD_COUNT, ROW_COUNT
+  RECORD_COUNT, FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5d49585c/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index d462780..141cee7 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -120,48 +120,6 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     return new BlurOutputCommitter();
   }
 
-  private OutputCommitter getDoNothing() {
-    return new OutputCommitter() {
-
-      @Override
-      public void commitJob(JobContext jobContext) throws IOException {
-      }
-
-      @Override
-      public void cleanupJob(JobContext context) throws IOException {
-      }
-
-      @Override
-      public void abortJob(JobContext jobContext, State state) throws IOException {
-      }
-
-      @Override
-      public void setupTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-
-      @Override
-      public void setupJob(JobContext jobContext) throws IOException {
-
-      }
-
-      @Override
-      public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-        return false;
-      }
-
-      @Override
-      public void commitTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-
-      @Override
-      public void abortTask(TaskAttemptContext taskContext) throws IOException {
-
-      }
-    };
-  }
-
   public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
     String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
     if (tableDesStr == null) {
@@ -221,6 +179,9 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     private Counter _recordCount;
     private Counter _rowCount;
     private boolean _countersSetup = false;
+    private RateCounter _recordRateCounter;
+    private RateCounter _rowRateCounter;
+    private RateCounter _copyRateCounter;
 
     public BlurRecordWriter(Configuration configuration, BlurAnalyzer blurAnalyzer, int shardId, String tmpDirName)
         throws IOException {
@@ -240,7 +201,6 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
       _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), BlurOutputFormat.getProgressable());
       _writer = new IndexWriter(_localDir, conf);
-
     }
 
     @Override
@@ -261,6 +221,9 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
       _fieldCount = getCounter.getCounter(BlurCounters.FIELD_COUNT);
       _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
       _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+      _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+      _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+      _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
     }
 
     private void add(BlurMutate value) {
@@ -271,7 +234,8 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
         document.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
       }
       _documents.add(document);
-      LOG.error("Needs to use blur analyzer and field converter");
+      _fieldCount.increment(document.getFields().size());
+      _recordCount.increment(1);
     }
 
     private Record getRecord(BlurRecord value) {
@@ -289,6 +253,9 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
         return;
       }
       _writer.addDocuments(_documents);
+      _rowCount.increment(1);
+      _recordRateCounter.mark(_documents.size());
+      _rowRateCounter.mark();
       _documents.clear();
     }
 
@@ -296,14 +263,18 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     public void close(TaskAttemptContext context) throws IOException, InterruptedException {
       flush();
       _writer.close();
+      _recordRateCounter.close();
+      _rowRateCounter.close();
       copyDir();
+      _copyRateCounter.close();
     }
 
     private void copyDir() throws IOException {
+      CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
       String[] fileNames = _localDir.listAll();
       for (String fileName : fileNames) {
         LOG.info("Copying [{0}]", fileName);
-        _localDir.copy(_finalDir, fileName, fileName, IOContext.DEFAULT);
+        _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
       }
       rm(_localPath);
     }
@@ -339,4 +310,47 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     job.setOutputFormatClass(BlurOutputFormat.class);
     setTableDescriptor(job, tableDescriptor);
   }
+
+  private OutputCommitter getDoNothing() {
+    return new OutputCommitter() {
+
+      @Override
+      public void commitJob(JobContext jobContext) throws IOException {
+      }
+
+      @Override
+      public void cleanupJob(JobContext context) throws IOException {
+      }
+
+      @Override
+      public void abortJob(JobContext jobContext, State state) throws IOException {
+      }
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+      }
+
+      @Override
+      public void setupJob(JobContext jobContext) throws IOException {
+
+      }
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+        return false;
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+      }
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+      }
+    };
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5d49585c/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
new file mode 100644
index 0000000..886f2d6
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
@@ -0,0 +1,124 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Decorator of Directory to capture the copy rate of a directory copy.
+ */
+public class CopyRateDirectory extends Directory {
+
+  private final Directory _directory;
+  private final RateCounter _copyRateCounter;
+
+  public CopyRateDirectory(Directory dir, RateCounter copyRateCounter) {
+    _directory = dir;
+    _copyRateCounter = copyRateCounter;
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrap(_directory.createOutput(name, context));
+  }
+
+  private IndexOutput wrap(IndexOutput output) {
+    return new CopyRateIndexOutput(output, _copyRateCounter);
+  }
+
+  static class CopyRateIndexOutput extends IndexOutput {
+
+    private final IndexOutput _indexOutput;
+    private final RateCounter _copyRateCounter;
+
+    public CopyRateIndexOutput(IndexOutput output, RateCounter copyRateCounter) {
+      _indexOutput = output;
+      _copyRateCounter = copyRateCounter;
+    }
+
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _copyRateCounter.mark(numBytes);
+    }
+
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+      _copyRateCounter.mark();
+    }
+
+    public void flush() throws IOException {
+      _indexOutput.flush();
+    }
+
+    public void close() throws IOException {
+      _indexOutput.close();
+    }
+
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @SuppressWarnings("deprecation")
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+    }
+
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _copyRateCounter.mark(length);
+    }
+
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return _directory.openInput(name, context);
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/5d49585c/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
new file mode 100644
index 0000000..ff8ee20
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
@@ -0,0 +1,64 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * This turns a standard hadoop counter into a rate counter.
+ */
+public class RateCounter {
+
+  private final Counter _counter;
+  private final long _reportTime;
+  private final long _rateTime;
+  private long _lastReport;
+  private long _count = 0;
+
+  public RateCounter(Counter counter) {
+    this(counter, TimeUnit.SECONDS, 10);
+  }
+
+  public RateCounter(Counter counter, TimeUnit unit, long reportTime) {
+    _counter = counter;
+    _lastReport = System.nanoTime();
+    _reportTime = unit.toNanos(reportTime);
+    _rateTime = unit.toSeconds(reportTime);
+  }
+
+  public void mark() {
+    mark(1l);
+  }
+
+  public void mark(long n) {
+    long now = System.nanoTime();
+    if (_lastReport + _reportTime < now) {
+      long rate = _count / _rateTime;
+      _counter.setValue(rate);
+      _lastReport = System.nanoTime();
+      _count = 0;
+    }
+    _count += n;
+  }
+
+  public void close() {
+    _counter.setValue(0);
+  }
+
+}


Mime
View raw message