hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1594662 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/coprocessor/observers/ tes...
Date Wed, 14 May 2014 18:18:29 GMT
Author: liyin
Date: Wed May 14 18:18:28 2014
New Revision: 1594662

URL: http://svn.apache.org/r1594662
Log:
[HBASE-2000] load coprocessors from jar published in hdfs

Author: adela

Summary: on online configuration change - load jars from hdfs for coprocessors

Test Plan: added TestClassLoading, also ran TestHRegionObserverBypassCoprocessor and made
sure it passes

Reviewers: daviddeng

Reviewed By: daviddeng

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1308323

Task ID: 4171555

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
Wed May 14 18:18:28 2014
@@ -1065,7 +1065,7 @@ private HRegionLocation locateMetaInRoot
         LOG.debug("IOException locateRegionInMeta attempt " + tries
           + " of " + params.getNumRetries()
           + " failed; retrying after sleep of "
-          + params.getPauseTime(tries) + " because: " + e.getMessage());
+          + params.getPauseTime(tries) + " because: " + e.getMessage(), e);
       } else {
         throw e;
       }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
Wed May 14 18:18:28 2014
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.coproces
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.util.coprocessor.CoprocessorClassLoader;
 import org.apache.hadoop.hbase.util.coprocessor.SortedCopyOnWriteSet;
@@ -64,7 +67,7 @@ public abstract class CoprocessorHost<E 
     "hbase.coprocessor.wal.classes";
   public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
   public static final boolean DEFAULT_ABORT_ON_ERROR = true;
-
+  public static final String USER_REGION_COPROCESSOR_FROM_HDFS_KEY = "hbase.coprocessor.jars.and.classes";
 
   private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
   protected ThriftClientInterface tcInter;
