incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Fixing issue with record reader where it needed all the files in the processing cluster cache directory instead of only the few required.
Date Fri, 31 Jul 2015 14:40:36 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 4fd868712 -> 13c7e3d89


Fixing issue with record reader where it needed all the files in the processing cluster cache
directory instead of only the few required.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ab31c5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ab31c5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ab31c5fe

Branch: refs/heads/master
Commit: ab31c5fea8c167841e641c7154a421ae6f4983d6
Parents: 4fd8687
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Jul 31 10:02:29 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Jul 31 10:02:29 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/GenericRecordReader.java | 65 +++++++++++++++-----
 1 file changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ab31c5fe/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
index af5ce9f..1335a73 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericRecordReader.java
@@ -30,17 +30,22 @@ import org.apache.blur.utils.RowDocumentUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.SegmentInfoPerCommit;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.IOUtils;
 
 public class GenericRecordReader {
 
@@ -71,9 +76,8 @@ public class GenericRecordReader {
     LOG.info("Local cache path [{0}]", localCachePath);
     _directory = BlurInputFormat.getDirectory(configuration, _table.toString(), blurInputSplit.getDir(),
files);
 
-    SegmentInfos segmentInfos = new SegmentInfos();
-    segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
-    SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
+    SegmentInfoPerCommit commit = segmentInfosRead(_directory, blurInputSplit.getSegmentsName(),
+        blurInputSplit.getSegmentInfoName());
 
     SegmentInfo segmentInfo = commit.info;
     if (localCachePath != null) {
@@ -98,6 +102,50 @@ public class GenericRecordReader {
     _maxDoc = commit.info.getDocCount();
   }
 
+  /** Returns the commit entry for {@code segmentInfoName} from {@code segmentFileName}, reading only that segment's
+   * .si file, so other segments' files need not be present. Throws IOException if missing, corrupt or pre-4.0. */
+  private SegmentInfoPerCommit segmentInfosRead(Directory directory, String segmentFileName, String segmentInfoName)
+      throws IOException {
+    boolean success = false;
+    ChecksumIndexInput input = new ChecksumIndexInput(directory.openInput(segmentFileName, IOContext.READ));
+    try {
+      final int format = input.readInt();
+      if (format != CodecUtil.CODEC_MAGIC) { // pre-4.0 segments files carry no codec header
+        throw new IOException("Legacy Infos not supported for dir [" + directory + "].");
+      }
+      CodecUtil.checkHeaderNoMagic(input, "segments", SegmentInfos.VERSION_40, SegmentInfos.VERSION_40);
+      input.readLong(); // version (unused)
+      input.readInt(); // counter (unused)
+      int numSegments = input.readInt();
+      if (numSegments < 0) {
+        throw new CorruptIndexException("invalid segment count: " + numSegments + " (resource: " + input + ")");
+      }
+      for (int seg = 0; seg < numSegments; seg++) {
+        String segName = input.readString();
+        String codecName = input.readString();
+        long delGen = input.readLong();
+        int delCount = input.readInt();
+        if (segName.equals(segmentInfoName)) { // other entries are skipped without opening their .si files
+          Codec codec = Codec.forName(codecName);
+          SegmentInfo info = codec.segmentInfoFormat().getSegmentInfoReader().read(directory, segName, IOContext.READ);
+          info.setCodec(codec);
+          if (delCount < 0 || delCount > info.getDocCount()) {
+            throw new CorruptIndexException("invalid deletion count: " + delCount + " (resource: " + input + ")");
+          }
+          success = true;
+          return new SegmentInfoPerCommit(info, delCount, delGen);
+        }
+      }
+      throw new IOException("Segment [" + segmentInfoName + "] not found in dir [" + directory + "]");
+    } finally {
+      if (!success) {
+        IOUtils.closeWhileHandlingException(input);
+      } else {
+        input.close();
+      }
+    }
+  }
+
   private static Directory copyFilesLocally(Configuration configuration, Directory dir, String
table, Path shardDir,
       Path localCachePath, Collection<String> files) throws IOException {
     LOG.info("Copying files need to local cache for faster reads [{0}].", shardDir);
@@ -170,17 +218,6 @@ public class GenericRecordReader {
     return false;
   }
 
-  private SegmentInfoPerCommit findSegmentInfoPerCommit(SegmentInfos segmentInfos, BlurInputSplit
blurInputSplit)
-      throws IOException {
-    String segmentInfoName = blurInputSplit.getSegmentInfoName();
-    for (SegmentInfoPerCommit commit : segmentInfos) {
-      if (commit.info.name.equals(segmentInfoName)) {
-        return commit;
-      }
-    }
-    throw new IOException("SegmentInfoPerCommit of [" + segmentInfoName + "] not found.");
-  }
-
   public boolean nextKeyValue() throws IOException {
     if (_docId >= _maxDoc) {
       return false;


Mime
View raw message