iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject [1/3] incubator-iota git commit: [IOTA-34] Add Global Watch Service to Fey - No tests added yet
Date Tue, 31 Jan 2017 22:26:42 GMT
Repository: incubator-iota
Updated Branches:
  refs/heads/master 2895047d1 -> 4775f9905


[IOTA-34] Add Global Watch Service to Fey
- No tests added yet


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/3d2e99d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/3d2e99d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/3d2e99d9

Branch: refs/heads/master
Commit: 3d2e99d9a8e989a4a10910f19169b3d8cf1345c8
Parents: 9c41fb0
Author: Barbara Gomes <barbaramaltagomes@gmail.com>
Authored: Fri Nov 4 12:58:35 2016 -0700
Committer: Barbara Gomes <barbaramaltagomes@gmail.com>
Committed: Fri Nov 4 12:58:35 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/GlobalWatchService.scala     | 106 +++++++++++++++
 .../scala/org/apache/iota/fey/FeyCore.scala     |   4 +
 .../org/apache/iota/fey/FeyGenericActor.scala   |  63 +++++++--
 .../iota/fey/GlobalWatchServiceTask.scala       | 132 +++++++++++++++++++
 .../scala/org/apache/iota/fey/FeyCoreSpec.scala |   2 +-
 5 files changed, 293 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3d2e99d9/fey-core/src/main/scala/GlobalWatchService.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/GlobalWatchService.scala b/fey-core/src/main/scala/GlobalWatchService.scala
