From s4-commits-return-12-apmail-incubator-s4-commits-archive=incubator.apache.org@incubator.apache.org Tue Jan 3 11:20:25 2012 Return-Path: X-Original-To: apmail-incubator-s4-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-s4-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 984F99DCC for ; Tue, 3 Jan 2012 11:20:25 +0000 (UTC) Received: (qmail 30323 invoked by uid 500); 3 Jan 2012 11:20:25 -0000 Delivered-To: apmail-incubator-s4-commits-archive@incubator.apache.org Received: (qmail 30303 invoked by uid 500); 3 Jan 2012 11:20:25 -0000 Mailing-List: contact s4-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: s4-dev@incubator.apache.org Delivered-To: mailing list s4-commits@incubator.apache.org Received: (qmail 30296 invoked by uid 99); 3 Jan 2012 11:20:25 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Jan 2012 11:20:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.114] (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Jan 2012 11:20:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 012483151F7; Tue, 3 Jan 2012 11:19:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mmorel@apache.org To: s4-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [9/50] [abbrv] Remove troublesome carriage return Message-Id: <20120103111915.012483151F7@tyr.zones.apache.org> Date: Tue, 3 Jan 2012 11:19:14 +0000 (UTC) http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java index 0ea56a2..4405dd7 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TaskManagerTest.java @@ -1,82 +1,82 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.test; - -import org.apache.s4.comm.core.TaskManager; -import org.apache.s4.comm.file.StaticTaskManager; -import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; -import org.apache.s4.comm.zk.ZkTaskSetup; -import org.apache.s4.comm.zk.ZkTaskManager; - -import java.io.File; -import java.util.HashMap; -import java.util.Map; - -public class TaskManagerTest { - public static void main(String[] args) throws Exception { - // testZkTaskManager(args); - testStaticTaskManager(args); - Thread.sleep(10000); - } - - private static void testStaticTaskManager(String[] args) { - String address = null; - address = "localhost:2181"; - TaskManager taskManager = new StaticTaskManager(address, - "taskmanagerTest", - ClusterType.S4, - null); - Map customTaskData = new HashMap(); - Object acquireTask = taskManager.acquireTask(customTaskData); - System.out.println("Acuired Task:" + acquireTask); - - } - - private static void testZkTaskManager(String[] args) { - System.out.println("Here"); - // "effortfell.greatamerica.corp.yahoo.com:2181" - String address = args[0]; - address = "localhost:2181"; - String processName = args[1]; - ZkTaskSetup taskSetup = new ZkTaskSetup(address, - "/taskmanagerTest", - ClusterType.S4); - taskSetup.cleanUp(); - taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" }); - Object obj; - System.out.println(processName + " Going to Wait for a task"); - HashMap map = new HashMap(); - ZkTaskManager taskManager = new ZkTaskManager(address, - "/taskmanagerTest", - ClusterType.S4); - obj = taskManager.acquireTask(map); - System.out.println(processName + "taking up task: " + obj); - File f = new File("c:/" + obj + ".file"); - f.delete(); - while (true) { - if (f.exists()) { - break; - } - System.out.println(processName + " processing task: " + obj); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - System.out.println("Exiting task:" + obj); - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.test; + +import org.apache.s4.comm.core.TaskManager; +import org.apache.s4.comm.file.StaticTaskManager; +import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; +import org.apache.s4.comm.zk.ZkTaskSetup; +import org.apache.s4.comm.zk.ZkTaskManager; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +public class TaskManagerTest { + public static void main(String[] args) throws Exception { + // testZkTaskManager(args); + testStaticTaskManager(args); + Thread.sleep(10000); + } + + private static void testStaticTaskManager(String[] args) { + String address = null; + address = "localhost:2181"; + TaskManager taskManager = new StaticTaskManager(address, + "taskmanagerTest", + ClusterType.S4, + null); + Map customTaskData = new HashMap(); + Object acquireTask = taskManager.acquireTask(customTaskData); + System.out.println("Acuired Task:" + acquireTask); + + } + + private static void testZkTaskManager(String[] args) { + System.out.println("Here"); + // "effortfell.greatamerica.corp.yahoo.com:2181" + String address = args[0]; + address = "localhost:2181"; + String processName = args[1]; + ZkTaskSetup taskSetup = new ZkTaskSetup(address, + "/taskmanagerTest", + ClusterType.S4); + taskSetup.cleanUp(); + taskSetup.setUpTasks("1.0.0.0", new String[] { "task0", "task1" }); + Object obj; + System.out.println(processName + " Going to Wait for a task"); + HashMap map = new HashMap(); + ZkTaskManager taskManager = new ZkTaskManager(address, + "/taskmanagerTest", + ClusterType.S4); + obj = taskManager.acquireTask(map); + System.out.println(processName + "taking up task: " + obj); + File f = new File("c:/" + obj + ".file"); + f.delete(); + while (true) { + if (f.exists()) { + break; + } + System.out.println(processName + " processing task: " + obj); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + System.out.println("Exiting task:" + obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java index d0c80c0..9cac542 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/test/TestTaskSetupApp.java @@ -1,122 +1,122 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.test; - -import org.apache.s4.comm.util.CommUtil; -import org.apache.s4.comm.util.JSONUtil; -import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; -import org.apache.s4.comm.zk.ZkTaskSetup; -import org.apache.s4.comm.zk.ZkTaskManager; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -public class TestTaskSetupApp { - - public static void main(String[] args) throws Exception { - new TestTaskSetupApp().testTaskSetup1(); - } - - // test the case - public void testTaskSetup1() throws Exception { - String address = "effortfell.greatamerica.corp.yahoo.com:2181"; - Watcher watcher = new Watcher() { - - @Override - public void process(WatchedEvent event) { - - } - - }; - // setup - ZooKeeper zk = new ZooKeeper(address, 30000, watcher); - String root = "/tasksetup_app_test"; - ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4); - Map task1 = new HashMap(); - task1.put("name", "task-1"); - - Map task2 = new HashMap(); - task2.put("name", "task-2"); - String tasksListRoot = root + "/tasks"; - zkSetup.cleanUp(); - Stat exists = zk.exists(tasksListRoot, false); - myassert(exists == null); - Object[] data = new Object[] { task1, task2 }; - zkSetup.setUpTasks(data); - - // verify that tasks are created - exists = zk.exists(tasksListRoot, false); - myassert(exists != null); - List children = zk.getChildren(tasksListRoot, false); - myassert(children.size() == data.length); - boolean[] matched = new boolean[data.length]; - for (String child : children) { - System.out.println(child); - String childPath = tasksListRoot + "/" + child; - Stat sTemp = zk.exists(childPath, false); - byte[] tempData = zk.getData(tasksListRoot + "/" + child, - false, - sTemp); - Map map = (Map) JSONUtil.getMapFromJson(new String(tempData)); - // check if it matches any of the data - for (int i = 0; i < data.length; i++) { - Map newData = (Map) data[i]; - if (!matched[i] && CommUtil.compareMaps(newData, map)) { - matched[i] = true; - break; - } - } - } - for (int i = 0; i < matched.length; i++) { - myassert(matched[i]); - } - - // try running again and make verify new node is not created - Stat oldStat = zk.exists(tasksListRoot, false); - System.out.println("oldStat=" + oldStat); - zkSetup.setUpTasks(data); - Stat newStat = zk.exists(tasksListRoot, false); - System.out.println("newstat=" + newStat); - myassert(oldStat.getMtime() == newStat.getMtime()); - - // make change to task config and try running again and verify new - // config is uploaded - oldStat = zk.exists(tasksListRoot, false); - System.out.println("oldStat=" + oldStat.getVersion()); - ((Map) data[data.length - 1]).put("name", "changedname"); - zkSetup.setUpTasks(data); - newStat = zk.exists(tasksListRoot, false); - System.out.println("newstat=" + newStat.getVersion()); - System.out.println(); - myassert(oldStat.getMtime() != newStat.getMtime()); - - // ensure version change is working - zkSetup.setUpTasks("1.0.0.0", data); - } - - private void myassert(boolean b) { - if (!b) { - throw new AssertionError(); - } - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.test; + +import org.apache.s4.comm.util.CommUtil; +import org.apache.s4.comm.util.JSONUtil; +import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; +import org.apache.s4.comm.zk.ZkTaskSetup; +import org.apache.s4.comm.zk.ZkTaskManager; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +public class TestTaskSetupApp { + + public static void main(String[] args) throws Exception { + new TestTaskSetupApp().testTaskSetup1(); + } + + // test the case + public void testTaskSetup1() throws Exception { + String address = "effortfell.greatamerica.corp.yahoo.com:2181"; + Watcher watcher = new Watcher() { + + @Override + public void process(WatchedEvent event) { + + } + + }; + // setup + ZooKeeper zk = new ZooKeeper(address, 30000, watcher); + String root = "/tasksetup_app_test"; + ZkTaskSetup zkSetup = new ZkTaskSetup(address, root, ClusterType.S4); + Map task1 = new HashMap(); + task1.put("name", "task-1"); + + Map task2 = new HashMap(); + task2.put("name", "task-2"); + String tasksListRoot = root + "/tasks"; + zkSetup.cleanUp(); + Stat exists = zk.exists(tasksListRoot, false); + myassert(exists == null); + Object[] data = new Object[] { task1, task2 }; + zkSetup.setUpTasks(data); + + // verify that tasks are created + exists = zk.exists(tasksListRoot, false); + myassert(exists != null); + List children = zk.getChildren(tasksListRoot, false); + myassert(children.size() == data.length); + boolean[] matched = new boolean[data.length]; + for (String child : children) { + System.out.println(child); + String childPath = tasksListRoot + "/" + child; + Stat sTemp = zk.exists(childPath, false); + byte[] tempData = zk.getData(tasksListRoot + "/" + child, + false, + sTemp); + Map map = (Map) JSONUtil.getMapFromJson(new String(tempData)); + // check if it matches any of the data + for (int i = 0; i < data.length; i++) { + Map newData = (Map) data[i]; + if (!matched[i] && CommUtil.compareMaps(newData, map)) { + matched[i] = true; + break; + } + } + } + for (int i = 0; i < matched.length; i++) { + myassert(matched[i]); + } + + // try running again and make verify new node is not created + Stat oldStat = zk.exists(tasksListRoot, false); + System.out.println("oldStat=" + oldStat); + zkSetup.setUpTasks(data); + Stat newStat = zk.exists(tasksListRoot, false); + System.out.println("newstat=" + newStat); + myassert(oldStat.getMtime() == newStat.getMtime()); + + // make change to task config and try running again and verify new + // config is uploaded + oldStat = zk.exists(tasksListRoot, false); + System.out.println("oldStat=" + oldStat.getVersion()); + ((Map) data[data.length - 1]).put("name", "changedname"); + zkSetup.setUpTasks(data); + newStat = zk.exists(tasksListRoot, false); + System.out.println("newstat=" + newStat.getVersion()); + System.out.println(); + myassert(oldStat.getMtime() != newStat.getMtime()); + + // ensure version change is working + zkSetup.setUpTasks("1.0.0.0", data); + } + + private void myassert(boolean b) { + if (!b) { + throw new AssertionError(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java index df214c3..3bfcaa0 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetupApp.java @@ -1,86 +1,86 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.tools; - -import org.apache.s4.comm.util.ConfigUtils; -import org.apache.s4.comm.util.ConfigParser; -import org.apache.s4.comm.util.ConfigParser.Cluster; -import org.apache.s4.comm.util.ConfigParser.Config; -import org.apache.s4.comm.zk.ZkTaskSetup; - -import java.io.File; -import java.util.List; -import java.util.Map; - -/** - * This class will set up initial tasks on the zookeeper USAGE: java AppTask - * [clean] setup config.xml - * - * @author kishoreg - * - */ -public class TaskSetupApp { - public static void main(String[] args) { - String zkAddress = ""; - boolean clean = false; - boolean setup = false; - String setupXml = null; - for (int i = 0; i < args.length; i++) { - if (i == 0) { - zkAddress = args[0]; - } - if (args[i].equals("clean")) { - clean = true; - } else if (args[i].equals("setup")) { - setup = true; - } else if (i == args.length - 1) { - setupXml = args[i]; - } - } - if (setupXml == null || !new File(setupXml).exists()) { - printusage("Set up xml: " + setupXml + " does not exist"); - } - if (!setup && !clean) { - System.err.println("Invalid usage."); - printusage("Must specify at least one of of clean, setup."); - } - doMain(zkAddress, clean, setup, setupXml); - } - - private static void printusage(String message) { - System.err.println(message); - System.err.println("java TaskSetupApp [clean|setup] setup_config_xml"); - System.exit(1); - } - - private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) { - ConfigParser parser = new ConfigParser(); - Config config = parser.parse(setupXml); - for (Cluster cluster : config.getClusters()) { - processCluster(clean, zkAddress, cluster, config.getVersion()); - } - } - - private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) { - List> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false); - ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType()); - if (clean) { - zkSetup.cleanUp(); - } - - zkSetup.setUpTasks(version, clusterInfo.toArray()); - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.tools; + +import org.apache.s4.comm.util.ConfigUtils; +import org.apache.s4.comm.util.ConfigParser; +import org.apache.s4.comm.util.ConfigParser.Cluster; +import org.apache.s4.comm.util.ConfigParser.Config; +import org.apache.s4.comm.zk.ZkTaskSetup; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * This class will set up initial tasks on the zookeeper USAGE: java AppTask + * [clean] setup config.xml + * + * @author kishoreg + * + */ +public class TaskSetupApp { + public static void main(String[] args) { + String zkAddress = ""; + boolean clean = false; + boolean setup = false; + String setupXml = null; + for (int i = 0; i < args.length; i++) { + if (i == 0) { + zkAddress = args[0]; + } + if (args[i].equals("clean")) { + clean = true; + } else if (args[i].equals("setup")) { + setup = true; + } else if (i == args.length - 1) { + setupXml = args[i]; + } + } + if (setupXml == null || !new File(setupXml).exists()) { + printusage("Set up xml: " + setupXml + " does not exist"); + } + if (!setup && !clean) { + System.err.println("Invalid usage."); + printusage("Must specify at least one of of clean, setup."); + } + doMain(zkAddress, clean, setup, setupXml); + } + + private static void printusage(String message) { + System.err.println(message); + System.err.println("java TaskSetupApp [clean|setup] setup_config_xml"); + System.exit(1); + } + + private static void doMain(String zkAddress, boolean clean, boolean setup, String setupXml) { + ConfigParser parser = new ConfigParser(); + Config config = parser.parse(setupXml); + for (Cluster cluster : config.getClusters()) { + processCluster(clean, zkAddress, cluster, config.getVersion()); + } + } + + private static void processCluster(boolean clean, String zkAddress, Cluster cluster, String version) { + List> clusterInfo = ConfigUtils.readConfig(cluster, cluster.getName(), cluster.getType(), false); + ZkTaskSetup zkSetup = new ZkTaskSetup(zkAddress, cluster.getName(), cluster.getType()); + if (clean) { + zkSetup.cleanUp(); + } + + zkSetup.setUpTasks(version, clusterInfo.toArray()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java index e440f10..98499a1 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/util/CommUtil.java @@ -1,38 +1,38 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.util; - -import java.util.Map; - -public class CommUtil { - - public static boolean compareMaps(Map map1, Map map2) { - boolean equals = true; - if (map1.size() == map2.size()) { - for (String key : map1.keySet()) { - if (!(map2.containsKey(key) && map1.get(key) - .equals(map2.get(key)))) { - equals = false; - break; - } - } - } else { - equals = false; - } - return equals; - } - -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.util; + +import java.util.Map; + +public class CommUtil { + + public static boolean compareMaps(Map map1, Map map2) { + boolean equals = true; + if (map1.size() == map2.size()) { + for (String key : map1.keySet()) { + if (!(map2.containsKey(key) && map1.get(key) + .equals(map2.get(key)))) { + equals = false; + break; + } + } + } else { + equals = false; + } + return equals; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java index 0e968d9..3b5caac 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/util/SystemUtils.java @@ -1,39 +1,39 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.util; - -public class SystemUtils { - - private SystemUtils() { - } - - public static long getPID() { - String processName = java.lang.management.ManagementFactory.getRuntimeMXBean() - .getName(); - return Long.parseLong(processName.split("@")[0]); - } - - public static void main(String[] args) { - String msg = "My PID is " + SystemUtils.getPID(); - - javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null, - msg, - "SystemUtils", - javax.swing.JOptionPane.DEFAULT_OPTION); - - } - -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.util; + +public class SystemUtils { + + private SystemUtils() { + } + + public static long getPID() { + String processName = java.lang.management.ManagementFactory.getRuntimeMXBean() + .getName(); + return Long.parseLong(processName.split("@")[0]); + } + + public static void main(String[] args) { + String msg = "My PID is " + SystemUtils.getPID(); + + javax.swing.JOptionPane.showConfirmDialog((java.awt.Component) null, + msg, + "SystemUtils", + javax.swing.JOptionPane.DEFAULT_OPTION); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java index f5293b8..1588e54 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ThreadTest.java @@ -1,64 +1,64 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.zk; - -public class ThreadTest { - static Object lock = new Object(); - - public static void main(String[] args) { - - Thread t1 = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - synchronized (lock) { - System.out.println("In thread T1"); - try { - System.out.println("Going to wait"); - long start = System.currentTimeMillis(); - lock.wait(); - long end = System.currentTimeMillis(); - System.out.println("Woke up T1 after :" - + (end - start) / 1000 + "secs"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - }); - t1.start(); - Thread t2 = new Thread(new Runnable() { - @Override - public void run() { - while (true) { - synchronized (lock) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - System.out.println("In thread T2"); - lock.notify(); - break; - } - } - } - }); - t2.start(); - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.zk; + +public class ThreadTest { + static Object lock = new Object(); + + public static void main(String[] args) { + + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + synchronized (lock) { + System.out.println("In thread T1"); + try { + System.out.println("Going to wait"); + long start = System.currentTimeMillis(); + lock.wait(); + long end = System.currentTimeMillis(); + System.out.println("Woke up T1 after :" + + (end - start) / 1000 + "secs"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + }); + t1.start(); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + synchronized (lock) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out.println("In thread T2"); + lock.notify(); + break; + } + } + } + }); + t2.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java index 0fe7fa4..7434901 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkProcessMonitor.java @@ -1,147 +1,147 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.zk; - -import org.apache.s4.comm.core.CommEventCallback; -import org.apache.s4.comm.core.DefaultWatcher; -import org.apache.s4.comm.core.ProcessMonitor; -import org.apache.s4.comm.util.JSONUtil; -import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; - -public class ZkProcessMonitor extends DefaultWatcher implements Runnable, - ProcessMonitor { - static Logger logger = Logger.getLogger(ZkProcessMonitor.class); - private List destinationList; - private Map destinationMap; - private String processZNode; - private Object lock = new Object(); - private volatile boolean updateMode = false; - private String taskZNode; - private int taskCount; - - public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) { - this(address, clusterName, clusterType, null); - } - - public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType, - CommEventCallback callbackHandler) { - super(address, callbackHandler); - String root = "/" + ClusterName + "/" + clusterType.toString(); - this.taskZNode = root + "/task"; - this.processZNode = root + "/process"; - destinationMap = new HashMap(); - destinationList = new ArrayList(); - } - - public void monitor() { - synchronized (mutex) { - readConfig(); - } - new Thread(this).start(); - } - - private void readConfig() { - try { - synchronized (lock) { - Map tempDestinationMap = new HashMap(); - List tempDestinationList = new ArrayList(); - updateMode = true; - List tasks = zk.getChildren(taskZNode, false); - this.taskCount = tasks.size(); - List children = zk.getChildren(processZNode, false); - for (String name : children) { - Stat stat = zk.exists(processZNode + "/" + name, false); - if (stat != null) { - byte[] data = zk.getData(processZNode + "/" + name, - false, - stat); - Map map = (Map) JSONUtil.getMapFromJson(new String(data)); - String key = (String) map.get("partition"); - if (key != null) { - tempDestinationMap.put(Integer.parseInt(key), map); - } - tempDestinationList.add(map); - } - } - destinationList.clear(); - destinationMap.clear(); - destinationList.addAll(tempDestinationList); - destinationMap.putAll(tempDestinationMap); - logger.info("Updated Destination List to" + destinationList); - logger.info("Updated Destination Map to" + destinationMap); - } - } catch (KeeperException e) { - logger.warn("Ignorable exception if it happens once in a while", e); - } catch (InterruptedException e) { - logger.error("Interrupted exception cause while reading process znode", - e); - } finally { - updateMode = false; - } - } - - public void run() { - try { - while (true) { - synchronized (mutex) { - // set watch - logger.info("Setting watch on " + processZNode); - zk.getChildren(processZNode, true); - readConfig(); - mutex.wait(); - } - } - } catch (KeeperException e) { - logger.warn("KeeperException in ProcessMonitor.run", e); - } catch (InterruptedException e) { - logger.warn("InterruptedException in ProcessMonitor.run", e); - } - } - - public List getDestinationList() { - if (updateMode) { - synchronized (lock) { - return destinationList; - } - } else { - return destinationList; - } - } - - public Map getDestinationMap() { - if (updateMode) { - synchronized (lock) { - return destinationMap; - } - } else { - return destinationMap; - } - } - - @Override - public int getTaskCount() { - return taskCount; - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.zk; + +import org.apache.s4.comm.core.CommEventCallback; +import org.apache.s4.comm.core.DefaultWatcher; +import org.apache.s4.comm.core.ProcessMonitor; +import org.apache.s4.comm.util.JSONUtil; +import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +public class ZkProcessMonitor extends DefaultWatcher implements Runnable, + ProcessMonitor { + static Logger logger = Logger.getLogger(ZkProcessMonitor.class); + private List destinationList; + private Map destinationMap; + private String processZNode; + private Object lock = new Object(); + private volatile boolean updateMode = false; + private String taskZNode; + private int taskCount; + + public ZkProcessMonitor(String address, String clusterName, ClusterType clusterType) { + this(address, clusterName, clusterType, null); + } + + public ZkProcessMonitor(String address, String ClusterName, ClusterType clusterType, + CommEventCallback callbackHandler) { + super(address, callbackHandler); + String root = "/" + ClusterName + "/" + clusterType.toString(); + this.taskZNode = root + "/task"; + this.processZNode = root + "/process"; + destinationMap = new HashMap(); + destinationList = new ArrayList(); + } + + public void monitor() { + synchronized (mutex) { + readConfig(); + } + new Thread(this).start(); + } + + private void readConfig() { + try { + synchronized (lock) { + Map tempDestinationMap = new HashMap(); + List tempDestinationList = new ArrayList(); + updateMode = true; + List tasks = zk.getChildren(taskZNode, false); + this.taskCount = tasks.size(); + List children = zk.getChildren(processZNode, false); + for (String name : children) { + Stat stat = zk.exists(processZNode + "/" + name, false); + if (stat != null) { + byte[] data = zk.getData(processZNode + "/" + name, + false, + stat); + Map map = (Map) JSONUtil.getMapFromJson(new String(data)); + String key = (String) map.get("partition"); + if (key != null) { + tempDestinationMap.put(Integer.parseInt(key), map); + } + tempDestinationList.add(map); + } + } + destinationList.clear(); + destinationMap.clear(); + destinationList.addAll(tempDestinationList); + destinationMap.putAll(tempDestinationMap); + logger.info("Updated Destination List to" + destinationList); + logger.info("Updated Destination Map to" + destinationMap); + } + } catch (KeeperException e) { + logger.warn("Ignorable exception if it happens once in a while", e); + } catch (InterruptedException e) { + logger.error("Interrupted exception cause while reading process znode", + e); + } finally { + updateMode = false; + } + } + + public void run() { + try { + while (true) { + synchronized (mutex) { + // set watch + logger.info("Setting watch on " + processZNode); + zk.getChildren(processZNode, true); + readConfig(); + mutex.wait(); + } + } + } catch (KeeperException e) { + logger.warn("KeeperException in ProcessMonitor.run", e); + } catch (InterruptedException e) { + logger.warn("InterruptedException in ProcessMonitor.run", e); + } + } + + public List getDestinationList() { + if (updateMode) { + synchronized (lock) { + return destinationList; + } + } else { + return destinationList; + } + } + + public Map getDestinationMap() { + if (updateMode) { + synchronized (lock) { + return destinationMap; + } + } else { + return destinationMap; + } + } + + @Override + public int getTaskCount() { + return taskCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java index 0b58018..1003baa 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkTaskSetup.java @@ -1,282 +1,282 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.zk; - -import org.apache.s4.comm.core.CommEventCallback; -import org.apache.s4.comm.core.DefaultWatcher; -import org.apache.s4.comm.util.CommUtil; -import org.apache.s4.comm.util.JSONUtil; -import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; - -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -public class ZkTaskSetup extends DefaultWatcher { - static Logger logger = Logger.getLogger(ZkTaskSetup.class); - String tasksListRoot; - String processListRoot; - - public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) { - this(address, clusterName, clusterType, null); - } - - /** - * Constructor of ZkTaskSetup - * - * @param address - * @param clusterName - */ - public ZkTaskSetup(String address, String clusterName, ClusterType clusterType, - CommEventCallback callbackHandler) { - super(address, callbackHandler); - - this.root = "/" + clusterName + "/" + clusterType.toString(); - this.tasksListRoot = root + "/task"; - this.processListRoot = root + "/process"; - } - - public void setUpTasks(Object[] data) { - setUpTasks("-1", data); - } - - /** - * Creates task nodes. - * - * @param version - * @param data - */ - public void setUpTasks(String version, Object[] data) { - try { - logger.info("Trying to set up configuration with new version:" - + version); - if (!version.equals("-1")) { - if (!isConfigVersionNewer(version)) { - logger.info("Config version not newer than current version"); - return; - } else { - cleanUp(); - } - } else { - logger.info("Not checking version number since it is set to -1"); - } - - // check if config data newer - if (!isConfigDataNewer(data)) { - logger.info("Config data not newer than current version"); - return; - } else { - logger.info("Found newer Config data. Cleaning old data"); - cleanUp(); - } - - // Create ZK node name - if (zk != null) { - Stat s; - s = zk.exists(root, false); - if (s == null) { - String parent = new File(root).getParent() - .replace(File.separatorChar, - '/'); - if (logger.isDebugEnabled()) { - logger.debug("parent:" + parent); - } - Stat exists = zk.exists(parent, false); - if (exists == null) { - zk.create(parent, - new byte[0], - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - zk.create(root, - new byte[0], - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - } - Stat s; - s = zk.exists(tasksListRoot, false); - if (s == null) { - Map map = new HashMap(); - map.put("config.version", version); - String jsonString = JSONUtil.toJsonString(map); - zk.create(tasksListRoot, - jsonString.getBytes(), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - s = zk.exists(processListRoot, false); - if (s == null) { - zk.create(processListRoot, - new byte[0], - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - } - - for (int i = 0; i < data.length; i++) { - String nodeName = tasksListRoot + "/" + "task" + "-" + i; - Stat sTask = zk.exists(nodeName, false); - if (sTask == null) { - logger.info("Creating taskNode: " + nodeName); - byte[] byteBuffer = JSONUtil.toJsonString(data[i]) - .getBytes(); - zk.create(nodeName, - byteBuffer, - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } else { - logger.warn("TaskNode already exisits: " + nodeName); - } - } - } catch (Exception e) { - logger.error("Keeper exception when creating task nodes: " - + e.toString(), - e); - throw new RuntimeException(e); - } - } - - private boolean isConfigDataNewer(Object[] data) { - try { - Stat s; - s = zk.exists(tasksListRoot, false); - if (s != null) { - List children = zk.getChildren(tasksListRoot, false); - if (children.size() != data.length) { - return true; - } - boolean[] matched = new boolean[data.length]; - for (String child : children) { - String childPath = tasksListRoot + "/" + child; - Stat sTemp = zk.exists(childPath, false); - byte[] tempData = zk.getData(tasksListRoot + "/" + child, - false, - sTemp); - Map map = (Map) JSONUtil.getMapFromJson(new String(tempData)); - - // check if it matches any of the data - for (int i = 0; i < data.length; i++) { - Map newData = (Map) data[i]; - if (!matched[i] && CommUtil.compareMaps(newData, map)) { - matched[i] = true; - break; - } - } - } - for (int i = 0; i < matched.length; i++) { - if (!matched[i]) { - return true; - } - } - } else { - return true; - } - } catch (Exception e) { - throw new RuntimeException(" Exception in isConfigDataNewer method ", - e); - } - return false; - } - - private boolean isConfigVersionNewer(String version) throws Exception { - Stat s; - s = zk.exists(tasksListRoot, false); - if (s != null) { - byte[] data = zk.getData(tasksListRoot, false, s); - if (data != null && data.length > 0) { - String jsonString = new String(data); - if (jsonString != null) { - Map map = JSONUtil.getMapFromJson(jsonString); - if (map.containsKey("config.version")) { - boolean update = false; - String currentVersion = map.get("config.version") - .toString(); - logger.info("Current config version:" + currentVersion); - String[] curV = currentVersion.split("\\."); - String[] newV = version.split("\\."); - for (int i = 0; i < Math.max(curV.length, newV.length); i++) { - if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) { - update = true; - break; - } - } - if (!update) { - logger.info("Current config version is newer. Config will not be updated"); - } - return update; - } - } else { - logger.info("No data at znode " + tasksListRoot - + " so version checking will not be done"); - } - } else { - logger.info("No data at znode " + tasksListRoot - + " so version checking will not be done"); - } - } else { - logger.info("znode " + tasksListRoot - + " does not exist, so creating new one is fine"); - } - return true; - } - - /** - * Will clean up taskList Node and process List Node - */ - public boolean cleanUp() { - try { - logger.info("Cleaning :" + tasksListRoot); - Stat exists = zk.exists(tasksListRoot, false); - if (exists != null) { - List children = zk.getChildren(tasksListRoot, false); - if (children.size() > 0) { - for (String child : children) { - logger.info("Cleaning :" + tasksListRoot + "/" + child); - zk.delete(tasksListRoot + "/" + child, 0); - } - } - zk.delete(tasksListRoot, 0); - } - - exists = zk.exists(processListRoot, false); - if (exists != null) { - List children = zk.getChildren(processListRoot, false); - if (children.size() > 0) { - logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior"); - for (String child : children) { - logger.info("Cleaning :" + processListRoot + "/" - + child); - zk.delete(processListRoot + "/" + child, 0); - } - } - logger.info("Finished cleaning :" + processListRoot); - zk.delete(processListRoot, 0); - } - return true; - } catch (Exception e) { - logger.error("Exception while cleaning up: " + e.getMessage(), e); - return false; - } - } - -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.zk; + +import org.apache.s4.comm.core.CommEventCallback; +import org.apache.s4.comm.core.DefaultWatcher; +import org.apache.s4.comm.util.CommUtil; +import org.apache.s4.comm.util.JSONUtil; +import org.apache.s4.comm.util.ConfigParser.Cluster.ClusterType; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; + +public class ZkTaskSetup extends DefaultWatcher { + static Logger logger = Logger.getLogger(ZkTaskSetup.class); + String tasksListRoot; + String processListRoot; + + public ZkTaskSetup(String address, String clusterName, ClusterType clusterType) { + this(address, clusterName, clusterType, null); + } + + /** + * Constructor of ZkTaskSetup + * + * @param address + * @param clusterName + */ + public ZkTaskSetup(String address, String clusterName, ClusterType clusterType, + CommEventCallback callbackHandler) { + super(address, callbackHandler); + + this.root = "/" + clusterName + "/" + clusterType.toString(); + this.tasksListRoot = root + "/task"; + this.processListRoot = root + "/process"; + } + + public void setUpTasks(Object[] data) { + setUpTasks("-1", data); + } + + /** + * Creates task nodes. + * + * @param version + * @param data + */ + public void setUpTasks(String version, Object[] data) { + try { + logger.info("Trying to set up configuration with new version:" + + version); + if (!version.equals("-1")) { + if (!isConfigVersionNewer(version)) { + logger.info("Config version not newer than current version"); + return; + } else { + cleanUp(); + } + } else { + logger.info("Not checking version number since it is set to -1"); + } + + // check if config data newer + if (!isConfigDataNewer(data)) { + logger.info("Config data not newer than current version"); + return; + } else { + logger.info("Found newer Config data. Cleaning old data"); + cleanUp(); + } + + // Create ZK node name + if (zk != null) { + Stat s; + s = zk.exists(root, false); + if (s == null) { + String parent = new File(root).getParent() + .replace(File.separatorChar, + '/'); + if (logger.isDebugEnabled()) { + logger.debug("parent:" + parent); + } + Stat exists = zk.exists(parent, false); + if (exists == null) { + zk.create(parent, + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + zk.create(root, + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } + Stat s; + s = zk.exists(tasksListRoot, false); + if (s == null) { + Map map = new HashMap(); + map.put("config.version", version); + String jsonString = JSONUtil.toJsonString(map); + zk.create(tasksListRoot, + jsonString.getBytes(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + s = zk.exists(processListRoot, false); + if (s == null) { + zk.create(processListRoot, + new byte[0], + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + } + + for (int i = 0; i < data.length; i++) { + String nodeName = tasksListRoot + "/" + "task" + "-" + i; + Stat sTask = zk.exists(nodeName, false); + if (sTask == null) { + logger.info("Creating taskNode: " + nodeName); + byte[] byteBuffer = JSONUtil.toJsonString(data[i]) + .getBytes(); + zk.create(nodeName, + byteBuffer, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } else { + logger.warn("TaskNode already exisits: " + nodeName); + } + } + } catch (Exception e) { + logger.error("Keeper exception when creating task nodes: " + + e.toString(), + e); + throw new RuntimeException(e); + } + } + + private boolean isConfigDataNewer(Object[] data) { + try { + Stat s; + s = zk.exists(tasksListRoot, false); + if (s != null) { + List children = zk.getChildren(tasksListRoot, false); + if (children.size() != data.length) { + return true; + } + boolean[] matched = new boolean[data.length]; + for (String child : children) { + String childPath = tasksListRoot + "/" + child; + Stat sTemp = zk.exists(childPath, false); + byte[] tempData = zk.getData(tasksListRoot + "/" + child, + false, + sTemp); + Map map = (Map) JSONUtil.getMapFromJson(new String(tempData)); + + // check if it matches any of the data + for (int i = 0; i < data.length; i++) { + Map newData = (Map) data[i]; + if (!matched[i] && CommUtil.compareMaps(newData, map)) { + matched[i] = true; + break; + } + } + } + for (int i = 0; i < matched.length; i++) { + if (!matched[i]) { + return true; + } + } + } else { + return true; + } + } catch (Exception e) { + throw new RuntimeException(" Exception in isConfigDataNewer method ", + e); + } + return false; + } + + private boolean isConfigVersionNewer(String version) throws Exception { + Stat s; + s = zk.exists(tasksListRoot, false); + if (s != null) { + byte[] data = zk.getData(tasksListRoot, false, s); + if (data != null && data.length > 0) { + String jsonString = new String(data); + if (jsonString != null) { + Map map = JSONUtil.getMapFromJson(jsonString); + if (map.containsKey("config.version")) { + boolean update = false; + String currentVersion = map.get("config.version") + .toString(); + logger.info("Current config version:" + currentVersion); + String[] curV = currentVersion.split("\\."); + String[] newV = version.split("\\."); + for (int i = 0; i < Math.max(curV.length, newV.length); i++) { + if (Integer.parseInt(newV[i]) > Integer.parseInt(curV[i])) { + update = true; + break; + } + } + if (!update) { + logger.info("Current config version is newer. Config will not be updated"); + } + return update; + } + } else { + logger.info("No data at znode " + tasksListRoot + + " so version checking will not be done"); + } + } else { + logger.info("No data at znode " + tasksListRoot + + " so version checking will not be done"); + } + } else { + logger.info("znode " + tasksListRoot + + " does not exist, so creating new one is fine"); + } + return true; + } + + /** + * Will clean up taskList Node and process List Node + */ + public boolean cleanUp() { + try { + logger.info("Cleaning :" + tasksListRoot); + Stat exists = zk.exists(tasksListRoot, false); + if (exists != null) { + List children = zk.getChildren(tasksListRoot, false); + if (children.size() > 0) { + for (String child : children) { + logger.info("Cleaning :" + tasksListRoot + "/" + child); + zk.delete(tasksListRoot + "/" + child, 0); + } + } + zk.delete(tasksListRoot, 0); + } + + exists = zk.exists(processListRoot, false); + if (exists != null) { + List children = zk.getChildren(processListRoot, false); + if (children.size() > 0) { + logger.warn("Some processes are already running. Cleaning them up. Might result in unpredictable behavior"); + for (String child : children) { + logger.info("Cleaning :" + processListRoot + "/" + + child); + zk.delete(processListRoot + "/" + child, 0); + } + } + logger.info("Finished cleaning :" + processListRoot); + zk.delete(processListRoot, 0); + } + return true; + } catch (Exception e) { + logger.error("Exception while cleaning up: " + e.getMessage(), e); + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java index c743db8..1ee2ccb 100644 --- a/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java +++ b/s4-comm/src/main/java/org/apache/s4/comm/zk/ZkUtil.java @@ -1,144 +1,144 @@ -/* - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific - * language governing permissions and limitations under the - * License. See accompanying LICENSE file. - */ -package org.apache.s4.comm.zk; - -import org.apache.s4.comm.core.DefaultWatcher; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; - -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.ACL; - -public class ZkUtil extends DefaultWatcher { - - public ZkUtil(String address) { - super(address); - - } - - public int getChildCount(String path) { - try { - List children = zk.getChildren(path, false); - return children.size(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public List getChildren(String path) { - try { - List children = zk.getChildren(path, false); - return children; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public byte[] getData(String path) { - try { - byte[] data = zk.getData(path, false, null); - return data; - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - public void create(String path) { - create(path, ""); - } - - public void create(String path, String data) { - try { - zk.create(path, - data.getBytes(), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - public void deleteRecursive(String path) { - List children = getChildren(path); - for (String child : children) { - deleteRecursive(path + "/" + child); - } - delete(path); - } - - public void delete(String path) { - try { - zk.delete(path, -1); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void main(String[] args) throws Exception { - if (args.length == 0) { - printUsage(); - - } - String address = args[0]; - String methodName = args[1]; - - String[] methodArgs = new String[args.length - 2]; - for (int i = 2; i < args.length; i++) { - methodArgs[i - 2] = args[i]; - } - Method[] methods = ZkUtil.class.getMethods(); - Method method = null; - for (Method met : methods) { - if (met.getName().equals(methodName) - && met.getParameterTypes().length == methodArgs.length) { - method = met; - break; - } - } - - if (method != null) { - ZkUtil zkUtil = new ZkUtil(address); - Object ret = method.invoke(zkUtil, (Object[]) methodArgs); - if (ret != null) { - System.out.println("**********"); - System.out.println(ret); - System.out.println("**********"); - } - } else { - printUsage(); - } - // zkUtil.deleteRecursive("/s4/listener/process/task-0"); - // zkUtil.create("/s4_apps_test/sender/process"); - } - - private static void printUsage() { - System.out.println("USAGE"); - System.out.println("java methodName arguments"); - Method[] methods = ZkUtil.class.getMethods(); - for (Method met : methods) { - System.out.println(met.getName() + ":" - + Arrays.toString(met.getParameterTypes())); - } - System.exit(1); - } -} +/* + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific + * language governing permissions and limitations under the + * License. See accompanying LICENSE file. + */ +package org.apache.s4.comm.zk; + +import org.apache.s4.comm.core.DefaultWatcher; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.ACL; + +public class ZkUtil extends DefaultWatcher { + + public ZkUtil(String address) { + super(address); + + } + + public int getChildCount(String path) { + try { + List children = zk.getChildren(path, false); + return children.size(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List getChildren(String path) { + try { + List children = zk.getChildren(path, false); + return children; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public byte[] getData(String path) { + try { + byte[] data = zk.getData(path, false, null); + return data; + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public void create(String path) { + create(path, ""); + } + + public void create(String path, String data) { + try { + zk.create(path, + data.getBytes(), + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public void deleteRecursive(String path) { + List children = getChildren(path); + for (String child : children) { + deleteRecursive(path + "/" + child); + } + delete(path); + } + + public void delete(String path) { + try { + zk.delete(path, -1); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) throws Exception { + if (args.length == 0) { + printUsage(); + + } + String address = args[0]; + String methodName = args[1]; + + String[] methodArgs = new String[args.length - 2]; + for (int i = 2; i < args.length; i++) { + methodArgs[i - 2] = args[i]; + } + Method[] methods = ZkUtil.class.getMethods(); + Method method = null; + for (Method met : methods) { + if (met.getName().equals(methodName) + && met.getParameterTypes().length == methodArgs.length) { + method = met; + break; + } + } + + if (method != null) { + ZkUtil zkUtil = new ZkUtil(address); + Object ret = method.invoke(zkUtil, (Object[]) methodArgs); + if (ret != null) { + System.out.println("**********"); + System.out.println(ret); + System.out.println("**********"); + } + } else { + printUsage(); + } + // zkUtil.deleteRecursive("/s4/listener/process/task-0"); + // zkUtil.create("/s4_apps_test/sender/process"); + } + + private static void printUsage() { + System.out.println("USAGE"); + System.out.println("java methodName arguments"); + Method[] methods = ZkUtil.class.getMethods(); + for (Method met : methods) { + System.out.println(met.getName() + ":" + + Arrays.toString(met.getParameterTypes())); + } + System.exit(1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/resources/log4j.xml b/s4-comm/src/main/resources/log4j.xml index 3e3c2ce..7a51b1e 100644 --- a/s4-comm/src/main/resources/log4j.xml +++ b/s4-comm/src/main/resources/log4j.xml @@ -1,33 +1,33 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_static_task_manager_test.xml ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/resources/sample_static_task_manager_test.xml b/s4-comm/src/main/resources/sample_static_task_manager_test.xml index 11912aa..072735a 100644 --- a/s4-comm/src/main/resources/sample_static_task_manager_test.xml +++ b/s4-comm/src/main/resources/sample_static_task_manager_test.xml @@ -1,15 +1,15 @@ - - - s4_listener_process - - - 2 - s4 - listener - unicast - 0,1 - 5001,5002 - halfalways-lx - PROCESS_1,PROCESS_2 - - \ No newline at end of file + + + s4_listener_process + + + 2 + s4 + listener + unicast + 0,1 + 5001,5002 + halfalways-lx + PROCESS_1,PROCESS_2 + + http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/sample_task_setup.xml ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/resources/sample_task_setup.xml b/s4-comm/src/main/resources/sample_task_setup.xml index b518913..bd59c77 100644 --- a/s4-comm/src/main/resources/sample_task_setup.xml +++ b/s4-comm/src/main/resources/sample_task_setup.xml @@ -1,23 +1,23 @@ - - - - ep_tasks - - - 1.0.0.0 - 1.0.0.0 - - 2 - s4 - listener - unicast - 0,1 - 5001,5002 - - - 1 - s4_event_pipe - sender - unicast - - \ No newline at end of file + + + + ep_tasks + + + 1.0.0.0 + 1.0.0.0 + + 2 + s4 + listener + unicast + 0,1 + 5001,5002 + + + 1 + s4_event_pipe + sender + unicast + + http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f4a2c594/s4-comm/src/main/resources/taskManagerTest.xml ---------------------------------------------------------------------- diff --git a/s4-comm/src/main/resources/taskManagerTest.xml b/s4-comm/src/main/resources/taskManagerTest.xml index 11912aa..072735a 100644 --- a/s4-comm/src/main/resources/taskManagerTest.xml +++ b/s4-comm/src/main/resources/taskManagerTest.xml @@ -1,15 +1,15 @@ - - - s4_listener_process - - - 2 - s4 - listener - unicast - 0,1 - 5001,5002 - halfalways-lx - PROCESS_1,PROCESS_2 - - \ No newline at end of file + + + s4_listener_process + + + 2 + s4 + listener + unicast + 0,1 + 5001,5002 + halfalways-lx + PROCESS_1,PROCESS_2 + +