pirk-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eawilli...@apache.org
Subject incubator-pirk git commit: [PIRK-21] - Initial Spark Streaming Responder Implementation -- closes apache/incubator-pirk#76
Date Sat, 27 Aug 2016 15:17:01 GMT
Repository: incubator-pirk
Updated Branches:
  refs/heads/master 4bb94cec3 -> 3957be411


[PIRK-21] - Initial Spark Streaming Responder Implementation -- closes apache/incubator-pirk#76


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/3957be41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/3957be41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/3957be41

Branch: refs/heads/master
Commit: 3957be41109ec6ffab5f2702da16ee035c481ef7
Parents: 4bb94ce
Author: eawilliams <eawilliams@apache.org>
Authored: Sat Aug 27 11:16:43 2016 -0400
Committer: eawilliams <eawilliams@apache.org>
Committed: Sat Aug 27 11:16:43 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |  15 +-
 .../pirk/querier/wideskies/QuerierDriver.java   |   2 +-
 .../pirk/responder/wideskies/ResponderCLI.java  |  40 ++
 .../responder/wideskies/ResponderDriver.java    |  51 ++
 .../responder/wideskies/ResponderProps.java     |  57 ++-
 .../wideskies/common/ComputeEncryptedRow.java   |   4 +-
 .../responder/wideskies/spark/Accumulators.java |  15 +-
 .../wideskies/spark/BroadcastVars.java          |  30 +-
 .../wideskies/spark/ComputeResponse.java        |  40 +-
 .../spark/EncColMultGroupedMapper.java          |   2 +-
 .../wideskies/spark/EncColMultReducer.java      |   2 +-
 .../responder/wideskies/spark/EncRowCalc.java   |   5 +-
 .../responder/wideskies/spark/FilterData.java   |   6 +-
 .../streaming/ComputeStreamingResponse.java     | 467 +++++++++++++++++++
 .../spark/streaming/FinalResponseFunction.java  |  79 ++++
 .../org/apache/pirk/schema/data/DataSchema.java |   5 +
 .../test/distributed/DistributedTestCLI.java    |   3 +
 .../test/distributed/DistributedTestDriver.java |  30 +-
 .../distributed/testsuite/DistTestSuite.java    | 148 +++++-
 .../org/apache/pirk/test/utils/BaseTests.java   |  30 +-
 .../java/org/apache/pirk/test/utils/Inputs.java |  11 +-
 .../apache/pirk/utils/SystemConfiguration.java  |  18 +
 src/main/resources/pirk.properties              |   4 +-
 src/main/resources/responder.properties         |  22 +-
 .../pirk/general/QueryParserUtilsTest.java      |   5 -
 .../pirk/schema/query/LoadQuerySchemaTest.java  |   2 +-
 .../wideskies/standalone/StandaloneTest.java    |   3 +-
 27 files changed, 987 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09c1faa..b46325e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,9 +87,9 @@
 		<hadoop.version>2.7.2</hadoop.version>
 		<apache-commons.version>3.3</apache-commons.version>
 		<elasticsearch.version>2.3.3</elasticsearch.version>
+		<spark-streaming.version>2.0.0</spark-streaming.version>
 		<pirk.forkCount>1C</pirk.forkCount>
 		<pirk.reuseForks>true</pirk.reuseForks>
-		<jackson.version>2.8.1</jackson.version>
 	</properties>
 
 	<dependencies>
@@ -217,6 +217,12 @@
 			</exclusions>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-streaming_2.10</artifactId>
+			<version>${spark-streaming.version}</version>
+		</dependency>
+
 		<!-- Square's JNA GMP module -->
 		<dependency>
 			<groupId>com.squareup.jnagmp</groupId>
@@ -258,13 +264,6 @@
 			<version>2.6.2</version>
 		</dependency>
 
-		<!-- Jackson dependency -->
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index d91fa2d..4f26a71 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -113,7 +113,7 @@ public class QuerierDriver implements Serializable
     {
       queryType = SystemConfiguration.getProperty(QuerierProps.QUERYTYPE);
       hashBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE));
-      hashKey = SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE);
+      hashKey = SystemConfiguration.getProperty(QuerierProps.HASHKEY);
       dataPartitionBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.DATAPARTITIONSIZE));
       paillierBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.PAILLIERBITSIZE));
       certainty = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.CERTAINTY));

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index ff43be6..0c031ec 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -377,6 +377,46 @@ public class ResponderCLI
     optionAllowEmbeddedQS.setType(String.class);
     options.addOption(optionAllowEmbeddedQS);
 
+    // batchSeconds - spark streaming
+    Option optionBatchSeconds = new Option("batchSeconds", ResponderProps.BATCHSECONDS, true,
+        "optional -- Number of seconds per batch in Spark Streaming; defaults to 30");
+    optionBatchSeconds.setRequired(false);
+    optionBatchSeconds.setArgName(ResponderProps.BATCHSECONDS);
+    optionBatchSeconds.setType(String.class);
+    options.addOption(optionBatchSeconds);
+
+    // windowLength - spark streaming
+    Option optionWindowLength = new Option("windowLength", ResponderProps.WINDOWLENGTH, true,
+        "optional -- Number of seconds per window in Spark Streaming; defaults to 60");
+    optionWindowLength.setRequired(false);
+    optionWindowLength.setArgName(ResponderProps.WINDOWLENGTH);
+    optionWindowLength.setType(String.class);
+    options.addOption(optionWindowLength);
+
+    // maxBatches - spark streaming
+    Option optionMaxBatches = new Option("maxBatches", ResponderProps.MAXBATCHES, true,
+        "optional -- Max batches to process in Spark Streaming; defaults to -1 - unlimited");
+    optionMaxBatches.setRequired(false);
+    optionMaxBatches.setArgName(ResponderProps.MAXBATCHES);
+    optionMaxBatches.setType(String.class);
+    options.addOption(optionMaxBatches);
+
+    // stopGracefully - spark streaming
+    Option optionStopGracefully = new Option("stopGracefully", ResponderProps.STOPGRACEFULLY, true,
+        "optional -- Whether or not to stop gracefully in Spark Streaming; defaults to false");
+    optionStopGracefully.setRequired(false);
+    optionStopGracefully.setArgName(ResponderProps.STOPGRACEFULLY);
+    optionStopGracefully.setType(String.class);
+    options.addOption(optionStopGracefully);
+
+    // useQueueStream - spark streaming
+    Option optionUseQueueStream = new Option("queueStream", ResponderProps.USEQUEUESTREAM, true,
+        "optional -- Whether or not to use a queue stream in Spark Streaming; defaults to false");
+    optionUseQueueStream.setRequired(false);
+    optionUseQueueStream.setArgName(ResponderProps.USEQUEUESTREAM);
+    optionUseQueueStream.setType(String.class);
+    options.addOption(optionUseQueueStream);
+
     return options;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index da24ae4..6b32418 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pirk.responder.wideskies;
 
+import java.security.Permission;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
 import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
+import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
 import org.apache.pirk.responder.wideskies.standalone.Responder;
 import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.SystemConfiguration;
