hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject hama git commit: HAMA-992: Adding Hama streaming
Date Tue, 07 Jun 2016 23:14:07 GMT
Repository: hama
Updated Branches:
  refs/heads/master fb28bcbcf -> 556c7b386


HAMA-992: Adding Hama streaming


Project: http://git-wip-us.apache.org/repos/asf/hama/repo
Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/556c7b38
Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/556c7b38
Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/556c7b38

Branch: refs/heads/master
Commit: 556c7b386b1fb27a41763a61d1126e1dd1bd216a
Parents: fb28bcb
Author: Thomas Jungblut <thomas.jungblut@gmail.com>
Authored: Sat Jun 4 17:34:25 2016 +0100
Committer: Edward J. Yoon <edwardyoon@apache.org>
Committed: Wed Jun 8 08:13:35 2016 +0900

----------------------------------------------------------------------
 CHANGES.txt                   |   1 +
 bin/grooms.sh                 |   0
 bin/hama                      |   0
 bin/hama-config.sh            |   0
 bin/hama-daemon.sh            |   0
 bin/hama-daemons.sh           |   0
 bin/start-bspd.sh             |   0
 bin/stop-bspd.sh              |   0
 bin/zookeepers.sh             |   0
 conf/groomservers             |   3 +-
 conf/hama-default.xml         |   4 +-
 conf/hama-env.sh              |   2 +-
 conf/hama-site.xml            |  47 +++++++++
 python/BSP.py                 |  35 +++++++
 python/BSPPeer.py             | 189 +++++++++++++++++++++++++++++++++++++
 python/BSPRunner.py           |  35 +++++++
 python/BinaryProtocol.py      |  64 +++++++++++++
 python/BspJobConfiguration.py |  29 ++++++
 python/HelloWorldBSP.py       |  35 +++++++
 python/KMeansBSP.py           | 171 +++++++++++++++++++++++++++++++++
 python/README.txt             |  12 +++
 21 files changed, 623 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8c25d0e..fce4097 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Release 0.7.2 (unreleased changes)
   NEW FEATURES
  
     HAMA-988: Allow to add additional no-input tasks (edwardyoon)
+    HAMA-992: Adding Hama streaming (Thomas Jungblut via edwardyoon)
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/grooms.sh
----------------------------------------------------------------------
diff --git a/bin/grooms.sh b/bin/grooms.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/hama
----------------------------------------------------------------------
diff --git a/bin/hama b/bin/hama
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/hama-config.sh
----------------------------------------------------------------------
diff --git a/bin/hama-config.sh b/bin/hama-config.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/hama-daemon.sh
----------------------------------------------------------------------
diff --git a/bin/hama-daemon.sh b/bin/hama-daemon.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/hama-daemons.sh
----------------------------------------------------------------------
diff --git a/bin/hama-daemons.sh b/bin/hama-daemons.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/start-bspd.sh
----------------------------------------------------------------------
diff --git a/bin/start-bspd.sh b/bin/start-bspd.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/stop-bspd.sh
----------------------------------------------------------------------
diff --git a/bin/stop-bspd.sh b/bin/stop-bspd.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/bin/zookeepers.sh
----------------------------------------------------------------------
diff --git a/bin/zookeepers.sh b/bin/zookeepers.sh
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/conf/groomservers
----------------------------------------------------------------------
diff --git a/conf/groomservers b/conf/groomservers
index 2fbb50c..8449335 100644
--- a/conf/groomservers
+++ b/conf/groomservers
@@ -1 +1,2 @@
-localhost
+master.edward.org
+slave.edward.org

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/conf/hama-default.xml
----------------------------------------------------------------------
diff --git a/conf/hama-default.xml b/conf/hama-default.xml
index 50403e5..1dee2ea 100644
--- a/conf/hama-default.xml
+++ b/conf/hama-default.xml
@@ -47,13 +47,13 @@
   </property>
   <property>
     <name>bsp.http.infoserver.port</name>
