incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/4] git commit: Extracting the spark stream processor to allow for generic usage.
Date Mon, 12 Jan 2015 04:21:52 GMT
Extracting the spark stream processor to allow for generic usage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2db2ee80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2db2ee80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2db2ee80

Branch: refs/heads/master
Commit: 2db2ee802e4272a2b3c3888008432e5eb19ac82a
Parents: fdabc5e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Jan 11 23:15:06 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jan 11 23:15:06 2015 -0500

----------------------------------------------------------------------
 contrib/blur-spark/pom.xml                      | 220 +++++++--------
 .../blur/spark/BlurBulkLoadSparkProcessor.java  |  59 ++++
 .../blur/spark/BlurLoadSparkProcessor.java      | 166 +++++++++++
 .../spark/BlurMRBulkLoadSparkProcessor.java     | 145 ++++++++++
 .../apache/blur/spark/BlurSparkPartitioner.java |  37 ++-
 .../java/org/apache/blur/spark/Consumer.java    | 280 -------------------
 .../org/apache/blur/spark/ConsumerEnqueue.java  | 240 ----------------
 .../spark/example/SimpleSparkLoaderExample.java | 143 ++++++++++
 .../apache/blur/spark/util/JavaSparkUtil.java   | 137 +++++++++
 9 files changed, 769 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/pom.xml b/contrib/blur-spark/pom.xml
index 47e5904..83f845c 100644
--- a/contrib/blur-spark/pom.xml
+++ b/contrib/blur-spark/pom.xml
@@ -1,22 +1,14 @@
 <?xml version="1.0" encoding="UTF-8" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
@@ -24,7 +16,7 @@ under the License.
 		<groupId>org.apache.blur</groupId>
 		<artifactId>blur</artifactId>
 		<version>0.2.4-incubating-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
+		<relativePath>../../pom.xml</relativePath>
 	</parent>
 	<groupId>org.apache.blur</groupId>
 	<artifactId>blur-spark-connector</artifactId>
@@ -108,59 +100,9 @@ under the License.
 
 	<dependencies>
 		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-thrift</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-util</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-store</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-query</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-mapred-hadoop1</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.lucene</groupId>
