iota-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tonyfaust...@apache.org
Subject incubator-iota git commit: Adding Sample 1) Fey Configuration file 2) Sample JSON files 3) A Stream Performer 4) A ZMQ Performer
Date Sat, 02 Jul 2016 03:13:52 GMT
Repository: incubator-iota
Updated Branches:
  refs/heads/master d0fd6c830 -> 02d8ab643


Adding Sample
1) Fey Configuration file
2) Sample JSON files
3) A Stream Performer
4) A ZMQ Performer


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/02d8ab64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/02d8ab64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/02d8ab64

Branch: refs/heads/master
Commit: 02d8ab643a21accdcde561778bf59ee0ece79ba0
Parents: d0fd6c8
Author: Tony Faustini <tonyfaustini@apache.org>
Authored: Fri Jul 1 20:13:40 2016 -0700
Committer: Tony Faustini <tonyfaustini@apache.org>
Committed: Fri Jul 1 20:13:40 2016 -0700

----------------------------------------------------------------------
 Performers/NOTICE                               |   0
 .../fey/FeyConfiguration/feyConfiguration       |   5 +
 Performers/fey/FeyJSONRepo/raspberryPi.json     | 368 +++++++++++++++++++
 Performers/fey/FeyJSONRepo/test.json            |  53 +++
 .../apache/iota/fey/performer/Application.scala |  39 ++
 .../apache/iota/fey/performer/Heartbeat.scala   |  56 +++
 .../apache/iota/fey/performer/Timestamp.scala   |  56 +++
 .../fey/performer/ActorParamsNotSatisfied.scala |  20 +
 .../apache/iota/fey/performer/Application.scala |  36 ++
 .../iota/fey/performer/UnknownException.scala   |  19 +
 .../iota/fey/performer/ZMQPublisher.scala       | 134 +++++++
 .../iota/fey/performer/ZMQSubscriber.scala      | 124 +++++++
 12 files changed, 910 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/NOTICE
