kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [12/15] kylin git commit: KYLIN-1311 on the way
Date Sun, 14 Feb 2016 02:53:28 GMT
KYLIN-1311 on the way


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f41fd5c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f41fd5c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f41fd5c

Branch: refs/heads/helix-201602
Commit: 4f41fd5c80351d31e83330d070d5dbd8c11c4ea5
Parents: bbfe8ae
Author: shaofengshi <shaofengshi@apache.org>
Authored: Fri Jan 22 11:01:48 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Sat Feb 6 13:33:06 2016 +0800

----------------------------------------------------------------------
 build/bin/kylin.sh                              |   8 +-
 .../test_case_data/localmeta/kylin.properties   |   2 +-
 server/pom.xml                                  |   1 +
 .../rest/controller/ClusterController.java      |  71 ++
 .../kylin/rest/controller/JobController.java    |  33 -
 .../rest/controller/StreamingController.java    |  68 +-
 .../kylin/rest/helix/HelixClusterAdmin.java     |  31 +-
 .../rest/helix/JobEngineTransitionHandler.java  |  70 ++
 .../helix/LeaderStandbyStateModelFactory.java   | 125 +---
 .../helix/StreamCubeBuildTransitionHandler.java | 107 +++
 .../rest/request/StreamingBuildRequest.java     |  29 +-
 .../security/KylinAuthenticationProvider.java   |   3 +-
 .../kylin/rest/service/StreamingService.java    |  34 +-
 .../rest/controller/UserControllerTest.java     |   9 -
 .../kylin/rest/helix/HelixClusterAdminTest.java |  22 +-
 .../kylin/rest/service/CacheServiceTest.java    | 720 +++++++++----------
 .../kylin/rest/service/ServiceTestBase.java     |  40 +-
 .../rest/service/TestBaseWithZookeeper.java     |  74 ++
 .../source/kafka/TimedJsonStreamParser.java     |   7 +-
 19 files changed, 825 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 5b03f43..d196fe6 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -70,7 +70,7 @@ then
 
     if [ -z "$KYLIN_REST_ADDRESS" ]
     then
-        kylin_rest_address=`hostname`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
+        kylin_rest_address=`hostname -f`":"`grep "<Connector port=" ${tomcat_root}/conf/server.xml |grep protocol=\"HTTP/1.1\" | cut -d '=' -f 2 | cut -d \" -f 2`
         echo "KYLIN_REST_ADDRESS not found, will use ${kylin_rest_address}"
     else
         echo "KYLIN_REST_ADDRESS is set to: $KYLIN_REST_ADDRESS"
@@ -154,12 +154,12 @@ then
         exit 0
     elif [ "$2" == "stop" ]
     then
-        if [ ! -f "${KYLIN_HOME}/$3_$4" ]
+        if [ ! -f "${KYLIN_HOME}/logs/$3_$4" ]
             then
                 echo "streaming is not running, please check"
                 exit 1
             fi
-            pid=`cat ${KYLIN_HOME}/$3_$4`
+            pid=`cat ${KYLIN_HOME}/logs/$3_$4`
             if [ "$pid" = "" ]
             then
                 echo "streaming is not running, please check"
@@ -168,7 +168,7 @@ then
                 echo "stopping streaming:$pid"
                 kill $pid
             fi
-            rm ${KYLIN_HOME}/$3_$4
+            rm ${KYLIN_HOME}/logs/$3_$4
             exit 0
         else
             echo

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index 978102f..41a9895 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -6,7 +6,7 @@
 kylin.owner=whoami@kylin.apache.org
 
 # List of web servers in use, this enables one web server instance to sync up with other servers.
-#kylin.rest.servers=localhost:7070
+kylin.rest.servers=localhost:7070
 
 # The metadata store in hbase
 kylin.metadata.url=

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 86ec5a5..2359855 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -466,6 +466,7 @@
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
             <version>${zookeeper.version}</version>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
new file mode 100644
index 0000000..97fff36
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/controller/ClusterController.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.controller;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.service.JobService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.util.*;
+
+/**
+ * 
+ */
+@Controller
+@RequestMapping(value = "cluster")
+public class ClusterController extends BasicController implements InitializingBean {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterController.class);
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+     */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
+        clusterAdmin.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                clusterAdmin.stop();
+            }
+        }));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 77d987f..a61635d 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -61,42 +61,9 @@ public class JobController extends BasicController implements InitializingBean {
      */
     @Override
     public void afterPropertiesSet() throws Exception {
-
         String timeZone = jobService.getConfig().getTimeZone();
         TimeZone tzone = TimeZone.getTimeZone(timeZone);
         TimeZone.setDefault(tzone);
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        if (kylinConfig.isClusterEnabled() == true) {
-            logger.info("Kylin cluster enabled, will use Helix/zookeeper to coordinate.");
-            final HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(kylinConfig);
-            clusterAdmin.start();
-
-            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    clusterAdmin.stop();
-                }
-            }));
-        } else {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        DefaultScheduler scheduler = DefaultScheduler.createInstance();
-                        scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
-                        if (!scheduler.hasStarted()) {
-                            logger.error("scheduler has not been started");
-                            System.exit(1);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }).start();
-        }
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index fb806d1..209c552 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -26,11 +26,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -45,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.AccessDeniedException;
-import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.*;
 
@@ -93,7 +88,6 @@ public class StreamingController extends BasicController {
         }
     }
 