-			<artifactId>lucene-core</artifactId>
-			<version>${lucene.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.lucene</groupId>
-			<artifactId>lucene-spatial</artifactId>
-			<version>${lucene.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.lucene</groupId>
-			<artifactId>lucene-codecs</artifactId>
-			<version>${lucene.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.lucene</groupId>
-			<artifactId>lucene-analyzers-common</artifactId>
-			<version>${lucene.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.lucene</groupId>
-			<artifactId>lucene-queries</artifactId>
-			<version>${lucene.version}</version>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-streaming_2.10</artifactId>
+			<version>1.2.0</version>
 		</dependency>
 		<dependency>
 			<groupId>junit</groupId>
@@ -168,44 +110,8 @@ under the License.
 			<version>${junit.version}</version>
 			<scope>test</scope>
 		</dependency>
-		<!-- dependency>
-			<groupId>commons-codec</groupId>
-			<artifactId>commons-codec</artifactId>
-			<version>1.3</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-logging</groupId>
-			<artifactId>commons-logging</artifactId>
-			<version>1.1.3</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-core</artifactId>
-			<version>1.2.1</version>
-		</dependency>
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-log4j12</artifactId>
-			<version>1.7.4</version>
-		</dependency -->
-		
-		<dependency>
-	<groupId>org.apache.spark</groupId>
-	<artifactId>spark-core_2.10</artifactId>
-	<version>1.1.0</version>
-</dependency>
-<dependency>
-	<groupId>org.apache.spark</groupId>
-	<artifactId>spark-streaming_2.10</artifactId>
-	<version>1.1.0</version>
-</dependency>
-		<!-- dependency>
-			<groupId>kafka.spark.consumer</groupId>
-			<artifactId>kafka-spark-consumer</artifactId>
-			<version>0.0.1-SNAPSHOT</version>
-		</dependency -->
 	</dependencies>
-	
+
 	<profiles>
 		<profile>
 			<id>hadoop1</id>
@@ -214,9 +120,35 @@ under the License.
 					<name>hadoop1</name>
 				</property>
 			</activation>
-        <properties>
-            <projectVersion>${project.parent.version}-hadoop1</projectVersion>
-        </properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-core</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>provided</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-mapred-hadoop1</artifactId>
+					<version>${project.version}</version>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.blur</groupId>
+							<artifactId>blur-status</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jetty</groupId>
+							<artifactId>jetty-webapp</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
 		</profile>
 		<profile>
 			<id>hadoop2-mr1</id>
@@ -225,9 +157,35 @@ under the License.
 					<name>hadoop2-mr1</name>
 				</property>
 			</activation>
-        <properties>
-            <projectVersion>${project.parent.version}-hadoop2-mr1</projectVersion>
-        </properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>provided</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-mapred-hadoop1</artifactId>
+					<version>${project.version}</version>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.blur</groupId>
+							<artifactId>blur-status</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jetty</groupId>
+							<artifactId>jetty-webapp</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+			</dependencies>
 		</profile>
 		<profile>
 			<id>hadoop2</id>
@@ -236,9 +194,35 @@ under the License.
 					<name>hadoop2</name>
 				</property>
 			</activation>
-        <properties>
-            <projectVersion>${project.parent.version}-hadoop2</projectVersion>
-        </properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>provided</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-mapred-hadoop2</artifactId>
+					<version>${project.version}</version>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.blur</groupId>
+							<artifactId>blur-status</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jetty</groupId>
+							<artifactId>jetty-webapp</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+			</dependencies>
 		</profile>
 	</profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurBulkLoadSparkProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurBulkLoadSparkProcessor.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurBulkLoadSparkProcessor.java
new file mode 100644
index 0000000..b89f981
--- /dev/null
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurBulkLoadSparkProcessor.java
@@ -0,0 +1,59 @@
+package org.apache.blur.spark;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.streaming.Time;
+
+import scala.Tuple2;
+
+@SuppressWarnings("serial")
+public abstract class BlurBulkLoadSparkProcessor<T> extends BlurLoadSparkProcessor<T> {
+  
+  private static final Log LOG = LogFactory.getLog(BlurBulkLoadSparkProcessor.class);
+
+  @Override
+  protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
+    return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
+      // Blur Thrift Client
+      @Override
+      public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
+        Iface client = getBlurClient();
+        for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
+          if (tuple != null) {
+            try {
+              RowMutation rm = tuple._2;
+              // Index using enqueue mutate call
+              client.enqueueMutate(rm);
+            } catch (Exception ex) {
+              LOG.error("Unknown error while trying to call enqueueMutate.", ex);
+              throw ex;
+            }
+          }
+        }
+        return null;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurLoadSparkProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurLoadSparkProcessor.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurLoadSparkProcessor.java
new file mode 100644
index 0000000..daa856c
--- /dev/null
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurLoadSparkProcessor.java
@@ -0,0 +1,166 @@
+package org.apache.blur.spark;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.spark.util.JavaSparkUtil;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import scala.Tuple2;
+
+@SuppressWarnings("serial")
+public abstract class BlurLoadSparkProcessor<T> implements Serializable {
+
+  protected static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  protected static final String MAPREDUCE_PARTITIONER_CLASS = "mapreduce.partitioner.class";
+  protected static final String SPARK_STREAMING_BLOCK_INTERVAL = "spark.streaming.blockInterval";
+  protected static final String SPARK_EXECUTOR_EXTRA_CLASS_PATH = "spark.executor.extraClassPath";
+  protected static final String ORG_APACHE_SPARK_SERIALIZER_KRYO_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
+  protected static final String SPARK_SERIALIZER = "spark.serializer";
+
+  public void run() throws IOException {
+    SparkConf conf = new SparkConf();
+    conf.setAppName(getAppName());
+    conf.set(SPARK_SERIALIZER, ORG_APACHE_SPARK_SERIALIZER_KRYO_SERIALIZER);
+    JavaSparkUtil.packProjectJars(conf);
+    setupSparkConf(conf);
+
+    JavaStreamingContext ssc = new JavaStreamingContext(conf, getDuration());
+    List<JavaDStream<T>> streamsList = getStreamsList(ssc);
+
+    // Union all the streams if there is more than 1 stream
+    JavaDStream<T> streams = unionStreams(ssc, streamsList);
+
+    JavaPairDStream<String, RowMutation> pairDStream = streams.mapToPair(new PairFunction<T, String, RowMutation>() {
+      public Tuple2<String, RowMutation> call(T t) {
+        RowMutation rowMutation = convert(t);
+        return new Tuple2<String, RowMutation>(rowMutation.getRowId(), rowMutation);
+      }
+    });
+
+    pairDStream.foreachRDD(getFunction());
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  protected abstract Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction();
+
+  private JavaDStream<T> unionStreams(JavaStreamingContext ssc, List<JavaDStream<T>> streamsList) {
+    JavaDStream<T> unionStreams;
+    if (streamsList.size() > 1) {
+      unionStreams = ssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+    } else {
+      // Otherwise, just use the 1 stream
+      unionStreams = streamsList.get(0);
+    }
+    return unionStreams;
+  }
+
+  protected abstract String getOutputPath();
+
+  /**
+   * Gets the storage level for the spark job, default of MEMORY_AND_DISK.
+   * 
+   * @return
+   */
+  protected StorageLevel getStorageLevel() {
+    return StorageLevel.MEMORY_AND_DISK();
+  }
+
+  /**
+   * Called just before spark job is executed.
+   * 
+   * @param configuration
+   */
+  protected void setupBlurHadoopConfig(Configuration configuration) {
+
+  }
+
+  /**
+   * Add custom spark information.
+   * 
+   * @param conf
+   */
+  protected void setupSparkConf(SparkConf conf) {
+
+  }
+
+  /**
+   * Gets the duration for the batch, default of 10 seconds.
+   * 
+   * @return
+   */
+  protected Duration getDuration() {
+    return new Duration(10000);
+  }
+
+  /**
+   * Gets the blur table name to load.
+   * 
+   * @return
+   */
+  protected abstract String getBlurTableName();
+
+  /**
+   * Gets the blur client for the table.
+   * 
+   * @return
+   */
+  protected abstract Iface getBlurClient();
+
+  /**
+   * Gets the spark application name.
+   * 
+   * @return
+   */
+  protected abstract String getAppName();
+
+  /**
+   * Gets the list of streams to load into Blur.
+   * 
+   * @param ssc
+   * 
+   * @return
+   */
+  protected abstract List<JavaDStream<T>> getStreamsList(JavaStreamingContext ssc);
+
+  /**
+   * Converts the data into a {@link RowMutation} object.
+   * 
+   * @param t
+   * @return
+   */
+  protected abstract RowMutation convert(T t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurMRBulkLoadSparkProcessor.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurMRBulkLoadSparkProcessor.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurMRBulkLoadSparkProcessor.java
new file mode 100644
index 0000000..124b000
--- /dev/null
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurMRBulkLoadSparkProcessor.java
@@ -0,0 +1,145 @@
+package org.apache.blur.spark;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.mapreduce.lib.BlurColumn;
+import org.apache.blur.mapreduce.lib.BlurMutate;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.lib.BlurOutputCommitter;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.streaming.Time;
+
+import scala.Tuple2;
+
+@SuppressWarnings("serial")
+public abstract class BlurMRBulkLoadSparkProcessor<T> extends BlurLoadSparkProcessor<T> {
+
+  @Override
+  protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
+    return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
+      @Override
+      public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
+
+        // Blur Table Details
+        Iface client = getBlurClient();
+        TableDescriptor tableDescriptor = client.describe(getBlurTableName());
+        Configuration conf = new Configuration();
+        // Blur specific Configuration
+        conf.setClass(MAPREDUCE_PARTITIONER_CLASS, BlurPartitioner.class, Partitioner.class);
+        conf.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
+
+        // Partition RDD to match Blur Table Shard Count. Used Custom
+        // Partitioner to channel correct BlurMutate to correct Shard.
+        BlurSparkPartitioner blurSparkPartitioner = new BlurSparkPartitioner(tableDescriptor.getShardCount());
+        JavaPairRDD<Text, BlurMutate> flatMapToPair = rdd
+            .flatMapToPair(new PairFlatMapFunction<Tuple2<String, RowMutation>, Text, BlurMutate>() {
+              @Override
+              public Iterable<Tuple2<Text, BlurMutate>> call(Tuple2<String, RowMutation> tuple2) throws Exception {
+                RowMutation rowMutation = tuple2._2;
+                final List<BlurMutate> result = new ArrayList<BlurMutate>();
+                List<RecordMutation> recordMutations = rowMutation.getRecordMutations();
+                String rowId = rowMutation.getRowId();
+                for (RecordMutation recordMutation : recordMutations) {
+                  Record record = recordMutation.getRecord();
+                  String family = record.getFamily();
+                  String recordId = record.getRecordId();
+                  List<BlurColumn> columns = toColumns(record.getColumns());
+
+                  BlurRecord blurRecord = new BlurRecord();
+                  blurRecord.setRowId(rowId);
+                  blurRecord.setFamily(family);
+                  blurRecord.setRecordId(recordId);
+                  blurRecord.setColumns(columns);
+                  result.add(new BlurMutate(MUTATE_TYPE.REPLACE, blurRecord));
+                }
+                return new Iterable<Tuple2<Text, BlurMutate>>() {
+                  @Override
+                  public Iterator<Tuple2<Text, BlurMutate>> iterator() {
+                    final Iterator<BlurMutate> iterator = result.iterator();
+                    return new Iterator<Tuple2<Text, BlurMutate>>() {
+
+                      @Override
+                      public boolean hasNext() {
+                        return iterator.hasNext();
+                      }
+
+                      @Override
+                      public Tuple2<Text, BlurMutate> next() {
+                        BlurMutate blurMutate = iterator.next();
+                        return new Tuple2<Text, BlurMutate>(new Text(blurMutate.getRecord().getRowId()), blurMutate);
+                      }
+
+                      @Override
+                      public void remove() {
+
+                      }
+                    };
+                  }
+                };
+              }
+
+              private List<BlurColumn> toColumns(List<Column> columns) {
+                List<BlurColumn> cols = new ArrayList<BlurColumn>();
+                for (Column column : columns) {
+                  cols.add(new BlurColumn(column.getName(), column.getValue()));
+                }
+                return cols;
+              }
+            });
+
+        final JavaPairRDD<Text, BlurMutate> pRdd = flatMapToPair.partitionBy(blurSparkPartitioner).persist(
+            getStorageLevel());
+        Job job = new Job(conf);
+        BlurOutputFormat.setupJob(job, tableDescriptor);
+        Path path = new Path(getOutputPath());
+        FileSystem fileSystem = path.getFileSystem(conf);
+        Path qualified = fileSystem.makeQualified(path);
+        BlurOutputFormat.setOutputPath(job, qualified);
+        setupBlurHadoopConfig(job.getConfiguration());
+        // Write the RDD to Blur Table
+        if (pRdd.count() > 0) {
+          pRdd.saveAsNewAPIHadoopFile(tableDescriptor.getTableUri(), Text.class, BlurMutate.class,
+              BlurOutputFormat.class, job.getConfiguration());
+          client.loadData(getBlurTableName(), qualified.toString());
+        }
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkPartitioner.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkPartitioner.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkPartitioner.java
index 728ad36..1c5c425 100644
--- a/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkPartitioner.java
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkPartitioner.java
@@ -22,24 +22,21 @@ import org.apache.spark.HashPartitioner;
 
 public class BlurSparkPartitioner extends HashPartitioner {
 
-	private static final long serialVersionUID = 9853263327838L;
-
-	int totalShard;
-
-	public BlurSparkPartitioner(int partitions) {
-
-		super(partitions);
-		totalShard = partitions;
-	}
-
-	@Override
-	public int getPartition(Object key) {
-
-		if (key instanceof Text) {
-
-			return (key.hashCode() & Integer.MAX_VALUE) % totalShard;
-		} else {
-			return super.getPartition(key);
-		}
-	}
+  private static final long serialVersionUID = 9853263327838L;
+
+  private final int _totalShard;
+
+  public BlurSparkPartitioner(int partitions) {
+    super(partitions);
+    _totalShard = partitions;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    if (key instanceof Text) {
+      return (key.hashCode() & Integer.MAX_VALUE) % _totalShard;
+    } else {
+      return super.getPartition(key);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/Consumer.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/Consumer.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/Consumer.java
deleted file mode 100644
index 962faa4..0000000
--- a/contrib/blur-spark/src/main/java/org/apache/blur/spark/Consumer.java
+++ /dev/null
@@ -1,280 +0,0 @@
-package org.apache.blur.spark;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.blur.manager.BlurPartitioner;
-import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
-import org.apache.blur.mapreduce.lib.BlurMutate;
-import org.apache.blur.mapreduce.lib.BlurOutputCommitter;
-import org.apache.blur.mapreduce.lib.BlurOutputFormat;
-import org.apache.blur.mapreduce.lib.DefaultBlurReducer;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Time;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import scala.Tuple2;
-import consumer.kafka.KafkaConfig;
-import consumer.kafka.MessageAndMetadata;
-import consumer.kafka.client.KafkaReceiver;
-
-/*
- * This Consumer uses Spark RDD saveAsNewAPIHadoopFile API to index BlurMutate
- */
-
-public class Consumer implements Serializable {
-
-	private static final long serialVersionUID = 4332618245650072140L;
-	private Properties _props;
-	private KafkaConfig _kafkaConfig;
-
-	public void start() throws InstantiationException, IllegalAccessException,
-			ClassNotFoundException {
-
-		_kafkaConfig = new KafkaConfig(_props);
-		run();
-	}
-
-	private void init(String[] args) throws Exception {
-
-		Options options = new Options();
-		this._props = new Properties();
-
-		options.addOption("p", true, "properties filename from the classpath");
-		options.addOption("P", true, "external properties filename");
-
-		OptionBuilder.withArgName("property=value");
-		OptionBuilder.hasArgs(2);
-		OptionBuilder.withValueSeparator();
-		OptionBuilder.withDescription("use value for given property");
-		options.addOption(OptionBuilder.create("D"));
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = parser.parse(options, args);
-		if (cmd.hasOption('p')) {
-			this._props.load(ClassLoader.getSystemClassLoader()
-					.getResourceAsStream(cmd.getOptionValue('p')));
-		}
-		if (cmd.hasOption('P')) {
-			File file = new File(cmd.getOptionValue('P'));
-			FileInputStream fStream = new FileInputStream(file);
-			this._props.load(fStream);
-		}
-		this._props.putAll(cmd.getOptionProperties("D"));
-
-	}
-
-	private void run() {
-
-		String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark";
-
-		// number of partition for Kafka Topic
-
-		int _partitionCount = 5;
-
-		List<JavaDStream<MessageAndMetadata>> streamsList = new ArrayList<JavaDStream<MessageAndMetadata>>(
-				_partitionCount);
-		JavaDStream<MessageAndMetadata> unionStreams;
-
-		SparkConf conf = new SparkConf().setAppName("KafkaReceiver").set(
-				"spark.streaming.blockInterval", "200");
-
-		// Path to Blur Libraries . Can be copied to each Node of Spark Cluster.
-
-		conf.set("spark.executor.extraClassPath",
-				"/home/apache-blur-0.2.4/lib/*");
-
-		// Used KryoSerializer for BlurMutate and Text.
-		conf.set("spark.serializer",
-				"org.apache.spark.serializer.KryoSerializer");
-
-		JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(
-				3000));
-
-		/*
-		 * Receive Kafka Stream. Create individual Receivers for each Topic
-		 * Partition
-		 */
-
-		for (int i = 0; i < _partitionCount; i++) {
-
-			streamsList.add(ssc.receiverStream(new KafkaReceiver(_props, i)));
-
-		}
-
-		/*
-		 * Union all the streams if there is more than 1 stream
-		 */
-
-		if (streamsList.size() > 1) {
-			unionStreams = ssc.union(streamsList.get(0),
-					streamsList.subList(1, streamsList.size()));
-		} else {
-			// Otherwise, just use the 1 stream
-			unionStreams = streamsList.get(0);
-		}
-
-		/*
-		 * Generate JavaPairDStream
-		 */
-
-		JavaPairDStream<Text, BlurMutate> pairDStream = unionStreams
-				.mapToPair(new PairFunction<MessageAndMetadata, Text, BlurMutate>() {
-
-					private static final long serialVersionUID = 443235214978L;
-
-					public Tuple2<Text, BlurMutate> call(
-							MessageAndMetadata mmeta) {
-
-						/*
-						 * create the BlurMutate from MessageAndMetadata
-						 */
-
-						String message = new String(mmeta.getPayload());
-						String keyStr = DigestUtils.shaHex(message);
-						Text key = new Text((keyStr).getBytes());
-						BlurMutate mutate = new BlurMutate(
-								BlurMutate.MUTATE_TYPE.REPLACE, keyStr, keyStr,
-								"family");
-						mutate.addColumn("message", message);
-
-						return new Tuple2<Text, BlurMutate>(key, mutate);
-					}
-				});
-
-		pairDStream
-				.foreachRDD(new Function2<JavaPairRDD<Text, BlurMutate>, Time, Void>() {
-
-					private static final long serialVersionUID = 88875777435L;
-
-					@Override
-					public Void call(JavaPairRDD<Text, BlurMutate> rdd,
-							Time time) throws Exception {
-
-						/*
-						 * Blur Table Details
-						 */
-						TableDescriptor tableDescriptor = new TableDescriptor();
-						String tableUri = new Path(
-								"hdfs://10.252.5.113:9000/blur/tables/nrt")
-								.toString();
-						tableDescriptor.tableUri = tableUri;
-						tableDescriptor.cluster = "pearson";
-						tableDescriptor.name = "nrt";
-						tableDescriptor.shardCount = 9;
-						Configuration conf = new Configuration();
-
-						/*
-						 * Partition RDD to match Blur Table Shard Count. Used
-						 * Custom Partitioner to channel correct BlurMutate to
-						 * correct Shard.
-						 */
-
-						final JavaPairRDD<Text, BlurMutate> pRdd = rdd
-								.partitionBy(
-										new BlurSparkPartitioner(
-												tableDescriptor.shardCount))
-								.persist(StorageLevel.MEMORY_ONLY_2());
-
-						/*
-						 * Blur specific Configuration
-						 */
-
-						BlurOutputFormat.setIndexLocally(conf, false);
-						BlurOutputFormat.setOptimizeInFlight(conf, false);
-						conf.setClass("mapreduce.reduce.class",
-								DefaultBlurReducer.class, Reducer.class);
-						conf.setClass("mapreduce.outputformat.class",
-								BlurOutputFormat.class, OutputFormat.class);
-						conf.setClass("mapreduce.partitioner.class",
-								BlurPartitioner.class, Partitioner.class);
-						conf.set("mapred.output.committer.class",
-								BlurOutputCommitter.class.getName());
-						conf.setInt("blur.output.max.document.buffer.size",
-								10000);
-
-						BlurOutputFormat.setTableDescriptor(conf,
-								tableDescriptor);
-
-						JobConf jobConf = new JobConf(conf);
-
-						jobConf.setNumReduceTasks(tableDescriptor.shardCount);
-						jobConf.setOutputKeyClass(Text.class);
-						jobConf.setOutputValueClass(BlurMutate.class);
-
-						BlurMapReduceUtil.addAllJarsInBlurLib(conf);
-						BlurMapReduceUtil
-								.addDependencyJars(
-										conf,
-										org.apache.zookeeper.ZooKeeper.class,
-										org.apache.lucene.codecs.lucene42.Lucene42Codec.class,
-										jobConf.getOutputKeyClass(),
-										jobConf.getOutputValueClass());
-
-						/*
-						 * Write the RDD to Blur Table
-						 */
-
-						if (pRdd.count() > 0)
-							pRdd.saveAsNewAPIHadoopFile(tableUri, Text.class,
-									BlurMutate.class, BlurOutputFormat.class,
-									jobConf);
-
-						return null;
-					}
-				});
-
-		// ssc.checkpoint(checkpointDirectory);
-		ssc.start();
-		ssc.awaitTermination();
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		Consumer consumer = new Consumer();
-		consumer.init(args);
-		consumer.start();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/ConsumerEnqueue.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/ConsumerEnqueue.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/ConsumerEnqueue.java
deleted file mode 100644
index 25f2d58..0000000
--- a/contrib/blur-spark/src/main/java/org/apache/blur/spark/ConsumerEnqueue.java
+++ /dev/null
@@ -1,240 +0,0 @@
-package org.apache.blur.spark;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.RecordMutation;
-import org.apache.blur.thrift.generated.RecordMutationType;
-import org.apache.blur.thrift.generated.RowMutation;
-import org.apache.blur.thrift.generated.RowMutationType;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Time;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import scala.Tuple2;
-import consumer.kafka.KafkaConfig;
-import consumer.kafka.MessageAndMetadata;
-import consumer.kafka.client.KafkaReceiver;
-
-
-/*
- * This Consumer uses Blur Thrift Client enqueue mutate call to index Rowmutation
- */
-public class ConsumerEnqueue implements Serializable {
-
-	private static final long serialVersionUID = 4332618245650072140L;
-	private Properties _props;
-	private KafkaConfig _kafkaConfig;
-
-	public void start() throws InstantiationException, IllegalAccessException,
-			ClassNotFoundException {
-
-		_kafkaConfig = new KafkaConfig(_props);
-		run();
-	}
-
-	private void init(String[] args) throws Exception {
-
-		Options options = new Options();
-		this._props = new Properties();
-
-		options.addOption("p", true, "properties filename from the classpath");
-		options.addOption("P", true, "external properties filename");
-
-		OptionBuilder.withArgName("property=value");
-		OptionBuilder.hasArgs(2);
-		OptionBuilder.withValueSeparator();
-		OptionBuilder.withDescription("use value for given property");
-		options.addOption(OptionBuilder.create("D"));
-
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = parser.parse(options, args);
-		if (cmd.hasOption('p')) {
-			this._props.load(ClassLoader.getSystemClassLoader()
-					.getResourceAsStream(cmd.getOptionValue('p')));
-		}
-		if (cmd.hasOption('P')) {
-			File file = new File(cmd.getOptionValue('P'));
-			FileInputStream fStream = new FileInputStream(file);
-			this._props.load(fStream);
-		}
-		this._props.putAll(cmd.getOptionProperties("D"));
-
-	}
-
-	private void run() {
-
-		String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark";
-
-		// number of partition for Kafka Topic
-
-		int _partitionCount = 5;
-
-		List<JavaDStream<MessageAndMetadata>> streamsList = new ArrayList<JavaDStream<MessageAndMetadata>>(
-				_partitionCount);
-		JavaDStream<MessageAndMetadata> unionStreams;
-
-		SparkConf conf = new SparkConf().setAppName("KafkaReceiver").set(
-				"spark.streaming.blockInterval", "200");
-
-		// Path to Blur Libraries . Can be copied to each Node of Spark Cluster.
-
-		conf.set("spark.executor.extraClassPath",
-				"/home/apache-blur-0.2.4/lib/*");
-
-		// Used KryoSerializer for BlurMutate and Text.
-		conf.set("spark.serializer",
-				"org.apache.spark.serializer.KryoSerializer");
-
-		JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(
-				3000));
-
-		/*
-		 * Receive Kafka Stream. Create individual Receivers for each Topic
-		 * Partition
-		 */
-
-		for (int i = 0; i < _partitionCount; i++) {
-
-			streamsList.add(ssc.receiverStream(new KafkaReceiver(_props, i)));
-
-		}
-
-		/*
-		 * Union all the streams if there is more than 1 stream
-		 */
-
-		if (streamsList.size() > 1) {
-			unionStreams = ssc.union(streamsList.get(0),
-					streamsList.subList(1, streamsList.size()));
-		} else {
-			// Otherwise, just use the 1 stream
-			unionStreams = streamsList.get(0);
-		}
-
-		/*
-		 * Generate JavaPairDStream
-		 */
-
-		JavaPairDStream<String, RowMutation> pairDStream = unionStreams
-				.mapToPair(new PairFunction<MessageAndMetadata, String, RowMutation>() {
-
-					private static final long serialVersionUID = 443235214978L;
-
-					public Tuple2<String, RowMutation> call(
-							MessageAndMetadata mmeta) {
-
-						/*
-						 * create the RowMutation from MessageAndMetadata
-						 */
-
-						String message = new String(mmeta.getPayload());
-						String keyStr = DigestUtils.shaHex(message);
-
-						Record record = new Record();
-						record.setRecordId(keyStr);
-						record.addToColumns(new Column("message", message));
-						record.setFamily("family");
-
-						List recordMutations = new ArrayList();
-						recordMutations.add(new RecordMutation(
-								RecordMutationType.REPLACE_ENTIRE_RECORD,
-								record));
-						RowMutation mutation = new RowMutation("nrt", keyStr,
-								RowMutationType.REPLACE_ROW, recordMutations);
-						mutation.setRecordMutations(recordMutations);
-
-						return new Tuple2<String, RowMutation>(keyStr, mutation);
-					}
-				});
-
-		pairDStream
-				.foreachRDD(new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
-
-					private static final long serialVersionUID = 88875777435L;
-					
-					/*
-					 * Blur Thrift Client
-					 */
-
-					Iface client = BlurClient.getClient("10.252.5.113:40010");
-
-					@Override
-					public Void call(JavaPairRDD<String, RowMutation> rdd,
-							Time time) throws Exception {
-
-						for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
-
-							if (tuple != null) {
-
-								try {
-
-									RowMutation rm = tuple._2;
-									
-									/*
-									 * Index using enqueue mutate call
-									 */
-									client.enqueueMutate(rm);
-
-								} catch (Exception ex) {
-
-									ex.printStackTrace();
-								}
-
-							}
-
-						}
-
-						return null;
-					}
-				});
-
-		// ssc.checkpoint(checkpointDirectory);
-		ssc.start();
-		ssc.awaitTermination();
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		ConsumerEnqueue consumer = new ConsumerEnqueue();
-		consumer.init(args);
-		consumer.start();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/example/SimpleSparkLoaderExample.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/example/SimpleSparkLoaderExample.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/example/SimpleSparkLoaderExample.java
new file mode 100644
index 0000000..59b6992
--- /dev/null
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/example/SimpleSparkLoaderExample.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.spark.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.blur.spark.BlurMRBulkLoadSparkProcessor;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.spark.SparkConf;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+@SuppressWarnings("serial")
+public class SimpleSparkLoaderExample extends BlurMRBulkLoadSparkProcessor<String> {
+
+  public static void main(String[] args) throws IOException {
+    SimpleSparkLoaderExample loader = new SimpleSparkLoaderExample();
+    loader.setConnectionStr("127.0.0.1:40010");
+    // loader.setHdfsDirToMonitor("hdfs://localhost:9000/tmp/spark/input/");
+    loader.setHdfsDirToMonitor("file:///tmp/spark-input/");
+    loader.setOutputPath("hdfs://localhost:9000/tmp/spark/output-" + System.currentTimeMillis());
+    loader.setSparkMaster("spark://amccurry:7077");
+    loader.setTableName("test_hdfs");
+    loader.run();
+  }
+
+  private String _tableName;
+  private String _connectionStr;
+  private String _hdfsDirToMonitor;
+  private String _sparkMaster;
+  private String _outputPath;
+
+  @Override
+  protected void setupSparkConf(SparkConf conf) {
+    conf.set("spark.master", _sparkMaster);
+  }
+
+  @Override
+  protected String getBlurTableName() {
+    return _tableName;
+  }
+
+  @Override
+  protected Iface getBlurClient() {
+    return BlurClient.getClient(_connectionStr);
+  }
+
+  @Override
+  protected String getAppName() {
+    return "Sample Blur Loader";
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected List<JavaDStream<String>> getStreamsList(JavaStreamingContext ssc) {
+    return Arrays.asList(ssc.textFileStream(_hdfsDirToMonitor));
+  }
+
+  @Override
+  protected RowMutation convert(String s) {
+    s = s.trim();
+    String rowId = s;
+    String recordId = s;
+    String value = s;
+
+    List<Column> columns = new ArrayList<Column>();
+    columns.add(new Column("col", value));
+    Record record = new Record(recordId, "spark-test", columns);
+
+    RowMutation rowMutation = new RowMutation();
+    rowMutation.setTable(getTableName());
+    rowMutation.setRowMutationType(RowMutationType.REPLACE_ROW);
+    rowMutation.setRowId(rowId);
+    rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
+    return rowMutation;
+  }
+
+  @Override
+  protected String getOutputPath() {
+    return _outputPath;
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public String getConnectionStr() {
+    return _connectionStr;
+  }
+
+  public String getHdfsDirToMonitor() {
+    return _hdfsDirToMonitor;
+  }
+
+  public String getSparkMaster() {
+    return _sparkMaster;
+  }
+
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  public void setConnectionStr(String connectionStr) {
+    _connectionStr = connectionStr;
+  }
+
+  public void setHdfsDirToMonitor(String hdfsDirToMonitor) {
+    _hdfsDirToMonitor = hdfsDirToMonitor;
+  }
+
+  public void setSparkMaster(String sparkMaster) {
+    _sparkMaster = sparkMaster;
+  }
+
+  public void setOutputPath(String outputPath) {
+    _outputPath = outputPath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2db2ee80/contrib/blur-spark/src/main/java/org/apache/blur/spark/util/JavaSparkUtil.java
----------------------------------------------------------------------
diff --git a/contrib/blur-spark/src/main/java/org/apache/blur/spark/util/JavaSparkUtil.java b/contrib/blur-spark/src/main/java/org/apache/blur/spark/util/JavaSparkUtil.java
new file mode 100644
index 0000000..6cea91a
--- /dev/null
+++ b/contrib/blur-spark/src/main/java/org/apache/blur/spark/util/JavaSparkUtil.java
@@ -0,0 +1,137 @@
+package org.apache.blur.spark.util;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+
+import com.google.common.base.Splitter;
+
+public class JavaSparkUtil {
+
+  private static final String DOT = ".";
+  private static final String TMP_SPARK_JOB = "tmp-spark-job_";
+  private static final String JAR = ".jar";
+  private static final String PATH_SEPARATOR = "path.separator";
+  private static final String JAVA_CLASS_PATH = "java.class.path";
+  private static final String SEP = "/";
+
+  public static void packProjectJars(SparkConf conf) throws IOException {
+    String classPath = System.getProperty(JAVA_CLASS_PATH);
+    String pathSeparator = System.getProperty(PATH_SEPARATOR);
+    Splitter splitter = Splitter.on(pathSeparator);
+    Iterable<String> split = splitter.split(classPath);
+    List<String> list = toList(split);
+    List<String> classPathThatNeedsToBeIncluded = removeSparkLibs(list);
+    List<String> jars = new ArrayList<String>();
+    for (String s : classPathThatNeedsToBeIncluded) {
+      if (isJarFile(s)) {
+        jars.add(s);
+      } else {
+        jars.add(createJar(s));
+      }
+    }
+    conf.setJars(jars.toArray(new String[jars.size()]));
+  }
+
+  private static String createJar(String s) throws IOException {
+    File sourceFile = new File(s);
+    if (sourceFile.isDirectory()) {
+      File file = File.createTempFile(TMP_SPARK_JOB, JAR);
+      OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file));
+      JarOutputStream jarOut = new JarOutputStream(outputStream);
+      for (File f : sourceFile.listFiles()) {
+        pack(sourceFile, f, jarOut);
+      }
+      jarOut.close();
+      file.deleteOnExit();
+      return file.getAbsolutePath();
+    }
+    throw new RuntimeException("File [" + s + "] is not a directory.");
+  }
+
+  private static void pack(File rootPath, File source, JarOutputStream target) throws IOException {
+    String name = getName(rootPath, source);
+    if (source.isDirectory()) {
+      if (!SEP.equals(name)) {
+        JarEntry entry = new JarEntry(name);
+        entry.setTime(source.lastModified());
+        target.putNextEntry(entry);
+        target.closeEntry();
+      }
+      for (File f : source.listFiles()) {
+        pack(rootPath, f, target);
+      }
+    } else {
+      JarEntry entry = new JarEntry(name);
+      entry.setTime(source.lastModified());
+      target.putNextEntry(entry);
+      BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
+      IOUtils.copy(in, target);
+      in.close();
+      target.closeEntry();
+    }
+  }
+
+  private static String getName(File rootPath, File source) {
+    String rootStr = rootPath.toURI().toString();
+    String sourceStr = source.toURI().toString();
+    if (sourceStr.startsWith(rootStr)) {
+      String result = sourceStr.substring(rootStr.length());
+      if (source.isDirectory() && !result.endsWith(SEP)) {
+        result += SEP;
+      }
+      return result;
+    } else {
+      throw new RuntimeException("Not sure what happened.");
+    }
+  }
+
+  private static boolean isJarFile(String s) {
+    if (s.endsWith(JAR) || s.endsWith(".zip")) {
+      return true;
+    }
+    return false;
+  }
+
+  private static List<String> removeSparkLibs(List<String> list) {
+    String sparkJar = findSparkJar(list);
+    String sparkLib = sparkJar.substring(0, sparkJar.lastIndexOf(SEP) + 1);
+    List<String> result = new ArrayList<String>();
+    for (String s : list) {
+      if (!s.startsWith(sparkLib)) {
+        result.add(s);
+      }
+    }
+    return result;
+  }
+
+  private static String findSparkJar(List<String> list) {
+    String name = SparkConf.class.getName();
+    String resourceName = SEP + name.replace(DOT, SEP) + ".class";
+    URL url = SparkConf.class.getResource(resourceName);
+    String urlStr = url.toString();
+    urlStr = urlStr.substring(0, urlStr.indexOf('!'));
+    urlStr = urlStr.substring(urlStr.lastIndexOf(':') + 1);
+    return urlStr;
+  }
+
+  private static List<String> toList(Iterable<String> split) {
+    List<String> list = new ArrayList<String>();
+    for (String s : split) {
+      list.add(s);
+    }
+    return list;
+  }
+}
\ No newline at end of file


Mime
View raw message