----------------------------------------------------------------------
diff --git a/Performers/NOTICE b/Performers/NOTICE
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/fey/FeyConfiguration/feyConfiguration
----------------------------------------------------------------------
diff --git a/Performers/fey/FeyConfiguration/feyConfiguration b/Performers/fey/FeyConfiguration/feyConfiguration
new file mode 100644
index 0000000..ca23bc8
--- /dev/null
+++ b/Performers/fey/FeyConfiguration/feyConfiguration
@@ -0,0 +1,5 @@
+fey-global-configuration{
+  json-repository = "/Users/<username>/Performers/fey/FeyJSONRepo"
+  jar-repository = "/Users/<username>/Performers/fey/FeyJARRepo"
+  enable-checkpoint=false
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/fey/FeyJSONRepo/raspberryPi.json
----------------------------------------------------------------------
diff --git a/Performers/fey/FeyJSONRepo/raspberryPi.json b/Performers/fey/FeyJSONRepo/raspberryPi.json
new file mode 100644
index 0000000..541bcda
--- /dev/null
+++ b/Performers/fey/FeyJSONRepo/raspberryPi.json
@@ -0,0 +1,368 @@
+{
+  "guid": "MAESTRO-3",
+  "command": "RECREATE",
+  "timestamp": "7919767890",
+  "name": "DESCRIPTION",
+  "ensembles": [
+    {
+      "guid": "TEMPERATURE",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Heartbeat",
+          "schedule": 30000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Heartbeat",
+            "parameters": {
+            }
+          }
+        },
+        {
+          "guid": "Cherry",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"op\":\"cov\", \"args\":[{\"string\":\"1ed6aed2b028486fa478b3d3f6e7ee61\"},
{\"float\":0.5} ]}",
+              "lrns": "[\"1ed6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Heartbeat": [
+            "Cherry"
+          ]
+        },
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "PRESSURE",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Heartbeat",
+          "schedule": 10000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Heartbeat",
+            "parameters": {
+            }
+          }
+        },
+        {
+          "guid": "Cherry",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"op\":\"cov\", \"args\":[{\"string\":\"3fd7aed2b028486fa478b3d3f6e7ee61\"},
{\"float\":0.1} ]}",
+              "lrns": "[\"3fd7aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Heartbeat": [
+            "Cherry"
+          ]
+        },
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "HUMIDITY",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Heartbeat",
+          "schedule": 30000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Heartbeat",
+            "parameters": {
+            }
+          }
+        },
+        {
+          "guid": "Cherry",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"op\":\"cov\", \"args\":[{\"string\":\"2fd6aed2b028486fa478b3d3f6e7ee61\"},
{\"float\":5.0} ]}",
+              "lrns": "[\"2fd6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Heartbeat": [
+            "Cherry"
+          ]
+        },
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "SOUNDDB",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Heartbeat",
+          "schedule": 10000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Heartbeat",
+            "parameters": {
+            }
+          }
+        },
+        {
+          "guid": "Cherry",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"op\":\"cov\", \"args\":[{\"string\":\"7fd6aed2b028486fa478b3d3f6e7ee61\"},
{\"float\":3.0} ]}",
+              "lrns": "[\"7fd6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Heartbeat": [
+            "Cherry"
+          ]
+        },
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "VIBRATION_AGGREGATE",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Heartbeat",
+          "schedule": 10000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Heartbeat",
+            "parameters": {
+            }
+          }
+        },
+        {
+          "guid": "Cherry",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"op\":\"cov\", \"args\":[{\"string\":\"5fd6aed2b028486fa478b3d3f6e7ee61\"},
{\"float\":0.02} ]}",
+              "lrns": "[\"5fd6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Heartbeat": [
+            "Cherry"
+          ]
+        },
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "VIBRATION",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "VibrationStream",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"didVibrationChange\":\"6fd6aed2b028486fa478b3d3f6e7ee61\"}",
+              "lrns": "[\"6fd6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "VibrationStream": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    },
+    {
+      "guid": "SOUND_WAV",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "Cherry",
+          "schedule": 3000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_cherry_rp.jar",
+            "classPath": "org.apache.iota.fey.performer.CherryRP",
+            "parameters": {
+              "cherry": "{\"didSoundChange\":\"9fd6aed2b028486fa478b3d3f6e7ee61\"}",
+              "lrns": "[\"9fd6aed2b028486fa478b3d3f6e7ee61\"]",
+              "host": "192.168.0.17"
+            }
+          }
+        },
+        {
+          "guid": "ZMQPublisher",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {
+              "zmq_port": "5559",
+              "zmq_target": "192.168.0.139"
+            }
+          }
+        }
+      ],
+      "connections": [
+        {
+          "Cherry": [
+            "ZMQPublisher"
+          ]
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/fey/FeyJSONRepo/test.json
----------------------------------------------------------------------
diff --git a/Performers/fey/FeyJSONRepo/test.json b/Performers/fey/FeyJSONRepo/test.json
new file mode 100644
index 0000000..b3e00f0
--- /dev/null
+++ b/Performers/fey/FeyJSONRepo/test.json
@@ -0,0 +1,53 @@
+{
+  "guid": "OrchestrationZ",
+  "command": "CREATE",
+  "timestamp": "591997890",
+  "name": "DESCRIPTION",
+  "ensembles": [
+    {
+      "guid": "EnsembleZ",
+      "command": "NONE",
+      "performers": [
+        {
+          "guid": "PUBLISHER",
+          "schedule": 0,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQPublisher",
+            "parameters": {}
+          }
+        },
+        {
+          "guid": "SUBSCRIBER",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_zmq.jar",
+            "classPath": "org.apache.iota.fey.performer.ZMQSubscriber",
+            "parameters": {}
+          }
+        },
+        {
+          "guid": "Timestamp",
+          "schedule": 1000,
+          "backoff": 0,
+          "source": {
+            "name": "fey_stream.jar",
+            "classPath": "org.apache.iota.fey.performer.Timestamp",
+            "parameters": {}
+          }
+        }
+
+      ],
+      "connections": [
+        {
+          "Timestamp": [
+            "PUBLISHER"
+          ],
+          "SUBSCRIBER": []
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
new file mode 100644
index 0000000..a574c26
--- /dev/null
+++ b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey.performer
+
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.iota.fey.FeyGenericActor.PROCESS
+import scala.concurrent.duration._
+
+object Application extends App {
+
+  println("Starting")
+
+  implicit val system = ActorSystem("STREAM-RUN")
+
+  val timestamp = system.actorOf(Props(classOf[Timestamp], Map.empty, 1.minutes, Map.empty,
1.seconds, "", "", false), name = "TIMESTAMP")
+
+  timestamp ! PROCESS("Stream it")
+
+  val heartbeat = system.actorOf(Props(classOf[Heartbeat], Map.empty, 1.minutes, Map.empty,
1.seconds, "", "", false), name = "HEARTBEAT")
+
+  heartbeat ! PROCESS("Stream it")
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
new file mode 100644
index 0000000..ad16632
--- /dev/null
+++ b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+
+import scala.collection.immutable.Map
+import scala.concurrent.duration._
+
+class Heartbeat(override val params: Map[String, String] = Map.empty,
+                override val backoff: FiniteDuration = 1.minutes,
+                override val connectTo: Map[String, ActorRef] = Map.empty,
+                override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+                override val orchestrationName: String = "",
+                override val orchestrationID: String = "",
+                override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  override def onStart = {
+  }
+
+  override def onStop = {
+  }
+
+  override def onRestart(reason: Throwable) = {
+    // Called after actor is up and running - after self restart
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+  }
+
+  override def execute() = {
+    log.info("Alive")
+    propagateMessage("Alive")
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
new file mode 100644
index 0000000..3f22688
--- /dev/null
+++ b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+
+import scala.collection.immutable.Map
+import scala.concurrent.duration._
+
+class Timestamp(override val params: Map[String, String] = Map.empty,
+                override val backoff: FiniteDuration = 1.minutes,
+                override val connectTo: Map[String, ActorRef] = Map.empty,
+                override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+                override val orchestrationName: String = "",
+                override val orchestrationID: String = "",
+                override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  override def onStart = {
+  }
+
+  override def onStop = {
+  }
+
+  override def onRestart(reason: Throwable) = {
+    // Called after actor is up and running - after self restart
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+  }
+
+  override def execute() = {
+    val ts = java.lang.System.currentTimeMillis()
+    log.info(ts.toString)
+    propagateMessage(ts.toString)
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
new file mode 100644
index 0000000..8b6359f
--- /dev/null
+++ b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+
+case class ActorParamsNotSatisfied(message: String) extends Exception(message)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
new file mode 100644
index 0000000..d99a826
--- /dev/null
+++ b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.iota.fey.FeyGenericActor.PROCESS
+import scala.concurrent.duration._
+
+object Application extends App {
+
+  //println("Starting")
+
+  //implicit val system = ActorSystem("ZMQ-RUN")
+
+  //val publish = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty,
1.seconds,"","",false ), name = "PUBLISH")
+
+  //publish ! PROCESS("Publish it")
+
+  //  val subscribe = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty,
1.seconds,"","",false ), name = "SUBSCRIBE")
+  //
+  //  subscribe ! PROCESS("Subscribe to it")
+}

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
new file mode 100644
index 0000000..032eb27
--- /dev/null
+++ b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+case class UnknownException(message: String) extends Exception(message)

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
new file mode 100644
index 0000000..4c43d45
--- /dev/null
+++ b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+import org.zeromq.ZMQ
+
+import scala.concurrent.duration._
+
+
+class ZMQPublisher(override val params: Map[String, String] = Map.empty,
+                   override val backoff: FiniteDuration = 1.minutes,
+                   override val connectTo: Map[String, ActorRef] = Map.empty,
+                   override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+                   override val orchestrationName: String = "",
+                   override val orchestrationID: String = "",
+                   override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  //-------default params----------
+  var port: Int = 5559
+  var target: String = "localhost"
+
+  //-------class vars-------------------
+  var ctx: ZMQ.Context = null
+  var pub: ZMQ.Socket = null
+  var count: Int = 0
+
+  override def onStart = {
+    log.info("Starting ZMQ Publisher")
+    try {
+      _params_check()
+
+      ctx = ZMQ.context(1)
+
+      pub = ctx.socket(ZMQ.PUB)
+      pub.setLinger(200)
+      pub.setHWM(10)
+      pub.connect("tcp://" + target + ":" + port)
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+    }
+  }
+
+  override def onStop = {
+    pub.disconnect("tcp://" + target + ":" + port)
+  }
+
+  override def onRestart(reason: Throwable) = {
+    // Called after actor is up and running - after self restart
+    try {
+      if (pub != null) {
+        pub.close()
+      }
+      if (ctx != null) {
+        ctx.close()
+      }
+      ctx = ZMQ.context(1)
+      pub = ctx.socket(ZMQ.PUB)
+      pub.setLinger(200)
+      pub.setHWM(10)
+      pub.connect("tcp://" + target + ":" + port)
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+      case default: Throwable => throw new UnknownException("onRestart failed because
of an exception")
+    }
+
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  override def execute() = {
+    log.info(s"Msg count: $count")
+  }
+
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+    //log.info(message.asInstanceOf[String])
+    message match {
+      case message: String =>
+        // Assuming each String message has only point data
+        _zmq_send(s"$message")
+
+      //      case message: Map[String, (String,String,String,String)] =>
+      //        val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
+      //        formatted_msgs.foreach(x => _zmq_send(x))
+
+      case _ => log.error("Ignoring this message as format not expected")
+    }
+  }
+
+  def _format_messages(fields: (String, String, String, String)): String = {
+    // The tuple has the following elements: lrn, timestamp, value, type
+    // And we have to create a message with the format:
+    // DATA|cloud|lrn|timestamp|{"<type>" : <value>}
+    "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+  }
+
+  def _zmq_send(Message: String) = {
+    log.info(s"messsage =$Message")
+    pub.send(Message)
+    count += 1
+  }
+
+  def _params_check() = {
+    if (params.contains("zmq_port")) {
+      port = params("zmq_port").toInt
+    }
+    if (params.contains("zmq_target")) {
+      target = params("zmq_target")
+    }
+  }
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/02d8ab64/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
new file mode 100644
index 0000000..3decb70
--- /dev/null
+++ b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+import org.zeromq.ZMQ
+
+import scala.concurrent.duration._
+
+class ZMQSubscriber(override val params: Map[String, String] = Map.empty,
+                    override val backoff: FiniteDuration = 1.minutes,
+                    override val connectTo: Map[String, ActorRef] = Map.empty,
+                    override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+                    override val orchestrationName: String = "",
+                    override val orchestrationID: String = "",
+                    override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  //-------default params----------
+  var port: Int = 5563
+  var target: String = "localhost"
+  val topic_filter: String = "DATA"
+
+  //-------class vars-------------------
+  var ctx: ZMQ.Context = null
+  var pub: ZMQ.Socket = null
+  var count: Int = 0
+
+  override def onStart = {
+    log.info("Starting ZMQ Subscriber")
+    try {
+
+      _params_check()
+
+      // Prepare our context and subscriber
+      ctx = ZMQ.context(1)
+      val subscriber = ctx.socket(ZMQ.SUB)
+
+      subscriber.bind(s"tcp://$target:$port")
+      subscriber.subscribe(topic_filter.getBytes())
+      while (true) {
+        // Read envelope with address
+        val address = new String(subscriber.recv(0))
+        // Read message contents
+        val contents = new String(subscriber.recv(0))
+        log.info(s"HERE IT IS $address : $contents")
+        count += 1
+      }
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+    }
+  }
+
+  override def onStop = {
+    pub.disconnect("tcp://" + target + ":" + port)
+    pub.close()
+    ctx.close()
+    pub = null
+    ctx = null
+  }
+
+  override def onRestart(reason: Throwable) = {
+    // Called after actor is up and running - after self restart
+    try {
+      if (pub != null) {
+        pub.close()
+      }
+      if (ctx != null) {
+        ctx.close()
+      }
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+      case default: Throwable => throw new UnknownException("onRestart failed because
of an exception")
+    }
+
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  override def execute() = {
+    log.info(s"Msg count: $count")
+  }
+
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+    message match {
+      case _ => log.info("Ignoring this message as format not expected")
+    }
+  }
+
+  def _format_messages(fields: (String, String, String, String)): String = {
+    // The tuple has the following elements: lrn, timestamp, value, type
+    // And we have to create a message with the format:
+    // DATA|cloud|lrn|timestamp|{"<type>" : <value>}
+    "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+  }
+
+  def _params_check() = {
+    if (params.contains("zmq_port")) {
+      port = params("zmq_port").toInt
+    }
+    if (params.contains("zmq_target")) {
+      target = params("zmq_target")
+    }
+  }
+
+}
\ No newline at end of file


Mime
View raw message