eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject incubator-eagle git commit: [EAGLE-435] make coordiantor schedule operation exclusive in distributed deployment
Date Thu, 11 Aug 2016 07:06:25 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop a772a0556 -> d74c186fc


[EAGLE-435] make coordiantor schedule operation exclusive in distributed deployment

Author: Li, Garrett
Reviewer: ralphsu

This closes #322


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d74c186f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d74c186f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d74c186f

Branch: refs/heads/develop
Commit: d74c186fca81ebd3d5cb0f7fa1796eb1a5bd83ba
Parents: a772a05
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Thu Aug 11 15:05:54 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Thu Aug 11 15:05:54 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/coordinator/Coordinator.java    | 150 ++++++++++++-------
 .../alert/coordinator/CoordinatorListener.java  |  10 +-
 .../alert/coordinator/ExclusiveExecutor.java    | 118 +++++++++++++++
 .../LockWebApplicationException.java            |  34 +++++
 .../coordinator/TestExclusiveExecutor.java      | 117 +++++++++++++++
 .../eagle/service/app/TestServiceAppWithZk.java | 131 ++++++++++++++++
 6 files changed, 506 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 06aca01..f46e4c2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -75,6 +75,11 @@ public class Coordinator {
     private final static String METADATA_SERVICE_CONTEXT = "metadataService.context";
     private final static String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
     private final static String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+    
+    private final static String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
+    private final static String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader";
+    private final static int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
+    private final static int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 9 minutes
 
     private volatile ScheduleState currentState = null;
     private final ConfigBusProducer producer;
@@ -98,28 +103,58 @@ public class Coordinator {
     }
 
     public synchronized ScheduleState schedule(ScheduleOption option) {
-        Stopwatch watch = Stopwatch.createStarted();
-        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
-        TopologyMgmtService mgmtService = new TopologyMgmtService();
-        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
-        scheduler.init(context, mgmtService);
-        ScheduleState state = scheduler.schedule(option);
-
-        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
-        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
-        watch.reset();
-        watch.start();
-
-        // persist & notify
-        postSchedule(client, state, producer);
-
-        watch.stop();
-        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
-        LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime,
postTime);
-
-        currentState = state;
-        return state;
+    	ScheduleZkState scheduleZkState = new ScheduleZkState();
+    	ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+			@Override
+			public void run() throws Exception {
+				scheduleZkState.scheduleAcquired = true;
+				
+				while (!scheduleZkState.scheduleCompleted) {
+					Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+				}
+			}
+    	};
+    	ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
+    	int waitMaxTimes = 0;
+    	while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
+    		if (!scheduleZkState.scheduleAcquired) {
+    			waitMaxTimes ++;
+    			try {
+					Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+				} catch (InterruptedException e) {}
+    			continue;
+    		}
+    		
+    		ScheduleState state = null;
+    		try {
+    			Stopwatch watch = Stopwatch.createStarted();
+    	        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+    	        TopologyMgmtService mgmtService = new TopologyMgmtService();
+    	        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+    	
+    	        scheduler.init(context, mgmtService);
+    	        state = scheduler.schedule(option);
+    	        
+    	        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
+    	        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
+    	        watch.reset();
+    	        watch.start();
+    	
+    	        // persist & notify
+    	        postSchedule(client, state, producer);
+    	
+    	        watch.stop();
+    	        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
+    	        LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !",
scheduleTime, postTime);
+    	
+    	        currentState = state;
+    		} finally {
+    			//schedule completed
+    			scheduleZkState.scheduleCompleted = true;
+    		}
+	        return state;
+    	}
+    	throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later");
     }
 
     public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer
