incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Fixing issue where the update reducer handles dups before sending the data to the output format.
Date Fri, 31 Jul 2015 14:40:38 GMT
Fixing issue where the update reducer handles dups before sending the data to the output format.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/13c7e3d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/13c7e3d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/13c7e3d8

Branch: refs/heads/master
Commit: 13c7e3d8963d5d628b4f82ab8190a269058937af
Parents: 9d5f1c3
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jul 31 10:40:22 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jul 31 10:40:22 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/update/IndexValue.java   |   5 +
 .../mapreduce/lib/update/UpdateReducer.java     | 129 +++++++++++++++----
 2 files changed, 112 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/13c7e3d8/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
index 2f8a61e..126c92e 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/IndexValue.java
@@ -63,4 +63,9 @@ public class IndexValue implements Writable {
     this._blurRecord = blurRecord;
   }
 
+  @Override
+  public String toString() {
+    return "IndexValue [_blurRecord=" + _blurRecord + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/13c7e3d8/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
index c1b65ec..f8705aa 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/UpdateReducer.java
@@ -30,6 +30,30 @@ import org.apache.hadoop.mapreduce.Reducer;
 
 public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutate> {
 
+  private static final String IGNORED_EXISTING_ROWS = "Ignored Existing Rows";
+  private static final String MULTIPLE_RECORD_W_SAME_RECORD_ID = "Multiple Record w/ Same Record Id";
+  private static final String INDEX_VALUES = "IndexValues";
+  private static final String NULL_BLUR_RECORDS = "NULL Blur Records";
+  private static final String MARKER_RECORDS = "Marker Records";
+  private static final String SEP = " - ";
+  private static final String BLUR_UPDATE = "Blur Update";
+  private static final String EXISTING_RCORDS = "Existing Rcords";
+  private static final String NEW_RCORDS = "New Rcords";
+  private static final String NO_UPDATE = "NoUpdate";
+  private static final String UPDATE = "Update";
+  private static final String BLUR_UPDATE_DEBUG = BLUR_UPDATE + SEP + "DEBUG";
+
+  private Counter _newRecordsUpdate;
+  private Counter _newRecordsNoUpdate;
+  private Counter _existingRecordsUpdate;
+  private Counter _existingRecordsNoUpdate;
+  private Counter _ignoredExistingRows;
+  private Counter _debugRecordsWithSameRecordId;
+  private Counter _debugMarkerRecordsNoUpdate;
+  private Counter _debugMarkerRecordsUpdate;
+  private Counter _debugIndexValues;
+  private Counter _debugNullBlurRecords;
+
   @Override
   protected void setup(final Context context) throws IOException, InterruptedException {
     BlurOutputFormat.setProgressable(context);
@@ -39,40 +63,101 @@ public class UpdateReducer extends Reducer<IndexKey, IndexValue, Text, BlurMutat
         return context.getCounter(counterName);
       }
     });
+
+    _newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + UPDATE);
+    _newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RCORDS + SEP + NO_UPDATE);
+    _existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + UPDATE);
+    _existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RCORDS + SEP + NO_UPDATE);
+    _ignoredExistingRows = context.getCounter(BLUR_UPDATE, IGNORED_EXISTING_ROWS);
+
+    _debugRecordsWithSameRecordId = context.getCounter(BLUR_UPDATE_DEBUG, MULTIPLE_RECORD_W_SAME_RECORD_ID);
+
+    _debugMarkerRecordsNoUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + NO_UPDATE);
+    _debugMarkerRecordsUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + UPDATE);
+    _debugIndexValues = context.getCounter(BLUR_UPDATE_DEBUG, INDEX_VALUES);
+    _debugNullBlurRecords = context.getCounter(BLUR_UPDATE_DEBUG, NULL_BLUR_RECORDS);
+
   }
 
   @Override
   protected void reduce(IndexKey key, Iterable<IndexValue> values, Context context) throws IOException,
       InterruptedException {
     if (key.getType() != TYPE.NEW_DATA_MARKER) {
-      // There is no new data for this row, skip.
-      return;
+      handleNoNewData(key, values);
     } else {
-      BlurRecord prevBlurRecord = null;
-      String prevRecordId = null;
-      for (IndexValue value : values) {
-        BlurRecord br = value.getBlurRecord();
-        if (br == null) {
-          // Skip null records because there were likely many new data markers
-          // for the row.
-          continue;
-        }
+      handleNewData(key, values, context);
+    }
+  }
 
+  private void handleNewData(IndexKey key, Iterable<IndexValue> values, Context context) throws IOException,
+      InterruptedException {
+    BlurRecord prevBlurRecord = null;
+    String prevRecordId = null;
+    for (IndexValue value : values) {
+      updateCounters(true, key);
+      BlurRecord br = value.getBlurRecord();
+      if (br == null) {
+        // Skip null records because there were likely many new data markers
+        // for the row.
+        _debugNullBlurRecords.increment(1);
+      } else {
         // Safe Copy
-        BlurRecord blurRecord = new BlurRecord(br);
-        String recordId = blurRecord.getRecordId();
-        if (prevRecordId == null || prevRecordId.equals(recordId)) {
-          // reassign to new record.
-          prevBlurRecord = blurRecord;
-          prevRecordId = recordId;
-        } else {
-          // flush prev and assign
-          context.write(new Text(blurRecord.getRowId()), toMutate(blurRecord));
+        BlurRecord currentBlurRecord = new BlurRecord(br);
+        String currentRecordId = currentBlurRecord.getRecordId();
+        if (prevRecordId != null) {
+          if (prevRecordId.equals(currentRecordId)) {
+            _debugRecordsWithSameRecordId.increment(1);
+          } else {
+            // flush prev
+            context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
+          }
         }
+        // assign
+        prevBlurRecord = currentBlurRecord;
+        prevRecordId = currentRecordId;
       }
-      if (prevBlurRecord != null) {
-        context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
+    }
+    if (prevBlurRecord != null) {
+      context.write(new Text(prevBlurRecord.getRowId()), toMutate(prevBlurRecord));
+    }
+  }
+
+  private void updateCounters(boolean update, IndexKey key) {
+    _debugIndexValues.increment(1);
+    if (update) {
+      switch (key.getType()) {
+      case NEW_DATA:
+        _newRecordsUpdate.increment(1);
+        break;
+      case OLD_DATA:
+        _existingRecordsUpdate.increment(1);
+        break;
+      case NEW_DATA_MARKER:
+        _debugMarkerRecordsUpdate.increment(1);
+      default:
+        break;
       }
+    } else {
+      switch (key.getType()) {
+      case NEW_DATA:
+        _newRecordsNoUpdate.increment(1);
+        break;
+      case OLD_DATA:
+        _existingRecordsNoUpdate.increment(1);
+        break;
+      case NEW_DATA_MARKER:
+        _debugMarkerRecordsNoUpdate.increment(1);
+      default:
+        break;
+      }
+    }
+  }
+
+  private void handleNoNewData(IndexKey key, Iterable<IndexValue> values) {
+    _ignoredExistingRows.increment(1);
+    for (@SuppressWarnings("unused")
+    IndexValue indexValue : values) {
+      updateCounters(false, key);
     }
   }
 


Mime
View raw message