kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [1/2] kylin git commit: KYLIN-2672 Only clean necessary cache for CubeMigrationCLI
Date Mon, 05 Feb 2018 10:41:16 GMT
Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2672 [created] b8105db9e


KYLIN-2672 Only clean necessary cache for CubeMigrationCLI

Signed-off-by: Li Yang <liyang@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/294fc908
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/294fc908
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/294fc908

Branch: refs/heads/KYLIN-2672
Commit: 294fc9084bdc1a243ea3d5c8550060c24b04bf43
Parents: 4a29d92
Author: kangkaisen <kangkaisen@meituan.com>
Authored: Mon Dec 25 18:33:31 2017 +0800
Committer: Li Yang <liyang@apache.org>
Committed: Mon Feb 5 10:57:45 2018 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/RestClient.java     | 20 +++++++
 .../java/org/apache/kylin/cube/CubeManager.java |  2 +-
 .../kylin/metadata/TableMetadataManager.java    | 12 ++++
 .../apache/kylin/metadata/model/TableDesc.java  |  2 +-
 .../kylin/metadata/project/ProjectL2Cache.java  |  4 ++
 .../kylin/metadata/project/ProjectManager.java  |  6 +-
 .../kylin/rest/controller/CacheController.java  | 10 ++++
 .../rest/request/CubeMigrationRequest.java      | 62 ++++++++++++++++++++
 .../apache/kylin/rest/service/CacheService.java | 34 ++++++++++-
 .../org/apache/kylin/tool/CubeMigrationCLI.java | 28 ++++++---
 10 files changed, 168 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 02045ae..e1cc13c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -274,6 +274,26 @@ public class RestClient {
         return response;
     }
 