@@ -50,6 +53,9 @@ public class ResponderDriver
   {
     ResponderCLI responderCLI = new ResponderCLI(args);
 
+    // For handling System.exit calls from Spark Streaming
+    System.setSecurityManager(new SystemExitManager());
+
     if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce"))
     {
       logger.info("Launching MapReduce ResponderTool:");
@@ -65,6 +71,25 @@ public class ResponderDriver
       ComputeResponse computeResponse = new ComputeResponse(fs);
       computeResponse.performQuery();
     }
+    else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("sparkstreaming"))
+    {
+      logger.info("Launching Spark ComputeStreamingResponse:");
+
+      FileSystem fs = FileSystem.get(new Configuration());
+      ComputeStreamingResponse computeSR = new ComputeStreamingResponse(fs);
+      try
+      {
+        computeSR.performQuery();
+      } catch (SystemExitException e)
+      {
+        // If System.exit(0) is not caught from Spark Streaming,
+        // the application will complete with a 'failed' status
+        logger.info("Exited with System.exit(0) from Spark Streaming");
+      }
+
+      // Tear down the context
+      computeSR.teardown();
+    }
     else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone"))
     {
       logger.info("Launching Standalone Responder:");
@@ -76,4 +101,30 @@ public class ResponderDriver
       pirResponder.computeStandaloneResponse();
     }
   }