-    <value>40013</value>
+    <value>40015</value>
     <description>The port where the web-interface can be seen.
     </description>
   </property>
   <property>
     <name>bsp.http.groomserver.port</name>
-    <value>40015</value>
+    <value>40017</value>
     <description>The port where the web-interface can be seen.
     </description>
   </property>

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/conf/hama-env.sh
----------------------------------------------------------------------
diff --git a/conf/hama-env.sh b/conf/hama-env.sh
index 58e3d83..edc3c30 100644
--- a/conf/hama-env.sh
+++ b/conf/hama-env.sh
@@ -22,7 +22,7 @@
 # Set environment variables here.
 
 # The java implementation to use.  Required.
-# export JAVA_HOME=/usr/lib/jvm/java-7-oracle
+export JAVA_HOME=/usr/lib/jvm/java-8-oracle
 
 # Where log files are stored.  $HAMA_HOME/logs by default.
 # export HAMA_LOG_DIR=${HAMA_HOME}/logs

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/conf/hama-site.xml
----------------------------------------------------------------------
diff --git a/conf/hama-site.xml b/conf/hama-site.xml
index dbccd1d..474f0e2 100644
--- a/conf/hama-site.xml
+++ b/conf/hama-site.xml
@@ -22,4 +22,51 @@
  */
 -->
 <configuration>