new file mode 100644
index 0000000..3d4377c
--- /dev/null
+++ b/fey-core/src/main/scala/GlobalWatchService.scala
@@ -0,0 +1,106 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.{Files, Path, Paths, WatchEvent}
+
+import akka.actor.{Actor, ActorLogging, ActorRef}
+import org.apache.iota.fey.GlobalWatchService.{ENTRY_CREATED, REGISTER_WATCHER_PERFORMER}
+import org.apache.iota.fey.WatchingDirectories.STOPPED
+
+class GlobalWatchService extends Actor with ActorLogging{
+
+  //WatchService
+  var watchThread:Thread = null
+  val watchFileTask:GlobalWatchServiceTask = new GlobalWatchServiceTask(self)
+
+  override def preStart(): Unit = {
+    startWatcher("PRE-START")
+  }
+
+  override def postStop(): Unit = {
+    stopWatcher("POST-STOP")
+  }
+
+  private def startWatcher(from: String) = {
+    log.info(s"Starting Global Watcher from $from")
+    watchThread = new Thread(watchFileTask, "FEY_GLOBAL_WATCH_SERVICE_PERFORMERS")
+    watchThread.setDaemon(true)
+    watchThread.start()
+  }
+
+  private def stopWatcher(from: String) = {
+    log.info(s"Stopping Global Watcher from $from")
+    if(watchThread != null && watchThread.isAlive){
+      watchThread.interrupt()
+      watchThread = null
+    }
+  }
+
+  override def receive: Receive = {
+    case REGISTER_WATCHER_PERFORMER(path, file_name, actor, events, loadExists) =>
+      registerPath(path,file_name,actor,events,loadExists)
+    case STOPPED =>
+      stopWatcher("STOPPED-THREAD")
+      startWatcher("STOPPED-THREAD")
+    case x => log.error(s"Unknown message $x")
+  }
+
+  private def broadcastMessageIfFileExists(actor: ActorRef, pathWithFile: String) = {
+    val filePath = Paths.get(pathWithFile)
+    if(Files.exists(filePath)){
+      log.info(s"File $pathWithFile exists. Broadcasting message to actor ${actor.path.toString}")
+      actor ! GlobalWatchService.ENTRY_CREATED(filePath)
+    }
+  }
+
+  private def registerPath(dir_path: String, file_name:Option[String], actor: ActorRef, events:
Array[WatchEvent.Kind[_]], loadExists: Boolean) = {
+    WatchingDirectories.actorsInfo.get((dir_path,file_name)) match {
+      case Some(info) =>
+        val newInfo:Map[WatchEvent.Kind[_], Array[ActorRef]] = events.map(event => {
+          info.get(event) match {
+            case Some(actors) => (event, (Array(actor) ++ actors))
+            case None => (event, Array(actor))
+          }
+        }).toMap
+        WatchingDirectories.actorsInfo.put((dir_path,file_name), info ++ newInfo)
+        watchFileTask.watch(Paths.get(dir_path),actor.path.toString,events)
+      case None =>
+        val tmpEvents:Map[WatchEvent.Kind[_], Array[ActorRef]] = events.map(event => {(event,
Array(actor))}).toMap
+        WatchingDirectories.actorsInfo.put((dir_path,file_name), tmpEvents)
+        watchFileTask.watch(Paths.get(dir_path),actor.path.toString,events)
+    }
+
+    if(file_name.isDefined && loadExists){
+      log.info(s"Checking if file $dir_path/${file_name.get} already exist")
+      broadcastMessageIfFileExists(actor, s"$dir_path/${file_name.get}")
+    }
+
+  }
+
+}
+
+object GlobalWatchService{
+  sealed case class ENTRY_CREATED(path:Path)
+  sealed case class ENTRY_MODIFIED(path:Path)
+  sealed case class ENTRY_DELETED(path:Path)
+  sealed case class REGISTER_WATCHER_PERFORMER(dir_path: String, file_name:Option[String],
+                                               actor: ActorRef, events: Array[WatchEvent.Kind[_]],
+                                               loadIfExists: Boolean)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3d2e99d9/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 3018d03..c3a7ac6 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -30,6 +30,7 @@ import JSON_PATH._
 import akka.routing.GetRoutees
 import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
 import com.eclipsesource.schema._
+import org.apache.iota.fey.GlobalWatchService.REGISTER_WATCHER_PERFORMER
 
 import scala.collection.mutable.HashMap
 
@@ -41,6 +42,7 @@ protected class FeyCore extends Actor with ActorLogging{
   val monitoring_actor = FEY_MONITOR.actorRef
 
   val identifier: ActorRef = context.actorOf(Props(classOf[IdentifyFeyActors]), name = IDENTIFIER_NAME)
+  val globalWatcher: ActorRef = context.actorOf(Props(classOf[GlobalWatchService]), name
= "GLOBAL_WATCH_SERVICE")
   context.watch(identifier)
 
   override def receive: Receive = {
@@ -60,6 +62,8 @@ protected class FeyCore extends Actor with ActorLogging{
           orchestrationReceivedNoFile(orchestrationJson)
       }
 
+    case REGISTER_WATCHER_PERFORMER(path, file_name, actor, events,ifExists) =>
+      globalWatcher ! REGISTER_WATCHER_PERFORMER(path, file_name, actor, events, ifExists)
 
     case STOP_EMPTY_ORCHESTRATION(orchID) =>
       log.warning(s"Deleting Empty Orchestration $orchID")

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3d2e99d9/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 36f39fa..83df62f 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -18,6 +18,8 @@
 
 package org.apache.iota.fey
 
+import java.nio.file.{Path, WatchEvent}
+
 import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable}
 import akka.routing.GetRoutees
 
@@ -46,7 +48,7 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
   extends Actor with ActorLogging{
 
   import FeyGenericActor._
-
+  import GlobalWatchService._
   /**
     * Keeps reference to the cancellable
     */
@@ -56,23 +58,27 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
 
   override final def receive: Receive = {
 
-    case PRINT_PATH =>
-      log.info(s"** ${self.path} **")
+    case PRINT_PATH => log.info(s"** ${self.path} **")
 
-    case STOP =>
-      context.stop(self)
+    case STOP => context.stop(self)
 
-    case PROCESS(message) =>
-      if(System.nanoTime() >= endBackoff) {
-        processMessage(message, sender())
-      }
-    // In case
-    case EXCEPTION(reason) => throw reason
+    case ENTRY_CREATED(path) => onReceiveWatcherCreate(path)
+
+    case ENTRY_MODIFIED(path) => onReceiveWatcherModify(path)
+
+    case ENTRY_DELETED(path) => onReceiveWatcherDelete(path)
 
+    case PROCESS(message) => checkBackoff(message, sender())
+
+    case EXCEPTION(reason) => throw reason
     case GetRoutees => //Discard
+    case x => customReceive(x) //Not treated messages will be pass over to the receiveComplement
+  }
 
-    //Not treated messages will be pass over to the receiveComplement
-    case x => customReceive(x)
+  private def checkBackoff[T](message: T, sender: ActorRef) = {
+    if(System.nanoTime() >= endBackoff) {
+      processMessage(message, sender)
+    }
   }
 
   override final def preRestart(reason: Throwable, message: Option[Any]): Unit = {
@@ -107,6 +113,11 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     log.info("RESTARTED method")
   }
 
+  final def registerPathToGlobalWatcher(dir_path: String, file_name:Option[String],
+                                        events: Array[WatchEvent.Kind[_]], loadIfExists:
Boolean = false): Unit = {
+    FEY_CORE_ACTOR.actorRef !  REGISTER_WATCHER_PERFORMER(dir_path, file_name, self, events,
loadIfExists)
+  }
+
   /**
     * Stops the scheduler
     */
@@ -230,6 +241,32 @@ abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
     */
   def startMonitorInfo:String = "Started"
 
+  /**
+    * Called every time the performer is notified of a file watcher event
+    * of type MODIFY
+    * @param path path of the file
+    */
+  def onReceiveWatcherModify(path: Path):Unit = {
+    log.info(s"File Modified: $path")
+  }
+
+  /**
+    * Called every time the performer is notified of a file watcher event
+    * of type DELETE
+    * @param path path of the file
+    */
+  def onReceiveWatcherDelete(path: Path):Unit = {
+    log.info(s"File Deleted: $path")
+  }
+
+  /**
+    * Called every time the performer is notified of a file watcher event
+    * of type CREATE
+    * @param path path of the file
+    */
+  def onReceiveWatcherCreate(path: Path):Unit = {
+    log.info(s"File Created: $path")
+  }
 }
 
 object FeyGenericActor {

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3d2e99d9/fey-core/src/main/scala/org/apache/iota/fey/GlobalWatchServiceTask.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalWatchServiceTask.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalWatchServiceTask.scala
new file mode 100644
index 0000000..d1d9d31
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalWatchServiceTask.scala
@@ -0,0 +1,132 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import java.nio.file.{FileSystems, Path, WatchEvent}
+import java.nio.file.StandardWatchEventKinds._
+import akka.actor.ActorRef
+import org.slf4j.LoggerFactory
+import scala.collection.mutable.HashMap
+
+class GlobalWatchServiceTask(parentActor: ActorRef) extends Runnable {
+
+  private val watchService = FileSystems.getDefault.newWatchService()
+  private val log = LoggerFactory.getLogger(this.getClass)
+
+  def watch(path: Path, actorName: String, events:Array[WatchEvent.Kind[_]]): Boolean = {
+    try {
+      log.info(s"Watching directory $path for actor $actorName")
+      WatchingDirectories.watchingEvents.get(path.toString) match {
+        case None =>
+          WatchingDirectories.watchingEvents.put(path.toString, events)
+          path.register(watchService, events)
+        case Some(oldEvents) =>
+          val newEvents = (oldEvents ++ events).distinct
+          path.register(watchService, newEvents)
+          WatchingDirectories.watchingEvents.put(path.toString, newEvents)
+      }
+      true
+    }catch{
+      case e: Exception =>
+        log.error(s"Could not register to directory $path for actor $actorName", e)
+        false
+    }
+  }
+
+  override def run() {
+    try {
+      while (!Thread.currentThread().isInterrupted) {
+        val key = watchService.take()
+        val eventsIterator = key.pollEvents().iterator()
+        while(eventsIterator.hasNext) {
+          val event = eventsIterator.next()
+          val relativePath = event.context().asInstanceOf[Path]
+          val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
+          event.kind() match {
+            case ENTRY_CREATE => distributeFileEvent(path, ENTRY_CREATE)
+            case ENTRY_MODIFY => distributeFileEvent(path, ENTRY_MODIFY)
+            case ENTRY_DELETE => distributeFileEvent(path, ENTRY_DELETE)
+            case x => log.warn(s"Event unknown $x")
+          }
+        }
+        key.reset()
+      }
+    } catch {
+      case e: Exception =>
+        log.error("Global Watch service task died.",e)
+    } finally {
+      watchService.close()
+      parentActor ! WatchingDirectories.STOPPED
+    }
+  }
+
+  private def distributeFileEvent(path: Path, event: WatchEvent.Kind[_]) = {
+    val dir = path.getParent.toString
+    val name = path.getFileName.toString
+
+    WatchingDirectories.actorsInfo.get((dir,Some(name))) match{
+      case Some(actorInfo) =>
+        actorInfo.get(event) match {
+          case Some(actors) =>
+            broadcastToActors(actors, event, path)
+          case None =>
+        }
+      case None =>
+        WatchingDirectories.actorsInfo.get((dir,None)) match{
+          case Some(actorInfo) =>
+            actorInfo.get(event) match {
+              case Some(actors) =>
+                broadcastToActors(actors, event, path)
+              case None =>
+            }
+          case None =>
+        }
+    }
+  }
+
+  private def broadcastToActors(actors: Array[ActorRef], event: WatchEvent.Kind[_], path:
Path) = {
+    actors.foreach( actor => {
+      event match {
+        case ENTRY_CREATE => actor ! GlobalWatchService.ENTRY_CREATED(path)
+        case ENTRY_MODIFY => actor ! GlobalWatchService.ENTRY_MODIFIED(path)
+        case ENTRY_DELETE => actor ! GlobalWatchService.ENTRY_DELETED(path)
+        case x => log.warn(s"Event unknown $x")
+      }
+    })
+  }
+}
+
+private object WatchingDirectories{
+
+  case object STOPPED
+
+  /**
+    * Key: (dir_path, file_name)
+    * value: Map[ EventType, List of ActorsRef]
+    */
+  val actorsInfo:HashMap[(String,Option[String]), Map[WatchEvent.Kind[_], Array[ActorRef]]]
= HashMap.empty[(String,Option[String]), Map[WatchEvent.Kind[_], Array[ActorRef]]]
+
+  /**
+    * Keeps track off all events that has been asked to monitor
+    * for each path
+    * key: dir path
+    * value: list of events already being tracked
+    */
+  val watchingEvents:HashMap[String,Array[WatchEvent.Kind[_]]] = HashMap.empty[String,Array[WatchEvent.Kind[_]]]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/3d2e99d9/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
----------------------------------------------------------------------
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
index 5ae55c2..39f2984 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
@@ -112,7 +112,7 @@ class FeyCoreSpec extends BaseAkkaSpec  {
 
   "Sending FeyCore.JSON_TREE to FeyCore" should {
     s"result in logging a 6 path messages at Info " in {
-      EventFilter.info(pattern = s"^akka://.*/user/.*", occurrences = 6) intercept {
+      EventFilter.info(pattern = s"^akka://.*/user/.*", occurrences = 7) intercept {
         feyCoreRef ! FeyCore.JSON_TREE
       }
     }


Mime
View raw message