apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-core git commit: APEXCORE-398 - Ack may not be delivered from buffer server to it's client
Date Mon, 21 Mar 2016 21:05:48 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 39d675d61 -> d8416f7e0


APEXCORE-398 - Ack may not be delivered from buffer server to it's client


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d8416f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d8416f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d8416f7e

Branch: refs/heads/release-3.2
Commit: d8416f7e00af56df6000e1be1c143b0e18baaf61
Parents: 39d675d
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Mon Mar 21 13:22:58 2016 -0700
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Mon Mar 21 13:22:58 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/bufferserver/server/Server.java  | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d8416f7e/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c39605c..65fcf51 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -188,7 +188,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver purge ack message. {} send buffers are full.", ctx);
+      throw new RuntimeException("Failed to deliver purge ack message. " + ctx + "send buffers
are full.");
+    }
   }
 
   private void handleResetRequest(ResetRequestTuple request, final AbstractLengthPrependerClient
ctx) throws IOException
@@ -210,7 +215,12 @@ public class Server implements ServerListener
 
     final byte[] tuple = PayloadTuple.getSerializedTuple(0, message.length);
     System.arraycopy(message, 0, tuple, tuple.length - message.length, message.length);
-    ctx.write(tuple);
+    if (ctx.write(tuple)) {
+      ctx.write();
+    } else {
+      logger.error("Failed to deliver reset ack message. {} send buffers are full.", ctx);
+      throw new RuntimeException("Failed to deliver reset ack message. " + ctx + "send buffers
are full.");
+    }
   }
 
   /**
@@ -369,6 +379,7 @@ public class Server implements ServerListener
       key.attach(client);
       key.interestOps(SelectionKey.OP_READ);
       client.registered(key);
+      client.connected();
 
       int len = writeOffset - readOffset - size;
       if (len > 0) {


Mime
View raw message