-
     /**
      *
      * create Streaming Schema
@@ -105,7 +99,7 @@ public class StreamingController extends BasicController {
         //Update Model
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
-        if (streamingConfig == null ||kafkaConfig == null) {
+        if (streamingConfig == null || kafkaConfig == null) {
             return streamingRequest;
         }
         if (StringUtils.isEmpty(streamingConfig.getName())) {
@@ -124,7 +118,7 @@ public class StreamingController extends BasicController {
         try {
             kafkaConfig.setUuid(UUID.randomUUID().toString());
             kafkaConfigService.createKafkaConfig(kafkaConfig);
-        }catch (IOException e){
+        } catch (IOException e) {
             try {
                 streamingService.dropStreamingConfig(streamingConfig);
             } catch (IOException e1) {
@@ -139,7 +133,7 @@ public class StreamingController extends BasicController {
 
     @RequestMapping(value = "", method = { RequestMethod.PUT })
     @ResponseBody
-        public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
+    public StreamingRequest updateModelDesc(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
 
@@ -156,7 +150,7 @@ public class StreamingController extends BasicController {
         }
         try {
             kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
-        }catch (AccessDeniedException accessDeniedException) {
+        } catch (AccessDeniedException accessDeniedException) {
             throw new ForbiddenException("You don't have right to update this KafkaConfig.");
         } catch (Exception e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
@@ -203,7 +197,6 @@ public class StreamingController extends BasicController {
         return desc;
     }
 
-
     private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
         KafkaConfig desc = null;
         try {
@@ -227,16 +220,14 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
-
-
     /**
      * Send a stream build request
      *
-     * @param cubeName Cube ID
+     * @param cubeName Cube Name
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = { RequestMethod.PUT })
     @ResponseBody
     public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody StreamingBuildRequest streamingBuildRequest) {
         StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
@@ -244,27 +235,54 @@ public class StreamingController extends BasicController {
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
         CubeInstance cube = cubes.get(0);
-        if (streamingBuildRequest.isFillGap() == false) {
-            Preconditions.checkArgument(streamingBuildRequest.getEnd() > streamingBuildRequest.getStart(), "End time should be greater than start time.");
-            for (CubeSegment segment : cube.getSegments()) {
-                if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
-                    streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
-                    streamingBuildRequest.setSuccessful(false);
-                    return streamingBuildRequest;
-                }
+        if (streamingBuildRequest.getEnd() <= streamingBuildRequest.getStart()) {
+            streamingBuildRequest.setMessage("End time should be greater than start time.");streamingBuildRequest.setSuccessful(false);
+            return streamingBuildRequest;
+        }
+
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                streamingBuildRequest.setMessage("The segment already exists: " + segment.toString());
+                streamingBuildRequest.setSuccessful(false);
+                return streamingBuildRequest;
             }
         }
 
         streamingBuildRequest.setStreaming(streamingConfig.getName());
-        streamingService.buildStream(cubeName, streamingBuildRequest);
+        streamingService.buildStream(cube, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;
 
     }
 
+    /**
+     * Send a stream fillGap request
+     *
+     * @param cubeName Cube Name
+     * @return
+     * @throws IOException
+     */
+    @RequestMapping(value = "/{cubeName}/fillgap", method = { RequestMethod.PUT })
+    @ResponseBody
+    public StreamingBuildRequest fillGap(@PathVariable String cubeName) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "' is not found.");
+        List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null, null);
+        Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
+        CubeInstance cube = cubes.get(0);
+
+        StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest();
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.fillGap(cube);
+        streamingBuildRequest.setMessage("FillGap request is submitted successfully.");
+        streamingBuildRequest.setSuccessful(true);
+        return streamingBuildRequest;
+
+    }
+
     public void setStreamingService(StreamingService streamingService) {
-        this.streamingService= streamingService;
+        this.streamingService = streamingService;
     }
 
     public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
index 9850e24..0758ef1 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/HelixClusterAdmin.java
@@ -33,8 +33,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,14 +128,13 @@ public class HelixClusterAdmin {
 
     }
 