@@ -125,9 +128,11 @@ public abstract class CoprocessorHost<E 
     List<E> configured = new ArrayList<E>();
     for (String className : defaultCPClasses) {
       className = className.trim();
-      if (findCoprocessor(className) != null) {
-        continue;
-      }
+      // TODO: check if we want to reenable this behavior later!
+      // if (findCoprocessor(className) != null) {
+      // System.out.println("coprocessor found with name: " + className);
+      // continue;
+      // }
       ClassLoader cl = this.getClass().getClassLoader();
       Thread.currentThread().setContextClassLoader(cl);
       try {
@@ -143,17 +148,91 @@ public abstract class CoprocessorHost<E 
   }
 
   /**
-   * Generally used when we do online configuration change for the loaded coprocessors
+   * Generally used when we do online configuration change for the loaded
+   * coprocessors
+   *
    * @param conf
    * @param confKey
    */
-  protected void reloadSysCoprocessorsOnConfigChange(Configuration conf, String confKey)
{
-    //remove whatever is loaded already
-    coprocessors.clear();
+  protected void reloadSysCoprocessorsOnConfigChange(Configuration conf,
+      String confKey) {
+    // remove whatever is loaded already
     loadSystemCoprocessors(conf, confKey);
   }
 
   /**
+   * read coprocessors that should be loaded from configuration
+   *
+   * @param conf
+   */
+  private List<Pair<String, String>> readCoprocessorsFromConf(Configuration conf)
{
+    List<Pair<String, String>> jarClass = new ArrayList<>();
+    Collection<String> jarsAndImpls = conf
+        .getStringCollection(USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
+    String jar = null;
+    for (Iterator<String> iterator = jarsAndImpls.iterator(); iterator
+        .hasNext();) {
+      String string = (String) iterator.next();
+      if (string.endsWith(".jar")) {
+        if (checkIfCorrectPath(string)) {
+          jar = string;
+        } else {
+          LOG.error("jar " + string
+              + " is not placed on the correct path or path is incorrect!");
+          return null;
+        }
+      } else {
+        jarClass.add(new Pair<>(jar, string));
+      }
+    }
+    return jarClass;
+  }
+
+  /**
+   * Checks if the specified path for the coprocessor jar is in expected format
+   * TODO: make this more advanced - currently is very hardoced and dummy
+   *
+   * @param path
+   *          - Absolute and complete path where the coprocessor jar resides
+   *          Expected path is everything in this format:
+   *          coprocessors/project/version(integer)/jarfile
+   * @return
+   */
+  public static boolean checkIfCorrectPath(String path) {
+    String[] firstSplit = path.split(":");
+    String[] parts = firstSplit[2].split("/");
+    // port is included here
+    if (parts.length != 5) {
+      return false;
+    }
+    if (!parts[1].equals("coprocessors")) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Used to load coprocessors whose jar is specified via hdfs path. All
+   * existing coprocessors will be unloaded (if they appear again in the new
+   * configuration then they will be re-added).
+   *
+   * @param config
+   * @throws IOException
+   */
+  protected void reloadCoprocessorsFromHdfs(Configuration config)
+      throws IOException {
+    List<Pair<String, String>> fromConf = this.readCoprocessorsFromConf(config);
+    List<E> newCoprocessors = new ArrayList<>();
+    for (Pair<String, String> pair : fromConf) {
+      System.out.println(pair.getFirst() + " " + pair.getSecond());
+      E coproc = load(new Path(pair.getFirst()), pair.getSecond(),
+          Coprocessor.PRIORITY_USER, config);
+      newCoprocessors.add(coproc);
+    }
+    this.coprocessors.addAll(newCoprocessors);
+  }
+
+  /**
    * Load a coprocessor implementation into the host
    * @param path path to implementation jar
    * @param className the main class name

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorHost.java
Wed May 14 18:18:28 2014
@@ -73,14 +73,16 @@ public class RegionCoprocessorHost exten
 
   /**
    * Used mainly when we dynamically reload the configuration
+   * @throws IOException
    */
-  public void reloadCoprocessors(Configuration newConf) {
+  public void reloadCoprocessors(Configuration newConf) throws IOException {
+    coprocessors.clear();
     // reload system default cp's from configuration.
     reloadSysCoprocessorsOnConfigChange(newConf, REGION_COPROCESSOR_CONF_KEY);
     // reload system default cp's for user tables from configuration.
-    //TODO: check whether this checks for ROOT too
     if (!region.getRegionInfo().getTableDesc().isMetaRegion()
         && !region.getRegionInfo().getTableDesc().isRootRegion()) {
+      reloadCoprocessorsFromHdfs(newConf);
       reloadSysCoprocessorsOnConfigChange(newConf, USER_REGION_COPROCESSOR_CONF_KEY);
     }
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Wed May 14 18:18:28 2014
@@ -550,13 +550,21 @@ public class HRegion implements HeapSize
     this.coprocessorHost = new RegionCoprocessorHost(this,
         this.conf);
     // initialize dynamic parameters with current configuration
-    this.loadDynamicConf(conf);
+    try {
+      this.loadDynamicConf(conf);
+    } catch (IOException e) {
+      LOG.error("Was unable to load coprocessors from configuration", e);
+    }
   }
 
   @Override
   public void notifyOnChange(Configuration conf) {
     LOG.info("Online configuration changed!");
-    this.loadDynamicConf(conf);
+    try {
+      this.loadDynamicConf(conf);
+    } catch (IOException e) {
+      LOG.error("Was unable to load coprocessors from configuration", e);
+    }
   }
 
   private static void logIfChange(String varName, long orgV, long newV) {
@@ -566,8 +574,9 @@ public class HRegion implements HeapSize
   }
   /**
    * Load online configurable parameters from a specified Configuration
+   * @throws IOException
    */
-  private void loadDynamicConf(Configuration conf) {
+  private void loadDynamicConf(Configuration conf) throws IOException {
     long newColumnfamilyMemstoreFlushSize = conf.getLong(
         HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
         HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java?rev=1594662&r1=1594661&r2=1594662&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/observers/TestHRegionObserverBypassCoprocessor.java
Wed May 14 18:18:28 2014
@@ -8,8 +8,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java?rev=1594662&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/ClassLoaderTestHelper.java
Wed May 14 18:18:28 2014
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.coprocessor;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Some utilities to help class loader testing
+ */
+public class ClassLoaderTestHelper {
+  private static final Log LOG = LogFactory.getLog(ClassLoaderTestHelper.class);
+
+  private static final int BUFFER_SIZE = 4096;
+
+  /**
+   * Jar a list of files into a jar archive.
+   *
+   * @param archiveFile the target jar archive
+   * @param tobejared a list of files to be jared
+   */
+  private static boolean createJarArchive(File archiveFile, File[] tobeJared) {
+    try {
+      byte buffer[] = new byte[BUFFER_SIZE];
+      // Open archive file
+      FileOutputStream stream = new FileOutputStream(archiveFile);
+      JarOutputStream out = new JarOutputStream(stream, new Manifest());
+
+      for (int i = 0; i < tobeJared.length; i++) {
+        if (tobeJared[i] == null || !tobeJared[i].exists()
+            || tobeJared[i].isDirectory()) {
+          continue;
+        }
+
+        // Add archive entry
+        JarEntry jarAdd = new JarEntry(tobeJared[i].getName());
+        jarAdd.setTime(tobeJared[i].lastModified());
+        out.putNextEntry(jarAdd);
+
+        // Write file to archive
+        FileInputStream in = new FileInputStream(tobeJared[i]);
+        while (true) {
+          int nRead = in.read(buffer, 0, buffer.length);
+          if (nRead <= 0)
+            break;
+          out.write(buffer, 0, nRead);
+        }
+        in.close();
+      }
+      out.close();
+      stream.close();
+      LOG.info("Adding classes to jar file completed");
+      return true;
+    } catch (Exception ex) {
+      LOG.error("Error: " + ex.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Create a test jar for testing purpose for a given class
+   * name with specified code string: save the class to a file,
+   * compile it, and jar it up. If the code string passed in is
+   * null, a bare empty class will be created and used.
+   *
+   * @param testDir the folder under which to store the test class and jar
+   * @param className the test class name
+   * @param code the optional test class code, which can be null.
+   * If null, a bare empty class will be used
+   * @return the test jar file generated
+   */
+  public static File buildJar(String testDir,
+      String className, String code) throws Exception {
+    return buildJar(testDir, className, code, testDir);
+  }
+
+  /**
+   * Create a test jar for testing purpose for a given class
+   * name with specified code string.
+   *
+   * @param testDir the folder under which to store the test class
+   * @param className the test class name
+   * @param code the optional test class code, which can be null.
+   * If null, an empty class will be used
+   * @param folder the folder under which to store the generated jar
+   * @return the test jar file generated
+   */
+  public static File buildJar(String testDir,
+      String className, String code, String folder) throws Exception {
+    String javaCode = code != null ? code : "public class " + className + " {}";
+    Path srcDir = new Path(testDir, "src");
+    File srcDirPath = new File(srcDir.toString());
+    srcDirPath.mkdirs();
+    File sourceCodeFile = new File(srcDir.toString(), className + ".java");
+    BufferedWriter bw = new BufferedWriter(new FileWriter(sourceCodeFile));
+    bw.write(javaCode);
+    bw.close();
+
+    // compile it by JavaCompiler
+    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+    ArrayList<String> srcFileNames = new ArrayList<String>();
+    srcFileNames.add(sourceCodeFile.toString());
+    StandardJavaFileManager fm = compiler.getStandardFileManager(null, null,
+      null);
+    Iterable<? extends JavaFileObject> cu =
+      fm.getJavaFileObjects(sourceCodeFile);
+    List<String> options = new ArrayList<String>();
+    options.add("-classpath");
+    // only add hbase classes to classpath. This is a little bit tricky: assume
+    // the classpath is {hbaseSrc}/target/classes.
+    String currentDir = new File(".").getAbsolutePath();
+    String classpath = currentDir + File.separator + "target"+ File.separator
+      + "classes" + System.getProperty("path.separator")
+      + System.getProperty("java.class.path") + System.getProperty("path.separator")
+      + System.getProperty("surefire.test.class.path");
+
+    options.add(classpath);
+    LOG.debug("Setting classpath to: " + classpath);
+
+    JavaCompiler.CompilationTask task = compiler.getTask(null, fm, null,
+      options, null, cu);
+    assertTrue("Compile file " + sourceCodeFile + " failed.", task.call());
+
+    // build a jar file by the classes files
+    String jarFileName = className + ".jar";
+    File jarFile = new File(folder, jarFileName);
+    jarFile.getParentFile().mkdirs();
+    if (!createJarArchive(jarFile,
+        new File[]{new File(srcDir.toString(), className + ".class")})){
+      assertTrue("Build jar file failed.", false);
+    }
+    return jarFile;
+  }
+
+  /**
+   * Add a list of jar files to another jar file under a specific folder.
+   * It is used to generated coprocessor jar files which can be loaded by
+   * the coprocessor class loader.  It is for testing usage only so we
+   * don't be so careful about stream closing in case any exception.
+   *
+   * @param targetJar the target jar file
+   * @param libPrefix the folder where to put inner jar files
+   * @param srcJars the source inner jar files to be added
+   * @throws Exception if anything doesn't work as expected
+   */
+  public static void addJarFilesToJar(File targetJar,
+      String libPrefix, File... srcJars) throws Exception {
+    FileOutputStream stream = new FileOutputStream(targetJar);
+    JarOutputStream out = new JarOutputStream(stream, new Manifest());
+    byte buffer[] = new byte[BUFFER_SIZE];
+
+    for (File jarFile: srcJars) {
+      // Add archive entry
+      JarEntry jarAdd = new JarEntry(libPrefix + jarFile.getName());
+      jarAdd.setTime(jarFile.lastModified());
+      out.putNextEntry(jarAdd);
+
+      // Write file to archive
+      FileInputStream in = new FileInputStream(jarFile);
+      while (true) {
+        int nRead = in.read(buffer, 0, buffer.length);
+        if (nRead <= 0)
+          break;
+        out.write(buffer, 0, nRead);
+      }
+      in.close();
+    }
+    out.close();
+    stream.close();
+    LOG.info("Adding jar file to outer jar file completed");
+  }
+
+  static String localDirPath(Configuration conf) {
+    return conf.get(CoprocessorClassLoader.LOCAL_DIR_KEY)
+      + File.separator + "jars" + File.separator;
+  }
+
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java?rev=1594662&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/coprocessor/TestClassLoading.java
Wed May 14 18:18:28 2014
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.observers.TestHRegionObserverBypassCoprocessor.TestCoprocessor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+* Test coprocessors class loading.
+*/
+public class TestClassLoading {
+ private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final byte[] DUMMY = Bytes.toBytes("dummy");
+ private static final byte[] TEST = Bytes.toBytes("test");
+
+ private static MiniDFSCluster cluster;
+
+ static final String tableName = "TestClassLoading";
+ static final String cpName1 = "TestCP1";
+ static final String cpName2 = "TestCP2";
+
+ private static Class<?> testCoprocessor = TestCoprocessor.class;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // load TestCoprocessor in the beginning
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        testCoprocessor.getName());
+    TEST_UTIL.startMiniCluster(1);
+    cluster = TEST_UTIL.getDFSCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    TEST_UTIL.createTable(Bytes.toBytes(tableName),
+        new byte[][] { DUMMY, TEST });
+  }
+
+  static File buildCoprocessorJar(String className) throws Exception {
+    String code = "import org.apache.hadoop.hbase.coprocessor.observers.BaseRegionObserver;\n"
+        + "public class " + className + " extends BaseRegionObserver {}";
+    return ClassLoaderTestHelper.buildJar(TEST_UTIL.getDFSCluster()
+        .getDataDirectory().toString(), className, code);
+  }
+
+  public static String buildCorrectPathForCoprocessorJar(String dataDir) {
+    System.out.println("dataDir: " + dataDir);
+    String newStr = dataDir + File.separator+"coprocessors" + File.separator+"test" + File.separator
+ "1";
+    System.out.println(newStr);
+    return newStr;
+  }
+
+  @Test
+  public void testClassLoadingFromHDFS() throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    File jarFile1 = buildCoprocessorJar(cpName1);
+    System.out.println(jarFile1.getName());
+    File jarFile2 = buildCoprocessorJar(cpName2);
+    // have to create directories because we are not placing the files on hdfs
+    // root
+    assertTrue(fs.mkdirs(new Path("/coprocessors/test/1")));
+    // copy the jars into dfs
+    fs.moveFromLocalFile(new Path(jarFile1.getPath()), new Path(fs.getUri()
+        .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
+
+    String jarFileOnHDFS1 = buildCorrectPathForCoprocessorJar(fs.getUri()
+        .toString()) + Path.SEPARATOR + jarFile1.getName();
+    Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
+    System.out.println(jarFileOnHDFS1);
+    assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS1));
+    LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
+
+    fs.moveFromLocalFile(new Path(jarFile2.getPath()), new Path(fs.getUri()
+        .toString(), buildCorrectPathForCoprocessorJar("") + Path.SEPARATOR));
+
+    String jarFileOnHDFS2 = buildCorrectPathForCoprocessorJar(fs.getUri()
+        .toString()) + Path.SEPARATOR + jarFile2.getName();
+    Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
+    assertTrue("Copy jar file to HDFS failed.", fs.exists(pathOnHDFS2));
+    LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // check if only TestCoprocessor is currently loaded
+    List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegions(Bytes.toBytes(tableName));
+    Set<String> expectedCoprocessorSimpleName = new HashSet<>();
+    Set<String> allCoprocessors = RegionCoprocessorHost
+        .getEverLoadedCoprocessors();
+    assertEquals("Number of coprocessors ever loaded", 1,
+        allCoprocessors.size());
+    assertEquals("Expected loaded coprocessor",
+        TestCoprocessor.class.getName(), allCoprocessors.toArray()[0]);
+    // do online config change and confirm the new coprocessor is loaded
+    CoprocessorClassLoader.clearCache();
+    // remove the firstly added coprocessor
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "");
+    conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY,
+        pathOnHDFS1 + "," + cpName1);
+
+    // invoke online configuration change
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    // check everloaded coprocessors
+    allCoprocessors = RegionCoprocessorHost.getEverLoadedCoprocessors();
+    assertEquals("Number of coprocessors ever loaded", 2,
+        allCoprocessors.size());
+    expectedCoprocessorSimpleName.add(cpName1);
+
+    for (HRegion r : regions) {
+      Set<String> currentCoprocessors = r.getCoprocessorHost()
+          .getCoprocessors();
+      assertEquals("Number of current coprocessors", 1,
+          currentCoprocessors.size());
+      assertEquals("Expected loaded coprocessors",
+          expectedCoprocessorSimpleName, currentCoprocessors);
+    }
+    //now load the second coprocessor too
+    String current = conf.get(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY);
+    current +="," + pathOnHDFS2+"," + cpName2;
+    conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_FROM_HDFS_KEY, current);
+    // invoke online config change
+    HRegionServer.configurationManager.notifyAllObservers(conf);
+    allCoprocessors = RegionCoprocessorHost.getEverLoadedCoprocessors();
+    assertEquals("Number of ever loaded coprocessors", 3,
+        allCoprocessors.size());
+    expectedCoprocessorSimpleName.add(cpName2);
+    for (HRegion r : regions) {
+      Set<String> currentCoprocessors = r.getCoprocessorHost().getCoprocessors();
+      assertTrue("Number of currently loaded coprocessors",
+          currentCoprocessors.size() == 2);
+      assertEquals("Expected loaded coprocessors",
+          expectedCoprocessorSimpleName, currentCoprocessors);
+    }
+  }
+
+}



Mime
View raw message