Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DD0C2107FE for ; Wed, 28 Aug 2013 19:32:36 +0000 (UTC) Received: (qmail 56550 invoked by uid 500); 28 Aug 2013 19:32:35 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 56514 invoked by uid 500); 28 Aug 2013 19:32:35 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 56507 invoked by uid 99); 28 Aug 2013 19:32:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Aug 2013 19:32:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Aug 2013 19:32:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A489523888CD; Wed, 28 Aug 2013 19:32:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1518335 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/ha... 
Date: Wed, 28 Aug 2013 19:32:00 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130828193201.A489523888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: stack Date: Wed Aug 28 19:32:00 2013 New Revision: 1518335 URL: http://svn.apache.org/r1518335 Log: HBASE-7709 Infinite loop possible in Master/Master replication Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Aug 28 19:32:00 2013 @@ -39,6 +39,10 @@ import org.apache.hadoop.hbase.io.HeapSi import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable, @@ -57,8 +61,10 @@ public abstract class Mutation extends O // familyMap ClassSize.TREEMAP); - // Attribute used in Mutations to indicate the originating cluster. - private static final String CLUSTER_ID_ATTR = "_c.id_"; + /** + * The attribute for storing the list of clusters that have consumed the change. + */ + private static final String CONSUMED_CLUSTER_IDS = "_cs.id"; protected byte [] row = null; protected long ts = HConstants.LATEST_TIMESTAMP; @@ -225,26 +231,33 @@ public abstract class Mutation extends O } /** - * Set the replication custer id. 
- * @param clusterId + * Marks that the clusters with the given clusterIds have consumed the mutation + * @param clusterIds of the clusters that have consumed the mutation */ - public void setClusterId(UUID clusterId) { - if (clusterId == null) return; - byte[] val = new byte[2*Bytes.SIZEOF_LONG]; - Bytes.putLong(val, 0, clusterId.getMostSignificantBits()); - Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits()); - setAttribute(CLUSTER_ID_ATTR, val); + public void setClusterIds(List clusterIds) { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + out.writeInt(clusterIds.size()); + for (UUID clusterId : clusterIds) { + out.writeLong(clusterId.getMostSignificantBits()); + out.writeLong(clusterId.getLeastSignificantBits()); + } + setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray()); } /** - * @return The replication cluster id. + * @return the set of clusterIds that have consumed the mutation */ - public UUID getClusterId() { - byte[] attr = getAttribute(CLUSTER_ID_ATTR); - if (attr == null) { - return HConstants.DEFAULT_CLUSTER_ID; + public List getClusterIds() { + List clusterIds = new ArrayList(); + byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS); + if(bytes != null) { + ByteArrayDataInput in = ByteStreams.newDataInput(bytes); + int numClusters = in.readInt(); + for(int i=0; ioptional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - boolean hasClusterId(); + @java.lang.Deprecated boolean hasClusterId(); /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId(); + @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId(); /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder(); + @java.lang.Deprecated org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder(); // repeated .FamilyScope scopes = 6; /** @@ -609,30 +633,67 @@ public final class WALProtos { // optional uint32 following_kv_count = 7; /** * optional uint32 following_kv_count = 7; + */ + boolean hasFollowingKvCount(); + /** + * optional uint32 following_kv_count = 7; + */ + int getFollowingKvCount(); + + // repeated .UUID cluster_ids = 8; + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + java.util.List + getClusterIdsList(); + /** + * repeated .UUID cluster_ids = 8; * *
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index); + /** + * repeated .UUID cluster_ids = 8; + * + *
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * 
*/ - boolean hasFollowingKvCount(); + int getClusterIdsCount(); /** - * optional uint32 following_kv_count = 7; + * repeated .UUID cluster_ids = 8; * *
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + java.util.List + getClusterIdsOrBuilderList(); + /** + * repeated .UUID cluster_ids = 8; + * + *
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * 
*/ - int getFollowingKvCount(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index); } /** * Protobuf type {@code WALKey} @@ -735,6 +796,14 @@ public final class WALProtos { followingKvCount_ = input.readUInt32(); break; } + case 66: { + if (!((mutable_bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000080; + } + clusterIds_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.PARSER, extensionRegistry)); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -746,6 +815,9 @@ public final class WALProtos { if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) { scopes_ = java.util.Collections.unmodifiableList(scopes_); } + if (((mutable_bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_); + } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } @@ -842,25 +914,49 @@ public final class WALProtos { return writeTime_; } - // optional .UUID cluster_id = 5; + // optional .UUID cluster_id = 5 [deprecated = true]; public static final int CLUSTER_ID_FIELD_NUMBER = 5; private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_; /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public boolean hasClusterId() { + @java.lang.Deprecated public boolean hasClusterId() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { return clusterId_; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+     *
+     *This parameter is deprecated in favor of clusters which 
+     *contains the list of clusters that have consumed the change.
+     *It is retained so that the log created by earlier releases (0.94) 
+     *can be read by the newer releases.
+     * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { return clusterId_; } @@ -905,33 +1001,81 @@ public final class WALProtos { private int followingKvCount_; /** * optional uint32 following_kv_count = 7; + */ + public boolean hasFollowingKvCount() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional uint32 following_kv_count = 7; + */ + public int getFollowingKvCount() { + return followingKvCount_; + } + + // repeated .UUID cluster_ids = 8; + public static final int CLUSTER_IDS_FIELD_NUMBER = 8; + private java.util.List clusterIds_; + /** + * repeated .UUID cluster_ids = 8; * *
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public java.util.List getClusterIdsList() { + return clusterIds_; + } + /** + * repeated .UUID cluster_ids = 8; * - *enum CustomEntryType { - *COMPACTION = 0; - *} + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * 
*/ - public boolean hasFollowingKvCount() { - return ((bitField0_ & 0x00000020) == 0x00000020); + public java.util.List + getClusterIdsOrBuilderList() { + return clusterIds_; } /** - * optional uint32 following_kv_count = 7; + * repeated .UUID cluster_ids = 8; * *
      *
-     *optional CustomEntryType custom_entry_type = 8;
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public int getClusterIdsCount() { + return clusterIds_.size(); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
      *
-     *enum CustomEntryType {
-     *COMPACTION = 0;
-     *}
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
      * 
*/ - public int getFollowingKvCount() { - return followingKvCount_; + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) { + return clusterIds_.get(index); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+     *
+     *cluster_ids field contains the list of clusters that have
+     *consumed the change
+     * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index) { + return clusterIds_.get(index); } private void initFields() { @@ -942,6 +1086,7 @@ public final class WALProtos { clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); scopes_ = java.util.Collections.emptyList(); followingKvCount_ = 0; + clusterIds_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -976,6 +1121,12 @@ public final class WALProtos { return false; } } + for (int i = 0; i < getClusterIdsCount(); i++) { + if (!getClusterIds(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -1004,6 +1155,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt32(7, followingKvCount_); } + for (int i = 0; i < clusterIds_.size(); i++) { + output.writeMessage(8, clusterIds_.get(i)); + } getUnknownFields().writeTo(output); } @@ -1041,6 +1195,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(7, followingKvCount_); } + for (int i = 0; i < clusterIds_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(8, clusterIds_.get(i)); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1096,6 +1254,8 @@ public final class WALProtos { result = result && (getFollowingKvCount() == other.getFollowingKvCount()); } + result = result && getClusterIdsList() + .equals(other.getClusterIdsList()); result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1137,6 +1297,10 @@ public final class WALProtos { hash = (37 * hash) + FOLLOWING_KV_COUNT_FIELD_NUMBER; hash = (53 * hash) + getFollowingKvCount(); } + if (getClusterIdsCount() > 0) { + hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER; + hash 
= (53 * hash) + getClusterIdsList().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1244,6 +1408,7 @@ public final class WALProtos { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getClusterIdFieldBuilder(); getScopesFieldBuilder(); + getClusterIdsFieldBuilder(); } } private static Builder create() { @@ -1274,6 +1439,12 @@ public final class WALProtos { } followingKvCount_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + if (clusterIdsBuilder_ == null) { + clusterIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000080); + } else { + clusterIdsBuilder_.clear(); + } return this; } @@ -1339,6 +1510,15 @@ public final class WALProtos { to_bitField0_ |= 0x00000020; } result.followingKvCount_ = followingKvCount_; + if (clusterIdsBuilder_ == null) { + if (((bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = java.util.Collections.unmodifiableList(clusterIds_); + bitField0_ = (bitField0_ & ~0x00000080); + } + result.clusterIds_ = clusterIds_; + } else { + result.clusterIds_ = clusterIdsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1399,6 +1579,32 @@ public final class WALProtos { if (other.hasFollowingKvCount()) { setFollowingKvCount(other.getFollowingKvCount()); } + if (clusterIdsBuilder_ == null) { + if (!other.clusterIds_.isEmpty()) { + if (clusterIds_.isEmpty()) { + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000080); + } else { + ensureClusterIdsIsMutable(); + clusterIds_.addAll(other.clusterIds_); + } + onChanged(); + } + } else { + if (!other.clusterIds_.isEmpty()) { + if (clusterIdsBuilder_.isEmpty()) { + clusterIdsBuilder_.dispose(); + clusterIdsBuilder_ = null; + clusterIds_ = other.clusterIds_; + bitField0_ = (bitField0_ & ~0x00000080); + clusterIdsBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? 
+ getClusterIdsFieldBuilder() : null; + } else { + clusterIdsBuilder_.addAllMessages(other.clusterIds_); + } + } + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1432,6 +1638,12 @@ public final class WALProtos { return false; } } + for (int i = 0; i < getClusterIdsCount(); i++) { + if (!getClusterIds(i).isInitialized()) { + + return false; + } + } return true; } @@ -1592,20 +1804,36 @@ public final class WALProtos { return this; } - // optional .UUID cluster_id = 5; + // optional .UUID cluster_id = 5 [deprecated = true]; private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); private com.google.protobuf.SingleFieldBuilder< org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdBuilder_; /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public boolean hasClusterId() { + @java.lang.Deprecated public boolean hasClusterId() { return ((bitField0_ & 0x00000010) == 0x00000010); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterId() { if (clusterIdBuilder_ == null) { return clusterId_; } else { @@ -1613,9 +1841,17 @@ public final class WALProtos { } } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + @java.lang.Deprecated public Builder setClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { if (clusterIdBuilder_ == null) { if (value == null) { throw new NullPointerException(); @@ -1629,9 +1865,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder setClusterId( + @java.lang.Deprecated public Builder setClusterId( org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { if (clusterIdBuilder_ == null) { clusterId_ = builderForValue.build(); @@ -1643,9 +1887,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + @java.lang.Deprecated public Builder mergeClusterId(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { if (clusterIdBuilder_ == null) { if (((bitField0_ & 0x00000010) == 0x00000010) && clusterId_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()) { @@ -1662,9 +1914,17 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public Builder clearClusterId() { + @java.lang.Deprecated public Builder clearClusterId() { if (clusterIdBuilder_ == null) { clusterId_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance(); onChanged(); @@ -1675,17 +1935,33 @@ public final class WALProtos { return this; } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdBuilder() { bitField0_ |= 0x00000010; onChanged(); return getClusterIdFieldBuilder().getBuilder(); } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ - public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { + @java.lang.Deprecated public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdOrBuilder() { if (clusterIdBuilder_ != null) { return clusterIdBuilder_.getMessageOrBuilder(); } else { @@ -1693,7 +1969,15 @@ public final class WALProtos { } } /** - * optional .UUID cluster_id = 5; + * optional .UUID cluster_id = 5 [deprecated = true]; + * + *
+       *
+       *This parameter is deprecated in favor of clusters which 
+       *contains the list of clusters that have consumed the change.
+       *It is retained so that the log created by earlier releases (0.94) 
+       *can be read by the newer releases.
+       * 
*/ private com.google.protobuf.SingleFieldBuilder< org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> @@ -1953,70 +2237,382 @@ public final class WALProtos { private int followingKvCount_ ; /** * optional uint32 following_kv_count = 7; + */ + public boolean hasFollowingKvCount() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional uint32 following_kv_count = 7; + */ + public int getFollowingKvCount() { + return followingKvCount_; + } + /** + * optional uint32 following_kv_count = 7; + */ + public Builder setFollowingKvCount(int value) { + bitField0_ |= 0x00000040; + followingKvCount_ = value; + onChanged(); + return this; + } + /** + * optional uint32 following_kv_count = 7; + */ + public Builder clearFollowingKvCount() { + bitField0_ = (bitField0_ & ~0x00000040); + followingKvCount_ = 0; + onChanged(); + return this; + } + + // repeated .UUID cluster_ids = 8; + private java.util.List clusterIds_ = + java.util.Collections.emptyList(); + private void ensureClusterIdsIsMutable() { + if (!((bitField0_ & 0x00000080) == 0x00000080)) { + clusterIds_ = new java.util.ArrayList(clusterIds_); + bitField0_ |= 0x00000080; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> clusterIdsBuilder_; + + /** + * repeated .UUID cluster_ids = 8; * *
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List getClusterIdsList() { + if (clusterIdsBuilder_ == null) { + return java.util.Collections.unmodifiableList(clusterIds_); + } else { + return clusterIdsBuilder_.getMessageList(); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * 
*/ - public boolean hasFollowingKvCount() { - return ((bitField0_ & 0x00000040) == 0x00000040); + public int getClusterIdsCount() { + if (clusterIdsBuilder_ == null) { + return clusterIds_.size(); + } else { + return clusterIdsBuilder_.getCount(); + } } /** - * optional uint32 following_kv_count = 7; + * repeated .UUID cluster_ids = 8; * *
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID getClusterIds(int index) { + if (clusterIdsBuilder_ == null) { + return clusterIds_.get(index); + } else { + return clusterIdsBuilder_.getMessage(index); + } + } + /** + * repeated .UUID cluster_ids = 8; * - *enum CustomEntryType { - *COMPACTION = 0; - *} + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * 
*/ - public int getFollowingKvCount() { - return followingKvCount_; + public Builder setClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.set(index, value); + onChanged(); + } else { + clusterIdsBuilder_.setMessage(index, value); + } + return this; } /** - * optional uint32 following_kv_count = 7; + * repeated .UUID cluster_ids = 8; * *
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder setClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.set(index, builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; * - *enum CustomEntryType { - *COMPACTION = 0; - *} + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * 
*/ - public Builder setFollowingKvCount(int value) { - bitField0_ |= 0x00000040; - followingKvCount_ = value; - onChanged(); + public Builder addClusterIds(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(value); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(value); + } return this; } /** - * optional uint32 following_kv_count = 7; + * repeated .UUID cluster_ids = 8; * *
        *
-       *optional CustomEntryType custom_entry_type = 8;
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID value) { + if (clusterIdsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureClusterIdsIsMutable(); + clusterIds_.add(index, value); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
        *
-       *enum CustomEntryType {
-       *COMPACTION = 0;
-       *}
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
        * 
*/ - public Builder clearFollowingKvCount() { - bitField0_ = (bitField0_ & ~0x00000040); - followingKvCount_ = 0; - onChanged(); + public Builder addClusterIds( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.add(builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addClusterIds( + int index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder builderForValue) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.add(index, builderForValue.build()); + onChanged(); + } else { + clusterIdsBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder addAllClusterIds( + java.lang.Iterable values) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + super.addAll(values, clusterIds_); + onChanged(); + } else { + clusterIdsBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder clearClusterIds() { + if (clusterIdsBuilder_ == null) { + clusterIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000080); + onChanged(); + } else { + clusterIdsBuilder_.clear(); + } return this; } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public Builder removeClusterIds(int index) { + if (clusterIdsBuilder_ == null) { + ensureClusterIdsIsMutable(); + clusterIds_.remove(index); + onChanged(); + } else { + clusterIdsBuilder_.remove(index); + } + return this; + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder getClusterIdsBuilder( + int index) { + return getClusterIdsFieldBuilder().getBuilder(index); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder getClusterIdsOrBuilder( + int index) { + if (clusterIdsBuilder_ == null) { + return clusterIds_.get(index); } else { + return clusterIdsBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List + getClusterIdsOrBuilderList() { + if (clusterIdsBuilder_ != null) { + return clusterIdsBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(clusterIds_); + } + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder() { + return getClusterIdsFieldBuilder().addBuilder( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder addClusterIdsBuilder( + int index) { + return getClusterIdsFieldBuilder().addBuilder( + index, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.getDefaultInstance()); + } + /** + * repeated .UUID cluster_ids = 8; + * + *
+       *
+       *cluster_ids field contains the list of clusters that have
+       *consumed the change
+       * 
+ */ + public java.util.List + getClusterIdsBuilderList() { + return getClusterIdsFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder> + getClusterIdsFieldBuilder() { + if (clusterIdsBuilder_ == null) { + clusterIdsBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUIDOrBuilder>( + clusterIds_, + ((bitField0_ & 0x00000080) == 0x00000080), + getParentForChildren(), + isClean()); + clusterIds_ = null; + } + return clusterIdsBuilder_; + } // @@protoc_insertion_point(builder_scope:WALKey) } @@ -4216,22 +4812,22 @@ public final class WALProtos { static { java.lang.String[] descriptorData = { "\n\tWAL.proto\032\013hbase.proto\"$\n\tWALHeader\022\027\n" + - "\017has_compression\030\001 \001(\010\"\277\001\n\006WALKey\022\033\n\023enc" + + "\017has_compression\030\001 \001(\010\"\337\001\n\006WALKey\022\033\n\023enc" + "oded_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002" + "(\014\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite" + - "_time\030\004 \002(\004\022\031\n\ncluster_id\030\005 \001(\0132\005.UUID\022\034" + - "\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022followin" + - "g_kv_count\030\007 \001(\r\"=\n\013FamilyScope\022\016\n\006famil" + - "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" + - "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + - " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam", - "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" + - 
"\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" + - "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" + - "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" + - "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" + - "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" + - "\001\001" + "_time\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002" + + "\030\001\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022foll" + + "owing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(" + + "\0132\005.UUID\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022" + + "\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Comp" + + "actionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023", + "encoded_region_name\030\002 \002(\014\022\023\n\013family_name" + + "\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021comp" + + "action_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 " + + "\002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLIC" + + "ATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE" + + "_GLOBAL\020\001B?\n*org.apache.hadoop.hbase.pro" + + "tobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4249,7 +4845,7 @@ public final class WALProtos { internal_static_WALKey_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALKey_descriptor, - new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", }); + new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", }); 
internal_static_FamilyScope_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_FamilyScope_fieldAccessorTable = new Modified: hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto (original) +++ hbase/trunk/hbase-protocol/src/main/protobuf/WAL.proto Wed Aug 28 19:32:00 2013 @@ -33,13 +33,24 @@ message WALKey { required bytes table_name = 2; required uint64 log_sequence_number = 3; required uint64 write_time = 4; - optional UUID cluster_id = 5; + /* + This parameter is deprecated in favor of clusters which + contains the list of clusters that have consumed the change. + It is retained so that the log created by earlier releases (0.94) + can be read by the newer releases. + */ + optional UUID cluster_id = 5 [deprecated=true]; repeated FamilyScope scopes = 6; optional uint32 following_kv_count = 7; + /* + This field contains the list of clusters that have + consumed the change + */ + repeated UUID cluster_ids = 8; /* - optional CustomEntryType custom_entry_type = 8; - + optional CustomEntryType custom_entry_type = 9; + enum CustomEntryType { COMPACTION = 0; } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Aug 28 19:32:00 2013 @@ -22,6 +22,7 @@ import java.io.IOException; import 
java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -114,7 +115,7 @@ public class Import { static class Importer extends TableMapper { private Map cfRenameMap; - private UUID clusterId; + private List clusterIds; /** * @param row The current table row key. @@ -159,11 +160,11 @@ public class Import { } } if (put != null) { - put.setClusterId(clusterId); + put.setClusterIds(clusterIds); context.write(key, put); } if (delete != null) { - delete.setClusterId(clusterId); + delete.setClusterIds(clusterIds); context.write(key, delete); } } @@ -177,7 +178,7 @@ public class Import { ZooKeeperWatcher zkw = null; try { zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null); - clusterId = ZKClusterId.getUUIDForCluster(zkw); + clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw)); } catch (ZooKeeperConnectionException e) { LOG.error("Problem connecting to ZooKeper during task setup", e); } catch (KeeperException e) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Wed Aug 28 19:32:00 2013 @@ -115,6 +115,7 @@ public class ReplicationProtbufUtil { AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); AdminProtos.ReplicateWALEntryRequest.Builder builder = AdminProtos.ReplicateWALEntryRequest.newBuilder(); + 
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); for (HLog.Entry entry: entries) { entryBuilder.clear(); WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); @@ -124,11 +125,10 @@ public class ReplicationProtbufUtil { keyBuilder.setTableName(ByteString.copyFrom(key.getTablename().getName())); keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); keyBuilder.setWriteTime(key.getWriteTime()); - UUID clusterId = key.getClusterId(); - if (clusterId != null) { - HBaseProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder(); + for(UUID clusterId : key.getClusterIds()) { uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); + keyBuilder.addClusterIds(uuidBuilder.build()); } WALEdit edit = entry.getEdit(); NavigableMap scopes = key.getScopes(); Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java Wed Aug 28 19:32:00 2013 @@ -18,9 +18,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -41,8 +42,8 @@ implements RowProcessor { } @Override - public UUID getClusterId() { - return HConstants.DEFAULT_CLUSTER_ID; + public List getClusterIds() { + return new ArrayList(); } @Override Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 28 19:32:00 2013 @@ -1779,15 +1779,13 @@ public class HRegion implements HeapSize /** * This is used only by unit tests. Not required to be a public API. * @param familyMap map of family to edits for the given family. - * @param clusterId * @param durability * @throws IOException */ - void delete(NavigableMap> familyMap, UUID clusterId, + void delete(NavigableMap> familyMap, Durability durability) throws IOException { Delete delete = new Delete(FOR_UNIT_TESTS_ONLY); delete.setFamilyMap(familyMap); - delete.setClusterId(clusterId); delete.setDurability(durability); doBatchMutate(delete); } @@ -2206,7 +2204,7 @@ public class HRegion implements HeapSize Mutation mutation = batchOp.operations[firstIndex]; if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterId(), now, this.htableDescriptor); + walEdit, mutation.getClusterIds(), now, this.htableDescriptor); } // ------------------------------- @@ -2598,7 +2596,6 @@ public class HRegion implements HeapSize familyMap.put(family, edits); Put p = new Put(row); p.setFamilyMap(familyMap); - p.setClusterId(HConstants.DEFAULT_CLUSTER_ID); doBatchMutate(p); } @@ -4534,7 +4531,7 @@ public class HRegion implements HeapSize if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdit, - processor.getClusterId(), now, this.htableDescriptor); + 
processor.getClusterIds(), now, this.htableDescriptor); } // 8. Release region lock if (locked) { @@ -4761,7 +4758,7 @@ public class HRegion implements HeapSize // cluster. A slave cluster receives the final value (not the delta) // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor); } else { recordMutationWithoutWal(append.getFamilyCellMap()); @@ -4911,7 +4908,7 @@ public class HRegion implements HeapSize // cluster. A slave cluster receives the final value (not the delta) // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(), + walEdits, new ArrayList(), EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Aug 28 19:32:00 2013 @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import com.google.protobuf.ByteString; import com.google.protobuf.Message; @InterfaceAudience.Public @@ -107,9 +106,9 @@ public interface RowProcessor getClusterIds(); /** * Human 
readable name of the processor Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Wed Aug 28 19:32:00 2013 @@ -27,6 +27,7 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -821,12 +822,12 @@ class FSHLog implements HLog, Syncable { * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). * @param tableName - * @param clusterId + * @param clusterIds that have consumed the change * @return New log key. 
*/ protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, - long now, UUID clusterId) { - return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId); + long now, List clusterIds) { + return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds); } @Override @@ -839,7 +840,7 @@ class FSHLog implements HLog, Syncable { @Override public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException { - append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore); + append(info, tableName, edits, new ArrayList(), now, htd, true, isInMemstore); } /** @@ -862,15 +863,16 @@ class FSHLog implements HLog, Syncable { * @param info * @param tableName * @param edits - * @param clusterId The originating clusterId for this edit (for replication) + * @param clusterIds that have consumed the change (for replication) * @param now * @param doSync shall we sync? * @return txid of this transaction * @throws IOException */ @SuppressWarnings("deprecation") - private long append(HRegionInfo info, TableName tableName, WALEdit edits, UUID clusterId, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore) + private long append(HRegionInfo info, TableName tableName, WALEdit edits, + List clusterIds, final long now, HTableDescriptor htd, boolean doSync, + boolean isInMemstore) throws IOException { if (edits.isEmpty()) return this.unflushedEntries.get(); if (this.closed) { @@ -890,7 +892,7 @@ class FSHLog implements HLog, Syncable { // actual name. 
byte [] encodedRegionName = info.getEncodedNameAsBytes(); if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); - HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); + HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); txid = this.unflushedEntries.incrementAndGet(); @@ -914,9 +916,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) + List clusterIds, final long now, HTableDescriptor htd) throws IOException { - return append(info, tableName, edits, clusterId, now, htd, false, true); + return append(info, tableName, edits, clusterIds, now, htd, false, true); } /** Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Aug 28 19:32:00 2013 @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionse import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; @@ -264,7 +266,7 @@ public interface HLog { void closeAndDelete() throws IOException; /** - * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, UUID, long, HTableDescriptor)}, + * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor)}, * except it causes a sync on the 
log */ public void append(HRegionInfo info, TableName tableName, WALEdit edits, @@ -285,23 +287,20 @@ public interface HLog { final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException; /** - * Append a set of edits to the log. Log edits are keyed by (encoded) - * regionName, rowname, and log-sequence-id. The HLog is not flushed after - * this transaction is written to the log. - * + * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and + * log-sequence-id. The HLog is not flushed after this transaction is written to the log. * @param info * @param tableName * @param edits - * @param clusterId - * The originating clusterId for this edit (for replication) + * @param clusterIds The clusters that have consumed the change (for replication) * @param now * @param htd * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - UUID clusterId, final long now, HTableDescriptor htd) throws IOException; - + public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + List clusterIds, final long now, HTableDescriptor htd) throws IOException; + void hsync() throws IOException; void hflush() throws IOException; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Wed Aug 28 19:32:00 2013 @@ -22,7 +22,11 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; import java.io.IOException; +import 
java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -93,6 +97,13 @@ public class HLogKey implements Writable } } + /* + * This is used for reading the log entries created by the previous releases + * (0.94.11) which write the clusters information to the scopes of WALEdit. + */ + private static final String PREFIX_CLUSTER_KEY = "."; + + private static final Version VERSION = Version.COMPRESSED; // The encoded region name. @@ -102,15 +113,23 @@ public class HLogKey implements Writable // Time at which this edit was written. private long writeTime; - private UUID clusterId; - + // The first element in the list is the cluster id on which the change has originated + private List clusterIds; + private NavigableMap scopes; private CompressionContext compressionContext; public HLogKey() { - this(null, null, 0L, HConstants.LATEST_TIMESTAMP, - HConstants.DEFAULT_CLUSTER_ID); + init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + new ArrayList()); + } + + public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + final long now, UUID clusterId) { + List clusterIds = new ArrayList(); + clusterIds.add(clusterId); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds); } /** @@ -123,13 +142,18 @@ public class HLogKey implements Writable * @param tablename - name of table * @param logSeqNum - log sequence number * @param now Time at which this edit was written. 
- * @param clusterId of the cluster (used in Replication) + * @param clusterIds the clusters that have consumed the change(used in Replication) */ public HLogKey(final byte [] encodedRegionName, final TableName tablename, - long logSeqNum, final long now, UUID clusterId) { + long logSeqNum, final long now, List clusterIds){ + init(encodedRegionName, tablename, logSeqNum, now, clusterIds); + } + + protected void init(final byte [] encodedRegionName, final TableName tablename, + long logSeqNum, final long now, List clusterIds) { this.logSeqNum = logSeqNum; this.writeTime = now; - this.clusterId = clusterId; + this.clusterIds = clusterIds; this.encodedRegionName = encodedRegionName; this.tablename = tablename; } @@ -171,14 +195,6 @@ public class HLogKey implements Writable return this.writeTime; } - /** - * Get the id of the original cluster - * @return Cluster id. - */ - public UUID getClusterId() { - return clusterId; - } - public NavigableMap getScopes() { return scopes; } @@ -187,12 +203,47 @@ public class HLogKey implements Writable this.scopes = scopes; } + public void readOlderScopes(NavigableMap scopes) { + if (scopes != null) { + Iterator> iterator = scopes.entrySet() + .iterator(); + while (iterator.hasNext()) { + Map.Entry scope = iterator.next(); + String key = Bytes.toString(scope.getKey()); + if (key.startsWith(PREFIX_CLUSTER_KEY)) { + addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY + .length()))); + iterator.remove(); + } + } + if (scopes.size() > 0) { + this.scopes = scopes; + } + } + } + + /** + * Marks that the cluster with the given clusterId has consumed the change + */ + public void addClusterId(UUID clusterId) { + if (!clusterIds.contains(clusterId)) { + clusterIds.add(clusterId); + } + } + + /** + * @return the set of cluster Ids that have consumed the change + */ + public List getClusterIds() { + return clusterIds; + } + /** - * Set the cluster id of this key. 
- * @param clusterId + * @return the cluster id on which the change has originated. It there is no such cluster, it + * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) */ - public void setClusterId(UUID clusterId) { - this.clusterId = clusterId; + public UUID getOriginatingClusterId(){ + return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0); } @Override @@ -232,7 +283,6 @@ public class HLogKey implements Writable int result = Bytes.hashCode(this.encodedRegionName); result ^= this.logSeqNum; result ^= this.writeTime; - result ^= this.clusterId.hashCode(); return result; } @@ -299,13 +349,16 @@ public class HLogKey implements Writable } out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); - // avoid storing 16 bytes when replication is not enabled - if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) { - out.writeBoolean(false); - } else { + // Don't need to write the clusters information as we are using protobufs from 0.95 + // Writing only the first clusterId for testing the legacy read + Iterator iterator = clusterIds.iterator(); + if(iterator.hasNext()){ out.writeBoolean(true); - out.writeLong(this.clusterId.getMostSignificantBits()); - out.writeLong(this.clusterId.getLeastSignificantBits()); + UUID clusterId = iterator.next(); + out.writeLong(clusterId.getMostSignificantBits()); + out.writeLong(clusterId.getLeastSignificantBits()); + } else { + out.writeBoolean(false); } } @@ -344,10 +397,13 @@ public class HLogKey implements Writable this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); - this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + + this.clusterIds.clear(); if (version.atLeast(Version.INITIAL)) { if (in.readBoolean()) { - this.clusterId = new UUID(in.readLong(), in.readLong()); + // read the older log + // Definitely is the originating cluster + clusterIds.add(new UUID(in.readLong(), in.readLong())); } } else { try { @@ -357,6 +413,7 @@ public class HLogKey implements Writable // Means 
it's a very old key, just continue } } + // Do not need to read the clusters information as we are using protobufs from 0.95 } public WALKey.Builder getBuilder( @@ -373,10 +430,11 @@ public class HLogKey implements Writable } builder.setLogSequenceNumber(this.logSeqNum); builder.setWriteTime(writeTime); - if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) { - builder.setClusterId(HBaseProtos.UUID.newBuilder() - .setLeastSigBits(this.clusterId.getLeastSignificantBits()) - .setMostSigBits(this.clusterId.getMostSignificantBits())); + HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); + for (UUID clusterId : clusterIds) { + uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); + uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); + builder.addClusterIds(uuidBuilder.build()); } if (scopes != null) { for (Map.Entry e : scopes.entrySet()) { @@ -401,10 +459,15 @@ public class HLogKey implements Writable this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); } - this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + clusterIds.clear(); if (walKey.hasClusterId()) { - this.clusterId = new UUID( - walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits()); + //When we are reading the older log (0.95.1 release) + //This is definitely the originating cluster + clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId() + .getLeastSigBits())); + } + for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { + clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); } this.scopes = null; if (walKey.getScopesCount() > 0) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Wed Aug 28 19:32:00 2013 @@ -22,8 +22,6 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; @@ -37,7 +35,6 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; @@ -1484,11 +1481,11 @@ public class HLogSplitter { if (kv.isDelete()) { del = new Delete(kv.getRow()); - del.setClusterId(entry.getKey().getClusterId()); + del.setClusterIds(entry.getKey().getClusterIds()); preRow = del; } else { put = new Put(kv.getRow()); - put.setClusterId(entry.getKey().getClusterId()); + put.setClusterIds(entry.getKey().getClusterIds()); preRow = put; } preKey = loc.getHostnamePort() + KEY_DELIMITER + table; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1518335&r1=1518334&r2=1518335&view=diff 
============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Wed Aug 28 19:32:00 2013 @@ -217,7 +217,7 @@ public class SequenceFileLogReader exten // Scopes are probably in WAL edit, move to key NavigableMap scopes = e.getEdit().getAndRemoveScopes(); if (scopes != null) { - e.getKey().setScopes(scopes); + e.getKey().readOlderScopes(scopes); } return true; } catch (IOException ioe) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Aug 28 19:32:00 2013 @@ -116,13 +116,13 @@ public class ReplicationSink { long totalReplicated = 0; // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per // invocation of this method per table and cluster id. 
- Map<TableName, Map<UUID, List<Row>>> rowMap = new TreeMap<TableName, Map<UUID, List<Row>>>(); + Map<TableName, Map<List<UUID>, List<Row>>> rowMap = + new TreeMap<TableName, Map<List<UUID>, List<Row>>>(); for (WALEntry entry : entries) { TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray()); Cell previousCell = null; Mutation m = null; - java.util.UUID uuid = toUUID(entry.getKey().getClusterId()); int count = entry.getAssociatedCellCount(); for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off @@ -135,8 +135,12 @@ public class ReplicationSink { m = CellUtil.isDelete(cell)? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()): new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - m.setClusterId(uuid); - addToHashMultiMap(rowMap, table, uuid, m); + List<UUID> clusterIds = new ArrayList<UUID>(); + for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){ + clusterIds.add(toUUID(clusterId)); + } + m.setClusterIds(clusterIds); + addToHashMultiMap(rowMap, table, clusterIds, m); } if (CellUtil.isDelete(cell)) { ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell)); @@ -147,7 +151,7 @@ public class ReplicationSink { } totalReplicated++; } - for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) { + for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) { batch(entry.getKey(), entry.getValue().values()); } int size = entries.size(); @@ -181,7 +185,7 @@ public class ReplicationSink { * @param key1 * @param key2 * @param value - * @return + * @return the list of values corresponding to key1 and key2 */ private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) { Map<K2,List<V>> innerMap = map.get(key1); Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== ---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Aug 28 19:32:00 2013 @@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -55,10 +54,8 @@ import org.apache.hadoop.hbase.replicati import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; -import org.apache.zookeeper.KeeperException; /** * Class that handles the source of a replication stream. @@ -395,20 +392,15 @@ public class ReplicationSource extends T seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); - // don't replicate if the log entries originated in the peer - if (!logKey.getClusterId().equals(peerClusterId)) { + // don't replicate if the log entries have already been consumed by the cluster + if (!logKey.getClusterIds().contains(peerClusterId)) { removeNonReplicableEdits(entry); // Don't replicate catalog entries, if the WALEdit wasn't // containing anything to replicate and if we're currently not set to replicate if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) && edit.size() != 0) { - // Only set the clusterId if is a local key. - // This ensures that the originator sets the cluster id - // and all replicas retain the initial cluster id. 
- // This is *only* place where a cluster id other than the default is set. - if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) { - logKey.setClusterId(this.clusterId); - } + //Mark that the current cluster has the change + logKey.addClusterId(clusterId); currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; currentSize += entry.getEdit().size(); @@ -817,4 +809,4 @@ public class ReplicationSource extends T ", currently replicating from: " + this.currentPath + " at position: " + position; } -} +} \ No newline at end of file Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotLogSplitter.java Wed Aug 28 19:32:00 2013 @@ -172,7 +172,7 @@ class SnapshotLogSplitter implements Clo // Append Entry key = new HLogKey(newRegionName, tableName, - key.getLogSeqNum(), key.getWriteTime(), key.getClusterId()); + key.getLogSeqNum(), key.getWriteTime(), key.getClusterIds()); writer.append(new HLog.Entry(key, entry.getEdit())); } } catch (IOException e) { Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original) +++ 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Aug 28 19:32:00 2013 @@ -1231,7 +1231,7 @@ public class TestHRegion extends HBaseTe NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(family, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); } catch (Exception e) { assertTrue("Family " +new String(family)+ " does not exist", false); } @@ -1243,7 +1243,7 @@ public class TestHRegion extends HBaseTe NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(family, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); } catch (Exception e) { ok = true; } @@ -1571,7 +1571,7 @@ public class TestHRegion extends HBaseTe NavigableMap> deleteMap = new TreeMap>(Bytes.BYTES_COMPARATOR); deleteMap.put(fam1, kvs); - region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL); + region.delete(deleteMap, Durability.SYNC_WAL); // extract the key values out the memstore: // This is kinda hacky, but better than nothing... @@ -3853,7 +3853,7 @@ public class TestHRegion extends HBaseTe //verify append called or not verify(log, expectAppend ? 
times(1) : never()) .appendNoSync((HRegionInfo)any(), eq(tableName), - (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any()); + (WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any()); //verify sync called or not if (expectSync || expectSyncFromLogSyncer) { Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1518335&r1=1518334&r2=1518335&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java Wed Aug 28 19:32:00 2013 @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -103,7 +104,7 @@ public final class HLogPerformanceEvalua HRegionInfo hri = region.getRegionInfo(); if (this.noSync) { hlog.appendNoSync(hri, hri.getTableName(), walEdit, - HConstants.DEFAULT_CLUSTER_ID, now, htd); + new ArrayList<UUID>(), now, htd); } else { hlog.append(hri, hri.getTableName(), walEdit, now, htd); }