cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [2/2] git commit: Merge branch 'cassandra-1.2' into trunk
Date Tue, 21 May 2013 21:42:02 GMT
Merge branch 'cassandra-1.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/BatchlogManager.java
	src/java/org/apache/cassandra/db/RowMutation.java
	src/java/org/apache/cassandra/service/StorageProxy.java
	test/unit/org/apache/cassandra/db/HintedHandOffTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ee90305
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ee90305
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ee90305

Branch: refs/heads/trunk
Commit: 2ee90305cd1e62033d2b78269487a73819a20c21
Parents: c609f27 3a51ccf
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed May 22 00:41:50 2013 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed May 22 00:41:50 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cql3/UntypedResultSet.java    |    5 +
 .../org/apache/cassandra/db/BatchlogManager.java   |  172 ++++++++-------
 .../apache/cassandra/db/HintedHandOffManager.java  |   25 ++-
 .../org/apache/cassandra/service/StorageProxy.java |   27 ++-
 .../org/apache/cassandra/db/HintedHandOffTest.java |    2 +-
 6 files changed, 136 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee90305/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee90305/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 9b0c334,c56e106..6f9cb35
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -134,11 -120,14 +120,12 @@@ public class BatchlogManager implement
          ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
          ByteBuffer data = serializeRowMutations(mutations);
  
 -        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
-         cf.addColumn(new Column(DATA, data, timestamp));
-         cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
+         cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+         cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+         cf.addColumn(new Column(columnName("data"), data, timestamp));
 -        RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
 -        rm.add(cf);
  
 -        return rm;
 +        return new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
      }
  
      private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
@@@ -222,47 -198,90 +196,90 @@@
          DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
          int size = in.readInt();
          for (int i = 0; i < size; i++)
-             writeHintsForMutation(RowMutation.serializer.deserialize(in, VERSION));
+             replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
      }
  
-     private static void writeHintsForMutation(RowMutation mutation)
+     /*
+      * We try to deliver the mutations to the replicas ourselves if they are alive and only
resort to writing hints
+      * when a replica is down or a write request times out.
+      */
+     private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
      {
-         String table = mutation.getTable();
+         int ttl = calculateHintTTL(mutation, writtenAt);
+         if (ttl <= 0)
+             return; // the mutation isn't safe to replay.
+ 
+         Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
+         String ks = mutation.getTable();
          Token tk = StorageService.getPartitioner().getToken(mutation.key());
-         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table,
tk);
-         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
table);
-         for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+         for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks,
tk),
+                                                      StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
ks)))
          {
-             if (target.equals(FBUtilities.getBroadcastAddress()))
+             if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                  mutation.apply();
+             else if (FailureDetector.instance.isAlive(endpoint))
+                 liveEndpoints.add(endpoint); // will try delivering directly instead of
writing a hint.
              else
-                 StorageProxy.writeHintForMutation(mutation, target);
+                 StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
          }
+ 
+         if (!liveEndpoints.isEmpty())
+             attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
      }
  
-     private static void deleteBatch(DecoratedKey key)
+     private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress>
endpoints) throws IOException
      {
-         RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
-         rm.delete(SystemTable.BATCHLOG_CF, FBUtilities.timestampMicros());
-         rm.apply();
+         List<WriteResponseHandler> handlers = Lists.newArrayList();
+         final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+         for (final InetAddress ep : endpoints)
+         {
+             Runnable callback = new Runnable()
+             {
+                 public void run()
+                 {
+                     undelivered.remove(ep);
+                 }
+             };
+             WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH,
callback);
+             MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
+             handlers.add(handler);
+         }
+ 
+         // Wait for all the requests to complete.
+         for (WriteResponseHandler handler : handlers)
+         {
+             try
+             {
+                 handler.get();
+             }
+             catch (WriteTimeoutException e)
+             {
+                 logger.debug("Timed out replaying a batched mutation to a node, will write
a hint");
+             }
+         }
+ 
+         if (!undelivered.isEmpty())
+         {
+             int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
+             if (ttl > 0)
+                 for (InetAddress endpoint : undelivered)
+                     StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
+         }
      }
  
-     private static ByteBuffer columnName(String name)
+     // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent
in the batchlog).
+     // this ensures that deletes aren't "undone" by an old batch replay.
+     private int calculateHintTTL(RowMutation mutation, long writtenAt)
      {
-         ByteBuffer raw = UTF8Type.instance.decompose(name);
-         return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(raw).build();
 -        return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis()
- writtenAt)) / 1000);
++        return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis()
- writtenAt)) / 1000);
      }
  
-     private static List<Row> getRangeSlice(IDiskAtomFilter columnFilter)
+     private static ByteBuffer columnName(String name)
      {
-         ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-         IPartitioner partitioner = StorageService.getPartitioner();
-         RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
-         AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition,
minPosition, partitioner);
-         return store.getRangeSlice(range, Integer.MAX_VALUE, columnFilter, null);
+         return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
      }
  
