incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [23/50] [abbrv] Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
new file mode 100644
index 0000000..0fff992
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSender.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+public class GenericSender {
+    static Logger logger = Logger.getLogger(GenericSender.class);
+    Map<String, String> map;
+    private DatagramSocket socket;
+    private final String zkAddress;
+    ProcessMonitor listenerMonitor;
+    int rotationCounter = 0;
+    private final Serializer serializer;
+    private final int listenerTaskCount;
+    private CommEventCallback callbackHandler;
+    private String mode;
+
+    public GenericSender(String zkAddress, String appName,
+            Object senderConfigData) {
+        this(zkAddress, appName, appName, senderConfigData);
+    }
+
+    @SuppressWarnings("unchecked")
+    public GenericSender(String zkAddress, String adapterClusterName,
+            String s4ClusterName, Object senderConfigData, Serializer serializer) {
+        this.zkAddress = zkAddress;
+        this.serializer = serializer;
+        try {
+            map = (Map<String, String>) senderConfigData;
+            mode = map.get("mode");
+            if (mode.equals("multicast")) {
+                socket = new MulticastSocket();
+            }
+            if (mode.equals("unicast")) {
+                socket = new DatagramSocket();
+            }
+            listenerMonitor = CommServiceFactory.getProcessMonitor(this.zkAddress,
+                                                                   s4ClusterName,
+                                                                   callbackHandler);
+            if (callbackHandler != null) {
+                // listenerMonitor.setCallbackHandler(callbackHandler);
+            }
+            listenerMonitor.monitor();
+            this.listenerTaskCount = listenerMonitor.getTaskCount();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public GenericSender(String zkAddress, String senderAppName,
+            String listenerAppName, Object senderConfigData) {
+        this(zkAddress,
+             senderAppName,
+             listenerAppName,
+             senderConfigData,
+             new GenericSerDeser());
+    }
+
+    /**
+     * This method will send the data to receivers in a round robin fashion
+     * 
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    @SuppressWarnings("unchecked")
+    public boolean send(Object data) {
+        try {
+            List<Object> destinationList = listenerMonitor.getDestinationList();
+            if (destinationList == null || destinationList.size() == 0) {
+                logger.error("Failed to send message: No destination available"
+                        + data);
+                return false;
+            }
+            byte[] byteBuffer = serializer.serialize(data);
+            rotationCounter = rotationCounter + 1;
+
+            int index = rotationCounter % destinationList.size();
+            Map<String, String> dest = (Map<String, String>) destinationList.get(Math.abs(index));
+            InetAddress inetAddress;
+            int port;
+            if (mode.equals("unicast")) {
+                inetAddress = InetAddress.getByName(dest.get("address"));
+                port = Integer.parseInt((dest.get("port")));
+            } else if (mode.equals("multicast")) {
+                inetAddress = InetAddress.getByName(dest.get("channel"));
+                port = Integer.parseInt((dest.get("port")));
+            } else {
+                logger.error("Failed to send message unknown mode: " + mode);
+                return false;
+            }
+            DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                   byteBuffer.length,
+                                                   inetAddress,
+                                                   port);
+            socket.send(dp);
+        } catch (IOException e) {
+            // add retry
+            logger.error("Failed to send message: " + data, e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver/partition
+     * 
+     * @param partition
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    @SuppressWarnings("unchecked")
+    public boolean sendToPartition(int partition, Object data) {
+        try {
+            byte[] byteBuffer = serializer.serialize(data);
+            Map<Integer, Object> destinationMap = listenerMonitor.getDestinationMap();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Destination Map:" + destinationMap);
+            }
+            Map<String, String> dest = (Map<String, String>) destinationMap.get(partition);
+            if (dest != null) {
+                InetAddress inetAddress = InetAddress.getByName(dest.get("address"));
+                int port = Integer.parseInt((dest.get("port")));
+                DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                       byteBuffer.length,
+                                                       inetAddress,
+                                                       port);
+                socket.send(dp);
+            } else {
+                logger.warn("Destination not available for partition:"
+                        + partition + " Skipping message:" + data);
+                return false;
+            }
+        } catch (IOException e) {
+            // add retry
+            logger.error("Failed to send message: " + data, e);
+            return false;
+        }
+
+        return true;
+
+    }
+
+    /**
+     * compute partition using hashcode and send to appropriate partition
+     * 
+     * @param hashcode
+     * @param data
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean sendUsingHashCode(int hashcode, Object data) {
+        int partition = (hashcode & Integer.MAX_VALUE) % listenerTaskCount;
+        return sendToPartition(partition, data);
+
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+    public int getListenerTaskCount() {
+        return listenerTaskCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
new file mode 100644
index 0000000..eb9af81
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/GenericSerDeser.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.IOUtil;
+
+/**
+ * Default serializer/deserializer pair that delegates both directions to
+ * {@link IOUtil}.
+ */
+public class GenericSerDeser implements Serializer, Deserializer {
+
+    /**
+     * Converts an object to bytes via {@link IOUtil#serializeToBytes}.
+     *
+     * @param obj object to serialize
+     * @return the serialized form produced by {@code IOUtil}
+     */
+    public byte[] serialize(Object obj) {
+        final byte[] encoded = IOUtil.serializeToBytes(obj);
+        return encoded;
+    }
+
+    /**
+     * Converts bytes back to an object via
+     * {@link IOUtil#deserializeToObject}.
+     *
+     * @param buffer serialized form
+     * @return the object produced by {@code IOUtil}
+     */
+    public Object deserialize(byte[] buffer) {
+        final Object decoded = IOUtil.deserializeToObject(buffer);
+        return decoded;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java b/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
new file mode 100644
index 0000000..65f8c91
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/ListenerProcess.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Acquires a listener task for a cluster and exposes the resulting
+ * {@link GenericListener} through {@link #listen()}.
+ */
+public class ListenerProcess {
+    static Logger logger = Logger.getLogger(ListenerProcess.class);
+    private final String zkaddress;
+    private final String clusterName;
+    private GenericListener genericListener;
+    private Deserializer deserializer;
+    private CommEventCallback callbackHandler;
+
+    /**
+     * @param zkaddress address handed to the task manager and the listener
+     * @param clusterName name of the cluster this process listens for
+     */
+    public ListenerProcess(String zkaddress, String clusterName) {
+        this.zkaddress = zkaddress;
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * This will be a blocking call and will wait until it gets a task
+     * 
+     * @param map custom task data passed to the task manager
+     * @return listener configuration
+     */
+    public Object acquireTaskAndCreateListener(Map<String, String> map) {
+        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
+                                                                clusterName,
+                                                                ClusterType.S4,
+                                                                callbackHandler);
+        logger.info("Waiting for task");
+        Object listenerConfig = manager.acquireTask(map);
+        createListenerFromConfig(listenerConfig);
+        return listenerConfig;
+    }
+
+    /**
+     * Creates and starts the listener for the given configuration, using the
+     * custom deserializer when one has been set.
+     *
+     * @param listenerConfig configuration handed to {@link GenericListener}
+     */
+    public void createListenerFromConfig(Object listenerConfig) {
+        logger.info("Starting listener with config: " + listenerConfig);
+        if (deserializer != null) {
+            genericListener = new GenericListener(zkaddress,
+                                                  clusterName,
+                                                  listenerConfig,
+                                                  deserializer);
+        } else {
+            genericListener = new GenericListener(zkaddress,
+                                                  clusterName,
+                                                  listenerConfig);
+        }
+        genericListener.start();
+
+    }
+
+    public Deserializer getDeserializer() {
+        return deserializer;
+    }
+
+    public void setDeserializer(Deserializer deserializer) {
+        this.deserializer = deserializer;
+    }
+
+    /**
+     * Returns whatever the underlying listener's {@code receive()} yields.
+     *
+     * @return the received object
+     * @throws IllegalStateException if no listener has been created yet
+     */
+    public Object listen() {
+        if (genericListener == null) {
+            // Fail with a descriptive message instead of a bare
+            // NullPointerException.
+            throw new IllegalStateException("No listener has been created; call "
+                    + "acquireTaskAndCreateListener or createListenerFromConfig first");
+        }
+        return genericListener.receive();
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java b/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
new file mode 100644
index 0000000..3b2a72c
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/MulticastSender.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.IOUtil;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.Map;
+
+/**
+ * Sends serialized objects to a single multicast group whose address and
+ * port are read from the "multicast.address" and "multicast.port" entries of
+ * the sender configuration.
+ */
+public class MulticastSender {
+
+    Map<String, String> map;
+    private MulticastSocket ms;
+    private InetAddress inetAddress;
+    private int port;
+
+    /**
+     * Opens the multicast socket and joins the configured group.
+     *
+     * @param senderConfigData sender configuration; must be a
+     *            {@code Map<String, String>}
+     * @throws RuntimeException if the socket cannot be created or the group
+     *             cannot be joined
+     */
+    @SuppressWarnings("unchecked")
+    public MulticastSender(Object senderConfigData) {
+        try {
+            map = (Map<String, String>) senderConfigData;
+            ms = new MulticastSocket();
+            inetAddress = InetAddress.getByName(map.get("multicast.address"));
+            ms.joinGroup(inetAddress);
+            this.port = Integer.parseInt(map.get("multicast.port"));
+        } catch (IOException e) {
+            // A sender without a usable socket can only fail later with a
+            // less helpful error, so release the socket and fail now.
+            if (ms != null) {
+                ms.close();
+            }
+            throw new RuntimeException("Failed to set up multicast sender", e);
+        }
+    }
+
+    /**
+     * Serializes the data and sends it to the configured multicast group.
+     * 
+     * @param data object to serialize and send
+     * @return true if data was successfully sent, false otherwise
+     */
+    public boolean send(Object data) {
+        try {
+            byte[] byteBuffer = IOUtil.serializeToBytes(data);
+            DatagramPacket dp = new DatagramPacket(byteBuffer,
+                                                   byteBuffer.length,
+                                                   inetAddress,
+                                                   port);
+            ms.send(dp);
+        } catch (IOException e) {
+            // add retry
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver
+     * <p>
+     * NOTE(review): this is currently a stub. It sends nothing and still
+     * reports success - confirm whether callers rely on that.
+     * 
+     * @param partition ignored
+     * @param data ignored
+     * @return always true
+     */
+    public boolean send(int partition, Object data) {
+        return true;
+    }
+
+    /**
+     * Releases the underlying socket. Sends attempted afterwards fail and
+     * return false.
+     */
+    public void close() {
+        if (ms != null) {
+            ms.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
new file mode 100644
index 0000000..8331e5b
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/ProcessMonitor.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supplies a sender with the destination processes of a cluster, both as a
+ * list and as a map keyed by integer.
+ */
+public interface ProcessMonitor {
+
+    // Disabled: the factory call made by senders already takes the callback.
+    // void setCallbackHandler(CommEventCallback callbackHandler);
+
+    /**
+     * Makes the destination information available. The file-based
+     * implementation reads the cluster configuration when this is called.
+     */
+    void monitor();
+
+    /**
+     * @return the known destinations; presumably each element is a
+     *         {@code Map<String, String>} describing one process, since that
+     *         is what senders cast it to - verify against implementations
+     */
+    List<Object> getDestinationList();
+
+    /**
+     * @return the known destinations keyed by partition number
+     */
+    Map<Integer, Object> getDestinationMap();
+
+    /**
+     * @return the number of tasks in the monitored cluster
+     */
+    int getTaskCount();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java b/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
new file mode 100644
index 0000000..0e186a1
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/SendMode.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+/**
+ * Placeholder enum that declares no constants.
+ * <p>
+ * NOTE(review): presumably meant to replace the "unicast"/"multicast" mode
+ * strings - confirm before relying on it.
+ */
+public enum SendMode {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java b/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
new file mode 100644
index 0000000..0334203
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/SenderProcess.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.Map;
+
+/**
+ * Acquires a sender task for the adapter cluster and forwards send requests
+ * to the {@link GenericSender} created from the task configuration.
+ */
+public class SenderProcess {
+    protected final String zkaddress;
+    protected final String adapterClusterName;
+    protected final String s4ClusterName;
+    protected Serializer serializer;
+    protected CommEventCallback callbackHandler;
+    protected GenericSender genericSender;
+
+    /**
+     * Uses {@code clusterName} for both the adapter and the S4 cluster.
+     *
+     * @param zkaddress address handed to the task manager and the sender
+     * @param clusterName cluster name
+     */
+    public SenderProcess(String zkaddress, String clusterName) {
+        this(zkaddress, clusterName, clusterName);
+    }
+
+    /**
+     * @param zkaddress address handed to the task manager and the sender
+     * @param adapterClusterName cluster from which the sender task is taken
+     * @param s4ClusterName cluster the sender delivers to
+     */
+    public SenderProcess(String zkaddress, String adapterClusterName,
+            String s4ClusterName) {
+        this.zkaddress = zkaddress;
+        this.adapterClusterName = adapterClusterName;
+        this.s4ClusterName = s4ClusterName;
+    }
+
+    public void setSerializer(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    public CommEventCallback getCallbackHandler() {
+        return callbackHandler;
+    }
+
+    public void setCallbackHandler(CommEventCallback callbackHandler) {
+        this.callbackHandler = callbackHandler;
+    }
+
+    /**
+     * This will be a blocking call and will wait until it gets a task
+     * 
+     * @param map custom task data passed to the task manager
+     * @return senderConfig object, currently its map
+     */
+    public Object acquireTaskAndCreateSender(Map<String, String> map) {
+        TaskManager manager = CommServiceFactory.getTaskManager(zkaddress,
+                                                                adapterClusterName,
+                                                                ClusterType.ADAPTER,
+                                                                callbackHandler);
+        Object senderConfig = manager.acquireTask(map);
+        createSenderFromConfig(senderConfig);
+        return senderConfig;
+    }
+
+    /**
+     * Creates the sender for the given configuration, using the custom
+     * serializer when one has been set.
+     *
+     * @param senderConfig configuration handed to {@link GenericSender}
+     */
+    public void createSenderFromConfig(Object senderConfig) {
+        if (serializer != null) {
+            this.genericSender = new GenericSender(zkaddress,
+                                                   adapterClusterName,
+                                                   s4ClusterName,
+                                                   senderConfig,
+                                                   serializer);
+        } else {
+            this.genericSender = new GenericSender(zkaddress,
+                                                   adapterClusterName,
+                                                   s4ClusterName,
+                                                   senderConfig);
+        }
+        // NOTE(review): the callback is installed only after the sender has
+        // been constructed - confirm whether the sender needs it earlier.
+        if (callbackHandler != null) {
+            this.genericSender.setCallbackHandler(callbackHandler);
+        }
+
+    }
+
+    /**
+     * This method will send the data to receivers in a round robin fashion
+     * 
+     * @param data object to send
+     * @return true if data was successfully sent, false otherwise
+     * @throws IllegalStateException if no sender has been created yet
+     */
+    public boolean send(Object data) {
+        return requireSender().send(data);
+    }
+
+    /**
+     * This will send the data to a specific channel/receiver/partition
+     * 
+     * @param partition destination partition
+     * @param data object to send
+     * @return true if data was successfully sent, false otherwise
+     * @throws IllegalStateException if no sender has been created yet
+     */
+    public boolean sendToPartition(int partition, Object data) {
+        return requireSender().sendToPartition(partition, data);
+    }
+
+    /**
+     * compute partition using hashcode and send to appropriate partition
+     * 
+     * @param hashcode hash value used to select the partition
+     * @param data object to send
+     * @return true if data was successfully sent, false otherwise
+     * @throws IllegalStateException if no sender has been created yet
+     */
+    public boolean sendUsingHashCode(int hashcode, Object data) {
+        return requireSender().sendUsingHashCode(hashcode, data);
+    }
+
+    /**
+     * Returns the number of partitions on the receiver app side TODO: Currently
+     * it returns the number of tasks on the listener side. It works for now
+     * since numofPartitions=taskCount
+     *
+     * @throws IllegalStateException if no sender has been created yet
+     */
+    public int getNumOfPartitions() {
+        return requireSender().getListenerTaskCount();
+    }
+
+    /**
+     * Returns the current sender, failing with a descriptive message instead
+     * of a bare NullPointerException when none exists.
+     */
+    private GenericSender requireSender() {
+        if (genericSender == null) {
+            throw new IllegalStateException("No sender has been created; call "
+                    + "acquireTaskAndCreateSender or createSenderFromConfig first");
+        }
+        return genericSender;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java b/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
new file mode 100644
index 0000000..1e337e2
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/Serializer.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+/**
+ * Converts an object into the byte form that is sent over the wire.
+ */
+public interface Serializer {
+
+    /**
+     * @param obj object to serialize
+     * @return the serialized bytes
+     */
+    byte[] serialize(Object obj);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java b/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
new file mode 100644
index 0000000..705e150
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/core/TaskManager.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.core;
+
+import java.util.Map;
+
+/**
+ * Hands out a task to the calling process.
+ */
+public interface TaskManager {
+
+    /**
+     * Acquires a task. Callers in this package describe the call as blocking
+     * until a task is obtained; whether it blocks depends on the
+     * implementation.
+     *
+     * @param customTaskData caller-supplied data associated with the task
+     * @return the configuration of the acquired task; callers pass it on
+     *         unchanged to the sender or listener they create
+     */
+    Object acquireTask(Map<String, String> customTaskData);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
new file mode 100644
index 0000000..5852f63
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticProcessMonitor.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.file;
+
+import org.apache.s4.comm.core.ProcessMonitor;
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ProcessMonitor} backed by the static "clusters.xml" configuration:
+ * every call to {@link #monitor()} re-reads the file and replaces the known
+ * destinations.
+ */
+public class StaticProcessMonitor implements ProcessMonitor {
+    static Logger logger = Logger.getLogger(StaticProcessMonitor.class);
+    private List<Object> destinationList = new ArrayList<Object>();
+    private Map<Integer, Object> destinationMap = new HashMap<Integer, Object>();
+    private int taskCount;
+    private final String clusterName;
+    private final ClusterType clusterType;
+
+    /**
+     * @param address not used by this file-based implementation
+     * @param clusterName name of the cluster to look up in the configuration
+     * @param clusterType type of the cluster to look up
+     */
+    public StaticProcessMonitor(String address, String clusterName,
+            ClusterType clusterType) {
+        this.clusterName = clusterName;
+        this.clusterType = clusterType;
+    }
+
+    /**
+     * Reads the configuration and refreshes the destination list, the
+     * destination map and the task count.
+     */
+    @Override
+    public void monitor() {
+        readConfig();
+    }
+
+    /**
+     * Loads the process entries of the configured cluster. Results are
+     * collected into fresh collections so that calling {@link #monitor()}
+     * more than once does not duplicate destinations or inflate the task
+     * count.
+     */
+    private void readConfig() {
+        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
+                                                                             clusterName,
+                                                                             clusterType,
+                                                                             true);
+        List<Object> freshList = new ArrayList<Object>();
+        Map<Integer, Object> freshMap = new HashMap<Integer, Object>();
+        for (Map<String, String> processMap : processList) {
+            freshList.add(processMap);
+            // Only entries that declare a partition are addressable by key.
+            String key = processMap.get("partition");
+            if (key != null) {
+                freshMap.put(Integer.parseInt(key), processMap);
+            }
+        }
+        destinationList = freshList;
+        destinationMap = freshMap;
+        taskCount = freshList.size();
+        logger.info("Destination List: " + destinationList);
+        logger.info("Destination Map: " + destinationMap);
+        logger.info("TaskCount: " + taskCount);
+    }
+
+    /**
+     * @return the destinations read by the last {@link #monitor()} call, one
+     *         {@code Map<String, String>} per process entry
+     */
+    @Override
+    public List<Object> getDestinationList() {
+        return destinationList;
+    }
+
+    /**
+     * @return the destinations that declare a "partition" entry, keyed by
+     *         that partition number
+     */
+    @Override
+    public Map<Integer, Object> getDestinationMap() {
+        return destinationMap;
+    }
+
+    /**
+     * @return the number of process entries read by the last
+     *         {@link #monitor()} call
+     */
+    @Override
+    public int getTaskCount() {
+        return taskCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/file/StaticTaskManager.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/file/StaticTaskManager.java b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticTaskManager.java
new file mode 100644
index 0000000..8532fc6
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/file/StaticTaskManager.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.file;
+
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.CommLayerState;
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.SystemUtils;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+import java.nio.channels.FileLock;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+public class StaticTaskManager implements TaskManager {
+    static Logger logger = Logger.getLogger(StaticTaskManager.class);
+    Set<Map<String, String>> processSet = new HashSet<Map<String, String>>();
+    private final String clusterName;
+    private final ClusterType clusterType;
+
+    /**
+     * Constructor of TaskManager
+     * 
+     * @param address
+     * @param clusterName
+     */
+    public StaticTaskManager(String address, String clusterName,
+            ClusterType clusterType, CommEventCallback callbackHandler) {
+        this.clusterName = clusterName;
+        this.clusterType = clusterType;
+        // read the configuration file
+        readStaticConfig();
+        if (callbackHandler != null) {
+            Map<String, Object> eventData = new HashMap<String, Object>();
+            eventData.put("state", CommLayerState.INITIALIZED);
+            callbackHandler.handleCallback(eventData);
+        }
+    }
+
+    private void readStaticConfig() {
+        // It should be available in classpath
+        List<Map<String, String>> processList = ConfigUtils.readConfig("clusters.xml",
+                                                                             clusterName,
+                                                                             clusterType,
+                                                                             true);
+
+        processSet.addAll(processList);
+    }
+
+    /**
+     * Will clean up taskList Node and process List Node
+     */
+    public boolean cleanUp() {
+        throw new UnsupportedOperationException("cleanUp Not supported in red button mode");
+    }
+
+    /**
+     * Creates task nodes.
+     * 
+     * @param numTasks
+     * @param data
+     */
+    public void setUpTasks(int numTasks, Object[] data) {
+        throw new UnsupportedOperationException("setUpTasks Not supported in red button mode");
+    }
+
+    /**
+     * This will block the process thread from starting the task, when it is
+     * unblocked it will return the data stored in the task node. This data can
+     * be used by the This call assumes that the tasks are already set up
+     * 
+     * @return Object containing data related to the task
+     */
+    @Override
+    public Object acquireTask(Map<String, String> customTaskData) {
+        while (true) {
+            try {
+                for (Map<String, String> processConfig : processSet) {
+                    boolean processAvailable = canTakeupProcess(processConfig);
+                    logger.info("processAvailable:" + processAvailable);
+                    if (processAvailable) {
+                        boolean success = takeProcess(processConfig);
+                        logger.info("Acquire task:"
+                                + ((success) ? "Success" : "failure"));
+                        if (success) {
+                            return processConfig;
+                        }
+                    }
+                }
+                Thread.sleep(5000);
+            } catch (Exception e) {
+                logger.error("Exception in acquireTask Method:"
+                        + customTaskData, e);
+            }
+        }
+    }
+
+    private boolean takeProcess(Map<String, String> processConfig) {
+        File lockFile = null;
+        try {
+            // TODO:contruct from processConfig
+            String lockFileName = createLockFileName(processConfig);
+            lockFile = new File(lockFileName);
+            if (!lockFile.exists()) {
+                FileOutputStream fos = new FileOutputStream(lockFile);
+                FileLock fl = fos.getChannel().tryLock();
+                if (fl != null) {
+                    String message = "Task acquired by PID:"
+                            + SystemUtils.getPID() + " HOST:"
+                            + InetAddress.getLocalHost().getHostName();
+                    fos.write(message.getBytes());
+                    fos.close();
+                    logger.info(message + "  Lock File location: "
+                            + lockFile.getAbsolutePath());
+                    return true;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Exception trying to take up process:" + processConfig,
+                         e);
+        } finally {
+            if (lockFile != null) {
+                lockFile.deleteOnExit();
+            }
+        }
+        return false;
+    }
+
+    private String createLockFileName(Map<String, String> processConfig) {
+        String lockDir = System.getProperty("lock_dir");
+        String lockFileName = clusterName + processConfig.get("ID");
+        if (lockDir != null && lockDir.trim().length() > 0) {
+            File file = new File(lockDir);
+            if (!file.exists()) {
+                file.mkdirs();
+            }
+            return lockDir + "/" + lockFileName;
+        } else {
+            return lockFileName;
+        }
+    }
+
+    private boolean canTakeupProcess(Map<String, String> processConfig) {
+        String host = processConfig.get("process.host");
+        try {
+            InetAddress inetAddress = InetAddress.getByName(host);
+            logger.info("Host Name: "
+                    + InetAddress.getLocalHost().getCanonicalHostName());
+            if (!host.equals("localhost")) {
+                if (!InetAddress.getLocalHost().equals(inetAddress)) {
+                    return false;
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Invalid host:" + host);
+            return false;
+        }
+        String lockFileName = createLockFileName(processConfig);
+        File lockFile = new File(lockFileName);
+        if (!lockFile.exists()) {
+            return true;
+        } else {
+            logger.info("Process taken up by another process lockFile:"
+                    + lockFileName);
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
new file mode 100644
index 0000000..7780a3e
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/ProcessMonitorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.file.StaticProcessMonitor;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manual driver for the process monitors. The entry point runs the static
+ * monitor scenario; the ZooKeeper scenario is kept but not invoked.
+ */
+public class ProcessMonitorTest {
+    public static void main(String[] args) throws Exception {
+        // the ZooKeeper variant (testZkProcessMonitor) is intentionally not run
+        testStaticProcessMonitor(args);
+        Thread.sleep(10000);
+    }
+
+    /**
+     * Builds a static monitor for the "taskmanagerTest" cluster, loads its
+     * configuration and prints the resulting destination list and map.
+     */
+    private static void testStaticProcessMonitor(String[] args) {
+        String address = "localhost:2181";
+        StaticProcessMonitor staticMonitor = new StaticProcessMonitor(address,
+                                                                      "taskmanagerTest",
+                                                                      ClusterType.S4);
+        staticMonitor.monitor();
+        System.out.println(staticMonitor.getDestinationList());
+        System.out.println(staticMonitor.getDestinationMap());
+    }
+
+    /**
+     * Resets and recreates two tasks in ZooKeeper, acquires one of them and
+     * then reports every ten seconds until a marker file named after the task
+     * appears.
+     */
+    private static void testZkProcessMonitor(String[] args) {
+        System.out.println("Hereh");
+        // "effortfell.greatamerica.corp.yahoo.com:2181"
+        String address = args[0];
+        // the command line value is overridden by the local address
+        address = "localhost:2181";
+        String processName = args[1];
+
+        ZkTaskSetup setup = new ZkTaskSetup(address,
+                                            "/taskmanagerTest",
+                                            ClusterType.S4);
+        setup.cleanUp();
+        setup.setUpTasks("1.0.0.", new String[] { "task0", "task1" });
+
+        System.out.println(processName + " Going to Wait for a task");
+        HashMap<String, String> taskData = new HashMap<String, String>();
+        ZkTaskManager manager = new ZkTaskManager(address,
+                                                  "/taskmanagerTest",
+                                                  ClusterType.S4);
+        Object task = manager.acquireTask(taskData);
+        System.out.println(processName + "taking up task: " + task);
+
+        File marker = new File("c:/" + task + ".file");
+        marker.delete();
+        while (!marker.exists()) {
+            System.out.println(processName + " processing task: " + task);
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        System.out.println("Exiting task:" + task);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
new file mode 100644
index 0000000..0ea56a2
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.core.TaskManager;
+import org.apache.s4.comm.file.StaticTaskManager;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Manual driver for the task managers. The entry point runs the static task
+ * manager scenario; the ZooKeeper scenario is kept but not invoked.
+ */
+public class TaskManagerTest {
+    public static void main(String[] args) throws Exception {
+        // the ZooKeeper variant (testZkTaskManager) is intentionally not run
+        testStaticTaskManager(args);
+        Thread.sleep(10000);
+    }
+
+    /**
+     * Creates a static task manager for the "taskmanagerTest" cluster without
+     * a callback, acquires a task and prints it.
+     */
+    private static void testStaticTaskManager(String[] args) {
+        String address = "localhost:2181";
+        TaskManager manager = new StaticTaskManager(address,
+                                                    "taskmanagerTest",
+                                                    ClusterType.S4,
+                                                    null);
+        Map<String, String> taskData = new HashMap<String, String>();
+        Object acquired = manager.acquireTask(taskData);
+        System.out.println("Acuired Task:" + acquired);
+    }
+
+    /**
+     * Resets and recreates two tasks in ZooKeeper, acquires one of them and
+     * then reports every ten seconds until a marker file named after the task
+     * appears.
+     */
+    private static void testZkTaskManager(String[] args) {
+        System.out.println("Here");
+        // "effortfell.greatamerica.corp.yahoo.com:2181"
+        String address = args[0];
+        // the command line value is overridden by the local address
+        address = "localhost:2181";
+        String processName = args[1];
+
+        ZkTaskSetup setup = new ZkTaskSetup(address,
+                                            "/taskmanagerTest",
+                                            ClusterType.S4);
+        setup.cleanUp();
+        setup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" });
+
+        System.out.println(processName + " Going to Wait for a task");
+        HashMap<String, String> taskData = new HashMap<String, String>();
+        ZkTaskManager manager = new ZkTaskManager(address,
+                                                  "/taskmanagerTest",
+                                                  ClusterType.S4);
+        Object task = manager.acquireTask(taskData);
+        System.out.println(processName + "taking up task: " + task);
+
+        File marker = new File("c:/" + task + ".file");
+        marker.delete();
+        while (!marker.exists()) {
+            System.out.println(processName + " processing task: " + task);
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+        System.out.println("Exiting task:" + task);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
new file mode 100644
index 0000000..d0c80c0
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.util.CommUtil;
+import org.apache.s4.comm.util.JSONUtil;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+import org.apache.s4.comm.zk.ZkTaskManager;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class TestTaskSetupApp {
+
+    public static void main(String[] args) throws Exception {
+        new TestTaskSetupApp().testTaskSetup1();
+    }
+
+    // test the case
+    public void testTaskSetup1() throws Exception {
+        String address = "effortfell.greatamerica.corp.yahoo.com:2181";
+        Watcher watcher = new Watcher() {
+
+            @Override
+            public void process(WatchedEvent event) {
+
+            }
+
+        };
+        // setup
+        ZooKeeper zk = new ZooKeeper(address, 30000, watcher);
+        String root = "/tasksetup_app_test";
+        ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4);
+        Map<String, String> task1 = new HashMap<String, String>();
+        task1.put("name", "task-1");
+
+        Map<String, String> task2 = new HashMap<String, String>();
+        task2.put("name", "task-2");
+        String tasksListRoot = root + "/tasks";
+        zkSetup.cleanUp();
+        Stat exists = zk.exists(tasksListRoot, false);
+        myassert(exists == null);
+        Object[] data = new Object[] { task1, task2 };
+        zkSetup.setUpTasks(data);
+
+        // verify that tasks are created
+        exists = zk.exists(tasksListRoot, false);
+        myassert(exists != null);
+        List<String> children = zk.getChildren(tasksListRoot, false);
+        myassert(children.size() == data.length);
+        boolean[] matched = new boolean[data.length];
+        for (String child : children) {
+            System.out.println(child);
+            String childPath = tasksListRoot + "/" + child;
+            Stat sTemp = zk.exists(childPath, false);
+            byte[] tempData = zk.getData(tasksListRoot + "/" + child,
+                                         false,
+                                         sTemp);
+            Map<String, Object> map = (Map<String, Object>) JSONUtil.getMapFromJson(new String(tempData));
+            // check if it matches any of the data
+            for (int i = 0; i < data.length; i++) {
+                Map<String, Object> newData = (Map<String, Object>) data[i];
+                if (!matched[i] && CommUtil.compareMaps(newData, map)) {
+                    matched[i] = true;
+                    break;
+                }
+            }
+        }
+        for (int i = 0; i < matched.length; i++) {
+            myassert(matched[i]);
+        }
+
+        // try running again and make verify new node is not created
+        Stat oldStat = zk.exists(tasksListRoot, false);
+        System.out.println("oldStat=" + oldStat);
+        zkSetup.setUpTasks(data);
+        Stat newStat = zk.exists(tasksListRoot, false);
+        System.out.println("newstat=" + newStat);
+        myassert(oldStat.getMtime() == newStat.getMtime());
+
+        // make change to task config and try running again and verify new
+        // config is uploaded
+        oldStat = zk.exists(tasksListRoot, false);
+        System.out.println("oldStat=" + oldStat.getVersion());
+        ((Map<String, String>) data[data.length - 1]).put("name", "changedname");
+        zkSetup.setUpTasks(data);
+        newStat = zk.exists(tasksListRoot, false);
+        System.out.println("newstat=" + newStat.getVersion());
+        System.out.println();
+        myassert(oldStat.getMtime() != newStat.getMtime());
+
+        // ensure version change is working
+        zkSetup.setUpTasks("1.0.0.0", data);
+    }
+
+    private void myassert(boolean b) {
+        if (!b) {
+            throw new AssertionError();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/test/ZkQueueTest.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/ZkQueueTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/ZkQueueTest.java
new file mode 100644
index 0000000..c9cc826
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/test/ZkQueueTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.test;
+
+import org.apache.s4.comm.zk.ZkQueue;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manual driver for {@link ZkQueue} that acts either as a producer or as a
+ * consumer on the "/app1" queue.
+ */
+public class ZkQueueTest {
+    /**
+     * Runs the producer or the consumer.
+     * 
+     * @param args
+     *            {@code args[0]} is passed to the queue as its address,
+     *            {@code args[1]} is the number of items to produce and
+     *            {@code args[2]} selects the producer when it equals "p";
+     *            any other value selects the consumer, which runs until the
+     *            thread is interrupted
+     * @throws IllegalArgumentException
+     *             when fewer than three arguments are given
+     * @throws NumberFormatException
+     *             when {@code args[1]} is not an integer
+     */
+    public static void main(String args[]) {
+        if (args.length < 3) {
+            throw new IllegalArgumentException("Usage: ZkQueueTest <zk_address> <count> <p|c>");
+        }
+        ZkQueue q = new ZkQueue(args[0], "/app1");
+
+        System.out.println("Input: " + args[0]);
+        int max = Integer.parseInt(args[1]);
+
+        if (args[2].equals("p")) {
+            System.out.println("Producer");
+            for (int i = 0; i < max; i++) {
+                try {
+                    q.produce(Integer.valueOf(10 + i));
+                } catch (KeeperException e) {
+                    // report the failure instead of silently dropping the item
+                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to the caller and stop
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        } else {
+            System.out.println("Consumer");
+
+            // the consumer has no item limit: it runs until interrupted
+            while (true) {
+                try {
+                    Integer r = (Integer) q.consume();
+                    System.out.println("Item: " + r);
+                } catch (KeeperException e) {
+                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
new file mode 100644
index 0000000..df214c3
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.tools;
+
+import org.apache.s4.comm.util.ConfigUtils;
+import org.apache.s4.comm.util.ConfigParser;
+import org.apache.s4.comm.util.ConfigParser.Cluster;
+import org.apache.s4.comm.util.ConfigParser.Config;
+import org.apache.s4.comm.zk.ZkTaskSetup;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class will set up initial tasks on the zookeeper USAGE: java AppTask
+ * [clean] setup config.xml
+ * 
+ * @author kishoreg
+ * 
+ */
+/**
+ * Command line tool that cleans and/or sets up the tasks of every cluster
+ * described in a configuration file.
+ * <p>
+ * USAGE: java TaskSetupApp &lt;zk_address&gt; [clean|setup] setup_config_xml
+ */
+public class TaskSetupApp {
+    /**
+     * Parses the arguments and runs the requested actions. The first argument
+     * is taken as the ZooKeeper address, "clean" and "setup" select the
+     * actions, and the last argument (when it is not one of those keywords)
+     * is the configuration file. Prints the usage and exits with status 1 when
+     * the configuration file is missing or no action is selected.
+     */
+    public static void main(String[] args) {
+        String zkAddress = "";
+        boolean clean = false;
+        boolean setup = false;
+        String setupXml = null;
+        for (int i = 0; i < args.length; i++) {
+            if (i == 0) {
+                zkAddress = args[0];
+            }
+            if (args[i].equals("clean")) {
+                clean = true;
+            } else if (args[i].equals("setup")) {
+                setup = true;
+            } else if (i == args.length - 1) {
+                setupXml = args[i];
+            }
+        }
+        if (setupXml == null || !new File(setupXml).exists()) {
+            printusage("Set up xml: " + setupXml + " does not exist");
+        }
+        if (!setup && !clean) {
+            System.err.println("Invalid usage.");
+            printusage("Must specify at least one of of clean, setup.");
+        }
+        doMain(zkAddress, clean, setup, setupXml);
+    }
+
+    /**
+     * Prints the message followed by the usage line to standard error and
+     * terminates the JVM with status 1.
+     */
+    private static void printusage(String message) {
+        System.err.println(message);
+        System.err.println("java TaskSetupApp <zk_address> [clean|setup] setup_config_xml");
+        System.exit(1);
+    }
+
+    /**
+     * Parses the configuration file and applies the requested actions to each
+     * cluster it declares.
+     */
+    private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) {
+        ConfigParser parser = new ConfigParser();
+        Config config = parser.parse(setupXml);
+        for (Cluster cluster : config.getClusters()) {
+            processCluster(clean, setup, zkAddress, cluster, config.getVersion());
+        }
+    }
+
+    /**
+     * Applies the requested actions to one cluster: the clean-up runs first
+     * when {@code clean} is set, and the tasks are created only when
+     * {@code setup} is set, so a clean-only invocation no longer recreates
+     * the tasks.
+     */
+    private static void processCluster(boolean clean, boolean setup, String zkAddress, Cluster cluster, String version) {
+        List<Map<String,String>> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false);
+        ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType());
+        if (clean) {
+            zkSetup.cleanUp();
+        }
+
+        if (setup) {
+            zkSetup.setUpTasks(version, clusterInfo.toArray());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
new file mode 100644
index 0000000..e440f10
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+import java.util.Map;
+
+public class CommUtil {
+
+    public static boolean compareMaps(Map<String, Object> map1, Map<String, Object> map2) {
+        boolean equals = true;
+        if (map1.size() == map2.size()) {
+            for (String key : map1.keySet()) {
+                if (!(map2.containsKey(key) && map1.get(key)
+                                                   .equals(map2.get(key)))) {
+                    equals = false;
+                    break;
+                }
+            }
+        } else {
+            equals = false;
+        }
+        return equals;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigParser.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigParser.java b/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigParser.java
new file mode 100644
index 0000000..8fabf89
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigParser.java
@@ -0,0 +1,439 @@
+package org.apache.s4.comm.util;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.log4j.Logger;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+
+public class ConfigParser {
+	private static Logger logger =  Logger.getLogger(ConfigParser.class);
+	
+	/** Creates a parser; the constructor performs no initialisation. */
+	public ConfigParser() {
+	}
+	
+	/**
+	 * Parses the named XML cluster configuration and validates the result.
+	 * <p>
+	 * If several top-level <code>config</code> elements were present, the last
+	 * one encountered is the one returned.
+	 *
+	 * @param configFilename file-system path or classpath resource name of the configuration
+	 * @return the parsed and verified configuration, never null
+	 * @throws VerifyError if no top-level <code>config</code> element exists or verification fails
+	 * @throws RuntimeException if the configuration cannot be located or parsed
+	 */
+	public Config parse(String configFilename) {
+		Config config = null;
+
+		Document document = createDocument(configFilename);
+		NodeList topLevelNodeList = document.getChildNodes();
+		for (int i = 0; i < topLevelNodeList.getLength(); i++) {
+			Node node = topLevelNodeList.item(i);
+			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("config")) {
+				config = processConfigElement(node);
+			}
+		}
+		if (config == null) {
+			// Report the real problem instead of a NullPointerException during verification.
+			throw new VerifyError("No config element found in " + configFilename);
+		}
+		verifyConfig(config);
+		return config;
+	}
+	
+	/**
+	 * Ensures the configuration declares at least one cluster and that each
+	 * declared cluster passes {@link #verifyCluster(Cluster)}.
+	 *
+	 * @param config configuration to check
+	 * @throws VerifyError if there are no clusters or a cluster is invalid
+	 */
+	private void verifyConfig(Config config) {
+		List<Cluster> declared = config.getClusters();
+		if (declared.isEmpty()) {
+			throw new VerifyError("No clusters specified");
+		}
+
+		for (int index = 0; index < declared.size(); index++) {
+			verifyCluster(declared.get(index));
+		}
+	}
+	
+	/**
+	 * Validates one cluster: it must contain nodes, every node needs a unique
+	 * non-null task id, and the type-specific rules must hold.
+	 *
+	 * @param cluster cluster to check
+	 * @throws VerifyError describing the first violated rule
+	 */
+	public void verifyCluster(Cluster cluster) {
+		List<ClusterNode> members = cluster.getNodes();
+		if (members.isEmpty()) {
+			throw new VerifyError("No nodes in cluster " + cluster.getName());
+		}
+
+		Set<String> seenTaskIds = new HashSet<String>();
+		for (ClusterNode member : members) {
+			String taskId = member.getTaskId();
+			if (seenTaskIds.contains(taskId)) {
+				throw new VerifyError("Duplicate task id " + taskId);
+			}
+			if (taskId == null) {
+				throw new VerifyError("Missing task id");
+			}
+			seenTaskIds.add(taskId);
+		}
+
+		// Anything that is not an S4 cluster is checked with the adapter rules.
+		if (!cluster.getType().equals(ClusterType.S4)) {
+			verifyAdapterCluster(cluster);
+		}
+		else {
+			verifyS4Cluster(cluster);
+		}
+	}
+	
+	/**
+	 * Applies the S4-specific rules to a cluster: every node must carry a
+	 * partition id in the range 0..n-1 (n being the node count), partition ids
+	 * must be unique, and the port must not be the -1 sentinel.
+	 *
+	 * @param cluster cluster to check
+	 * @throws VerifyError describing the first violated rule
+	 */
+	public void verifyS4Cluster(Cluster cluster) {
+		List<ClusterNode> members = cluster.getNodes();
+		int nodeCount = members.size();
+		Set<Integer> idSet = new HashSet<Integer>();
+
+		for (ClusterNode member : members) {
+			int partitionId = member.getPartition();
+			if (partitionId == -1) {
+				throw new VerifyError("No partition specified on node " + member.getTaskId());
+			}
+			if (partitionId < 0 || partitionId >= nodeCount) {
+				throw new VerifyError("Bad partition specified " + partitionId);
+			}
+			// add() reports false when the id is already present.
+			if (!idSet.add(Integer.valueOf(partitionId))) {
+				throw new VerifyError("Duplicate partition in cluster: " + partitionId);
+			}
+
+			// NOTE(review): processClusterNodeElement initialises port to 0, so an
+			// omitted port does not appear to reach this -1 check -- confirm the sentinel.
+			if (member.getPort() == -1) {
+				throw new VerifyError("Missing port number on node " + member.getTaskId());
+			}
+		}
+
+		if (!(idSet.size() == nodeCount || idSet.isEmpty())) {
+			throw new VerifyError("Bad partition ids in cluster " + idSet);
+		}
+	}
+	
+	public void verifyAdapterCluster(Cluster cluster) {
+		for (ClusterNode node : cluster.getNodes()) {			
+			if (node.getPartition() != -1) {
+				throw new VerifyError("Cannot specify partition for adapter node");
+			}
+		}
+	}
+	
+	/**
+	 * Builds a {@link Config} from a <code>config</code> element, reading its
+	 * <code>version</code> attribute and every direct <code>cluster</code> child.
+	 *
+	 * @param configElement the <code>config</code> element; must be a DOM Element
+	 * @return the configuration with all child clusters added, using version
+	 *         "-1" when the attribute is missing or empty
+	 */
+	public Config processConfigElement(Node configElement) {
+		String version = ((Element)configElement).getAttribute("version");
+		// getAttribute yields "" for an absent attribute; only then fall back to "-1".
+		if (version == null || version.length() == 0) {
+			version = "-1";
+		}
+
+		NodeList nodeList = configElement.getChildNodes();
+
+		Config config = new Config(version);
+		for (int i = 0; i < nodeList.getLength(); i++) {
+			Node node = nodeList.item(i);
+			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("cluster")) {
+				config.addCluster(processClusterElement(node));
+			}
+		}
+
+		return config;
+	}
+	
+	/**
+	 * Builds a {@link Cluster} from a <code>cluster</code> element, reading the
+	 * <code>mode</code>, <code>name</code> and <code>type</code> attributes and
+	 * every direct <code>node</code> child.
+	 * <p>
+	 * Attributes that are absent or empty leave the Cluster defaults in place.
+	 * A <code>type</code> other than "adapter" or "s4" is ignored, which also
+	 * keeps the default type.
+	 *
+	 * @param clusterElement the <code>cluster</code> element; must be a DOM Element
+	 * @return the populated cluster
+	 */
+	public Cluster processClusterElement(Node clusterElement) {
+		Cluster cluster = new Cluster();
+		Element element = (Element)clusterElement;
+
+		// getAttribute returns "" (not null) when the attribute is missing, so an
+		// emptiness check is needed to avoid overwriting the defaults with "".
+		String mode = element.getAttribute("mode");
+		if (mode != null && mode.length() > 0) {
+			cluster.setMode(mode);
+		}
+		String name = element.getAttribute("name");
+		if (name != null && name.length() > 0) {
+			cluster.setName(name);
+		}
+		String typeString = element.getAttribute("type");
+		if (typeString != null) {
+			if (typeString.equals("adapter")) {
+				cluster.setType(ClusterType.ADAPTER);
+			}
+			else if (typeString.equals("s4")) {
+				cluster.setType(ClusterType.S4);
+			}
+		}
+
+		NodeList nodeList = clusterElement.getChildNodes();
+		for (int i = 0; i < nodeList.getLength(); i++) {
+			Node node = nodeList.item(i);
+			if (node.getNodeType() == Node.ELEMENT_NODE && node.getNodeName().equals("node")) {
+				cluster.addNode(processClusterNodeElement(node));
+			}
+		}
+		return cluster;
+	}
+
+	/**
+	 * Builds a {@link ClusterNode} from a <code>node</code> element by reading
+	 * its <code>partition</code>, <code>port</code>, <code>machine</code> and
+	 * <code>taskId</code> child elements. Other children are ignored.
+	 *
+	 * @param clusterNodeElement the <code>node</code> element
+	 * @return the node; partition is -1 and port is 0 when not specified,
+	 *         machine name and task id are null when not specified
+	 * @throws VerifyError if the partition or port text is not an integer
+	 */
+	public ClusterNode processClusterNodeElement(Node clusterNodeElement) {
+		int partition = -1;
+		// NOTE(review): verifyS4Cluster and ConfigUtils test the port against -1,
+		// while the default here is 0 -- confirm which sentinel is intended.
+		int port = 0;
+		String machineName = null;
+		String taskId = null;
+
+		NodeList children = clusterNodeElement.getChildNodes();
+		for (int index = 0; index < children.getLength(); index++) {
+			Node child = children.item(index);
+			if (child.getNodeType() == Node.ELEMENT_NODE) {
+				String tag = child.getNodeName();
+				if (tag.equals("partition")) {
+					String text = getElementContentText(child);
+					try {
+						partition = Integer.parseInt(text);
+					}
+					catch (NumberFormatException nfe) {
+						throw new VerifyError("Bad partition specified " + text);
+					}
+				}
+				else if (tag.equals("port")) {
+					String text = getElementContentText(child);
+					try {
+						port = Integer.parseInt(text);
+					}
+					catch (NumberFormatException nfe) {
+						throw new VerifyError("Bad port specified " + text);
+					}
+				}
+				else if (tag.equals("machine")) {
+					machineName = getElementContentText(child);
+				}
+				else if (tag.equals("taskId")) {
+					taskId = getElementContentText(child);
+				}
+			}
+		}
+
+		return new ClusterNode(partition, port, machineName, taskId);
+	}
+	
+	/**
+	 * Locates the configuration and parses it into a DOM document using a
+	 * non-validating JAXP parser that ignores comments.
+	 * <p>
+	 * NOTE(review): DTD and external-entity handling are left at the parser
+	 * defaults; tighten them if configuration files can come from untrusted sources.
+	 *
+	 * @param configFilename file-system path or classpath resource name
+	 * @return the parsed document
+	 * @throws RuntimeException if the configuration cannot be found or any
+	 *         error occurs while building the parser or parsing
+	 */
+	private static Document createDocument(String configFilename) {
+		try {
+			// Get a JAXP parser factory object
+			javax.xml.parsers.DocumentBuilderFactory dbf = DocumentBuilderFactory
+					.newInstance();
+			// Tell the factory what kind of parser we want
+			dbf.setValidating(false);
+			dbf.setIgnoringComments(true);
+			dbf.setIgnoringElementContentWhitespace(true);
+			// Use the factory to get a JAXP parser object
+			javax.xml.parsers.DocumentBuilder parser = dbf.newDocumentBuilder();
+
+			// Tell the parser how to handle errors. Note that in the JAXP API,
+			// DOM parsers rely on the SAX API for error handling
+			parser.setErrorHandler(new org.xml.sax.ErrorHandler() {
+				public void warning(SAXParseException e) {
+					logger.warn("WARNING: " + e.getMessage(), e);
+				}
+
+				public void error(SAXParseException e) {
+					logger.error("ERROR: " + e.getMessage(),e);
+				}
+
+				public void fatalError(SAXParseException e) throws SAXException {
+					logger.error("FATAL ERROR: " + e.getMessage(), e);
+					throw e; // re-throw the error
+				}
+			});
+
+			// Finally, use the JAXP parser to parse the stream. The rest of this
+			// class works on the returned Document through the DOM API only.
+			InputStream is = getResourceStream(configFilename);
+			if(is == null){
+				throw new RuntimeException("Unable to find config file:"+ configFilename);
+			}
+			try {
+				return parser.parse(is);
+			} finally {
+				// Always release the file/resource handle; a close failure must not
+				// hide the outcome of the parse itself.
+				try {
+					is.close();
+				} catch (java.io.IOException closeFailure) {
+					logger.warn("Unable to close config stream for " + configFilename, closeFailure);
+				}
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
+	/**
+	 * Returns the value of the first direct text child of an element.
+	 *
+	 * @param node node to read
+	 * @return the first text child's value, or "" when the node is not an
+	 *         element or has no text child
+	 */
+	public String getElementContentText(Node node) {
+		String text = "";
+		if (node.getNodeType() == Node.ELEMENT_NODE) {
+			NodeList kids = node.getChildNodes();
+			for (int index = 0; index < kids.getLength(); index++) {
+				Node kid = kids.item(index);
+				if (kid.getNodeType() == Node.TEXT_NODE) {
+					text = kid.getNodeValue();
+					break;
+				}
+			}
+		}
+		return text;
+	}
+	
+	/**
+	 * Command-line entry point: parses the configuration named by the first
+	 * argument and prints it to standard output.
+	 *
+	 * @param args expects the configuration file name as the first element
+	 */
+	public static void main(String[] args) {
+		if (args.length < 1) {
+			// Fail with a readable message rather than an ArrayIndexOutOfBoundsException.
+			System.err.println("Usage: ConfigParser <config-file>");
+			System.exit(1);
+		}
+		ConfigParser parser = new ConfigParser();
+		Config config = parser.parse(args[0]);
+		System.out.println(config);
+	}
+
+	/**
+	 * Opens the configuration either from the file system or, when no such
+	 * path exists, from the context class loader.
+	 *
+	 * @param configfile file-system path or classpath resource name
+	 * @return an open stream, or null when neither lookup finds the resource
+	 * @throws RuntimeException if the path exists but is not a regular file,
+	 *         or if opening the stream fails
+	 */
+	private static InputStream getResourceStream(String configfile) {
+		InputStream stream = null;
+		try {
+			File candidate = new File(configfile);
+			if (!candidate.exists()) {
+				// Not on disk: fall back to a classpath lookup (may yield null).
+				ClassLoader loader = Thread.currentThread().getContextClassLoader();
+				stream = loader.getResourceAsStream(configfile);
+			} else if (candidate.isFile()) {
+				stream = new FileInputStream(configfile);
+			} else {
+				throw new RuntimeException("configFile " + configfile
+						+ "  is not a regular file:");
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		return stream;
+	}
+	
+	static public class Config {
+		List<Cluster> clusters = new ArrayList<Cluster>();
+		private String version = "-1";
+		
+		public String getVersion() {
+            return version;
+        }
+
+        public void setVersion(String version) {
+            this.version = version;
+        }
+
+        public Config(String version) {
+		    this.version = version;
+		}
+		
+		public Config() {
+		}
+		
+		public void addCluster(Cluster cluster) {
+			clusters.add(cluster);
+		}
+
+		public List<Cluster> getClusters() {
+			return Collections.unmodifiableList(clusters);
+		}
+		
+		public String toString() {
+			return "{version="+version+",clusters="+clusters+"}";
+		}
+	}
+	
+	/**
+	 * One cluster of the configuration: its name, communication mode, type and
+	 * member nodes. Defaults are mode "unicast", name "unknown" and type S4.
+	 */
+	public static class Cluster {
+		/** Kind of cluster; the string form is the lower-case label. */
+		public enum ClusterType {
+			S4("s4"),
+			ADAPTER("adapter");
+
+			private final String clusterTypeString;
+
+			private ClusterType(String label) {
+				this.clusterTypeString = label;
+			}
+
+			@Override
+			public String toString() {
+				return clusterTypeString;
+			}
+		}
+
+		List<ConfigParser.ClusterNode> nodes = new ArrayList<ConfigParser.ClusterNode>();
+		String mode = "unicast";
+		String name = "unknown";
+		ClusterType type = ClusterType.S4;
+
+		/** @param node node to append to this cluster */
+		public void addNode(ConfigParser.ClusterNode node) {
+			nodes.add(node);
+		}
+
+		/** @return a read-only view of the member nodes */
+		public List<ConfigParser.ClusterNode> getNodes() {
+			return Collections.unmodifiableList(nodes);
+		}
+
+		/** @return the communication mode */
+		public String getMode() {
+			return mode;
+		}
+
+		/** @param mode the communication mode to store */
+		public void setMode(String mode) {
+			this.mode = mode;
+		}
+
+		/** @return the cluster name */
+		public String getName() {
+			return name;
+		}
+
+		/** @param name the cluster name to store */
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		/** @return the cluster type */
+		public ClusterType getType() {
+			return type;
+		}
+
+		/** @param type the cluster type to store */
+		public void setType(ClusterType type) {
+			this.type = type;
+		}
+
+		@Override
+		public String toString() {
+			return "{name=" + name
+					+ ",mode=" + mode
+					+ ",type=" + type
+					+ ",nodes=" + nodes + "}";
+		}
+
+	}
+	
+	/**
+	 * Immutable description of one cluster member: partition, port, machine
+	 * name and task id.
+	 */
+	public static class ClusterNode {
+		private final int partition;
+		private final int port;
+		private final String machineName;
+		private final String taskId;
+
+		/**
+		 * @param partition partition id of the node
+		 * @param port port number of the node
+		 * @param machineName machine name of the node
+		 * @param taskId task id of the node
+		 */
+		public ClusterNode(int partition, int port, String machineName, String taskId) {
+			this.partition = partition;
+			this.port = port;
+			this.machineName = machineName;
+			this.taskId = taskId;
+		}
+
+		/** @return the partition id */
+		public int getPartition() {
+			return partition;
+		}
+
+		/** @return the port number */
+		public int getPort() {
+			return port;
+		}
+
+		/** @return the machine name */
+		public String getMachineName() {
+			return machineName;
+		}
+
+		/** @return the task id */
+		public String getTaskId() {
+			return taskId;
+		}
+
+		@Override
+		public String toString() {
+			return "{" + "partition=" + partition
+					+ ",port=" + port
+					+ ",machineName=" + machineName
+					+ ",taskId=" + taskId + "}";
+		}
+	}
+	
+	/**
+	 * Unchecked exception raised when a configuration violates a verification
+	 * rule. Within this file it hides {@code java.lang.VerifyError}.
+	 * <p>
+	 * NOTE(review): this is a non-static inner class, so each instance is tied
+	 * to a ConfigParser instance -- consider whether it should be static.
+	 */
+	public class VerifyError extends RuntimeException {
+		/**
+		 * @param message description of the violated rule
+		 */
+		public VerifyError(final String message) {
+			super(message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigUtils.java
new file mode 100644
index 0000000..5550f8c
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/ConfigUtils.java
@@ -0,0 +1,58 @@
+package org.apache.s4.comm.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.s4.comm.util.ConfigParser.Cluster;
+import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType;
+import org.apache.s4.comm.util.ConfigParser.ClusterNode;
+
+public class ConfigUtils {
+    public static List<Map<String, String>> readConfig(String configFilename,
+            String clusterName, ClusterType clusterType, boolean isStatic) {
+        ConfigParser parser = new ConfigParser();
+        ConfigParser.Config config = parser.parse(configFilename);
+
+        // find the requested cluster
+        Cluster cluster = null;
+        for (Cluster checkCluster : config.getClusters()) {
+            if (checkCluster.getName().equals(clusterName)
+                    && checkCluster.getType().equals(clusterType)) {
+                cluster = checkCluster;
+                break;
+            }
+        }
+        if (cluster == null) {
+            throw new RuntimeException("Cluster " + clusterName + " of type "
+                    + clusterType + " not configured");
+        }
+        return readConfig(cluster, clusterName, clusterType, isStatic);
+    }
+
+    public static List<Map<String, String>> readConfig(Cluster cluster,
+            String clusterName, ClusterType clusterType, boolean isStatic) {
+
+        List<Map<String, String>> processSet = new ArrayList<Map<String, String>>();
+        for (ClusterNode node : cluster.getNodes()) {
+            Map<String, String> nodeInfo = new HashMap<String, String>();
+            if (node.getPartition() != -1) {
+                nodeInfo.put("partition", String.valueOf(node.getPartition()));
+            }
+            if (node.getPort() != -1) {
+                nodeInfo.put("port", String.valueOf(node.getPort()));
+            }
+            nodeInfo.put("cluster.type", String.valueOf(clusterType));
+            nodeInfo.put("cluster.name", clusterName);
+            if (isStatic) {
+                nodeInfo.put("address", node.getMachineName());
+                nodeInfo.put("process.host", node.getMachineName());
+            }
+            nodeInfo.put("mode", cluster.getMode());
+            nodeInfo.put("ID", node.getTaskId());
+            processSet.add(nodeInfo);
+        }
+        return processSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-comm/src/main/java/org/apache/s4/comm/util/IOUtil.java
----------------------------------------------------------------------
diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/IOUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/IOUtil.java
new file mode 100644
index 0000000..2687023
--- /dev/null
+++ b/s4-comm/src/main/java/org/apache/s4/comm/util/IOUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.comm.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Helpers for Java object serialization to and from files and byte arrays.
+ * <p>
+ * NOTE(review): the read methods use Java native deserialization; do not feed
+ * them data from untrusted sources.
+ */
+public class IOUtil {
+
+    /**
+     * Serializes an object to a file, replacing any existing content.
+     *
+     * @param obj object to write
+     * @param path destination file path
+     * @throws Exception if the file cannot be opened or the object cannot be written
+     */
+    public static void save(Object obj, String path) throws Exception {
+        FileOutputStream fos = new FileOutputStream(new File(path));
+        try {
+            ObjectOutputStream objectOutputStream = new ObjectOutputStream(fos);
+            objectOutputStream.writeObject(obj);
+            // Closing the object stream flushes its buffer into the file stream.
+            objectOutputStream.close();
+        } finally {
+            // Releases the file handle when writing failed part-way; a second
+            // close after a successful write has no effect.
+            fos.close();
+        }
+    }
+
+    /**
+     * Reads one serialized object from a file.
+     *
+     * @param path source file path
+     * @return the deserialized object
+     * @throws Exception if the file cannot be opened or does not hold a readable object
+     */
+    public static Object read(String path) throws Exception {
+        FileInputStream fis = new FileInputStream(new File(path));
+        try {
+            ObjectInputStream objectInputStream = new ObjectInputStream(fis);
+            return objectInputStream.readObject();
+        } finally {
+            // Closes the file handle on both the success and the failure path.
+            fis.close();
+        }
+    }
+
+    /**
+     * Serializes an object into a byte array.
+     *
+     * @param obj object to serialize
+     * @return the serialized form
+     * @throws RuntimeException wrapping any failure during serialization
+     */
+    public static byte[] serializeToBytes(Object obj) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos);
+            objectOutputStream.writeObject(obj);
+            objectOutputStream.close();
+            return bos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException("Exception trying to serialize to bytes, obj="
+                                               + obj,
+                                       e);
+        }
+    }
+
+    /**
+     * Deserializes one object from a byte array.
+     *
+     * @param bytes serialized form
+     * @return the deserialized object
+     * @throws RuntimeException wrapping any failure during deserialization
+     */
+    public static Object deserializeToObject(byte[] bytes) {
+        try {
+            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+            ObjectInputStream objectInputStream = new ObjectInputStream(bis);
+            Object readObject = objectInputStream.readObject();
+            objectInputStream.close();
+            return readObject;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception trying to deserialize bytes to obj, bytes="
+                                               + new String(bytes),
+                                       e);
+        }
+
+    }
+
+}


Mime
View raw message