spot-commits mailing list archives

From nlseg...@apache.org
Subject [2/6] incubator-spot git commit: This commit contains the update of spot-ml from Spark 1.6 to Spark 2.1. This new version introduces SparkSession as the new point of entry for SparkContext, SQLContext and HiveContext used in Spark 1.x
Date Mon, 19 Jun 2017 22:41:44 GMT
This commit updates spot-ml from Spark 1.6 to Spark 2.1. The new version introduces SparkSession as the single entry point, replacing the SparkContext, SQLContext and HiveContext used in Spark 1.x.
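
For illustration, a minimal sketch of the new entry point as it is used after this change (the object name, master setting and printed fields are illustrative, not taken from the commit):

import org.apache.spark.sql.SparkSession

object EntryPointExample {
  def main(args: Array[String]): Unit = {
    // Spark 1.x needed separate SparkContext / SQLContext / HiveContext objects;
    // in Spark 2.x a single SparkSession covers all three.
    val spark = SparkSession.builder
      .appName("Spot ML suspicious connects analysis")
      .master("local[*]")          // illustrative; the shell scripts submit with --master yarn
      .getOrCreate()

    val sc = spark.sparkContext    // underlying SparkContext, still used for broadcasts and RDD work
    val df = spark.emptyDataFrame  // DataFrame/SQL API is available directly on the session

    println(s"Spark ${spark.version}, default parallelism = ${sc.defaultParallelism}")
    spark.stop()
  }
}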

Changed "tdur" variable from float to double because according to the spot-ml documentation this is the correct data type.

log4j.logger levels were changed to report only ERROR instead of WARN + ERROR

Unit tests were modified to run against a local SparkSession instead of the old SparkContext
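
A minimal sketch of the resulting test fixture pattern, assuming ScalaTest FlatSpec as used by the existing suites (the trait and member names here are illustrative; the project's own fixture is TestingSparkContextFlatSpec, listed in the diffstat below):

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

trait LocalSparkSessionSpec extends FlatSpec with BeforeAndAfterAll {

  // One local SparkSession per suite, replacing the SparkContext/SQLContext pair
  // that the Spark 1.6 fixtures created.
  @transient var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder
      .appName("spot-ml unit test")
      .master("local[2]")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    if (spark != null) spark.stop()
    super.afterAll()
  }
}

A suite then mixes this in, e.g. class MyAnalysisTest extends LocalSparkSessionSpec with Matchers, and reaches the RDD API through spark.sparkContext where still needed, which matches how the new SpotLDAWrapperTest below uses its fixture.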

The spark-submit --master setting was changed from yarn-client to yarn (yarn-client is deprecated since Spark 2.0)

Local build folders were changed from Scala 2.10 to Scala 2.11 (target/scala-2.10 -> target/scala-2.11)

minExecutors in ml_test.sh was changed to the default of 0 to avoid warnings while running the code complaining that initial executors = 1 is not less than minExecutors = 1


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/89883bd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/89883bd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/89883bd7

Branch: refs/heads/master
Commit: 89883bd7573b66c0e2c04d4640c8ee3b8dccd074
Parents: 4e46410
Author: lujangus <gustavo.lujan.moreno@intel.com>
Authored: Fri Jun 16 10:01:25 2017 -0500
Committer: lujangus <gustavo.lujan.moreno@intel.com>
Committed: Mon Jun 19 10:20:31 2017 -0500

----------------------------------------------------------------------
 spot-ml/build.sbt                               |  17 ++-
 spot-ml/ml_ops.sh                               |   4 +-
 spot-ml/ml_test.sh                              |   6 +-
 .../scala/org/apache/spark/sql/WideUDFs.scala   |   8 +-
 .../org/apache/spot/SuspiciousConnects.scala    |  29 ++--
 .../dns/DNSSuspiciousConnectsAnalysis.scala     |  12 +-
 .../org/apache/spot/dns/model/DNSFeedback.scala |  17 +--
 .../dns/model/DNSSuspiciousConnectsModel.scala  |  40 +++--
 .../org/apache/spot/lda/SpotLDAWrapper.scala    |  23 ++-
 .../org/apache/spot/netflow/FlowSchema.scala    |   2 +-
 .../FlowSuspiciousConnectsAnalysis.scala        |   9 +-
 .../spot/netflow/model/FlowFeedback.scala       |  15 +-
 .../model/FlowSuspiciousConnectsModel.scala     |  33 ++---
 .../org/apache/spot/proxy/ProxyFeedback.scala   |  15 +-
 .../proxy/ProxySuspiciousConnectsAnalysis.scala |  16 +-
 .../proxy/ProxySuspiciousConnectsModel.scala    |  40 +++--
 .../utilities/data/InputOutputDataHandler.scala |  19 ++-
 spot-ml/src/test/resources/log4j.properties     |   6 +-
 .../org/apache/spot/SpotLDAWrapperTest.scala    | 147 +++++++++++++++++++
 .../dns/DNSSuspiciousConnectsAnalysisTest.scala |  35 +++--
 .../org/apache/spot/netflow/FlowRecord.scala    |   6 +-
 .../FlowSuspiciousConnectsAnalysisTest.scala    | 109 ++++++++------
 .../model/FlowSuspiciousConnectsModelTest.scala |  70 +++++----
 .../ProxySuspiciousConnectsAnalysisTest.scala   |  29 ++--
 .../spot/proxy/ProxyWordCreationTest.scala      |   6 +-
 .../spot/testutils/TestingSparkContext.scala    |  49 +++----
 .../testutils/TestingSparkContextFlatSpec.scala |  23 ++-
 .../testutils/TestingSparkContextFunSuite.scala |  25 ++--
 .../testutils/TestingSparkContextWordSpec.scala |  24 ++-
 .../spot/utilities/DomainProcessorTest.scala    |  20 +--
 30 files changed, 496 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/build.sbt
----------------------------------------------------------------------
diff --git a/spot-ml/build.sbt b/spot-ml/build.sbt
index c42112e..beb8c72 100644
--- a/spot-ml/build.sbt
+++ b/spot-ml/build.sbt
@@ -19,16 +19,18 @@ name := "spot-ml"
 
 version := "1.1"
 
-scalaVersion := "2.10.6"
+scalaVersion := "2.11.8"
 
-import AssemblyKeys._
+val sparkVersion = "2.1.0"
+
+import sbtassembly.Plugin.AssemblyKeys._
 
 assemblySettings
 
-libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
-libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.0"
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0"
-libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.2.6"
+libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
+libraryDependencies += "org.apache.spark" %% "spark-mllib" % sparkVersion
+libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
+libraryDependencies += "org.scalatest" % "scalatest_2.11" % "2.2.6"
 libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"
 
 resolvers += Resolver.sonatypeRepo("public")
@@ -43,6 +45,9 @@ mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
   case PathList("org", "apache", "spark", xs@_*) => MergeStrategy.last
   case PathList("javax", "xml", xs@_*) => MergeStrategy.last
   case "about.html" => MergeStrategy.rename
+  case PathList("META-INF", xs@_*) => MergeStrategy.discard
+  case "about.html" => MergeStrategy.rename
+  case x => MergeStrategy.first
   case meta(_) => MergeStrategy.discard
   case x => old(x)
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/ml_ops.sh
----------------------------------------------------------------------
diff --git a/spot-ml/ml_ops.sh b/spot-ml/ml_ops.sh
index 7900568..4536676 100755
--- a/spot-ml/ml_ops.sh
+++ b/spot-ml/ml_ops.sh
@@ -82,7 +82,7 @@ rm -f ${LPATH}/*.{dat,beta,gamma,other,pkl} # protect the flow_scores.csv file
 hdfs dfs -rm -R -f ${HDFS_SCORED_CONNECTS}
 
 time spark-submit --class "org.apache.spot.SuspiciousConnects" \
-  --master yarn-client \
+  --master yarn \
   --driver-memory ${SPK_DRIVER_MEM} \
   --conf spark.driver.maxResultSize=${SPK_DRIVER_MAX_RESULTS} \
   --conf spark.driver.maxPermSize=512m \
@@ -95,7 +95,7 @@ time spark-submit --class "org.apache.spot.SuspiciousConnects" \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.yarn.am.waitTime=100s \
   --conf spark.yarn.am.memoryOverhead=${SPK_DRIVER_MEM_OVERHEAD} \
-  --conf spark.yarn.executor.memoryOverhead=${SPK_EXEC_MEM_OVERHEAD} target/scala-2.10/spot-ml-assembly-1.1.jar \
+  --conf spark.yarn.executor.memoryOverhead=${SPK_EXEC_MEM_OVERHEAD} target/scala-2.11/spot-ml-assembly-1.1.jar \
   --analysis ${DSOURCE} \
   --input ${RAWDATA_PATH}  \
   --dupfactor ${DUPFACTOR} \

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/ml_test.sh
----------------------------------------------------------------------
diff --git a/spot-ml/ml_test.sh b/spot-ml/ml_test.sh
index 807b46c..9a064e6 100755
--- a/spot-ml/ml_test.sh
+++ b/spot-ml/ml_test.sh
@@ -53,13 +53,13 @@ rm -f ${LPATH}/*.{dat,beta,gamma,other,pkl} # protect the flow_scores.csv file
 hdfs dfs -rm -R -f ${HDFS_SCORED_CONNECTS}
 
 time spark-submit --class "org.apache.spot.SuspiciousConnects" \
-  --master yarn-client \
+  --master yarn \
   --driver-memory ${SPK_DRIVER_MEM} \
   --conf spark.driver.maxResultSize=${SPK_DRIVER_MAX_RESULTS} \
   --conf spark.driver.maxPermSize=512m \
   --conf spark.driver.cores=1 \
   --conf spark.dynamicAllocation.enabled=true \
-  --conf spark.dynamicAllocation.minExecutors=1 \
+  --conf spark.dynamicAllocation.minExecutors=0 \
   --conf spark.dynamicAllocation.maxExecutors=${SPK_EXEC} \
   --conf spark.executor.cores=${SPK_EXEC_CORES} \
   --conf spark.executor.memory=${SPK_EXEC_MEM} \
@@ -70,7 +70,7 @@ time spark-submit --class "org.apache.spot.SuspiciousConnects" \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.yarn.am.waitTime=1000000 \
   --conf spark.yarn.driver.memoryOverhead=${SPK_DRIVER_MEM_OVERHEAD} \
-  --conf spark.yarn.executor.memoryOverhead=${SPK_EXEC_MEM_OVERHEAD} target/scala-2.10/spot-ml-assembly-1.1.jar \
+  --conf spark.yarn.executor.memoryOverhead=${SPK_EXEC_MEM_OVERHEAD} target/scala-2.11/spot-ml-assembly-1.1.jar \
   --analysis ${DSOURCE} \
   --input ${RAWDATA_PATH}  \
   --dupfactor ${DUPFACTOR} \

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala b/spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala
index fca37e2..c5e4dec 100644
--- a/spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala
+++ b/spot-ml/src/main/scala/org/apache/spark/sql/WideUDFs.scala
@@ -16,8 +16,8 @@
  */
 
 package org.apache.spark.sql
-
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.expressions.UserDefinedFunction
 
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{TypeTag, typeTag}
@@ -39,7 +39,7 @@ object WideUDFs {
       :: ScalaReflection.schemaFor(typeTag[A9]).dataType
       :: ScalaReflection.schemaFor(typeTag[A10]).dataType
       :: Nil).getOrElse(Nil)
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Option(inputTypes))
   }
 
 
@@ -64,7 +64,7 @@ object WideUDFs {
       :: ScalaReflection.schemaFor(typeTag[A10]).dataType
       :: ScalaReflection.schemaFor(typeTag[A11]).dataType
       :: Nil).getOrElse(Nil)
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Option(inputTypes))
   }
   def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag,
   A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](f: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]):
@@ -82,6 +82,6 @@ object WideUDFs {
       :: ScalaReflection.schemaFor(typeTag[A11]).dataType
       :: ScalaReflection.schemaFor(typeTag[A12]).dataType
       :: Nil).getOrElse(Nil)
-    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, inputTypes)
+    UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Option(inputTypes))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index 588bd9b..1c21579 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -18,8 +18,7 @@
 package org.apache.spot
 
 import org.apache.log4j.{Level, LogManager, Logger}
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSuspiciousConnectsAnalysis
 import org.apache.spot.netflow.FlowSuspiciousConnectsAnalysis
@@ -57,12 +56,14 @@ object SuspiciousConnects {
         Logger.getLogger("akka").setLevel(Level.OFF)
 
         val analysis = config.analysis
-        val sparkConfig = new SparkConf().setAppName("Spot ML:  " + analysis + " suspicious connects analysis")
-        val sparkContext = new SparkContext(sparkConfig)
-        val sqlContext = new SQLContext(sparkContext)
 
-        val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sqlContext, config.inputPath, logger)
-          .getOrElse(sqlContext.emptyDataFrame)
+        val spark = SparkSession.builder
+          .appName("Spot ML:  " + analysis + " suspicious connects analysis")
+          .master("yarn")
+          .getOrCreate()
+
+        val inputDataFrame = InputOutputDataHandler.getInputDataFrame(spark, config.inputPath, logger)
+          .getOrElse(spark.emptyDataFrame)
         if(inputDataFrame.rdd.isEmpty()) {
           logger.error("Couldn't read data from location " + config.inputPath +", please verify it's a valid location and that " +
             s"contains parquet files with a given schema and try again.")
@@ -70,11 +71,11 @@ object SuspiciousConnects {
         }
 
         val results: Option[SuspiciousConnectsAnalysisResults] = analysis match {
-          case "flow" => Some(FlowSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger,
+          case "flow" => Some(FlowSuspiciousConnectsAnalysis.run(config, spark, logger,
             inputDataFrame))
-          case "dns" => Some(DNSSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger,
+          case "dns" => Some(DNSSuspiciousConnectsAnalysis.run(config, spark, logger,
             inputDataFrame))
-          case "proxy" => Some(ProxySuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger,
+          case "proxy" => Some(ProxySuspiciousConnectsAnalysis.run(config, spark, logger,
             inputDataFrame))
           case _ => None
         }
@@ -84,9 +85,11 @@ object SuspiciousConnects {
 
             logger.info(s"$analysis suspicious connects analysis completed.")
             logger.info("Saving results to : " + config.hdfsScoredConnect)
-            resultRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
 
-            InputOutputDataHandler.mergeResultsFiles(sparkContext, config.hdfsScoredConnect, analysis, logger)
+            import spark.implicits._
+            resultRecords.map(_.mkString(config.outputDelimiter)).rdd.saveAsTextFile(config.hdfsScoredConnect)
+
+            InputOutputDataHandler.mergeResultsFiles(spark, config.hdfsScoredConnect, analysis, logger)
 
             InvalidDataHandler.showAndSaveInvalidRecords(invalidRecords, config.hdfsScoredConnect, logger)
           }
@@ -94,7 +97,7 @@ object SuspiciousConnects {
           case None => logger.error("Unsupported (or misspelled) analysis: " + analysis)
         }
 
-        sparkContext.stop()
+        spark.stop()
 
       case None => logger.error("Error parsing arguments.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index d1c87ed..3e1e7a1 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -18,10 +18,9 @@
 package org.apache.spot.dns
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
@@ -59,11 +58,10 @@ object DNSSuspiciousConnectsAnalysis {
     * Saves the most suspicious connections to a CSV file on HDFS.
     *
     * @param config Object encapsulating runtime parameters and CLI options.
-    * @param sparkContext
-    * @param sqlContext
+    * @param spark
     * @param logger
     */
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+  def run(config: SuspiciousConnectsConfig, spark: SparkSession, logger: Logger,
           inputDNSRecords: DataFrame): SuspiciousConnectsAnalysisResults = {
 
 
@@ -77,10 +75,10 @@ object DNSSuspiciousConnectsAnalysis {
 
     logger.info("Fitting probabilistic model to data")
     val model =
-      DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, dnsRecords)
+      DNSSuspiciousConnectsModel.trainModel(spark, logger, config, dnsRecords)
 
     logger.info("Identifying outliers")
-    val scoredDNSRecords = model.score(sparkContext, sqlContext, dnsRecords, config.userDomain, config.precisionUtility)
+    val scoredDNSRecords = model.score(spark, dnsRecords, config.userDomain, config.precisionUtility)
 
     val filteredScored = filterScoredRecords(scoredDNSRecords, config.threshold)
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSFeedback.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSFeedback.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSFeedback.scala
index 9fac047..3e96e20 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSFeedback.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSFeedback.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spot.dns.model
 
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.{ModelSchema, modelColumns}
 
 import scala.io.Source
@@ -33,14 +32,12 @@ object DNSFeedback {
   /**
     * Load the feedback file for DNS data.
  *
-    * @param sc Spark context.
-    * @param sqlContext Spark SQL context.
-    * @param feedbackFile Local machine path to the DNS feedback file.
+    * @param spark             Spark Session
+    * @param feedbackFile      Local machine path to the DNS feedback file.
     * @param duplicationFactor Number of words to create per flagged feedback entry.
     * @return DataFrame of the feedback events.
     */
-  def loadFeedbackDF(sc: SparkContext,
-                     sqlContext: SQLContext,
+  def loadFeedbackDF(spark: SparkSession,
                      feedbackFile: String,
                      duplicationFactor: Int): DataFrame = {
 
@@ -52,7 +49,7 @@ object DNSFeedback {
       */
 
       val lines = Source.fromFile(feedbackFile).getLines().toArray.drop(1)
-      val feedback: RDD[String] = sc.parallelize(lines)
+      val feedback: RDD[String] = spark.sparkContext.parallelize(lines)
 
       /*
       The columns and their entries are as follows:
@@ -85,7 +82,7 @@ object DNSFeedback {
       val DnsQryRcodeIndex = 6
       val DnsSevIndex = 12
 
-      sqlContext.createDataFrame(feedback.map(_.split("\t"))
+      spark.createDataFrame(feedback.map(_.split("\t"))
         .filter(row => row(DnsSevIndex).trim.toInt == 3)
         .map(row => Row.fromSeq(Seq(row(FrameTimeIndex),
           row(UnixTimeStampIndex).toLong,
@@ -98,7 +95,7 @@ object DNSFeedback {
         .flatMap(row => List.fill(duplicationFactor)(row)), ModelSchema)
         .select(modelColumns:_*)
     } else {
-      sqlContext.createDataFrame(sc.emptyRDD[Row], ModelSchema)
+      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], ModelSchema)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index d8efcb2..c591f19 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -18,12 +18,11 @@
 package org.apache.spot.dns.model
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.DNSWordCreation
@@ -66,18 +65,16 @@ class DNSSuspiciousConnectsModel(inTopicCount: Int,
     * Use a suspicious connects model to assign estimated probabilities to a dataframe of
     * DNS log events.
     *
-    * @param sc         Spark Context
-    * @param sqlContext Spark SQL context
+    * @param spark      Spark Session
     * @param inDF       Dataframe of DNS log events, containing at least the columns of [[DNSSuspiciousConnectsModel.ModelSchema]]
     * @param userDomain Domain associated to network data (ex: 'intel')
     * @return Dataframe with a column named [[org.apache.spot.dns.DNSSchema.Score]] that contains the
     *         probability estimated for the network event at that row
     */
-  def score(sc: SparkContext, sqlContext: SQLContext, inDF: DataFrame, userDomain: String,
-            precisionUtility: FloatPointPrecisionUtility): DataFrame = {
+  def score(spark: SparkSession, inDF: DataFrame, userDomain: String, precisionUtility: FloatPointPrecisionUtility): DataFrame = {
 
-    val topDomainsBC = sc.broadcast(TopDomains.TopDomains)
-    val wordToPerTopicProbBC = sc.broadcast(wordToPerTopicProb)
+    val topDomainsBC = spark.sparkContext.broadcast(TopDomains.TopDomains)
+    val wordToPerTopicProbBC = spark.sparkContext.broadcast(wordToPerTopicProb)
 
     val scoreFunction =
       new DNSScoreFunction(topicCount,
@@ -137,16 +134,14 @@ object DNSSuspiciousConnectsModel {
   /**
     * Create a new DNS Suspicious Connects model by training it on a data frame and a feedback file.
     *
-    * @param sparkContext
-    * @param sqlContext
+    * @param spark        Spark Session
     * @param logger
     * @param config       Analysis configuration object containing CLI parameters.
     *                     Contains the path to the feedback file in config.scoresFile
     * @param inputRecords Data used to train the model.
     * @return A new [[DNSSuspiciousConnectsModel]] instance trained on the dataframe and feedback file.
     */
-  def trainModel(sparkContext: SparkContext,
-                 sqlContext: SQLContext,
+  def trainModel(spark: SparkSession,
                  logger: Logger,
                  config: SuspiciousConnectsConfig,
                  inputRecords: DataFrame): DNSSuspiciousConnectsModel = {
@@ -155,13 +150,12 @@ object DNSSuspiciousConnectsModel {
 
     val selectedRecords = inputRecords.select(modelColumns: _*)
 
-    val totalRecords = selectedRecords.unionAll(DNSFeedback.loadFeedbackDF(sparkContext,
-      sqlContext,
+    val totalRecords = selectedRecords.union(DNSFeedback.loadFeedbackDF(spark,
       config.feedbackFile,
       config.duplicationFactor))
 
-    val countryCodesBC = sparkContext.broadcast(CountryCodes.CountryCodes)
-    val topDomainsBC = sparkContext.broadcast(TopDomains.TopDomains)
+    val countryCodesBC = spark.sparkContext.broadcast(CountryCodes.CountryCodes)
+    val topDomainsBC = spark.sparkContext.broadcast(TopDomains.TopDomains)
     val userDomain = config.userDomain
 
     val domainStatsRecords = createDomainStatsDF(sparkContext, sqlContext, countryCodesBC, topDomainsBC, userDomain, totalRecords)
@@ -172,18 +166,20 @@ object DNSSuspiciousConnectsModel {
 
     val dataWithWord = totalRecords.withColumn(Word, dnsWordCreator.wordCreationUDF(modelColumns: _*))
 
+    import spark.implicits._
+
     // aggregate per-word counts at each IP
     val ipDstWordCounts =
       dataWithWord
         .select(ClientIP, Word)
         .filter(dataWithWord(Word).notEqual(InvalidDataHandler.WordError))
         .map({ case Row(destIP: String, word: String) => (destIP, word) -> 1 })
+        .rdd
         .reduceByKey(_ + _)
         .map({ case ((ipDst, word), count) => SpotLDAInput(ipDst, word, count) })
 
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkContext,
-      sqlContext,
+    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(spark,
       ipDstWordCounts,
       config.topicCount,
       logger,
@@ -201,8 +197,7 @@ object DNSSuspiciousConnectsModel {
   /**
     * Add  domain statistics fields to a data frame.
     *
-    * @param sparkContext   Spark context.
-    * @param sqlContext     Spark SQL context.
+    * @param spark          Spark Session
     * @param countryCodesBC Broadcast of the country codes set.
     * @param topDomainsBC   Broadcast of the most-popular domains set.
     * @param userDomain     Domain associated to network data (ex: 'intel')
@@ -210,8 +205,7 @@ object DNSSuspiciousConnectsModel {
     * @return A new dataframe with the new columns added. The new columns have the schema [[DomainStatsSchema]]
     */
 
-  def createDomainStatsDF(sparkContext: SparkContext,
-                          sqlContext: SQLContext,
+  def createDomainStatsDF(spark: SparkSession,
                           countryCodesBC: Broadcast[Set[String]],
                           topDomainsBC: Broadcast[Set[String]],
                           userDomain: String,
@@ -222,7 +216,7 @@ object DNSSuspiciousConnectsModel {
     val domainStatsRDD: RDD[Row] = inDF.rdd.map(row =>
       Row.fromTuple(createTempFields(countryCodesBC, topDomainsBC, userDomain, row.getString(queryNameIndex))))
 
-    sqlContext.createDataFrame(domainStatsRDD, DomainStatsSchema)
+    spark.createDataFrame(domainStatsRDD, DomainStatsSchema)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 315aae6..b674ea3 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -18,14 +18,13 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
 import org.apache.spot.utilities.FloatPointPrecisionUtility
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 import scala.collection.immutable.Map
 
@@ -40,8 +39,7 @@ import scala.collection.immutable.Map
 
 object SpotLDAWrapper {
 
-  def runLDA(sparkContext: SparkContext,
-             sqlContext: SQLContext,
+  def runLDA(spark: SparkSession,
              docWordCount: RDD[SpotLDAInput],
              topicCount: Int,
              logger: Logger,
@@ -52,7 +50,7 @@ object SpotLDAWrapper {
              maxIterations: Int,
              precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
 
-    import sqlContext.implicits._
+    import spark.implicits._
 
     val docWordCountCache = docWordCount.cache()
 
@@ -80,7 +78,7 @@ object SpotLDAWrapper {
       formatSparkLDAInput(docWordCountCache,
         documentDictionary,
         wordDictionary,
-        sqlContext)
+        spark)
 
     docWordCountCache.unpersist()
 
@@ -153,7 +151,7 @@ object SpotLDAWrapper {
 
     // Create doc results from vector: convert docID back to string, convert vector of probabilities to array
     val docToTopicMixDF =
-      formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sqlContext, precisionUtility)
+      formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, spark, precisionUtility)
 
     documentDictionary.unpersist()
 
@@ -170,9 +168,9 @@ object SpotLDAWrapper {
   def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
                           documentDictionary: DataFrame,
                           wordDictionary: Map[String, Int],
-                          sqlContext: SQLContext): RDD[(Long, Vector)] = {
+                          spark: SparkSession): RDD[(Long, Vector)] = {
 
-    import sqlContext.implicits._
+    import spark.implicits._
 
     val getWordId = {
       udf((word: String) => (wordDictionary(word)))
@@ -193,6 +191,7 @@ object SpotLDAWrapper {
     val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
     = wordCountsPerDocDF
       .select(DocumentNumber, WordNumber, WordNameWordCount)
+      .rdd
       .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
       .groupByKey
 
@@ -217,10 +216,10 @@ object SpotLDAWrapper {
     wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
   }
 
-  def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sqlContext:
-  SQLContext, precisionUtility: FloatPointPrecisionUtility):
+  def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, spark: SparkSession,
+  precisionUtility: FloatPointPrecisionUtility):
   DataFrame = {
-    import sqlContext.implicits._
+    import spark.implicits._
 
     val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
     val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSchema.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSchema.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSchema.scala
index 906b019..1b25f55 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSchema.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSchema.scala
@@ -49,7 +49,7 @@ object FlowSchema {
   val SecondField = StructField(Second, IntegerType, nullable = true)
 
   val Duration = "tdur"
-  val DurationField = StructField(Duration, FloatType, nullable = true)
+  val DurationField = StructField(Duration, DoubleType, nullable = true)
 
   val SourceIP = "sip"
   val SourceIPField = StructField(SourceIP, StringType, nullable = true)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index ba3bf9a..c5dd7db 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -18,10 +18,9 @@
 package org.apache.spot.netflow
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.netflow.FlowSchema._
@@ -74,7 +73,7 @@ object FlowSuspiciousConnectsAnalysis {
       ObytField,
       ScoreField)).fieldNames.map(col)
 
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+  def run(config: SuspiciousConnectsConfig, spark: SparkSession, logger: Logger,
           inputFlowRecords: DataFrame): SuspiciousConnectsAnalysisResults = {
 
     logger.info("Starting flow suspicious connects analysis.")
@@ -83,10 +82,10 @@ object FlowSuspiciousConnectsAnalysis {
 
     logger.info("Fitting probabilistic model to data")
     val model =
-      FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, flowRecords)
+      FlowSuspiciousConnectsModel.trainModel(spark, logger, config, flowRecords)
 
     logger.info("Identifying outliers")
-    val scoredFlowRecords = model.score(sparkContext, sqlContext, flowRecords, config.precisionUtility)
+    val scoredFlowRecords = model.score(spark, flowRecords, config.precisionUtility)
 
     val filteredScored = filterScoredRecords(scoredFlowRecords, config.threshold)
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowFeedback.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowFeedback.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowFeedback.scala
index 6955d54..09a55d1 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowFeedback.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowFeedback.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spot.netflow.model
 
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel._
 
 import scala.io.Source
@@ -34,14 +33,12 @@ object FlowFeedback {
   /**
     * Load the feedback file for netflow data.
     *
-    * @param sc                Spark context.
-    * @param sqlContext        Spark SQL context.
+    * @param spark             Spark Session.
     * @param feedbackFile      Local machine path to the netflow feedback file.
     * @param duplicationFactor Number of words to create per flagged feedback entry.
     * @return DataFrame of the feedback events.
     */
-  def loadFeedbackDF(sc: SparkContext,
-                     sqlContext: SQLContext,
+  def loadFeedbackDF(spark: SparkSession,
                      feedbackFile: String,
                      duplicationFactor: Int): DataFrame = {
 
@@ -53,7 +50,7 @@ object FlowFeedback {
       */
 
       val lines = Source.fromFile(feedbackFile).getLines().toArray.drop(1)
-      val feedback: RDD[String] = sc.parallelize(lines)
+      val feedback: RDD[String] = spark.sparkContext.parallelize(lines)
 
       /*
          flow_scores.csv - feedback file structure
@@ -82,7 +79,7 @@ object FlowFeedback {
       val MinuteIndex = 21
       val SecondIndex = 22
 
-      sqlContext.createDataFrame(feedback.map(_.split("\t"))
+      spark.createDataFrame(feedback.map(_.split("\t"))
         .filter(row => row(ScoreIndex).trim.toInt == 3)
         .map(row => Row.fromSeq(Seq(
           row(TimeStartIndex).split(" ")(1).split(":")(0).trim.toInt, // hour
@@ -98,7 +95,7 @@ object FlowFeedback {
         .select(ModelColumns: _*)
 
     } else {
-      sqlContext.createDataFrame(sc.emptyRDD[Row], ModelSchema)
+      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], ModelSchema)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index e053437..4933a91 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -18,11 +18,10 @@
 package org.apache.spot.netflow.model
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.WideUDFs.udf
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.lda.SpotLDAWrapper
 import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
@@ -57,10 +56,9 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
                                   ipToTopicMix: DataFrame,
                                   wordToPerTopicProb: Map[String, Array[Double]]) {
 
-  def score(sc: SparkContext, sqlContext: SQLContext, flowRecords: DataFrame,
-            precisionUtility: FloatPointPrecisionUtility): DataFrame = {
+  def score(spark: SparkSession, flowRecords: DataFrame, precisionUtility: FloatPointPrecisionUtility): DataFrame = {
 
-    val wordToPerTopicProbBC = sc.broadcast(wordToPerTopicProb)
+    val wordToPerTopicProbBC = spark.sparkContext.broadcast(wordToPerTopicProb)
 
 
     /** A left outer join (below) takes rows from the left DF for which the join expression is not
@@ -83,6 +81,7 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
 
     val scoreFunction = new FlowScoreFunction(topicCount, wordToPerTopicProbBC)
 
+    import org.apache.spark.sql.functions.udf
 
     val scoringUDF = udf((hour: Int,
                           srcIP: String,
@@ -132,11 +131,10 @@ object FlowSuspiciousConnectsModel {
   val ModelColumns = ModelSchema.fieldNames.toList.map(col)
 
 
-    def trainModel(sparkContext: SparkContext,
-                   sqlContext: SQLContext,
-                   logger: Logger,
-                   config: SuspiciousConnectsConfig,
-                   inputRecords: DataFrame): FlowSuspiciousConnectsModel = {
+  def trainModel(spark: SparkSession,
+                 logger: Logger,
+                 config: SuspiciousConnectsConfig,
+                 inputRecords: DataFrame): FlowSuspiciousConnectsModel = {
 
 
     logger.info("Training netflow suspicious connects model from " + config.inputPath)
@@ -144,41 +142,40 @@ object FlowSuspiciousConnectsModel {
     val selectedRecords = inputRecords.select(ModelColumns: _*)
 
 
-    val totalRecords = selectedRecords.unionAll(FlowFeedback.loadFeedbackDF(sparkContext,
-      sqlContext,
+    val totalRecords = selectedRecords.unionAll(FlowFeedback.loadFeedbackDF(spark,
       config.feedbackFile,
       config.duplicationFactor))
 
 
-
-
     // simplify netflow log entries into "words"
 
     val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
       .withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
 
+    import spark.implicits._
+
     // Aggregate per-word counts at each IP
     val srcWordCounts = dataWithWords
       .filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
       .select(SourceIP, SourceWord)
       .map({ case Row(sourceIp: String, sourceWord: String) => (sourceIp, sourceWord) -> 1 })
+      .rdd
       .reduceByKey(_ + _)
 
     val dstWordCounts = dataWithWords
       .filter(dataWithWords(DestinationWord).notEqual(InvalidDataHandler.WordError))
       .select(DestinationIP, DestinationWord)
       .map({ case Row(destinationIp: String, destinationWord: String) => (destinationIp, destinationWord) -> 1 })
+      .rdd
       .reduceByKey(_ + _)
 
     val ipWordCounts =
-      sparkContext.union(srcWordCounts, dstWordCounts)
+      spark.sparkContext.union(srcWordCounts, dstWordCounts)
         .reduceByKey(_ + _)
         .map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
 
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) =
-    SpotLDAWrapper.runLDA(sparkContext,
-      sqlContext,
+    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(spark,
       ipWordCounts,
       config.topicCount,
       logger,

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/proxy/ProxyFeedback.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxyFeedback.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxyFeedback.scala
index 158e6e1..6df0de4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxyFeedback.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxyFeedback.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spot.proxy
 
-import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -31,14 +30,12 @@ object ProxyFeedback {
   /**
     * Load the feedback file for proxy data.
  *
-    * @param sc Spark context.
-    * @param sqlContext Spark SQL context.
-    * @param feedbackFile Local machine path to the proxy feedback file.
+    * @param spark             Spark Session
+    * @param feedbackFile      Local machine path to the proxy feedback file.
     * @param duplicationFactor Number of words to create per flagged feedback entry.
     * @return DataFrame of the feedback events.
     */
-  def loadFeedbackDF(sc: SparkContext,
-                     sqlContext: SQLContext,
+  def loadFeedbackDF(spark: SparkSession,
                      feedbackFile: String,
                      duplicationFactor: Int): DataFrame = {
 
@@ -69,9 +66,9 @@ object ProxyFeedback {
       val fullURISeverityIndex = 22
 
       val lines = Source.fromFile(feedbackFile).getLines().toArray.drop(1)
-      val feedback: RDD[String] = sc.parallelize(lines)
+      val feedback: RDD[String] = spark.sparkContext.parallelize(lines)
 
-      sqlContext.createDataFrame(feedback.map(_.split("\t"))
+      spark.createDataFrame(feedback.map(_.split("\t"))
         .filter(row => row(fullURISeverityIndex).trim.toInt == 3)
         .map(row => Row.fromSeq(List(row(dateIndex),
           row(timeIndex),
@@ -85,7 +82,7 @@ object ProxyFeedback {
         .flatMap(row => List.fill(duplicationFactor)(row)), feedbackSchema)
         .select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, RespCode, FullURI)
     } else {
-      sqlContext.createDataFrame(sc.emptyRDD[Row], feedbackSchema)
+      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], feedbackSchema)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 6f3bcc1..689ccfd 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -18,10 +18,9 @@
 package org.apache.spot.proxy
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.proxy.ProxySchema._
@@ -80,12 +79,11 @@ object ProxySuspiciousConnectsAnalysis {
   /**
     * Run suspicious connections analysis on proxy data.
     *
-    * @param config       SuspicionConnectsConfig object, contains runtime parameters from CLI.
-    * @param sparkContext Apache Spark context.
-    * @param sqlContext   Spark SQL context.
-    * @param logger       Logs execution progress, information and errors for user.
+    * @param config SuspicionConnectsConfig object, contains runtime parameters from CLI.
+    * @param spark  Spark Session
+    * @param logger Logs execution progress, information and errors for user.
     */
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+  def run(config: SuspiciousConnectsConfig, spark: SparkSession, logger: Logger,
           inputProxyRecords: DataFrame): SuspiciousConnectsAnalysisResults = {
 
     logger.info("Starting proxy suspicious connects analysis.")
@@ -96,10 +94,10 @@ object ProxySuspiciousConnectsAnalysis {
       .na.fill(DefaultResponseContentType, Seq(ResponseContentType))
 
     logger.info("Fitting probabilistic model to data")
-    val model = ProxySuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, config, proxyRecords)
+    val model = ProxySuspiciousConnectsModel.trainModel(spark, logger, config, proxyRecords)
 
     logger.info("Identifying outliers")
-    val scoredProxyRecords = model.score(sparkContext, proxyRecords, config.precisionUtility)
+    val scoredProxyRecords = model.score(spark, proxyRecords, config.precisionUtility)
 
     // take the maxResults least probable events of probability below the threshold and sort
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index cebd19c..798706b 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -18,11 +18,10 @@
 package org.apache.spot.proxy
 
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.SuspiciousConnectsScoreFunction
 import org.apache.spot.lda.SpotLDAWrapper
@@ -46,19 +45,19 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
   /**
     * Calculate suspicious connection scores for an incoming dataframe using this proxy suspicious connects model.
     *
-    * @param sc        Spark context.
+    * @param spark     Spark Session.
     * @param dataFrame Dataframe with columns Host, Time, ReqMethod, FullURI, ResponseContentType, UserAgent, RespCode
     *                  (as defined in ProxySchema object).
     * @return Dataframe with Score column added.
     */
-  def score(sc: SparkContext, dataFrame: DataFrame, precisionUtility: FloatPointPrecisionUtility): DataFrame = {
+  def score(spark: SparkSession, dataFrame: DataFrame, precisionUtility: FloatPointPrecisionUtility): DataFrame = {
 
-    val topDomains: Broadcast[Set[String]] = sc.broadcast(TopDomains.TopDomains)
+    val topDomains: Broadcast[Set[String]] = spark.sparkContext.broadcast(TopDomains.TopDomains)
 
     val agentToCount: Map[String, Long] =
       dataFrame.select(UserAgent).rdd.map({ case Row(ua: String) => (ua, 1L) }).reduceByKey(_ + _).collect().toMap
 
-    val agentToCountBC = sc.broadcast(agentToCount)
+    val agentToCountBC = spark.sparkContext.broadcast(agentToCount)
 
     val udfWordCreation =
       ProxyWordCreation.udfWordCreation(topDomains, agentToCountBC)
@@ -72,7 +71,7 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
         dataFrame(UserAgent),
         dataFrame(RespCode)))
 
-    val wordToPerTopicProbBC = sc.broadcast(wordToPerTopicProb)
+    val wordToPerTopicProbBC = spark.sparkContext.broadcast(wordToPerTopicProb)
 
     val scoreFunction = new SuspiciousConnectsScoreFunction(topicCount, wordToPerTopicProbBC)
 
@@ -106,16 +105,14 @@ object ProxySuspiciousConnectsModel {
     * Trains the model from the incoming DataFrame using the specified number of topics
     * for clustering in the topic model.
     *
-    * @param sparkContext Spark context.
-    * @param sqlContext   SQL context.
+    * @param spark        Spark Session
     * @param logger       Logge object.
     * @param config       SuspiciousConnetsArgumnetParser.Config object containg CLI arguments.
     * @param inputRecords Dataframe for training data, with columns Host, Time, ReqMethod, FullURI, ResponseContentType,
     *                     UserAgent, RespCode (as defined in ProxySchema object).
     * @return ProxySuspiciousConnectsModel
     */
-  def trainModel(sparkContext: SparkContext,
-                 sqlContext: SQLContext,
+  def trainModel(spark: SparkSession,
                  logger: Logger,
                  config: SuspiciousConnectsConfig,
                  inputRecords: DataFrame): ProxySuspiciousConnectsModel = {
@@ -125,7 +122,7 @@ object ProxySuspiciousConnectsModel {
 
     val selectedRecords =
       inputRecords.select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, RespCode, FullURI)
-      .unionAll(ProxyFeedback.loadFeedbackDF(sparkContext, sqlContext, config.feedbackFile, config.duplicationFactor))
+        .unionAll(ProxyFeedback.loadFeedbackDF(spark, config.feedbackFile, config.duplicationFactor))
 
 
 
@@ -136,17 +133,15 @@ object ProxySuspiciousConnectsModel {
         .reduceByKey(_ + _).collect()
         .toMap
 
-    val agentToCountBC = sparkContext.broadcast(agentToCount)
+    val agentToCountBC = spark.sparkContext.broadcast(agentToCount)
 
 
 
     val docWordCount: RDD[SpotLDAInput] =
-      getIPWordCounts(sparkContext, sqlContext, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
+      getIPWordCounts(spark, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
         agentToCount)
 
-
-    val SpotLDAOutput(ipToTopicMix, wordResults) = SpotLDAWrapper.runLDA(sparkContext,
-      sqlContext,
+    val SpotLDAOutput(ipToTopicMixDF, wordResults) = SpotLDAWrapper.runLDA(spark,
       docWordCount,
       config.topicCount,
       logger,
@@ -167,8 +162,7 @@ object ProxySuspiciousConnectsModel {
     *
     * @return RDD of [[SpotLDAInput]] objects containing the aggregated IP-word counts.
     */
-  def getIPWordCounts(sc: SparkContext,
-                      sqlContext: SQLContext,
+  def getIPWordCounts(spark: SparkSession,
                       logger: Logger,
                       inputRecords: DataFrame,
                       feedbackFile: String,
@@ -179,19 +173,19 @@ object ProxySuspiciousConnectsModel {
     logger.info("Read source data")
     val selectedRecords = inputRecords.select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, RespCode, FullURI)
 
-    val wc = ipWordCountFromDF(sc, selectedRecords, agentToCount)
+    val wc = ipWordCountFromDF(spark, selectedRecords, agentToCount)
     logger.info("proxy pre LDA completed")
 
     wc
   }
 
-  def ipWordCountFromDF(sc: SparkContext,
+  def ipWordCountFromDF(spark: SparkSession,
                         dataFrame: DataFrame,
                         agentToCount: Map[String, Long]): RDD[SpotLDAInput] = {
 
-    val topDomains: Broadcast[Set[String]] = sc.broadcast(TopDomains.TopDomains)
+    val topDomains: Broadcast[Set[String]] = spark.sparkContext.broadcast(TopDomains.TopDomains)
 
-    val agentToCountBC = sc.broadcast(agentToCount)
+    val agentToCountBC = spark.sparkContext.broadcast(agentToCount)
     val udfWordCreation = ProxyWordCreation.udfWordCreation(topDomains, agentToCountBC)
 
     val ipWord = dataFrame.withColumn(Word,

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
index 945ac7c..8b0ea82 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
@@ -19,8 +19,7 @@ package org.apache.spot.utilities.data
 
 import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileUtil => fileUtil}
 import org.apache.log4j.Logger
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
 /**
@@ -31,15 +30,15 @@ object InputOutputDataHandler {
 
   /**
     *
-    * @param sqlContext Application SqlContext.
-    * @param inputPath  HDFS input folder for every execution; flow, dns or proxy.
-    * @param logger     Application logger.
+    * @param spark     Spark Session.
+    * @param inputPath HDFS input folder for every execution; flow, dns or proxy.
+    * @param logger    Application logger.
     * @return raw data frame.
     */
-  def getInputDataFrame(sqlContext: SQLContext, inputPath: String, logger: Logger): Option[DataFrame] = {
+  def getInputDataFrame(spark: SparkSession, inputPath: String, logger: Logger): Option[DataFrame] = {
     try {
       logger.info("Loading data from: " + inputPath)
-      Some(sqlContext.read.parquet(inputPath))
+      Some(spark.read.parquet(inputPath))
     } catch {
       case _: Throwable => None
     }
@@ -47,13 +46,13 @@ object InputOutputDataHandler {
 
   /**
     *
-    * @param sparkContext      Application SparkContext.
+    * @param spark             Spark Session.
     * @param hdfsScoredConnect HDFS output folder. The location where results were saved; flow, dns or proxy.
     * @param analysis          Data type to analyze.
     * @param logger            Application Logger.
     */
-  def mergeResultsFiles(sparkContext: SparkContext, hdfsScoredConnect: String, analysis: String, logger: Logger) {
-    val hadoopConfiguration = sparkContext.hadoopConfiguration
+  def mergeResultsFiles(spark: SparkSession, hdfsScoredConnect: String, analysis: String, logger: Logger) {
+    val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
     val fileSystem = org.apache.hadoop.fs.FileSystem.get(hadoopConfiguration)
 
     val exists = fileSystem.exists(new org.apache.hadoop.fs.Path(hdfsScoredConnect))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/resources/log4j.properties b/spot-ml/src/test/resources/log4j.properties
index 3fd5d2b..feca830 100644
--- a/spot-ml/src/test/resources/log4j.properties
+++ b/spot-ml/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
 # Change this to set Spark log level
-log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark=ERROR
 
 # Silence akka remoting
-log4j.logger.Remoting=WARN
+log4j.logger.Remoting=ERROR
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
\ No newline at end of file
+log4j.logger.org.eclipse.jetty=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/test/scala/org/apache/spot/SpotLDAWrapperTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/SpotLDAWrapperTest.scala
new file mode 100644
index 0000000..1a5fecf
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/SpotLDAWrapperTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spot.lda.SpotLDAWrapper
+import org.apache.spot.lda.SpotLDAWrapper._
+import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.scalatest.Matchers
+
+import scala.collection.immutable.Map
+
+class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
+
+    val ldaAlpha = 1.02
+    val ldaBeta = 1.001
+    val ldaMaxiterations = 20
+
+    "SparkLDA" should "handle an extremely unbalanced two word doc" in {
+      val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+      logger.setLevel(Level.WARN)
+
+      val catFancy = SpotLDAInput("pets", "cat", 1)
+      val dogWorld = SpotLDAInput("pets", "dog", 999)
+
+      val data = spark.sparkContext.parallelize(Seq(catFancy, dogWorld))
+      val out = SpotLDAWrapper.runLDA(spark, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta, ldaMaxiterations)
+
+      val topicMixDF = out.docToTopicMix
+
+      var topicMix =
+        topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0).asInstanceOf[Seq[Double]].toArray
+      val catTopics = out.wordResults("cat")
+      val dogTopics = out.wordResults("dog")
+
+      Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
+      Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
+    }
+
+    "SparkLDA" should "handle distinct docs on distinct words" in {
+      val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+      logger.setLevel(Level.WARN)
+      val catFancy = SpotLDAInput("cat fancy", "cat", 1)
+      val dogWorld = SpotLDAInput("dog world", "dog", 1)
+
+      val data = spark.sparkContext.parallelize(Seq(catFancy, dogWorld))
+      val out = SpotLDAWrapper.runLDA(spark, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta, ldaMaxiterations)
+
+      val topicMixDF = out.docToTopicMix
+      var dogTopicMix: Array[Double] =
+        topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+      val catTopicMix: Array[Double] =
+        topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0).asInstanceOf[Seq[Double]].toArray
+
+      val catTopics = out.wordResults("cat")
+      val dogTopics = out.wordResults("dog")
+
+      Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
+      Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
+    }
+
+    "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
+      "is the docID, values are the vectors of word occurrences in that doc" in {
+
+
+      val documentWordData = spark.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+        SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+        SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+        SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+      val wordDictionary = Map("333333_7.0_0.0_1.0" -> 0, "1111111_6.0_3.0_5.0" -> 1, "-1_43_7.0_2.0_6.0" -> 2, "-1_80_6.0_1.0_1.0" -> 3)
+
+      val documentDictionary: DataFrame = spark.createDataFrame(documentWordData
+          .map({ case SpotLDAInput(doc, word, count) => doc })
+          .distinct
+          .zipWithIndex.map({case (d,c) => Row(d,c)}), StructType(List(DocumentNameField, DocumentNumberField)))
+
+
+      val sparkLDAInput: RDD[(Long, Vector)] = SpotLDAWrapper.formatSparkLDAInput(documentWordData, documentDictionary, wordDictionary, spark)
+      val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
+
+      sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(8.0, 5.0))), (2, Vectors.sparse(4, Array(2), Array(2.0))), (1, Vectors.sparse(4, Array(1), Array(4.0))))
+    }
+
+    "formatSparkLDADocTopicOuptut" should "return RDD[(String,Array(Double))] after converting doc results from vector: " +
+      "convert docID back to string, convert vector of probabilities to array" in {
+
+
+      val documentWordData = spark.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+        SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+        SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+        SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+      val documentDictionary: DataFrame = spark.createDataFrame(documentWordData
+          .map({ case SpotLDAInput(doc, word, count) => doc })
+          .distinct
+          .zipWithIndex.map({case (d,c) => Row(d,c)}), StructType(List(DocumentNameField, DocumentNumberField)))
+
+      val docTopicDist: RDD[(Long, Vector)] = spark.sparkContext.parallelize(Array((0.toLong, Vectors.dense(0.15,
+        0.3, 0.5, 0.05)), (1.toLong,
+        Vectors.dense(0.25, 0.15, 0.4, 0.2)), (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+      val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, spark)
+
+      import testImplicits._
+      val documents = sparkDocRes.select(DocumentName).map(documentName => documentName.toString.replaceAll("\\[", "").replaceAll("\\]", "")).collect()
+
+      documents(0) should be("66.23.45.11")
+      documents(1) should be("192.168.1.1")
+      documents(2) should be("10.10.98.123")
+    }
+
+    "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back to strings, and sequence of probabilities to array" in {
+      val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
+
+      val wordDictionary = Map("-1_23.0_7.0_7.0_4.0" -> 3, "23.0_7.0_7.0_4.0" -> 0, "333333.0_7.0_7.0_4.0" -> 2, "80.0_7.0_7.0_4.0" -> 1)
+      val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
+
+      val sparkWordRes = formatSparkLDAWordOutput(testMat, revWordMap)
+
+      sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
+      sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
+      sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
+      sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
+    }
+}
\ No newline at end of file
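
For readers skimming the new test: a minimal, standalone sketch of the SparkSession-based LDA entry point it exercises. The corpus, topic count and hyperparameters below are illustrative (reusing values from the tests); the argument order to runLDA is session, corpus, topic count, logger, optional seed, alpha, beta, max iterations, as in the calls above:

    import org.apache.log4j.LogManager
    import org.apache.spark.sql.SparkSession
    import org.apache.spot.lda.SpotLDAWrapper
    import org.apache.spot.lda.SpotLDAWrapper._

    object SpotLDAExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spot-lda example").master("local[2]").getOrCreate()
        val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")

        // A toy corpus of (document, word, count) triples.
        val corpus = spark.sparkContext.parallelize(Seq(
          SpotLDAInput("doc1", "wordA", 5),
          SpotLDAInput("doc2", "wordB", 3)))

        val out = SpotLDAWrapper.runLDA(spark, corpus, 2, logger, Some(0xdeadbeef), 1.02, 1.001, 20)

        out.docToTopicMix.show()                   // per-document topic mixes as a DataFrame
        val wordATopics = out.wordResults("wordA") // per-topic probabilities for one word

        spark.stop()
      }
    }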

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
index 9a53591..d5e8d9a 100644
--- a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -19,8 +19,8 @@ package org.apache.spot.dns
 
 
 import org.apache.log4j.{Level, LogManager}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
@@ -96,9 +96,9 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
 
     val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
     val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
-    val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
+    val data = spark.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
+    val model = DNSSuspiciousConnectsModel.trainModel(spark, logger, emTestConfig, data)
+    val scoredData = model.score(spark, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
     val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
 
@@ -118,9 +118,9 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
 
     val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
     val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
-    val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
+    val data = sparkSession.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
+    val model = DNSSuspiciousConnectsModel.trainModel(sparkSession, logger, onlineTestConfig, data)
+    val scoredData = model.score(sparkSession, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
     val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
 
@@ -154,9 +154,9 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
       "0x00000001",
       1,
       0)
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
-    val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
+    val data = sparkSession.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
+    val model = DNSSuspiciousConnectsModel.trainModel(sparkSession, logger, emTestConfig, data)
+    val scoredData = model.score(sparkSession, data, emTestConfig.userDomain, emTestConfig.precisionUtility)
     val anomalyScore = scoredData.
       filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
       first().
@@ -194,9 +194,9 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
       1,
       0)
 
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
-    val model = DNSSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
-    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
+    val data = sparkSession.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord))
+    val model = DNSSuspiciousConnectsModel.trainModel(sparkSession, logger, onlineTestConfig, data)
+    val scoredData = model.score(sparkSession, data, onlineTestConfig.userDomain, onlineTestConfig.precisionUtility)
 
     val anomalyScore = scoredData.
       filter(scoredData(QueryName) === "1111111111111111111111111111111111111111111111111111111111111.tinker.turner.com").
@@ -306,9 +306,8 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
 
   def testDNSRecords = new {
 
-    val sqlContext = new SQLContext(sparkContext)
 
-    val inputDNSRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
+    val inputDNSRecordsRDD = spark.sparkContext.parallelize(wrapRefArray(Array(
       Seq(null, 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
       Seq("", 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
       Seq("-", 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0),
@@ -344,9 +343,9 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
         QueryTypeField,
         QueryResponseCodeField))
 
-    val inputDNSRecordsDF = sqlContext.createDataFrame(inputDNSRecordsRDD, inputDNSRecordsSchema)
+    val inputDNSRecordsDF = spark.createDataFrame(inputDNSRecordsRDD, inputDNSRecordsSchema)
 
-    val scoredDNSRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
+    val scoredDNSRecordsRDD = spark.sparkContext.parallelize(wrapRefArray(Array(
       Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 1d),
       Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 0.0000005),
       Seq("May 20 2016 02:10:25.970987000 PDT", 1463735425l, 168, "172.16.9.132", "turner.com.122.2o...", "0x00000001", 1, 0, 0.05),
@@ -365,7 +364,7 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
         QueryResponseCodeField,
         ScoreField))
 
-    val scoredDNSRecordsDF = sqlContext.createDataFrame(scoredDNSRecordsRDD, scoredDNSRecordsSchema)
+    val scoredDNSRecordsDF = spark.createDataFrame(scoredDNSRecordsRDD, scoredDNSRecordsSchema)
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/test/scala/org/apache/spot/netflow/FlowRecord.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowRecord.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowRecord.scala
index 87078bc..88b84b3 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowRecord.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowRecord.scala
@@ -10,7 +10,7 @@ case class FlowRecord(treceived: String,
                       trhour: Int,
                       trminute: Int,
                       trsec: Int,
-                      tdur: Float,
+                      tdur: Double,
                       sip: String,
                       dip: String,
                       sport: Int,
@@ -18,5 +18,5 @@ case class FlowRecord(treceived: String,
                       proto: String,
                       ipkt: Long,
                       ibyt: Long,
-                      opkt: Int,
-                      obyt: Int)
+                      opkt: Long,
+                      obyt: Long)
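
With the widened types above (tdur: Float -> Double, opkt/obyt: Int -> Long), a test record is built with matching literals; a minimal sketch, with values mirroring the test records in the next file:

    import org.apache.spot.netflow.FlowRecord

    // Duration is now a Double literal and the output packet/byte counters are Longs.
    val record = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58,
      0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39L, 12522L, 0L, 0L)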

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/89883bd7/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
index e0bdf83..1d14d0d 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
@@ -19,7 +19,7 @@ package org.apache.spot.netflow
 
 import org.apache.log4j.{Level, LogManager}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.FlowSuspiciousConnectsAnalysis.InSchema
@@ -83,18 +83,18 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.OFF)
 
-    val anomalousRecord = FlowRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 0.972f, "172.16.0.129", "10.0.2" +
-      ".202", 1024, 80, "TCP", 39L, 12522L, 0, 0)
-    val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972f, "172.16.0.129", "10.0.2" +
-      ".202", 1024, 80, "TCP", 39L, 12522L, 0, 0)
+    val anomalousRecord = FlowRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 0.972d, "172.16.0.129", "10.0.2" +
+      ".202", 1024, 80, "TCP", 39l, 12522l, 0l, 0l)
+    val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2" +
+      ".202", 1024, 80, "TCP", 39l, 12522l, 0l, 0l)
 
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
+    val data = sparkSession.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
       typicalRecord, typicalRecord, typicalRecord, typicalRecord))
 
     val model =
-      FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, emTestConfig, data)
+      FlowSuspiciousConnectsModel.trainModel(sparkSession, logger, emTestConfig, data)
 
-    val scoredData = model.score(sparkContext, sqlContext, data, emTestConfig.precisionUtility)
+    val scoredData = model.score(sparkSession, data, emTestConfig.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
@@ -122,13 +122,13 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
     val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972f, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39, 12522, 0, 0)
 
 
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
+    val data = sparkSession.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
       typicalRecord, typicalRecord, typicalRecord, typicalRecord))
 
     val model =
-      FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, onlineTestConfig, data)
+      FlowSuspiciousConnectsModel.trainModel(sparkSession, logger, onlineTestConfig, data)
 
-    val scoredData = model.score(sparkContext, sqlContext, data, onlineTestConfig.precisionUtility)
+    val scoredData = model.score(sparkSession, data, onlineTestConfig.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
@@ -166,11 +166,14 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.INFO)
 
-    val anomalousRecord = FlowRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 0.972f, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39, 12522, 0, 0)
-    val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972f, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39, 12522, 0, 0)
+    val anomalousRecord = FlowRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 0.972d, "172.16.0.129", "10.0.2" +
+      ".202", 1024, 80, "TCP", 39l, 12522l, 0l, 0l)
+    val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2" +
+      ".202", 1024, 80, "TCP", 39l, 12522l, 0l, 0l)
 
 
-    val data = sqlContext.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
+    val data = spark.createDataFrame(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord,
+      typicalRecord,
       typicalRecord, typicalRecord, typicalRecord, typicalRecord))
 
 
@@ -179,10 +182,10 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
 
     logger.info("Fitting probabilistic model to data")
     val model =
-      FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testConfig2, flows)
+      FlowSuspiciousConnectsModel.trainModel(spark, logger, testConfig2, flows)
 
     logger.info("Identifying outliers")
-    val scoredData = model.score(sparkContext, sqlContext, flows, testConfig2.precisionUtility)
+    val scoredData = model.score(spark, flows, testConfig2.precisionUtility)
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
     val typicalScores = scoredData.filter(scoredData(Hour) === 13).collect().map(_.getAs[Double](Score))
@@ -220,7 +223,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
       FlowSuspiciousConnectsModel.trainModel(sparkContext, sqlContext, logger, testingConfigFloatConversion, data)
 
     logger.info("Identifying outliers")
-    val scoredData = model.score(sparkContext, sqlContext, data, testingConfigFloatConversion.precisionUtility)
+    val scoredData = model.score(sparkSession, data, testingConfigFloatConversion.precisionUtility)
 
 
     val anomalyScore = scoredData.filter(scoredData(Hour) === 0).first().getAs[Double](Score)
@@ -269,24 +272,37 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
   }
 
   def testFlowRecords = new {
-    val sqlContext = new SQLContext(sparkContext)
-
-    val inputFlowRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 24, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 60, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 60, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq(null, 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, null, "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", null, 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", null, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, null, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", null, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, null, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, null, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, null),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0))
+
+    val inputFlowRecordsRDD = spark.sparkContext.parallelize(wrapRefArray(Array(
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 24, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 60, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 60, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq(null, 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, null, "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0l,
+        0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", null, 1024, 80, "TCP", 39l, 12522l,
+        0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", null, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, null, "TCP",
+        39l, 12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", null,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        null, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, null, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, null),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l))
       .map(row => Row.fromSeq(row))))
 
     val inputFlowRecordsSchema = StructType(
@@ -308,14 +324,19 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
         OpktField,
         ObytField))
 
-    val inputFlowRecordsDF = sqlContext.createDataFrame(inputFlowRecordsRDD, inputFlowRecordsSchema)
-
-    val scoredFlowRecordsRDD = sparkContext.parallelize(wrapRefArray(Array(
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0, -1d),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0, 1d),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0, 0.0000005),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0, 0.05),
-      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l, 12522l, 0, 0, 0.0001))
+    val inputFlowRecordsDF = spark.createDataFrame(inputFlowRecordsRDD, inputFlowRecordsSchema)
+
+    val scoredFlowRecordsRDD = spark.sparkContext.parallelize(wrapRefArray(Array(
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l, -1d),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l, 1d),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l, 0.0000005d),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l, 0.05d),
+      Seq("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972d, "172.16.0.129", "10.0.2.202", 1024, 80, "TCP", 39l,
+        12522l, 0l, 0l, 0.0001d))
       .map(row => Row.fromSeq(row))))
 
     val scoredFlowRecordsSchema = StructType(
@@ -338,7 +359,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
         ObytField,
         ScoreField))
 
-    val scoredFlowRecordsDF = sqlContext.createDataFrame(scoredFlowRecordsRDD, scoredFlowRecordsSchema)
+    val scoredFlowRecordsDF = spark.createDataFrame(scoredFlowRecordsRDD, scoredFlowRecordsSchema)
   }
 
 }
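
Across these tests the migration follows one pattern: the (SparkContext, SQLContext) pair collapses into a single SparkSession argument. A before/after sketch for the flow model, wrapped in a hypothetical helper whose spark, logger, config and data parameters stand for whatever session, logger, SuspiciousConnectsConfig and flow DataFrame the caller already holds:

    import org.apache.log4j.Logger
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
    import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel

    object FlowScoringExample {
      // Spark 1.6 (before): trainModel(sparkContext, sqlContext, logger, config, data)
      //                     model.score(sparkContext, sqlContext, data, config.precisionUtility)
      // Spark 2.1 (after), as exercised in the tests above:
      def scoreFlows(spark: SparkSession, logger: Logger, config: SuspiciousConnectsConfig, data: DataFrame): DataFrame = {
        val model = FlowSuspiciousConnectsModel.trainModel(spark, logger, config, data)
        model.score(spark, data, config.precisionUtility)
      }
    }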

