cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject git commit: Fix streaming retry patch by yukim; reviewed by slebresne for CASSANDRA-5775
Date Mon, 22 Jul 2013 13:52:05 GMT
Updated Branches:
  refs/heads/trunk fbe63ab3d -> dfc9faf28


Fix streaming retry
patch by yukim; reviewed by slebresne for CASSANDRA-5775


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfc9faf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfc9faf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfc9faf2

Branch: refs/heads/trunk
Commit: dfc9faf28bccdd0e0da0681698ca5223968369d1
Parents: fbe63ab
Author: Yuki Morishita <yukim@apache.org>
Authored: Mon Jul 22 08:50:58 2013 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Mon Jul 22 08:51:56 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReader.java       | 31 ++++++++---
 .../cassandra/streaming/StreamSession.java      | 19 +++++--
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../compress/CompressedStreamReader.java        | 13 ++---
 .../streaming/messages/ReceivedMessage.java     | 57 ++++++++++++++++++++
 .../streaming/messages/StreamMessage.java       |  7 +--
 7 files changed, 107 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5e2f062..f7beb5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Update deletion timestamp in Commit#updatesWithPaxosTime (CASSANDRA-5787)
  * Thrift cas() method crashes if input columns are not sorted (CASSANDRA-5786)
  * Order columns names correctly when querying for CAS (CASSANDRA-5788)
+ * Fix streaming retry (CASSANDRA-5775)
 
 
 2.0.0-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 5c19eb1..862f5a2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
@@ -76,17 +77,12 @@ public class StreamReader
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-        Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
-        if (localDir == null)
-            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
 
-        SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+        SSTableWriter writer = createWriter(cfs, totalSize);
+        DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+        BytesReadTracker in = new BytesReadTracker(dis);
         try
         {
-            DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
-            BytesReadTracker in = new BytesReadTracker(dis);
-
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -98,6 +94,7 @@ public class StreamReader
         catch (Throwable e)
         {
             writer.abort();
+            drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
             else
@@ -105,6 +102,24 @@ public class StreamReader
         }
     }
 
+    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException
+    {
+        Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
+        if (localDir == null)
+            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+    }
+
+    protected void drain(InputStream dis, long bytesRead) throws IOException
+    {
+        long toSkip = totalSize() - bytesRead;
+        toSkip = toSkip - dis.skip(toSkip);
+        while (toSkip > 0)
+            toSkip = toSkip - dis.skip(toSkip);
+    }
+
     protected long totalSize()
     {
         long size = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index aeb4419..2c4b47d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.Future;
 
@@ -359,7 +358,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDe
                 break;
 
             case FILE:
-                received((FileMessage) message);
+                receive((FileMessage) message);
+                break;
+
+            case RECEIVED:
+                ReceivedMessage received = (ReceivedMessage) message;
+                received(received.cfId, received.sequenceNumber);
                 break;
 
             case RETRY:
@@ -455,7 +459,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDe
     {
         StreamingMetrics.totalOutgoingBytes.inc(header.size());
         metrics.outgoingBytes.inc(header.size());
-        transfers.get(header.cfId).complete(header.sequenceNumber);
     }
 
     /**
@@ -463,10 +466,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDe
      *
      * @param message received file
      */
-    public void received(FileMessage message)
+    public void receive(FileMessage message)
     {
         StreamingMetrics.totalIncomingBytes.inc(message.header.size());
         metrics.incomingBytes.inc(message.header.size());
+        // send back file received message
+        handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
         receivers.get(message.header.cfId).received(message.sstable);
     }
 
@@ -476,6 +481,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDe
         streamResult.handleProgress(progress);
     }
 
+    public void received(UUID cfId, int sequenceNumber)
+    {
+        transfers.get(cfId).complete(sequenceNumber);
+    }
+
     /**
      * Call back on receiving {@code StreamMessage.Type.RETRY} message.
      *
@@ -513,6 +523,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDe
 
     public void doRetry(FileMessageHeader header, Throwable e)
     {
+        logger.warn("retrying for following error", e);
         // retry
         retries++;
         if (retries > DatabaseDescriptor.getMaxStreamingRetries())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 956692d..61ad058 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -49,9 +49,9 @@ public class StreamTransferTask extends StreamTask
     }
 
     /**
-     * Complete sending file.
+     * Received ACK for file at {@code sequenceNumber}.
      *
-     * @param sequenceNumber sequence number of completed file transfer
+     * @param sequenceNumber sequence number of file
      */
     public void complete(int sequenceNumber)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1e8308f..da290c3 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -61,20 +61,16 @@ public class CompressedStreamReader extends StreamReader
     public SSTableReader read(ReadableByteChannel channel) throws IOException
     {
         long totalSize = totalSize();
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel),
compressionInfo);
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-        Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
-        if (localDir == null)
-            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
 
-        SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+        SSTableWriter writer = createWriter(cfs, totalSize);
+
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel),
compressionInfo);
+        BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         try
         {
-            BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
-
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -93,6 +89,7 @@ public class CompressedStreamReader extends StreamReader
         catch (Throwable e)
         {
             writer.abort();
+            drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
new file mode 100644
index 0000000..daf8bf1
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class ReceivedMessage extends StreamMessage
+{
+    public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>()
+    {
+        public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession
session) throws IOException
+        {
+            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version),
input.readInt());
+        }
+
+        public void serialize(ReceivedMessage message, WritableByteChannel out, int version,
StreamSession session) throws IOException
+        {
+            DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+            UUIDSerializer.serializer.serialize(message.cfId, output, MessagingService.current_version);
+            output.writeInt(message.sequenceNumber);
+        }
+    };
+
+    public final UUID cfId;
+    public final int sequenceNumber;
+
+    public ReceivedMessage(UUID cfId, int sequenceNumber)
+    {
+        super(Type.RECEIVED);
+        this.cfId = cfId;
+        this.sequenceNumber = sequenceNumber;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfc9faf2/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index f737675..11e9955 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -65,9 +65,10 @@ public abstract class StreamMessage
     {
         PREPARE(1, 5, PrepareMessage.serializer),
         FILE(2, 0, FileMessage.serializer),
-        RETRY(3, 1, RetryMessage.serializer),
-        COMPLETE(4, 4, CompleteMessage.serializer),
-        SESSION_FAILED(5, 5, SessionFailedMessage.serializer);
+        RECEIVED(3, 1, ReceivedMessage.serializer),
+        RETRY(4, 1, RetryMessage.serializer),
+        COMPLETE(5, 4, CompleteMessage.serializer),
+        SESSION_FAILED(6, 5, SessionFailedMessage.serializer);
 
         public static Type get(byte type)
         {


Mime
View raw message