+
+  // Exception and Security Manager classes used to catch System.exit from Spark Streaming
+  private static class SystemExitException extends SecurityException
+  {}
+
+  private static class SystemExitManager extends SecurityManager
+  {
+    @Override
+    public void checkPermission(Permission perm)
+    {}
+
+    @Override
+    public void checkExit(int status)
+    {
+      super.checkExit(status);
+      if (status == 0) // If we exited cleanly, throw SystemExitException
+      {
+        throw new SystemExitException();
+      }
+      else
+      {
+        throw new SecurityException();
+      }
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index a9f8fae..4fee85a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,6 +21,7 @@ package org.apache.pirk.responder.wideskies;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.cli.Option;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.schema.data.DataSchemaLoader;
 import org.apache.pirk.schema.query.QuerySchemaLoader;
@@ -59,19 +60,26 @@ public class ResponderProps
   public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
   public static final String USEMODEXPJOIN = "pir.useModExpJoin";
   public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
-  static final String NUMREDUCETASKS = "pir.numReduceTasks";
-  static final String MAPMEMORY = "mapreduce.map.memory.mb";
-  static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
-  static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
-  static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
-  static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
-  static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
-  static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+  public static final String NUMREDUCETASKS = "pir.numReduceTasks";
+  public static final String MAPMEMORY = "mapreduce.map.memory.mb";
+  public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+  public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+  public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+  public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+  public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+  public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+
+  // For Spark Streaming - optional
+  public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
+  public static final String WINDOWLENGTH = "pir.sparkstreaming.windowLength";
+  public static final String USEQUEUESTREAM = "pir.sparkstreaming.useQueueStream";
+  public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
+  public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown";
 
   static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
       OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
       REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
-      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS);
+      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);
 
   /**
    * Validates the responder properties
@@ -90,7 +98,7 @@ public class ResponderProps
     }
 
     String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
+    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone"))
     {
       logger.info("Unsupported platform: " + platform);
       valid = false;
@@ -176,7 +184,7 @@ public class ResponderProps
       valid = false;
     }
 
-    // Parse optional properties with defaults
+    // Parse optional properties
 
     if (SystemConfiguration.hasProperty(QUERYSCHEMAS))
     {
@@ -188,6 +196,8 @@ public class ResponderProps
       SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS));
     }
 
+    // Parse optional properties with defaults
+
     if (!SystemConfiguration.hasProperty(USEHDFSLOOKUPTABLE))
     {
       SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false");
@@ -223,6 +233,31 @@ public class ResponderProps
       SystemConfiguration.setProperty(USELOCALCACHE, "true");
     }
 
+    if (!SystemConfiguration.hasProperty(BATCHSECONDS))
+    {
+      SystemConfiguration.setProperty(BATCHSECONDS, "30");
+    }
+
+    if (!SystemConfiguration.hasProperty(WINDOWLENGTH))
+    {
+      SystemConfiguration.setProperty(WINDOWLENGTH, "30");
+    }
+
+    if (!SystemConfiguration.hasProperty(USEQUEUESTREAM))
+    {
+      SystemConfiguration.setProperty(USEQUEUESTREAM, "false");
+    }
+
+    if (!SystemConfiguration.hasProperty(MAXBATCHES))
+    {
+      SystemConfiguration.setProperty(MAXBATCHES, "-1");
+    }
+
+    if (!SystemConfiguration.hasProperty(STOPGRACEFULLY))
+    {
+      SystemConfiguration.setProperty(STOPGRACEFULLY, "false");
+    }
+
     // Load the new local query and data schemas
     if (valid)
     {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 6ef1bdc..0745bea 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -184,7 +184,7 @@ public class ComputeEncryptedRow
       {
         if (elementCounter >= maxHitsPerSelector)
         {
-          logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+          logger.debug("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
           break;
         }
       }
@@ -222,7 +222,7 @@ public class ComputeEncryptedRow
 
       ++elementCounter;
     }
-    logger.info("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+    logger.debug("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
 
     return returnPairs;
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
index 11473a0..fb5fb91 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
@@ -40,6 +40,7 @@ public class Accumulators implements Serializable
   private Accumulator<Integer> numRecordsAfterFilter = null;
   private Accumulator<Integer> numHashes = null;
   private Accumulator<Integer> numColumns = null;
+  private Accumulator<Integer> numBatches = null;
 
   public Accumulators(JavaSparkContext sc)
   {
@@ -48,6 +49,7 @@ public class Accumulators implements Serializable
     numRecordsAfterFilter = sc.accumulator(0);
     numHashes = sc.accumulator(0);
     numColumns = sc.accumulator(0);
+    numBatches = sc.accumulator(0);
   }
 
   public Integer numRecordsReceivedGetValue()
@@ -100,6 +102,16 @@ public class Accumulators implements Serializable
     numColumns.add(val);
   }
 
+  public Integer numBatchesGetValue()
+  {
+    return numBatches.value();
+  }
+
+  public void incNumBatches(int val)
+  {
+    numBatches.add(val);
+  }
+
   public void resetAll()
   {
     numRecordsReceived.setValue(0);
@@ -107,11 +119,12 @@ public class Accumulators implements Serializable
     numRecordsAfterFilter.setValue(0);
     numHashes.setValue(0);
     numColumns.setValue(0);
+    numBatches.setValue(0);
   }
 
   public void printAll()
   {
     logger.info("numRecordsReceived = " + numRecordsReceived.value() + " \n numRecordsFiltered = " + numRecordsFiltered + " \n numRecordsAfterFilter = "
-        + numRecordsAfterFilter + " \n numHashes = " + numHashes + " \n numColumns = " + numColumns);
+        + numRecordsAfterFilter + " \n numHashes = " + numHashes + " \n numColumns = " + numColumns.value() + " \n numBatches = " + numBatches.value());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
index bab4ae9..d0215fc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
@@ -45,7 +45,7 @@ public class BroadcastVars implements Serializable
 
   private Broadcast<QuerySchema> querySchema = null;
 
-  private Broadcast<String> useLocalCache = null;
+  private Broadcast<Boolean> useLocalCache = null;
 
   private Broadcast<Boolean> limitHitsPerSelector = null;
 
@@ -53,6 +53,10 @@ public class BroadcastVars implements Serializable
 
   private Broadcast<String> expDir = null;
 
+  private Broadcast<String> output = null;
+
+  private Broadcast<Integer> maxBatches = null;
+
   public BroadcastVars(JavaSparkContext sc)
   {
     jsc = sc;
@@ -73,6 +77,16 @@ public class BroadcastVars implements Serializable
     return queryInfo.getValue();
   }
 
+  public void setOutput(String outputIn)
+  {
+    output = jsc.broadcast(outputIn);
+  }
+
+  public String getOutput()
+  {
+    return output.getValue();
+  }
+
   public void setQueryInfo(QueryInfo queryInfoIn)
   {
     queryInfo = jsc.broadcast(queryInfoIn);
@@ -98,12 +112,12 @@ public class BroadcastVars implements Serializable
     return dataSchema.getValue();
   }
 
-  public void setUseLocalCache(String useLocalCacheInput)
+  public void setUseLocalCache(Boolean useLocalCacheInput)
   {
     useLocalCache = jsc.broadcast(useLocalCacheInput);
   }
 
-  public String getUseLocalCache()
+  public Boolean getUseLocalCache()
   {
     return useLocalCache.getValue();
   }
@@ -137,4 +151,14 @@ public class BroadcastVars implements Serializable
   {
     return expDir.getValue();
   }
+
+  public Integer getMaxBatches()
+  {
+    return maxBatches.getValue();
+  }
+
+  public void setMaxBatches(Integer maxBatchesIn)
+  {
+    maxBatches = jsc.broadcast(maxBatchesIn);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 2050643..f34acf8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -91,6 +91,7 @@ public class ComputeResponse
 
   private QueryInfo queryInfo = null;
   Query query = null;
+  QuerySchema qSchema = null;
 
   private int numDataPartitions = 0;
   private int numColMultPartitions = 0;
@@ -175,7 +176,6 @@ public class ComputeResponse
     bVars.setQuery(query);
     bVars.setQueryInfo(queryInfo);
 
-    QuerySchema qSchema = null;
     if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
     {
       qSchema = queryInfo.getQuerySchema();
@@ -190,7 +190,7 @@ public class ComputeResponse
     bVars.setDataSchema(dSchema);
 
     // Set the local cache flag
-    bVars.setUseLocalCache(SystemConfiguration.getProperty("pir.useLocalCache", "true"));
+    bVars.setUseLocalCache(SystemConfiguration.getBooleanProperty("pir.useLocalCache", true));
 
     useHDFSLookupTable = SystemConfiguration.isSetTrue("pir.useHDFSLookupTable");
 
@@ -246,7 +246,7 @@ public class ComputeResponse
   {
     logger.info("Reading data ");
 
-    JavaRDD<MapWritable> dataRDD;
+    JavaRDD<MapWritable> jsonRDD;
 
     Job job = new Job();
     String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
@@ -262,7 +262,6 @@ public class ComputeResponse
       logger.debug("schemaName = " + name);
     }
 
-    QuerySchema qSchema = QuerySchemaRegistry.get(bVars.getQueryInfo().getQueryType());
     job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName());
     job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas"));
 
@@ -278,12 +277,19 @@ public class ComputeResponse
     FileInputFormat.setInputPaths(job, inputData);
 
     // Read data from hdfs
-    JavaRDD<MapWritable> jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
+    jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
 
     // Filter out by the provided stopListFile entries
-    dataRDD = jsonRDD.filter(new FilterData(accum, bVars));
-
-    return dataRDD;
+    if (qSchema.getFilter() != null)
+    {
+      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
+      return filteredRDD;
+    }
+    else
+    {
+      logger.info("qSchema.getFilter() is null");
+      return jsonRDD;
+    }
   }
 
   /**
@@ -294,7 +300,7 @@ public class ComputeResponse
   {
     logger.info("Reading data ");
 
-    JavaRDD<MapWritable> dataRDD;
+    JavaRDD<MapWritable> jsonRDD;
 
     Job job = new Job();
     String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
@@ -304,13 +310,19 @@ public class ComputeResponse
     job.getConfiguration().set("es.resource", esResource);
     job.getConfiguration().set("es.query", esQuery);
 
-    JavaRDD<MapWritable> jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
-        .coalesce(numDataPartitions);
+    jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
 
     // Filter out by the provided stopListFile entries
-    dataRDD = jsonRDD.filter(new FilterData(accum, bVars));
-
-    return dataRDD;
+    if (qSchema.getFilter() != null)
+    {
+      JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
+      return filteredRDD;
+    }
+    else
+    {
+      logger.info("qSchema.getFilter() is null");
+      return jsonRDD;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 0f82b6d..56c917c 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -39,7 +39,7 @@ public class EncColMultGroupedMapper implements PairFunction<Tuple2<Long,Iterabl
 
   Query query = null;
 
-  EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
   {
 
     query = bbVarsIn.getQuery();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index 44bce8d..9242df7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -37,7 +37,7 @@ public class EncColMultReducer implements Function2<BigInteger,BigInteger,BigInt
 
   Query query = null;
 
-  EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
+  public EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
   {
 
     query = bbVarsIn.getQuery();

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index dc25439..ef279e2 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -63,10 +63,7 @@ public class EncRowCalc implements PairFlatMapFunction<Tuple2<Integer,Iterable<L
 
     query = bvIn.getQuery();
     queryInfo = bvIn.getQueryInfo();
-    if (bvIn.getUseLocalCache().equals("true"))
-    {
-      useLocalCache = true;
-    }
+    useLocalCache = bvIn.getUseLocalCache();
     limitHitsPerSelector = bvIn.getLimitHitsPerSelector();
     maxHitsPerSelector = bvIn.getMaxHitsPerSelector();
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index fb87b06..0a8959a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -61,11 +61,7 @@ public class FilterData implements Function<MapWritable,Boolean>
     accum.incNumRecordsReceived(1);
 
     // Perform the filter
-    boolean passFilter = true;
-    if (filter != null)
-    {
-      passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
-    }
+    boolean passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
 
     if (passFilter)
     {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
new file mode 100644
index 0000000..eaf7384
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
+import org.apache.pirk.inputformat.hadoop.InputFormatConst;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.spark.Accumulators;
+import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
+import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper;
+import org.apache.pirk.responder.wideskies.spark.EncColMultReducer;
+import org.apache.pirk.responder.wideskies.spark.EncRowCalc;
+import org.apache.pirk.responder.wideskies.spark.FilterData;
+import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Master class for the PIR query spark streaming application
+ * <p>
+ * NOTE:
+ * <p>
+ * - NOT using Elasticsearch in practice - there proved to be some speed issues with ES and Spark that appear to be ES-Spark specific - the code is left in
+ * place in anticipation of the ES-Spark issues being resolved...
+ * <p>
+ * - Even if rdd.count() calls are embedded in logger.debug statements, they are computed by Spark. Thus, they are commented out in the code below - uncomment
+ * for rdd.count() debug
+ * 
+ */
+public class ComputeStreamingResponse
+{
+  private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class);
+
+  // Input format name (pir.dataInputFormat) and, for the base format, the input location (pir.inputData)
+  private String dataInputFormat = null;
+  private String inputData = null;
+  // Output location (pir.outputFile) and the derived "<outputFile>_exp" directory
+  private String outputFile = null;
+  private String outputDirExp = null;
+
+  // Location from which the Query is recalled (pir.queryInput), and the query schema resolved in setup()
+  private String queryInput = null;
+  QuerySchema qSchema = null;
+
+  // Elasticsearch query and resource; "none" is the 'not configured' sentinel checked in the constructor
+  private String esQuery = "none";
+  private String esResource = "none";
+
+  private FileSystem fs = null;
+  private HadoopFileSystemStore storage = null;
+  private JavaStreamingContext jssc = null;
+
+  // When true, the input is read once into a queue stream instead of being picked up by a file stream
+  boolean useQueueStream = false;
+
+  // Batch interval and window length, both in seconds
+  private long batchSeconds = 0;
+  private long windowLength = 0;
+
+  private Accumulators accum = null;
+  private BroadcastVars bVars = null;
+
+  private QueryInfo queryInfo = null;
+  Query query = null;
+
+  // Partition counts used when reading the data and when multiplying the columns
+  private int numDataPartitions = 0;
+  private int numColMultPartitions = 0;
+
+  // true: reduceByKey for column multiplication; false: groupByKey followed by a map
+  private boolean colMultReduceByKey = false;
+
+  /**
+   * Reads the responder configuration from {@link SystemConfiguration}, validates it, creates the Spark and streaming contexts, and initializes the accumulators
+   * and broadcast variables
+   *
+   * @param fileSys
+   *          file system backing the {@link HadoopFileSystemStore} and handed to the schema loaders
+   * @throws IllegalArgumentException
+   *           if the input format is unknown, a required input property is missing, batchSeconds or windowLength is not positive, or batchSeconds does not
+   *           divide windowLength
+   * @throws Exception
+   *           if the setup of the accumulators and broadcast variables fails
+   */
+  public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+  {
+    fs = fileSys;
+    storage = new HadoopFileSystemStore(fs);
+
+    dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
+    if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
+    {
+      throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form");
+    }
+    logger.info("inputFormat = " + dataInputFormat);
+    if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+    {
+      inputData = SystemConfiguration.getProperty("pir.inputData", "none");
+      if (inputData.equals("none"))
+      {
+        throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + " an inputFile must be specified");
+      }
+      logger.info("inputFile = " + inputData);
+    }
+    else if (dataInputFormat.equals(InputFormatConst.ES))
+    {
+      esQuery = SystemConfiguration.getProperty("pir.esQuery", "none");
+      esResource = SystemConfiguration.getProperty("pir.esResource", "none");
+      if (esQuery.equals("none"))
+      {
+        throw new IllegalArgumentException("esQuery must be specified");
+      }
+      if (esResource.equals("none"))
+      {
+        throw new IllegalArgumentException("esResource must be specified");
+      }
+      logger.info("esQuery = " + esQuery + " esResource = " + esResource);
+    }
+    outputFile = SystemConfiguration.getProperty("pir.outputFile");
+    outputDirExp = outputFile + "_exp";
+
+    queryInput = SystemConfiguration.getProperty("pir.queryInput");
+    String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
+
+    logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
+        + " esResource = " + esResource);
+
+    // Pull the batchSeconds and windowLength parameters
+    batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
+    windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
+
+    // Reject non-positive durations up front: batchSeconds = 0 would otherwise surface as an ArithmeticException from the modulo check below
+    if (batchSeconds <= 0 || windowLength <= 0)
+    {
+      throw new IllegalArgumentException("batchSeconds = " + batchSeconds + " and windowLength = " + windowLength + " must both be positive");
+    }
+    if (windowLength % batchSeconds != 0)
+    {
+      throw new IllegalArgumentException("batchSeconds = " + batchSeconds + " must divide windowLength = " + windowLength);
+    }
+    useQueueStream = SystemConfiguration.getBooleanProperty("pir.sparkstreaming.useQueueStream", false);
+    logger.info("useQueueStream = " + useQueueStream);
+
+    // Set the necessary configurations
+    SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster");
+    conf.set("es.nodes", SystemConfiguration.getProperty("es.nodes", "none"));
+    conf.set("es.port", SystemConfiguration.getProperty("es.port", "none"));
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    conf.set("spark.streaming.stopGracefullyOnShutdown", SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));
+
+    JavaSparkContext sc = new JavaSparkContext(conf);
+    jssc = new JavaStreamingContext(sc, Durations.seconds(batchSeconds));
+
+    // Only the setup happens here; the query itself is run by performQuery()
+    logger.info("Setting up for query run");
+    setup();
+    logger.info("Setup complete");
+  }
+
+  /**
+   * Loads the schemas, recalls the query, and populates the accumulators and broadcast variables from the system configuration
+   */
+  private void setup() throws Exception
+  {
+    // Data and query schemas
+    DataSchemaLoader.initialize(true, fs);
+    QuerySchemaLoader.initialize(true, fs);
+
+    // Accumulators and broadcast variables share the streaming context's Spark context
+    JavaSparkContext sparkContext = jssc.sparkContext();
+    accum = new Accumulators(sparkContext);
+    bVars = new BroadcastVars(sparkContext);
+
+    // Query and QueryInfo
+    query = storage.recall(queryInput, Query.class);
+    queryInfo = query.getQueryInfo();
+    bVars.setQuery(query);
+    bVars.setQueryInfo(queryInfo);
+
+    // Take the query schema embedded in the query when ad hoc schemas are allowed; otherwise (or if none is embedded) use the registry
+    boolean adHocSchemasAllowed = SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false);
+    if (adHocSchemasAllowed)
+    {
+      qSchema = queryInfo.getQuerySchema();
+    }
+    if (qSchema == null)
+    {
+      qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType());
+    }
+
+    DataSchema dataSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName());
+    bVars.setQuerySchema(qSchema);
+    bVars.setDataSchema(dataSchema);
+
+    // Local cache flag
+    boolean localCache = SystemConfiguration.getBooleanProperty("pir.useLocalCache", true);
+    bVars.setUseLocalCache(localCache);
+
+    // Hit limit variables
+    boolean limitHits = SystemConfiguration.getBooleanProperty("pir.limitHitsPerSelector", false);
+    int maxHits = SystemConfiguration.getIntProperty("pir.maxHitsPerSelector", 100);
+    bVars.setLimitHitsPerSelector(limitHits);
+    bVars.setMaxHitsPerSelector(maxHits);
+
+    // Partition counts; the column multiplication count defaults to the data partition count
+    numDataPartitions = SystemConfiguration.getIntProperty("pir.numDataPartitions", 1000);
+    numColMultPartitions = SystemConfiguration.getIntProperty("pir.numColMultPartitions", numDataPartitions);
+
+    // reduceByKey versus groupByKey->reduce for column multiplication
+    colMultReduceByKey = SystemConfiguration.getBooleanProperty("pir.colMultReduceByKey", false);
+
+    // expDir
+    bVars.setExpDir(outputDirExp);
+
+    // maxBatches; -1 is the default
+    int batchLimit = SystemConfiguration.getIntProperty("pir.sparkstreaming.maxBatches", -1);
+    logger.info("maxBatches = " + batchLimit);
+    bVars.setMaxBatches(batchLimit);
+  }
+
+  /**
+   * Starts the streaming context and then waits on it via {@code awaitTermination()}
+   * 
+   * @throws InterruptedException
+   *           propagated from the wait on the streaming context
+   */
+  public void start() throws InterruptedException
+  {
+    logger.info("Starting computation...");
+
+    // Kick off the streaming job, then wait on it
+    jssc.start();
+    jssc.awaitTermination();
+  }
+
+  /**
+   * Stops the streaming context once the application is complete
+   */
+  public void teardown()
+  {
+    logger.info("Tearing down...");
+
+    jssc.stop();
+
+    logger.info("Tear down complete");
+  }
+
+  /**
+   * Reads the data from the configured input source/format and performs the query on it
+   * 
+   * @throws IllegalArgumentException
+   *           if the configured input format is neither the base format nor Elasticsearch
+   * @throws Exception
+   *           if reading the data or running the query fails
+   */
+  public void performQuery() throws Exception
+  {
+    logger.info("Performing query: ");
+
+    JavaDStream<MapWritable> inputRDD;
+    if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+    {
+      inputRDD = readData();
+    }
+    else if (dataInputFormat.equals(InputFormatConst.ES))
+    {
+      inputRDD = readDataES();
+    }
+    else
+    {
+      // Fail here with a clear message rather than handing a null stream to performQuery(JavaDStream)
+      throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form");
+    }
+
+    performQuery(inputRDD);
+  }
+
+  /**
+   * Reads the data using the input format class named by the pir.baseInputFormat property and returns a stream of MapWritable data elements, filtered by the
+   * query schema's data filter when one is configured
+   * 
+   * @return the (possibly filtered) stream of data elements
+   * @throws ClassNotFoundException
+   *           if the class named by pir.baseInputFormat cannot be loaded
+   * @throws IllegalArgumentException
+   *           if that class does not extend BaseInputFormat
+   * @throws Exception
+   *           if the Hadoop job configuration cannot be created
+   */
+  @SuppressWarnings("unchecked")
+  public JavaDStream<MapWritable> readData() throws ClassNotFoundException, Exception
+  {
+    logger.info("Reading data ");
+
+    // Job.getInstance() replaces the deprecated Job constructor
+    Job job = Job.getInstance();
+    String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
+    String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
+    job.setJobName(jobName);
+    job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
+    job.getConfiguration().set("query", baseQuery);
+
+    job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName());
+    job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas"));
+
+    // Set the inputFormatClass based upon the baseInputFormat property; verify the type before performing the unchecked cast
+    String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
+    Class<?> loadedClass = Class.forName(classString);
+    if (!BaseInputFormat.class.isAssignableFrom(loadedClass))
+    {
+      throw new IllegalArgumentException("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+    }
+    Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) loadedClass;
+    job.setInputFormatClass(inputClass);
+
+    FileInputFormat.setInputPaths(job, inputData);
+
+    // Read the data, either once into a queue stream or through a file stream on the input location
+    logger.info("useQueueStream = " + useQueueStream);
+    JavaDStream<MapWritable> mwStream = null;
+    if (useQueueStream)
+    {
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values()
+          .coalesce(numDataPartitions);
+
+      rddQueue.add(rddIn);
+      mwStream = jssc.queueStream(rddQueue);
+    }
+    else
+    {
+      JavaPairInputDStream<Text,MapWritable> inputRDD = jssc.fileStream(inputData, Text.class, MapWritable.class, inputClass);
+      mwStream = inputRDD.transform(new Function<JavaPairRDD<Text,MapWritable>,JavaRDD<MapWritable>>()
+      {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public JavaRDD<MapWritable> call(JavaPairRDD<Text,MapWritable> pair) throws Exception
+        {
+          return pair.values();
+        }
+      }).repartition(numDataPartitions);
+    }
+
+    // Apply the query schema's data filter, if one is configured
+    if (qSchema.getFilter() != null)
+    {
+      return mwStream.filter(new FilterData(accum, bVars));
+    }
+
+    return mwStream;
+  }
+
+  /**
+   * Reads the data from Elasticsearch and returns a stream of MapWritable data elements, filtered by the query schema's data filter when one is configured
+   * 
+   * @return the (possibly filtered) stream of data elements
+   * @throws IllegalStateException
+   *           if a file stream is requested (useQueueStream is false) but no input location is configured
+   * @throws Exception
+   *           if the Hadoop job configuration cannot be created
+   */
+  @SuppressWarnings("unchecked")
+  public JavaDStream<MapWritable> readDataES() throws Exception
+  {
+    logger.info("Reading data ");
+
+    // Job.getInstance() replaces the deprecated Job constructor
+    Job job = Job.getInstance();
+    String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
+    job.setJobName(jobName);
+    job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
+    job.getConfiguration().set("es.port", SystemConfiguration.getProperty("es.port"));
+    job.getConfiguration().set("es.resource", esResource);
+    job.getConfiguration().set("es.query", esQuery);
+
+    // Read the data from Elasticsearch
+    JavaDStream<MapWritable> mwStream = null;
+    if (useQueueStream)
+    {
+      Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+      JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
+          .coalesce(numDataPartitions);
+      rddQueue.add(rddIn);
+
+      mwStream = jssc.queueStream(rddQueue);
+    }
+    else
+    {
+      // The constructor only sets inputData for the base input format, so it is null in ES mode; fail with a clear message instead of passing null
+      // to fileStream
+      if (inputData == null)
+      {
+        throw new IllegalStateException("A file stream requires an input location, but none is configured for inputFormat = " + dataInputFormat
+            + "; set pir.sparkstreaming.useQueueStream = true");
+      }
+
+      JavaPairInputDStream<Text,MapWritable> inputRDD = jssc.fileStream(inputData, Text.class, MapWritable.class, EsInputFormat.class);
+      mwStream = inputRDD.transform(new Function<JavaPairRDD<Text,MapWritable>,JavaRDD<MapWritable>>()
+      {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public JavaRDD<MapWritable> call(JavaPairRDD<Text,MapWritable> pair) throws Exception
+        {
+          return pair.values();
+        }
+      }).repartition(numDataPartitions);
+    }
+
+    // Apply the query schema's data filter, if one is configured
+    if (qSchema.getFilter() != null)
+    {
+      return mwStream.filter(new FilterData(accum, bVars));
+    }
+
+    return mwStream;
+  }
+
+  /**
+   * Performs the query on an input stream of MapWritable data elements and then starts the streaming computation
+   * 
+   * @param input
+   *          stream of data elements, as produced by readData() or readDataES()
+   * @throws PIRException
+   *           if the encrypted column calculation cannot be set up
+   * @throws InterruptedException
+   *           propagated from start()
+   */
+  public void performQuery(JavaDStream<MapWritable> input) throws PIRException, InterruptedException
+  {
+    logger.info("Performing query: ");
+
+    // Process non-overlapping windows of data of duration windowLength seconds
+    // If we are using queue streams, there is no need to window
+    // window() returns a new stream rather than modifying its receiver, so the result must be the stream used below
+    JavaDStream<MapWritable> windowedInput = input;
+    if (!useQueueStream)
+    {
+      windowedInput = input.window(Durations.seconds(windowLength), Durations.seconds(windowLength));
+    }
+
+    // Extract the selectors for each dataElement based upon the query type
+    // and perform a keyed hash of the selectors
+    JavaPairDStream<Integer,List<BigInteger>> selectorHashToDocRDD = windowedInput.mapToPair(new HashSelectorsAndPartitionData(bVars));
+
+    // Group by hashed selector (row) -- can combine with the line above, separating for testing and benchmarking...
+    JavaPairDStream<Integer,Iterable<List<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey();
+
+    // Calculate the encrypted row values for each row, emit <colNum, colVal> for each row
+    JavaPairDStream<Long,BigInteger> encRowRDD = selectorGroupRDD.flatMapToPair(new EncRowCalc(accum, bVars));
+
+    // Multiply the column values by colNum: emit <colNum, finalColVal> and write the final result object
+    encryptedColumnCalc(encRowRDD);
+
+    // Start the streaming computation
+    start();
+  }
+
+  /**
+   * Computes the final encrypted columns from the encrypted row values and, for each batch, writes the response object
+   * 
+   * @param encRowRDD
+   *          stream of <colNum, colVal> pairs
+   */
+  private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD) throws PIRException
+  {
+    // Multiply the column values by colNum: emit <colNum, finalColVal>
+    JavaPairDStream<Long,BigInteger> encColRDD;
+    if (colMultReduceByKey)
+    {
+      encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+    }
+    else
+    {
+      encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+    }
+
+    // Update the output name, by batch number
+    // NOTE(review): this statement runs once, while the stream is being defined, not once per batch - confirm the output name is meant to be fixed here
+    bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
+
+    // Form and write the response object
+    encColRDD.repartition(1).foreachRDD(new VoidFunction<JavaPairRDD<Long,BigInteger>>()
+    {
+      // Declared for consistency with the other anonymous Spark functions in this class
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public void call(JavaPairRDD<Long,BigInteger> rdd)
+      {
+        rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
+
+        // Shut down once the configured number of batches has been reached; -1 disables the limit.
+        // >= rather than == so the shutdown still fires if the count ever passes the limit
+        int maxBatchesVar = bVars.getMaxBatches();
+        if (maxBatchesVar != -1 && accum.numBatchesGetValue() >= maxBatchesVar)
+        {
+          logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
+          System.exit(0);
+        }
+
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java
new file mode 100644
index 0000000..a9f07bd
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.spark.Accumulators;
+import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple2;
+
+public class FinalResponseFunction implements VoidFunction<Iterator<Tuple2<Long,BigInteger>>>
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger logger = LoggerFactory.getLogger(FinalResponseFunction.class);
+
+  private BroadcastVars bVars = null;
+  private Accumulators accum = null;
+
+  public FinalResponseFunction(Accumulators accumIn, BroadcastVars bbVarsIn)
+  {
+    bVars = bbVarsIn;
+    accum = accumIn;
+  }
+
+  public void call(Iterator<Tuple2<Long,BigInteger>> iter) throws Exception
+  {
+    // Form the response object
+    QueryInfo queryInfo = bVars.getQueryInfo();
+    Response response = new Response(queryInfo);
+    while (iter.hasNext())
+    {
+      Tuple2<Long,BigInteger> input = iter.next();
+      response.addElement(input._1().intValue(), input._2());
+      logger.debug("colNum = " + input._1().intValue() + " column = " + input._2().toString());
+    }
+
+    // Write out the response
+    FileSystem fs = FileSystem.get(new Configuration());
+    HadoopFileSystemStore storage = new HadoopFileSystemStore(fs);
+    String outputFile = bVars.getOutput();
+    logger.debug("outputFile = " + outputFile);
+    try
+    {
+      storage.store(outputFile, response);
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+    accum.incNumBatches(1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/schema/data/DataSchema.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
index 9557ce6..8728b96 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A data schema describes the target data being referenced by a <code>Querier</code> and a <code>Responder</code>.
@@ -43,6 +45,8 @@ public class DataSchema implements Serializable
 {
   private static final long serialVersionUID = 1L;
 
+  private static final Logger logger = LoggerFactory.getLogger(DataSchema.class);
+
   // This schema's name.
   private final String schemaName;
 
@@ -212,6 +216,7 @@ public class DataSchema implements Serializable
       text = new Text(elementName);
       textRep.put(elementName, text);
     }
+
     return text;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
index 1535e1f..7930464 100644
--- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
+++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
@@ -166,6 +166,9 @@ public class DistributedTestCLI
     tests += "J - JSON/HDFS MapReduce\n";
     tests += "ES - Elasticsearch Spark \n";
     tests += "JS - JSON/HDFS Spark \n";
+    tests += "SS - Spark Streaming Tests \n";
+    tests += "JSS - JSON/HDFS Spark Streaming \n";
+    tests += "ESS - Elasticsearch Spark Streaming \n";
 
     Option optionTestSelection = new Option("t", "tests", true, "optional -- Select which tests to execute: \n" + tests);
     optionTestSelection.setRequired(false);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
index a8cec45..f312932 100755
--- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
+++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
@@ -88,6 +88,7 @@ public class DistributedTestDriver
     List<JSONObject> dataElements = Inputs.createJSONInput(fs);
 
     String localStopListFile = Inputs.createStopList(fs, true);
+
     SystemConfiguration.setProperty("pir.stopListFile", localStopListFile);
 
     Inputs.createSchemaFiles(fs, true, StopListFilter.class.getName());
@@ -100,11 +101,30 @@ public class DistributedTestDriver
    */
   public static void test(FileSystem fs, DistributedTestCLI cli, List<JSONObject> pirDataElements) throws Exception
   {
+    // MapReduce JSON input
     if (cli.run("1:J"))
     {
       DistTestSuite.testJSONInputMR(fs, pirDataElements);
     }
-    if (cli.run("1:E") || cli.run("1:ES"))
+
+    // Spark with JSON input
+    if (cli.run("1:JS"))
+    {
+      DistTestSuite.testJSONInputSpark(fs, pirDataElements);
+    }
+
+    // Spark Streaming
+    if (cli.run("1:SS"))
+    {
+      DistTestSuite.testSparkStreaming(fs, pirDataElements);
+    }
+    if (cli.run("1:JSS"))
+    {
+      DistTestSuite.testJSONInputSparkStreaming(fs, pirDataElements);
+    }
+
+    // Elasticsearch input
+    if (cli.run("1:E") || cli.run("1:ES") || cli.run("1:ESS"))
     {
       Inputs.createESInput();
       if (cli.run("1:E"))
@@ -115,10 +135,10 @@ public class DistributedTestDriver
       {
         DistTestSuite.testESInputSpark(fs, pirDataElements);
       }
-    }
-    if (cli.run("1:JS"))
-    {
-      DistTestSuite.testJSONInputSpark(fs, pirDataElements);
+      if (cli.run("1:ESS"))
+      {
+        DistTestSuite.testESInputSparkStreaming(fs, pirDataElements);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
index bc59619..f44815a 100644
--- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
+++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
@@ -80,13 +80,13 @@ public class DistTestSuite
     // Run tests
     SystemConfiguration.setProperty("pirTest.embedSelector", "true");
     BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1);
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false);
 
     SystemConfiguration.setProperty("pirTest.embedSelector", "false");
     BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2);
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
 
-    BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2);
+    BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2, false);
 
     // Test hit limits per selector
     SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
@@ -97,8 +97,8 @@ public class DistTestSuite
 
     // Test the local cache for modular exponentiation
     SystemConfiguration.setProperty("pir.useLocalCache", "true");
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
-    BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
+    BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false);
     SystemConfiguration.setProperty("pir.useLocalCache", "false");
 
     // Change query for NXDOMAIN
@@ -114,7 +114,7 @@ public class DistTestSuite
     // In memory table
     SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
     SystemConfiguration.setProperty("pirTest.useExpLookupTable", "true");
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
 
     // Create exp table in hdfs
     SystemConfiguration.setProperty("mapreduce.map.memory.mb", "10000");
@@ -126,7 +126,7 @@ public class DistTestSuite
     SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
     SystemConfiguration.setProperty("pir.expCreationSplits", "50");
     SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150");
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
 
     // Reset exp properties
     SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
@@ -172,12 +172,12 @@ public class DistTestSuite
     // Run tests
     SystemConfiguration.setProperty("pirTest.embedSelector", "true");
     BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1);
-    BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2);
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1);
+    BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false);
 
     SystemConfiguration.setProperty("pirTest.embedSelector", "false");
     BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2);
