From: eawilliams@apache.org
To: commits@pirk.incubator.apache.org
Subject: incubator-pirk git commit: [PIRK-21] - Initial Spark Streaming Responder Implementation -- closes apache/incubator-pirk#76
Date: Sat, 27 Aug 2016 15:17:01 +0000 (UTC)
Message-Id: <0954035775b24bab8d066ff9ac597a55@git.apache.org>

Repository: incubator-pirk
Updated Branches:
  refs/heads/master 4bb94cec3 -> 3957be411


[PIRK-21] - Initial Spark Streaming Responder Implementation -- closes apache/incubator-pirk#76


Project: http://git-wip-us.apache.org/repos/asf/incubator-pirk/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-pirk/commit/3957be41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-pirk/tree/3957be41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-pirk/diff/3957be41

Branch: refs/heads/master
Commit: 3957be41109ec6ffab5f2702da16ee035c481ef7
Parents: 4bb94ce
Author: eawilliams
Authored: Sat Aug 27 11:16:43 2016 -0400
Committer: eawilliams
Committed: Sat Aug 27 11:16:43 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |  15 +-
 .../pirk/querier/wideskies/QuerierDriver.java   |   2 +-
 .../pirk/responder/wideskies/ResponderCLI.java  |  40 ++
 .../responder/wideskies/ResponderDriver.java    |  51 ++
 .../responder/wideskies/ResponderProps.java     |  57 ++-
 .../wideskies/common/ComputeEncryptedRow.java   |   4 +-
 .../responder/wideskies/spark/Accumulators.java |  15 +-
 .../wideskies/spark/BroadcastVars.java          |  30 +-
 .../wideskies/spark/ComputeResponse.java        |  40 +-
 .../spark/EncColMultGroupedMapper.java          |   2 +-
 .../wideskies/spark/EncColMultReducer.java      |   2 +-
 .../responder/wideskies/spark/EncRowCalc.java   |   5 +-
 .../responder/wideskies/spark/FilterData.java   |   6 +-
 .../streaming/ComputeStreamingResponse.java     | 467 +++++++++++++++++++
 .../spark/streaming/FinalResponseFunction.java  |  79 ++++
 .../org/apache/pirk/schema/data/DataSchema.java |   5 +
 .../test/distributed/DistributedTestCLI.java    |   3 +
 .../test/distributed/DistributedTestDriver.java |  30 +-
 .../distributed/testsuite/DistTestSuite.java    | 148 +++++-
 .../org/apache/pirk/test/utils/BaseTests.java   |  30 +-
 .../java/org/apache/pirk/test/utils/Inputs.java |  11 +-
 .../apache/pirk/utils/SystemConfiguration.java  |  18 +
 src/main/resources/pirk.properties              |   4 +-
 src/main/resources/responder.properties         |  22 +-
 .../pirk/general/QueryParserUtilsTest.java      |   5 -
 .../pirk/schema/query/LoadQuerySchemaTest.java  |   2 +-
 .../wideskies/standalone/StandaloneTest.java    |   3 +-
 27 files changed, 987 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 09c1faa..b46325e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,9 +87,9 @@
     2.7.2
     3.3
     2.3.3
+    <spark-streaming.version>2.0.0</spark-streaming.version>
     1C
     true
-    <jackson.version>2.8.1</jackson.version>
@@ -217,6 +217,12 @@

+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark-streaming.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>com.squareup.jnagmp</groupId>
@@ -258,13 +264,6 @@
       2.6.2
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-


http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index d91fa2d..4f26a71 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -113,7 +113,7 @@ public class QuerierDriver implements Serializable
     queryType = SystemConfiguration.getProperty(QuerierProps.QUERYTYPE);
     hashBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE));
-    hashKey = SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE);
+    hashKey = SystemConfiguration.getProperty(QuerierProps.HASHKEY);
     dataPartitionBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.DATAPARTITIONSIZE));
     paillierBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.PAILLIERBITSIZE));
     certainty = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.CERTAINTY));
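
The new "sparkstreaming" platform is driven through the same ResponderDriver/ResponderCLI path as the existing "spark" platform; the streaming-specific options are registered in ResponderCLI below, and ComputeStreamingResponse hard-codes its master to yarn-cluster. A hypothetical launch for orientation only -- the jar name, spark-submit wrapper, and paths are placeholders, and the flag spellings assume the ResponderProps constants resolve to the names shown:

  spark-submit --class org.apache.pirk.responder.wideskies.ResponderDriver pirk-<version>.jar \
    -platform=sparkstreaming -dataInputFormat=base -queryInput=/pir/query -outputFile=/pir/output \
    -batchSeconds=30 -windowLength=60 -maxBatches=1 -queueStream=false
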
http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index ff43be6..0c031ec 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -377,6 +377,46 @@ public class ResponderCLI
     optionAllowEmbeddedQS.setType(String.class);
     options.addOption(optionAllowEmbeddedQS);

+    // batchSeconds - spark streaming
+    Option optionBatchSeconds = new Option("batchSeconds", ResponderProps.BATCHSECONDS, true,
+        "optional -- Number of seconds per batch in Spark Streaming; defaults to 30");
+    optionBatchSeconds.setRequired(false);
+    optionBatchSeconds.setArgName(ResponderProps.BATCHSECONDS);
+    optionBatchSeconds.setType(String.class);
+    options.addOption(optionBatchSeconds);
+
+    // windowLength - spark streaming
+    Option optionWindowLength = new Option("windowLength", ResponderProps.WINDOWLENGTH, true,
+        "optional -- Number of seconds per window in Spark Streaming; defaults to 60");
+    optionWindowLength.setRequired(false);
+    optionWindowLength.setArgName(ResponderProps.WINDOWLENGTH);
+    optionWindowLength.setType(String.class);
+    options.addOption(optionWindowLength);
+
+    // maxBatches - spark streaming
+    Option optionMaxBatches = new Option("maxBatches", ResponderProps.MAXBATCHES, true,
+        "optional -- Max batches to process in Spark Streaming; defaults to -1 - unlimited");
+    optionMaxBatches.setRequired(false);
+    optionMaxBatches.setArgName(ResponderProps.MAXBATCHES);
+    optionMaxBatches.setType(String.class);
+    options.addOption(optionMaxBatches);
+
+    // stopGracefully - spark streaming
+    Option optionStopGracefully = new Option("stopGracefully", ResponderProps.STOPGRACEFULLY, true,
+        "optional -- Whether or not to stop gracefully in Spark Streaming; defaults to false");
+    optionStopGracefully.setRequired(false);
+    optionStopGracefully.setArgName(ResponderProps.STOPGRACEFULLY);
+    optionStopGracefully.setType(String.class);
+    options.addOption(optionStopGracefully);
+
+    // useQueueStream - spark streaming
+    Option optionUseQueueStream = new Option("queueStream", ResponderProps.USEQUEUESTREAM, true,
+        "optional -- Whether or not to use a queue stream in Spark Streaming; defaults to false");
+    optionUseQueueStream.setRequired(false);
+    optionUseQueueStream.setArgName(ResponderProps.USEQUEUESTREAM);
+    optionUseQueueStream.setType(String.class);
+    options.addOption(optionUseQueueStream);
+
     return options;
   }

http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index da24ae4..6b32418 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pirk.responder.wideskies;

+import java.security.Permission;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.pirk.query.wideskies.Query;
 import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
 import
org.apache.pirk.responder.wideskies.spark.ComputeResponse; +import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse; import org.apache.pirk.responder.wideskies.standalone.Responder; import org.apache.pirk.serialization.LocalFileSystemStore; import org.apache.pirk.utils.SystemConfiguration; @@ -50,6 +53,9 @@ public class ResponderDriver { ResponderCLI responderCLI = new ResponderCLI(args); + // For handling System.exit calls from Spark Streaming + System.setSecurityManager(new SystemExitManager()); + if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce")) { logger.info("Launching MapReduce ResponderTool:"); @@ -65,6 +71,25 @@ public class ResponderDriver ComputeResponse computeResponse = new ComputeResponse(fs); computeResponse.performQuery(); } + else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("sparkstreaming")) + { + logger.info("Launching Spark ComputeStreamingResponse:"); + + FileSystem fs = FileSystem.get(new Configuration()); + ComputeStreamingResponse computeSR = new ComputeStreamingResponse(fs); + try + { + computeSR.performQuery(); + } catch (SystemExitException e) + { + // If System.exit(0) is not caught from Spark Streaming, + // the application will complete with a 'failed' status + logger.info("Exited with System.exit(0) from Spark Streaming"); + } + + // Teardown the context + computeSR.teardown(); + } else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone")) { logger.info("Launching Standalone Responder:"); @@ -76,4 +101,30 @@ public class ResponderDriver pirResponder.computeStandaloneResponse(); } } + + // Exception and Security Manager classes used to catch System.exit from Spark Streaming + private static class SystemExitException extends SecurityException + {} + + private static class SystemExitManager extends SecurityManager + { + @Override + public void checkPermission(Permission perm) + {} + + @Override + public void checkExit(int status) + { + super.checkExit(status); + if (status == 0) // If we exited cleanly, throw SystemExitException + { + throw new SystemExitException(); + } + else + { + throw new SecurityException(); + } + + } + } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java index a9f8fae..4fee85a 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java @@ -21,6 +21,7 @@ package org.apache.pirk.responder.wideskies; import java.util.Arrays; import java.util.List; +import org.apache.commons.cli.Option; import org.apache.pirk.inputformat.hadoop.InputFormatConst; import org.apache.pirk.schema.data.DataSchemaLoader; import org.apache.pirk.schema.query.QuerySchemaLoader; @@ -59,19 +60,26 @@ public class ResponderProps public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions"; public static final String USEMODEXPJOIN = "pir.useModExpJoin"; public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey"; - static final String NUMREDUCETASKS = "pir.numReduceTasks"; - static final String MAPMEMORY = "mapreduce.map.memory.mb"; - static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb"; - static final String 
MAPJAVAOPTS = "mapreduce.map.java.opts";
-  static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
-  static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
-  static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
-  static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+  public static final String NUMREDUCETASKS = "pir.numReduceTasks";
+  public static final String MAPMEMORY = "mapreduce.map.memory.mb";
+  public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+  public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+  public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+  public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+  public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+  public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+
+  // For Spark Streaming - optional
+  public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
+  public static final String WINDOWLENGTH = "pir.sparkstreaming.windowLength";
+  public static final String USEQUEUESTREAM = "pir.sparkstreaming.useQueueStream";
+  public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
+  public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown";

   static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
       OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY,
       MAPJAVAOPTS, REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
-      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS);
+      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);

   /**
    * Validates the responder properties
@@ -90,7 +98,7 @@
     }

     String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
+    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone"))
     {
       logger.info("Unsupported platform: " + platform);
       valid = false;
@@ -176,7 +184,7 @@
       valid = false;
     }

-    // Parse optional properties with defaults
+    // Parse optional properties

     if (SystemConfiguration.hasProperty(QUERYSCHEMAS))
     {
@@ -188,6 +196,8 @@
       SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS));
     }

+    // Parse optional properties with defaults
+
     if (!SystemConfiguration.hasProperty(USEHDFSLOOKUPTABLE))
     {
       SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false");
@@ -223,6 +233,31 @@
       SystemConfiguration.setProperty(USELOCALCACHE, "true");
     }

+    if (!SystemConfiguration.hasProperty(BATCHSECONDS))
+    {
+      SystemConfiguration.setProperty(BATCHSECONDS, "30");
+    }
+
+    if (!SystemConfiguration.hasProperty(WINDOWLENGTH))
+    {
+      SystemConfiguration.setProperty(WINDOWLENGTH, "60");
+    }
+
+    if (!SystemConfiguration.hasProperty(USEQUEUESTREAM))
+    {
+      SystemConfiguration.setProperty(USEQUEUESTREAM, "false");
+    }
+
+    if (!SystemConfiguration.hasProperty(MAXBATCHES))
+    {
+      SystemConfiguration.setProperty(MAXBATCHES, "-1");
+    }
+
+    if
(!SystemConfiguration.hasProperty(STOPGRACEFULLY)) + { + SystemConfiguration.setProperty(STOPGRACEFULLY, "false"); + } + // Load the new local query and data schemas if (valid) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java index 6ef1bdc..0745bea 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java @@ -184,7 +184,7 @@ public class ComputeEncryptedRow { if (elementCounter >= maxHitsPerSelector) { - logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter); + logger.debug("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter); break; } } @@ -222,7 +222,7 @@ public class ComputeEncryptedRow ++elementCounter; } - logger.info("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter); + logger.debug("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter); return returnPairs; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java index 11473a0..fb5fb91 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java @@ -40,6 +40,7 @@ public class Accumulators implements Serializable private Accumulator numRecordsAfterFilter = null; private Accumulator numHashes = null; private Accumulator numColumns = null; + private Accumulator numBatches = null; public Accumulators(JavaSparkContext sc) { @@ -48,6 +49,7 @@ public class Accumulators implements Serializable numRecordsAfterFilter = sc.accumulator(0); numHashes = sc.accumulator(0); numColumns = sc.accumulator(0); + numBatches = sc.accumulator(0); } public Integer numRecordsReceivedGetValue() @@ -100,6 +102,16 @@ public class Accumulators implements Serializable numColumns.add(val); } + public Integer numBatchesGetValue() + { + return numBatches.value(); + } + + public void incNumBatches(int val) + { + numBatches.add(val); + } + public void resetAll() { numRecordsReceived.setValue(0); @@ -107,11 +119,12 @@ public class Accumulators implements Serializable numRecordsAfterFilter.setValue(0); numHashes.setValue(0); numColumns.setValue(0); + numBatches.setValue(0); } public void printAll() { logger.info("numRecordsReceived = " + numRecordsReceived.value() + " \n numRecordsFiltered = " + numRecordsFiltered + " \n numRecordsAfterFilter = " - + numRecordsAfterFilter + " \n numHashes = " + numHashes + " \n numColumns = " + numColumns); + + numRecordsAfterFilter + " \n numHashes = " + numHashes + " \n numColumns = " + numColumns.value() + " \n numBatches = " + numBatches.value()); } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java 
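
The ResponderDriver change above installs a SecurityManager so that the System.exit(0) issued from within Spark Streaming (when maxBatches is reached) surfaces as a catchable exception rather than ending the application with a 'failed' status. A minimal standalone sketch of that trap-System.exit pattern -- class names here are illustrative, not the committed ones:

  import java.security.Permission;

  public class ExitTrapSketch
  {
    // Thrown in place of a clean System.exit(0) so the caller can keep running
    private static class SystemExitException extends SecurityException
    {}

    private static class SystemExitManager extends SecurityManager
    {
      @Override
      public void checkPermission(Permission perm)
      {} // permit everything else

      @Override
      public void checkExit(int status)
      {
        super.checkExit(status); // delegates to our permissive checkPermission
        if (status == 0)
        {
          throw new SystemExitException(); // intercept the clean exit
        }
        throw new SecurityException(); // non-zero exit codes still abort
      }
    }

    public static void main(String[] args)
    {
      System.setSecurityManager(new SystemExitManager());
      try
      {
        System.exit(0); // stands in for the exit issued inside the streaming job
      } catch (SystemExitException e)
      {
        System.out.println("Caught clean exit; proceeding to teardown");
      }
    }
  }

As in ResponderDriver, only the status-0 exit is swallowed; a non-zero exit still propagates as a SecurityException.
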
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java index bab4ae9..d0215fc 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java @@ -45,7 +45,7 @@ public class BroadcastVars implements Serializable private Broadcast querySchema = null; - private Broadcast useLocalCache = null; + private Broadcast useLocalCache = null; private Broadcast limitHitsPerSelector = null; @@ -53,6 +53,10 @@ public class BroadcastVars implements Serializable private Broadcast expDir = null; + private Broadcast output = null; + + private Broadcast maxBatches = null; + public BroadcastVars(JavaSparkContext sc) { jsc = sc; @@ -73,6 +77,16 @@ public class BroadcastVars implements Serializable return queryInfo.getValue(); } + public void setOutput(String outputIn) + { + output = jsc.broadcast(outputIn); + } + + public String getOutput() + { + return output.getValue(); + } + public void setQueryInfo(QueryInfo queryInfoIn) { queryInfo = jsc.broadcast(queryInfoIn); @@ -98,12 +112,12 @@ public class BroadcastVars implements Serializable return dataSchema.getValue(); } - public void setUseLocalCache(String useLocalCacheInput) + public void setUseLocalCache(Boolean useLocalCacheInput) { useLocalCache = jsc.broadcast(useLocalCacheInput); } - public String getUseLocalCache() + public Boolean getUseLocalCache() { return useLocalCache.getValue(); } @@ -137,4 +151,14 @@ public class BroadcastVars implements Serializable { return expDir.getValue(); } + + public Integer getMaxBatches() + { + return maxBatches.getValue(); + } + + public void setMaxBatches(Integer maxBatchesIn) + { + maxBatches = jsc.broadcast(maxBatchesIn); + } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java index 2050643..f34acf8 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java @@ -91,6 +91,7 @@ public class ComputeResponse private QueryInfo queryInfo = null; Query query = null; + QuerySchema qSchema = null; private int numDataPartitions = 0; private int numColMultPartitions = 0; @@ -175,7 +176,6 @@ public class ComputeResponse bVars.setQuery(query); bVars.setQueryInfo(queryInfo); - QuerySchema qSchema = null; if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false)) { qSchema = queryInfo.getQuerySchema(); @@ -190,7 +190,7 @@ public class ComputeResponse bVars.setDataSchema(dSchema); // Set the local cache flag - bVars.setUseLocalCache(SystemConfiguration.getProperty("pir.useLocalCache", "true")); + bVars.setUseLocalCache(SystemConfiguration.getBooleanProperty("pir.useLocalCache", true)); useHDFSLookupTable = SystemConfiguration.isSetTrue("pir.useHDFSLookupTable"); @@ -246,7 +246,7 @@ public class ComputeResponse { logger.info("Reading data "); - JavaRDD dataRDD; + JavaRDD jsonRDD; Job job = new Job(); String baseQuery = SystemConfiguration.getProperty("pir.baseQuery"); @@ -262,7 +262,6 @@ public 
class ComputeResponse logger.debug("schemaName = " + name); } - QuerySchema qSchema = QuerySchemaRegistry.get(bVars.getQueryInfo().getQueryType()); job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName()); job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas")); @@ -278,12 +277,19 @@ public class ComputeResponse FileInputFormat.setInputPaths(job, inputData); // Read data from hdfs - JavaRDD jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions); + jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions); // Filter out by the provided stopListFile entries - dataRDD = jsonRDD.filter(new FilterData(accum, bVars)); - - return dataRDD; + if (qSchema.getFilter() != null) + { + JavaRDD filteredRDD = jsonRDD.filter(new FilterData(accum, bVars)); + return filteredRDD; + } + else + { + logger.info("qSchema.getFilter() is null"); + return jsonRDD; + } } /** @@ -294,7 +300,7 @@ public class ComputeResponse { logger.info("Reading data "); - JavaRDD dataRDD; + JavaRDD jsonRDD; Job job = new Job(); String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis(); @@ -304,13 +310,19 @@ public class ComputeResponse job.getConfiguration().set("es.resource", esResource); job.getConfiguration().set("es.query", esQuery); - JavaRDD jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values() - .coalesce(numDataPartitions); + jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values().coalesce(numDataPartitions); // Filter out by the provided stopListFile entries - dataRDD = jsonRDD.filter(new FilterData(accum, bVars)); - - return dataRDD; + if (qSchema.getFilter() != null) + { + JavaRDD filteredRDD = jsonRDD.filter(new FilterData(accum, bVars)); + return filteredRDD; + } + else + { + logger.info("qSchema.getFilter() is null"); + return jsonRDD; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java index 0f82b6d..56c917c 100644 --- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java @@ -39,7 +39,7 @@ public class EncColMultGroupedMapper implements PairFunction accum.incNumRecordsReceived(1); // Perform the filter - boolean passFilter = true; - if (filter != null) - { - passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema); - } + boolean passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema); if (passFilter) { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java new file mode 100644 index 
0000000..eaf7384 --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.spark.streaming; + +import java.math.BigInteger; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pirk.inputformat.hadoop.BaseInputFormat; +import org.apache.pirk.inputformat.hadoop.InputFormatConst; +import org.apache.pirk.query.wideskies.Query; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.responder.wideskies.spark.Accumulators; +import org.apache.pirk.responder.wideskies.spark.BroadcastVars; +import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper; +import org.apache.pirk.responder.wideskies.spark.EncColMultReducer; +import org.apache.pirk.responder.wideskies.spark.EncRowCalc; +import org.apache.pirk.responder.wideskies.spark.FilterData; +import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData; +import org.apache.pirk.schema.data.DataSchema; +import org.apache.pirk.schema.data.DataSchemaLoader; +import org.apache.pirk.schema.data.DataSchemaRegistry; +import org.apache.pirk.schema.query.QuerySchema; +import org.apache.pirk.schema.query.QuerySchemaLoader; +import org.apache.pirk.schema.query.QuerySchemaRegistry; +import org.apache.pirk.serialization.HadoopFileSystemStore; +import org.apache.pirk.utils.PIRException; +import org.apache.pirk.utils.SystemConfiguration; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.elasticsearch.hadoop.mr.EsInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Master class for the PIR query spark streaming application + *

+ * NOTE:
+ *
+ * - NOT using Elasticsearch in practice - there proved to be some speed issues with ES and Spark that appear to be ES-Spark specific; the code is
+ *   left in anticipation that the ES-Spark issues resolve...
+ *
+ * - Even if rdd.count() calls are embedded in logger.debug statements, they are computed by Spark. Thus, they are commented out in the code below - uncomment + * for rdd.count() debug + * + */ +public class ComputeStreamingResponse +{ + private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class); + + private String dataInputFormat = null; + private String inputData = null; + private String outputFile = null; + private String outputDirExp = null; + + private String queryInput = null; + QuerySchema qSchema = null; + + private String esQuery = "none"; + private String esResource = "none"; + + private FileSystem fs = null; + private HadoopFileSystemStore storage = null; + private JavaStreamingContext jssc = null; + + boolean useQueueStream = false; + + private long batchSeconds = 0; + private long windowLength = 0; + + private Accumulators accum = null; + private BroadcastVars bVars = null; + + private QueryInfo queryInfo = null; + Query query = null; + + private int numDataPartitions = 0; + private int numColMultPartitions = 0; + + private boolean colMultReduceByKey = false; + + public ComputeStreamingResponse(FileSystem fileSys) throws Exception + { + fs = fileSys; + storage = new HadoopFileSystemStore(fs); + + dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); + if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat)) + { + throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form"); + } + logger.info("inputFormat = " + dataInputFormat); + if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT)) + { + inputData = SystemConfiguration.getProperty("pir.inputData", "none"); + if (inputData.equals("none")) + { + throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + " an inputFile must be specified"); + } + logger.info("inputFile = " + inputData); + } + else if (dataInputFormat.equals(InputFormatConst.ES)) + { + esQuery = SystemConfiguration.getProperty("pir.esQuery", "none"); + esResource = SystemConfiguration.getProperty("pir.esResource", "none"); + if (esQuery.equals("none")) + { + throw new IllegalArgumentException("esQuery must be specified"); + } + if (esResource.equals("none")) + { + throw new IllegalArgumentException("esResource must be specified"); + } + logger.info("esQuery = " + esQuery + " esResource = " + esResource); + } + outputFile = SystemConfiguration.getProperty("pir.outputFile"); + outputDirExp = outputFile + "_exp"; + + queryInput = SystemConfiguration.getProperty("pir.queryInput"); + String stopListFile = SystemConfiguration.getProperty("pir.stopListFile"); + + logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery + + " esResource = " + esResource); + + // Pull the batchSeconds and windowLength parameters + batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30); + windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60); + if (windowLength % batchSeconds != 0) + { + throw new IllegalArgumentException("batchSeconds = " + batchSeconds + " must divide windowLength = " + windowLength); + } + useQueueStream = SystemConfiguration.getBooleanProperty("pir.sparkstreaming.useQueueStream", false); + logger.info("useQueueStream = " + useQueueStream); + + // Set the necessary configurations + SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster"); + conf.set("es.nodes", 
SystemConfiguration.getProperty("es.nodes", "none")); + conf.set("es.port", SystemConfiguration.getProperty("es.port", "none")); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.streaming.stopGracefullyOnShutdown", SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false")); + + JavaSparkContext sc = new JavaSparkContext(conf); + jssc = new JavaStreamingContext(sc, Durations.seconds(batchSeconds)); + + // Setup, run query, teardown + logger.info("Setting up for query run"); + setup(); + logger.info("Setup complete"); + } + + // Setup for the accumulators and broadcast variables + private void setup() throws Exception + { + // Load the schemas + DataSchemaLoader.initialize(true, fs); + QuerySchemaLoader.initialize(true, fs); + + // Create the accumulators and broadcast variables + accum = new Accumulators(jssc.sparkContext()); + bVars = new BroadcastVars(jssc.sparkContext()); + + // Set the Query and QueryInfo broadcast variables + query = storage.recall(queryInput, Query.class); + queryInfo = query.getQueryInfo(); + bVars.setQuery(query); + bVars.setQueryInfo(queryInfo); + + if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false)) + { + qSchema = queryInfo.getQuerySchema(); + } + if (qSchema == null) + { + qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType()); + } + + DataSchema dSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName()); + bVars.setQuerySchema(qSchema); + bVars.setDataSchema(dSchema); + + // Set the local cache flag + bVars.setUseLocalCache(SystemConfiguration.getBooleanProperty("pir.useLocalCache", true)); + + // Set the hit limit variables + bVars.setLimitHitsPerSelector(SystemConfiguration.getBooleanProperty("pir.limitHitsPerSelector", false)); + bVars.setMaxHitsPerSelector(SystemConfiguration.getIntProperty("pir.maxHitsPerSelector", 100)); + + // Set the number of data and column multiplication partitions + numDataPartitions = SystemConfiguration.getIntProperty("pir.numDataPartitions", 1000); + numColMultPartitions = SystemConfiguration.getIntProperty("pir.numColMultPartitions", numDataPartitions); + + // Whether or not we are performing a reduceByKey or a groupByKey->reduce for column multiplication + colMultReduceByKey = SystemConfiguration.getBooleanProperty("pir.colMultReduceByKey", false); + + // Set the expDir + bVars.setExpDir(outputDirExp); + + // Set the maxBatches + int maxBatches = SystemConfiguration.getIntProperty("pir.sparkstreaming.maxBatches", -1); + logger.info("maxBatches = " + maxBatches); + bVars.setMaxBatches(maxBatches); + } + + /** + * Method to start the computation + * + * @throws InterruptedException + */ + public void start() throws InterruptedException + { + logger.info("Starting computation..."); + + jssc.start(); + jssc.awaitTermination(); + } + + /** + * Method to tear down necessary elements when app is complete + */ + public void teardown() + { + logger.info("Tearing down..."); + jssc.stop(); + logger.info("Tear down complete"); + } + + /** + * Method to read in data from an allowed input source/format and perform the query + */ + public void performQuery() throws Exception + { + logger.info("Performing query: "); + + JavaDStream inputRDD = null; + if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT)) + { + inputRDD = readData(); + } + else if (dataInputFormat.equals(InputFormatConst.ES)) + { + inputRDD = readDataES(); + } + + performQuery(inputRDD); + } + + /** + * Method to read in the data from an allowed input format, 
filter, and return a RDD of MapWritable data elements + */ + @SuppressWarnings("unchecked") + public JavaDStream readData() throws ClassNotFoundException, Exception + { + logger.info("Reading data "); + + Job job = new Job(); + String baseQuery = SystemConfiguration.getProperty("pir.baseQuery"); + String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis(); + job.setJobName(jobName); + job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true); + job.getConfiguration().set("query", baseQuery); + + job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName()); + job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas")); + + // Set the inputFormatClass based upon the baseInputFormat property + String classString = SystemConfiguration.getProperty("pir.baseInputFormat"); + Class inputClass = (Class) Class.forName(classString); + if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass)) + { + throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat"); + } + job.setInputFormatClass(inputClass); + + FileInputFormat.setInputPaths(job, inputData); + + // Read data from hdfs + logger.info("useQueueStream = " + useQueueStream); + JavaDStream mwStream = null; + if (useQueueStream) + { + Queue> rddQueue = new LinkedList>(); + JavaRDD rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values() + .coalesce(numDataPartitions); + + rddQueue.add(rddIn); + mwStream = jssc.queueStream(rddQueue); + } + else + { + JavaPairInputDStream inputRDD = jssc.fileStream(inputData, Text.class, MapWritable.class, inputClass); + mwStream = inputRDD.transform(new Function,JavaRDD>() + { + private static final long serialVersionUID = 1L; + + @Override + public JavaRDD call(JavaPairRDD pair) throws Exception + { + return pair.values(); + } + }).repartition(numDataPartitions); + } + + // Filter out by the provided stopListFile entries + if (qSchema.getFilter() != null) + { + JavaDStream filteredRDD = mwStream.filter(new FilterData(accum, bVars)); + return filteredRDD; + } + + return mwStream; + } + + /** + * Method to read in the data from elasticsearch, filter, and return a RDD of MapWritable data elements + */ + @SuppressWarnings("unchecked") + public JavaDStream readDataES() throws Exception + { + logger.info("Reading data "); + + Job job = new Job(); + String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis(); + job.setJobName(jobName); + job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes")); + job.getConfiguration().set("es.port", SystemConfiguration.getProperty("es.port")); + job.getConfiguration().set("es.resource", esResource); + job.getConfiguration().set("es.query", esQuery); + + // Read data from hdfs + JavaDStream mwStream = null; + if (useQueueStream) + { + Queue> rddQueue = new LinkedList>(); + JavaRDD rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values() + .coalesce(numDataPartitions); + rddQueue.add(rddIn); + + mwStream = jssc.queueStream(rddQueue); + } + else + { + JavaPairInputDStream inputRDD = jssc.fileStream(inputData, Text.class, MapWritable.class, EsInputFormat.class); + mwStream = inputRDD.transform(new Function,JavaRDD>() + { + private static final long serialVersionUID = 1L; + + @Override + public JavaRDD call(JavaPairRDD pair) throws 
Exception + { + return pair.values(); + } + }).repartition(numDataPartitions); + } + + // Filter out by the provided stopListFile entries + if (qSchema.getFilter() != null) + { + JavaDStream filteredRDD = mwStream.filter(new FilterData(accum, bVars)); + return filteredRDD; + } + else + { + return mwStream; + } + } + + /** + * Method to perform the query given an input JavaDStream of JSON + * + * @throws InterruptedException + * + */ + public void performQuery(JavaDStream input) throws PIRException, InterruptedException + { + logger.info("Performing query: "); + + // Process non-overlapping windows of data of duration windowLength seconds + // If we are using queue streams, there is no need to window + if (!useQueueStream) + { + input.window(Durations.seconds(windowLength), Durations.seconds(windowLength)); + } + + // Extract the selectors for each dataElement based upon the query type + // and perform a keyed hash of the selectors + JavaPairDStream> selectorHashToDocRDD = input.mapToPair(new HashSelectorsAndPartitionData(bVars)); + + // Group by hashed selector (row) -- can combine with the line above, separating for testing and benchmarking... + JavaPairDStream>> selectorGroupRDD = selectorHashToDocRDD.groupByKey(); + + // Calculate the encrypted row values for each row, emit for each row + JavaPairDStream encRowRDD = selectorGroupRDD.flatMapToPair(new EncRowCalc(accum, bVars)); + + // Multiply the column values by colNum: emit and write the final result object + encryptedColumnCalc(encRowRDD); + + // Start the streaming computation + start(); + } + + // Method to compute the final encrypted columns + private void encryptedColumnCalc(JavaPairDStream encRowRDD) throws PIRException + { + // Multiply the column values by colNum: emit + JavaPairDStream encColRDD; + if (colMultReduceByKey) + { + encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions); + } + else + { + encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars)); + } + + // Update the output name, by batch number + bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue()); + + // Form and write the response object + encColRDD.repartition(1).foreachRDD(new VoidFunction>() + { + @Override + public void call(JavaPairRDD rdd) + { + rdd.foreachPartition(new FinalResponseFunction(accum, bVars)); + + int maxBatchesVar = bVars.getMaxBatches(); + if (maxBatchesVar != -1 && accum.numBatchesGetValue() == maxBatchesVar) + { + logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down"); + System.exit(0); + } + + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java new file mode 100644 index 0000000..a9f07bd --- /dev/null +++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pirk.responder.wideskies.spark.streaming; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.pirk.query.wideskies.QueryInfo; +import org.apache.pirk.responder.wideskies.spark.Accumulators; +import org.apache.pirk.responder.wideskies.spark.BroadcastVars; +import org.apache.pirk.response.wideskies.Response; +import org.apache.pirk.serialization.HadoopFileSystemStore; +import org.apache.spark.api.java.function.VoidFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Tuple2; + +public class FinalResponseFunction implements VoidFunction>> +{ + private static final long serialVersionUID = 1L; + + private static final Logger logger = LoggerFactory.getLogger(FinalResponseFunction.class); + + private BroadcastVars bVars = null; + private Accumulators accum = null; + + public FinalResponseFunction(Accumulators accumIn, BroadcastVars bbVarsIn) + { + bVars = bbVarsIn; + accum = accumIn; + } + + public void call(Iterator> iter) throws Exception + { + // Form the response object + QueryInfo queryInfo = bVars.getQueryInfo(); + Response response = new Response(queryInfo); + while (iter.hasNext()) + { + Tuple2 input = iter.next(); + response.addElement(input._1().intValue(), input._2()); + logger.debug("colNum = " + input._1().intValue() + " column = " + input._2().toString()); + } + + // Write out the response + FileSystem fs = FileSystem.get(new Configuration()); + HadoopFileSystemStore storage = new HadoopFileSystemStore(fs); + String outputFile = bVars.getOutput(); + logger.debug("outputFile = " + outputFile); + try + { + storage.store(outputFile, response); + } catch (IOException e) + { + throw new RuntimeException(e); + } + accum.incNumBatches(1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/schema/data/DataSchema.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java index 9557ce6..8728b96 100644 --- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java +++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java @@ -29,6 +29,8 @@ import org.apache.hadoop.io.Text; import org.apache.pirk.schema.data.partitioner.DataPartitioner; import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner; import org.apache.pirk.utils.PIRException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A data schema describes the target data being referenced by a Querier and a Responder. 
@@ -43,6 +45,8 @@ public class DataSchema implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(DataSchema.class); + // This schema's name. private final String schemaName; @@ -212,6 +216,7 @@ public class DataSchema implements Serializable text = new Text(elementName); textRep.put(elementName, text); } + return text; } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java index 1535e1f..7930464 100644 --- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java @@ -166,6 +166,9 @@ public class DistributedTestCLI tests += "J - JSON/HDFS MapReduce\n"; tests += "ES - Elasticsearch Spark \n"; tests += "JS - JSON/HDFS Spark \n"; + tests += "SS - Spark Streaming Tests \n"; + tests += "JSS - JSON/HDFS Spark Streaming \n"; + tests += "ESS - Elasticsearch Spark Streaming \n"; Option optionTestSelection = new Option("t", "tests", true, "optional -- Select which tests to execute: \n" + tests); optionTestSelection.setRequired(false); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java index a8cec45..f312932 100755 --- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java +++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java @@ -88,6 +88,7 @@ public class DistributedTestDriver List dataElements = Inputs.createJSONInput(fs); String localStopListFile = Inputs.createStopList(fs, true); + SystemConfiguration.setProperty("pir.stopListFile", localStopListFile); Inputs.createSchemaFiles(fs, true, StopListFilter.class.getName()); @@ -100,11 +101,30 @@ public class DistributedTestDriver */ public static void test(FileSystem fs, DistributedTestCLI cli, List pirDataElements) throws Exception { + // MapReduce JSON input if (cli.run("1:J")) { DistTestSuite.testJSONInputMR(fs, pirDataElements); } - if (cli.run("1:E") || cli.run("1:ES")) + + // Spark with JSON input + if (cli.run("1:JS")) + { + DistTestSuite.testJSONInputSpark(fs, pirDataElements); + } + + // Spark Streaming + if (cli.run("1:SS")) + { + DistTestSuite.testSparkStreaming(fs, pirDataElements); + } + if (cli.run("1:JSS")) + { + DistTestSuite.testJSONInputSparkStreaming(fs, pirDataElements); + } + + // Elasticsearch input + if (cli.run("1:E") || cli.run("1:ES") || cli.run("1:ESS")) { Inputs.createESInput(); if (cli.run("1:E")) @@ -115,10 +135,10 @@ public class DistributedTestDriver { DistTestSuite.testESInputSpark(fs, pirDataElements); } - } - if (cli.run("1:JS")) - { - DistTestSuite.testJSONInputSpark(fs, pirDataElements); + if (cli.run("1:ESS")) + { + DistTestSuite.testESInputSparkStreaming(fs, pirDataElements); + } } } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java 
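
The DistributedTestCLI and DistributedTestDriver changes above wire three new mnemonics into the -t test-selection option -- SS (both streaming suites), JSS (JSON/HDFS Spark Streaming), and ESS (Elasticsearch Spark Streaming) -- and route them to the new DistTestSuite methods below. A hypothetical selection, with the launcher and jar name as placeholders only:

  hadoop jar pirk-<version>.jar org.apache.pirk.test.distributed.DistributedTestDriver -t JSS

Note that the streaming tests cap the run at one batch (pir.sparkstreaming.maxBatches=1), so the decryption step below reads the batch-0 output, outputFile + "_0".
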
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java index bc59619..f44815a 100644 --- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java +++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java @@ -80,13 +80,13 @@ public class DistTestSuite // Run tests SystemConfiguration.setProperty("pirTest.embedSelector", "true"); BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false); SystemConfiguration.setProperty("pirTest.embedSelector", "false"); BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false); - BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2); + BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2, false); // Test hit limits per selector SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true"); @@ -97,8 +97,8 @@ public class DistTestSuite // Test the local cache for modular exponentiation SystemConfiguration.setProperty("pir.useLocalCache", "true"); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); - BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false); + BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false); SystemConfiguration.setProperty("pir.useLocalCache", "false"); // Change query for NXDOMAIN @@ -114,7 +114,7 @@ public class DistTestSuite // In memory table SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); SystemConfiguration.setProperty("pirTest.useExpLookupTable", "true"); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false); // Create exp table in hdfs SystemConfiguration.setProperty("mapreduce.map.memory.mb", "10000"); @@ -126,7 +126,7 @@ public class DistTestSuite SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); SystemConfiguration.setProperty("pir.expCreationSplits", "50"); SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150"); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false); // Reset exp properties SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); @@ -172,12 +172,12 @@ public class DistTestSuite // Run tests SystemConfiguration.setProperty("pirTest.embedSelector", "true"); BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1); - BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1); + BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false); SystemConfiguration.setProperty("pirTest.embedSelector", "false"); BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2); - BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false); // Change query for NXDOMAIN SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3"); @@ -215,14 +215,14 @@ public class 
DistTestSuite // Run tests SystemConfiguration.setProperty("pirTest.embedSelector", "true"); BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false); SystemConfiguration.setProperty("pirTest.embedSelector", "false"); BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); - BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false); + BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false); - BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2); + BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2, false); // Test embedded QuerySchema SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true"); @@ -253,18 +253,18 @@ public class DistTestSuite // Test the local cache for modular exponentiation SystemConfiguration.setProperty("pirTest.embedSelector", "true"); SystemConfiguration.setProperty("pir.useLocalCache", "true"); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false); // Test the join functionality for the modular exponentiation table SystemConfiguration.setProperty("pir.useModExpJoin", "true"); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false); SystemConfiguration.setProperty("pir.useModExpJoin", "false"); // Test file based exp lookup table for modular exponentiation SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true"); SystemConfiguration.setProperty("pir.expCreationSplits", "500"); SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150"); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false); SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); // Change query for NXDOMAIN @@ -303,12 +303,12 @@ public class DistTestSuite // Run tests SystemConfiguration.setProperty("pirTest.embedSelector", "true"); BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1); - BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false); + BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false); SystemConfiguration.setProperty("pirTest.embedSelector", "false"); BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2); - BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2); + BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false); // Change query for NXDOMAIN SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3"); @@ -321,9 +321,89 @@ public class DistTestSuite logger.info("Completed testESInputSpark"); } + public static void testSparkStreaming(FileSystem fs, List pirDataElements) throws Exception + { + testJSONInputSparkStreaming(fs, pirDataElements); + testESInputSparkStreaming(fs, pirDataElements); + } + + public static void testJSONInputSparkStreaming(FileSystem fs, List pirDataElements) throws Exception + { + logger.info("Starting testJSONInputSparkStreaming"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + 
SystemConfiguration.setProperty("pir.numColMultPartitions", "20"); + SystemConfiguration.setProperty("pir.colMultReduceByKey", "false"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + + SystemConfiguration.setProperty("pir.numDataPartitions", "3"); + + SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30"); + SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60"); + SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true"); + SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1"); + + SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false"); + + // Set up JSON configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT); + SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY)); + SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0"); + + // Run tests + BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true); + BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true); + BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true); + BaseTests.testSRCIPQueryNoFilter(pirDataElements, fs, true, true, 2, true); + + logger.info("Completed testJSONInputSparkStreaming"); + } + + public static void testESInputSparkStreaming(FileSystem fs, List pirDataElements) throws Exception + { + logger.info("Starting testESInputSparkStreaming"); + + SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false"); + SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false"); + + SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false"); + SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000"); + + SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false"); + SystemConfiguration.setProperty("pir.embedQuerySchema", "false"); + + SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30"); + SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60"); + SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true"); + SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1"); + + SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false"); + + // Set up ES configs + SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES); + SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0"); + SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY)); + + // Run tests + SystemConfiguration.setProperty("pirTest.embedSelector", "true"); + BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true); + BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true); + BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true); + + logger.info("Completed testESInputSparkStreaming"); + } + // Base method to perform query - public static List performQuery(String queryType, ArrayList selectors, FileSystem fs, boolean isSpark, int numThreads) - throws Exception + // TODO: This could be changed to pass in the platform instead of isSpark and isStreaming... 
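----------------------------------------------------------------------
For orientation: the pir.sparkstreaming.* knobs exercised by the streaming tests above map directly onto the stock Spark Streaming API. Below is a minimal, self-contained sketch -- illustrative only, not Pirk's ComputeStreamingResponse; the class name and sample records are hypothetical -- of what batchSeconds, windowLength, useQueueStream, and maxBatches control:

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingKnobsSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    long batchSeconds = 30; // pir.sparkstreaming.batchSeconds
    long windowLength = 60; // pir.sparkstreaming.windowLength; must be a multiple of the batch size

    SparkConf conf = new SparkConf().setAppName("streaming-knobs-sketch").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(batchSeconds));

    // pir.sparkstreaming.useQueueStream=true swaps the live input source for an
    // in-memory queue of RDDs, which is what keeps the distributed tests deterministic
    Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
    rddQueue.add(jssc.sparkContext().parallelize(Arrays.asList("record1", "record2")));
    JavaDStream<String> input = jssc.queueStream(rddQueue);

    // With the values above, each 60-second window spans two consecutive 30-second batches
    JavaDStream<String> windowed = input.window(Durations.seconds(windowLength));
    windowed.foreachRDD(rdd -> System.out.println("records in window = " + rdd.count()));

    jssc.start();
    // pir.sparkstreaming.maxBatches=1 in the tests bounds the run instead of streaming forever
    jssc.awaitTerminationOrTimeout(batchSeconds * 1000);
    jssc.stop(false, false); // spark.streaming.stopGracefullyOnShutdown=false
  }
}

With batchSeconds=30 and windowLength=60 as configured above, each window covers exactly two batches, and capping maxBatches at 1 lets the test suite terminate rather than stream indefinitely.
----------------------------------------------------------------------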
+ @SuppressWarnings("unused") + public static List performQuery(String queryType, ArrayList selectors, FileSystem fs, boolean isSpark, int numThreads, + boolean isStreaming) throws Exception { logger.info("performQuery: "); @@ -379,11 +459,28 @@ public class DistTestSuite logger.info("Performing encrypted query:"); if (isSpark) { + logger.info("spark.home = " + SystemConfiguration.getProperty("spark.home")); + // Build args String inputFormat = SystemConfiguration.getProperty("pir.dataInputFormat"); logger.info("inputFormat = " + inputFormat); ArrayList args = new ArrayList<>(); - args.add("-" + ResponderProps.PLATFORM + "=spark"); + if (isStreaming) + { + logger.info("platform = sparkstreaming"); + args.add("-" + ResponderProps.PLATFORM + "=sparkstreaming"); + args.add("-" + ResponderProps.BATCHSECONDS + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds", "30")); + args.add("-" + ResponderProps.WINDOWLENGTH + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.windowLength", "60")); + args.add("-" + ResponderProps.MAXBATCHES + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.maxBatches", "-1")); + args.add("-" + ResponderProps.STOPGRACEFULLY + "=" + SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false")); + args.add("-" + ResponderProps.NUMDATAPARTITIONS + "=" + SystemConfiguration.getProperty("pir.numDataPartitions", "3")); + args.add("-" + ResponderProps.USEQUEUESTREAM + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false")); + } + else + { + logger.info("platform = spark"); + args.add("-" + ResponderProps.PLATFORM + "=spark"); + } args.add("-" + ResponderProps.DATAINPUTFORMAT + "=" + inputFormat); args.add("-" + ResponderProps.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput")); args.add("-" + ResponderProps.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile")); @@ -436,6 +533,11 @@ public class DistTestSuite // Perform decryption // Reconstruct the necessary objects from the files logger.info("Performing decryption; writing final results file"); + if (isStreaming) + { + outputFile = outputFile + "_0"; // currently only processing one batch for testing + } + logger.info("Pulling results from outputFile = " + outputFile); Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class); // Perform decryption and output the result file http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/utils/BaseTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java index c1fa1e9..962e467 100644 --- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java +++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java @@ -62,18 +62,18 @@ public class BaseTests public static void testDNSHostnameQuery(ArrayList dataElements, int numThreads, boolean testFalsePositive) throws Exception { - testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive); + testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive, false); } public static void testDNSHostnameQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception { - testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false); + testDNSHostnameQuery(dataElements, fs, isSpark, 
isDistributed, numThreads, false, false); } // Query for the watched hostname occurred; watched value type: hostname (String) public static void testDNSHostnameQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, - boolean testFalsePositive) throws Exception + boolean testFalsePositive, boolean isStreaming) throws Exception { logger.info("Running testDNSHostnameQuery(): "); @@ -83,7 +83,7 @@ public class BaseTests List results; if (isDistributed) { - results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads); + results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads, isStreaming); } else { @@ -193,11 +193,12 @@ public class BaseTests public static void testDNSIPQuery(ArrayList dataElements, int numThreads) throws Exception { - testDNSIPQuery(dataElements, null, false, false, numThreads); + testDNSIPQuery(dataElements, null, false, false, numThreads, false); } // The watched IP address was detected in the response to a query; watched value type: IP address (String) - public static void testDNSIPQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + public static void testDNSIPQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming) + throws Exception { logger.info("Running testDNSIPQuery(): "); @@ -206,7 +207,7 @@ public class BaseTests if (isDistributed) { - results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads); + results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming); if (results.size() != 5) { @@ -280,7 +281,7 @@ public class BaseTests if (isDistributed) { - results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads); + results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads, false); } else { @@ -330,11 +331,12 @@ public class BaseTests public static void testSRCIPQuery(ArrayList dataElements, int numThreads) throws Exception { - testSRCIPQuery(dataElements, null, false, false, numThreads); + testSRCIPQuery(dataElements, null, false, false, numThreads, false); } // Query for responses from watched srcIPs - public static void testSRCIPQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception + public static void testSRCIPQuery(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming) + throws Exception { logger.info("Running testSRCIPQuery(): "); @@ -345,7 +347,7 @@ public class BaseTests int numExpectedResults = 1; if (isDistributed) { - results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads); + results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming); removeTailElements = 2; // The last two elements are on the distributed stoplist } else @@ -406,8 +408,8 @@ public class BaseTests } // Query for responses from watched srcIPs - public static void testSRCIPQueryNoFilter(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) - throws Exception + public static void testSRCIPQueryNoFilter(List dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, + boolean isStreaming) throws
Exception { logger.info("Running testSRCIPQueryNoFilter(): "); @@ -417,7 +419,7 @@ public class BaseTests int numExpectedResults = 3; if (isDistributed) { - results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads); + results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads, isStreaming); } else { http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/test/utils/Inputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java index 587ac99..b6e7251 100644 --- a/src/main/java/org/apache/pirk/test/utils/Inputs.java +++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java @@ -207,8 +207,7 @@ public class Inputs dataElementsJSON.add(jsonObj7); - // This should never be returned - doesn't hit on any domain selectors - // resolution ip on stoplist + // Doesn't hit on any domain selectors; resolution ip on stoplist JSONObject jsonObj8 = new JSONObject(); jsonObj8.put(DATE, "2016-02-20T23:29:12.000Z"); jsonObj8.put(QNAME, "something.else2"); @@ -220,7 +219,7 @@ public class Inputs dataElementsJSON.add(jsonObj8); - // This should never be returned in distributed case -- domain and resolution ip on stoplist + // Domain and resolution ip on stoplist JSONObject jsonObj9 = new JSONObject(); jsonObj9.put(DATE, "2016-02-20T23:29:13.000Z"); jsonObj9.put(QNAME, "something.else.on.stoplist"); @@ -340,14 +339,14 @@ public class Inputs "{\"qname\":\"something.else\",\"date\":\"2016-02-20T23:29:11.000Z\",\"qtype\":[\"1\"]" + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.1\",\"dest_ip\":\"2.2.2.2\"" + ",\"ip\":[\"3.3.3.3\"]}"); - // Never should be returned - doesn't hit on any selectors + // Doesn't hit on any domain selectors; resolution ip on stoplist String indexTypeNum8 = esTestIndex + "/" + esType + "/8"; logger.info("indexTypeNum8 = " + indexTypeNum8); ProcessBuilder pAdd8 = new ProcessBuilder("curl", "-XPUT", indexTypeNum8, "-d", "{\"qname\":\"something.else2\",\"date\":\"2016-02-20T23:29:12.000Z\",\"qtype\":[\"1\"]" - + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.12\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.32\"]}"); + + ",\"rcode\":\"0\",\"src_ip\":\"5.6.7.8\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.132\"]}"); - // This should never be returned -- domain on stoplist + // Domain on stoplist String indexTypeNum9 = esTestIndex + "/" + esType + "/9"; logger.info("indexTypeNum9 = " + indexTypeNum9); ProcessBuilder pAdd9 = new ProcessBuilder("curl", "-XPUT", indexTypeNum9, "-d", http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/java/org/apache/pirk/utils/SystemConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java index 4146e5b..8cb5d17 100755 --- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java +++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java @@ -117,6 +117,23 @@ public class SystemConfiguration } /** + * Gets the specified property as a long, or the default value if the property isn't found. + * + * @param propertyName + * The name of the requested long property value. + * @param defaultValue + * The value to return if the property is undefined.
+ * @return The value of the requested property, or the default value if the property is undefined. + * @throws NumberFormatException + * If the property does not contain a parsable long value. + */ + public static long getLongProperty(String propertyName, long defaultValue) + { + String value = props.getProperty(propertyName); + return (value == null) ? defaultValue : Long.parseLong(value); + } + + /** * Gets the specified property as a boolean, or the default value if the property isn't defined. * * @param propertyName @@ -207,6 +224,7 @@ public class SystemConfiguration */ public static void loadPropsFromDir(String dirName) { + logger.info("Loading properties from dirName = " + dirName); File[] directoryListing = new File(dirName).listFiles(new FilenameFilter() { @Override http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/resources/pirk.properties ---------------------------------------------------------------------- diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties index f8efea7..963fa34 100755 --- a/src/main/resources/pirk.properties +++ b/src/main/resources/pirk.properties @@ -81,7 +81,7 @@ query.schemas = none ## #ES host address - One Elasticsearch node in the cluster - may include port specification -es.nodes= none +es.nodes=none #Default HTTP/REST port used for connecting to Elasticsearch es.port=9200 @@ -106,7 +106,7 @@ test.es.type = pkt #Elasticsearch resource - Elasticsearch resource location where data is read and written to. #Requires the format / -test.es.resource= none +test.es.resource= /testindex/pkt #Pathname in hdfs to place input JSON file testing test.inputJSONFile = /tmp/testJSONInput http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/main/resources/responder.properties ---------------------------------------------------------------------- diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties index 11ad7f6..3ae92c7 100644 --- a/src/main/resources/responder.properties +++ b/src/main/resources/responder.properties @@ -27,7 +27,7 @@ pir.dataInputFormat= #outputFile -- required -- Fully qualified name of output file in hdfs pir.outputFile= -#platform -- required -- 'mapreduce', 'spark', or 'standalone' +#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone' #Processing platform technology for the responder platform= @@ -134,4 +134,24 @@ pir.queryInput= #numExpLookupPartitions -- optional -- Number of partitions for the exp lookup table #pir.numExpLookupPartitions= + +##Props for Spark Streaming + +#batchSeconds - optional - Batch size (in seconds) for Spark Streaming - defaults to 30 sec +#pir.sparkstreaming.batchSeconds= + +#windowLength - optional - Window size (in seconds) for Spark Streaming - defaults to 60 sec +#pir.sparkstreaming.windowLength= + +#queueStream - optional - Use queue stream for Spark Streaming - defaults to false +#pir.sparkstreaming.useQueueStream= + +#pir.sparkstreaming.maxBatches - optional - Spark Streaming - Max number of batches to process +#defaults to -1 (no maximum) +#pir.sparkstreaming.maxBatches= + +#spark.streaming.stopGracefullyOnShutdown - Spark Streaming - Whether or not to stop 'gracefully' during shutdown +#default is false +#spark.streaming.stopGracefullyOnShutdown= + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java 
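----------------------------------------------------------------------
The Spark Streaming properties added to responder.properties above are all optional, with code-side defaults of 30 second batches, 60 second windows, and no batch maximum (-1). On the consuming side, the getLongProperty helper added to SystemConfiguration in this commit is the natural accessor. A sketch of reading them (property names and defaults taken from this commit; the surrounding code is illustrative, and note that getLongProperty throws NumberFormatException if the property holds a non-numeric value):

long batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
long windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
long maxBatches = SystemConfiguration.getLongProperty("pir.sparkstreaming.maxBatches", -1); // -1 = no maximum
boolean useQueueStream = Boolean.parseBoolean(SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false"));
boolean stopGracefully = Boolean.parseBoolean(SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));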
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java index 9ac2522..bb70153 100644 --- a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java +++ b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java @@ -66,11 +66,6 @@ public class QueryParserUtilsTest Inputs.createSchemaFiles(null, false, null); dSchema = DataSchemaRegistry.get(Inputs.TEST_DATA_SCHEMA_NAME); - - // ProcessBuilder pAdd1 = new ProcessBuilder("curl", "-XPUT", indexTypeNum1, "-d", - // "{\"qname\":\"a.b.c.com\",\"date\":\"2016-02-20T23:29:05.000Z\",\"qtype\":[\"1\"]" - // + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"1.2.3.6\"" + ",\"ip\":[\"10.20.30.40\",\"10.20.30.60\"]}"); - // doc = StringUtils.jsonStringToMapWritableWithArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema); docWAW = StringUtils.jsonStringToMapWritableWithWritableArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema); docMap = StringUtils.jsonStringToMap(dataElementsJSON.get(0).toJSONString(), dSchema); http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java index a801b01..0b09fa9 100644 --- a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java +++ b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java @@ -305,8 +305,8 @@ public class LoadQuerySchemaTest // Create the stoplist file and alter the properties accordingly private void createStopListFile() throws IOException, PIRException { - SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile"); String newSLFile = Inputs.createStopList(null, false); + SystemConfiguration.setProperty("pir.stopListFile", newSLFile); } http://git-wip-us.apache.org/repos/asf/incubator-pirk/blob/3957be41/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java index 2144ee1..cb65a60 100644 --- a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java +++ b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java @@ -62,9 +62,10 @@ public class StandaloneTest // Create the stoplist file stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile"); - SystemConfiguration.setProperty("pir.stopListFile", STOPLIST_FILE); + String newSLFile = Inputs.createStopList(null, false); SystemConfiguration.setProperty("pir.stopListFile", newSLFile); + logger.info("stopListFileProp = " + stopListFileProp + " new prop = " + SystemConfiguration.getProperty("pir.stopListFile")); // Create data and query schemas
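----------------------------------------------------------------------
The LoadQuerySchemaTest and StandaloneTest changes above are the same ordering fix: pir.stopListFile must be set to the path actually returned by Inputs.createStopList, not to a placeholder name chosen before the stoplist file exists. In sketch form (method and property names from the diff):

// Broken ordering: the property points at a bare name, not at the file that gets created
// SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile");
// Inputs.createStopList(null, false);

// Fixed ordering: create the stoplist first, then point the property at its real path
String newSLFile = Inputs.createStopList(null, false);
SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
----------------------------------------------------------------------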