producer) {
@@ -209,34 +244,43 @@ public class Coordinator {
     }
 
     public static void startSchedule() {
-        Config config = ConfigFactory.load().getConfig(COORDINATOR);
-        // build dynamic policy loader
-        String host = config.getString(METADATA_SERVICE_HOST);
-        int port = config.getInt(METADATA_SERVICE_PORT);
-        String context = config.getString(METADATA_SERVICE_CONTEXT);
-        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
-        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
-        loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
-
-        // schedule dynamic policy loader
-        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
-        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory()
{
-            @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r);
-                t.setDaemon(true);
-                return t;
-            }
-        });
-        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
-        
-        // 
-        scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
-                CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
-        
-        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
-        LOG.info("Eagle Coordinator started ...");
+    	ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable()
{
+			
+			@Override
+			public void run() throws Exception {
+		        Config config = ConfigFactory.load().getConfig(COORDINATOR);
+		        // build dynamic policy loader
+		        String host = config.getString(METADATA_SERVICE_HOST);
+		        int port = config.getInt(METADATA_SERVICE_PORT);
+		        String context = config.getString(METADATA_SERVICE_CONTEXT);
+		        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
+		        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
+		        loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
+
+		        // schedule dynamic policy loader
+		        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
+		        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
+		        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new
ThreadFactory() {
+		            @Override
+		            public Thread newThread(Runnable r) {
+		                Thread t = new Thread(r);
+		                t.setDaemon(true);
+		                return t;
+		            }
+		        });
+		        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+		        
+		        // 
+		        scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
+		                CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
+		        
+		        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
+		        LOG.info("Eagle Coordinator started ...");
+                
+                Thread.currentThread().join();
+			}
+			
+		});
     }
 
     public void enforcePeriodicallyBuild() {
@@ -250,4 +294,10 @@ public class Coordinator {
     public static boolean isPeriodicallyForceBuildEnable() {
         return forcePeriodicallyBuild.get();
     }
+    
+    public static class ScheduleZkState {
+    	volatile boolean scheduleAcquired = false;
+        volatile boolean scheduleCompleted = false;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
index 1534a6b..c9bda16 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
@@ -29,16 +29,18 @@ import org.slf4j.LoggerFactory;
 public class CoordinatorListener implements ServletContextListener {
     
     private static final Logger LOG = LoggerFactory.getLogger(CoordinatorListener.class);
-
+    
+    public CoordinatorListener() {
+    }
+    
     @Override
     public void contextInitialized(ServletContextEvent sce) {
-        LOG.info("start coordinator background tasks..");
+    	LOG.info("start coordinator background tasks..");
         Coordinator.startSchedule();
     }
-
+    
     @Override
     public void contextDestroyed(ServletContextEvent sce) {
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
new file mode 100644
index 0000000..96e6fce
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class ExclusiveExecutor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
+
+	// private static final String PATH = "/alert/listener/leader";
+	private static final String COORDINATOR = "coordinator";
+	private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
+	private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
+
+	private static final CuratorFramework client;
+
+	static {
+		Config config = ConfigFactory.load().getConfig(COORDINATOR);
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES);
+		ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+		client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy);
+		client.start();
+	}
+
+	public static abstract class Runnable {
+
+		boolean completed = false;
+		LeaderSelector selector;
+
+		public abstract void run() throws Exception;
+
+		public void registerResources(LeaderSelector selector) {
+			this.selector = selector;
+		}
+
+		public void runElegantly() throws Exception {
+			this.run();
+
+			LOG.info("Close selector resources {}", this.selector);
+			CloseableUtils.closeQuietly(this.selector);
+
+			completed = true;
+		}
+
+		public boolean isCompleted() {
+			return completed;
+		}
+
+	}
+
+	public static void execute(String path, final Runnable runnable) {
+		LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
+
+			@Override
+			public void takeLeadership(CuratorFramework client) throws Exception {
+				// this callback will get called when you are the leader
+				// do whatever leader work you need to and only exit
+				// this method when you want to relinquish leadership
+				LOG.info("this is leader node right now..");
+				runnable.runElegantly();
+			}
+
+			@Override
+			public void stateChanged(CuratorFramework client, ConnectionState newState) {
+				LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
+			}
+
+		};
+
+		LeaderSelector selector = new LeaderSelector(client, path, listener);
+		selector.autoRequeue(); // not required, but this is behavior that you
+								// will probably expect
+		selector.start();
+
+		runnable.registerResources(selector);
+
+		Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() {
+
+			@Override
+			public void run() {
+				LOG.info("Close zk client resources {}", ExclusiveExecutor.client);
+				CloseableUtils.closeQuietly(ExclusiveExecutor.client);
+			}
+
+		}));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
new file mode 100644
index 0000000..69e799f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
+public class LockWebApplicationException extends WebApplicationException {
+
+	private static final long serialVersionUID = 3441072187262776401L;
+
+	public LockWebApplicationException() {
+		super(Response.Status.INTERNAL_SERVER_ERROR);
+	}
+
+	public LockWebApplicationException(String message) {
+		super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
new file mode 100644
index 0000000..04b760a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
+import org.apache.eagle.alert.utils.ZookeeperEmbedded;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+@Ignore
+public class TestExclusiveExecutor {
+
+	ZookeeperEmbedded zkEmbed;
+
+	@Before
+	public void setUp() throws Exception {
+		zkEmbed = new ZookeeperEmbedded(2181);
+		zkEmbed.start();
+
+		Thread.sleep(2000);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		zkEmbed.shutdown();
+	}
+
+	@Test
+	public void testConcurrency() throws Exception {
+		ByteArrayOutputStream newStreamOutput = new ByteArrayOutputStream();
+		PrintStream newStream = new PrintStream(newStreamOutput);
+		PrintStream oldStream = System.out;
+
+		System.setOut(newStream);
+
+		ExclusiveExecutor.Runnable runnableOne = new ExclusiveExecutor.Runnable() {
+
+			@Override
+			public void run() throws Exception {
+				System.out.println("this is thread one");
+			}
+
+		};
+
+		new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				ExclusiveExecutor.execute("/alert/test/leader", runnableOne);
+			}
+
+		}).start();
+
+		ExclusiveExecutor.Runnable runnableTwo = new ExclusiveExecutor.Runnable() {
+
+			@Override
+			public void run() throws Exception {
+				System.out.println("this is thread two");
+			}
+
+		};
+		new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				ExclusiveExecutor.execute("/alert/test/leader", runnableTwo);
+			}
+
+		}).start();
+
+		Thread.sleep(2000);
+
+		System.out.flush();
+		BufferedReader br = new BufferedReader(new StringReader(newStreamOutput.toString()));
+		List<String> logs = new ArrayList<String>();
+		String line = null;
+		while ((line = br.readLine()) != null) {
+			logs.add(line);
+		}
+
+		System.setOut(oldStream);
+		System.out.println("Cached logs: " + Joiner.on("\n").join(logs));
+
+		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
+		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));
+
+		Assert.assertTrue(runnableOne.isCompleted());
+		Assert.assertTrue(runnableTwo.isCompleted());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d74c186f/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
new file mode 100644
index 0000000..f312e19
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/test/java/com/apache/eagle/service/app/TestServiceAppWithZk.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.eagle.service.app;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.apache.eagle.alert.utils.ZookeeperEmbedded;
+import org.apache.eagle.service.app.ServiceApp;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * 
+ * @author xiancli
+ *
+ */
+public class TestServiceAppWithZk {
+
+	ZookeeperEmbedded zkEmbed;
+
+	PrintStream oldStream;
+	PrintStream newStream;
+	ByteArrayOutputStream newStreamOutput;
+
+	@Before
+	public void setUp() throws Exception {
+		// Create a stream to hold the output
+		newStreamOutput = new ByteArrayOutputStream();
+		newStream = new PrintStream(newStreamOutput);
+		// IMPORTANT: Save the old System.out!
+		oldStream = System.out;
+		// Tell Java to use your special stream
+		System.setOut(newStream);
+
+		zkEmbed = new ZookeeperEmbedded(2181);
+		zkEmbed.start();
+
+		Thread.sleep(2000);
+
+		new ServiceApp().run(new String[] { "server" });
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		zkEmbed.shutdown();
+	}
+
+	@Test
+	public void testMain() throws Exception {
+		try {
+			Thread.sleep(15000);
+		} catch (InterruptedException e1) {
+		}
+
+		// Put things back
+		System.out.flush();
+		System.setOut(oldStream);
+
+		BufferedReader br = new BufferedReader(new StringReader(newStreamOutput.toString()));
+		List<String> logs = new ArrayList<String>();
+		String line = null;
+		while ((line = br.readLine()) != null) {
+			logs.add(line);
+		}
+
+		System.out.println(Joiner.on("\n").join(logs));
+
+		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is leader node
right now..")));
+		Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("start coordinator background
tasks..")));
+
+		Config config = ConfigFactory.load().getConfig("coordinator");
+		// build dynamic policy loader
+		String host = config.getString("metadataService.host");
+		int port = config.getInt("metadataService.port");
+		String context = config.getString("metadataService.context");
+		IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
+
+		List<PolicyDefinition> policies = client.listPolicies();
+
+		Assert.assertEquals(0, policies.size());
+
+		PolicyDefinition def = new PolicyDefinition();
+		def.setName("test-policy-1");
+		def.setInputStreams(Arrays.asList("testStreamDef"));
+		def.setOutputStreams(Arrays.asList("test-datasource-1"));
+		def.setParallelismHint(5);
+		def.setDefinition(new Definition());
+		client.addPolicy(def);
+
+		policies = client.listPolicies();
+
+		Assert.assertEquals(1, policies.size());
+
+		try {
+			client.close();
+		} catch (IOException e) {
+		}
+	}
+
+}


Mime
View raw message