kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [3/4] incubator-kylin git commit: KYLIN-920 & KYLIN-782 Upgrade to HBase 1.1 (with help from murkrishn <murkrishna@ebay.com>)
Date Tue, 03 Nov 2015 10:27:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index 5482684..239c7ec 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -1,313 +1,314 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
-    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
-    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-
-    public static void main(String[] args) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
-        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
-        logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
-        List<String> tableNames = getHTableNames(kylinConfig);
-        logger.info("Identify tables " + tableNames);
-
-        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
-        logger.info("Old coprocessor jar: " + oldJarPaths);
-
-        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
-        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
-        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
-        // Don't remove old jars, missing coprocessor jar will fail hbase
-        // removeOldJars(oldJarPaths, fileSystem);
-
-        hbaseAdmin.close();
-
-        logger.info("Processed " + processedTables);
-        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
-    }
-
-    public static void deployCoprocessor(HTableDescriptor tableDesc) {
-        try {
-            initHTableCoprocessor(tableDesc);
-            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
-        } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
-            logger.error("Will try creating the table without coprocessor.");
-        }
-    }
-
-    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+    public static void main(String[] args) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-
-        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
-        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
-        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-    }
-
-    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Add coprocessor on " + desc.getNameAsString());
-        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
-        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
-    }
-
-    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Disable " + tableName);
-        hbaseAdmin.disableTable(tableName);
-
-        logger.info("Unset coprocessor on " + tableName);
-        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
-            desc.removeCoprocessor(OBSERVER_CLS_NAME);
-        }
-        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
-            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
-        }
-
-        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-        hbaseAdmin.modifyTable(tableName, desc);
-
-        logger.info("Enable " + tableName);
-        hbaseAdmin.enableTable(tableName);
-    }
-
-    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
-        List<String> processed = new ArrayList<String>();
-
-        for (String tableName : tableNames) {
-            try {
-                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
-                processed.add(tableName);
-            } catch (IOException ex) {
-                logger.error("Error processing " + tableName, ex);
-            }
-        }
-        return processed;
-    }
-
-    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
-        FileStatus newestJar = null;
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getPath().toString().endsWith(".jar")) {
-                if (newestJar == null) {
-                    newestJar = fileStatus;
-                } else {
-                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
-                        newestJar = fileStatus;
-                }
-            }
-        }
-        if (newestJar == null)
-            return null;
-
-        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
-        logger.info("The newest coprocessor is " + path.toString());
-        return path;
-    }
-
-    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
-        Path uploadPath = null;
-        File localCoprocessorFile = new File(localCoprocessorJar);
-
-        // check existing jars
-        if (oldJarPaths == null) {
-            oldJarPaths = new HashSet<String>();
-        }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
-                uploadPath = fileStatus.getPath();
-                break;
-            }
-            String filename = fileStatus.getPath().toString();
-            if (filename.endsWith(".jar")) {
-                oldJarPaths.add(filename);
-            }
-        }
-
-        // upload if not existing
-        if (uploadPath == null) {
-            // figure out a unique new jar file name
-            Set<String> oldJarNames = new HashSet<String>();
-            for (String path : oldJarPaths) {
-                oldJarNames.add(new Path(path).getName());
-            }
-            String baseName = getBaseFileName(localCoprocessorJar);
-            String newName = null;
-            int i = 0;
-            while (newName == null) {
-                newName = baseName + "-" + (i++) + ".jar";
-                if (oldJarNames.contains(newName))
-                    newName = null;
-            }
-
-            // upload
-            uploadPath = new Path(coprocessorDir, newName);
-            FileInputStream in = null;
-            FSDataOutputStream out = null;
-            try {
-                in = new FileInputStream(localCoprocessorFile);
-                out = fileSystem.create(uploadPath);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-
-            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
-        }
-
-        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
-        return uploadPath;
-    }
-
-    private static String getBaseFileName(String localCoprocessorJar) {
-        File localJar = new File(localCoprocessorJar);
-        String baseName = localJar.getName();
-        if (baseName.endsWith(".jar"))
-            baseName = baseName.substring(0, baseName.length() - ".jar".length());
-        return baseName;
-    }
-
-    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
-        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
-        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
-        fileSystem.mkdirs(coprocessorDir);
-        return coprocessorDir;
-    }
-
-    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
-        HashSet<String> result = new HashSet<String>();
-
-        for (String tableName : tableNames) {
-            HTableDescriptor tableDescriptor = null;
-            try {
-                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            } catch (TableNotFoundException e) {
-                logger.warn("Table not found " + tableName, e);
-                continue;
-            }
-
-            Matcher keyMatcher;
-            Matcher valueMatcher;
-            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
-                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
-                if (!keyMatcher.matches()) {
-                    continue;
-                }
-                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
-                if (!valueMatcher.matches()) {
-                    continue;
-                }
-
-                String jarPath = valueMatcher.group(1).trim();
-                String clsName = valueMatcher.group(2).trim();
-
-                if (OBSERVER_CLS_NAME.equals(clsName)) {
-                    result.add(jarPath);
-                }
-            }
-        }
-
-        return result;
-    }
-
-    private static List<String> getHTableNames(KylinConfig config) {
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-
-        ArrayList<String> result = new ArrayList<String>();
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
-            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        return result;
-    }
-}
+        FileSystem fileSystem = FileSystem.get(hconf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
+
+        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+        logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+        List<String> tableNames = getHTableNames(kylinConfig);
+        logger.info("Identify tables " + tableNames);
+
+        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+        logger.info("Old coprocessor jar: " + oldJarPaths);
+
+        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+        // Don't remove old jars, missing coprocessor jar will fail hbase
+        // removeOldJars(oldJarPaths, fileSystem);
+
+        hbaseAdmin.close();
+
+        logger.info("Processed " + processedTables);
+        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+    }
+
+    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+        try {
+            initHTableCoprocessor(tableDesc);
+            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+
+        } catch (Exception ex) {
+            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+            logger.error("Will try creating the table without coprocessor.");
+        }
+    }
+
+    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+
+        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+    }
+
+    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
+        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+    }
+
+    public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Disable " + tableName);
+        hbaseAdmin.disableTable(TableName.valueOf(tableName));
+
+        logger.info("Unset coprocessor on " + tableName);
+        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+            desc.removeCoprocessor(OBSERVER_CLS_NAME);
+        }
+        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+        }
+
+        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+
+        logger.info("Enable " + tableName);
+        hbaseAdmin.enableTable(TableName.valueOf(tableName));
+    }
+
+    private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+        List<String> processed = new ArrayList<String>();
+
+        for (String tableName : tableNames) {
+            try {
+                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+                processed.add(tableName);
+            } catch (IOException ex) {
+                logger.error("Error processing " + tableName, ex);
+            }
+        }
+        return processed;
+    }
+
+    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+        FileStatus newestJar = null;
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getPath().toString().endsWith(".jar")) {
+                if (newestJar == null) {
+                    newestJar = fileStatus;
+                } else {
+                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+                        newestJar = fileStatus;
+                }
+            }
+        }
+        if (newestJar == null)
+            return null;
+
+        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+        logger.info("The newest coprocessor is " + path.toString());
+        return path;
+    }
+
+    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+        Path uploadPath = null;
+        File localCoprocessorFile = new File(localCoprocessorJar);
+
+        // check existing jars
+        if (oldJarPaths == null) {
+            oldJarPaths = new HashSet<String>();
+        }
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
+                uploadPath = fileStatus.getPath();
+                break;
+            }
+            String filename = fileStatus.getPath().toString();
+            if (filename.endsWith(".jar")) {
+                oldJarPaths.add(filename);
+            }
+        }
+
+        // upload if not existing
+        if (uploadPath == null) {
+            // figure out a unique new jar file name
+            Set<String> oldJarNames = new HashSet<String>();
+            for (String path : oldJarPaths) {
+                oldJarNames.add(new Path(path).getName());
+            }
+            String baseName = getBaseFileName(localCoprocessorJar);
+            String newName = null;
+            int i = 0;
+            while (newName == null) {
+                newName = baseName + "-" + (i++) + ".jar";
+                if (oldJarNames.contains(newName))
+                    newName = null;
+            }
+
+            // upload
+            uploadPath = new Path(coprocessorDir, newName);
+            FileInputStream in = null;
+            FSDataOutputStream out = null;
+            try {
+                in = new FileInputStream(localCoprocessorFile);
+                out = fileSystem.create(uploadPath);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+
+            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+        }
+
+        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+        return uploadPath;
+    }
+
+    private static String getBaseFileName(String localCoprocessorJar) {
+        File localJar = new File(localCoprocessorJar);
+        String baseName = localJar.getName();
+        if (baseName.endsWith(".jar"))
+            baseName = baseName.substring(0, baseName.length() - ".jar".length());
+        return baseName;
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+        fileSystem.mkdirs(coprocessorDir);
+        return coprocessorDir;
+    }
+
+    private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
+        HashSet<String> result = new HashSet<String>();
+
+        for (String tableName : tableNames) {
+            HTableDescriptor tableDescriptor = null;
+            try {
+                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            } catch (TableNotFoundException e) {
+                logger.warn("Table not found " + tableName, e);
+                continue;
+            }
+
+            Matcher keyMatcher;
+            Matcher valueMatcher;
+            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+                if (!keyMatcher.matches()) {
+                    continue;
+                }
+                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+                if (!valueMatcher.matches()) {
+                    continue;
+                }
+
+                String jarPath = valueMatcher.group(1).trim();
+                String clsName = valueMatcher.group(2).trim();
+
+                if (OBSERVER_CLS_NAME.equals(clsName)) {
+                    result.add(jarPath);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<String> getHTableNames(KylinConfig config) {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        ArrayList<String> result = new ArrayList<String>();
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (StringUtils.isBlank(tableName) == false) {
+                    result.add(tableName);
+                    System.out.println("added new table: " + tableName);
+                }
+            }
+        }
+
+        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (StringUtils.isBlank(tableName) == false) {
+                    result.add(tableName);
+                    System.out.println("added new table: " + tableName);
+                }
+            }
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
index 70e1df6..5fe5e58 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark {
     public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
         System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         createHTableIfNeeded(conn, TEST_TABLE);
         prepareData(conn);
 
@@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark {
 
     }
 
-    private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+    private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
         Stats stats = new Stats("COLUMN_SCAN");
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
         fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
     }
 