+  <property>
+    <name>bsp.master.address</name>
+    <value>master.edward.org:40000</value>
+    <description>The address of the bsp master server. Either the
+    literal string "local" or a host:port for distributed mode
+    </description>
+  </property>
+
+  <property>
+    <name>bsp.tasks.maximum</name>
+    <value>3</value>
+    <description>The maximum number of BSP tasks that will be run simultaneously 
+    by a groom server.</description>
+  </property>
+
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://master.edward.org:9000</value>
+    </property>
+
+  <property>
+    <name>bsp.child.java.opts</name>
+    <value>-Xmx2048m</value>
+    <description>Java opts for the groom server child processes.  
+    The following symbol, if present, will be interpolated: @taskid@ is replaced 
+    by current TaskID. Any other occurrences of '@' will go unchanged.
+    For example, to enable verbose gc logging to a file named for the taskid in
+    /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
+          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+    The configuration variable bsp.child.ulimit can be used to control the
+    maximum virtual memory of the child processes. 
+    </description>
+  </property>
+
+
+  <property>
+    <name>hama.zookeeper.quorum</name>
+    <value>master.edward.org</value>
+    <description>Comma separated list of servers in the ZooKeeper Quorum.
+    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
+    By default this is set to localhost for local and pseudo-distributed modes
+    of operation. For a fully-distributed setup, this should be set to a full
+    list of ZooKeeper quorum servers. If HAMA_MANAGES_ZK is set in hama-env.sh
+    this is the list of servers which we will start/stop zookeeper on.
+    </description>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/BSP.py
----------------------------------------------------------------------
diff --git a/python/BSP.py b/python/BSP.py
new file mode 100644
index 0000000..1d0c959
--- /dev/null
+++ b/python/BSP.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+BSP Class that can be overridden to implement the computation logic.
+
+"""
+from BSPPeer import BSPPeer
+
+class BSP:
+
+    def __init__(self):
+        pass
+
+    def setup(self, peer):
+        pass
+
+    def bsp(self, peer):
+        pass
+
+    def cleanup(self, peer):
+        pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/BSPPeer.py
----------------------------------------------------------------------
diff --git a/python/BSPPeer.py b/python/BSPPeer.py
new file mode 100644
index 0000000..00aa346
--- /dev/null
+++ b/python/BSPPeer.py
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+The BSPPeer handles the incoming protocol requests and forwards it to the BSP class.
+Basically you can register the to be executed BSP class into this peer,
+it will then callback the according methods.
+
+"""
+from BspJobConfiguration import BspJobConfiguration
+from sys import stdout, stdin
+from BinaryProtocol import BinaryProtocol as bp
+
+class BSPPeer:
+    PROTOCOL_VERSION = 0
+
+    def __init__(self, bspClass):
+        self.config = BspJobConfiguration()
+        self.bspClass = bspClass;
+        self.initialize()
+
+    def initialize(self):
+        """
+        INIT protocol works as follows:
+        START OP_CODE
+        PROTOCOL_NUMBER
+        SET_BSPJOB_CONF OP_CODE
+        NUMBER OF CONF ITEMS (#KEY + #VALUES)
+        N-LINES, where line is key and the following the value
+        """
+        # parse our initial values
+        line = readLine()
+        # start code is the first
+        if line == bp.getProtocolString(bp.START):
+            # check the protocol compatibility
+            protocolNumber = int(readLine())
+            if protocolNumber != self.PROTOCOL_VERSION:
+                raise RuntimeError(
+                    "Protocol version mismatch: Expected: " + str(self.PROTOCOL_VERSION) +
+                    " but got: " + str(protocolNumber))
+        line = readLine()
+        # parse the configurations
+        if line == bp.getProtocolString(bp.SET_BSPJOB_CONF):
+            numberOfItems = readLine()
+            key = None
+            value = None
+            for i in range(0, int(numberOfItems), 2):
+                key = readLine()
+                value = readLine()
+                self.config.put(key, value)
+
+        self.ack(bp.START)
+
+    def send(self, peer, msg):
+        println(bp.getProtocolString(bp.SEND_MSG))
+        println(peer)
+        println(msg)
+
+    def getCurrentMessage(self):
+        println(bp.getProtocolString(bp.GET_MSG))
+        line = readLine()
+        # if no message is send it will send %%-1%%
+        if line == "%%-1%%":
+            return False
+
+        return line;
+
+    def getAllMessages(self):
+        msgs = []
+        numMessages = self.getNumCurrentMessages()
+        for i in range(int(numMessages)):
+            msgs.append(self.getCurrentMessage())
+        return msgs
+
+    def getNumCurrentMessages(self):
+        println(bp.getProtocolString(bp.GET_MSG_COUNT))
+        return readLine()
+
+    def sync(self):
+        println(bp.getProtocolString(bp.SYNC))
+        # this should block now until we get a response
+        line = readLine()
+        if line != (bp.getProtocolString(bp.SYNC) + "_SUCCESS"):
+            raise RuntimeError(
+                "Barrier sync failed!")
+
+    def getSuperstepCount(self):
+        println(bp.getProtocolString(bp.GET_SUPERSTEP_COUNT))
+        return readLine()
+
+    def getPeerName(self):
+        return self.getPeerNameForIndex(-1)
+
+    def getPeerNameForIndex(self, index):
+        println(bp.getProtocolString(bp.GET_PEERNAME))
+        println(str(index));
+        return readLine()
+
+    def getPeerIndex(self):
+        println(bp.getProtocolString(bp.GET_PEER_INDEX))
+        return readLine()
+
+    def getAllPeerNames(self):
+        println(bp.getProtocolString(bp.GET_ALL_PEERNAME))
+        ln = readLine()
+        names = []
+        for i in range(int(ln)):
+            peerName = readLine()
+            names.append(peerName)
+        return names
+
+    def getNumPeers(self):
+        println(bp.getProtocolString(bp.GET_PEER_COUNT))
+        return readLine()
+
+    def clear(self):
+        println(bp.getProtocolString(bp.CLEAR))
+
+    def write(self, key, value):
+        println(bp.getProtocolString(bp.WRITE_KEYVALUE))
+        println(key)
+        println(value)
+
+    def readNext(self):
+        println(bp.getProtocolString(bp.READ_KEYVALUE))
+        line = readLine()
+        secondLine = readLine()
+        # if no message is send it will send %%-1%%
+        if line == "%%-1%%" and secondLine == "%%-1%%":
+            return False
+        return [line, secondLine]
+
+    def reopenInput(self):
+        println(bp.getProtocolString(bp.REOPEN_INPUT))
+
+    # TODO counters, seq writes and partitioning
+
+    def runSetup(self):
+        line = readLine()
+        # start code is the first
+        if line.startswith(bp.getProtocolString(bp.RUN_SETUP)):
+            self.bspClass.setup(self);
+            self.ack(bp.RUN_SETUP)
+
+    def runBSP(self):
+        line = readLine()
+        # start code is the first
+        if line.startswith(bp.getProtocolString(bp.RUN_BSP)):
+            self.bspClass.bsp(self);
+            self.ack(bp.RUN_BSP)
+
+    def runCleanup(self):
+        line = readLine()
+        # start code is the first
+        if line.startswith(bp.getProtocolString(bp.RUN_CLEANUP)):
+            self.bspClass.cleanup(self);
+            self.ack(bp.RUN_CLEANUP)
+
+    def ack(self, code):
+        println(bp.getAckProtocolString(code))
+
+    def done(self):
+        println(bp.getProtocolString(bp.TASK_DONE))
+        println(bp.getProtocolString(bp.DONE))
+
+    def log(self, msg):
+        println(bp.getProtocolString(bp.LOG) + msg)
+
+
+def readLine():
+    return stdin.readline().rstrip('\n')
+
+
+def println(text):
+    print(text)
+    stdout.flush()

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/BSPRunner.py
----------------------------------------------------------------------
diff --git a/python/BSPRunner.py b/python/BSPRunner.py
new file mode 100644
index 0000000..f0a6e0d
--- /dev/null
+++ b/python/BSPRunner.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+Main Runner utility that will get the bsp class from the user, passed via args and start
+it with the whole context and stuff.
+
+"""
+import sys
+from BSPPeer import BSPPeer
+
+className = sys.argv[1]
+module = __import__(className)
+class_ = getattr(module, className)
+
+bspInstance = class_()
+
+peer = BSPPeer(bspInstance)
+peer.runSetup()
+peer.runBSP()
+peer.runCleanup()
+peer.done()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/BinaryProtocol.py
----------------------------------------------------------------------
diff --git a/python/BinaryProtocol.py b/python/BinaryProtocol.py
new file mode 100644
index 0000000..63100d8
--- /dev/null
+++ b/python/BinaryProtocol.py
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+Binary protocol to communicate with the Java BSP task via streams.
+
+"""
+
+class BinaryProtocol:
+    START = 0
+    SET_BSPJOB_CONF = 1
+    SET_INPUT_TYPES = 2
+    RUN_SETUP = 3
+    RUN_BSP = 4
+    RUN_CLEANUP = 5
+    READ_KEYVALUE = 6
+    WRITE_KEYVALUE = 7
+    GET_MSG = 8
+    GET_MSG_COUNT = 9
+    SEND_MSG = 10
+    SYNC = 11
+    GET_ALL_PEERNAME = 12
+    GET_PEERNAME = 13
+    GET_PEER_INDEX = 14
+    GET_PEER_COUNT = 15
+    GET_SUPERSTEP_COUNT = 16
+    REOPEN_INPUT = 17
+    CLEAR = 18
+    CLOSE = 19
+    ABORT = 20
+    DONE = 21
+    TASK_DONE = 22
+    REGISTER_COUNTER = 23
+    INCREMENT_COUNTER = 24
+    SEQFILE_OPEN = 25
+    SEQFILE_READNEXT = 26
+    SEQFILE_APPEND = 27
+    SEQFILE_CLOSE = 28
+    PARTITION_REQUEST = 29
+    PARTITION_RESPONSE = 30
+    LOG = 31
+    END_OF_DATA = 32
+
+    @staticmethod
+    def getProtocolString(opCode):
+        return "%" + str(opCode) + "%=";
+
+    @staticmethod
+    def getAckProtocolString(opCode):
+        return "%ACK_" + str(opCode) + "%=";
+

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/BspJobConfiguration.py
----------------------------------------------------------------------
diff --git a/python/BspJobConfiguration.py b/python/BspJobConfiguration.py
new file mode 100644
index 0000000..36eb94d
--- /dev/null
+++ b/python/BspJobConfiguration.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+A mimic configuration object contains a dictionary that maps keys to values to store information.
+
+"""
+class BspJobConfiguration:
+    def __init__(self):
+        self.conf = {}
+
+    def get(self, key):
+        return self.conf[key]
+
+    def put(self, key, value):
+        self.conf[key] = value
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/HelloWorldBSP.py
----------------------------------------------------------------------
diff --git a/python/HelloWorldBSP.py b/python/HelloWorldBSP.py
new file mode 100644
index 0000000..65dc4e2
--- /dev/null
+++ b/python/HelloWorldBSP.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+Basic Hello World BSP, in Hama this is called serialize printing.
+Each task sends its peer name to each other task who reads the
+message and outputs it to console.
+
+"""
+from BSP import BSP
+
+class HelloWorldBSP(BSP):
+    def bsp(self, peer):
+        name = peer.getPeerName()
+        for i in range(15):
+            for otherPeer in peer.getAllPeerNames():
+                peer.send(otherPeer, ("Hello from " + name + " in superstep " + str(i)))
+            peer.sync()
+            for msg in peer.getAllMessages():
+                peer.write(msg,"")
+
+

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/KMeansBSP.py
----------------------------------------------------------------------
diff --git a/python/KMeansBSP.py b/python/KMeansBSP.py
new file mode 100644
index 0000000..ea070b3
--- /dev/null
+++ b/python/KMeansBSP.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+
+Python implementation of the K-Means clustering example with BSP.
+According presentation: http://www.slideshare.net/tjungblut/kmeans-with-bsp
+
+"""
+from BSP import BSP
+from math import sqrt
+
+class KMeansBSP(BSP):
+    # read the first n clusters
+    def setup(self, peer):
+        self.numCenters = int(peer.config.get("kmeans.num.centers"))
+        self.maxIterations = -1
+        if not peer.config.get("kmeans.max.iterations"):
+            self.maxIterations = int(peer.config.get("kmeans.max.iterations"))
+        self.centers = []
+
+        for i in range(self.numCenters):
+            val = peer.readNext()
+            self.centers.append(self.toVector(val[1])) # the value (index 1) is the text line
+        peer.reopenInput()
+
+    def bsp(self, peer):
+        while True:
+            self.assignCenters(peer)
+            peer.sync()
+            converged = self.updateCenters(peer)
+            peer.reopenInput()
+            if converged == 0:
+                break
+                # cool simplification of maxIterations > 0 && maxIterations < peer.getSuperstepCount()
+            if 0 < self.maxIterations < peer.getSuperstepCount():
+                break
+
+        self.recalculateAssignmentsAndWrite(peer)
+
+    def assignCenters(self, peer):
+        newCenterArray = [None] * self.numCenters
+        summationCount = [None] * self.numCenters
+
+        while True:
+            line = peer.readNext()
+            if not line: break
+            self.assignCentersInternal(newCenterArray, summationCount, self.toVector(line[1]))
+
+        # now send messages about the local updates to each other peer
+        for i in range(self.numCenters):
+            if newCenterArray[i] is not None:
+                for peerName in peer.getAllPeerNames():
+                    # send message: "i_sum_vector" where i is the index in centers,
+                    # sum is the summation count and the rest is the vector, everything is space separated
+                    peer.send(peerName,
+                        "%s %s %s" % (str(i), str(summationCount[i]), " ".join(map(str, newCenterArray[i]))))
+
+    def getNearestCenter(self, vector):
+        lowestIndex = 0
+        lowest = float("inf")
+        for i in range(self.numCenters):
+            dist = self.distance(vector, self.centers[i])
+            if lowest > dist:
+                lowest = dist
+                lowestIndex = i
+        return lowestIndex
+
+
+    def assignCentersInternal(self, newCenterArray, summationCount, vector):
+        lowestCenterIndex = self.getNearestCenter(vector)
+        center = self.centers[lowestCenterIndex]
+        if newCenterArray[lowestCenterIndex] is None:
+            newCenterArray[lowestCenterIndex] = center
+            summationCount[lowestCenterIndex] = 0
+        else:
+            newCenterArray[lowestCenterIndex] = self.sum(newCenterArray[lowestCenterIndex], vector)
+            summationCount[lowestCenterIndex] += 1
+
+
+    def updateCenters(self, peer):
+        msgCenters = [None] * self.numCenters
+        incrementSum = [None] * self.numCenters
+        self.fill(incrementSum, 0)
+
+        for msg in peer.getAllMessages():
+            split = msg.split()
+            centerIndex = int(split[0])
+            oldCenter = msgCenters[centerIndex]
+            incrementSum[centerIndex] += int(split[1])
+            newCenter = self.toVector(" ".join(split[2:])) # join the vector part back to string
+            if oldCenter is None:
+                msgCenters[centerIndex] = newCenter
+            else:
+                msgCenters[centerIndex] = self.sum(oldCenter, newCenter)
+
+        # update and convergence checks
+        for i in range(self.numCenters):
+            if msgCenters[i] is not None:
+                msgCenters[i] = [x / incrementSum[i] for x in msgCenters[i]] # average the center messages
+                # finally check for convergence by the absolute difference
+        convergedCounter = 0;
+        for i in range(self.numCenters):
+            oldCenter = self.centers[i]
+            if msgCenters[i] is not None:
+                calculateError = self.subtractAbsSum(oldCenter, msgCenters[i])
+                if calculateError > 0.0:
+                    self.centers[i] = msgCenters[i]
+                    convergedCounter += 1
+
+        return convergedCounter
+
+
+    def recalculateAssignmentsAndWrite(self, peer):
+        # write the centers first
+        for center in self.centers:
+            peer.write(center, "")
+        peer.write("\n", "")
+
+        while True:
+            line = peer.readNext()
+            if not line: break
+            lowestDistantCenter = self.getNearestCenter(self.toVector(line[1]))
+            peer.write(str(lowestDistantCenter), line[1])
+
+        pass
+
+    def fill(self, incrementSum, x):
+        for i in range(len(incrementSum)):
+            incrementSum[i] = x
+
+    def sum(self, vector, vector2):
+        vec = []
+        for i in range(len(vector)):
+            vec.append(vector[i] + vector2[i])
+        return vec
+
+    def subtractAbsSum(self, vector, vector2):
+        diff = 0
+        for i in range(len(vector)):
+            diff += abs(vector[i] - vector2[i])
+        return diff
+
+    # distance between two vectors
+    def distance(self, a, b):
+        sum = 0
+        for i in range(len(a)):
+            diff = b[i] - a[i]
+            sum += (diff * diff)
+        return sqrt(sum)
+
+
+    # splits a line by space and puts it into a vector (list)
+    def toVector(self, line):
+        vec = []
+        split = line.split()
+        for part in split:
+            vec.append(float(part))
+        return vec
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/556c7b38/python/README.txt
----------------------------------------------------------------------
diff --git a/python/README.txt b/python/README.txt
new file mode 100644
index 0000000..789326a
--- /dev/null
+++ b/python/README.txt
@@ -0,0 +1,12 @@
+Hama Streaming protocol implementation for Python.
+Features the usual hello world for BSP and a simple k means clustering.
+
+Using Python 3.2.3 and PyCharm 2.6.
+
+Documentation can be found here:
+
+http://wiki.apache.org/hama/HamaStreaming
+
+Protocol documentation can be found here:
+
+http://wiki.apache.org/hama/StreamingProtocol
\ No newline at end of file


Mime
View raw message