incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: BLUR:305- Blur CombineInputFormat.
Date Thu, 05 Dec 2013 14:30:41 GMT
BLUR:305- Blur CombineInputFormat.

Signed-off-by: Aaron McCurry <amccurry@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8bf22a10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8bf22a10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8bf22a10

Branch: refs/heads/master
Commit: 8bf22a10e83690ad83e21198ddaeeba5bec1967b
Parents: bbfaf51
Author: Gagan <gagandeepjuneja@gmail.com>
Authored: Wed Dec 4 15:17:50 2013 +0530
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 5 09:29:52 2013 -0500

----------------------------------------------------------------------
 .../blur/mapreduce/lib/CsvBlurDriver.java       | 65 ++++++++++++++++++--
 .../blur/mapreduce/lib/CsvBlurMapper.java       |  2 +-
 2 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8bf22a10/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index 570db31..6830e32 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -49,9 +49,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
@@ -178,6 +180,10 @@ public class CsvBlurDriver {
     }
     // processing the 'I' option
     if (cmd.hasOption("I")) {
+    	if(cmd.hasOption("C")){
+    		 System.err.println("'I' and 'C' both parameters can not be used together.");
+             return null;
+    	}
       Option[] options = cmd.getOptions();
       for (Option option : options) {
         if (option.getOpt().equals("I")) {
@@ -346,11 +352,58 @@ public class CsvBlurDriver {
 
  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
 
-    @Override
-    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
-        throws IOException {
-      return new SequenceFileRecordReader<Writable, Text>();
-    }
+    
+    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text>{
+    	
+    	private final RecordReader<Writable,Text> delegate;
+    	private final FileSplit fileSplit;
+
+		@SuppressWarnings("unused")
+		public SequenceFileRecordReaderWrapper(CombineFileSplit split,
+            TaskAttemptContext context, Integer index) throws IOException{
+            fileSplit = new FileSplit(split.getPath(index),
+                      split.getOffset(index), split.getLength(index),
+                      split.getLocations());
+            delegate = new SequenceFileInputFormat<Writable,Text>().createRecordReader(fileSplit, context);
+        }
+
+        @Override public float getProgress() throws IOException, InterruptedException {
+            return delegate.getProgress();
+        }
+
+		@Override
+		public Writable getCurrentKey() throws IOException,
+				InterruptedException {
+			return delegate.getCurrentKey();
+		}
 
+		@Override
+		public Text getCurrentValue() throws IOException, InterruptedException {
+			return delegate.getCurrentValue();
+		}
+
+		@Override
+		public void initialize(InputSplit arg0, TaskAttemptContext context)
+				throws IOException, InterruptedException {
+			delegate.initialize(fileSplit, context);
+		}
+
+		@Override
+		public boolean nextKeyValue() throws IOException, InterruptedException {
+			return delegate.nextKeyValue();
+		}
+		
+		@Override public void close() throws IOException {
+            delegate.close();
+		}
+
+    }
+    	
+    @Override
+	public RecordReader<Writable, Text> createRecordReader(
+			InputSplit split, TaskAttemptContext context) throws IOException {
+		return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context, SequenceFileRecordReaderWrapper.class);
+	}
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8bf22a10/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
index 4c769b6..154fc21 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -366,7 +366,7 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
 
   private Path getCurrentFile(Context context) throws IOException {
     InputSplit split = context.getInputSplit();
-    if (split != null) {
+    if (split != null && split instanceof FileSplit) {
       FileSplit inputSplit = (FileSplit) split;
       Path path = inputSplit.getPath();
       return path.makeQualified(path.getFileSystem(context.getConfiguration()));


Mime
View raw message