-    private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
     }
 
-    private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
     }
 
-    private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+    private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
 
         final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
 
             stats.markStart();
@@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void prepareData(HConnection conn) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void prepareData(Connection conn) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
 
         try {
             // check how many rows existing
@@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark {
                 byte[] rowkey = Bytes.toBytes(i);
                 Put put = new Put(rowkey);
                 byte[] cell = randomBytes();
-                put.add(CF, QN, cell);
+                put.addColumn(CF, QN, cell);
                 table.put(put);
                 nBytes += cell.length;
                 dot(i, N_ROWS);
@@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark {
         return bytes;
     }
 
-    private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
 
         try {
             boolean tableExist = false;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
index 53930e3..e283748 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
     }
 
     private void alter() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
         hbaseAdmin.disableTable(table.getTableName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
index 3329d27..4d44088 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -22,11 +22,12 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.slf4j.Logger;
@@ -69,8 +70,8 @@ public class RowCounterCLI {
 
         logger.info("My Scan " + scan.toString());
 
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
 
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
index e784a41..95a483d 100644
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
@@ -22,10 +22,11 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -39,7 +40,7 @@ public class ExportHBaseData {
     KylinConfig kylinConfig;
     HTableDescriptor[] allTables;
     Configuration config;
-    HBaseAdmin hbase;
+    Admin admin;
     CliCommandExecutor cli;
     String exportHdfsFolder;
     String exportLocalFolderParent;
@@ -75,12 +76,11 @@ public class ExportHBaseData {
         int cut = metadataUrl.indexOf('@');
         tableNameBase = metadataUrl.substring(0, cut);
         String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         try {
-            hbase = new HBaseAdmin(conn);
-            config = hbase.getConfiguration();
-            allTables = hbase.listTables();
+            admin = conn.getAdmin();
+            config = admin.getConfiguration();
+            allTables = admin.listTables();
         } catch (IOException e) {
             e.printStackTrace();
             throw e;
@@ -89,6 +89,8 @@ public class ExportHBaseData {
 
     public void tearDown() {
 
+        // close hbase admin
+        IOUtils.closeQuietly(admin);
         // cleanup hdfs
         try {
             if (cli != null && exportHdfsFolder != null) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
index f2b9ed6..5a04d20 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 
 /**
@@ -90,13 +93,15 @@ public class TestHbaseClient {
         conf.set("hbase.zookeeper.quorum", "hbase_host");
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
-        HTable table = new HTable(conf, "test1");
+        Connection connection = ConnectionFactory.createConnection(conf);
+        Table table = connection.getTable(TableName.valueOf("test1"));
         Put put = new Put(Bytes.toBytes("row1"));
 
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
 
         table.put(put);
         table.close();
+        connection.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
index 9f9c23c..f5f94c8 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
@@ -23,10 +23,11 @@ import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.BytesUtil;
@@ -60,11 +61,11 @@ public class HBaseRowDigestTest extends HBaseMetadataTestCase {
     @Test
     public static void test() throws IOException {
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-        HConnection conn = null;
-        HTableInterface table = null;
+        Connection conn = null;
+        Table table = null;
         try {
             conn = HBaseConnection.get(hbaseUrl);
-            table = conn.getTable("KYLIN_II_YTYWP3CQGJ");
+            table = conn.getTable(TableName.valueOf("KYLIN_II_YTYWP3CQGJ"));
             ResultScanner scanner = table.getScanner(CF, QN);
             StringBuffer sb = new StringBuffer();
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/monitor/pom.xml
----------------------------------------------------------------------
diff --git a/monitor/pom.xml b/monitor/pom.xml
index dfdd12d..a131bd6 100644
--- a/monitor/pom.xml
+++ b/monitor/pom.xml
@@ -39,6 +39,12 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
----------------------------------------------------------------------
diff --git a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
index 97200fc..94b3937 100644
--- a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
+++ b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
@@ -20,18 +20,21 @@ package org.apache.kylin.monitor;
 
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.log4j.Logger;
 
 /**
@@ -122,11 +125,10 @@ public class MonitorMetaManager {
     public static String getListWithRowkey(String table, String rowkey) throws IOException {
         Result result = getResultByRowKey(table, rowkey);
         String fileList = null;
-        if (result.list() != null) {
-            for (KeyValue kv : result.list()) {
-                fileList = Bytes.toString(kv.getValue());
+        if (result.listCells() != null) {
+            for (Cell cell : result.listCells()) {
+                fileList = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset());
             }
-
         }
         fileList = fileList == null ? "" : fileList;
         return fileList;
@@ -164,16 +166,20 @@ public class MonitorMetaManager {
      * create table in hbase
      */
     public static void creatTable(String tableName, String[] family) throws Exception {
-        HBaseAdmin admin = new HBaseAdmin(conf);
-        HTableDescriptor desc = new HTableDescriptor(tableName);
-        for (int i = 0; i < family.length; i++) {
-            desc.addFamily(new HColumnDescriptor(family[i]));
-        }
-        if (admin.tableExists(tableName)) {
-            logger.info("table Exists!");
-        } else {
-            admin.createTable(desc);
-            logger.info("create table Success!");
+        Admin admin = HBaseConnection.get().getAdmin();
+        try {
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+            for (int i = 0; i < family.length; i++) {
+                desc.addFamily(new HColumnDescriptor(family[i]));
+            }
+            if (admin.tableExists(TableName.valueOf(tableName))) {
+                logger.info("table Exists!");
+            } else {
+                admin.createTable(desc);
+                logger.info("create table Success!");
+            }
+        } finally {
+            IOUtils.closeQuietly(admin);
         }
     }
 
@@ -181,13 +187,15 @@ public class MonitorMetaManager {
      * update cell in hbase
      */
     public static void updateData(String tableName, String rowKey, String family, String column, String value) throws IOException {
-        HTable table = new HTable(conf, Bytes.toBytes(tableName));
+        Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
         Put put = new Put(rowKey.getBytes());
-        put.add(family.getBytes(), column.getBytes(), value.getBytes());
+        put.addColumn(family.getBytes(), column.getBytes(), value.getBytes());
         try {
             table.put(put);
         } catch (IOException e) {
             e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(table);
         }
         logger.info("update table [" + tableName + "]");
         logger.info("rowKey [" + rowKey + "]");
@@ -200,9 +208,10 @@ public class MonitorMetaManager {
      * get result by rowkey
      */
     public static Result getResultByRowKey(String tableName, String rowKey) throws IOException {
-        HTable table = new HTable(conf, Bytes.toBytes(tableName));
+        Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
         Get get = new Get(Bytes.toBytes(rowKey));
         Result result = table.get(get);
+        IOUtils.closeQuietly(table);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1b4f862..4bdcf7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,12 +45,13 @@
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
         <!-- Hadoop versions -->
-        <hadoop2.version>2.6.0</hadoop2.version>
-        <yarn.version>2.6.0</yarn.version>
+        <hadoop2.version>2.7.1</hadoop2.version>
+        <yarn.version>2.7.1</yarn.version>
         <zookeeper.version>3.4.6</zookeeper.version>
-        <hive.version>0.14.0</hive.version>
-        <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
-        <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+        <hive.version>1.2.1</hive.version>
+        <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+        <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
+        <curator.version>2.7.1</curator.version>
 
         <!-- Dependency versions -->
         <antlr.version>3.4</antlr.version>
@@ -89,9 +90,6 @@
         <!-- Calcite Version -->
         <calcite.version>1.4.0-incubating</calcite.version>
 
-        <!-- Curator.version Version -->
-        <curator.version>2.6.0</curator.version>
-
         <!-- Sonar -->
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
         <sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
@@ -817,8 +815,7 @@
                         </configuration>
                     </plugin>
 
-                    <!-- Override the parent assembly execution to customize the assembly
-              descriptor and final name. -->
+                    <!-- Override the parent assembly execution to customize the assembly descriptor and final name. -->
                     <plugin>
                         <artifactId>maven-assembly-plugin</artifactId>
                         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/server/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
index ea2a48e..8a1cf6d 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -29,13 +29,14 @@ import java.util.Map;
 import java.util.NavigableMap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.kylin.common.KylinConfig;
@@ -130,9 +131,9 @@ public class AclService implements MutableAclService {
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
 
             Scan scan = new Scan();
             SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity)));
@@ -179,10 +180,10 @@ public class AclService implements MutableAclService {
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
-        HTableInterface htable = null;
+        Table htable = null;
         Result result = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
 
             for (ObjectIdentity oid : oids) {
                 result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier()))));
@@ -231,16 +232,15 @@ public class AclService implements MutableAclService {
         Authentication auth = SecurityContextHolder.getContext().getAuthentication();
         PrincipalSid sid = new PrincipalSid(auth);
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
 
             htable.put(put);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " created successfully.");
         } catch (IOException e) {
@@ -254,9 +254,9 @@ public class AclService implements MutableAclService {
 
     @Override
     public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
 
             List<ObjectIdentity> children = findChildren(objectIdentity);
@@ -269,7 +269,6 @@ public class AclService implements MutableAclService {
             }
 
             htable.delete(delete);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " deleted successfully.");
         } catch (IOException e) {
@@ -287,27 +286,26 @@ public class AclService implements MutableAclService {
             throw e;
         }
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
-            delete.deleteFamily(Bytes.toBytes(ACL_ACES_FAMILY));
+            delete.addFamily(Bytes.toBytes(ACL_ACES_FAMILY));
             htable.delete(delete);
 
             Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
 
             if (null != acl.getParentAcl()) {
-                put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+                put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
             }
 
             for (AccessControlEntry ace : acl.getEntries()) {
                 AceInfo aceInfo = new AceInfo(ace);
-                put.add(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+                put.addColumn(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
             }
 
             if (!put.isEmpty()) {
                 htable.put(put);
-                htable.flushCommits();
 
                 logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index f115d89..2770475 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -430,33 +430,24 @@ public class CubeService extends BasicService {
      * @throws IOException Exception when HTable resource is not closed correctly.
      */
     public HBaseResponse getHTableInfo(String tableName) throws IOException {
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        HTable table = null;
+        Connection conn = HBaseConnection.get();
         HBaseResponse hr = null;
         long tableSize = 0;
         int regionCount = 0;
 
-        try {
-            table = new HTable(hconf, tableName);
-
-            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
-            Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+        Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
 
-            for (long s : sizeMap.values()) {
-                tableSize += s;
-            }
+        for (long s : sizeMap.values()) {
+            tableSize += s;
+        }
 
-            regionCount = sizeMap.size();
+        regionCount = sizeMap.size();
 
-            // Set response.
-            hr = new HBaseResponse();
-            hr.setTableSize(tableSize);
-            hr.setRegionCount(regionCount);
-        } finally {
-            if (null != table) {
-                table.close();
-            }
-        }
+        // Set response.
+        hr = new HBaseResponse();
+        hr.setTableSize(tableSize);
+        hr.setRegionCount(regionCount);
 
         return hr;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 764df4b..7d14021 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,11 @@ import javax.sql.DataSource;
 
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.persistence.HBaseConnection;
@@ -124,14 +125,13 @@ public class QueryService extends BasicService {
         Query[] queryArray = new Query[queries.size()];
 
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -157,14 +157,13 @@ public class QueryService extends BasicService {
 
         Query[] queryArray = new Query[queries.size()];
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -176,9 +175,9 @@ public class QueryService extends BasicService {
         }
 
         List<Query> queries = new ArrayList<Query>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Get get = new Get(Bytes.toBytes(creator));
             get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
             Result result = htable.get(get);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/server/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
index d665ab9..d03cd55 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -25,13 +25,14 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -75,9 +76,9 @@ public class UserService implements UserManager {
 
     @Override
     public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
 
             Get get = new Get(Bytes.toBytes(username));
             get.addFamily(Bytes.toBytes(USER_AUTHORITY_FAMILY));
@@ -106,15 +107,14 @@ public class UserService implements UserManager {
 
     @Override
     public void updateUser(UserDetails user) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             byte[] userAuthorities = serialize(user.getAuthorities());
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(user.getUsername()));
-            put.add(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
+            put.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
 
             htable.put(put);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -124,13 +124,12 @@ public class UserService implements UserManager {
 
     @Override
     public void deleteUser(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Delete delete = new Delete(Bytes.toBytes(username));
 
             htable.delete(delete);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -145,9 +144,9 @@ public class UserService implements UserManager {
 
     @Override
     public boolean userExists(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Result result = htable.get(new Get(Bytes.toBytes(username)));
 
             return null != result && !result.isEmpty();
@@ -164,10 +163,10 @@ public class UserService implements UserManager {
         s.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN));
 
         List<String> authorities = new ArrayList<String>();
-        HTableInterface htable = null;
+        Table htable = null;
         ResultScanner scanner = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             scanner = htable.getScanner(s);
 
             for (Result result = scanner.next(); result != null; result = scanner.next()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index 9efbb79..6f08f8a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -28,16 +28,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Bytes;
@@ -82,7 +82,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private final Collection<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
-    private final HTableInterface table;
+    private final Table table;
     private final RowKeyDecoder rowKeyDecoder;
     private final Iterator<HBaseKeyRange> rangeIterator;
 
@@ -94,7 +94,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private int scanCount;
     private int scanCountDelta;
 
-    public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, Connection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
         this.cube = cubeSeg.getCubeInstance();
         this.cubeSeg = cubeSeg;
         this.dimensions = dimensions;
@@ -106,7 +106,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
 
         try {
-            this.table = conn.getTable(tableName);
+            this.table = conn.getTable(TableName.valueOf(tableName));
         } catch (Throwable t) {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
@@ -122,12 +122,11 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     private void closeScanner() {
         flushScanCountDelta();
-        
+
         if (logger.isDebugEnabled() && scan != null) {
             logger.debug("Scan " + scan.toString());
-            byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
-            if (metricsBytes != null) {
-                ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
+            ScanMetrics scanMetrics = scan.getScanMetrics();
+            if (scanMetrics != null) {
                 logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
             }
         }
@@ -254,7 +253,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         Scan scan = new Scan();
         scan.setCaching(SCAN_CACHE);
         scan.setCacheBlocks(true);
-        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+        scan.setScanMetricsEnabled(true);
         for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
             HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
             byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index 8eb7bcb..9ae296f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
@@ -140,7 +140,7 @@ public class CubeStorageEngine implements IStorageEngine {
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
+        Connection conn = HBaseConnection.get(context.getConnUrl());
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
index 918fd4b..6a76baa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
@@ -1,93 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- * 
- */
-public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
-
-    byte[] family;
-    byte[] qualifier;
-
-    HTableInterface table;
-    ResultScanner scanner;
-    Iterator<Result> iterator;
-
-    public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
-        this.family = family;
-        this.qualifier = qualifier;
-
-        this.table = hconn.getTable(tableName);
-        this.scanner = table.getScanner(family, qualifier);
-        this.iterator = scanner.iterator();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(scanner);
-        IOUtils.closeQuietly(table);
-    }
-
-    @Override
-    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
-        return new MyIterator();
-    }
-
-    private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
-
-        ImmutableBytesWritable key = new ImmutableBytesWritable();
-        ImmutableBytesWritable value = new ImmutableBytesWritable();
-        Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
-            Result r = iterator.next();
-            Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
-            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-            return pair;
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
+
+    byte[] family;
+    byte[] qualifier;
+
+    Table table;
+    ResultScanner scanner;
+    Iterator<Result> iterator;
+
+    public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        this.table = hconn.getTable(TableName.valueOf(tableName));
+        this.scanner = table.getScanner(family, qualifier);
+        this.iterator = scanner.iterator();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(scanner);
+        IOUtils.closeQuietly(table);
+    }
+
+    @Override
+    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
+        return new MyIterator();
+    }
+
+    private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
+
+        ImmutableBytesWritable key = new ImmutableBytesWritable();
+        ImmutableBytesWritable value = new ImmutableBytesWritable();
+        Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+            Result r = iterator.next();
+            Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+            return pair;
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
index afb49c0..e518a4c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
@@ -1,57 +1,57 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageEngine implements IStorageEngine {
-
-    private IISegment seg;
-
-    public InvertedIndexStorageEngine(IIInstance ii) {
-        this.seg = ii.getFirstSegment();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
-        String tableName = seg.getStorageLocationIdentifier();
-
-        //HConnection is cached, so need not be closed
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        try {
-            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
-        } catch (Throwable e) {
-            e.printStackTrace();
-            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexStorageEngine implements IStorageEngine {
+
+    private IISegment seg;
+
+    public InvertedIndexStorageEngine(IIInstance ii) {
+        this.seg = ii.getFirstSegment();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
+        String tableName = seg.getStorageLocationIdentifier();
+
+        // Connection is cached, so need not be closed
+        Connection conn = HBaseConnection.get(context.getConnUrl());
+        try {
+            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
+        }
+    }
+}


Mime
View raw message