-    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
 
     // Change query for NXDOMAIN
     SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
@@ -215,14 +215,14 @@ public class DistTestSuite
     // Run tests
     SystemConfiguration.setProperty("pirTest.embedSelector", "true");
     BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1);
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false);
 
     SystemConfiguration.setProperty("pirTest.embedSelector", "false");
     BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2);
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
-    BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
+    BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false);
 
-    BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2);
+    BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2, false);
 
     // Test embedded QuerySchema
     SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true");
@@ -253,18 +253,18 @@ public class DistTestSuite
     // Test the local cache for modular exponentiation
     SystemConfiguration.setProperty("pirTest.embedSelector", "true");
     SystemConfiguration.setProperty("pir.useLocalCache", "true");
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false);
 
     // Test the join functionality for the modular exponentiation table
     SystemConfiguration.setProperty("pir.useModExpJoin", "true");
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false);
     SystemConfiguration.setProperty("pir.useModExpJoin", "false");
 
     // Test file based exp lookup table for modular exponentiation
     SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true");
     SystemConfiguration.setProperty("pir.expCreationSplits", "500");
     SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150");
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
     SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
 
     // Change query for NXDOMAIN
@@ -303,12 +303,12 @@ public class DistTestSuite
     // Run tests
     SystemConfiguration.setProperty("pirTest.embedSelector", "true");
     BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1);
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1);
-    BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false);
+    BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false);
 
     SystemConfiguration.setProperty("pirTest.embedSelector", "false");
     BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2);
