hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1546424 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java main/java/org/apache/hadoop/hbase/util/Bytes.java test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
Date Thu, 28 Nov 2013 18:13:16 GMT
Author: liyin
Date: Thu Nov 28 18:13:15 2013
New Revision: 1546424

URL: http://svn.apache.org/r1546424
Log:
[HBASE-10009] Wraps byte[] in a newly defined ByteArray, whose equals/hashCode are overridden,
before the byte array is used as a key to the map.

Author: daviddeng

Summary: Using byte[] directly as a map key does not map two array instances with the same
content to the same value. A wrapper class is defined to correct this.

Test Plan: Run test case: TestHTableMultiplexer

Reviewers: adela, ehwang, liyintang, manukranthk

Reviewed By: liyintang

CC: mahesh, hbase-eng@

Differential Revision: https://phabricator.fb.com/D1062655

Task ID: 3142036

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1546424&r1=1546423&r2=1546424&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Thu Nov 28 18:13:15 2013
@@ -19,14 +19,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
-
 import java.io.IOException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -38,10 +30,20 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -86,7 +88,8 @@ public class HTableMultiplexer {
     this.serverToBufferQueueMap = new ConcurrentHashMap<HServerAddress,
       LinkedBlockingQueue<PutStatus>>();
     this.serverToFlushWorkerMap = new ConcurrentHashMap<HServerAddress, HTableFlushWorker>();
-    this.tableNameToHTableMap = new ConcurrentHashMap<byte[], HTable>();
+    this.tableNameToHTableMap = new ConcurrentSkipListMap<byte[], HTable>(
+            Bytes.BYTES_COMPARATOR);
     this.retryNum = conf.getInt("hbase.client.retries.number", 10);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
   }
@@ -150,7 +153,6 @@ public class HTableMultiplexer {
       return false;
     }
 
-    LinkedBlockingQueue<PutStatus> queue;
     HTable htable = getHTable(table);
     try {
       htable.validatePut(put);
@@ -159,7 +161,7 @@ public class HTableMultiplexer {
         // Get the server location for the put
         HServerAddress addr = loc.getServerAddress();
         // Add the put pair into its corresponding queue.
-        queue = getBufferedQueue(addr);
+        LinkedBlockingQueue<PutStatus> queue = getBufferedQueue(addr);
         // Generate a MultiPutStatus obj and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry, options);
         
@@ -175,7 +177,8 @@ public class HTableMultiplexer {
    * @return the current HTableMultiplexerStatus
    */
   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
-    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
+    return new HTableMultiplexerStatus(this.serverToFlushWorkerMap,
+        this.tableNameToHTableMap);
   }
 
   private HTable getHTable(final byte[] table) throws IOException {
@@ -306,8 +309,11 @@ public class HTableMultiplexer {
     private MultiPutBatchMetrics metrics;
     private long overallAvgMultiPutSize;
     private Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap;
+    private Map<byte[], HTable> tableNameToHTableMap;
 
-    public HTableMultiplexerStatus(Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap) {
+    public HTableMultiplexerStatus(
+        Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap,
+        Map<byte[], HTable> tableNameToHTableMap) {
       this.totalBufferedPutCounter = 0;
       this.totalFailedPutCounter = 0;
       this.totalSucceededPutCounter = 0;
@@ -321,6 +327,7 @@ public class HTableMultiplexer {
       this.serverToAverageLatencyMap = new HashMap<String, Long>();
       this.serverToMaxLatencyMap = new HashMap<String, Long>();
       this.serverToFlushWorkerMap = serverToFlushWorkerMap;
+      this.tableNameToHTableMap = tableNameToHTableMap;
       this.metrics = new MultiPutBatchMetrics();
       this.initialize();
     }
@@ -457,6 +464,10 @@ public class HTableMultiplexer {
     public MultiPutBatchMetrics getMetrics() {
       return metrics;
     }
+
+    public int getStoredHTableCount() {
+      return this.tableNameToHTableMap.size();
+    }
   }
   
   private static class PutStatus {
@@ -485,6 +496,7 @@ public class HTableMultiplexer {
      * @deprecated Use {@link #getMaxRetryCount()} instead.
      * @return
      */
+    @SuppressWarnings("unused")
     @Deprecated
     public int getRetryCount() {
       return getMaxRetryCount();
@@ -599,6 +611,7 @@ public class HTableMultiplexer {
      * @deprecated Use {@link #getAndResetMaxLatency()} instead.
      * @return
      */
+    @SuppressWarnings("unused")
     @Deprecated
     public long getMaxLatency() {
       return this.maxLatency.getAndSet(0);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1546424&r1=1546423&r2=1546424&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Thu Nov 28 18:13:15 2013
@@ -19,14 +19,6 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -36,6 +28,14 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
 /**
  * Utility class that handles byte arrays, conversions to/from other types,
  * comparisons, hash code generation, manufacturing keys for HashMaps or
@@ -1477,5 +1477,4 @@ public class Bytes {
   public static boolean isNonEmpty(ByteBuffer b) {
     return b != null && b.remaining() > 0;
   }
-
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java?rev=1546424&r1=1546423&r2=1546424&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java Thu Nov 28 18:13:15 2013
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,11 +33,13 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class TestHTableMultiplexer {
   final Log LOG = LogFactory.getLog(getClass());
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
   private static byte[] FAMILY = Bytes.toBytes("testFamily");
   private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte[] VALUE1 = Bytes.toBytes("testValue1");
@@ -68,8 +71,8 @@ public class TestHTableMultiplexer {
     List<Put> failedPuts = null;
     boolean success = false;
     
-    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
-        PER_REGIONSERVER_QUEUE_SIZE);
+    HTableMultiplexer multiplexer = new HTableMultiplexer(
+        TEST_UTIL.getConfiguration(), PER_REGIONSERVER_QUEUE_SIZE);
     HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
 
     HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
@@ -201,4 +204,34 @@ public class TestHTableMultiplexer {
         Bytes.compareTo(value, r.getValue(FAMILY, qualifier)));
     }
   }
+  /**
+   * This test is to verify that different instances of byte-array with same
+   * content as the table names will result in the same HTable instance.
+   */
+  @Test
+  public void testCachedOfHTable() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong("hbase.htablemultiplexer.flush.frequency.ms", 100);
+    HTableMultiplexer multiplexer =
+        new HTableMultiplexer(TEST_UTIL.getConfiguration(), 100);
+
+    HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
+    Assert.assertEquals("storedHTableCount", 0, status.getStoredHTableCount());
+
+    byte[] TABLE = Bytes.toBytes("testCounters");
+
+    byte[] row = Bytes.toBytes("Row" + 1);
+    byte[] qualifier = Bytes.toBytes("Qualifier" + 1);
+    byte[] value = Bytes.toBytes("Value" + 1);
+    Put put = new Put(row);
+    put.add(FAMILY, qualifier, value);
+    // first put
+    multiplexer.put(TABLE, put, HBaseRPCOptions.DEFAULT);
+    Assert.assertEquals("storedHTableCount", 1, status.getStoredHTableCount());
+    // second put
+    byte[] TABLE1 = Arrays.copyOf(TABLE, TABLE.length);
+    multiplexer.put(TABLE1, put, HBaseRPCOptions.DEFAULT);
+    Assert.assertEquals("storedHTableCount", 1, status.getStoredHTableCount());
+  }
 }
+



Mime
View raw message