apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [1/2] incubator-apex-core git commit: apexcore-407 adaptive spin millis for input operators
Date Thu, 31 Mar 2016 22:40:27 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 9e1721110 -> 21dca7ce4


apexcore-407 adaptive spin millis for input operators

changed the order of sleep


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/062c01ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/062c01ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/062c01ae

Branch: refs/heads/master
Commit: 062c01ae6e0bdc00801d7978a1a9774f98398d73
Parents: 48709d9
Author: sandeshh <sandesh.hegde@gmail.com>
Authored: Mon Mar 28 11:19:45 2016 -0700
Committer: sandeshh <sandesh.hegde@gmail.com>
Committed: Mon Mar 28 14:12:26 2016 -0700

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/Context.java             | 3 ++-
 .../src/main/java/com/datatorrent/stram/engine/InputNode.java  | 6 +++++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/062c01ae/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index ee90100..403157d 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -194,7 +194,8 @@ public interface Context
      */
     Attribute<Long> ACTIVATION_WINDOW_ID = new Attribute<Long>(Stateless.WINDOW_ID);
     /**
-     * Poll period in milliseconds when there are no tuples available on any of the input ports of the operator.
+     * It is a maximum poll period in milliseconds when there are no tuples available on any of the input ports of the
+     * operator. Platform uses the heuristic to change poll period from 0 to SPIN_MILLIS milliseconds.
      * Default value is 10 milliseconds.
      */
     Attribute<Integer> SPIN_MILLIS = new Attribute<Integer>(10);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/062c01ae/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index f28841c..f564c02 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -66,7 +66,8 @@ public class InputNode extends Node<InputOperator>
   @SuppressWarnings(value = {"SleepWhileInLoop", "BroadCatchBlock", "TooBroadCatch"})
   public final void run()
   {
-    long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
+    long maxSpinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
+    long spinMillis = 0;
     final boolean handleIdleTime = operator instanceof IdleTimeHandler;
 
     boolean insideApplicationWindow = applicationWindowCount != 0;
@@ -98,7 +99,10 @@ public class InputNode extends Node<InputOperator>
               }
               else {
                 Thread.sleep(spinMillis);
+                spinMillis = Math.min(spinMillis + 1, maxSpinMillis);
               }
+            } else {
+              spinMillis = 0;
             }
           }
           else {


Mime
View raw message