eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [2/7] incubator-eagle git commit: EAGLE-334 clean user profile code clean user profile code in Eagle 0.5 and redesign in Eagle 0.6
Date Fri, 10 Jun 2016 01:29:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/resources/log4j.properties b/eagle-security/eagle-security-userprofile/training/src/main/resources/log4j.properties
deleted file mode 100644
index aff248b..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=DEBUG, stdout
-
- eagle.log.dir=./logs
- eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
- log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
- log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
- log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/resources/reference.conf b/eagle-security/eagle-security-userprofile/training/src/main/resources/reference.conf
deleted file mode 100644
index c57f4a1..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/resources/reference.conf
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-akka {
-
-	# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs
-	# to STDOUT)
-	loggers = ["akka.event.slf4j.Slf4jLogger"]
-
-	# Log level used by the configured loggers (see "loggers") as soon
-	# as they have been started; before that, see "stdout-loglevel"
-	# Options: OFF, ERROR, WARNING, INFO, DEBUG
-	loglevel = "DEBUG"
-
-	# Log level for the very basic logger activated during ActorSystem startup.
-	# This logger prints the log messages to stdout (System.out).
-	# Options: OFF, ERROR, WARNING, INFO, DEBUG
-	stdout-loglevel = "DEBUG"
-
-	# Filter of log events that is used by the LoggingAdapter before
-	# publishing log events to the eventStream.
-	logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/sbin/submit-userprofile-training.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/sbin/submit-userprofile-training.sh b/eagle-security/eagle-security-userprofile/training/src/main/sbin/submit-userprofile-training.sh
deleted file mode 100644
index 6d84357..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/sbin/submit-userprofile-training.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/usr/bin/env bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cd $(dirname $0)/../
-
-./bin/submit-userprofile-training.sh --master yarn-cluster \
-									--input /logs/auditlog/* \
-									--service-host localhost \
-									--service-port 9099

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileJobFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileJobFactory.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileJobFactory.scala
deleted file mode 100644
index 2bb8b5a..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileJobFactory.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile
-
-import org.apache.eagle.security.userprofile.job.AuditLogTrainingSparkJob
-import org.joda.time.Period
-
-/** Builds training jobs; the optional callback `f` receives the freshly built job before it is returned.
-* @since  7/21/15
-*/
-object UserProfileJobFactory {
-  def AuditlogTrainingSparkJob(site:String=null,input:String=null, master:String = "local[1]", appName:String = UserProfileConstants.DEFAULT_TRAINING_APP_NAME, cmdTypes:Seq[String] = UserProfileConstants.DEFAULT_CMD_TYPES,period:Period)(f: AuditLogTrainingSparkJob => Unit = null) = {
-    val job = new AuditLogTrainingSparkJob(site,input,master,appName,cmdTypes,period)
-    Option(f).foreach(callback => callback(job)) // a null callback is simply skipped
-    job
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingApp.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingApp.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingApp.scala
deleted file mode 100644
index f704a61..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingApp.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile
-
-import java.util.Properties
-
-import org.apache.eagle.security.userprofile.model.eigen.UserProfileEigenModeler
-import org.apache.eagle.security.userprofile.model.kde.UserProfileKDEModeler
-import org.apache.eagle.security.userprofile.sink.{UserActivityAggKafkaSink, UserProfileHDFSSink, UserProfileEagleServiceSink}
-import org.apache.eagle.security.userprofile.UserProfileJobFactory._
-import org.apache.spark.Logging
-import org.joda.time.Period
-
-/** Registers the Eigen and KDE modelers plus any configured sinks on an audit-log training job and runs it; all of this happens while the instance is constructed.
- * @since  7/28/15
- */
-case class UserProfileTrainingApp(config:UserProfileTrainingConfig) extends Logging{
-  AuditlogTrainingSparkJob(config.site,config.input, config.master, config.appName, config.cmdTypes, config.period) { job =>
-    logInfo(s"Registering UserProfileEigenModeler(${job.cmdTypes.mkString(",")})")
-    job.model(new UserProfileEigenModeler(job.cmdTypes.toArray))
-    logInfo(s"Registering UserProfileKDEModeler(${job.cmdTypes.mkString(",")})")
-    job.model(new UserProfileKDEModeler(job.cmdTypes.toArray))
-    // Each sink is registered only when its setting is non-null; the service password is masked in the log line so credentials never reach the job log.
-    if (config.serviceHost != null) {
-      logInfo(s"Registering UserProfileEagleServiceSink(${config.serviceHost},${config.servicePort}, ${config.username}, ******)")
-      job.sink(new UserProfileEagleServiceSink(config.serviceHost, config.servicePort, config.username, config.password))
-    }
-
-    if (config.modelOutput != null) {
-      logInfo(s"Registering UserProfileHdfsSink(${config.modelOutput})")
-      job.sink(new UserProfileHDFSSink(config.modelOutput))
-    }
-
-    if (config.kafkaProps != null) {
-      logInfo(s"Registering UserActivityAggKafkaSink(${config.kafkaProps}))")
-      job.sink(new UserActivityAggKafkaSink(config.kafkaProps))
-    }
-
-    logInfo("Starting to run")
-
-    job.run()
-  }
-}
-
-/** Settings for one user profile training run, as consumed by UserProfileTrainingApp.
- * @param input         audit log input
- * @param period        aggregation period
- * @param master        spark master url
- * @param appName       spark application name
- * @param cmdTypes      command types
- * @param modelOutput   model output path
- * @param serviceHost   service host
- * @param servicePort   service port
- */
-case class UserProfileTrainingConfig(site:String = null, // site name; the CLI marks --site as required
-                                     input:String = null,
-                                     period:Period = Period.parse("PT1M"), // default aggregation window: one minute (ISO-8601 PT1M)
-                                     master:String="local[10]",
-                                     appName:String="UserProfileTraining",
-                                     cmdTypes:Seq[String]=UserProfileConstants.DEFAULT_CMD_TYPES,
-                                     modelOutput:String=null,
-                                     serviceHost:String=null,
-                                     servicePort:Int = 9099,
-                                     username:String=null, // Eagle service authentication username
-                                     password:String=null, // Eagle service authentication password
-                                     kafkaProps:Properties = null) // Kafka properties; UserProfileTrainingApp registers no Kafka sink when null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingCLI.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingCLI.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingCLI.scala
deleted file mode 100644
index feb6548..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/UserProfileTrainingCLI.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile
-
-import org.apache.commons.configuration.{MapConfiguration, ConfigurationConverter}
-import org.joda.time.Period
-
-import scala.collection.JavaConversions
-
-/**
- * Empty companion class; it must share the object's name, UserProfileTrainingCLI, for the class path loader.
- *
- * @since  7/28/15
- */
-class UserProfileTrainingCLI{}
-/** Command-line entry point: parses the arguments into a UserProfileTrainingConfig and runs UserProfileTrainingApp with it. */
-object UserProfileTrainingCLI{
-  val PARSER  = new scopt.OptionParser[UserProfileTrainingConfig]("[submit-command] --class org.apache.eagle.security.userprofile.UserProfileTrainingCLI [jar]") {
-    head("User Profile Training Application","0.0.1")
-    opt[String]('s',"site") required() action {(x,c) => c.copy(site = x)} text "Site"
-    opt[String]('i',"input") required() action {(x,c) => c.copy(input = x)} text "Input audit log file path"
-    opt[String]('o',"output") optional() action {(x,c) => c.copy(modelOutput = x)} text "Model output HDFS directory, final output path format is: ${output}_${algorithm}"
-    opt[String]('m',"master") optional() action {(x,c) => c.copy(master = x)} text "Spark master url, default: local[10]"
-    opt[String]('n',"app-name") optional() action {(x,c) => c.copy(appName = x)} text "Application name, default: UserProfileTraining"
-    opt[Seq[String]]('c',"cmds") optional() action {(x,c) => c.copy(cmdTypes = x)} text s"Command types, default: [${UserProfileConstants.DEFAULT_CMD_TYPES.mkString(",")}]"
-    opt[String]('h',"service-host") optional() action {(x,c) => c.copy(serviceHost = x)} text "Eagle service host; when omitted, no Eagle service sink is registered"
-    opt[Int]('p',"service-port") optional()  action {(x,c) => c.copy(servicePort = x)} text "Eagle service port, default: 9099"
-    opt[String]('u',"service-username") optional() action {(x,c) => c.copy(username = x)} text "Eagle service authentication username, default: admin" // NOTE(review): the config default is null; confirm the sink substitutes "admin"
-    opt[String]('w',"service-password") optional() action {(x,c) => c.copy(password = x)} text "Eagle service authentication password, default: secure" // NOTE(review): the config default is null; confirm the sink substitutes this value
-    opt[Map[String,String]]('k',"kafka-props") optional()  action { (x,c) => {c.copy(kafkaProps = ConfigurationConverter.getProperties(new MapConfiguration(JavaConversions.mapAsJavaMap(x))))}} text "Kafka properties, for example: topic=sometopic,metadata.brokers.list=localhost:9200"
-    opt[String]('r',"period") optional() action {(x,c) => c.copy(period = Period.parse(x))} text "Period window (https://en.wikipedia.org/wiki/ISO_8601#Durations), default: PT1M" // changing it to 1M interval
-  }
-  // Exits with code 0 once the app has returned, and with code 1 when the arguments cannot be parsed.
-  def main(args:Array[String]): Unit = {
-    PARSER.parse(args,UserProfileTrainingConfig()) match {
-      case Some(config) =>
-        UserProfileTrainingApp(config)
-        sys.exit(0)
-      case None =>
-        System.err.println("\nMore details: https://spark.apache.org/docs/latest/submitting-applications.html")
-        sys.exit(1)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Command.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Command.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Command.scala
deleted file mode 100644
index 3c9e5a0..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Command.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import org.apache.eagle.security.userprofile.daemon.SchedulerContext.{COMMAND_SOURCE, COMMAND_TYPE, _}
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity.STATUS
-
-import scala.util.{Failure, Success, Try}
-
-/** Something that can be run against a SchedulerContext; `execute` is a function yielding Success with a T, or Failure.
- * @since  0.3.0
- */
-trait Executable[T]{
-  def execute:(SchedulerContext => Try[T])
-}
-
-/** Executable that runs the command line produced by `shell` as a child process whose stdout/stderr are inherited from this JVM; a non-zero exit code or any exception becomes a Failure.
- * @since  0.3.0
- */
-trait ShellExecutable extends Executable[String] {
-    def shell:(SchedulerContext => Seq[String])
-    override def execute = { context =>
-      val command = shell(context)
-      var result:Try[String]  = null
-      var process:Process = null
-      try {
-        val builder = new ProcessBuilder(scala.collection.JavaConversions.seqAsJavaList(command))
-        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
-        builder.redirectError(ProcessBuilder.Redirect.INHERIT)
-        process = builder.start()
-        val exitCode = process.waitFor() // blocks until the child exits and returns its exit value
-        if(exitCode != 0 ) {
-          result = Failure(new IllegalMonitorStateException(s"Exit code of process is not zero, but: $exitCode"))
-        }else {
-          result = Success(s"Successfully executed command $command")
-        }
-      }catch{
-        case e: InterruptedException =>
-          Thread.currentThread().interrupt() // restore the interrupt flag so the calling thread can still observe the interruption
-          result = Failure(e)
-        case e: Exception => result = Failure(e)
-      } finally {
-        if(process!=null) process.destroy() // release the child process handle on every path
-      }
-      result
-    }
-}
-/** Base type for scheduler commands; every parameter except `source` is exposed as a val (`source` is a plain constructor parameter). */
-abstract class Command(protected val timestamp:Long,protected val site:String,protected val category: COMMAND_TYPE.TYPE,source: COMMAND_SOURCE.TYPE,protected val status:ScheduleCommandEntity.STATUS, val updateTime:Long, val persistable:Boolean) {}
-/** Command that is executed through a shell (mixes in ShellExecutable); `inputPath` is accepted here but is not forwarded to Command. */
-abstract class UserProfileCommand(
-                               timestamp:Long,
-                               inputPath:String,
-                               site:String,
-                               override val category: COMMAND_TYPE.TYPE,
-                               status:STATUS,
-                               source: COMMAND_SOURCE.TYPE,
-                               updateTime:Long,
-                               persistable:Boolean ) extends Command(timestamp,site,category,source,status,updateTime,persistable) with ShellExecutable
-/** Training command of type USER_PROFILE_TRAINING; `shell` assembles the launcher command line from the SchedulerContext plus this command's site and input path. */
-case class UserProfileModelCommand(override val timestamp:Long,
-                                   inputPath:String,
-                                   override val site:String,
-                                   override val status:ScheduleCommandEntity.STATUS,
-                                   source: COMMAND_SOURCE.TYPE,
-                                   createTime:Long,
-                                   override val updateTime:Long,
-                                   override val persistable:Boolean = true ) extends UserProfileCommand(timestamp,inputPath,site,COMMAND_TYPE.USER_PROFILE_TRAINING,status,source,updateTime,persistable) {
-
-  override def shell = { ctx: SchedulerContext =>
-    // The driver class path flag is emitted only when a class path is configured.
-    val classpathFlag =
-      if(ctx.driverClasspath != null) Seq("--driver-class-path",ctx.driverClasspath) else Seq.empty[String]
-    Seq(ctx.driverShell) ++ classpathFlag ++ Seq (
-      "--master",ctx.sparkMaster,
-      "--deploy-mode",ctx.sparkMode,
-      "--class",UserProfileTrainingCLIClass,ctx.jobJar,
-      "--site",this.site,
-      "--period",ctx.period,
-      "--input", this.inputPath,
-      "--service-host",ctx.eagleServiceContext.serviceHost,
-      "--service-port",ctx.eagleServiceContext.servicePort.toString,
-      "--service-username", ctx.eagleServiceContext.username,
-      "--service-password", ctx.eagleServiceContext.password
-    )
-  }
-}
-/** Conversion of a Command into a ScheduleCommandEntity: site and type become tags; timestamp, update time and status are copied over. */
-object Command{
-  def asEntity(command:Command):ScheduleCommandEntity = {
-    val tagMap = new java.util.HashMap[String,String]
-    tagMap.put("site",command.site)
-    tagMap.put("type",command.category.toString)
-    val result = new ScheduleCommandEntity
-    result.setTags(tagMap)
-    result.setTimestamp(command.timestamp)
-    result.setUpdateTime(command.updateTime)
-    result.setStatus(command.status.toString)
-    result
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Consumers.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Consumers.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Consumers.scala
deleted file mode 100644
index 2a6ec5a..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Consumers.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import akka.actor._
-import org.apache.eagle.common.EagleExceptionWrapper
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity.STATUS
-
-import scala.util.{Failure, Success}
-
-/** Base actor for command consumers: a SchedulerContext message (re)configures it and rebuilds the dao, a Command message is passed to `handle`.
- * @since  9/10/15
- */
-abstract class CommandConsumer extends UntypedActor with ActorLogging{
-  @volatile var dao:UserProfileCommandDao = null
-  @volatile var config:SchedulerContext=null
-  // A Command that arrives before any SchedulerContext raises IllegalStateException; any other message type raises IllegalArgumentException.
-  override def onReceive(message: Any): Unit = message match {
-    case _config: SchedulerContext =>
-      if(log.isDebugEnabled) {
-        log.debug(s"Initialized with config: ${_config}") // log the incoming context; `config` still holds the previous value at this point
-      } else {
-        if(config == null) log.info(s"Initialized") else log.info("Reinitialized")
-      }
-      config = _config
-      dao = new UserProfileCommandDao(_config.eagleServiceContext.serviceHost, _config.eagleServiceContext.servicePort, _config.eagleServiceContext.username, _config.eagleServiceContext.password, this)
-    case _command: Command =>
-      val _config = config
-      if(_config == null) throw new IllegalStateException("Config is called before asigned")
-      val _sender = sender()
-      handle.apply(_command,_config,_sender)
-    case _ => throw new IllegalArgumentException(s"Unknown message $message")
-  }
-  def handle: PartialFunction[(Command,SchedulerContext,ActorRef), Any]
-}
-/** Consumer that runs UserProfileCommands; for persistable commands the status is written through the dao as EXECUTING before the run and as SUCCEEDED or FAILED after it. */
-class UserProfileCommandConsumer extends CommandConsumer{
-  import context.dispatcher
-  override def handle = {
-    case (command, cfg, sender) => command match {
-      case c:UserProfileCommand =>
-        val entity = Command.asEntity(command)
-        if(c.persistable) {
-          // The command runs once the EXECUTING update completes, whether or not that update succeeded.
-          dao.updateCommandStatus(entity, STATUS.EXECUTING, s"Command [$entity] is executing ") onComplete {  case _ => execute(c,entity,cfg) }
-        }else{
-          execute(c,entity,cfg)
-        }
-    }
-  }
-
-  private def execute(cmd: UserProfileCommand, entity:ScheduleCommandEntity,cfg: SchedulerContext): Unit ={
-    cmd.execute(cfg) match {
-      case Success(message) =>
-        log.info(s"Executed successfully, returned: $message")
-        if (cmd.persistable)
-          dao.updateCommandStatus(entity, STATUS.SUCCEEDED, message) recover {
-            case ex: Throwable => log.error(ex, s"Failed to update status of [$entity] as SUCCEEDED, due to exception" + ex.getMessage)
-          }
-      case Failure(ex) =>
-        log.error(ex, s"Failed to execute: ${cmd.shell},due to: ${ex.getMessage}") // NOTE(review): cmd.shell is a function value, so this prints the function object rather than the command line
-        if (cmd.persistable) {
-          dao.updateCommandStatus(entity, STATUS.FAILED, EagleExceptionWrapper.wrap(ex.asInstanceOf[Exception])) recover {
-            case ex: Throwable => log.error(ex, s"Failed to update status of [$entity]  as FAILED, due to exception: " + ex.getMessage)
-          }
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Coordinator.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Coordinator.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Coordinator.scala
deleted file mode 100644
index 4b952ea..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Coordinator.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import akka.actor.{Props, _}
-
-/** Messages understood by CommandCoordinator.
- * @since  9/11/15
- */
-case class Initialized(context: SchedulerContext) // its context is forwarded to both training producers and to the consumer
-case class CheckOndemandTrainingStatus(var site: String, var category: SchedulerContext.COMMAND_TYPE.TYPE) // forwarded to the on-demand training producer
-case class CheckPeriodicTrainingStatus(var site: String, var category: SchedulerContext.COMMAND_TYPE.TYPE) // forwarded to the periodic training producer
-case class Terminated(context: SchedulerContext) // the coordinator only logs that it exited
-/** Parent actor that creates the command consumer and the two training producers on start, then routes each incoming message to the matching child. */
-class CommandCoordinator extends UntypedActor with ActorLogging{
-  var consumer:ActorRef = null
-  var ondemandTrainingProducer:ActorRef = null
-  var periodicTrainingProducer:ActorRef = null
-
-  override def preStart(): Unit = {
-    consumer = context.actorOf(Props[UserProfileCommandConsumer],"userprofile-command-consumer")
-    ondemandTrainingProducer = context.actorOf(Props[OndemandTrainingProducer],"persisted-command-producer")
-    periodicTrainingProducer = context.actorOf(Props[PeriodicTrainingProducer],"scheduled-command-producer")
-  }
-
-  override def onReceive(message: Any): Unit = message match {
-    case Initialized(schedulerContext) =>
-      log.info(s"Config updated: $schedulerContext")
-      ondemandTrainingProducer ! schedulerContext
-      periodicTrainingProducer ! schedulerContext
-      consumer ! schedulerContext
-    case ondemandCheck: CheckOndemandTrainingStatus =>
-      ondemandTrainingProducer ! ondemandCheck
-    case periodicCheck: CheckPeriodicTrainingStatus =>
-      periodicTrainingProducer ! periodicCheck
-    case cmd: Command =>
-      log.info(s"*** NEW Command: $cmd ***")
-      consumer ! cmd
-    case Terminated(_) =>
-      log.info("Coordinator exited")
-    case other =>
-      log.warning(s"Unhandled message: $other")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Producers.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Producers.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Producers.scala
deleted file mode 100644
index fbf917d..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Producers.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import akka.actor.{ActorLogging, ActorRef, UntypedActor}
-import akka.event.LoggingAdapter
-import org.apache.eagle.common.DateTimeUtil
-import org.apache.eagle.security.userprofile.daemon.SchedulerContext.{COMMAND_SOURCE, COMMAND_TYPE}
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity.STATUS
-
-import scala.collection.JavaConversions
-import scala.util.{Failure, Success}
-
-/**
- * @since  9/10/15
- */
-
-object CommandProducer{
-  def toStateName(site:String,category:COMMAND_TYPE.TYPE):String = s"$site-${category.toString.toLowerCase}"
-
-  def getProgramStatus(site:String, category:COMMAND_TYPE.TYPE,policy: SchedulePolicy):Option[Long] = policy.getStatus(toStateName(site,category))
-
-  def checkProgramStatus(site:String, category:COMMAND_TYPE.TYPE,policy: SchedulePolicy,inputPath:String, sender: ActorRef, log: LoggingAdapter)(buildCommand: (Long,String) => UserProfileCommand): Unit = {
-    val stateName = toStateName(site,category)
-    var status: Long = policy.getStatus(stateName).getOrElse(0l)
-    if (status > 0) {
-      status = policy.formatStatus(status)
-      if (log.isDebugEnabled) log.debug(s"State [$stateName] is found: [${DateTimeUtil.millisecondsToHumanDateWithSeconds(status)}]")
-      if (!policy.validateStatus(status)) {
-        if (log.isDebugEnabled) {
-          val nextStatus = policy.nextStatus(status)
-          log.debug(s"State [$stateName]: [${DateTimeUtil.millisecondsToHumanDateWithSeconds(status)}], will execute at [${DateTimeUtil.millisecondsToHumanDateWithSeconds(nextStatus)}]")
-        }
-        return
-      }
-    } else {
-      status = policy.formatStatus(System.currentTimeMillis())
-      log.info(s"Initialized State [$stateName] as current timestamp by default: $status")
-      policy updateStatus(stateName, status)
-      // return
-    }
-
-    val path = Utils.formatPathWithMilliseconds(inputPath)(status)
-    log.info(s"Create a new command for [status: ${DateTimeUtil.millisecondsToHumanDateWithSeconds(status)}, path: $path]")
-    sender ! buildCommand(status,path)
-    val nextStatus = policy nextStatus status
-    policy updateStatus(stateName, nextStatus)
-  }
-}
-
-class OndemandTrainingProducer extends UntypedActor with ActorLogging{
-  @volatile var dao:UserProfileCommandDao=null
-  @volatile var config:SchedulerContext=null
-
-  import context.dispatcher
-
-  override def onReceive(message: Any): Unit = message match {
-    case _config: SchedulerContext =>
-      if(log.isDebugEnabled) {
-        log.debug(s"Initialized with config: ${_config}")
-      } else {
-        if(config == null) log.info(s"Initialized") else log.info("Reinitialized")
-      }
-      config = _config
-      dao = new UserProfileCommandDao(config.eagleServiceContext.serviceHost,config.eagleServiceContext.servicePort,config.eagleServiceContext.username,config.eagleServiceContext.password,this)
-    case CheckOndemandTrainingStatus(site,category) =>
-      val _sender = sender()
-      dao.readNewInitializedCommandByType(site,category) onComplete {
-        case Success(optionalEntities) =>
-          optionalEntities match {
-            case Some(entities) =>
-              log.info(s"Load ${entities.size} new $category commands")
-              JavaConversions.collectionAsScalaIterable(entities) foreach { entity =>
-                dao.updateCommandStatus(entity,STATUS.PENDING,"Command is pending to execute") onComplete {
-                  case Success(response) =>
-                    if(response.isSuccess) {
-                      _sender ! entity2Command(site, category, entity, config)
-                    }else{
-                      log.error(s"Got exception to update status as PENDING for command:$entity, due to service exception: ${response.getException}")
-                    }
-                  case Failure(ex) =>
-                    log.error(ex, s"Got exception to update status as PENDING for command:$entity, due to: ${ex.getMessage}")
-                }
-              }
-            case None =>
-              if(log.isDebugEnabled) {
-                log.debug(s"Loaded 0 new $category commands")
-              }
-          }
-        case Failure(exception:Exception) => {
-          log.error(exception, s"Failed to get commands for site = [$site] and category = [$category], due to ${exception.getMessage}")
-        }
-    }
-  }
-
-  private def entity2Command(site:String,category:COMMAND_TYPE.TYPE,entity: ScheduleCommandEntity,config:SchedulerContext):Command = {
-    category match {
-      case COMMAND_TYPE.USER_PROFILE_TRAINING =>
-        val policy = config.trainingSchedulePolicy
-        val status = entity.getTimestamp
-        val path = Utils.formatPathWithMilliseconds(config.trainingAuditPath)(status)
-        UserProfileModelCommand(entity.getTimestamp,path,site,STATUS.INITIALIZED,COMMAND_SOURCE.ONDEMAND,entity.getTimestamp,entity.getUpdateTime,persistable = true)
-      case _ => throw new IllegalArgumentException(s"Unknown type of command type: $category")
-    }
-  }
-}
-
-class PeriodicTrainingProducer extends UntypedActor with ActorLogging{
-  @volatile var config:SchedulerContext = null
-  @volatile var dao:UserProfileCommandDao=null
-
-  import CommandProducer._
-
-  def checkTrainingProgramStatus(site: String,category:COMMAND_TYPE.TYPE,sender: ActorRef): Unit = {
-    checkProgramStatus(site,category,config.trainingSchedulePolicy,config.trainingAuditPath,sender,log) {(status,path) =>
-      UserProfileModelCommand(status,path,site,STATUS.INITIALIZED,COMMAND_SOURCE.PERIODIC,System.currentTimeMillis(),System.currentTimeMillis(),persistable = false)
-    }
-  }
-
-  override def onReceive(message: Any): Unit = message match {
-    case _config: SchedulerContext =>
-      if (log.isDebugEnabled) {
-        log.debug(s"Initialized with config: $config")
-      } else {
-        if(config == null) log.info(s"Initialized") else log.info("Reinitialized")
-      }
-      config = _config
-      dao = new UserProfileCommandDao(config.eagleServiceContext.serviceHost,config.eagleServiceContext.servicePort,config.eagleServiceContext.username,config.eagleServiceContext.password,this)
-    case CheckPeriodicTrainingStatus(site,category) =>
-      category match {
-        case COMMAND_TYPE.USER_PROFILE_TRAINING =>
-          log.debug("Checking training program status")
-          checkTrainingProgramStatus(site,category,sender())
-        case _ =>
-          log.error(s"Unknown type of category: $category")
-      }
-    case _ => throw new IllegalArgumentException(s"Unknown message $message")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulePolicy.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulePolicy.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulePolicy.scala
deleted file mode 100644
index 2dde4ce..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulePolicy.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import org.apache.eagle.security.userprofile.UserProfileUtils
-import org.joda.time.{LocalDate, Period}
-
-import scala.collection.mutable
-
-/**
- * User Profile Daemon Schedule Policy
- * Configure scheduler by defining policy for managing status
- *
- * @since  9/5/15
- */
-trait SchedulePolicy{
-  /**
-   * @param timestamp
-   * @return
-   */
-  def validateStatus(timestamp:Long):Boolean
-
-  /**
-   *
-   * @param timestamp
-   * @return
-   */
-  def nextStatus(timestamp:Long):Long
-
-  /**
-   *
-   * @param name
-   * @return
-   */
-  def getStatus(name:String):Option[Long]
-
-  /**
-   *
-   * @param name
-   * @param status
-   */
-  def updateStatus(name:String,status:Long):Unit
-
-  /**
-   *
-   * @param timestamp
-   * @return
-   */
-  def formatStatus(timestamp:Long):Long
-}
-
-trait StateManager {
-  def getState(name:String):Option[Long]
-  def updateState(name:String,value:Long)
-}
-
-object MemoryStateManager extends StateManager {
-  val map = new mutable.HashMap[String,Long]()
-  override def getState(name: String): Option[Long] = this.synchronized { map.get(name) }
-  override def updateState(name: String, value: Long): Unit = this.synchronized {
-    map += (name -> value)
-  }
-}
-
-object ZKStateManager extends StateManager{
-  override def getState(name: String): Option[Long] = ???
-  override def updateState(name: String, value: Long): Unit = ???
-}
-
-class DefaultSchedulePolicy(duration:Period,stateManager:StateManager = MemoryStateManager) extends SchedulePolicy{
-  override def validateStatus(timestamp: Long): Boolean = formatStatus(timestamp) < formatStatus(System.currentTimeMillis())
-
-//  override def nextStatus(timestamp: Long): Long = new LocalDate(timestamp).plus(duration).toDate.getTime
-  override def nextStatus(timestamp: Long): Long = timestamp + Int.int2long(duration.toStandardSeconds.getSeconds) * 1000
-
-  override def getStatus(name: String): Option[Long] = stateManager.getState(name)
-
-  override def updateStatus(name: String, status: Long): Unit = stateManager.updateState(name,status)
-
-  override def formatStatus(timestamp: Long): Long = UserProfileUtils.formatMillisecondsByPeriod(timestamp,duration)
-}
-
-class MonthlySchedulePolicy(stateManager:StateManager = MemoryStateManager) extends DefaultSchedulePolicy(duration = null,stateManager=stateManager){
-  override def formatStatus(timestamp: Long): Long = new LocalDate(timestamp).withDayOfMonth(1).toDate.getTime
-  override def nextStatus(timestamp: Long): Long = new LocalDate(timestamp).plusMonths(1).withDayOfMonth(1).toDate.getTime
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Scheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Scheduler.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Scheduler.scala
deleted file mode 100644
index 770ad6a..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Scheduler.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import akka.actor.{ActorSystem, Props}
-import SchedulerContext.COMMAND_TYPE
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-
-import scala.concurrent.duration._
-
-/**
- * User Profile Training Scheduler
- *
- * @since  0.3.0
- */
-class Scheduler(config:SchedulerContext) {
-  /**
-   * Start Daemon Scheduler
-   */
-  def start(): Unit = {
-    val system = ActorSystem(SchedulerContext.SchedulerSystem)
-
-    system.log.info(s"Started actor system: $system")
-
-    import system.dispatcher
-
-    val coordinator = system.actorOf(Props[CommandCoordinator])
-    // Initialize when start
-    system.scheduler.scheduleOnce(0.seconds,coordinator,Initialized(config))
-
-    system.scheduler.schedule(1.seconds,config.syncIntervalSeconds.seconds,coordinator,CheckOndemandTrainingStatus(config.site,COMMAND_TYPE.USER_PROFILE_TRAINING))
-
-    system.scheduler.schedule(config.trainingInitialDelaySeconds.seconds,config.trainingIntervalSeconds.seconds,coordinator,CheckPeriodicTrainingStatus(config.site,COMMAND_TYPE.USER_PROFILE_TRAINING))
-
-    system.registerOnTermination(new Runnable {
-      override def run(): Unit = {
-        coordinator ! Terminated(config)
-      }
-    })
-  }
-}
-
-object Scheduler{
-  def main(args:Array[String]): Unit ={
-    new ConfigOptionParser().load(args)
-    new Scheduler(SchedulerContext.load).start()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulerContext.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulerContext.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulerContext.scala
deleted file mode 100644
index fcc64fc..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/SchedulerContext.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import java.net.URLDecoder
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.security.userprofile.UserProfileTrainingCLI
-
-object SchedulerContext {
-  val SchedulerSystem = "userprofile-scheduler-system"
-  val UserProfileTrainingJarFilePath = URLDecoder.decode(classOf[UserProfileTrainingCLI].getProtectionDomain.getCodeSource.getLocation.getPath,"UTF-8")
-  val UserProfileTrainingCLIClass = classOf[UserProfileTrainingCLI].getCanonicalName
-  val HdfsAuditLogAggStatus = "hdfs.audit.agg.status"
-  val HdfsAuditLogModelStatus = "hdfs.audit.model.status"
-
-  // {
-  // ======================================================================================
-  // Configuration Keys
-  // ======================================================================================
-  val UserProfileConfigKey_Site="eagle.site"
-  val UserProfileConfigKey_DryRun="eagle.userprofile.dry-run"
-
-
-  val UserProfileConfigKey_Features="eagle.userprofile.features"
-  val UserProfileConfigKey_Period="eagle.userprofile.period"
-
-  val UserProfileConfigKey_JobJarFilePath="eagle.userprofile.jar"
-  val UserProfileConfigKey_DriverShell="eagle.userprofile.driver-shell"
-  val UserProfileConfigKey_DriverClasspath="eagle.userprofile.driver-classpath"
-  val UserProfileConfigKey_SparkMaster="eagle.userprofile.spark-master"
-  val UserProfileConfigKey_SparkMode="eagle.userprofile.spark-mode"
-
-  val UserProfileConfigKey_ModelAuditPath="eagle.userprofile.training-audit-path"
-  val UserProfileConfigKey_ModelIntervalSeconds="eagle.userprofile.training-interval-seconds"
-  val UserProfileConfigKey_ModelInitialDelaySeconds="eagle.userprofile.training-initial-delay-seconds"
-  val UserProfileConfigKey_ServiceHost="eagle.service.host"
-  val UserProfileConfigKey_ServicePort="eagle.service.port"
-  val UserProfileConfigKey_UserName="eagle.service.username"
-  val UserProfileConfigKey_Password="eagle.service.password"
-  val UserProfileConfigKey_SyncIntervalSeconds="eagle.userprofile.sync-interval-seconds"
-  // ======================================================================================
-  // End of Configuration Keys
-  // ======================================================================================
-  // }
-
-  object COMMAND_TYPE extends Enumeration {
-    type TYPE = Value
-    val USER_PROFILE_TRAINING = Value
-  }
-
-  object COMMAND_SOURCE extends Enumeration {
-    type TYPE = Value
-    val PERIODIC, ONDEMAND  = Value
-  }
-
-  /**
-   * Load scheduler context from configuration
-   */
-  def load:SchedulerContext={
-    val config:Config = ConfigFactory.load()
-    var site = ""
-
-    if(config.hasPath(UserProfileConfigKey_Site)) site = config.getString(UserProfileConfigKey_Site)
-
-    if(site == null || site.isEmpty) throw new IllegalArgumentException(s"Config [$UserProfileConfigKey_Site] is required, but not given")
-
-    val context = new SchedulerContext(site)
-
-    if(config.hasPath(UserProfileConfigKey_DryRun)) context.dryRun = config.getBoolean(UserProfileConfigKey_DryRun)
-    if(config.hasPath(UserProfileConfigKey_Features)) context.features = config.getString(UserProfileConfigKey_Features)
-    if(config.hasPath(UserProfileConfigKey_Period)) context.period = config.getString(UserProfileConfigKey_Period)
-
-    if(config.hasPath(UserProfileConfigKey_ModelAuditPath)) {
-      context.trainingAuditPath = config.getString(UserProfileConfigKey_ModelAuditPath)
-    }else{
-      throw new IllegalArgumentException(s"Config[$UserProfileConfigKey_ModelAuditPath] should not be null")
-    }
-
-    if(config.hasPath(UserProfileConfigKey_ModelIntervalSeconds)) context.trainingIntervalSeconds = config.getLong(UserProfileConfigKey_ModelIntervalSeconds)
-    if(config.hasPath(UserProfileConfigKey_ModelInitialDelaySeconds)) context.trainingInitialDelaySeconds = config.getLong(UserProfileConfigKey_ModelInitialDelaySeconds)
-
-    if(config.hasPath(UserProfileConfigKey_ServiceHost)) context.eagleServiceContext.serviceHost = config.getString(UserProfileConfigKey_ServiceHost)
-    if(config.hasPath(UserProfileConfigKey_ServicePort)) context.eagleServiceContext.servicePort = config.getInt(UserProfileConfigKey_ServicePort)
-    if(config.hasPath(UserProfileConfigKey_UserName)) context.eagleServiceContext.username = config.getString(UserProfileConfigKey_UserName)
-    if(config.hasPath(UserProfileConfigKey_Password)) context.eagleServiceContext.password = config.getString(UserProfileConfigKey_Password)
-
-    if(config.hasPath(UserProfileConfigKey_JobJarFilePath)) context.jobJar = config.getString(UserProfileConfigKey_JobJarFilePath)
-
-    if(config.hasPath(UserProfileConfigKey_SparkMaster)) context.sparkMaster = config.getString(UserProfileConfigKey_SparkMaster)
-    if(config.hasPath(UserProfileConfigKey_SparkMode)) context.sparkMode = config.getString(UserProfileConfigKey_SparkMode)
-    if(config.hasPath(UserProfileConfigKey_DriverShell)) context.driverShell = config.getString(UserProfileConfigKey_DriverShell)
-    if(config.hasPath(UserProfileConfigKey_DriverClasspath)) context.driverClasspath = config.getString(UserProfileConfigKey_DriverClasspath)
-    if(config.hasPath(UserProfileConfigKey_SyncIntervalSeconds)) context.syncIntervalSeconds = config.getLong(UserProfileConfigKey_SyncIntervalSeconds)
-
-    context
-  }
-}
-
-case class EagleServiceContext(
-  var serviceHost:String = "localhost",
-  var servicePort:Int = 9099,
-  var username:String = "admin",
-  var password:String = "secure"
-)
-
-case class SchedulerContext(
-  var site:String,
-  var dryRun:Boolean = true,
-  var features:String = null,
-  var period:String = "PT1m", // changing to 1m aggregation
-  var eagleServiceContext: EagleServiceContext=new EagleServiceContext(),
-  var jobJar: String = SchedulerContext.UserProfileTrainingJarFilePath,
-  var driverShell: String = "spark-submit",
-  var driverClasspath: String = null,
-  var sparkMaster:String = "local[10]",
-  var sparkMode:String = "client",
-  var trainingAuditPath:String = null,
-  var trainingIntervalSeconds:Long = 60,
-  var trainingInitialDelaySeconds:Long = 0,
-  /**
-   * Detection sync commands interval in seconds
-   */
-  var syncIntervalSeconds:Long = 5,
-  /**
-   * Training program schedule policy
-   */
-  trainingSchedulePolicy:SchedulePolicy = new MonthlySchedulePolicy()
-)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/UserProfileCommandDao.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/UserProfileCommandDao.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/UserProfileCommandDao.scala
deleted file mode 100644
index a129074..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/UserProfileCommandDao.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import java.util
-import java.util.concurrent.Callable
-
-import akka.actor.UntypedActor
-import akka.dispatch.Futures
-import akka.event.Logging
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
-import org.apache.eagle.security.userprofile.daemon.SchedulerContext.COMMAND_TYPE
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity
-import org.apache.eagle.security.userprofile.model.ScheduleCommandEntity.STATUS
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl
-
-/**
- * User Profile Command Dao By Eagle Service
- * @since  9/11/15
- */
-class UserProfileCommandDao(host:String,port:Int,username:String,password:String,actor:UntypedActor){
-  val client = new EagleServiceClientImpl(host,port,username,password)
-  val log = Logging.getLogger(actor)
-
-  /**
-   * readNewInitializedCommandByType
-   *
-   * @param site eagle site name
-   * @param commandType command type
-   * @return Future
-   */
-  def readNewInitializedCommandByType(site:String,commandType: COMMAND_TYPE.TYPE) = {
-    Futures.future(new Callable[Option[java.util.List[ScheduleCommandEntity]]]{
-      override def call(): Option[java.util.List[ScheduleCommandEntity]] = {
-        val response:GenericServiceAPIResponseEntity[ScheduleCommandEntity]  =
-          new EagleServiceClientImpl(host,port,username,password)
-            .silence(true)
-            .search(s"${ScheduleCommandEntity.ScheduleTaskService}[@site = ${"\"" + site +"\""} AND @type = ${"\""+commandType.toString+"\""} AND (@status = ${"\""+STATUS.INITIALIZED.name()+"\""} or @status = ${"\"\""} or @status is null)]{*}")
-            .startTime(0).endTime(Long.MaxValue).pageSize(Int.MaxValue).send()
-
-        if(!response.isSuccess){
-          throw new RuntimeException(s"Got server side exception: ${response.getException}")
-        }
-
-        if(response.getObj != null && response.getObj.size() == 0) None else Option(response.getObj)
-      }
-    },actor.getContext().dispatcher)
-  }
-
-  /**
-   * updateCommandStatus
-   *
-   * @param task task entity
-   * @param status task status
-   * @param message message string
-   * @return
-   */
-  def updateCommandStatus(task:ScheduleCommandEntity,status:STATUS,message:String) ={
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]] {
-      override def call(): GenericServiceAPIResponseEntity[String] = {
-        if(log.isDebugEnabled) log.debug(s"Updating status of task[$task] as: $status")
-        task.setStatus(status.name())
-        task.setDetail(s"[${status.name()}] $message")
-        task.setUpdateTime(System.currentTimeMillis())
-        val response = client.update(util.Arrays.asList(task),classOf[ScheduleCommandEntity])
-        if(response.isSuccess){
-          log.info(s"Updated status of command [$task] as: $status")
-        } else {
-          log.error(s"Failed to update status of command [$task] as: $status, because of exception: ${response.getException}")
-          throw new RuntimeException(s"Failed to update status due to exception: ${response.getException}")
-        }
-        response
-      }
-    },actor.getContext().dispatcher)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Utils.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Utils.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Utils.scala
deleted file mode 100644
index 1a0f3bb..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/daemon/Utils.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.daemon
-
-import java.util.regex.Pattern
-import org.apache.eagle.common.DateTimeUtil
-import org.apache.eagle.security.userprofile.UserProfileUtils
-import org.joda.time.Period
-
-import scala.collection.mutable
-
-/**
- * @since  9/2/15
- */
-object Utils {
-  val PATH_TIME_FORMAT_PATTERN = Pattern.compile("\\$\\{([\\*\\/\\w\\-_\\s]+)\\}")
-
-  /**
-   * Example: hdfs:///logs/nn/auditlog/${yyyy-mm-dd}/hdfs-audit.log.${yyyy-mm-dd-hh}.gz
-   *
-   * @param path
-   * @return
-   */
-  def formatPathWithMilliseconds(path:String)(milliseconds:Long):String = {
-    val matcher = PATH_TIME_FORMAT_PATTERN.matcher(path)
-    var result = path
-    while(matcher.find()){
-      val grouped = matcher.group()
-      val format = grouped.substring(2,grouped.length()-1)
-      val formatted = DateTimeUtil.format(milliseconds,format)
-      result = result.replace(grouped,formatted)
-    }
-    result
-  }
-
-  /**
-   * For example:
-   *
-   * current: 2015/09/02 10:12:32
-   * granularity: 15m
-   * duration: 1M // changing it to 1M interval of aggregation
-   *
-   * 1) flatten to duration: 2015/09/02 10:12:32 => 2015/09/02 10:00:00
-   * 2) split by granularity: 2015/09/02 10:00:00 / 15m => [2015/09/02 10:00:00,2015/09/02 10:15:00,2015/09/02 10:30:00,2015/09/02 10:45:00]
-   * 3) format file paths
-   */
-  def formatPathsInDuration(pathFormat:String,milliseconds:Long,duration:Period,granularity: Period = Period.parse("PT1M")): Seq[String] = {
-    val durationIsSeconds = duration.toStandardSeconds
-    val durationInMs = Int.int2long(durationIsSeconds.getSeconds) * 1000
-    val granularityInMs = Int.int2long(granularity.toStandardSeconds.getSeconds) * 1000
-
-    val flattenTimestamp =UserProfileUtils.formatSecondsByPeriod(milliseconds/1000,durationIsSeconds) * 1000
-    val timeSplits:mutable.MutableList[Long] = new mutable.MutableList[Long]()
-    var tmp = flattenTimestamp
-
-    while(tmp - flattenTimestamp < durationInMs){
-      timeSplits += tmp
-      tmp = tmp + granularityInMs
-    }
-
-    timeSplits.map(formatPathWithMilliseconds(pathFormat))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/AuditLogTrainingSparkJob.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/AuditLogTrainingSparkJob.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/AuditLogTrainingSparkJob.scala
deleted file mode 100644
index 6253351..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/AuditLogTrainingSparkJob.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.job
-
-import org.apache.commons.math3.linear.{Array2DRowRealMatrix, RealMatrix}
-import org.apache.eagle.security.userprofile.UserProfileConstants
-import org.apache.eagle.security.userprofile.model.AuditLogTransformer
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.joda.time.Period
-
-import scala.collection.mutable
-
-/**
- * Audit Log UserProfileTrainingJob Implementation
- *
- * @param site site
- * @param input input path
- * @param master  spark master
- * @param appName spark application name
- * @param cmdTypes command features
- * @param period model aggregation granularity
- */
-case class AuditLogTrainingSparkJob(
-    site:String = null,
-    var input:String=null,
-    master:String="local[4]",
-    appName:String=UserProfileConstants.DEFAULT_TRAINING_APP_NAME,
-    cmdTypes:Seq[String] = UserProfileConstants.DEFAULT_CMD_TYPES,
-    period:Period = Period.parse("PT1m")
-  ) extends UserProfileTrainingJob with Logging {
-  import org.apache.eagle.security.userprofile.job.AuditlogTrainingSparkJobFuncs._
-
-  val conf = new SparkConf().setMaster(master).setAppName(appName)
-  val sc = new SparkContext(conf)
-
-  override def run(){
-    logInfo(s"Starting $appName")
-    buildDAG(sc)
-    logInfo(s"Stopping $appName")
-    sc.stop()
-    logInfo(s"Finished $appName")
-  }
-
-  /**
-   * Build Spark DAG
-   *
-   * @param sc
-   */
-  private def buildDAG(sc:SparkContext): Unit = {
-    if (input == null) throw new IllegalArgumentException("input is null")
-
-    val _site = site
-    val _sc = sc
-    val _cmdTypes = cmdTypes
-    val _modelers = modelers
-    val _modelSinks = modelSinks
-    val _aggSinks = aggSinks
-
-    val _period = period
-    val tmp = _sc.textFile(input)
-      .map(AuditLogTransformer(_period).transform)
-      .filter(e => e.isDefined)
-      .map(e => {((e.get.user,e.get.periodWindowSeconds,e.get.cmd),1.toDouble) }) // [(user,period,cmd),count]
-
-    val _aggRDD = tmp
-      .reduceByKey(_ + _)                                           // [(user,period,cmd),totalCount]
-      .map(kv => {((kv._1._1, kv._1._2), (kv._1._3, kv._2))})       // [(user,period),(cmd,totalCount)]
-      .groupByKey()
-      .map(asUserCmdCountArray(_cmdTypes))                          // [(user,(period,array))]
-      .sortBy(asUserPeriod, ascending = true)
-      .groupByKey()
-      .map(asMatrix)
-
-    if (_modelers.size > 1 || _aggSinks.size > 1) _aggRDD.persist(StorageLevel.MEMORY_ONLY)
-
-    _aggSinks.foreach(sink =>{
-      sink.persist(_aggRDD,_cmdTypes,_site)
-    })
-
-    val modelRDDS = _modelers.map(
-      modeler => {
-        (_aggRDD.flatMap(v => {
-          val _modeler = modeler
-          _modeler.build(_site,v._1, v._2)
-        }), modeler.context())
-    })
-
-    if(_modelSinks.size > 1) modelRDDS.foreach(_._1.cache())
-
-    modelRDDS.foreach(kv => {
-      _modelSinks.foreach(sink => {
-        sink.persist(kv._1, kv._2)
-      })
-    })
-  }
-
-  def input(path:String):AuditLogTrainingSparkJob = {
-    this.input = path
-    this
-  }
-}
-
-private object AuditlogTrainingSparkJobFuncs extends Logging{
-  /**
-   * From: [(user,period),(cmd,count)]
-   *
-   * To: [(user), (period,"getfileinfo", "open", "listStatus", "setTimes", "setPermission", "rename", "mkdirs", "create", "setReplication", "contentSummary", "delete", "setOwner", "fsck")]
-   *
-   * @param keyValue
-   * @return
-   */
-  def asUserCmdCountArray(cmdTypes:Seq[String])(keyValue:(((String,Long),Iterable[(String,Double)])))={
-    // make sure instance variables are always in local for closure
-    val key = keyValue._1
-    val it = keyValue._2
-    var cmdCount = Map[String, Double]()
-    it.foreach(k => {
-      cmdCount += (k._1 -> k._2)
-    })
-    var cmdCounts = mutable.MutableList[Double]()
-    cmdTypes.foreach(_cmdType => {
-      cmdCounts += cmdCount.getOrElse[Double](_cmdType, 0.0)
-    })
-    val cmdCountsInDouble:Array[Double] = cmdCounts.toArray
-    (key._1,(key._2,cmdCountsInDouble))
-  }
-
-  /**
-   * @param pair
-   * @return (user,matrix)
-   */
-  def asMatrix(pair:(String,Iterable[(Long,Array[Double])]))={
-    val data = new mutable.MutableList[Array[Double]]
-    pair._2.foreach(v => data += v._2)
-    val matrix:RealMatrix = new Array2DRowRealMatrix(data.toArray)
-    (pair._1,matrix)
-  }
-
-  def asUserPeriod(param:(String,(Long,Array[Double]))) ={
-    (param._1, param._2._1)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/UserProfileTrainingJob.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/UserProfileTrainingJob.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/UserProfileTrainingJob.scala
deleted file mode 100644
index aae11a0..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/job/UserProfileTrainingJob.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.job
-
-import org.apache.eagle.security.userprofile.model.{UserProfileModel, UserProfileContext, UserProfileModeler}
-import org.apache.eagle.security.userprofile.sink.{UserActivityAggRDDSink, UserProfileModelRDDSink}
-
-import scala.collection.mutable
-
-/**
- * User Profile Training Job Interface
- *
- * @since  7/19/15
- */
-trait UserProfileTrainingJob {
-  /**
-   * Start the app
-   *
-   * @return
-   */
-  def run()
-
-  /**
-   * Register training modeler
-   *
-   * @param modeler
-   */
-  def model(modeler:UserProfileModeler[UserProfileModel,UserProfileContext]): UserProfileTrainingJob = {
-    this.modelers += modeler
-    this
-  }
-
-  /**
-   * Register model data consumer
-   * @param sink
-   */
-  def sink(sink: UserProfileModelRDDSink): UserProfileTrainingJob = {
-    this.modelSinks += sink
-    this
-  }
-
-  def sink(sink: UserActivityAggRDDSink): UserProfileTrainingJob = {
-    this.aggSinks += sink
-    this
-  }
-
-  val modelers = new mutable.MutableList[UserProfileModeler[UserProfileModel,UserProfileContext]]()
-  val modelSinks = new mutable.MutableList[UserProfileModelRDDSink]()
-  val aggSinks = new mutable.MutableList[UserActivityAggRDDSink]()
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileContext.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileContext.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileContext.scala
deleted file mode 100644
index ea84315..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileContext.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.model
-
-/**
- * @since  0.3.0
- */
-case class UserProfileContext(algrithm:String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileModeler.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileModeler.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileModeler.scala
deleted file mode 100644
index de52e73..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/model/UserProfileModeler.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.model
-
-import org.apache.commons.math3.linear.RealMatrix
-
-/**
- * @since  0.3.0
- */
-trait UserProfileModeler[+M,+C <: UserProfileContext] extends Serializable{
-  /**
-   * @param site site
-   * @param user user
-   * @param matrix user profile matrix
-   */
-  def build(site:String,user:String,matrix: RealMatrix):List[M]
-
-  /**
-   * @return
-   */
-  def context():C
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserActivityAggSink.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserActivityAggSink.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserActivityAggSink.scala
deleted file mode 100644
index bd16ba9..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserActivityAggSink.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.sink
-
-import java.util.Properties
-
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import org.apache.commons.math3.linear.RealMatrix
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity
-import org.apache.eagle.security.userprofile.model.{EntityConversion, UserActivityAggModel}
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.codehaus.jackson.map.ObjectMapper
-
-/**
- * @since  0.3.0
- */
-trait UserActivityAggRDDSink {
-  def persist(rdd: RDD[(String, RealMatrix)],cmdTypes:Seq[String],site:String): Unit
-}
-
-abstract class UserActivityAggModelSink extends UserActivityAggRDDSink{
-  override def persist(rdd: RDD[(String, RealMatrix)],cmdTypes:Seq[String],site:String): Unit = {
-    rdd.foreach(kv => {
-      this.persist(UserActivityAggModel(kv._1,kv._2,cmdTypes,site,System.currentTimeMillis()))
-    })
-  }
-
-  def persist(model: UserActivityAggModel): Unit
-}
-
-case class UserActivityAggKafkaSink(properties: Properties) extends UserActivityAggModelSink with Logging{
-
-  @transient var producer:Producer[String, String] = null
-  @transient var objectMapper:ObjectMapper  = null
-
-  val TOPIC_KEY = "topic"
-  val topicName  = properties.getProperty(TOPIC_KEY)
-  if(topicName == null) throw new IllegalArgumentException(s"$TOPIC_KEY is null")
-
-  private[this] def getProducer = {
-    if(producer == null){
-      if(!properties.containsKey("serializer.class")) properties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
-      if(!properties.containsKey("metadata.broker.list")) properties.setProperty("metadata.broker.list", "localhost:9092")
-      val config = new ProducerConfig(properties)
-        producer = new Producer[String,String](config)
-    }
-    producer
-  }
-
-  private[this] def getObjectMapper = {
-    if(objectMapper == null) {
-      objectMapper = TaggedLogAPIEntity.buildObjectMapper()
-    }
-    objectMapper
-  }
-
-  override def persist(model: UserActivityAggModel): Unit = {
-    val entity = model.asInstanceOf[EntityConversion[TaggedLogAPIEntity]].toEntity
-    val message = new KeyedMessage[String, String](topicName, model.user, getObjectMapper.writeValueAsString(entity))
-    try {
-      getProducer.send(message)
-    }catch {
-      case e:Exception => {
-        logError(s"Failed to send message to kafka[$properties]",e)
-        throw e
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserProfileModelSink.scala
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserProfileModelSink.scala b/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserProfileModelSink.scala
deleted file mode 100644
index d6fbae9..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/main/scala/org/apache/eagle/security/userprofile/sink/UserProfileModelSink.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.sink
-
-import java.util
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.eagle.security.userprofile.model.{EntityConversion, UserProfileContext, UserProfileModel}
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
-import org.apache.eagle.service.client.IEagleServiceClient
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-
-trait UserProfileModelRDDSink extends Serializable{
-  def persist(model: RDD[UserProfileModel],context: UserProfileContext)
-}
-
-abstract class UserProfileModelSink extends UserProfileModelRDDSink{
-  def persist(model: UserProfileModel)
-  override def persist(model: RDD[UserProfileModel],context:UserProfileContext): Unit = {
-    model.foreach(persist)
-  }
-}
-
-case class UserProfileHDFSSink(path:String) extends UserProfileModelRDDSink with Logging{
-  override def persist(model: RDD[UserProfileModel],ctx: UserProfileContext): Unit = {
-    val finalPath = if(path.endsWith("/")) String.format("%s%s",path,ctx.algrithm) else String.format("%s/%s",path,ctx.algrithm)
-    logInfo(s"Saving to output path: $finalPath, algrithm: ${ctx.algrithm}")
-    model.map(e => {
-      val entity = e.asInstanceOf[EntityConversion[TaggedLogAPIEntity]].toEntity
-      entity.setSerializeVerbose(false)
-      TaggedLogAPIEntity.buildObjectMapper().writeValueAsString(entity)
-    }).saveAsTextFile(finalPath)
-  }
-}
-
-case class UserProfileEagleServiceSink(host:String = "localhost",port:Int = 9099, username:String = "admin", password:String = "secret", maxRetryTimes:Int = 100) extends UserProfileModelSink with Logging{
-  @transient var client:IEagleServiceClient = null
-
-  def persist(model: UserProfileModel): Unit = {
-    if (model == null) {
-      logDebug("model is null")
-      return
-    }
-    if (client == null) client = new EagleServiceClientImpl(host, port, username, password)
-    var triedTimes = 0
-    var response: GenericServiceAPIResponseEntity[String] = null
-    while (triedTimes < maxRetryTimes && (response == null || !response.isSuccess)) {
-      triedTimes += 1
-      try{
-        response = client.create(util.Arrays.asList(model.asInstanceOf[EntityConversion[TaggedLogAPIEntity]].toEntity))
-        if (response.isSuccess) {
-          logInfo(s"Successfully persist 1 model for user: ${model.user} algorithm: ${model.algorithm}")
-        } else {
-          logError(s"Failed to persist 1 model for user: ${model.user} algorithm: ${model.algorithm}, tried $triedTimes times, exception: ${response.getException}")
-        }
-      } catch {
-        case e: Throwable => {
-          logError(s"Failed to persist for exception 1 model for user: ${model.user} algorithm: ${model.algorithm}, tried $triedTimes times, exception: ${e.getMessage}",e)
-          response = null
-        }
-      }
-    }
-  }
-}
-
-case object UserProfileStdoutAsJsonSink extends UserProfileModelSink with Logging{
-  val objMapper = new ObjectMapper
-  override def persist(model: UserProfileModel): Unit = {
-    val json = objMapper.writeValueAsString(model)
-    println(json)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8b646715/eagle-security/eagle-security-userprofile/training/src/test/java/org/apache/eagle/security/userprofile/model/UserProfileEigenModelerTest.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/training/src/test/java/org/apache/eagle/security/userprofile/model/UserProfileEigenModelerTest.java b/eagle-security/eagle-security-userprofile/training/src/test/java/org/apache/eagle/security/userprofile/model/UserProfileEigenModelerTest.java
deleted file mode 100644
index 3d5d6a6..0000000
--- a/eagle-security/eagle-security-userprofile/training/src/test/java/org/apache/eagle/security/userprofile/model/UserProfileEigenModelerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.userprofile.model;
-
-import org.apache.commons.math3.linear.Array2DRowRealMatrix;
-import org.apache.commons.math3.linear.RealMatrix;
-import org.apache.eagle.security.userprofile.model.eigen.UserProfileEigenModeler;
-import org.junit.Assert;
-import scala.collection.immutable.List;
-
-public class UserProfileEigenModelerTest {
-
-    @org.junit.Test
-    public void testBuild() throws Exception {
-
-        UserProfileEigenModeler modeler = new UserProfileEigenModeler();
-        String user = "user1";
-        final RealMatrix mockMatrix = new Array2DRowRealMatrix(buildMockData());
-        List<UserProfileEigenModel> model = modeler.build("default",user, mockMatrix);
-        Assert.assertEquals(model.length(), 1);
-
-        UserProfileEigenModel eigenModel = model.head();
-        Assert.assertNotNull(eigenModel.statistics());
-        Assert.assertNotNull(eigenModel.principalComponents());
-        Assert.assertNotNull(eigenModel.maxVector());
-        Assert.assertNotNull(eigenModel.minVector());
-        Assert.assertEquals(eigenModel.statistics().length, mockMatrix.getColumnDimension());
-        Assert.assertTrue(eigenModel.principalComponents().length <= mockMatrix.getColumnDimension());
-        Assert.assertTrue(eigenModel.maxVector().getDimension() <= mockMatrix.getColumnDimension());
-        Assert.assertTrue(eigenModel.minVector().getDimension() <= mockMatrix.getColumnDimension());
-        Assert.assertEquals(true, eigenModel.statistics()[3].isLowVariant());
-    }
-
-    private double[][] buildMockData(){
-        double[][] mockData = new double[][]{
-                {114503.0,2.8820906E7,123618.0,0.0,64.0,15772.0,186.0,24296.0,12.0,9.0,32.0,0.0,0.0},
-                {53300.0,390772.0,157626.0,0.0,67.0,10501.0,226.0,8567.0,14.0,0.0,12.0,0.0,0.0},
-                {25659.0,140858.0,35731.0,0.0,169.0,1619.0,520.0,2965.0,34.0,0.0,34.0,0.0,0.0}
-        };
-        return mockData;
-    }
-}
\ No newline at end of file



Mime
View raw message