-     /** force flush + compaction to reclaim space from replayed batches */
+     // force flush + compaction to reclaim space from the replayed batches
      private void cleanup() throws ExecutionException, InterruptedException
      {
          ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee90305/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index b774045,53411f5..b16ab8b
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -114,29 -112,6 +114,36 @@@ public class HintedHandOffManager imple
  
      private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
  
 +    /**
 +     * Returns a mutation representing a Hint to be sent to <code>targetId</code>
 +     * as soon as it becomes available again.
 +     */
-     public static RowMutation hintFor(RowMutation mutation, UUID targetId)
++    public static RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId)
 +    {
++        assert ttl > 0;
 +        UUID hintId = UUIDGen.getTimeUUID();
- 
-         // The hint TTL is set at the smallest GCGraceSeconds for any of the CFs in the
RM;
-         // this ensures that deletes aren't "undone" by delivery of an old hint
-         int ttl = Integer.MAX_VALUE;
-         for (ColumnFamily cf : mutation.getColumnFamilies())
-             ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
- 
 +        // serialize the hint with id and version as a composite column name
 +        ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version);
 +        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer,
MessagingService.current_version));
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Table.SYSTEM_KS,
SystemTable.HINTS_CF));
 +        cf.addColumn(name, value, System.currentTimeMillis(), ttl);
- 
 +        return new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
 +    }
 +
++    /*
++     * determine the TTL for the hint RowMutation
++     * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
++     * this ensures that deletes aren't "undone" by delivery of an old hint
++     */
++    public static int calculateHintTTL(RowMutation mutation)
++    {
++        int ttl = Integer.MAX_VALUE;
++        for (ColumnFamily cf : mutation.getColumnFamilies())
++            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
++        return ttl;
++    }
++
++
      public void start()
      {
          MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee90305/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 906a387,e8440c4..3c2b8a5
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -30,8 -31,6 +30,7 @@@ import javax.management.ObjectName
  
  import com.google.common.base.Function;
  import com.google.common.collect.*;
 +import com.google.common.util.concurrent.Uninterruptibles;
- 
  import org.apache.commons.lang.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -777,14 -548,21 +776,21 @@@ public class StorageProxy implements St
  
          HintRunnable runnable = new HintRunnable(target)
          {
 -            public void runMayThrow() throws IOException
 +            public void runMayThrow()
              {
-                 logger.debug("Adding hint for {}", target);
- 
-                 writeHintForMutation(mutation, target);
-                 // Notify the handler only for CL == ANY
-                 if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
-                     responseHandler.response(null);
 -                int ttl = mutation.calculateHintTTL();
++                int ttl = HintedHandOffManager.calculateHintTTL(mutation);
+                 if (ttl > 0)
+                 {
+                     logger.debug("Adding hint for {}", target);
+                     writeHintForMutation(mutation, ttl, target);
+                     // Notify the handler only for CL == ANY
+                     if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+                         responseHandler.response(null);
+                 }
+                 else
+                 {
+                     logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
+                 }
              }
          };
  
@@@ -798,13 -576,17 +804,12 @@@
          return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
      }
  
-     public static void writeHintForMutation(RowMutation mutation, InetAddress target)
 -    public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target)
throws IOException
++    public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target)
      {
+         assert ttl > 0;
          UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
 -        if ((hostId == null) && (MessagingService.instance().getVersion(target)
< MessagingService.VERSION_12))
 -        {
 -            logger.warn("Unable to store hint for host with missing ID, {} (old node?)",
target.toString());
 -            return;
 -        }
          assert hostId != null : "Missing host ID for " + target.getHostAddress();
-         RowMutation hintedMutation = HintedHandOffManager.hintFor(mutation, hostId);
-         hintedMutation.apply();
- 
 -        mutation.toHint(ttl, hostId).apply();
++        HintedHandOffManager.hintFor(mutation, ttl, hostId).apply();
          totalHints.incrementAndGet();
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee90305/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 87e2747,a012109..c19efcb
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@@ -63,9 -64,13 +63,9 @@@ public class HintedHandOffTest extends 
  
          // insert 1 hint
          RowMutation rm = new RowMutation(TABLE4, ByteBufferUtil.bytes(1));
 -        rm.add(new QueryPath(STANDARD1_CF,
 -                             null,
 -                             ByteBufferUtil.bytes(String.valueOf(COLUMN1))),
 -               ByteBufferUtil.EMPTY_BYTE_BUFFER,
 -               System.currentTimeMillis());
 +        rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
  
-         HintedHandOffManager.hintFor(rm, UUID.randomUUID()).apply();
 -        rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply();
++        HintedHandOffManager.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply();
  
          // flush data to disk
          hintStore.forceBlockingFlush();


Mime
View raw message