eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject incubator-eagle git commit: EAGLE-158 Remove org.apache.eagle.datastream.EagleTuple
Date Fri, 19 Feb 2016 01:11:33 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master fba3aebae -> c7ce706b4


EAGLE-158 Remove org.apache.eagle.datastream.EagleTuple

https://issues.apache.org/jira/browse/EAGLE-158

Remove org.apache.eagle.datastream.EagleTuple
Replace all org.apache.eagle.datastream.TupleN with scala.TupleN

Reviewer: @yonzhang <yonzhang2012@apache.org>

Closes #93


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c7ce706b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c7ce706b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c7ce706b

Branch: refs/heads/master
Commit: c7ce706b4217d8e68388bf1fffade2eec489f2bc
Parents: fba3aeb
Author: Hao Chen <hao@apache.org>
Authored: Fri Feb 19 09:11:21 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Feb 19 09:11:21 2016 +0800

----------------------------------------------------------------------
 .../dedup/AlertDeduplicationExecutorBase.java   |  4 +-
 .../notification/AlertNotificationExecutor.java |  4 +-
 .../alert/persist/AlertPersistExecutor.java     |  2 +-
 .../eagle/alert/cep/TestSiddhiEvaluator.java    |  2 +-
 .../impl/aggregate/SimpleAggregateExecutor.java |  2 +-
 .../dataproc/impl/persist/PersistExecutor.java  |  2 +-
 .../JavaStormExecutorForAlertWrapper.java       |  4 +-
 .../datastream/core/StreamAlertExpansion.scala  |  6 +--
 .../datastream/storm/JavaStormBoltWrapper.scala | 12 +++---
 .../datastream/storm/StormBoltFactory.scala     | 14 +++----
 .../datastream/storm/StormBoltWrapper.scala     | 10 ++---
 .../storm/StormExecutorForAlertWrapper.scala    |  4 +-
 .../datastream/storm/StormWrapperUtils.scala    | 28 ++++++++++++++
 .../eagle/datastream/JavaEchoExecutor.java      |  1 +
 .../datastream/TestSimpleAggregateExecutor.java |  3 +-
 .../datastream/StormWrapperUtilsSpec.scala      | 40 ++++++++++++++++++++
 .../eagle/datastream/TestStormRunner.scala      |  2 +-
 .../apache/eagle/datastream/EagleTuple.scala    | 38 -------------------
 .../eagle/datastream/StormStreamExecutor.scala  |  4 +-
 .../policy/executor/PolicyProcessExecutor.java  |  2 +-
 .../gc/executor/GCLogAnalysorExecutor.java      |  2 +-
 .../gc/executor/GCMetricGeneratorExecutor.java  |  1 +
 .../kafka/KafkaMessageDistributionExecutor.java |  2 +-
 ...baseResourceSensitivityDataJoinExecutor.java |  2 +-
 .../FileSensitivityDataJoinExecutor.java        |  2 +-
 .../auditlog/HdfsUserCommandReassembler.java    |  2 +-
 .../auditlog/IPZoneDataJoinExecutor.java        |  2 +-
 .../auditlog/TestUserCommandReassembler.java    |  8 ++--
 eagle-security/eagle-security-hive/pom.xml      |  5 +++
 .../jobrunning/HiveQueryParserExecutor.java     |  2 +-
 .../JobConfigurationAdaptorExecutor.java        |  2 +-
 ...HiveResourceSensitivityDataJoinExecutor.java |  2 +-
 .../detection/pom.xml                           |  5 +++
 .../userprofile/UserActivityAggregator.java     |  2 +-
 .../UserProfileAggregatorExecutor.java          |  2 +-
 .../UserProfileDetectionBatchMain.java          |  1 +
 .../UserProfileDetectionStreamMain.java         |  1 +
 .../impl/UserActivityAggregatorImpl.java        |  6 +--
 .../userprofile/TestUserActivityAggregator.java | 12 +++---
 pom.xml                                         |  1 +
 40 files changed, 144 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
index 27be5d6..6c774ab 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.config.DeduplicatorConfig;
 import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
@@ -30,14 +31,13 @@ import org.apache.eagle.policy.DynamicPolicyLoader;
 import org.apache.eagle.policy.PolicyLifecycleMethods;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.client.impl.CopyOnWriteHashMap;
 import com.typesafe.config.Config;