-    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
+    BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
 
     // Change query for NXDOMAIN
     SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
@@ -321,9 +321,89 @@ public class DistTestSuite
     logger.info("Completed testESInputSpark");
   }
 
+  public static void testSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+  {
+    testJSONInputSparkStreaming(fs, pirDataElements);
+    testESInputSparkStreaming(fs, pirDataElements);
+  }
+
+  public static void testJSONInputSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+  {
+    logger.info("Starting testJSONInputSparkStreaming");
+
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
+    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
+
+    SystemConfiguration.setProperty("pir.numColMultPartitions", "20");
+    SystemConfiguration.setProperty("pir.colMultReduceByKey", "false");
+
+    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
+    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
+
+    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
+
+    SystemConfiguration.setProperty("pir.numDataPartitions", "3");
+
+    SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30");
+    SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60");
+    SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true");
+    SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1");
+
+    SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false");
+
+    // Set up JSON configs
+    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT);
+    SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY));
+    SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0");
+
+    // Run tests
+    BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true);
+    BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true);
+    BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true);
+    BaseTests.testSRCIPQueryNoFilter(pirDataElements, fs, true, true, 2, true);
+
+    logger.info("Completed testJSONInputSparkStreaming");
+  }
+
+  public static void testESInputSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+  {
+    logger.info("Starting testESInputSparkStreaming");
+
+    SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
+    SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
+
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
+    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
+
+    SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
+    SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
+
+    SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30");
+    SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60");
+    SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true");
+    SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1");
+
+    SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false");
+
+    // Set up ES configs
+    SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES);
+    SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0");
+    SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY));
+
+    // Run tests
+    SystemConfiguration.setProperty("pirTest.embedSelector", "true");
+    BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true);
+    BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true);
+    BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true);
+
+    logger.info("Completed testESInputSparkStreaming");
+  }
+
   // Base method to perform query
