Subject: svn commit: r1512089 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
Date: Thu, 08 Aug 2013 22:58:41 -0000
From: larsh@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org

Author: larsh
Date: Thu Aug 8 22:58:40 2013
New Revision: 1512089

URL: http://svn.apache.org/r1512089
Log:
HBASE-9158 Serious bug in cyclic replication

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
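Background on the bug, as the patch and the test comments below indicate: in a cyclic setup such as clusters 1 -> 2 -> 3 -> 1, the sink used to group incoming edits by table only and apply them as one batch per table. A batch could therefore mix edits that originated on different clusters, and since the server-side write path effectively records one origin cluster id per batch, a local edit could be mislabeled with another cluster's id and then never be replicated back to that cluster. The fix groups edits by table and by origin-cluster UUID and applies one batch per group. A minimal, self-contained sketch of the grouping idea, in plain Java with hypothetical names throughout (String stands in for the table name and for the mutations; this is not the HBase code itself):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

public class TwoLevelGrouping {

  // Add a value under (key1, key2), creating the inner map and list on
  // demand. Same shape as the addToHashMultiMap helper in the patch.
  static <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map,
      K1 key1, K2 key2, V value) {
    Map<K2, List<V>> inner = map.get(key1);
    if (inner == null) {
      inner = new HashMap<K2, List<V>>();
      map.put(key1, inner);
    }
    List<V> values = inner.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      inner.put(key2, values);
    }
    values.add(value);
    return values;
  }

  public static void main(String[] args) {
    // table name -> origin cluster id -> edits bound for that table
    Map<String, Map<UUID, List<String>>> rowMap =
        new TreeMap<String, Map<UUID, List<String>>>();
    UUID cluster1 = UUID.randomUUID(); // origin of a replicated edit
    UUID cluster2 = UUID.randomUUID(); // origin of a local edit
    addToHashMultiMap(rowMap, "t1", cluster1, "put:row3");
    addToHashMultiMap(rowMap, "t1", cluster2, "put:row4");
    // Each inner list is applied as its own batch, so edits with different
    // origin cluster ids are never mixed in one server-side batch.
    System.out.println(rowMap);
  }
}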
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1512089&r1=1512088&r2=1512089&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Thu Aug 8 22:58:40 2013
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replicat
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -45,14 +44,12 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * This class is responsible for replicating the edits coming
@@ -75,7 +72,6 @@ public class ReplicationSink {
   // Name of the HDFS directory that contains the temporary rep logs
   public static final String REPLICATION_LOG_DIR = ".replogs";
   private final Configuration conf;
-  private final ExecutorService sharedThreadPool;
   private final HConnection sharedHtableCon;
   private final MetricsSink metrics;
   private final AtomicLong totalReplicatedEdits = new AtomicLong();
@@ -93,11 +89,6 @@ public class ReplicationSink {
     decorateConf();
     this.metrics = new MetricsSink();
     this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
-    this.sharedThreadPool = new ThreadPoolExecutor(1,
-        conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
-        conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-repl"));
-    ((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
   }
 
   /**
@@ -125,10 +116,9 @@ public class ReplicationSink {
     // to the same table.
     try {
       long totalReplicated = 0;
-      // Map of table => list of Rows, we only want to flushCommits once per
-      // invocation of this method per table.
-      Map<TableName, List<Row>> rows =
-          new TreeMap<TableName, List<Row>>();
+      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
+      // invocation of this method per table and cluster id.
+      Map<TableName, Map<UUID, List<Row>>> rowMap = new TreeMap<TableName, Map<UUID, List<Row>>>();
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -148,7 +138,7 @@ public class ReplicationSink {
                 new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
                 new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
             m.setClusterId(uuid);
-            addToMultiMap(rows, table, m);
+            addToHashMultiMap(rowMap, table, uuid, m);
           }
           if (CellUtil.isDelete(cell)) {
             ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
@@ -159,8 +149,8 @@ public class ReplicationSink {
         }
         totalReplicated++;
       }
-      for (Entry<TableName, List<Row>> entry : rows.entrySet()) {
-        batch(entry.getKey(), entry.getValue());
+      for (Entry<TableName, Map<UUID, List<Row>>> entry : rowMap.entrySet()) {
+        batch(entry.getKey(), entry.getValue().values());
       }
       int size = entries.size();
       this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
@@ -190,15 +180,21 @@ public class ReplicationSink {
    * Simple helper to a map from key to (a list of) values
    * TODO: Make a general utility method
    * @param map
-   * @param key
+   * @param key1
+   * @param key2
    * @param value
    * @return
    */
-  private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
-    List<V> values = map.get(key);
+  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
+    Map<K2, List<V>> innerMap = map.get(key1);
+    if (innerMap == null) {
+      innerMap = new HashMap<K2, List<V>>();
+      map.put(key1, innerMap);
+    }
+    List<V> values = innerMap.get(key2);
     if (values == null) {
       values = new ArrayList<V>();
-      map.put(key, values);
+      innerMap.put(key2, values);
     }
     values.add(value);
     return values;
@@ -209,15 +205,6 @@ public class ReplicationSink {
    */
  public void stopReplicationSinkServices() {
    try {
-      this.sharedThreadPool.shutdown();
-      if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
-        this.sharedThreadPool.shutdownNow();
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
-      Thread.currentThread().interrupt();
-    }
-    try {
      this.sharedHtableCon.close();
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
@@ -231,14 +218,16 @@ public class ReplicationSink {
    * @param rows list of actions
    * @throws IOException
    */
-  private void batch(TableName tableName, List<Row> rows) throws IOException {
-    if (rows.isEmpty()) {
+  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+    if (allRows.isEmpty()) {
       return;
     }
     HTableInterface table = null;
     try {
-      table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
-      table.batch(rows);
+      table = this.sharedHtableCon.getTable(tableName);
+      for (List<Row> rows : allRows) {
+        table.batch(rows);
+      }
     } catch (InterruptedException ix) {
       throw new IOException(ix);
     } finally {
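Two consequences of the grouping are visible in the ReplicationSink diff above: batch() now takes a Collection<List<Row>> and issues one table.batch() call per homogeneous sublist, and the private ExecutorService is gone because the table obtained from HConnection.getTable(tableName) is backed by a pool the connection manages, so the sink no longer needs its own. A minimal sketch of the per-group batching shape, in plain Java with a hypothetical BatchTarget interface standing in for HTableInterface (an assumption for illustration, not the HBase API):

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class PerGroupBatching {

  // Hypothetical stand-in for the table handle handed out by the shared
  // connection; only the two calls the sketch needs.
  interface BatchTarget {
    void batch(List<String> rows) throws IOException;
    void close() throws IOException;
  }

  // One batch() call per homogeneous sublist, mirroring the shape of the
  // patched ReplicationSink.batch(): use one table for every group and
  // close it when done.
  static void batch(BatchTarget table, Collection<List<String>> allRows)
      throws IOException {
    if (allRows.isEmpty()) {
      return; // nothing to apply for this table
    }
    try {
      for (List<String> rows : allRows) {
        table.batch(rows); // each group shares a single origin cluster id
      }
    } finally {
      table.close();
    }
  }

  public static void main(String[] args) throws IOException {
    BatchTarget stdout = new BatchTarget() {
      public void batch(List<String> rows) { System.out.println("batch: " + rows); }
      public void close() { System.out.println("table closed"); }
    };
    batch(stdout, Arrays.asList(
        Arrays.asList("put:row3"),   // edits that came from cluster 1
        Arrays.asList("put:row4"))); // local edits from cluster 2
  }
}

Issuing one batch per sublist keeps each server-side batch homogeneous in its origin cluster id, which is the invariant the fix relies on.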
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java?rev=1512089&r1=1512088&r2=1512089&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java Thu Aug 8 22:58:40 2013
@@ -73,6 +73,8 @@ public class TestMasterReplication {
   private static final byte[] row = Bytes.toBytes("row");
   private static final byte[] row1 = Bytes.toBytes("row1");
   private static final byte[] row2 = Bytes.toBytes("row2");
+  private static final byte[] row3 = Bytes.toBytes("row3");
+  private static final byte[] row4 = Bytes.toBytes("row4");
   private static final byte[] noRepfamName = Bytes.toBytes("norep");
   private static final byte[] count = Bytes.toBytes("count");
@@ -178,6 +180,21 @@
     assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
     assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
     assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
+
+    // Test HBASE-9158
+    admin2.disablePeer("1");
+    // we now have an edit that was replicated into cluster originating from cluster 1
+    putAndWait(row3, famName, htable1, htable2);
+    // now add a local edit to cluster 2
+    Put put = new Put(row4);
+    put.add(famName, row4, row4);
+    htable2.put(put);
+    // reenable replication from cluster 2 to cluster 3
+    admin2.enablePeer("1");
+    // without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
+    // and hence not replicated to cluster 1
+    wait(row4, htable1);
+
     utility3.shutdownMiniCluster();
     utility2.shutdownMiniCluster();
     utility1.shutdownMiniCluster();
@@ -271,6 +288,10 @@
     put.add(fam, row, row);
     source.put(put);
+    wait(row, target);
+  }
+
+  private void wait(byte[] row, HTable target) throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
@@ -284,7 +305,7 @@
         assertArrayEquals(res.value(), row);
         break;
       }
-    } 
+    }
   }
 
   /**
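The test change pulls the polling loop out of putAndWait() into a reusable wait(row, table) helper so the HBASE-9158 scenario can wait for row4 to appear on cluster 1. Distilled into plain Java, with a hypothetical Probe interface and retry constants (the real test issues a Get against an HTable and defines its own NB_RETRIES and SLEEP_TIME), the retry-until-visible pattern looks like this:

public class RetryWait {

  // Hypothetical retry budget; the test defines its own constants.
  private static final int NB_RETRIES = 10;
  private static final long SLEEP_TIME = 500; // milliseconds

  // Hypothetical probe; in the test this is a Get against the target
  // table that checks whether the replicated row has arrived.
  interface Probe {
    boolean found() throws Exception;
  }

  // Poll until the probe succeeds or the retry budget runs out,
  // mirroring the shape of the test's wait(row, htable) helper.
  static void waitUntil(Probe probe) throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (probe.found()) {
        return;
      }
      if (i == NB_RETRIES - 1) {
        throw new AssertionError("Waited too much time for replication");
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

  public static void main(String[] args) throws Exception {
    final long start = System.currentTimeMillis();
    waitUntil(new Probe() {
      public boolean found() {
        // stand-in condition: the "row" becomes visible after ~1.2s
        return System.currentTimeMillis() - start > 1200;
      }
    });
    System.out.println("row visible");
  }
}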