Repository: incubator-kylin
Updated Branches:
refs/heads/helix-latest [created] be19bec6f
helix-based job engine
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/be19bec6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/be19bec6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/be19bec6
Branch: refs/heads/helix-latest
Commit: be19bec6f334697679d99730ed4dc65a8f845f76
Parents: ccf0207
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Mon Nov 9 15:02:19 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Thu Nov 19 10:26:00 2015 +0800
----------------------------------------------------------------------
.../engine/spark/BuildCubeWithSparkTest.java | 2 +-
.../kylin/job/BuildCubeWithEngineTest.java | 2 +-
.../apache/kylin/job/BuildIIWithEngineTest.java | 2 +-
.../org/apache/kylin/common/KylinConfig.java | 14 +-
.../apache/kylin/common/util/HostnameUtils.java | 53 ++++++++
.../job/impl/threadpool/DefaultScheduler.java | 54 ++++----
.../job/impl/threadpool/BaseSchedulerTest.java | 2 +-
pom.xml | 12 ++
server/pom.xml | 22 +++-
.../kylin/rest/controller/JobController.java | 68 +++++-----
.../kylin/rest/controller/QueryController.java | 11 +-
.../kylin/rest/helix/HelixJobEngineAdmin.java | 127 +++++++++++++++++++
.../rest/helix/JobControllerConnector.java | 79 ++++++++++++
.../rest/helix/JobControllerConstants.java | 37 ++++++
.../helix/v1/AbstractJobEngineStateModelV1.java | 46 +++++++
.../helix/v1/DefaultJobEngineStateModelV1.java | 76 +++++++++++
.../rest/helix/v1/DefaultStateModelFactory.java | 40 ++++++
.../helix/v1/EmptyJobEngineStateModelV1.java | 51 ++++++++
.../kylin/rest/helix/v1/JobEngineSMDV1.java | 69 ++++++++++
.../apache/kylin/rest/service/CubeService.java | 12 +-
.../rest/helix/JobControllerConnectorTest.java | 121 ++++++++++++++++++
.../helix/v1/TestJobEngineStateModelV1.java | 74 +++++++++++
.../rest/helix/v1/TestStateModelFactory.java | 38 ++++++
23 files changed, 931 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
index 755b1ac..2846c23 100644
--- a/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
+++ b/assembly/src/test/java/org/apache/kylin/engine/spark/BuildCubeWithSparkTest.java
@@ -97,7 +97,7 @@ public class BuildCubeWithSparkTest {
for (String jobId : jobService.getAllJobIds()) {
jobService.deleteJob(jobId);
}
- scheduler = DefaultScheduler.getInstance();
+ scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 75a6c54..4aaaca6 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -112,7 +112,7 @@ public class BuildCubeWithEngineTest {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
jobService = ExecutableManager.getInstance(kylinConfig);
- scheduler = DefaultScheduler.getInstance();
+ scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 0158fad..ef8cf42 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -109,7 +109,7 @@ public class BuildIIWithEngineTest {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
jobService = ExecutableManager.getInstance(kylinConfig);
- scheduler = DefaultScheduler.getInstance();
+ scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index e76aa94..f2ec347 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -145,6 +145,8 @@ public class KylinConfig implements Serializable {
public static final String VERSION = "${project.version}";
public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
+
+ public static final String ZOOKEEPER_ADDRESS = "kylin.zookeeper.address";
// static cached instances
private static KylinConfig ENV_INSTANCE = null;
@@ -454,10 +456,6 @@ public class KylinConfig implements Serializable {
return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
}
- public String getServerMode() {
- return this.getOptional("kylin.server.mode", "all");
- }
-
public int getDictionaryMaxCardinality() {
return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
}
@@ -684,6 +682,10 @@ public class KylinConfig implements Serializable {
public void setStorageUrl(String storageUrl) {
properties.setProperty(KYLIN_STORAGE_URL, storageUrl);
}
+
+ public String getZookeeperAddress() {
+ return properties.getProperty(ZOOKEEPER_ADDRESS);
+ }
public String getHiveDatabaseForIntermediateTable() {
return this.getOptional(HIVE_DATABASE_FOR_INTERMEDIATE_TABLE, "default");
@@ -756,5 +758,9 @@ public class KylinConfig implements Serializable {
public String toString() {
return getMetadataUrl();
}
+
+ public String getHelixClusterName() {
+ return getMetadataUrlPrefix();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java b/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
new file mode 100644
index 0000000..78c3bf5
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HostnameUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.common.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ */
+public final class HostnameUtils {
+
+ private HostnameUtils() {
+ }
+
+ public static String getHostname() {
+ try {
+ String result = InetAddress.getLocalHost().getHostName();
+ if (StringUtils.isNotEmpty(result)) {
+ return result;
+ }
+ } catch (UnknownHostException e) {
+ // failed; try alternate means.
+ }
+ // try environment properties.
+ String host = System.getenv("COMPUTERNAME");
+ if (host != null) {
+ return host;
+ }
+ host = System.getenv("HOSTNAME");
+ if (host != null) {
+ return host;
+ }
+ // undetermined.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 417e279..d94ceae 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,17 +18,7 @@
package org.apache.kylin.job.impl.threadpool;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
+import com.google.common.collect.Maps;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -43,11 +33,12 @@ import org.apache.kylin.job.manager.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.*;
/**
*/
-public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
+public class DefaultScheduler implements Scheduler<AbstractExecutable> {
private ExecutableManager executableManager;
private FetcherRunner fetcher;
@@ -55,12 +46,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
private ExecutorService jobPool;
private DefaultContext context;
- private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
+ private static Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
private volatile boolean initialized = false;
private volatile boolean hasStarted = false;
private JobEngineConfig jobEngineConfig;
- private static final DefaultScheduler INSTANCE = new DefaultScheduler();
+ private static DefaultScheduler INSTANCE;
private DefaultScheduler() {
}
@@ -134,21 +125,25 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
}
}
- public static DefaultScheduler getInstance() {
+ public synchronized static DefaultScheduler createInstance() {
+ destroyInstance();
+ INSTANCE = new DefaultScheduler();
return INSTANCE;
}
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
+
+ public synchronized static void destroyInstance() {
+ DefaultScheduler tmp = INSTANCE;
+ INSTANCE = null;
+ if (tmp != null) {
try {
- shutdown();
+ tmp.shutdown();
} catch (SchedulerException e) {
- throw new RuntimeException("failed to shutdown scheduler", e);
+ logger.error("error stop DefaultScheduler", e);
+ throw new RuntimeException(e);
}
}
}
-
+
@Override
public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
if (!initialized) {
@@ -159,10 +154,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
this.jobEngineConfig = jobEngineConfig;
- if (jobLock.lock() == false) {
- throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
- }
-
executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
//load all executable, set them to a consistent status
fetcherPool = Executors.newScheduledThreadPool(1);
@@ -182,7 +173,6 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
logger.debug("Closing zk connection");
try {
shutdown();
- jobLock.unlock();
} catch (SchedulerException e) {
logger.error("error shutdown scheduler", e);
}
@@ -196,8 +186,12 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
@Override
public void shutdown() throws SchedulerException {
- fetcherPool.shutdown();
- jobPool.shutdown();
+ if (fetcherPool != null) {
+ fetcherPool.shutdown();
+ }
+ if (jobPool != null) {
+ jobPool.shutdown();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index ecac973..4e092a1 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -45,7 +45,7 @@ public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
createTestMetadata();
setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
- scheduler = DefaultScheduler.getInstance();
+ scheduler = DefaultScheduler.createInstance();
scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
if (!scheduler.hasStarted()) {
throw new RuntimeException("scheduler has not been started");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c696f7..5be477a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
org/apache/kylin/**/tools/**:**/*CLI.java
</sonar.jacoco.excludes>
+ <helix.version>0.6.5</helix.version>
</properties>
<licenses>
@@ -460,6 +461,17 @@
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>${helix.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-examples</artifactId>
+ <version>${helix.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f2f9e32..59d6747 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -222,7 +222,27 @@
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.sgroschupf</groupId>
+ <artifactId>zkclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.101tec</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.5</version>
+ </dependency>
+
+
<!-- spring aop -->
<dependency>
<groupId>org.aspectj</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index f6323ed..73c7bd4 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,22 +18,17 @@
package org.apache.kylin.rest.controller;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
+import com.google.common.base.Preconditions;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HostnameUtils;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.JobControllerConnector;
+import org.apache.kylin.rest.helix.JobControllerConstants;
+import org.apache.kylin.rest.helix.v1.DefaultStateModelFactory;
import org.apache.kylin.rest.request.JobListRequest;
import org.apache.kylin.rest.service.JobService;
import org.slf4j.Logger;
@@ -46,6 +41,11 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.RESOURCE_NAME;
+
/**
* @author ysong1
* @author Jack
@@ -61,7 +61,7 @@ public class JobController extends BasicController implements InitializingBean {
@Autowired
private JobLock jobLock;
-
+
/*
* (non-Javadoc)
*
@@ -76,27 +76,29 @@ public class JobController extends BasicController implements InitializingBean {
TimeZone.setDefault(tzone);
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- String serverMode = kylinConfig.getServerMode();
-
- if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
- logger.info("Initializing Job Engine ....");
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- DefaultScheduler scheduler = DefaultScheduler.getInstance();
- scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
- while (!scheduler.hasStarted()) {
- logger.error("scheduler has not been started");
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }).start();
- }
+
+ final String hostname = Preconditions.checkNotNull(HostnameUtils.getHostname());
+ final String port = JobControllerConstants.PARTICIPANT_PORT + "";
+ final String instanceName = JobControllerConstants.INSTANCE_NAME;
+ final String zkAddress = Preconditions.checkNotNull(kylinConfig.getZookeeperAddress());
+ HelixJobEngineAdmin helixJobEngineAdmin = HelixJobEngineAdmin.getInstance(zkAddress);
+ helixJobEngineAdmin.initV1(kylinConfig.getHelixClusterName(), JobControllerConstants.RESOURCE_NAME);
+ helixJobEngineAdmin.startControllers(kylinConfig.getHelixClusterName());
+ final JobControllerConnector jcc = new JobControllerConnector(hostname,
+ port,
+ zkAddress,
+ kylinConfig.getHelixClusterName(),
+ new DefaultStateModelFactory(instanceName, kylinConfig));
+ jcc.register();
+ helixJobEngineAdmin.rebalance(kylinConfig.getHelixClusterName(), RESOURCE_NAME);
+ jcc.start();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ jcc.stop();
+ }
+ }));
+
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index f60894e..7115bea 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -67,7 +67,7 @@ import com.google.common.base.Preconditions;
/**
* Handle query requests.
- *
+ *
* @author xduo
*/
@Controller
@@ -169,16 +169,15 @@ public class QueryController extends BasicController {
private SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
try {
BackdoorToggles.setToggles(sqlRequest.getBackdoorToggles());
-
String sql = sqlRequest.getSql();
String project = sqlRequest.getProject();
logger.info("Using project: " + project);
logger.info("The original query: " + sql);
- String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
- if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
- throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
- }
+// String serverMode = KylinConfig.getInstanceFromEnv().getServerMode();
+// if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase()))) {
+// throw new InternalErrorException("Query is not allowed in " + serverMode + " mode.");
+// }
if (!sql.toLowerCase().contains("select")) {
logger.debug("Directly return exception as not supported");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
new file mode 100644
index 0000000..938ed5d
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixJobEngineAdmin.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+
+
+/**
+ */
+public class HelixJobEngineAdmin {
+
+ private static ConcurrentMap<String, HelixJobEngineAdmin> instanceMaps = Maps.newConcurrentMap();
+
+ public static HelixJobEngineAdmin getInstance(String zkAddress) {
+ Preconditions.checkNotNull(zkAddress);
+ instanceMaps.putIfAbsent(zkAddress, new HelixJobEngineAdmin(zkAddress));
+ return instanceMaps.get(zkAddress);
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(HelixJobEngineAdmin.class);
+ private final String zkAddress;
+ private final ZKHelixAdmin admin;
+
+ private HelixJobEngineAdmin(String zkAddress) {
+ this.zkAddress = zkAddress;
+ this.admin = new ZKHelixAdmin(zkAddress);
+ }
+
+ public void initV1(String clusterName, String resourceName) {
+ final StateModelDefinition jobEngineSMDV1 = JobEngineSMDV1.getJobEngineStateModelDefinitionV1();
+ admin.addCluster(clusterName, false);
+ if (admin.getStateModelDef(clusterName, jobEngineSMDV1.getId()) == null) {
+ admin.addStateModelDef(clusterName, jobEngineSMDV1.getId(), jobEngineSMDV1);
+ }
+ if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+ admin.addResource(clusterName, resourceName, 1, jobEngineSMDV1.getId(), "AUTO");
+ }
+ }
+
+ public void startControllers(String clusterName) {
+ HelixControllerMain.startHelixController(zkAddress, clusterName, "localhost_" + JobControllerConstants.CONTROLLER_PORT,
+ HelixControllerMain.STANDALONE);
+ }
+
+ public String getInstanceState(String clusterName, String resourceName, String instanceName) {
+ final ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+ if (resourceExternalView == null) {
+ logger.warn("fail to get ExternalView, clusterName:" + clusterName + " resourceName:" + resourceName);
+ return "ERROR";
+ }
+ final Set<String> partitionSet = resourceExternalView.getPartitionSet();
+ Preconditions.checkArgument(partitionSet.size() == 1);
+ final Map<String, String> stateMap = resourceExternalView.getStateMap(partitionSet.iterator().next());
+ if (stateMap.containsKey(instanceName)) {
+ return stateMap.get(instanceName);
+ } else {
+ logger.warn("fail to get state, clusterName:" + clusterName + " resourceName:" + resourceName + " instance:" + instanceName);
+ return "ERROR";
+ }
+ }
+
+
+ public String collectStateInfo(String msg, String clusterName, String resourceName) {
+ StringBuilder sb = new StringBuilder("");
+ sb.append("CLUSTER STATE: ").append(msg).append("\n");
+ ExternalView resourceExternalView = admin.getResourceExternalView(clusterName, resourceName);
+ if (resourceExternalView == null) {
+ sb.append("no participant joined yet").append("\n");
+ return sb.toString();
+ }
+ final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+ TreeSet<String> sortedSet = new TreeSet<String>(resourceExternalView.getPartitionSet());
+ sb.append("\t\t");
+ for (String instance : instancesInCluster) {
+ sb.append(instance).append("\t");
+ }
+ sb.append("\n");
+ for (String partitionName : sortedSet) {
+ sb.append(partitionName).append("\t");
+ for (String instance : instancesInCluster) {
+ Map<String, String> stateMap = resourceExternalView.getStateMap(partitionName);
+ if (stateMap != null && stateMap.containsKey(instance)) {
+ sb.append(stateMap.get(instance)).append(
+ "\t\t");
+ } else {
+ sb.append("-").append("\t\t");
+ }
+ }
+ sb.append("\n");
+ }
+ sb.append("###################################################################").append("\n");
+ return sb.toString();
+ }
+
+ public void rebalance(String clusterName, String resourceName) {
+ final List<String> instancesInCluster = admin.getInstancesInCluster(clusterName);
+ admin.rebalance(clusterName, resourceName, instancesInCluster.size(), instancesInCluster);
+ logger.info("cluster:" + clusterName + " ideal state:" + admin.getResourceIdealState(clusterName, resourceName));
+ logger.info("cluster:" + clusterName + " instances:" + instancesInCluster);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
new file mode 100644
index 0000000..19854a9
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConnector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
+
+
+/**
+ */
+public class JobControllerConnector {
+
+ private final String instanceName;
+ private final HelixAdmin admin;
+ private final StateModelFactory<StateModel> stateModelFactory;
+ private final String zkAddress;
+ private final String hostname;
+ private final String port;
+ private final String clusterName;
+
+ private HelixManager manager;
+
+ public JobControllerConnector(String hostname, String port, String zkAddress, String clusterName, StateModelFactory<StateModel> stateModelFactory) {
+ this.instanceName = hostname + "_" + port;
+ this.hostname = hostname;
+ this.port = port;
+ this.zkAddress = zkAddress;
+ this.admin = new ZKHelixAdmin(zkAddress);
+ this.stateModelFactory = stateModelFactory;
+ this.clusterName = clusterName;
+ }
+
+ public void register() {
+ if (!admin.getInstancesInCluster(clusterName).contains(instanceName)) {
+ InstanceConfig instanceConfig = new InstanceConfig(instanceName);
+ instanceConfig.setHostName(hostname);
+ instanceConfig.setPort(port);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ }
+
+
+ public void start() throws Exception {
+ this.manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+ InstanceType.PARTICIPANT, zkAddress);
+ StateMachineEngine stateMach = manager.getStateMachineEngine();
+ stateMach.registerStateModelFactory(JobEngineSMDV1.STATE_MODEL_NAME, stateModelFactory);
+ manager.connect();
+ }
+
+ public void stop() {
+ if (manager != null) {
+ manager.disconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
new file mode 100644
index 0000000..db8d75a
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobControllerConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import org.apache.kylin.common.util.HostnameUtils;
+
+/**
+ */
+public final class JobControllerConstants {
+
+ private JobControllerConstants() {
+ }
+
+ public static final String RESOURCE_NAME = "job_engine_v1";
+
+
+ public static final int PARTICIPANT_PORT = 17077;
+ public static final int CONTROLLER_PORT = 17078;
+
+ public static final String INSTANCE_NAME = HostnameUtils.getHostname() + "_" + JobControllerConstants.PARTICIPANT_PORT;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
new file mode 100644
index 0000000..0e171c1
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/AbstractJobEngineStateModelV1.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.kylin.rest.controller.JobController;
+
+/**
+ */
+abstract class AbstractJobEngineStateModelV1 extends StateModel {
+
+ private final String instanceName;
+
+ public AbstractJobEngineStateModelV1(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public final String getInstanceName() {
+ return instanceName;
+ }
+
+ public abstract void onBecomeLeaderFromStandby(Message message, NotificationContext context);
+
+ public abstract void onBecomeStandbyFromLeader(Message message, NotificationContext context);
+
+ public abstract void onBecomeOfflineFromStandby(Message message, NotificationContext context);
+
+ public abstract void onBecomeStandbyFromOffline(Message message, NotificationContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
new file mode 100644
index 0000000..13a2a08
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultJobEngineStateModelV1.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class DefaultJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+ private static final Logger logger = LoggerFactory.getLogger(DefaultJobEngineStateModelV1.class);
+ private KylinConfig kylinConfig;
+
+
+ public DefaultJobEngineStateModelV1(String instanceName, KylinConfig kylinConfig) {
+ super(instanceName);
+ this.kylinConfig = kylinConfig;
+ }
+
+ @Override
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ logger.info(getInstanceName() + " onBecomeLeaderFromStandby");
+ try {
+ DefaultScheduler scheduler = DefaultScheduler.createInstance();
+ scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
+ while (!scheduler.hasStarted()) {
+ logger.error("scheduler has not been started");
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ logger.error("error start DefaultScheduler", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ logger.info(getInstanceName() + " onBecomeStandbyFromLeader");
+ DefaultScheduler.destroyInstance();
+ }
+
+ @Override
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+ logger.info(getInstanceName() + " onBecomeOfflineFromStandby");
+ }
+
+ @Override
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+ logger.info(getInstanceName() + " onBecomeStandbyFromOffline");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
new file mode 100644
index 0000000..fe27263
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/DefaultStateModelFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ */
+public class DefaultStateModelFactory extends StateModelFactory<StateModel> {
+
+ private final String instanceName;
+ private final KylinConfig kylinConfig;
+
+ public DefaultStateModelFactory(String instanceName, KylinConfig kylinConfig) {
+ this.instanceName = instanceName;
+ this.kylinConfig = kylinConfig;
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+ return new DefaultJobEngineStateModelV1(instanceName, kylinConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
new file mode 100644
index 0000000..22e9b2c
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/EmptyJobEngineStateModelV1.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+/**
+ */
+public final class EmptyJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+ public EmptyJobEngineStateModelV1(String instanceName) {
+ super(instanceName);
+ }
+
+ @Override
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ System.out.println(getInstanceName() + " onBecomeLeaderFromStandby");
+ }
+
+ @Override
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ System.out.println(getInstanceName() + " onBecomeStandbyFromLeader");
+ }
+
+ @Override
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+ System.out.println(getInstanceName() + " onBecomeOfflineFromStandby");
+ }
+
+ @Override
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+ System.out.println(getInstanceName() + " onBecomeStandbyFromOffline");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
new file mode 100644
index 0000000..4ccf9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/v1/JobEngineSMDV1.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ */
+public final class JobEngineSMDV1 {
+
+ public static final String STATE_MODEL_NAME = "job_engine_model_v1";
+
+ public enum States {
+ LEADER,
+ STANDBY,
+ OFFLINE
+ }
+
+ private static class StateModelDefinitionV1Holder {
+ private static StateModelDefinition instance = build();
+ private static StateModelDefinition build() {
+ StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
+ // init state
+ builder.initialState(States.OFFLINE.name());
+
+ // add states
+ builder.addState(States.LEADER.name(), 0);
+ builder.addState(States.STANDBY.name(), 2);
+ builder.addState(States.OFFLINE.name(), 1);
+ for (HelixDefinedState state : HelixDefinedState.values()) {
+ builder.addState(state.name());
+ }
+
+ // add transitions
+ builder.addTransition(States.LEADER.name(), States.STANDBY.name(), 0);
+ builder.addTransition(States.STANDBY.name(), States.LEADER.name(), 1);
+ builder.addTransition(States.OFFLINE.name(), States.STANDBY.name(), 2);
+ builder.addTransition(States.STANDBY.name(), States.OFFLINE.name(), 3);
+ builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
+
+ // bounds
+ builder.upperBound(States.LEADER.name(), 1);
+ builder.dynamicUpperBound(States.STANDBY.name(), "R");
+
+ return builder.build();
+ }
+ }
+
+ public static StateModelDefinition getJobEngineStateModelDefinitionV1() {
+ return StateModelDefinitionV1Holder.instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 6670de1..0c50228 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -55,6 +55,9 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixJobEngineAdmin;
+import org.apache.kylin.rest.helix.JobControllerConstants;
+import org.apache.kylin.rest.helix.v1.JobEngineSMDV1;
import org.apache.kylin.rest.request.MetricsRequest;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
@@ -574,9 +577,12 @@ public class CubeService extends BasicService {
public void updateOnNewSegmentReady(String cubeName) {
logger.debug("on updateOnNewSegmentReady: " + cubeName);
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- String serverMode = kylinConfig.getServerMode();
- logger.debug("server mode: " + serverMode);
- if (Constant.SERVER_MODE_JOB.equals(serverMode.toLowerCase()) || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase())) {
+ final String instanceState = HelixJobEngineAdmin.getInstance(kylinConfig.getZookeeperAddress()).
+ getInstanceState(kylinConfig.getHelixClusterName(),
+ JobControllerConstants.RESOURCE_NAME,
+ JobControllerConstants.INSTANCE_NAME);
+ logger.debug("server state: " + instanceState);
+ if (JobEngineSMDV1.States.LEADER.toString().equalsIgnoreCase(instanceState)) {
keepCubeRetention(cubeName);
mergeCubeSegment(cubeName);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
new file mode 100644
index 0000000..8b11728
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/JobControllerConnectorTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix;
+
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+
+import static org.apache.kylin.rest.helix.JobControllerConstants.*;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.rest.helix.v1.TestJobEngineStateModelV1;
+import org.apache.kylin.rest.helix.v1.TestStateModelFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ */
+public class JobControllerConnectorTest {
+
+
+ String zkAddress = "localhost:2199";
+ ZkServer server;
+
+ List<JobControllerConnector> connectors ;
+ HelixJobEngineAdmin helixJobEngineAdmin;
+
+ private static final String CLUSTER_NAME = "test_cluster";
+
+
+ @Before
+ public void setup() {
+ // start zookeeper on localhost
+ final File tmpDir = new File("/tmp/helix-quickstart");
+ FileUtil.fullyDelete(tmpDir);
+ tmpDir.mkdirs();
+ server = new ZkServer("/tmp/helix-quickstart/dataDir", "/tmp/helix-quickstart/logDir",
+ new IDefaultNameSpace() {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+ }
+ }, 2199);
+ server.start();
+
+ final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
+ zkHelixAdmin.dropCluster(CLUSTER_NAME);
+ connectors = Lists.newArrayList();
+ helixJobEngineAdmin = HelixJobEngineAdmin.getInstance(zkAddress);
+ helixJobEngineAdmin.initV1(CLUSTER_NAME, RESOURCE_NAME);
+ helixJobEngineAdmin.startControllers(CLUSTER_NAME);
+
+ }
+
+ @Test
+ public void test() throws Exception {
+ JobControllerConnector connector = new JobControllerConnector("localhost", "10000", zkAddress, CLUSTER_NAME, new TestStateModelFactory("localhost", "10000"));
+ connector.register();
+ helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+ connector.start();
+ connectors.add(connector);
+ Thread.sleep(1000);
+ System.out.println(helixJobEngineAdmin.collectStateInfo("add 1 nodes", CLUSTER_NAME, RESOURCE_NAME));
+ assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+ connector = new JobControllerConnector("localhost", "10001", zkAddress, CLUSTER_NAME, new TestStateModelFactory("localhost", "10001"));
+ connector.register();
+ helixJobEngineAdmin.rebalance(CLUSTER_NAME, RESOURCE_NAME);
+ connector.start();
+ connectors.add(connector);
+ Thread.sleep(1000);
+ System.out.println(helixJobEngineAdmin.collectStateInfo("add 2 nodes", CLUSTER_NAME, RESOURCE_NAME));
+ assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+ assertEquals(1, TestJobEngineStateModelV1.getStandbyCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+
+ connectors.remove(0).stop();
+ TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+ Thread.sleep(1000);
+ assertEquals(1, TestJobEngineStateModelV1.getLeaderCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+ connectors.remove(0).stop();
+ TestJobEngineStateModelV1.getLeaderCount().decrementAndGet();
+ Thread.sleep(1000);
+ assertEquals(0, TestJobEngineStateModelV1.getLeaderCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getStandbyCount().get());
+ assertEquals(0, TestJobEngineStateModelV1.getOfflineCount().get());
+
+ }
+
+ @After
+ public void tearDown() {
+ server.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
new file mode 100644
index 0000000..e8014d9
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestJobEngineStateModelV1.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
+public class TestJobEngineStateModelV1 extends AbstractJobEngineStateModelV1 {
+
+ private static AtomicInteger offlineCount = new AtomicInteger(0);
+ private static AtomicInteger standbyCount = new AtomicInteger(0);
+ private static AtomicInteger leaderCount = new AtomicInteger(0);
+
+ public TestJobEngineStateModelV1(String instanceName) {
+ super(instanceName);
+ offlineCount.incrementAndGet();
+ }
+
+ @Override
+ public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+ leaderCount.incrementAndGet();
+ standbyCount.decrementAndGet();
+ }
+
+ @Override
+ public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+ standbyCount.incrementAndGet();
+ leaderCount.decrementAndGet();
+ }
+
+ @Override
+ public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+ offlineCount.incrementAndGet();
+ standbyCount.decrementAndGet();
+ }
+
+ @Override
+ public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+ standbyCount.incrementAndGet();
+ offlineCount.decrementAndGet();
+ }
+
+ public static AtomicInteger getStandbyCount() {
+ return standbyCount;
+ }
+
+ public static AtomicInteger getOfflineCount() {
+ return offlineCount;
+ }
+
+ public static AtomicInteger getLeaderCount() {
+ return leaderCount;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/be19bec6/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
new file mode 100644
index 0000000..ce0820b
--- /dev/null
+++ b/server/src/test/java/org/apache/kylin/rest/helix/v1/TestStateModelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.rest.helix.v1;
+
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ */
+public class TestStateModelFactory extends StateModelFactory<StateModel> {
+
+ private final String instanceName;
+
+ public TestStateModelFactory(String hostname, String port) {
+ this.instanceName = hostname + "_" + port;
+ }
+
+ @Override
+ public StateModel createNewStateModel(String resourceName, String stateUnitKey) {
+ TestJobEngineStateModelV1 stateModel = new TestJobEngineStateModelV1(instanceName);
+ return stateModel;
+ }
+}
|