-  public static List<QueryResponseJSON> performQuery(String queryType, ArrayList<String> selectors, FileSystem fs, boolean isSpark, int numThreads)
-      throws Exception
+  // TODO: This could be changed to pass in the platform instead of isSpark and isStreaming...
+  @SuppressWarnings("unused")
+  public static List<QueryResponseJSON> performQuery(String queryType, ArrayList<String> selectors, FileSystem fs, boolean isSpark, int numThreads,
+      boolean isStreaming) throws Exception
   {
     logger.info("performQuery: ");
 
@@ -379,11 +459,28 @@ public class DistTestSuite
     logger.info("Performing encrypted query:");
     if (isSpark)
     {
+      logger.info("spark.home = " + SystemConfiguration.getProperty("spark.home"));
+
       // Build args
       String inputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
       logger.info("inputFormat = " + inputFormat);
       ArrayList<String> args = new ArrayList<>();
-      args.add("-" + ResponderProps.PLATFORM + "=spark");
+      if (isStreaming)
+      {
+        logger.info("platform = sparkstreaming");
+        args.add("-" + ResponderProps.PLATFORM + "=sparkstreaming");
+        args.add("-" + ResponderProps.BATCHSECONDS + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds", "30"));
+        args.add("-" + ResponderProps.WINDOWLENGTH + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.windowLength", "60"));
+        args.add("-" + ResponderProps.MAXBATCHES + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.maxBatches", "-1"));
+        args.add("-" + ResponderProps.STOPGRACEFULLY + "=" + SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));
+        args.add("-" + ResponderProps.NUMDATAPARTITIONS + "=" + SystemConfiguration.getProperty("pir.numDataPartitions", "3"));
+        args.add("-" + ResponderProps.USEQUEUESTREAM + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false"));
+      }
+      else
+      {
+        logger.info("platform = spark");
+        args.add("-" + ResponderProps.PLATFORM + "=spark");
+      }
       args.add("-" + ResponderProps.DATAINPUTFORMAT + "=" + inputFormat);
       args.add("-" + ResponderProps.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput"));
       args.add("-" + ResponderProps.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile"));
@@ -436,6 +533,11 @@ public class DistTestSuite
     // Perform decryption
     // Reconstruct the necessary objects from the files
     logger.info("Performing decryption; writing final results file");
+    if (isStreaming)
+    {
+      outputFile = outputFile + "_0"; // currently only processing one batch for testing
+    }
+    logger.info("Pulling results from outputFile = " + outputFile);
     Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
 
     // Perform decryption and output the result file

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/utils/BaseTests.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index c1fa1e9..962e467 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -62,18 +62,18 @@ public class BaseTests
 
   public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, int numThreads, boolean testFalsePositive) throws Exception
   {
-    testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive);
+    testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive, false);
   }
 
   public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
       throws Exception
   {
-    testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false);
+    testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false, false);
   }
 
   // Query for the watched hostname occurred; watched value type: hostname (String)
   public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
-      boolean testFalsePositive) throws Exception
+      boolean testFalsePositive, boolean isStreaming) throws Exception
   {
     logger.info("Running testDNSHostnameQuery(): ");
 
@@ -83,7 +83,7 @@ public class BaseTests
     List<QueryResponseJSON> results;
     if (isDistributed)
     {
-      results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads);
+      results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads, isStreaming);
     }
     else
     {
@@ -193,11 +193,12 @@ public class BaseTests
 
   public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
   {
-    testDNSIPQuery(dataElements, null, false, false, numThreads);
+    testDNSIPQuery(dataElements, null, false, false, numThreads, false);
   }
 
   // The watched IP address was detected in the response to a query; watched value type: IP address (String)
-  public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+  public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming)
+      throws Exception
   {
     logger.info("Running testDNSIPQuery(): ");
 
@@ -206,7 +207,7 @@ public class BaseTests
 
     if (isDistributed)
     {
-      results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads);
+      results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming);
 
       if (results.size() != 5)
       {
@@ -280,7 +281,7 @@ public class BaseTests
 
     if (isDistributed)
     {
-      results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads);
+      results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads, false);
     }
     else
     {
@@ -330,11 +331,12 @@ public class BaseTests
 
   public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
   {
-    testSRCIPQuery(dataElements, null, false, false, numThreads);
+    testSRCIPQuery(dataElements, null, false, false, numThreads, false);
   }
 
   // Query for responses from watched srcIPs
-  public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+  public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming)
+      throws Exception
   {
     logger.info("Running testSRCIPQuery(): ");
 
@@ -345,7 +347,7 @@ public class BaseTests
     int numExpectedResults = 1;
     if (isDistributed)
     {
-      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads);
+      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming);
       removeTailElements = 2; // The last two elements are on the distributed stoplist
     }
     else
