kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: qhz...@apache.org
Subject: incubator-kylin git commit: helix-based job engine
Date: Thu, 19 Nov 2015 06:06:12 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/helix-latest [created] be19bec6f


helix-based job engine


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/be19bec6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/be19bec6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/be19bec6

Branch: refs/heads/helix-latest
Commit: be19bec6f334697679d99730ed4dc65a8f845f76
Parents: ccf0207
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Mon Nov 9 15:02:19 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Thu Nov 19 10:26:00 2015 +0800

----------------------------------------------------------------------
 .../engine/spark/BuildCubeWithSparkTest.java    |   2 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |   2 +-
 .../apache/kylin/job/BuildIIWithEngineTest.java |   2 +-
 .../org/apache/kylin/common/KylinConfig.java    |  14 +-
 .../apache/kylin/common/util/HostnameUtils.java |  53 ++++++++
 .../job/impl/threadpool/DefaultScheduler.java   |  54 ++++----
 .../job/impl/threadpool/BaseSchedulerTest.java  |   2 +-
 pom.xml                                         |  12 ++
 server/pom.xml                                  |  22 +++-
 .../kylin/rest/controller/JobController.java    |  68 +++++-----
 .../kylin/rest/controller/QueryController.java  |  11 +-
 .../kylin/rest/helix/HelixJobEngineAdmin.java   | 127 +++++++++++++++++++
 .../rest/helix/JobControllerConnector.java      |  79 ++++++++++++
 .../rest/helix/JobControllerConstants.java      |  37 ++++++
 .../helix/v1/AbstractJobEngineStateModelV1.java |  46 +++++++
 .../helix/v1/DefaultJobEngineStateModelV1.java  |  76 +++++++++++
 .../rest/helix/v1/DefaultStateModelFactory.java |  40 ++++++
 .../helix/v1/EmptyJobEngineStateModelV1.java    |  51 ++++++++
 .../kylin/rest/helix/v1/JobEngineSMDV1.java     |  69 ++++++++++
 .../apache/kylin/rest/service/CubeService.java  |  12 +-
 .../rest/helix/JobControllerConnectorTest.java  | 121 ++++++++++++++++++
 .../helix/v1/TestJobEngineStateModelV1.java     |  74 +++++++++++
 .../rest/helix/v1/TestStateModelFactory.java    |  38 ++++++
 23 files changed, 931 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