-    public void addStreamingJob(String streamingName, long start, long end) {
-        String resourceName = RESOURCE_STREAME_CUBE_PREFIX + streamingName + "_" + start + "_" + end;
-        if (!admin.getResourcesInCluster(clusterName).contains(resourceName)) {
-            admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
-        } else {
-            logger.warn("Resource '" + resourceName + "' already exists in cluster, skip adding.");
+    public void addStreamingJob(StreamingBuildRequest streamingBuildRequest) {
+        String resourceName = streamingBuildRequest.toResourceName();
+        if (admin.getResourcesInCluster(clusterName).contains(resourceName)) {
+            logger.warn("Resource '" + resourceName + "' already exists in cluster, remove and re-add.");
+            admin.dropResource(clusterName, resourceName);
         }
-
+        admin.addResource(clusterName, resourceName, 1, MODEL_LEADER_STANDBY, IdealState.RebalanceMode.SEMI_AUTO.name());
         admin.rebalance(clusterName, resourceName, 2, "", TAG_STREAM_BUILDER);
 
     }
@@ -150,7 +151,7 @@ public class HelixClusterAdmin {
      */
     protected void startInstance(String instanceName) throws Exception {
         participantManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
-        participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory());
+        participantManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(MODEL_LEADER_STANDBY), new LeaderStandbyStateModelFactory(this.kylinConfig));
         participantManager.connect();
         participantManager.addLiveInstanceChangeListener(new KylinClusterLiveInstanceChangeListener());
 
@@ -179,10 +180,12 @@ public class HelixClusterAdmin {
     public void stop() {
         if (participantManager != null) {
             participantManager.disconnect();
+            participantManager = null;
         }
 
         if (controllerManager != null) {
             controllerManager.disconnect();
+            controllerManager = null;
         }
     }
 
@@ -269,11 +272,13 @@ public class HelixClusterAdmin {
                 int indexOfUnderscore = instanceName.lastIndexOf("_");
                 instanceRestAddresses.add(instanceName.substring(0, indexOfUnderscore) + ":" + instanceName.substring(indexOfUnderscore + 1));
             }
-            String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
-            kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
-            System.setProperty("kylin.rest.servers", restServersInCluster);
-            logger.info("kylin.rest.servers update to " + restServersInCluster);
-            Broadcaster.clearCache();
+            if (instanceRestAddresses.size() > 0) {
+                String restServersInCluster = StringUtil.join(instanceRestAddresses, ",");
+                kylinConfig.setProperty("kylin.rest.servers", restServersInCluster);
+                System.setProperty("kylin.rest.servers", restServersInCluster);
+                logger.info("kylin.rest.servers update to " + restServersInCluster);
+                Broadcaster.clearCache();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
new file mode 100644
index 0000000..3ef04ee
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/JobEngineTransitionHandler.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
+import org.apache.kylin.job.lock.MockJobLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ */
+public class JobEngineTransitionHandler extends TransitionHandler {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineTransitionHandler.class);
+    private final KylinConfig kylinConfig;
+
+    private static ConcurrentMap<KylinConfig, JobEngineTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+
+    private JobEngineTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    public static JobEngineTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        instanceMaps.putIfAbsent(kylinConfig, new JobEngineTransitionHandler(kylinConfig));
+        return instanceMaps.get(kylinConfig);
+    }
+
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
+        try {
+            DefaultScheduler scheduler = DefaultScheduler.createInstance();
+            scheduler.init(new JobEngineConfig(this.kylinConfig), new MockJobLock());
+            while (!scheduler.hasStarted()) {
+                logger.error("scheduler has not been started");
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            logger.error("error start DefaultScheduler", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
+        DefaultScheduler.destroyInstance();
+
+    }
+
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
+
+    }
+
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+        logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index 8614e8c..940c9c2 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -1,146 +1,35 @@
 package org.apache.kylin.rest.helix;
 
-import com.google.common.base.Preconditions;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.api.StateTransitionHandlerFactory;
 import org.apache.helix.api.TransitionHandler;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigBase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
-import org.apache.kylin.job.lock.MockJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
 import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
 
 /**
  */
 public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> {
-    private static final Logger logger = LoggerFactory.getLogger(LeaderStandbyStateModelFactory.class);
+    private final KylinConfig kylinConfig;
+
+    public LeaderStandbyStateModelFactory(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
 
     @Override
     public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
         if (partitionId.getResourceId().equals(ResourceId.from(HelixClusterAdmin.RESOURCE_NAME_JOB_ENGINE))) {
-            return JobEngineStateModel.INSTANCE;
+            return JobEngineTransitionHandler.getInstance(kylinConfig);
         }
 
         if (partitionId.getResourceId().stringify().startsWith(RESOURCE_STREAME_CUBE_PREFIX)) {
-            return StreamCubeStateModel.INSTANCE;
+            return StreamCubeBuildTransitionHandler.getInstance(kylinConfig);
         }
 
         return null;
     }
 
-    public static class JobEngineStateModel extends TransitionHandler {
-
-        public static JobEngineStateModel INSTANCE = new JobEngineStateModel();
-
-        @Transition(to = "LEADER", from = "STANDBY")
-        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
-            try {
-                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-                DefaultScheduler scheduler = DefaultScheduler.createInstance();
-                scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
-                while (!scheduler.hasStarted()) {
-                    logger.error("scheduler has not been started");
-                    Thread.sleep(1000);
-                }
-            } catch (Exception e) {
-                logger.error("error start DefaultScheduler", e);
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Transition(to = "STANDBY", from = "LEADER")
-        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeStandbyFromLeader()");
-            DefaultScheduler.destroyInstance();
-
-        }
-
-        @Transition(to = "STANDBY", from = "OFFLINE")
-        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeStandbyFromOffline()");
-
-        }
-
-        @Transition(to = "OFFLINE", from = "STANDBY")
-        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-            logger.info("JobEngineStateModel.onBecomeOfflineFromStandby()");
-
-        }
-    }
-
-    public static class StreamCubeStateModel extends TransitionHandler {
-
-        public static StreamCubeStateModel INSTANCE = new StreamCubeStateModel();
-
-        @Transition(to = "LEADER", from = "STANDBY")
-        public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
-            String resourceName = message.getResourceId().stringify();
-            Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
-            String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
-            String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
-
-            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            
-            final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
-            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-            for (CubeSegment segment : cube.getSegments()) {
-                if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd() >= end) {
-                    logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
-                    return;
-                }
-            }
-            
-            KylinConfigBase.getKylinHome();
-            String segmentId = start + "_" + end;
-            String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end + " -streaming " + streamingConfig;
-            logger.info("Executing: " + cmd);
-            try {
-                String line;
-                Process p = Runtime.getRuntime().exec(cmd);
-                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
-                while ((line = input.readLine()) != null) {
-                    logger.info(line);
-                }
-                input.close();
-            } catch (IOException err) {
-                logger.error("Error happens during build streaming  '" + resourceName + "'", err);
-                throw new RuntimeException(err);
-            }
-
-        }
-
-        @Transition(to = "STANDBY", from = "LEADER")
-        public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
-
-        }
-
-        @Transition(to = "STANDBY", from = "OFFLINE")
-        public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
-
-        }
-
-        @Transition(to = "OFFLINE", from = "STANDBY")
-        public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
-
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
new file mode 100644
index 0000000..44d8302
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/helix/StreamCubeBuildTransitionHandler.java
@@ -0,0 +1,107 @@
+package org.apache.kylin.rest.helix;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.rest.request.StreamingBuildRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ */
+public class StreamCubeBuildTransitionHandler extends TransitionHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamCubeBuildTransitionHandler.class);
+
+    private static ConcurrentMap<KylinConfig, StreamCubeBuildTransitionHandler> instanceMaps = Maps.newConcurrentMap();
+    private final KylinConfig kylinConfig;
+
+    private StreamCubeBuildTransitionHandler(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
+    }
+
+    public static StreamCubeBuildTransitionHandler getInstance(KylinConfig kylinConfig) {
+        Preconditions.checkNotNull(kylinConfig);
+        instanceMaps.putIfAbsent(kylinConfig, new StreamCubeBuildTransitionHandler(kylinConfig));
+        return instanceMaps.get(kylinConfig);
+    }
+
+    @Transition(to = "LEADER", from = "STANDBY")
+    public void onBecomeLeaderFromStandby(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+
+        final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingBuildRequest.getStreaming()).getCubeName();
+        final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        for (CubeSegment segment : cube.getSegments()) {
+            if (segment.getDateRangeStart() <= streamingBuildRequest.getStart() && segment.getDateRangeEnd() >= streamingBuildRequest.getEnd()) {
+                logger.info("Segment " + segment.getName() + " already exist, no need rebuild.");
+                return;
+            }
+        }
+
+        KylinConfigBase.getKylinHome();
+        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
+        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start " + streamingBuildRequest.getStreaming() + " " + segmentId + " -oneoff true -start " + streamingBuildRequest.getStart() + " -end " + streamingBuildRequest.getEnd() + " -streaming " + streamingBuildRequest.getStreaming();
+        logger.info("Executing: " + cmd);
+        try {
+            String line;
+            Process p = Runtime.getRuntime().exec(cmd);
+            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            while ((line = input.readLine()) != null) {
+                logger.info(line);
+            }
+            input.close();
+        } catch (IOException err) {
+            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        }
+
+    }
+
+    @Transition(to = "STANDBY", from = "LEADER")
+    public void onBecomeStandbyFromLeader(Message message, NotificationContext context) {
+        String resourceName = message.getResourceId().stringify();
+        StreamingBuildRequest streamingBuildRequest = StreamingBuildRequest.fromResourceName(resourceName);
+        KylinConfigBase.getKylinHome();
+        String segmentId = streamingBuildRequest.getStart() + "_" + streamingBuildRequest.getEnd();
+        String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming stop " + streamingBuildRequest.getStreaming() + " " + segmentId;
+        logger.info("Executing: " + cmd);
+        try {
+            String line;
+            Process p = Runtime.getRuntime().exec(cmd);
+            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            while ((line = input.readLine()) != null) {
+                logger.info(line);
+            }
+            input.close();
+        } catch (IOException err) {
+            logger.error("Error happens during build streaming  '" + resourceName + "'", err);
+            throw new RuntimeException(err);
+        }
+    }
+
+    @Transition(to = "STANDBY", from = "OFFLINE")
+    public void onBecomeStandbyFromOffline(Message message, NotificationContext context) {
+
+    }
+
+    @Transition(to = "OFFLINE", from = "STANDBY")
+    public void onBecomeOfflineFromStandby(Message message, NotificationContext context) {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
index e06a06c..dcf91fd 100644
--- a/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
+++ b/server/src/main/java/org/apache/kylin/rest/request/StreamingBuildRequest.java
@@ -18,15 +18,28 @@
 
 package org.apache.kylin.rest.request;
 
+import com.google.common.base.Preconditions;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+
+import static org.apache.kylin.rest.helix.HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX;
+
 public class StreamingBuildRequest {
 
     private String streaming;
     private long start;
     private long end;
-    private boolean fillGap;
     private String message;
     private boolean successful;
 
+    public StreamingBuildRequest() {
+    }
+
+    public StreamingBuildRequest(String streaming, long start, long end) {
+        this.streaming = streaming;
+        this.start = start;
+        this.end = end;
+    }
+
     public String getStreaming() {
         return streaming;
     }
@@ -67,11 +80,17 @@ public class StreamingBuildRequest {
         this.end = end;
     }
 
-    public boolean isFillGap() {
-        return fillGap;
+    public String toResourceName() {
+        return HelixClusterAdmin.RESOURCE_STREAME_CUBE_PREFIX + streaming + "_" + start + "_" + end;
     }
 
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
+    public static StreamingBuildRequest fromResourceName(String resourceName) {
+        Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
+        long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_") + 1));
+        String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
+        long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
+        String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
+
+        return new StreamingBuildRequest(streamingConfig, start, end);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
index 1f147ef..b8dcd43 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/KylinAuthenticationProvider.java
@@ -72,7 +72,8 @@ public class KylinAuthenticationProvider implements AuthenticationProvider {
             }
 
             logger.debug("Authenticated user " + authed.toString());
-            
+
+            SecurityContextHolder.getContext().setAuthentication(authed);
             UserDetails user;
             
             if (authed.getDetails() == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index da20949..7c2cc48 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -21,7 +21,6 @@ package org.apache.kylin.rest.service;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.engine.streaming.BootstrapConfig;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
@@ -54,8 +53,8 @@ public class StreamingService extends BasicService {
         if (null == cubeInstance) {
             streamingConfigs = getStreamingManager().listAllStreaming();
         } else {
-            for(StreamingConfig config : getStreamingManager().listAllStreaming()){
-                if(cubeInstance.getName().equals(config.getCubeName())){
+            for (StreamingConfig config : getStreamingManager().listAllStreaming()) {
+                if (cubeInstance.getName().equals(config.getCubeName())) {
                     streamingConfigs.add(config);
                 }
             }
@@ -84,34 +83,35 @@ public class StreamingService extends BasicService {
         if (getStreamingManager().getStreamingConfig(config.getName()) != null) {
             throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists");
         }
-        StreamingConfig streamingConfig =  getStreamingManager().saveStreamingConfig(config);
+        StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config);
         return streamingConfig;
     }
 
-//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public StreamingConfig updateStreamingConfig(StreamingConfig config) throws IOException {
         return getStreamingManager().updateStreamingConfig(config);
     }
 
-//    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
+    //    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#desc, 'ADMINISTRATION') or hasPermission(#desc, 'MANAGEMENT')")
     public void dropStreamingConfig(StreamingConfig config) throws IOException {
         getStreamingManager().removeStreamingConfig(config);
     }
 
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
+    public void buildStream(CubeInstance cube, StreamingBuildRequest streamingBuildRequest) {
+        HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
+        clusterAdmin.addStreamingJob(streamingBuildRequest);
+    }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
-    public void buildStream(String cube, StreamingBuildRequest streamingBuildRequest) {
+    public void fillGap(CubeInstance cube) {
         HelixClusterAdmin clusterAdmin = HelixClusterAdmin.getInstance(KylinConfig.getInstanceFromEnv());
-        if (streamingBuildRequest.isFillGap()) {
-            final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingBuildRequest.getStreaming());
-            final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
-            logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
-            for (Pair<Long, Long> gap : gaps) {
-                clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), gap.getFirst(), gap.getSecond());
-            }
-        } else {
-            clusterAdmin.addStreamingJob(streamingBuildRequest.getStreaming(), streamingBuildRequest.getStart(), streamingBuildRequest.getEnd());
+        final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfigByCube(cube.getName());
+        final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+        logger.info("all gaps:" + org.apache.commons.lang3.StringUtils.join(gaps, ","));
+        for (Pair<Long, Long> gap : gaps) {
+            StreamingBuildRequest streamingBuildRequest = new StreamingBuildRequest(streamingConfig.getName(), gap.getFirst(), gap.getSecond());
+            clusterAdmin.addStreamingJob(streamingBuildRequest);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
index ab77a9a..fe0e67a 100644
--- a/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/controller/UserControllerTest.java
@@ -41,15 +41,6 @@ public class UserControllerTest extends ServiceTestBase {
 
     private UserController userController;
 
-    @BeforeClass
-    public static void setupResource() {
-        staticCreateTestMetadata();
-        List<GrantedAuthority> authorities = new ArrayList<GrantedAuthority>();
-        User user = new User("ADMIN", "ADMIN", authorities);
-        Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
-        SecurityContextHolder.getContext().setAuthentication(authentication);
-    }
-
     @Before
     public void setup() throws Exception {
         super.setup();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
index 594e76b5..1c8b779 100644
--- a/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/helix/HelixClusterAdminTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.service.TestBaseWithZookeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,10 +40,7 @@ import static org.junit.Assert.assertTrue;
 
 /**
 */
-public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
-
-    String zkAddress = "localhost:2199";
-    ZkServer server;
+public class HelixClusterAdminTest extends TestBaseWithZookeeper {
 
     HelixClusterAdmin clusterAdmin1;
     HelixClusterAdmin clusterAdmin2;
@@ -52,21 +50,8 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
 
     @Before
     public void setup() throws Exception {
-        createTestMetadata();
-        // start zookeeper on localhost
-        final File tmpDir = File.createTempFile("HelixClusterAdminTest", null); 
-        FileUtil.fullyDelete(tmpDir);
-        tmpDir.mkdirs();
-        server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
-            @Override
-            public void createDefaultNameSpace(ZkClient zkClient) {
-            }
-        }, 2199);
-        server.start();
-
         kylinConfig = this.getTestConfig();
         kylinConfig.setRestAddress("localhost:7070");
-        kylinConfig.setZookeeperAddress(zkAddress);
         kylinConfig.setClusterName(CLUSTER_NAME);
         
         final ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkAddress);
@@ -105,7 +90,7 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
         
         // 3. shutdown the first instance
         clusterAdmin1.stop();
-        clusterAdmin1 = null;
+//        clusterAdmin1 = null;
         Thread.sleep(1000);
         assertTrue(clusterAdmin2.isLeaderRole(RESOURCE_NAME_JOB_ENGINE));
         assertEquals(1, kylinConfig.getRestServers().length);
@@ -133,7 +118,6 @@ public class HelixClusterAdminTest extends LocalFileMetadataTestCase {
             clusterAdmin2.stop();
         }
         
-        server.shutdown();
         cleanupTestMetadata();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 763bebe..8193884 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -1,366 +1,354 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.rest.service;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class CacheServiceTest extends LocalFileMetadataTestCase {
-
-    private static Server server;
-
-    private static String ZK_ADDRESS = "localhost:2199";
-    
-    private static KylinConfig configA;
-    private static KylinConfig configB;
-
-    private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
-
-    private static AtomicLong counter = new AtomicLong();
-
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        staticCreateTestMetadata();
-        configA = KylinConfig.getInstanceFromEnv();
-        configA.setProperty("kylin.rest.servers", "localhost:7070");
-        configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
-        configB.setProperty("kylin.rest.servers", "localhost:7070");
-        configB.setMetadataUrl("../examples/test_metadata");
-
-        server = new Server(7070);
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath("/");
-        server.setHandler(context);
-
-        final CacheService serviceA = new CacheService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configA;
-            }
-        };
-        final CacheService serviceB = new CacheService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configB;
-            }
-        };
-
-        final CubeService cubeServiceA = new CubeService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configA;
-            }
-        };
-        final CubeService cubeServiceB = new CubeService() {
-            @Override
-            public KylinConfig getConfig() {
-                return configB;
-            }
-        };
-
-        serviceA.setCubeService(cubeServiceA);
-        serviceA.initCubeChangeListener();
-        serviceB.setCubeService(cubeServiceB);
-        serviceB.initCubeChangeListener();
-
-        context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
-            @Override
-            public void handle(String type, String name, String event) {
-
-                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-                Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
-                final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
-                logger.info(log);
-                try {
-                    switch (wipeEvent) {
-                    case CREATE:
-                    case UPDATE:
-                        serviceA.rebuildCache(wipeType, name);
-                        serviceB.rebuildCache(wipeType, name);
-                        break;
-                    case DROP:
-                        serviceA.removeCache(wipeType, name);
-                        serviceB.removeCache(wipeType, name);
-                        break;
-                    default:
-                        throw new RuntimeException("invalid type:" + wipeEvent);
-                    }
-                } finally {
-                    counter.incrementAndGet();
-                }
-            }
-        })), "/");
-
-        server.start();
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        server.stop();
-        cleanAfterClass();
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        counter.set(0L);
-        createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    private void waitForCounterAndClear(long count) {
-        int retryTimes = 0;
-        while ((!counter.compareAndSet(count, 0L))) {
-            if (++retryTimes > 30) {
-                throw new RuntimeException("timeout");
-            }
-            try {
-                Thread.sleep(100L);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private static CubeManager getCubeManager(KylinConfig config) throws Exception {
-        return CubeManager.getInstance(config);
-    }
-
-    private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
-        return ProjectManager.getInstance(config);
-    }
-
-    private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
-        return CubeDescManager.getInstance(config);
-    }
-
-    private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
-        return MetadataManager.getInstance(config);
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        assertTrue(!configA.equals(configB));
-
-        assertNotNull(getCubeManager(configA));
-        assertNotNull(getCubeManager(configB));
-        assertNotNull(getCubeDescManager(configA));
-        assertNotNull(getCubeDescManager(configB));
-        assertNotNull(getProjectManager(configB));
-        assertNotNull(getProjectManager(configB));
-        assertNotNull(getMetadataManager(configB));
-        assertNotNull(getMetadataManager(configB));
-
-        assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
-        assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
-        assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
-        assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
-
-        assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
-    }
-
-    @Test
-    public void testCubeCRUD() throws Exception {
-        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
-        broadcaster.getCounterAndClear();
-
-        getStore().deleteResource("/cube/a_whole_new_cube.json");
-
-        //create cube
-
-        final String cubeName = "a_whole_new_cube";
-        final CubeManager cubeManager = getCubeManager(configA);
-        final CubeManager cubeManagerB = getCubeManager(configB);
-        final ProjectManager projectManager = getProjectManager(configA);
-        final ProjectManager projectManagerB = getProjectManager(configB);
-        final CubeDescManager cubeDescManager = getCubeDescManager(configA);
-        final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
-        final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
-
-        assertTrue(cubeManager.getCube(cubeName) == null);
-        assertTrue(cubeManagerB.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
-        //one for cube update, one for project update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-
-        assertNotNull(cubeManager.getCube(cubeName));
-        assertNotNull(cubeManagerB.getCube(cubeName));
-        assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
-        //update cube
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        assertEquals(0, cube.getSegments().size());
-        assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
-        CubeSegment segment = new CubeSegment();
-        segment.setName("test_segment");
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToAddSegs(segment);
-        cube = cubeManager.updateCube(cubeBuilder);
-        //one for cube update
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
-        assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
-
-        //delete cube
-        cubeManager.dropCube(cubeName, false);
-        //one for cube update, one for project update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-
-        assertTrue(cubeManager.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-        assertTrue(cubeManagerB.getCube(cubeName) == null);
-        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
-
-        final String cubeDescName = "test_cube_desc";
-        cubeDesc.setName(cubeDescName);
-        cubeDesc.setLastModified(0);
-        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
-        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
-        cubeDescManager.createCubeDesc(cubeDesc);
-        //one for add cube desc
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
-        assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
-
-        cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
-        cubeDescManager.updateCubeDesc(cubeDesc);
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
-
-        cubeDescManager.removeCubeDesc(cubeDesc);
-        //one for add cube desc
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
-        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
-
-        getStore().deleteResource("/cube/a_whole_new_cube.json");
-    }
-
-    private TableDesc createTestTableDesc() {
-        TableDesc tableDesc = new TableDesc();
-        tableDesc.setDatabase("TEST_DB");
-        tableDesc.setName("TEST_TABLE");
-        tableDesc.setUuid(UUID.randomUUID().toString());
-        tableDesc.setLastModified(0);
-        return tableDesc;
-    }
-
-    @Test
-    public void testMetaCRUD() throws Exception {
-        final MetadataManager metadataManager = MetadataManager.getInstance(configA);
-        final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
-        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
-        broadcaster.getCounterAndClear();
-
-        TableDesc tableDesc = createTestTableDesc();
-        assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
-        assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
-        metadataManager.saveSourceTable(tableDesc);
-        //only one for table insert
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
-        assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
-
-        final String dataModelName = "test_data_model";
-        DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
-        dataModelDesc.setName(dataModelName);
-        dataModelDesc.setLastModified(0);
-        assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
-        assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
-
-        dataModelDesc.setName(dataModelName);
-        metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
-        //one for data model creation, one for project meta update
-        assertEquals(2, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(2);
-        assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
-
-        final LookupDesc[] lookups = dataModelDesc.getLookups();
-        assertTrue(lookups.length > 0);
-        dataModelDesc.setLookups(lookups);
-        metadataManager.updateDataModelDesc(dataModelDesc);
-        //only one for data model update
-        assertEquals(1, broadcaster.getCounterAndClear());
-        waitForCounterAndClear(1);
-        assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
-
-    }
-
-    private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
-        for (IRealization realization : realizations) {
-            if (realization.getType() == type && realization.getName().equals(name)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// * 
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// * 
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+//*/
+//
+//package org.apache.kylin.rest.service;
+//
+//import org.apache.kylin.common.KylinConfig;
+//import org.apache.kylin.common.restclient.Broadcaster;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.*;
+//import org.apache.kylin.cube.model.CubeDesc;
+//import org.apache.kylin.metadata.MetadataManager;
+//import org.apache.kylin.metadata.model.DataModelDesc;
+//import org.apache.kylin.metadata.model.LookupDesc;
+//import org.apache.kylin.metadata.model.TableDesc;
+//import org.apache.kylin.metadata.project.ProjectInstance;
+//import org.apache.kylin.metadata.project.ProjectManager;
+//import org.apache.kylin.metadata.realization.IRealization;
+//import org.apache.kylin.metadata.realization.RealizationType;
+//import org.apache.kylin.rest.broadcaster.BroadcasterReceiveServlet;
+//import org.eclipse.jetty.server.Server;
+//import org.eclipse.jetty.servlet.ServletContextHandler;
+//import org.eclipse.jetty.servlet.ServletHolder;
+//import org.junit.*;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.util.Arrays;
+//import java.util.Set;
+//import java.util.UUID;
+//import java.util.concurrent.atomic.AtomicLong;
+//
+//import static org.junit.Assert.*;
+//
+///**
+// */
+//public class CacheServiceTest extends LocalFileMetadataTestCase {
+//
+//    private static Server server;
+//
+//    private static String ZK_ADDRESS = "localhost:2199";
+//
+//    private static KylinConfig configA;
+//    private static KylinConfig configB;
+//
+//    private static final Logger logger = LoggerFactory.getLogger(CacheServiceTest.class);
+//
+//    private static AtomicLong counter = new AtomicLong();
+//
+//    @BeforeClass
+//    public static void beforeClass() throws Exception {
+//        staticCreateTestMetadata();
+//        configA = KylinConfig.getInstanceFromEnv();
+//        configA.setProperty("kylin.rest.servers", "localhost:7070");
+//        configB = KylinConfig.getKylinConfigFromInputStream(KylinConfig.getKylinPropertiesAsInputSteam());
+//        configB.setProperty("kylin.rest.servers", "localhost:7070");
+//        configB.setMetadataUrl("../examples/test_metadata");
+//
+//        server = new Server(7070);
+//        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+//        context.setContextPath("/");
+//        server.setHandler(context);
+//
+//        final CacheService serviceA = new CacheService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configA;
+//            }
+//        };
+//        final CacheService serviceB = new CacheService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configB;
+//            }
+//        };
+//
+//        final CubeService cubeServiceA = new CubeService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configA;
+//            }
+//        };
+//        final CubeService cubeServiceB = new CubeService() {
+//            @Override
+//            public KylinConfig getConfig() {
+//                return configB;
+//            }
+//        };
+//
+//        serviceA.setCubeService(cubeServiceA);
+//        serviceA.initCubeChangeListener();
+//        serviceB.setCubeService(cubeServiceB);
+//        serviceB.initCubeChangeListener();
+//
+//        context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new BroadcasterReceiveServlet.BroadcasterHandler() {
+//            @Override
+//            public void handle(String type, String name, String event) {
+//
+//                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
+//                Broadcaster.EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
+//                final String log = "wipe cache type: " + wipeType + " event:" + wipeEvent + " name:" + name;
+//                logger.info(log);
+//                try {
+//                    switch (wipeEvent) {
+//                    case CREATE:
+//                    case UPDATE:
+//                        serviceA.rebuildCache(wipeType, name);
+//                        serviceB.rebuildCache(wipeType, name);
+//                        break;
+//                    case DROP:
+//                        serviceA.removeCache(wipeType, name);
+//                        serviceB.removeCache(wipeType, name);
+//                        break;
+//                    default:
+//                        throw new RuntimeException("invalid type:" + wipeEvent);
+//                    }
+//                } finally {
+//                    counter.incrementAndGet();
+//                }
+//            }
+//        })), "/");
+//
+//        server.start();
+//    }
+//
+//    @AfterClass
+//    public static void afterClass() throws Exception {
+//        server.stop();
+//        cleanAfterClass();
+//    }
+//
+//    @Before
+//    public void setUp() throws Exception {
+//        counter.set(0L);
+//        createTestMetadata();
+//    }
+//
+//    @After
+//    public void after() throws Exception {
+//        cleanupTestMetadata();
+//    }
+//
+//    private void waitForCounterAndClear(long count) {
+//        int retryTimes = 0;
+//        while ((!counter.compareAndSet(count, 0L))) {
+//            if (++retryTimes > 30) {
+//                throw new RuntimeException("timeout");
+//            }
+//            try {
+//                Thread.sleep(100L);
+//            } catch (InterruptedException e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//
+//    private static CubeManager getCubeManager(KylinConfig config) throws Exception {
+//        return CubeManager.getInstance(config);
+//    }
+//
+//    private static ProjectManager getProjectManager(KylinConfig config) throws Exception {
+//        return ProjectManager.getInstance(config);
+//    }
+//
+//    private static CubeDescManager getCubeDescManager(KylinConfig config) throws Exception {
+//        return CubeDescManager.getInstance(config);
+//    }
+//
+//    private static MetadataManager getMetadataManager(KylinConfig config) throws Exception {
+//        return MetadataManager.getInstance(config);
+//    }
+//
+//    @Test
+//    public void testBasic() throws Exception {
+//        assertTrue(!configA.equals(configB));
+//
+//        assertNotNull(getCubeManager(configA));
+//        assertNotNull(getCubeManager(configB));
+//        assertNotNull(getCubeDescManager(configA));
+//        assertNotNull(getCubeDescManager(configB));
+//        assertNotNull(getProjectManager(configB));
+//        assertNotNull(getProjectManager(configB));
+//        assertNotNull(getMetadataManager(configB));
+//        assertNotNull(getMetadataManager(configB));
+//
+//        assertTrue(!getCubeManager(configA).equals(getCubeManager(configB)));
+//        assertTrue(!getCubeDescManager(configA).equals(getCubeDescManager(configB)));
+//        assertTrue(!getProjectManager(configA).equals(getProjectManager(configB)));
+//        assertTrue(!getMetadataManager(configA).equals(getMetadataManager(configB)));
+//
+//        assertEquals(getProjectManager(configA).listAllProjects().size(), getProjectManager(configB).listAllProjects().size());
+//    }
+//
+//    @Test
+//    public void testCubeCRUD() throws Exception {
+//        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+//        broadcaster.getCounterAndClear();
+//
+//        getStore().deleteResource("/cube/a_whole_new_cube.json");
+//
+//        //create cube
+//
+//        final String cubeName = "a_whole_new_cube";
+//        final CubeManager cubeManager = getCubeManager(configA);
+//        final CubeManager cubeManagerB = getCubeManager(configB);
+//        final ProjectManager projectManager = getProjectManager(configA);
+//        final ProjectManager projectManagerB = getProjectManager(configB);
+//        final CubeDescManager cubeDescManager = getCubeDescManager(configA);
+//        final CubeDescManager cubeDescManagerB = getCubeDescManager(configB);
+//        final CubeDesc cubeDesc = getCubeDescManager(configA).getCubeDesc("test_kylin_cube_with_slr_desc");
+//
+//        assertTrue(cubeManager.getCube(cubeName) == null);
+//        assertTrue(cubeManagerB.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        cubeManager.createCube(cubeName, ProjectInstance.DEFAULT_PROJECT_NAME, cubeDesc, null);
+//        //one for cube update, one for project update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//
+//        assertNotNull(cubeManager.getCube(cubeName));
+//        assertNotNull(cubeManagerB.getCube(cubeName));
+//        assertTrue(containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+//        //update cube
+//        CubeInstance cube = cubeManager.getCube(cubeName);
+//        assertEquals(0, cube.getSegments().size());
+//        assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
+//        CubeSegment segment = new CubeSegment();
+//        segment.setName("test_segment");
+//        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+//        cubeBuilder.setToAddSegs(segment);
+//        cube = cubeManager.updateCube(cubeBuilder);
+//        //one for cube update
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(1, cubeManagerB.getCube(cubeName).getSegments().size());
+//        assertEquals(segment.getName(), cubeManagerB.getCube(cubeName).getSegments().get(0).getName());
+//
+//        //delete cube
+//        cubeManager.dropCube(cubeName, false);
+//        //one for cube update, one for project update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//
+//        assertTrue(cubeManager.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManager.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//        assertTrue(cubeManagerB.getCube(cubeName) == null);
+//        assertTrue(!containsRealization(projectManagerB.listAllRealizations(ProjectInstance.DEFAULT_PROJECT_NAME), RealizationType.CUBE, cubeName));
+//
+//        final String cubeDescName = "test_cube_desc";
+//        cubeDesc.setName(cubeDescName);
+//        cubeDesc.setLastModified(0);
+//        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+//        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+//        cubeDescManager.createCubeDesc(cubeDesc);
+//        //one for add cube desc
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertNotNull(cubeDescManager.getCubeDesc(cubeDescName));
+//        assertNotNull(cubeDescManagerB.getCubeDesc(cubeDescName));
+//
+//        cubeDesc.setNotifyList(Arrays.asList("test@email", "test@email", "test@email"));
+//        cubeDescManager.updateCubeDesc(cubeDesc);
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(cubeDesc.getNotifyList(), cubeDescManagerB.getCubeDesc(cubeDescName).getNotifyList());
+//
+//        cubeDescManager.removeCubeDesc(cubeDesc);
+//        //one for add cube desc
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertTrue(cubeDescManager.getCubeDesc(cubeDescName) == null);
+//        assertTrue(cubeDescManagerB.getCubeDesc(cubeDescName) == null);
+//
+//        getStore().deleteResource("/cube/a_whole_new_cube.json");
+//    }
+//
+//    private TableDesc createTestTableDesc() {
+//        TableDesc tableDesc = new TableDesc();
+//        tableDesc.setDatabase("TEST_DB");
+//        tableDesc.setName("TEST_TABLE");
+//        tableDesc.setUuid(UUID.randomUUID().toString());
+//        tableDesc.setLastModified(0);
+//        return tableDesc;
+//    }
+//
+//    @Test
+//    public void testMetaCRUD() throws Exception {
+//        final MetadataManager metadataManager = MetadataManager.getInstance(configA);
+//        final MetadataManager metadataManagerB = MetadataManager.getInstance(configB);
+//        final Broadcaster broadcaster = Broadcaster.getInstance(configA);
+//        broadcaster.getCounterAndClear();
+//
+//        TableDesc tableDesc = createTestTableDesc();
+//        assertTrue(metadataManager.getTableDesc(tableDesc.getIdentity()) == null);
+//        assertTrue(metadataManagerB.getTableDesc(tableDesc.getIdentity()) == null);
+//        metadataManager.saveSourceTable(tableDesc);
+//        //only one for table insert
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertNotNull(metadataManager.getTableDesc(tableDesc.getIdentity()));
+//        assertNotNull(metadataManagerB.getTableDesc(tableDesc.getIdentity()));
+//
+//        final String dataModelName = "test_data_model";
+//        DataModelDesc dataModelDesc = metadataManager.getDataModelDesc("test_kylin_left_join_model_desc");
+//        dataModelDesc.setName(dataModelName);
+//        dataModelDesc.setLastModified(0);
+//        assertTrue(metadataManager.getDataModelDesc(dataModelName) == null);
+//        assertTrue(metadataManagerB.getDataModelDesc(dataModelName) == null);
+//
+//        dataModelDesc.setName(dataModelName);
+//        metadataManager.createDataModelDesc(dataModelDesc, "default", "ADMIN");
+//        //one for data model creation, one for project meta update
+//        assertEquals(2, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(2);
+//        assertEquals(dataModelDesc.getName(), metadataManagerB.getDataModelDesc(dataModelName).getName());
+//
+//        final LookupDesc[] lookups = dataModelDesc.getLookups();
+//        assertTrue(lookups.length > 0);
+//        dataModelDesc.setLookups(lookups);
+//        metadataManager.updateDataModelDesc(dataModelDesc);
+//        //only one for data model update
+//        assertEquals(1, broadcaster.getCounterAndClear());
+//        waitForCounterAndClear(1);
+//        assertEquals(dataModelDesc.getLookups().length, metadataManagerB.getDataModelDesc(dataModelName).getLookups().length);
+//
+//    }
+//
+//    private boolean containsRealization(Set<IRealization> realizations, RealizationType type, String name) {
+//        for (IRealization realization : realizations) {
+//            if (realization.getType() == type && realization.getName().equals(name)) {
+//                return true;
+//            }
+//        }
+//        return false;
+//    }
+//
+//}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f41fd5c/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
index f8dc945..ca4fe39 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java
@@ -18,6 +18,12 @@
 
 package org.apache.kylin.rest.service;
 
+import com.google.common.collect.Lists;
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
@@ -26,42 +32,42 @@ import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.kylin.rest.helix.HelixClusterAdmin;
+import org.junit.*;
 import org.junit.runner.RunWith;
 import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author xduo
  */
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:kylinSecurity.xml" })
 @ActiveProfiles("testing")
-public class ServiceTestBase extends LocalFileMetadataTestCase {
-
-    @BeforeClass
-    public static void setupResource() throws Exception {
-        staticCreateTestMetadata();
-        Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", "ROLE_ADMIN");
-        SecurityContextHolder.getContext().setAuthentication(authentication);
-    }
-
-    @AfterClass
-    public static void tearDownResource() {
-    }
+public class ServiceTestBase extends TestBaseWithZookeeper {
 
     @Before
     public void setup() throws Exception {
         this.createTestMetadata();
 
+        UserService.UserGrantedAuthority userGrantedAuthority = new UserService.UserGrantedAuthority();
+        userGrantedAuthority.setAuthority("ROLE_ADMIN");
+        UserDetails user = new User("ADMIN", "skippped-ldap", Lists.newArrayList(userGrantedAuthority));
+        Authentication authentication = new TestingAuthenticationToken(user, "ADMIN", "ROLE_ADMIN");
+        SecurityContextHolder.getContext().setAuthentication(authentication);
+        KylinConfig kylinConfig = this.getTestConfig();
+        kylinConfig.setRestAddress("localhost:7070");
+
         MetadataManager.clearCache();
         CubeDescManager.clearCache();
         CubeManager.clearCache();


Mime
View raw message