@@ -406,8 +408,8 @@ public class BaseTests
   }
 
   // Query for responses from watched srcIPs
-  public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
-      throws Exception
+  public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
+      boolean isStreaming) throws Exception
   {
     logger.info("Running testSRCIPQueryNoFilter(): ");
 
@@ -417,7 +419,7 @@ public class BaseTests
     int numExpectedResults = 3;
     if (isDistributed)
     {
-      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads);
+      results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads, isStreaming);
     }
     else
     {

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/utils/Inputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java
index 587ac99..b6e7251 100644
--- a/src/main/java/org/apache/pirk/test/utils/Inputs.java
+++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java
@@ -207,8 +207,7 @@ public class Inputs
 
     dataElementsJSON.add(jsonObj7);
 
-    // This should never be returned - doesn't hit on any domain selectors
-    // resolution ip on stoplist
+    // Doesn't hit on any domain selectors; resolution ip on stoplist
     JSONObject jsonObj8 = new JSONObject();
     jsonObj8.put(DATE, "2016-02-20T23:29:12.000Z");
     jsonObj8.put(QNAME, "something.else2");
@@ -220,7 +219,7 @@ public class Inputs
 
     dataElementsJSON.add(jsonObj8);
 
-    // This should never be returned in distributed case -- domain and resolution ip on stoplist
+    // Domain and resolution ip on stoplist
     JSONObject jsonObj9 = new JSONObject();
     jsonObj9.put(DATE, "2016-02-20T23:29:13.000Z");
     jsonObj9.put(QNAME, "something.else.on.stoplist");
@@ -340,14 +339,14 @@ public class Inputs
         "{\"qname\":\"something.else\",\"date\":\"2016-02-20T23:29:11.000Z\",\"qtype\":[\"1\"]"
             + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.1\",\"dest_ip\":\"2.2.2.2\"" + ",\"ip\":[\"3.3.3.3\"]}");
 
