cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/2] git commit: Merge branch 'cassandra-1.2' into trunk
Date Mon, 07 Jan 2013 10:14:45 GMT
Updated Branches:
  refs/heads/trunk f6df04dbd -> 94d76aa6c


Merge branch 'cassandra-1.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/SliceFromReadCommand.java
	test/unit/org/apache/cassandra/service/RowResolverTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94d76aa6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94d76aa6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94d76aa6

Branch: refs/heads/trunk
Commit: 94d76aa6c6a63d22fead4adb2df725af75274f7f
Parents: f6df04d 3d787b7
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Jan 7 11:14:36 2013 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Jan 7 11:14:36 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |    4 +-
 .../apache/cassandra/db/SliceFromReadCommand.java  |    6 +-
 .../cassandra/service/AsyncRepairCallback.java     |    4 +-
 .../cassandra/service/DatacenterReadCallback.java  |   13 +-
 .../service/RangeSliceResponseResolver.java        |    4 +-
 .../org/apache/cassandra/service/ReadCallback.java |   26 ++-
 .../apache/cassandra/service/RepairCallback.java   |   86 -------
 .../apache/cassandra/service/RowDataResolver.java  |  181 +++++++++++++++
 .../cassandra/service/RowDigestResolver.java       |    2 +-
 .../cassandra/service/RowRepairResolver.java       |  181 ---------------
 .../org/apache/cassandra/service/StorageProxy.java |   16 +-
 .../apache/cassandra/service/RowResolverTest.java  |   14 +-
 13 files changed, 243 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 3a05ce0,8a08a42..523646d
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@@ -31,12 -29,10 +31,12 @@@ import org.apache.cassandra.db.filter.I
  import org.apache.cassandra.db.filter.QueryFilter;
  import org.apache.cassandra.db.filter.QueryPath;
  import org.apache.cassandra.db.filter.SliceQueryFilter;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CompositeType;
  import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.RepairCallback;
+ import org.apache.cassandra.service.RowDataResolver;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.thrift.ColumnParent;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  public class SliceFromReadCommand extends ReadCommand

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/RowDataResolver.java
index 0000000,5545293..e04d7c5
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@@ -1,0 -1,182 +1,181 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.service;
+ 
+ import java.io.IOException;
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import com.google.common.collect.Iterables;
+ 
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.filter.IDiskAtomFilter;
+ import org.apache.cassandra.db.filter.QueryFilter;
 -import org.apache.cassandra.db.filter.QueryPath;
+ import org.apache.cassandra.db.filter.SliceQueryFilter;
+ import org.apache.cassandra.net.IAsyncResult;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.utils.CloseableIterator;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.IFilter;
+ 
+ public class RowDataResolver extends AbstractRowResolver
+ {
+     private int maxLiveCount = 0;
+     public List<IAsyncResult> repairResults = Collections.emptyList();
+     private final IDiskAtomFilter filter;
+ 
+     public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+     {
+         super(key, table);
+         this.filter = qFilter;
+     }
+ 
+     /*
+     * This method handles the following scenario:
+     *
+     * there was a mismatch on the initial read, so we redid the digest requests
+     * as full data reads.  In this case we need to compute the most recent version
+     * of each column, and send diffs to out-of-date replicas.
+     */
+     public Row resolve() throws DigestMismatchException, IOException
+     {
+         if (logger.isDebugEnabled())
+             logger.debug("resolving " + replies.size() + " responses");
+         long startTime = System.currentTimeMillis();
+ 
+         ColumnFamily resolved;
+         if (replies.size() > 1)
+         {
+             List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+             List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+ 
+             for (MessageIn<ReadResponse> message : replies)
+             {
+                 ReadResponse response = message.payload;
+                 ColumnFamily cf = response.row().cf;
+                 assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
+                 versions.add(cf);
+                 endpoints.add(message.from);
+ 
+                 // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
+                 int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+                 if (liveCount > maxLiveCount)
+                     maxLiveCount = liveCount;
+             }
+ 
+             resolved = resolveSuperset(versions);
+             if (logger.isDebugEnabled())
+                 logger.debug("versions merged");
+ 
+             // send updates to any replica that was missing part of the full row
+             // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
+             if (resolved != null)
+                 repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
+         }
+         else
+         {
+             resolved = replies.iterator().next().payload.row().cf;
+         }
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ 
+         return new Row(key, resolved);
+     }
+ 
+     /**
+      * For each row version, compare with resolved (the superset of all row versions);
+      * if it is missing anything, send a mutation to the endpoint it come from.
+      */
+     public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+     {
+         List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size());
+ 
+         for (int i = 0; i < versions.size(); i++)
+         {
+             ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+             if (diffCf == null) // no repair needs to happen
+                 continue;
+ 
+             // create and send the row mutation message based on the diff
+             RowMutation rowMutation = new RowMutation(table, key.key);
+             rowMutation.add(diffCf);
+             MessageOut repairMessage;
+             // use a separate verb here because we don't want these to be get the white glove hint-
+             // on-timeout behavior that a "real" mutation gets
+             repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
+             results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
+         }
+ 
+         return results;
+     }
+ 
+     static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+     {
+         assert Iterables.size(versions) > 0;
+ 
+         ColumnFamily resolved = null;
+         for (ColumnFamily cf : versions)
+         {
+             if (cf == null)
+                 continue;
+ 
+             if (resolved == null)
+                 resolved = cf.cloneMeShallow();
+             else
+                 resolved.delete(cf);
+         }
+         if (resolved == null)
+             return null;
+ 
+         // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
+         // this will handle removing columns and subcolumns that are supressed by a row or
+         // supercolumn tombstone.
 -        QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
 -        List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
++        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter());
++        List<CloseableIterator<Column>> iters = new ArrayList<CloseableIterator<Column>>();
+         for (ColumnFamily version : versions)
+         {
+             if (version == null)
+                 continue;
+             iters.add(FBUtilities.closeableIterator(version.iterator()));
+         }
+         filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
+         return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
+     }
+ 
+     public Row getData() throws IOException
+     {
+         return replies.iterator().next().payload.row();
+     }
+ 
+     public boolean isDataPresent()
+     {
+         return !replies.isEmpty();
+     }
+ 
+     public int getMaxLiveCount()
+     {
+         return maxLiveCount;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94d76aa6/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------


Mime
View raw message