index 755b1ac..2846c23 100644
--- a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
+++ b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
@@ -97,7 +97,7 @@ public class BuildCubeWithSparkTest {
         for (String jobId : jobService.getAllJobIds()) {
             jobService.deleteJob(jobId);
         }
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 75a6c54..4aaaca6 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -112,7 +112,7 @@ public class BuildCubeWithEngineTest {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 0158fad..ef8cf42 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -109,7 +109,7 @@ public class BuildIIWithEngineTest {
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         jobService = ExecutableManager.getInstance(kylinConfig);
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e76aa94..f2ec347 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -145,6 +145,8 @@ public class KylinConfig implements Serializable {
     public static final String VERSION = "${project.version}";
 
     public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
+    
+    public static final String ZOOKEEPER_ADDRESS = "kylin.zookeeper.address";
 
     // static cached instances
     private static KylinConfig ENV_INSTANCE = null;
@@ -454,10 +456,6 @@ public class KylinConfig implements Serializable {
         return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
     }
 
-    public String getServerMode() {
-        return this.getOptional("kylin.server.mode", "all");
-    }
-
     public int getDictionaryMaxCardinality() {
         return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
     }
@@ -684,6 +682,10 @@ public class KylinConfig implements Serializable {
     public void setStorageUrl(String storageUrl) {
         properties.setProperty(KYLIN_STORAGE_URL, storageUrl);
     }
+    
+    public String getZookeeperAddress() {
+        return properties.getProperty(ZOOKEEPER_ADDRESS);
+    }
 
     public String getHiveDatabaseForIntermediateTable() {
         return this.getOptional(HIVE_DATABASE_FOR_INTERMEDIATE_TABLE, "default");
@@ -756,5 +758,9 @@ public class KylinConfig implements Serializable {
     public String toString() {
         return getMetadataUrl();
     }
+    
+    public String getHelixClusterName() {
+        return getMetadataUrlPrefix();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
new file mode 100644
index 0000000..78c3bf5
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.common.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ */
+public final class HostnameUtils {
+
+    private HostnameUtils() {
+    }
+
+    public static String getHostname() {
+        try {
+            String result = InetAddress.getLocalHost().getHostName();
+            if (StringUtils.isNotEmpty(result)) {
+                return result;
+            }
+        } catch (UnknownHostException e) {
+            // failed;  try alternate means.
+        }
+        // try environment properties.
+        String host = System.getenv("COMPUTERNAME");
+        if (host != null) {
+            return host;
+        }
+        host = System.getenv("HOSTNAME");
+        if (host != null) {
+            return host;
+        }
+        // undetermined.
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 417e279..d94ceae 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,17 +18,7 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
+import com.google.common.collect.Maps;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
@@ -43,11 +33,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.*;
 
 /**
  */
-public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
+public class DefaultScheduler implements Scheduler<AbstractExecutable> {
 
     private ExecutableManager executableManager;
     private FetcherRunner fetcher;
@@ -55,12 +46,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
     private ExecutorService jobPool;
     private DefaultContext context;
 
-    private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
+    private static Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
     private volatile boolean initialized = false;
     private volatile boolean hasStarted = false;
     private JobEngineConfig jobEngineConfig;
 
-    private static final DefaultScheduler INSTANCE = new DefaultScheduler();
+    private static DefaultScheduler INSTANCE;
 
     private DefaultScheduler() {
     }
@@ -134,21 +125,25 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
         }
     }
 
-    public static DefaultScheduler getInstance() {
+    public synchronized static DefaultScheduler createInstance() {
+        destroyInstance();
+        INSTANCE = new DefaultScheduler();
         return INSTANCE;
     }
-
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
+    
+    public synchronized static void destroyInstance() {
+        DefaultScheduler tmp = INSTANCE;
+        INSTANCE = null;
+        if (tmp != null) {
             try {
-                shutdown();
+                tmp.shutdown();
             } catch (SchedulerException e) {
-                throw new RuntimeException("failed to shutdown scheduler", e);
+                logger.error("error stop DefaultScheduler", e);
+                throw new RuntimeException(e);
             }
         }
     }
-
+    
     @Override
     public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
         if (!initialized) {
@@ -159,10 +154,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
         this.jobEngineConfig = jobEngineConfig;
 
-        if (jobLock.lock() == false) {
-            throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
-        }
-
         executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1);
@@ -182,7 +173,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
                 logger.debug("Closing zk connection");
                 try {
                     shutdown();
-                    jobLock.unlock();
                 } catch (SchedulerException e) {
                     logger.error("error shutdown scheduler", e);
                 }
@@ -196,8 +186,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
 
     @Override
     public void shutdown() throws SchedulerException {
-        fetcherPool.shutdown();
-        jobPool.shutdown();
+        if (fetcherPool != null) {
+            fetcherPool.shutdown();
+        }
+        if (jobPool != null) {
+            jobPool.shutdown();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index ecac973..4e092a1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -45,7 +45,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
         createTestMetadata();
         setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
         jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        scheduler = DefaultScheduler.getInstance();
+        scheduler = DefaultScheduler.createInstance();
         scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c696f7..5be477a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
             org/apache/kylin/**/tools/**:**/*CLI.java
         </sonar.jacoco.excludes>
 
+        <helix.version>0.6.5</helix.version>
     </properties>
 
     <licenses>
@@ -460,6 +461,17 @@
                 <artifactId>curator-recipes</artifactId>
                 <version>${curator.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.helix</groupId>
+                <artifactId>helix-core</artifactId>
+                <version>${helix.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.helix</groupId>
+                <artifactId>helix-examples</artifactId>
+                <version>${helix.version}</version>
+            </dependency>
+
 
         </dependencies>
     </dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f2f9e32..59d6747 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -222,7 +222,27 @@
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
         </dependency>
-		
+        <dependency>
+            <groupId>org.apache.helix</groupId>
+            <artifactId>helix-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.101tec</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.github.sgroschupf</groupId>
+                    <artifactId>zkclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>0.5</version>
+        </dependency>
+
+
         <!-- spring aop -->
         <dependency>
             <groupId>org.aspectj</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f6323ed..73c7bd4 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,22 +18,17 @@
 
 package org.apache.kylin.rest.controller;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HostnameUtils;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.JobControllerConnector;
+import org.apache.kylin.rest.helix.JobControllerConstants;
+import org.apache.kylin.rest.helix.v1.DefaultStateModelFactory;
 import org.apache.kylin.rest.request.JobListRequest;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
@@ -46,6 +41,11 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.RESOURCE_NAME;
+
 /**
  * @author ysong1
  * @author Jack
@@ -61,7 +61,7 @@ public class JobController extends BasicController implements InitializingBean {
 
     @Autowired
     private JobLock jobLock;
-
+    
     /*
      * (non-Javadoc)
      * 
@@ -76,27 +76,29 @@ public class JobController extends BasicController implements InitializingBean {
         TimeZone.setDefault(tzone);
 
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        String serverMode = kylinConfig.getServerMode();
-
-        if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
-            logger.info("Initializing Job Engine ....");
-
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        DefaultScheduler scheduler = DefaultScheduler.getInstance();
-                        scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
-                        while (!scheduler.hasStarted()) {
-                            logger.error("scheduler has not been started");
-                            Thread.sleep(1000);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }).start();
-        }
+
+        final String hostname = Preconditions.checkNotNull(HostnameUtils.getHostname());
+        final String port = JobControllerConstants.PARTICIPANT_PORT + "";
+        final String instanceName = JobControllerConstants.INSTANCE_NAME;
+        final String zkAddress = Preconditions.checkNotNull(kylinConfig.getZookeeperAddress());
+        HelixJobEngineAdmin helixJobEngineAdmin = HelixJobEngineAdmin.getInstance(zkAddress);
+        helixJobEngineAdmin.initV1(kylinConfig.getHelixClusterName(), JobControllerConstants.RESOURCE_NAME);
+        helixJobEngineAdmin.startControllers(kylinConfig.getHelixClusterName());
+        final JobControllerConnector jcc = new JobControllerConnector(hostname,
+                port,
+                zkAddress,
+                kylinConfig.getHelixClusterName(),
+                new DefaultStateModelFactory(instanceName, kylinConfig));
+        jcc.register();
+        helixJobEngineAdmin.rebalance(kylinConfig.getHelixClusterName(), RESOURCE_NAME);
+        jcc.start();
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                jcc.stop();
+            }
+        }));
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index f60894e..7115bea 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -67,7 +67,7 @@ import com.google.common.base.Preconditions;
 
 /**
  * Handle query requests.
- * 
+ *
  * @author xduo
  */
 @Controller
@@ -169,16 +169,15 @@ public class QueryController extends BasicController {
     private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
         try {
             BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
-
             String sql = sqlRequest.getSql();
             String project = sqlRequest.getProject();
             logger.info("Using project: " + project);
             logger.info("The original query:  " + sql);
 
-            String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
-            if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
-                throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
-            }
+//        String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
+//        if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
+//            throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
+//        }
 
             if (!sql.toLowerCase().contains("select")) {
                 logger.debug("Directly return exception as not supported");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
new file mode 100644
index 0000000..938ed5d
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ */
+public class HelixJobEngineAdmin {
+    
+    private static ConcurrentMap<String, HelixJobEngineAdmin> instanceMaps = Maps.newConcurrentMap();
+    
+    public static HelixJobEngineAdmin getInstance(String zkAddress) {
+        Preconditions.checkNotNull(zkAddress);
+        instanceMaps.putIfAbsent(zkAddress, new HelixJobEngineAdmin(zkAddress));
+        return instanceMaps.get(zkAddress);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(HelixJobEngineAdmin.class);
+    private final String zkAddress;
+    private final ZKHelixAdmin admin;
+
+    private HelixJobEngineAdmin(String zkAddress) {
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+    }
+
+    public void initV1(String clusterName, String resourceName) {
+        final StateModelDefinition jobEngineSMDV1 = JobEngineSMDV1.getJobEngineStateModelDefinitionV1();
+        admin.addCluster(clusterName, false);
+        if (admin.getStateModelDef(clusterName, jobEngineSMDV1.getId()) == null) {
+            admin.addStateModelDef(clusterName, jobEngineSMDV1.getId(), jobEngineSMDV1);
+        }
+        if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+            admin.addResource(clusterName, resourceName, 1, jobEngineSMDV1.getId(), "AUTO");
+        }
+    }
+
+    public void startControllers(String clusterName) {
+        HelixControllerMain.startHelixController(zkAddress, clusterName, "localhost_" + JobControllerConstants.CONTROLLER_PORT,
+                HelixControllerMain.STANDALONE);
+    }
+
+    public String getInstanceState(String clusterName, String resourceName, String instanceName) {
+        final ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+        if (resourceExternalView == null) {
+            logger.warn("fail to get ExternalView, clusterName:" + clusterName + " resourceName:" + resourceName);
+            return "ERROR";
+        }
+        final Set<String> partitionSet = resourceExternalView.getPartitionSet();
+        Preconditions.checkArgument(partitionSet.size() == 1);
+        final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionSet.iterator().next());
+        if (stateMap.containsKey(instanceName)) {
+            return stateMap.get(instanceName); 
+        } else {
+            logger.warn("fail to get state, clusterName:" + clusterName + " resourceName:" + resourceName + " instance:" + instanceName);
+            return "ERROR";
+        }
+    }
+
+
+    public String collectStateInfo(String msg, String clusterName, String resourceName) {
+        StringBuilder sb = new StringBuilder("");
+        sb.append("CLUSTER STATE: ").append(msg).append("\n");
+        ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+        if (resourceExternalView == null) {
+            sb.append("no participant joined yet").append("\n");
+            return sb.toString();
+        }
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        TreeSet<String> sortedSet = new TreeSet<String>(resourceExternalView.getPartitionSet());
+        sb.append("\t\t");
+        for (String instance : instancesInCluster) {
+            sb.append(instance).append("\t");
+        }
+        sb.append("\n");
+        for (String partitionName : sortedSet) {
+            sb.append(partitionName).append("\t");
+            for (String instance : instancesInCluster) {
+                Map<String, String> stateMap = resourceExternalView.getStateMap(partitionName);
+                if (stateMap != null && stateMap.containsKey(instance)) {
+                    sb.append(stateMap.get(instance)).append(
+                            "\t\t");
+                } else {
+                    sb.append("-").append("\t\t");
+                }
+            }
+            sb.append("\n");
+        }
+        sb.append("###################################################################").append("\n");
+        return sb.toString();
+    }
+
+    public void rebalance(String clusterName, String resourceName) {
+        final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+        admin.rebalance(clusterName, resourceName, instancesInCluster.size(), instancesInCluster);
+        logger.info("cluster:" + clusterName + " ideal state:" + admin.getResourceIdealState(clusterName, resourceName));
+        logger.info("cluster:" + clusterName + " instances:" + instancesInCluster);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
new file mode 100644
index 0000000..19854a9
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+
+/**
+ * Connects this server to a Helix cluster as a participant, so that the Helix controller
+ * can drive the {@link JobEngineSMDV1} state model on it.
+ *
+ * <p>Expected lifecycle: {@link #register()}, then {@link #start()}, then {@link #stop()}.
+ * {@link #stop()} also closes the admin client created by the constructor, so a stopped
+ * connector cannot be registered or started again.
+ */
+public class JobControllerConnector {
+
+    private final String instanceName;
+    private final HelixAdmin admin;
+    private final StateModelFactory<StateModel> stateModelFactory;
+    private final String zkAddress;
+    private final String hostname;
+    private final String port;
+    private final String clusterName;
+
+    private HelixManager manager;
+
+    /**
+     * @param hostname host name written into this instance's config
+     * @param port port written into this instance's config; the instance name is {@code hostname_port}
+     * @param zkAddress ZooKeeper connect string of the Helix cluster
+     * @param clusterName name of the Helix cluster to join
+     * @param stateModelFactory factory for the job engine state models hosted by this instance
+     */
+    public JobControllerConnector(String hostname, String port, String zkAddress, String clusterName, StateModelFactory<StateModel> stateModelFactory) {
+        this.instanceName = hostname + "_" + port;
+        this.hostname = hostname;
+        this.port = port;
+        this.zkAddress = zkAddress;
+        this.admin = new ZKHelixAdmin(zkAddress);
+        this.stateModelFactory = stateModelFactory;
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * Adds this instance's config to the cluster unless an instance with the same name is already there.
+     */
+    public void register() {
+        if (!admin.getInstancesInCluster(clusterName).contains(instanceName)) {
+            InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+            instanceConfig.setHostName(hostname);
+            instanceConfig.setPort(port);
+            admin.addInstance(clusterName, instanceConfig);
+        }
+    }
+
+    /**
+     * Connects to the cluster as a participant that handles {@link JobEngineSMDV1#STATE_MODEL_NAME}
+     * with the factory given to the constructor.
+     *
+     * @throws IllegalStateException if this connector is already connected
+     * @throws Exception if Helix fails to create or connect the participant
+     */
+    public void start() throws Exception {
+        if (manager != null && manager.isConnected()) {
+            // a second participant under the same instance name would clash with the live one
+            throw new IllegalStateException("instance " + instanceName + " is already connected to cluster " + clusterName);
+        }
+        this.manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                InstanceType.PARTICIPANT, zkAddress);
+        StateMachineEngine stateMach = manager.getStateMachineEngine();
+        stateMach.registerStateModelFactory(JobEngineSMDV1.STATE_MODEL_NAME, stateModelFactory);
+        manager.connect();
+    }
+
+    /**
+     * Disconnects the participant if it was started, then closes the admin client.
+     */
+    public void stop() {
+        try {
+            if (manager != null) {
+                manager.disconnect();
+                manager = null;
+            }
+        } finally {
+            // the ZKHelixAdmin built from zkAddress owns a ZooKeeper client that is otherwise never released
+            admin.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
new file mode 100644
index 0000000..db8d75a
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.kylin.common.util.HostnameUtils;
+
+/**
+ * Names and ports shared by the Helix-based job engine code.
+ */
+public final class JobControllerConstants {
+
+    /** Name of the Helix resource that carries the job engine state model. */
+    public static final String RESOURCE_NAME = "job_engine_v1";
+
+    /** Port used in this server's participant instance name (see {@link #INSTANCE_NAME}). */
+    public static final int PARTICIPANT_PORT = 17077;
+
+    /** Controller port constant; its users are outside this file. */
+    public static final int CONTROLLER_PORT = 17078;
+
+    /**
+     * Participant instance name of the local host, built as {@code hostname_port} like
+     * {@link JobControllerConnector} does; computed once, when this class is initialized.
+     */
+    public static final String INSTANCE_NAME = HostnameUtils.getHostname() + "_" + PARTICIPANT_PORT;
+
+    private JobControllerConstants() {
+        // holder of constants only; never instantiated
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
new file mode 100644
index 0000000..0e171c1
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+
+/**
+ * Base class for the participant-side state models of {@link JobEngineSMDV1}.
+ *
+ * <p>Helix can resolve transition callbacks by method name ({@code onBecome<To>From<From>}),
+ * so every transition declared in {@link JobEngineSMDV1} has a matching method here.
+ */
+abstract class AbstractJobEngineStateModelV1 extends StateModel {
+
+    private final String instanceName;
+
+    /**
+     * @param instanceName name of the participant instance that hosts this state model
+     */
+    public AbstractJobEngineStateModelV1(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /** @return the participant instance name given to the constructor */
+    public final String getInstanceName() {
+        return instanceName;
+    }
+
+    /** Called on the STANDBY to LEADER transition. */
+    public abstract void onBecomeLeaderFromStandby(Message message, NotificationContext context);
+
+    /** Called on the LEADER to STANDBY transition. */
+    public abstract void onBecomeStandbyFromLeader(Message message, NotificationContext context);
+
+    /** Called on the STANDBY to OFFLINE transition. */
+    public abstract void onBecomeOfflineFromStandby(Message message, NotificationContext context);
+
+    /** Called on the OFFLINE to STANDBY transition. */
+    public abstract void onBecomeStandbyFromOffline(Message message, NotificationContext context);
+
+    /**
+     * Called on the OFFLINE to DROPPED transition that {@link JobEngineSMDV1} declares.
+     * Without such a method Helix finds no handler for that transition and reports it as failed.
+     * Does nothing by default; subclasses may override it to release per-partition resources.
+     */
+    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
new file mode 100644
index 0000000..13a2a08
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class DefaultJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultJobEngineStateModelV1.class);
+    private KylinConfig kylinConfig;
+
+
+    public DefaultJobEngineStateModelV1(String instanceName, KylinConfig kylinConfig) {
+        super(instanceName);
+        this.kylinConfig = kylinConfig;
+    }
+
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info(getInstanceName() + " onBecomeLeaderFromStandby");
+        try {
+            DefaultScheduler scheduler = DefaultScheduler.createInstance();
+            scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
+            while (!scheduler.hasStarted()) {
+                logger.error("scheduler has not been started");
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            logger.error("error start DefaultScheduler", e);
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info(getInstanceName() + " onBecomeStandbyFromLeader");
+        DefaultScheduler.destroyInstance();
+    }
+
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info(getInstanceName() + " onBecomeOfflineFromStandby");
+    }
+
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info(getInstanceName() + " onBecomeStandbyFromOffline");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
new file mode 100644
index 0000000..fe27263
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * Helix state model factory that hands out a new {@link DefaultJobEngineStateModelV1}
+ * on every call, each built from the same instance name and configuration.
+ */
+public class DefaultStateModelFactory extends StateModelFactory<StateModel> {
+
+    private final KylinConfig kylinConfig;
+    private final String instanceName;
+
+    /**
+     * @param instanceName participant instance name passed to each state model
+     * @param kylinConfig configuration passed to each state model
+     */
+    public DefaultStateModelFactory(String instanceName, KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+        this.instanceName = instanceName;
+    }
+
+    /** Both arguments are ignored: every state model gets the factory's instance name and config. */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        final StateModel stateModel = new DefaultJobEngineStateModelV1(instanceName, kylinConfig);
+        return stateModel;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
new file mode 100644
index 0000000..22e9b2c
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * No-op state model: it only logs each transition and performs no other work.
+ */
+public final class EmptyJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    private static final Logger logger = LoggerFactory.getLogger(EmptyJobEngineStateModelV1.class);
+
+    public EmptyJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+    }
+
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeLeaderFromStandby", getInstanceName());
+    }
+
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromLeader", getInstanceName());
+    }
+
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info("{} onBecomeOfflineFromStandby", getInstanceName());
+    }
+
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info("{} onBecomeStandbyFromOffline", getInstanceName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
new file mode 100644
index 0000000..4ccf9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * Definition of the job engine's LEADER / STANDBY / OFFLINE state model: at most one LEADER
+ * and at most R (the resource's replica count) STANDBY instances per partition.
+ */
+public final class JobEngineSMDV1 {
+
+    /** Name under which this state model definition is known to Helix. */
+    public static final String STATE_MODEL_NAME = "job_engine_model_v1";
+
+    /** States of the job engine state model. */
+    public enum States {
+        LEADER,
+        STANDBY,
+        OFFLINE
+    }
+
+    /** Initialization-on-demand holder: the definition is built on first access. */
+    private static class StateModelDefinitionV1Holder {
+        private static final StateModelDefinition instance = build();
+
+        private static StateModelDefinition build() {
+            StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
+            // init state
+            builder.initialState(States.OFFLINE.name());
+
+            // add states; a lower number is a higher priority, so priorities follow
+            // LEADER > STANDBY > OFFLINE, as in Helix's built-in LeaderStandby definition
+            builder.addState(States.LEADER.name(), 0);
+            builder.addState(States.STANDBY.name(), 1);
+            builder.addState(States.OFFLINE.name(), 2);
+            for (HelixDefinedState state : HelixDefinedState.values()) {
+                builder.addState(state.name());
+            }
+
+            // add transitions
+            builder.addTransition(States.LEADER.name(), States.STANDBY.name(), 0);
+            builder.addTransition(States.STANDBY.name(), States.LEADER.name(), 1);
+            builder.addTransition(States.OFFLINE.name(), States.STANDBY.name(), 2);
+            builder.addTransition(States.STANDBY.name(), States.OFFLINE.name(), 3);
+            builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
+
+            // bounds: a single LEADER; STANDBY limited to the replica count ("R")
+            builder.upperBound(States.LEADER.name(), 1);
+            builder.dynamicUpperBound(States.STANDBY.name(), "R");
+
+            return builder.build();
+        }
+    }
+
+    /** @return the job engine state model definition, built once and shared */
+    public static StateModelDefinition getJobEngineStateModelDefinitionV1() {
+        return StateModelDefinitionV1Holder.instance;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 6670de1..0c50228 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -55,6 +55,9 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.JobControllerConstants;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
 import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
@@ -574,9 +577,12 @@ public class CubeService extends BasicService {
     public void updateOnNewSegmentReady(String cubeName) {
         logger.debug("on updateOnNewSegmentReady: " + cubeName);
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        String serverMode = kylinConfig.getServerMode();
-        logger.debug("server mode: " + serverMode);
-        if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+        final String instanceState = HelixJobEngineAdmin.getInstance(kylinConfig.getZookeeperAddress()).
+                getInstanceState(kylinConfig.getHelixClusterName(), 
+                        JobControllerConstants.RESOURCE_NAME, 
+                        JobControllerConstants.INSTANCE_NAME);
+        logger.debug("server state: " + instanceState);
+        if (JobEngineSMDV1.States.LEADER.toString().equalsIgnoreCase(instanceState)) {
             keepCubeRetention(cubeName);
             mergeCubeSegment(cubeName);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
new file mode 100644
index 0000000..8b11728
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.rest.helix.v1.TestJobEngineStateModelV1;
+import org.apache.kylin.rest.helix.v1.TestStateModelFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ */
+public class JobControllerConnectorTest {
+
+
+    String zkAddress = "localhost:2199";
+    ZkServer server;
+
+    List<JobControllerConnector> connectors ;
+    HelixJobEngineAdmin helixJobEngineAdmin;
+    
+    private static final String CLUSTER_NAME = "test_cluster";
+
+
+    @Before
+    public void setup() {
+        // start zookeeper on localhost
+        final File tmpDir = new File("/tmp/helix-quickstart");
+        FileUtil.fullyDelete(tmpDir);
+        tmpDir.mkdirs();
+        server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+                new IDefaultNameSpace() {
+                    @Override
+                    public void createDefaultNameSpace(ZkClient zkClient) {
+                    }
+                }, 2199);
+        server.start();
+        
+        final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+        zkHelixAdmin.dropCluster(CLUSTER_NAME);
+        connectors = Lists.newArrayList();
+        helixJobEngineAdmin = HelixJobEngineAdmin.getInstance(zkAddress);
+        helixJobEngineAdmin.initV1(CLUSTER_NAME, RESOURCE_NAME);
+        helixJobEngineAdmin.startControllers(CLUSTER_NAME);
+
+    }
+
+    @Test
+    public void test() throws Exception {
+        JobControllerConnector connector = new JobControllerConnector("localhost", "10000", zkAddress, CLUSTER_NAME, new TestStateModelFactory("localhost", "10000"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 1 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connector = new JobControllerConnector("localhost", "10001", zkAddress, CLUSTER_NAME, new TestStateModelFactory("localhost", "10001"));
+        connector.register();
+        helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+        connector.start();
+        connectors.add(connector);
+        Thread.sleep(1000);
+        System.out.println(helixJobEngineAdmin.collectStateInfo("add 2 nodes", CLUSTER_NAME, RESOURCE_NAME));
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(1, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+        connectors.remove(0).stop();
+        TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+        Thread.sleep(1000);
+        assertEquals(0, TestJobEngineStateModelV1.getLeaderCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+        assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+        
+    }
+
+    @After
+    public void tearDown() {
+        server.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
new file mode 100644
index 0000000..e8014d9
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Test state model that keeps JVM-wide counts of how many state models are currently
+ * OFFLINE, STANDBY and LEADER.
+ */
+public class TestJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+    // static on purpose: the counts aggregate every state model created in this JVM
+    private static final AtomicInteger offlineCount = new AtomicInteger(0);
+    private static final AtomicInteger standbyCount = new AtomicInteger(0);
+    private static final AtomicInteger leaderCount = new AtomicInteger(0);
+
+    /** Creates a state model and counts it as OFFLINE, the model's initial state. */
+    public TestJobEngineStateModelV1(String instanceName) {
+        super(instanceName);
+        offlineCount.incrementAndGet();
+    }
+
+    @Override
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        leaderCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    @Override
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        leaderCount.decrementAndGet();
+    }
+
+    @Override
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        offlineCount.incrementAndGet();
+        standbyCount.decrementAndGet();
+    }
+
+    @Override
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        standbyCount.incrementAndGet();
+        offlineCount.decrementAndGet();
+    }
+
+    /** Sets all three counters back to zero, e.g. before a test case starts. */
+    public static void resetCounts() {
+        offlineCount.set(0);
+        standbyCount.set(0);
+        leaderCount.set(0);
+    }
+
+    /** @return the shared STANDBY counter */
+    public static AtomicInteger getStandbyCount() {
+        return standbyCount;
+    }
+
+    /** @return the shared OFFLINE counter */
+    public static AtomicInteger getOfflineCount() {
+        return offlineCount;
+    }
+
+    /** @return the shared LEADER counter; tests may adjust it directly */
+    public static AtomicInteger getLeaderCount() {
+        return leaderCount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
new file mode 100644
index 0000000..ce0820b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Test factory returning a new {@link TestJobEngineStateModelV1} on every call, each one
+ * named {@code hostname_port}.
+ */
+public class TestStateModelFactory extends StateModelFactory<StateModel> {
+
+    private final String instanceName;
+
+    /**
+     * @param hostname host part of the instance name
+     * @param port port part of the instance name
+     */
+    public TestStateModelFactory(String hostname, String port) {
+        this.instanceName = String.format("%s_%s", hostname, port);
+    }
+
+    /** Ignores both arguments; only the instance name given at construction is used. */
+    @Override
+    public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+        return new TestJobEngineStateModelV1(instanceName);
+    }
+}


Mime
View raw message