kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [1/5] kylin git commit: KYLIN-1311 fix small bug
Date Tue, 02 Feb 2016 09:40:11 GMT
Repository: kylin
Updated Branches:
  refs/heads/helix-201601 2c66114ee -> 5410e62dd


KYLIN-1311 fix small bug

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f3987462
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f3987462
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f3987462

Branch: refs/heads/helix-201601
Commit: f3987462e6685f2ed7374cf5dc058b8d22fc835f
Parents: 2c66114
Author: shaofengshi <shaofengshi@apache.org>
Authored: Fri Jan 15 17:57:26 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Fri Jan 15 17:57:26 2016 +0800

----------------------------------------------------------------------
 .../engine/streaming/StreamingManager.java      | 11 +++++-----
 .../rest/controller/StreamingController.java    | 13 ++++++------
 .../helix/LeaderStandbyStateModelFactory.java   | 21 +++++++++++++++++---
 3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..798fc3f 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -137,6 +137,12 @@ public class StreamingManager {
         return streamingMap.get(name);
     }
 
+    public StreamingConfig getStreamingConfigByCube(String cubeName) {
+        String streamingConfig = cubeName + "_streaming";
+        return getStreamingConfig(streamingConfig);
+    }
+
+
     public List<StreamingConfig> listAllStreaming() {
         return new ArrayList<>(streamingMap.values());
     }
@@ -168,11 +174,6 @@ public class StreamingManager {
         streamingMap.remove(streamingConfig.getName());
     }
 
-    public StreamingConfig getConfig(String name) {
-        name = name.toUpperCase();
-        return streamingMap.get(name);
-    }
-
     public void removeStreamingLocal(String streamingName) {
         streamingMap.removeLocal(streamingName);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index 57831d5..fb806d1 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -236,13 +236,11 @@ public class StreamingController extends BasicController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(value = "/{streamingName}/build", method = {RequestMethod.PUT})
+    @RequestMapping(value = "/{cubeName}/build", method = {RequestMethod.PUT})
     @ResponseBody
-    public StreamingBuildRequest buildStream(@PathVariable String streamingName, @RequestBody
StreamingBuildRequest streamingBuildRequest) {
-        streamingBuildRequest.setStreaming(streamingName);
-        StreamingConfig streamingConfig = streamingService.getStreamingManager().getConfig(streamingName);
-        Preconditions.checkNotNull(streamingConfig, "Stream config '" + streamingName + "'
is not found.");
-        String cubeName = streamingConfig.getCubeName();
+    public StreamingBuildRequest buildStream(@PathVariable String cubeName, @RequestBody
StreamingBuildRequest streamingBuildRequest) {
+        StreamingConfig streamingConfig = streamingService.getStreamingManager().getStreamingConfigByCube(cubeName);
+        Preconditions.checkNotNull(streamingConfig, "Stream config for '" + cubeName + "'
is not found.");
         List<CubeInstance> cubes = cubeService.getCubes(cubeName, null, null, null,
null);
         Preconditions.checkArgument(cubes.size() == 1, "Cube '" + cubeName + "' is not found.");
         CubeInstance cube = cubes.get(0);
@@ -257,7 +255,8 @@ public class StreamingController extends BasicController {
             }
         }
 
-        streamingService.buildStream(streamingName, streamingBuildRequest);
+        streamingBuildRequest.setStreaming(streamingConfig.getName());
+        streamingService.buildStream(cubeName, streamingBuildRequest);
         streamingBuildRequest.setMessage("Build request is submitted successfully.");
         streamingBuildRequest.setSuccessful(true);
         return streamingBuildRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f3987462/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
index df23ea0..8614e8c 100644
--- a/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
+++ b/server/src/main/java/org/apache/kylin/rest/helix/LeaderStandbyStateModelFactory.java
@@ -10,6 +10,10 @@ import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.lock.MockJobLock;
@@ -48,7 +52,7 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
{
             logger.info("JobEngineStateModel.onBecomeLeaderFromStandby()");
             try {
-                KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
                 DefaultScheduler scheduler = DefaultScheduler.createInstance();
                 scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock());
                 while (!scheduler.hasStarted()) {
@@ -89,11 +93,22 @@ public class LeaderStandbyStateModelFactory extends StateTransitionHandlerFactor
         public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
{
             String resourceName = message.getResourceId().stringify();
             Preconditions.checkArgument(resourceName.startsWith(RESOURCE_STREAME_CUBE_PREFIX));
-            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_"))
+ 1);
+            long end = Long.parseLong(resourceName.substring(resourceName.lastIndexOf("_")
+ 1));
             String temp = resourceName.substring(RESOURCE_STREAME_CUBE_PREFIX.length(), resourceName.lastIndexOf("_"));
-            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_")) + 1);
+            long start = Long.parseLong(temp.substring(temp.lastIndexOf("_") + 1));
             String streamingConfig = temp.substring(0, temp.lastIndexOf("_"));
 
+            final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            
+            final String cubeName = StreamingManager.getInstance(kylinConfig).getStreamingConfig(streamingConfig).getCubeName();
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            for (CubeSegment segment : cube.getSegments()) {
+                if (segment.getDateRangeStart() <= start && segment.getDateRangeEnd()
>= end) {
+                    logger.info("Segment " + segment.getName() + " already exist, no need
rebuild.");
+                    return;
+                }
+            }
+            
             KylinConfigBase.getKylinHome();
             String segmentId = start + "_" + end;
             String cmd = KylinConfigBase.getKylinHome() + "/bin/kylin.sh streaming start
" + streamingConfig + " " + segmentId + " -oneoff true -start " + start + " -end " + end +
" -streaming " + streamingConfig;


Mime
View raw message