incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Fixed BLUR-420 BLUR-421
Date Mon, 16 Mar 2015 12:28:20 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 5e46ce88e -> 183f8451a


Fixed BLUR-420 BLUR-421
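
CsvBlurDriver gains two options: "-out", which sets the MapReduce output
directory (defaulting to /user/<current user>/.blur-<timestamp>), and
"-import", which loads the generated indexes into the Blur table once the
job finishes. setupJob() now takes an AtomicReference<Callable<Void>>;
when "-import" is given it registers the load step there, and main()
invokes it only after waitForCompletion(true) succeeds. The change applies
to both the hadoop1 and hadoop2 modules, with the tests updated to match.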


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/183f8451
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/183f8451
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/183f8451

Branch: refs/heads/master
Commit: 183f8451a66c0ac533d3103359843fffae2267af
Parents: 5e46ce8
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Mar 16 08:28:00 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Mar 16 08:28:00 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/CsvBlurDriver.java       | 169 +++++++++++--------
 .../blur/mapreduce/lib/CsvBlurDriverTest.java   |  32 ++--
 .../blur/mapreduce/lib/CsvBlurDriver.java       |  41 ++++-
 .../blur/mapreduce/lib/CsvBlurDriverTest.java   |  25 ++-
 4 files changed, 176 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/183f8451/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index 1f3ea80..ae8f602 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -55,6 +57,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 import com.google.common.base.Splitter;
@@ -66,13 +69,14 @@ public class CsvBlurDriver {
   public static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
   public static final String MAPRED_MAP_OUTPUT_COMPRESSION_CODEC = "mapred.map.output.compression.codec";
   public static final int DEFAULT_WIDTH = 100;
-  public static final String HEADER = "The \"" +CSVLOADER +
-  		"\" command is used to load delimited into a Blur table.\nThe required options are \"-c\", \"-t\", \"-d\". The " +
-  		"standard format for the contents of a file is:\"rowid,recordid,family,col1,col2,...\". However there are " +
-  		"several options, such as the rowid and recordid can be generated based on the data in the record via the " +
-  		"\"-A\" and \"-a\" options. The family can assigned based on the path via the \"-I\" option. The column " +
-  		"name order can be mapped via the \"-d\" option. Also you can set the input " +
-  		"format to either sequence files vie the \"-S\" option or leave the default text files.";
+  public static final String HEADER = "The \""
+      + CSVLOADER
+      + "\" command is used to load delimited files into a Blur table.\nThe required options are \"-c\", \"-t\", \"-d\". The "
+      + "standard format for the contents of a file is: \"rowid,recordid,family,col1,col2,...\". However there are "
+      + "several options; for example, the rowid and recordid can be generated based on the data in the record via the "
+      + "\"-A\" and \"-a\" options. The family can be assigned based on the path via the \"-I\" option. The column "
+      + "name order can be mapped via the \"-d\" option. You can also set the input "
+      + "format to sequence files via the \"-S\" option, or leave the default of text files.";
 
   enum COMPRESSION {
     SNAPPY(SnappyCodec.class), GZIP(GzipCodec.class), BZIP(BZip2Codec.class), DEFAULT(DefaultCodec.class);
@@ -95,22 +99,28 @@ public class CsvBlurDriver {
   public static void main(String... args) throws Exception {
     Configuration configuration = new Configuration();
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = setupJob(configuration, new ControllerPool() {
       @Override
       public Iface getClient(String controllerConnectionStr) {
         return BlurClient.getClient(controllerConnectionStr);
       }
-    }, otherArgs);
+    }, ref, otherArgs);
     if (job == null) {
       System.exit(1);
     }
-
     boolean waitForCompletion = job.waitForCompletion(true);
+    if (waitForCompletion) {
+      Callable<Void> callable = ref.get();
+      if (callable != null) {
+        callable.call();
+      }
+    }
     System.exit(waitForCompletion ? 0 : 1);
   }
 
-  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
-      throws Exception {
+  public static Job setupJob(Configuration configuration, ControllerPool controllerPool,
+      AtomicReference<Callable<Void>> ref, String... otherArgs) throws Exception {
     CommandLine cmd = parse(otherArgs);
     if (cmd == null) {
       return null;
@@ -180,10 +190,10 @@ public class CsvBlurDriver {
     }
     // processing the 'I' option
     if (cmd.hasOption("I")) {
-    	if(cmd.hasOption("C")){
-    		 System.err.println("'I' and 'C' both parameters can not be used together.");
-             return null;
-    	}
+      if (cmd.hasOption("C")) {
+        System.err.println("'I' and 'C' both parameters can not be used together.");
+        return null;
+      }
       Option[] options = cmd.getOptions();
       for (Option option : options) {
         if (option.getOpt().equals("I")) {
@@ -232,6 +242,24 @@ public class CsvBlurDriver {
       int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
       BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
     }
+    final Path output;
+    if (cmd.hasOption("out")) {
+      output = new Path(cmd.getOptionValue("out"));
+    } else {
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      String userName = currentUser.getUserName();
+      output = new Path("/user/" + userName + "/.blur-" + System.currentTimeMillis());
+    }
+    BlurOutputFormat.setOutputPath(job, output);
+    if (cmd.hasOption("import")) {
+      ref.set(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          client.loadData(tableName, output.toUri().toString());
+          return null;
+        }
+      });
+    }
     return job;
   }
 
@@ -272,10 +300,20 @@ public class CsvBlurDriver {
             "The file delimiter to be used. (default value ',')  NOTE: For special "
                 + "charactors like the default hadoop separator of ASCII value 1, you can
use standard "
                 + "java escaping (\\u0001)").create("s"));
-    options.addOption(OptionBuilder.withArgName("path*").hasArg()
-        .withDescription("The directory to index, the family name is assumed to BE present
in the file contents. (hdfs://namenode/input/in1)").create("i"));
-    options.addOption(OptionBuilder.withArgName("family path*").hasArgs()
-        .withDescription("The directory to index with a family name, the family name is assumed
to NOT be present in the file contents. (family hdfs://namenode/input/in1)").create("I"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("path*")
+            .hasArg()
+            .withDescription(
+                "The directory to index, the family name is assumed to BE present in the
file contents. (hdfs://namenode/input/in1)")
+            .create("i"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("family path*")
+            .hasArgs()
+            .withDescription(
+                "The directory to index with a family name, the family name is assumed to
NOT be present in the file contents. (family hdfs://namenode/input/in1)")
+            .create("I"));
     options
         .addOption(OptionBuilder
             .withArgName("auto generate record ids")
@@ -352,58 +390,57 @@ public class CsvBlurDriver {
 
  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
 
-    
-    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text>{
-    	
-    	private final RecordReader<Writable,Text> delegate;
-    	private final FileSplit fileSplit;
-
-		@SuppressWarnings("unused")
-		public SequenceFileRecordReaderWrapper(CombineFileSplit split,
-            TaskAttemptContext context, Integer index) throws IOException{
-            fileSplit = new FileSplit(split.getPath(index),
-                      split.getOffset(index), split.getLength(index),
-                      split.getLocations());
-            delegate = new SequenceFileInputFormat<Writable,Text>().createRecordReader(fileSplit, context);
-        }
+    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text> {
 
-        @Override public float getProgress() throws IOException, InterruptedException {
-            return delegate.getProgress();
-        }
+      private final RecordReader<Writable, Text> delegate;
+      private final FileSplit fileSplit;
 
-		@Override
-		public Writable getCurrentKey() throws IOException,
-				InterruptedException {
-			return delegate.getCurrentKey();
-		}
-
-		@Override
-		public Text getCurrentValue() throws IOException, InterruptedException {
-			return delegate.getCurrentValue();
-		}
-
-		@Override
-		public void initialize(InputSplit arg0, TaskAttemptContext context)
-				throws IOException, InterruptedException {
-			delegate.initialize(fileSplit, context);
-		}
-
-		@Override
-		public boolean nextKeyValue() throws IOException, InterruptedException {
-			return delegate.nextKeyValue();
-		}
-		
-		@Override public void close() throws IOException {
-            delegate.close();
-		}
+      @SuppressWarnings("unused")
+      public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
+          throws IOException {
+        fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
+            split.getLocations());
+        delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return delegate.getProgress();
+      }
+
+      @Override
+      public Writable getCurrentKey() throws IOException, InterruptedException {
+        return delegate.getCurrentKey();
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException, InterruptedException {
+        return delegate.getCurrentValue();
+      }
+
+      @Override
+      public void initialize(InputSplit arg0, TaskAttemptContext context) throws IOException, InterruptedException {
+        delegate.initialize(fileSplit, context);
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return delegate.nextKeyValue();
+      }
+
+      @Override
+      public void close() throws IOException {
+        delegate.close();
+      }
 
     }
-    	
+
     @Override
-	public RecordReader<Writable, Text> createRecordReader(
-			InputSplit split, TaskAttemptContext context) throws IOException {
-		return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context, SequenceFileRecordReaderWrapper.class);
-	}
+    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
+        throws IOException {
+      return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context,
+          SequenceFileRecordReaderWrapper.class);
+    }
   }
 
 }
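
The new flow in main(), in isolation: the import action registered by setupJob() runs only when the job succeeds. A minimal, self-contained sketch of that deferred-import wiring; the Callable body below is a stand-in for the client.loadData(tableName, output.toUri().toString()) call the real code registers when "-import" is given.

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

public class DeferredImportSketch {
  public static void main(String[] args) throws Exception {
    // Holder that setupJob() fills in only when -import was requested.
    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();

    // Stand-in for the registration done inside setupJob().
    ref.set(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        System.out.println("import step would run here");
        return null;
      }
    });

    boolean waitForCompletion = true; // stands in for job.waitForCompletion(true)
    if (waitForCompletion) {
      Callable<Void> callable = ref.get();
      if (callable != null) {
        callable.call(); // runs only after a successful job
      }
    }
    System.exit(waitForCompletion ? 0 : 1);
  }
}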

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/183f8451/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
index fd6f6c9..340d2b3 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -27,6 +27,8 @@ import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -63,7 +65,8 @@ public class CsvBlurDriverTest {
         return null;
       }
     };
-    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, ref, new String[] {}));
   }
 
   @Test
@@ -75,8 +78,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -96,9 +101,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -118,9 +124,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -143,9 +150,10 @@ public class CsvBlurDriverTest {
       }
     };
     int multiplierParam = 10;
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
     assertNotNull(job);
 
     assertEquals(multiplierParam * shardCount, job.getNumReduceTasks());
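
As the test updates show, every caller of setupJob() now supplies the AtomicReference up front. A sketch of an adapted call site, assuming the same ControllerPool shape as the tests; the host, table, and input paths are placeholder values, not values from this commit.

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.blur.mapreduce.lib.CsvBlurDriver;
import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SetupJobCaller {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    ControllerPool controllerPool = new ControllerPool() {
      @Override
      public Iface getClient(String controllerConnectionStr) {
        return BlurClient.getClient(controllerConnectionStr);
      }
    };
    // Filled in by setupJob() when "-import" is on the command line.
    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
    Job job = CsvBlurDriver.setupJob(configuration, controllerPool, ref,
        "-c", "host:40010", "-t", "table1",
        "-d", "family1", "col1", "col2",
        "-i", "file:///tmp/test1");
    if (job != null && job.waitForCompletion(true)) {
      Callable<Void> callable = ref.get();
      if (callable != null) {
        callable.call(); // load the finished indexes into Blur
      }
    }
  }
}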

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/183f8451/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index db5ece9..83a6aa6 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -57,6 +59,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 import com.google.common.base.Splitter;
@@ -100,22 +103,28 @@ public class CsvBlurDriver {
   public static void main(String... args) throws Exception {
     Configuration configuration = new Configuration();
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = setupJob(configuration, new ControllerPool() {
       @Override
       public Iface getClient(String controllerConnectionStr) {
         return BlurClient.getClient(controllerConnectionStr);
       }
-    }, otherArgs);
+    }, ref, otherArgs);
     if (job == null) {
       System.exit(1);
     }
-
     boolean waitForCompletion = job.waitForCompletion(true);
+    if (waitForCompletion) {
+      Callable<Void> callable = ref.get();
+      if (callable != null) {
+        callable.call();
+      }
+    }
     System.exit(waitForCompletion ? 0 : 1);
   }
 
-  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
-      throws Exception {
+  public static Job setupJob(Configuration configuration, ControllerPool controllerPool,
+      AtomicReference<Callable<Void>> ref, String... otherArgs) throws Exception {
     CommandLine cmd = parse(otherArgs);
     if (cmd == null) {
       return null;
@@ -237,6 +246,24 @@ public class CsvBlurDriver {
       int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
       BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
     }
+    final Path output;
+    if (cmd.hasOption("out")) {
+      output = new Path(cmd.getOptionValue("out"));
+    } else {
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      String userName = currentUser.getUserName();
+      output = new Path("/user/" + userName + "/.blur-" + System.currentTimeMillis());
+    }
+    BlurOutputFormat.setOutputPath(job, output);
+    if (cmd.hasOption("import")) {
+      ref.set(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          client.loadData(tableName, output.toUri().toString());
+          return null;
+        }
+      });
+    }
     return job;
   }
 
@@ -344,6 +371,12 @@ public class CsvBlurDriver {
         .withDescription(
             "Sets the compression codec for the map compress output setting. (SNAPPY,GZIP,BZIP,DEFAULT,
or classname)")
         .create("p"));
+    options.addOption(OptionBuilder.withArgName("path").hasArg()
+        .withDescription("Sets the output directory for the map reduce job before the indexes
are loaded into Blur.")
+        .create("out"));
+    options.addOption(OptionBuilder.withArgName("path").hasArg()
+        .withDescription("Imports the data into Blur after the map reduce job completes.")
+        .create("import"));
 
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = null;
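
When "-out" is absent, both modules fall back to a per-user scratch directory. The fallback in isolation, a sketch grounded in the hunk above; run standalone it prints whichever user name Hadoop resolves for the current process.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class DefaultOutputPath {
  public static void main(String[] args) throws Exception {
    // Same default as setupJob(): a hidden, timestamped directory
    // under the current user's HDFS home.
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    String userName = currentUser.getUserName();
    Path output = new Path("/user/" + userName + "/.blur-" + System.currentTimeMillis());
    System.out.println(output);
  }
}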

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/183f8451/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
index ec3239e..89458f8 100644
--- a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -26,6 +26,8 @@ import java.lang.reflect.Proxy;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -49,7 +51,8 @@ public class CsvBlurDriverTest {
         return null;
       }
     };
-    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, ref, new String[] {}));
   }
 
   @Test
@@ -61,8 +64,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -82,9 +87,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -104,9 +110,10 @@ public class CsvBlurDriverTest {
         return getMockIface();
       }
     };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+    AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010", "-d", "family1",
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i",
+        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);