+import scala.Tuple2;
 
 public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
index 06ecb0d..332fde4 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -27,11 +27,11 @@ import org.apache.eagle.policy.DynamicPolicyLoader;
 import org.apache.eagle.policy.PolicyLifecycleMethods;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.apache.eagle.datastream.Tuple1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
+import scala.Tuple1;
 
 /**
  * notify alert by email, kafka message, storage or other means
@@ -125,4 +125,4 @@ public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String>
 			notificationManager.updateNotificationPlugins( alertDef , true );
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
index d6be567..61bb7dc 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
@@ -21,7 +21,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.apache.eagle.datastream.Tuple1;
+import scala.Tuple1;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index 3aac2eb..3f180ce 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -27,7 +27,6 @@ import org.apache.eagle.alert.executor.AlertExecutor;
 import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.policy.dao.*;
@@ -36,6 +35,7 @@ import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
 import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index 1d8eecc..e0dadbf 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -24,7 +24,6 @@ import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEnt
 import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.policy.PolicyEvaluator;
 import org.apache.eagle.policy.PolicyManager;
@@ -35,6 +34,7 @@ import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
 import org.apache.hadoop.hbase.util.MD5Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Date;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
index ac0325d..2e1754b 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java
@@ -21,10 +21,10 @@ import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
 import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.datastream.core.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.text.MessageFormat;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
index 2dec286..a485d76 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.SortedMap;
 
 import com.typesafe.config.Config;
+import scala.Tuple2;
+import scala.Tuple3;
 
 public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{
     private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate;
@@ -47,7 +49,7 @@ public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<S
             @Override
             public void collect(Object o) {
                 Tuple2 tuple2 = (Tuple2)o;
-                collector.collect(new Tuple3(tuple2.f0(), streamName, tuple2.f1()));
+                collector.collect(new Tuple3(tuple2._1, streamName, tuple2._2));
             }
         };
         delegate.flatMap(input, delegateCollector);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 01d3a70..618bba3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -27,11 +27,9 @@ import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl
 
 import scala.collection.JavaConversions.asScalaSet
 import scala.collection.mutable.ListBuffer
-import org.apache.eagle.datastream.EagleTuple
 import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper
 import org.apache.eagle.datastream.JavaStormStreamExecutor
 import org.apache.eagle.datastream.StormStreamExecutor
-import org.apache.eagle.datastream.Tuple2
 import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
 import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
 import org.apache.eagle.service.client.EagleServiceConnector
@@ -159,11 +157,11 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
       case _: FlatMapProducer[AnyRef, AnyRef] => {
         val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
         mapper match {
-          case a: JavaStormStreamExecutor[EagleTuple] => {
+          case a: JavaStormStreamExecutor[AnyRef] => {
             val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
             newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }
-          case b: StormStreamExecutor[EagleTuple] => {
+          case b: StormStreamExecutor[AnyRef] => {
             val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
             newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
index 1c0d42e..802c782 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala
@@ -22,12 +22,10 @@ import backtype.storm.task.{OutputCollector, TopologyContext}
 import backtype.storm.topology.OutputFieldsDeclarer
 import backtype.storm.topology.base.BaseRichBolt
 import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, EagleTuple, JavaStormStreamExecutor}
+import org.apache.eagle.datastream.{Collector, JavaStormStreamExecutor}
 import org.slf4j.LoggerFactory
 
-import scala.collection.JavaConverters._
-
-case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[AnyRef]) extends BaseRichBolt{
   val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
   var _collector : OutputCollector = null
 
@@ -37,9 +35,9 @@ case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) ex
   }
 
   override def execute(input : Tuple): Unit ={
-    worker.flatMap(input.getValues, new Collector[EagleTuple](){
-      def collect(t: EagleTuple): Unit ={
-        _collector.emit(input, t.getList.asJava)
+    worker.flatMap(input.getValues, new Collector[AnyRef](){
+      def collect(t: AnyRef): Unit ={
+        _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
       }
     })
     _collector.ack(input)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index 42a030d..21057e7 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -29,12 +29,12 @@ object StormBoltFactory {
     implicit val streamInfo = producer.getInfo
     producer match{
       case FlatMapProducer(worker) => {
-        if(worker.isInstanceOf[JavaStormStreamExecutor[EagleTuple]]){
-          worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]].prepareConfig(config)
-          JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
-        }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){
-          worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config)
-          StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]])
+        if(worker.isInstanceOf[JavaStormStreamExecutor[AnyRef]]){
+          worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]].prepareConfig(config)
+          JavaStormBoltWrapper(worker.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
+        }else if(worker.isInstanceOf[StormStreamExecutor[AnyRef]]){
+          worker.asInstanceOf[StormStreamExecutor[AnyRef]].prepareConfig(config)
+          StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[AnyRef]])
         }else if(worker.isInstanceOf[FlatMapperWrapper[Any]]){
           StormFlatFunctionWrapper(worker.asInstanceOf[FlatMapperWrapper[Any]].func)
         } else {
@@ -56,7 +56,7 @@ object StormBoltFactory {
       case persist : PersistProducer[Any] => {
         val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString)
         persisExecutor.prepareConfig(config)
-        JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
+        JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[AnyRef]])
       }
       case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
index 3b91e77..75f045e 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltWrapper.scala
@@ -22,12 +22,12 @@ import backtype.storm.task.{OutputCollector, TopologyContext}
 import backtype.storm.topology.OutputFieldsDeclarer
 import backtype.storm.topology.base.BaseRichBolt
 import backtype.storm.tuple.{Fields, Tuple}
-import org.apache.eagle.datastream.{Collector, EagleTuple, StormStreamExecutor}
+import org.apache.eagle.datastream.{Collector, StormStreamExecutor}
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
 
-case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+case class StormBoltWrapper(worker : StormStreamExecutor[AnyRef]) extends BaseRichBolt{
   val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass)
   var _collector : OutputCollector = null
 
@@ -38,9 +38,9 @@ case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends Ba
 
   override def execute(input : Tuple): Unit = {
     try {
-      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
-        override def collect(t: EagleTuple): Unit = {
-          _collector.emit(input, t.getList.asJava)
+      worker.flatMap(input.getValues.asScala, new Collector[AnyRef] {
+        override def collect(t: AnyRef): Unit = {
+          _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product]))
         }
       })
     }catch{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
index bb34972..f15e736 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
@@ -21,8 +21,6 @@ import java.util
 import org.apache.eagle.datastream.Collector
 import org.apache.eagle.datastream.StormStreamExecutor
 import org.apache.eagle.datastream.StormStreamExecutor3
-import org.apache.eagle.datastream.Tuple2
-import org.apache.eagle.datastream.Tuple3
 
 import com.typesafe.config.Config
 
@@ -39,7 +37,7 @@ case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[Str
   override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
     delegate.flatMap(input, new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
       override def collect(r: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit = {
-        collector.collect(Tuple3(r.f0, streamName, r.f1))
+        collector.collect(Tuple3(r._1, streamName, r._2))
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
new file mode 100644
index 0000000..95c6ac1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormWrapperUtils.scala
@@ -0,0 +1,28 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+object StormWrapperUtils {
+  def productAsJavaList(product:Product):util.List[AnyRef]={
+    val list = new util.LinkedList[AnyRef]()
+    product.productIterator.foreach((p:Any) => list.add(p.asInstanceOf[AnyRef]))
+    list
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
index 7b21288..511db38 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/JavaEchoExecutor.java
@@ -19,6 +19,7 @@ package org.apache.eagle.datastream;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple1;
 
 public class JavaEchoExecutor extends JavaStormStreamExecutor1<String>{
     private static Logger LOG = LoggerFactory.getLogger(JavaEchoExecutor.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
index 974cd19..0d0638d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestSimpleAggregateExecutor.java
@@ -22,6 +22,7 @@ import junit.framework.Assert;
 import org.apache.eagle.dataproc.impl.aggregate.SimpleAggregateExecutor;
 import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -61,7 +62,7 @@ public class TestSimpleAggregateExecutor {
         sae.flatMap(tuple, new Collector<Tuple2<String, AggregateEntity>>(){
             @Override
             public void collect(Tuple2<String, AggregateEntity> stringAggregateEntityTuple2) {
-                System.out.print(stringAggregateEntityTuple2.f0());
+                System.out.print(stringAggregateEntityTuple2._1());
                 count.incrementAndGet();
             }
         });

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
new file mode 100644
index 0000000..9d378c9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/StormWrapperUtilsSpec.scala
@@ -0,0 +1,40 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.eagle.datastream
+
+import org.apache.eagle.datastream.storm.StormWrapperUtils
+import org.scalatest.{FlatSpec, Matchers}
+
+class StormWrapperUtilsSpec extends FlatSpec with Matchers{ // ScalaTest spec exercising productAsJavaList on scala.TupleN values
+  import StormWrapperUtils._ // presumably where the unqualified productAsJavaList below comes from
+  "StormWrapperUtils" should "convert Tuple{1,2,3,..} to java.util.List" in {
+    val list1 = productAsJavaList(new Tuple1("a")) // arity 1
+    list1.size() should be(1) // list size is expected to equal the tuple arity
+    list1.get(0) should be("a")
+
+    val list2 = productAsJavaList(new Tuple2("a","b")) // arity 2
+    list2.size() should be(2)
+    list2.get(0) should be("a") // elements are expected in tuple field order (_1, _2, ...)
+    list2.get(1) should be("b")
+
+    val list3 = productAsJavaList(new Tuple3("a","b","c")) // arity 3
+    list3.size() should be(3)
+    list3.get(0) should be("a")
+    list3.get(1) should be("b")
+    list3.get(2) should be("c")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
index 27fdd88..c071eb2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
@@ -33,7 +33,7 @@ object UnionForAlert extends App{
   val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
   val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
   tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false)
-  //env.execute
+  env.execute()
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
deleted file mode 100644
index 63aa4fb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-trait EagleTuple extends Serializable{
-  def getList : List[AnyRef]
-}
-
-case class Tuple1[T0](f0 : T0) extends EagleTuple{
-  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef])
-
-}
-
-case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
-  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
-}
-
-case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
-  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
-}
-
-case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
-  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
index 22ae213..836c7eb 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
@@ -19,13 +19,13 @@ package org.apache.eagle.datastream
 import com.typesafe.config.Config
 import scala.collection.JavaConverters._
 
-trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
+trait StormStreamExecutor[R <: Any] extends FlatMapper[R] { // R was bounded by EagleTuple before this change; `<: Any` leaves it unrestricted
   def prepareConfig(config : Config)
   def init
   def fields : Array[String]
 }
 
-trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
+trait JavaStormStreamExecutor[R <: AnyRef] extends FlatMapper[R] {
   def prepareConfig(config : Config)
   def init
   def fields : Array[String]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
index 6c580f6..e71a045 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
@@ -27,7 +27,6 @@ import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.metric.reportor.EagleCounterMetric;
 import org.apache.eagle.metric.reportor.EagleMetricListener;
 import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
@@ -41,6 +40,7 @@ import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.policy.siddhi.StreamMetadataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
index 5eec9ba..e74bc44 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
@@ -22,13 +22,13 @@ package org.apache.eagle.gc.executor;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.gc.model.GCPausedEvent;
 import org.apache.eagle.gc.stream.GCStreamBuilder;
 import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException;
 import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
index 03f00aa..eda6564 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
@@ -29,6 +29,7 @@ import org.apache.eagle.gc.model.GCPausedEvent;
 import org.apache.eagle.metric.reportor.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index b521d65..dbb6e9a 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -25,10 +25,10 @@ import org.apache.commons.lang.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.apache.eagle.datastream.Tuple1;
 import org.apache.eagle.metric.reportor.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple1;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
index 92cc206..82db688 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
@@ -20,12 +20,12 @@ package org.apache.eagle.security.hbase.sensitivity;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
index 60b7b57..ddc8ce3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
@@ -19,7 +19,6 @@ package org.apache.eagle.security.auditlog;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
 import org.apache.eagle.security.auditlog.util.SimplifyPath;
 import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
@@ -27,6 +26,7 @@ import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Map;
 import java.util.TreeMap;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index ea1ea8d..1b2df3b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -23,7 +23,6 @@ import org.apache.eagle.policy.siddhi.AttributeType;
 import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.slf4j.Logger;
@@ -33,6 +32,7 @@ import org.wso2.siddhi.core.SiddhiManager;
 import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.query.output.callback.QueryCallback;
 import org.wso2.siddhi.core.stream.input.InputHandler;
+import scala.Tuple2;
 
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
index b98f0cf..6baae35 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
@@ -19,13 +19,13 @@ package org.apache.eagle.security.auditlog;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
 import org.apache.eagle.security.entity.IPZoneEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Map;
 import java.util.TreeMap;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
index 27a2e7d..2bca39f 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -23,8 +23,8 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
 import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.Tuple2;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.util.*;
 
@@ -54,7 +54,7 @@ public class TestUserCommandReassembler {
         Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String, Map>>(){
             @Override
             public void collect(Tuple2<String, Map> stringMapTuple2) {
-                String cmd = (String)stringMapTuple2.f1().get("cmd");Assert.assertEquals("user:appendToFile", cmd);
+                String cmd = (String)stringMapTuple2._2().get("cmd");Assert.assertEquals("user:appendToFile", cmd);
                 System.out.println("assert passed!!!");
             }
         };
@@ -83,7 +83,7 @@ public class TestUserCommandReassembler {
         Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String, Map>>(){
             @Override
             public void collect(Tuple2<String, Map> stringMapTuple2) {
-                String cmd = (String)stringMapTuple2.f1().get("cmd");
+                String cmd = (String)stringMapTuple2._2().get("cmd");
                 Assert.assertEquals("user:read", cmd);
                 System.out.println("assert passed!!!");
             }
@@ -120,7 +120,7 @@ public class TestUserCommandReassembler {
         Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String, Map>>(){
             @Override
             public void collect(Tuple2<String, Map> stringMapTuple2) {
-                String cmd = (String)stringMapTuple2.f1().get("cmd");
+                String cmd = (String)stringMapTuple2._2().get("cmd");
                 Assert.assertEquals("user:copyFromLocal", cmd);
                 Assert.assertEquals("user:appendToFile", cmd);
                 System.out.println("assert passed!!!");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hive/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/pom.xml b/eagle-security/eagle-security-hive/pom.xml
index 4843a76..dcacd5b 100644
--- a/eagle-security/eagle-security-hive/pom.xml
+++ b/eagle-security/eagle-security-hive/pom.xml
@@ -82,5 +82,10 @@
 	   	<groupId>org.apache.hive</groupId>
 	   	<artifactId>hive-exec</artifactId>
 	   </dependency>
+	  <dependency>
+		  <groupId>org.scala-lang</groupId>
+		  <artifactId>scala-library</artifactId>
+		  <version>${scala-lang.version}</version> <!-- shared version property instead of a hard-coded 2.10.4 -->
+	  </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
index 757f51d..ad06bd4 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
@@ -19,11 +19,11 @@ package org.apache.eagle.security.hive.jobrunning;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.hive.ql.HiveQLParserContent;
 import org.apache.eagle.security.hive.ql.Parser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Map;
 import java.util.Map.Entry;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
index 3bf4f29..29c4fd2 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
@@ -18,7 +18,6 @@ package org.apache.eagle.security.hive.jobrunning;
 
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.jobrunning.common.JobConstants;
 import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
 import org.apache.eagle.jobrunning.storm.JobRunningContentFilter;
@@ -26,6 +25,7 @@ import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.HashMap;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
index 56c694d..e937b0c 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
@@ -19,12 +19,12 @@ package org.apache.eagle.security.hive.sensitivity;
 import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/pom.xml b/eagle-security/eagle-security-userprofile/detection/pom.xml
index 866ba06..945f8f6 100644
--- a/eagle-security/eagle-security-userprofile/detection/pom.xml
+++ b/eagle-security/eagle-security-userprofile/detection/pom.xml
@@ -78,5 +78,10 @@
           <artifactId>eagle-security-hdfs-auditlog</artifactId>
           <version>${project.version}</version>
     </dependency>
+    <dependency> <!-- needed because this module's Java sources now import scala.Tuple2 directly -->
+        <groupId>org.scala-lang</groupId>
+        <artifactId>scala-library</artifactId>
+        <version>${scala-lang.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserActivityAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserActivityAggregator.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserActivityAggregator.java
index a856bbe..7e3b867 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserActivityAggregator.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserActivityAggregator.java
@@ -17,8 +17,8 @@
 package org.apache.eagle.security.userprofile;
 
 import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
+import scala.Tuple2;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileAggregatorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileAggregatorExecutor.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileAggregatorExecutor.java
index 8de49ac..4dd4c7f 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileAggregatorExecutor.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileAggregatorExecutor.java
@@ -20,12 +20,12 @@ import com.typesafe.config.Config;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.userprofile.impl.UserActivityAggregatorImpl;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
index 1fdb3bd..c39f8fb 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
@@ -25,6 +25,7 @@ import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
index 1048bdd..8b82ef7 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
@@ -22,6 +22,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.datastream.*;
 import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import scala.Tuple2;
 
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/impl/UserActivityAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/impl/UserActivityAggregatorImpl.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/impl/UserActivityAggregatorImpl.java
index 1b60600..52bdbf0 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/impl/UserActivityAggregatorImpl.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/impl/UserActivityAggregatorImpl.java
@@ -20,7 +20,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.userprofile.TimeWindow;
 import org.apache.eagle.security.userprofile.UserActivityAggregator;
 import org.apache.eagle.security.userprofile.UserProfileConstants;
@@ -29,6 +28,7 @@ import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.util.*;
 
@@ -126,10 +126,10 @@ public class UserActivityAggregatorImpl implements UserActivityAggregator {
 
         Map<String,Map<String,Double>> tmp = new HashMap<>();
         for(Map.Entry<Tuple2<String,String>,Double> entry:counter.entrySet()){
-            String user = entry.getKey().f0();
+            String user = entry.getKey()._1();
             Map<String,Double> cmdCount = tmp.get(user);
             if(cmdCount == null) cmdCount = new HashMap<>();
-            cmdCount.put(entry.getKey().f1(),entry.getValue());
+            cmdCount.put(entry.getKey()._2(),entry.getValue());
             tmp.put(user,cmdCount);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/eagle-security/eagle-security-userprofile/detection/src/test/java/org/apache/eagle/security/userprofile/TestUserActivityAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/test/java/org/apache/eagle/security/userprofile/TestUserActivityAggregator.java b/eagle-security/eagle-security-userprofile/detection/src/test/java/org/apache/eagle/security/userprofile/TestUserActivityAggregator.java
index ea55334..c6f1124 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/test/java/org/apache/eagle/security/userprofile/TestUserActivityAggregator.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/test/java/org/apache/eagle/security/userprofile/TestUserActivityAggregator.java
@@ -21,13 +21,13 @@ package org.apache.eagle.security.userprofile;
 
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.userprofile.impl.UserActivityAggregatorImpl;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.text.ParseException;
 import java.util.*;
@@ -57,8 +57,8 @@ public class TestUserActivityAggregator {
         aggregator_0.accumulate(event(DateTimeUtil.humanDateToMilliseconds("2015-09-29 01:02:02,000"),"user2","open"),collector);
 
         Assert.assertEquals(2, collector.getResult().size());
-        Assert.assertEquals("user2",collector.getResult().get(0).f0());
-        Assert.assertEquals("user1",collector.getResult().get(1).f0());
+        Assert.assertEquals("user2",collector.getResult().get(0)._1());
+        Assert.assertEquals("user1",collector.getResult().get(1)._1());
     }
 
     @Test
@@ -76,8 +76,8 @@ public class TestUserActivityAggregator {
         aggregator_1.accumulate(event(DateTimeUtil.humanDateToMilliseconds("2015-09-29 01:06:02,000"),"user2","open"),collector); //  outside safe-window
 
         Assert.assertEquals(4, collector.getResult().size());
-        Assert.assertEquals("user2",collector.getResult().get(0).f0());
-        Assert.assertEquals("user1",collector.getResult().get(1).f0());
+        Assert.assertEquals("user2",collector.getResult().get(0)._1());
+        Assert.assertEquals("user1",collector.getResult().get(1)._1());
     }
 
 
@@ -100,4 +100,4 @@ public class TestUserActivityAggregator {
             return result;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c7ce706b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6bf7abc..0895004 100755
--- a/pom.xml
+++ b/pom.xml
@@ -139,6 +139,7 @@
         <!-- General Properties -->
         <java.version>1.7</java.version>
         <scala.version>2.10</scala.version>
+        <scala-lang.version>2.10.4</scala-lang.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 


Mime
View raw message