+    public void clearCacheForCubeMigration(String cube, String project, String model, Map<String, String> tableToProjects) throws IOException{
+        String url = baseUrl + "/cache/migration";
+        HttpPost post = new HttpPost(url);
+
+        post.addHeader("Accept", "application/json, text/plain, */*");
+        post.addHeader("Content-Type", "application/json");
+
+        HashMap<String, Object> paraMap = new HashMap<String, Object>();
+        paraMap.put("cube", cube);
+        paraMap.put("project", project);
+        paraMap.put("model", model);
+        paraMap.put("tableToProjects", tableToProjects);
+        String jsonMsg = JsonUtil.writeValueAsString(paraMap);
+        post.setEntity(new StringEntity(jsonMsg, "UTF-8"));
+        HttpResponse response = client.execute(post);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());
+        }
+    }
+
     private HashMap dealResponse(HttpResponse response) throws IOException {
         if (response.getStatusLine().getStatusCode() != 200) {
             throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 8bdb5aa..9c52e8b 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -386,7 +386,7 @@ public class CubeManager implements IRealizationProvider {
     }
 
     // for internal
-    CubeInstance reloadCubeQuietly(String cubeName) {
+    public CubeInstance reloadCubeQuietly(String cubeName) {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = crud.reloadQuietly(cubeName);
             if (cube != null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
index f09c47c..2308df4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java
@@ -151,6 +151,12 @@ public class TableMetadataManager {
         }
     }
 
+    public void reloadSourceTable(String table, String project) {
+        try (AutoLock lock = srcTableMapLock.lockForWrite()) {
+            srcTableCrud.reloadQuietly(TableDesc.concatResourcePath(table, project));
+        }
+    }
+
     public List<TableDesc> listAllTables(String prj) {
         try (AutoLock lock = srcTableMapLock.lockForWrite()) {
             return Lists.newArrayList(getAllTablesMap(prj).values());
@@ -314,6 +320,12 @@ public class TableMetadataManager {
         }
     }
 
+    public void reloadTableExt(String table, String project) {
+        try (AutoLock lock = srcExtMapLock.lockForWrite()) {
+            srcExtCrud.reloadQuietly(TableExtDesc.concatResourcePath(table, project));
+        }
+    }
+
     /**
      * Get table extended info. Keys are defined in {@link MetadataConstants}
      * 

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 68bc5e9..be278de 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -57,7 +57,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
 
     // this method should only used for getting dest path when copying from src to dest.
     // if you want to get table's src path, use getResourcePath() instead.
-    private static String concatResourcePath(String tableIdentity, String prj) {
+    public static String concatResourcePath(String tableIdentity, String prj) {
         return concatRawResourcePath(makeResourceName(tableIdentity, prj));
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
index 70b6a12..6e09ae8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectL2Cache.java
@@ -187,6 +187,10 @@ class ProjectL2Cache {
         return result;
     }
 
+    public void reloadCacheByProject(String project) {
+        projectCaches.put(project, loadCache(project));
+    }
+
     private ProjectCache loadCache(String project) {
         logger.debug("Loading L2 project cache for " + project);
         ProjectCache projectCache = new ProjectCache(project);

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 0dd364d..aae692d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -117,10 +117,14 @@ public class ProjectManager {
         l2Cache.clear();
     }
 
+    public void reloadProjectL2Cache(String project) {
+        l2Cache.reloadCacheByProject(project);
+    }
+
     public ProjectInstance reloadProjectQuietly(String project) throws IOException {
         try (AutoLock lock = prjMapLock.lockForWrite()) {
             ProjectInstance prj = crud.reloadQuietly(project);
-            clearL2Cache();
+            reloadProjectL2Cache(project);
             return prj;
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
index 992094b..08b7cc4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.rest.request.CubeMigrationRequest;
 import org.apache.kylin.rest.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
@@ -73,6 +75,14 @@ public class CacheController extends BasicController {
         cacheService.notifyMetadataChange(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE, Broadcaster.SYNC_ALL);
     }
 
+    @RequestMapping(value = "/migration", method = RequestMethod.POST)
+    @ResponseBody
+    public void clearCacheForCubeMigration(@RequestBody CubeMigrationRequest request) throws IOException {
+        cacheService.clearCacheForCubeMigration(request.getCube(), request.getProject(), request.getModel(), request.getTableToProjects());
+
+        cacheService.cleanDataCache(request.getProject());
+    }
+
     public void setCacheService(CacheService cacheService) {
         this.cacheService = cacheService;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
new file mode 100644
index 0000000..175fb59
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/CubeMigrationRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.Map;
+
+public class CubeMigrationRequest {
+    private String cube;
+    private String model;
+    private String project;
+
+    private Map<String, String> tableToProjects;//For KYLIN-2717 compatibility, the project of old table will be NULL
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getCube() {
+        return cube;
+    }
+
+    public void setCube(String cube) {
+        this.cube = cube;
+    }
+
+    public String getModel() {
+        return model;
+    }
+
+    public void setModel(String model) {
+        this.model = model;
+    }
+
+    public Map<String, String> getTableToProjects() {
+        return tableToProjects;
+    }
+
+    public void setTableToProjects(Map<String, String> tableToProjects) {
+        this.tableToProjects = tableToProjects;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index b61309e..a8771ed 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
 
+import java.util.Map;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -114,7 +115,7 @@ public class CacheService extends BasicService implements InitializingBean {
         broadcaster.notifyListener(entity, event, cacheKey);
     }
 
-    protected void cleanDataCache(String project) {
+    public void cleanDataCache(String project) {
         if (cacheManager != null) {
             logger.info("cleaning cache for project " + project + " (currently remove all entries)");
             cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
@@ -133,4 +134,35 @@ public class CacheService extends BasicService implements InitializingBean {
         }
     }
 
+    public void clearCacheForCubeMigration(String cube, String project, String model, Map<String, String> tableToProjects) throws IOException {
+        //the metadata reloading must be in order
+
+        //table must before model
+        for (Map.Entry<String, String> entry : tableToProjects.entrySet()) {
+            //For KYLIN-2717 compatibility, use tableProject not project
+            getTableManager().reloadSourceTable(entry.getKey(), entry.getValue());
+            getTableManager().reloadTableExt(entry.getKey(), entry.getValue());
+        }
+        logger.info("reload table cache done");
+
+        //ProjectInstance cache must before cube and model cache, as the new cubeDesc init and model reloading relays on latest ProjectInstance cache
+        getProjectManager().reloadProjectQuietly(project);
+        logger.info("reload project cache done");
+
+        //model must before cube desc
+        getDataModelManager().reloadDataModel(model);
+        logger.info("reload model cache done");
+
+        //cube desc must before cube instance
+        getCubeDescManager().reloadCubeDescLocal(cube);
+        logger.info("reload cubeDesc cache done");
+
+        getCubeManager().reloadCubeQuietly(cube);
+        logger.info("reload cube cache done");
+
+        //reload project l2cache again after cube cache, because the project L2 cache relay on latest cube cache
+        getProjectManager().reloadProjectL2Cache(project);
+        logger.info("reload project l2cache done");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/294fc908/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index a4a6ab9..5426b62 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -54,7 +54,6 @@ import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -129,8 +128,16 @@ public class CubeMigrationCLI extends AbstractApplication {
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
-                projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
-                Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute), true);
+                projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
+    }
+
+    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
+            String purgeAndDisable, String overwriteIfExists, String realExecute)
+            throws IOException, InterruptedException {
+
+        moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
+                Boolean.parseBoolean(purgeAndDisable), Boolean.parseBoolean(overwriteIfExists),
+                Boolean.parseBoolean(realExecute), true);
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
@@ -187,12 +194,12 @@ public class CubeMigrationCLI extends AbstractApplication {
             if (migrateSegment) {
                 checkMigrationSuccess(dstConfig, cubeName, true);
             }
-            updateMeta(dstConfig);
+            updateMeta(dstConfig, projectName, cubeName, cube.getModel());
         } else {
             showOpts();
         }
     }
-
+    
     public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
         CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
         checkCLI.execute(cubeName);
@@ -619,7 +626,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         }
         }
     }
-
+    
     private String renameTableWithinProject(String srcItem) {
         if (dstProject != null && srcItem.contains(ResourceStore.TABLE_RESOURCE_ROOT)) {
             String tableIdentity = TableDesc.parseResourcePath(srcItem).getFirst();
@@ -631,13 +638,18 @@ public class CubeMigrationCLI extends AbstractApplication {
         return srcItem;
     }
 
-    private void updateMeta(KylinConfig config) {
+    private void updateMeta(KylinConfig config, String projectName, String cubeName, DataModelDesc model) {
         String[] nodes = config.getRestServers();
+        Map<String, String> tableToProjects = new HashMap<>();
+        for (TableRef tableRef : model.getAllTables()) {
+            tableToProjects.put(tableRef.getTableIdentity(), tableRef.getTableDesc().getProject());
+        }
+
         for (String node : nodes) {
             RestClient restClient = new RestClient(node);
             try {
                 logger.info("update meta cache for " + node);
-                restClient.wipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL);
+                restClient.clearCacheForCubeMigration(cubeName, projectName, model.getName(), tableToProjects);
             } catch (IOException e) {
                 logger.error(e.getMessage());
             }


Mime
View raw message