apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pra...@apache.org
Subject apex-core git commit: APEXCORE-745 Buffer server may stop processing tuples when backpressure is enabled
Date Fri, 30 Jun 2017 23:27:58 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 91effc979 -> 33812f657


APEXCORE-745 Buffer server may stop processing tuples when backpressure is enabled


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/33812f65
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/33812f65
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/33812f65

Branch: refs/heads/master
Commit: 33812f65712f9965d7e6140d5da638fafb24cc8e
Parents: 91effc9
Author: Vlad Rozov <vrozov@apache.org>
Authored: Thu Jun 8 14:08:48 2017 -0700
Committer: Vlad Rozov <vrozov@apache.org>
Committed: Fri Jun 30 14:37:04 2017 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/DataList.java         | 38 +++++++++++++++-----
 .../bufferserver/internal/LogicalNode.java      |  2 +-
 .../datatorrent/bufferserver/server/Server.java |  1 -
 3 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
index 5813b56..69efc04 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java
@@ -32,6 +32,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.ToStringStyle;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
 import com.datatorrent.bufferserver.packet.BeginWindowTuple;
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.packet.ResetWindowTuple;
@@ -55,6 +58,8 @@ import static com.google.common.collect.Sets.newHashSet;
  */
 public class DataList
 {
+  private static final Logger logger = LoggerFactory.getLogger(DataList.class);
+
   private final int MAX_COUNT_OF_INMEM_BLOCKS;
   protected final String identifier;
   private final int blockSize;
@@ -291,7 +296,12 @@ public class DataList
 
   public void notifyListeners()
   {
-    listenersNotifier.moreDataAvailable();
+    try {
+      listenersNotifier.moreDataAvailable();
+    } catch (RuntimeException e) {
+      logger.warn("{}", listenersNotifier, e);
+    }
+    logger.debug("{} notified", listenersNotifier);
   }
 
   public void setAutoFlushExecutor(final ExecutorService es)
@@ -359,6 +369,7 @@ public class DataList
 
       set.add(dl);
     }
+    listenersNotifier.run();
   }
 
   public void removeDataListener(DataListener dl)
@@ -536,7 +547,7 @@ public class DataList
   @Override
   public String toString()
   {
-    return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}';
+    return new ToStringBuilder(this, ToStringStyle.DEFAULT).append("identifier", identifier).toString();
   }
 
   /**
@@ -1123,7 +1134,7 @@ public class DataList
       final Future<?> future = this.future;
       if (future == null || future.isDone() || future.isCancelled()) {
         // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue
-        this.future = autoFlushExecutor.submit(listenersNotifier);
+        this.future = autoFlushExecutor.submit(this);
       } else {
         synchronized (this) {
           if (this.future == null) {
@@ -1143,7 +1154,7 @@ public class DataList
         try {
           doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false);
         } catch (RuntimeException e) {
-          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+          logger.warn("{} removing {} due to exception", this, dl, e);
           removeDataListener(dl);
           break;
         }
@@ -1159,7 +1170,7 @@ public class DataList
             return true;
           }
         } catch (RuntimeException e) {
-          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
+          logger.warn("{} removing {} due to exception", this, dl, e);
           removeDataListener(dl);
           return checkIfListenersHaveDataToSendOnly();
         }
@@ -1170,6 +1181,7 @@ public class DataList
     @Override
     public void run()
     {
+      logger.debug("{} entered run", this);
       try {
         if (addedData() || checkIfListenersHaveDataToSendOnly()) {
           future = autoFlushExecutor.submit(this);
@@ -1183,11 +1195,19 @@ public class DataList
             }
           }
         }
-      } catch (Exception e) {
-        logger.error("{}", DataList.this, e);
+      } catch (RuntimeException e) {
+        logger.warn("{}", this, e);
+      } finally {
+        logger.debug("{} exiting run", this);
       }
     }
-  }
 
-  private static final Logger logger = LoggerFactory.getLogger(DataList.class);
+    @Override
+    public String toString()
+    {
+      return new ToStringBuilder(this, ToStringStyle.DEFAULT).append(DataList.this)
+          .append("future", future == null ? null : future.getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(future)))
+          .append("isMoreDataAvailable", isMoreDataAvailable).toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index b06e60a..3e8846d 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -151,7 +151,7 @@ public class LogicalNode implements DataListener
   /**
    *
    */
-  public void catchUp()
+  private void catchUp()
   {
     caughtup = false;
     if (isReady()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33812f65/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 857e51e..c5700f2 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -648,7 +648,6 @@ public class Server extends AbstractServer
         {
           final DataList dl = publisherBuffers.get(ln.getUpstream());
           if (dl != null) {
-            ln.catchUp();
             dl.addDataListener(ln);
           } else {
             logger.error("Disconnecting {} with no matching data list.", this);


Mime
View raw message