-    // Never should be returned - doesn't hit on any selectors
+    // Doesn't hit on any domain selectors; resolution ip on stoplist
     String indexTypeNum8 = esTestIndex + "/" + esType + "/8";
     logger.info("indexTypeNum8 = " + indexTypeNum8);
     ProcessBuilder pAdd8 = new ProcessBuilder("curl", "-XPUT", indexTypeNum8, "-d",
         "{\"qname\":\"something.else2\",\"date\":\"2016-02-20T23:29:12.000Z\",\"qtype\":[\"1\"]"
-            + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.12\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.32\"]}");
+            + ",\"rcode\":\"0\",\"src_ip\":\"5.6.7.8\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.132\"]}");
 
-    // This should never be returned -- domain on stoplist
+    // Domain on stoplist
     String indexTypeNum9 = esTestIndex + "/" + esType + "/9";
     logger.info("indexTypeNum9 = " + indexTypeNum9);
     ProcessBuilder pAdd9 = new ProcessBuilder("curl", "-XPUT", indexTypeNum9, "-d",

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
index 4146e5b..8cb5d17 100755
--- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
+++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
@@ -117,6 +117,23 @@ public class SystemConfiguration
   }
 
   /**
+   * Gets the specified property as a <code>long</code>, or the default value if the property isn't found.
+   * 
+   * @param propertyName
+   *          The name of the requested long property value.
+   * @param defaultValue
+   *          The value to return if the property is undefined.
+   * @return The value of the requested property, or the default value if the property is undefined.
+   * @throws NumberFormatException
+   *           If the property does not contain a parsable <code>long</code> value.
+   */
+  public static long getLongProperty(String propertyName, long defaultValue)
+  {
+    String value = props.getProperty(propertyName);
+    return (value == null) ? defaultValue : Long.parseLong(value);
+  }
+
+  /**
    * Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined.
    * 
    * @param propertyName
@@ -207,6 +224,7 @@ public class SystemConfiguration
    */
   public static void loadPropsFromDir(String dirName)
   {
+    logger.info("Loading properties from dirName = " + dirName);
     File[] directoryListing = new File(dirName).listFiles(new FilenameFilter()
     {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/resources/pirk.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index f8efea7..963fa34 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -81,7 +81,7 @@ query.schemas = none
 ##
 
 #ES host address - One Elasticsearch node in the cluster - may include port specification
-es.nodes= none
+es.nodes=none
           
 #Default HTTP/REST port used for connecting to Elasticsearch 
 es.port=9200
@@ -106,7 +106,7 @@ test.es.type = pkt
 
 #Elasticsearch resource - Elasticsearch resource location where data is read and written to. 
 #Requires the format <index>/<type> 
-test.es.resource= none
+test.es.resource= /testindex/pkt
 
 #Pathname in hdfs to place input JSON file testing
 test.inputJSONFile = /tmp/testJSONInput

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/resources/responder.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index 11ad7f6..3ae92c7 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -27,7 +27,7 @@ pir.dataInputFormat=
 #outputFile -- required -- Fully qualified name of output file in hdfs
 pir.outputFile=
 
-#platform -- required -- 'mapreduce', 'spark', or 'standalone'
+#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone'
 #Processing platform technology for the responder                
 platform= 
 
@@ -134,4 +134,24 @@ pir.queryInput=
 
 #numExpLookupPartitions -- optional -- Number of partitions for the exp lookup table
 #pir.numExpLookupPartitions=
+
+##Props for Spark Streaming
+
+#batchSeconds - optional - Batch size (in seconds) for Spark Streaming - defaults to 30 sec
+#pir.sparkstreaming.batchSeconds=
+
+#windowLength - optional - Window size (in seconds) for Spark Streaming - defaults to 60 sec
+#pir.sparkstreaming.windowLength=
+
+#queueStream - optional - Use queue stream for Spark Streaming - defaults to false
+#pir.sparkstreaming.useQueueStream=
+
+#pir.sparkstreaming.maxBatches - optional - Spark Streaming - Max number of batches to process 
+#defaults to -1 (no maximum)
+#pir.sparkstreaming.maxBatches=
+
+#spark.streaming.stopGracefullyOnShutdown - Spark Streaming - Whether or not to stop 'gracefully' during shutdown
+#default is false
+#spark.streaming.stopGracefullyOnShutdown=
+
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
index 9ac2522..bb70153 100644
--- a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
+++ b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
@@ -66,11 +66,6 @@ public class QueryParserUtilsTest
     Inputs.createSchemaFiles(null, false, null);
 
     dSchema = DataSchemaRegistry.get(Inputs.TEST_DATA_SCHEMA_NAME);
-
-    // ProcessBuilder pAdd1 = new ProcessBuilder("curl", "-XPUT", indexTypeNum1, "-d",
-    // "{\"qname\":\"a.b.c.com\",\"date\":\"2016-02-20T23:29:05.000Z\",\"qtype\":[\"1\"]"
-    // + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"1.2.3.6\"" + ",\"ip\":[\"10.20.30.40\",\"10.20.30.60\"]}");
-    //
     doc = StringUtils.jsonStringToMapWritableWithArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema);
     docWAW = StringUtils.jsonStringToMapWritableWithWritableArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema);
     docMap = StringUtils.jsonStringToMap(dataElementsJSON.get(0).toJSONString(), dSchema);

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
index a801b01..0b09fa9 100644
--- a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
+++ b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
@@ -305,8 +305,8 @@ public class LoadQuerySchemaTest
   // Create the stoplist file and alter the properties accordingly
   private void createStopListFile() throws IOException, PIRException
   {
-    SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile");
     String newSLFile = Inputs.createStopList(null, false);
+
     SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
index 2144ee1..cb65a60 100644
--- a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
+++ b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
@@ -62,9 +62,10 @@ public class StandaloneTest
 
     // Create the stoplist file
     stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile");
-    SystemConfiguration.setProperty("pir.stopListFile", STOPLIST_FILE);
+
     String newSLFile = Inputs.createStopList(null, false);
     SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
+
     logger.info("stopListFileProp = " + stopListFileProp + " new prop = " + SystemConfiguration.getProperty("pir.stopListFile"));
 
     // Create data and query schemas


Mime
View raw message