apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject apex-malhar git commit: APEXMALHAR-2201 Suppressed console output in tests of Stream API.
Date Thu, 01 Sep 2016 20:32:37 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 8f00cefa2 -> f006ac6f5


APEXMALHAR-2201 Suppressed console output in tests of Stream API.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f006ac6f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f006ac6f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f006ac6f

Branch: refs/heads/master
Commit: f006ac6f557340fe620839debec2f076e1e291af
Parents: 8f00cef
Author: Shunxin <lushunxin@hotmail.com>
Authored: Wed Aug 31 11:18:56 2016 -0700
Committer: Shunxin <lushunxin@hotmail.com>
Committed: Thu Sep 1 13:05:52 2016 -0700

----------------------------------------------------------------------
 .../malhar/stream/sample/MinimalWordCount.java  |  2 +-
 .../malhar/stream/sample/WindowedWordCount.java | 53 ++++++++++++--------
 .../stream/sample/complete/AutoComplete.java    | 53 ++++++++++++--------
 .../sample/complete/TopWikipediaSessions.java   |  1 +
 .../stream/sample/complete/TrafficRoutes.java   |  2 +-
 .../sample/cookbook/CombinePerKeyExamples.java  | 30 +++++++----
 .../stream/sample/cookbook/DeDupExample.java    |  5 +-
 .../stream/sample/MinimalWordCountTest.java     |  2 +-
 .../stream/sample/WindowedWordCountTest.java    |  3 +-
 .../sample/complete/AutoCompleteTest.java       |  3 +-
 .../complete/TopWikipediaSessionsTest.java      |  1 +
 .../sample/complete/TrafficRoutesTest.java      |  1 +
 .../cookbook/CombinePerKeyExamplesTest.java     |  6 +--
 .../sample/cookbook/DeDupExampleTest.java       |  2 +-
 .../apex/malhar/stream/api/ApexStream.java      |  6 +++
 .../malhar/stream/api/impl/ApexStreamImpl.java  |  9 ++++
 .../stream/sample/ApplicationWithStreamAPI.java |  6 ++-
 .../sample/ApplicationWithStreamAPITest.java    |  2 +
 .../apex/malhar/stream/sample/MyStreamTest.java |  2 +-
 19 files changed, 124 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
index 21afc5b..03579ab 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -117,7 +117,7 @@ public class MinimalWordCount implements StreamingApplication
           }
         }, name("FormatResults"))
         // Print the result.
-        .print()
+        .print(name("console"))
         // Attach a collector to the stream to collect results.
         .endWith(collector, collector.input, name("Collector"))
         // populate the dag using the stream.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
index c8a0e51..f020ddf 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
@@ -64,17 +64,11 @@ public class WindowedWordCount implements StreamingApplication
    */
   public static class TextInput extends BaseOperator implements InputOperator
   {
-    private static boolean done = false;
-
     public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done = false;
 
     private transient BufferedReader reader;
 
-    public static boolean isDone()
-    {
-      return done;
-    }
-
     @Override
     public void setup(Context.OperatorContext context)
     {
@@ -101,20 +95,21 @@ public class WindowedWordCount implements StreamingApplication
     @Override
     public void emitTuples()
     {
-      try {
-        String line = reader.readLine();
-        if (line == null) {
-          done = true;
-          reader.close();
-          Thread.sleep(1000);
-        } else {
-          this.output.emit(line);
+      if (!done) {
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            done = true;
+            reader.close();
+          } else {
+            this.output.emit(line);
+          }
+          Thread.sleep(50);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        } catch (InterruptedException e) {
+          throw Throwables.propagate(e);
         }
-        Thread.sleep(50);
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      } catch (InterruptedException e) {
-        throw Throwables.propagate(e);
       }
     }
   }
@@ -122,6 +117,19 @@ public class WindowedWordCount implements StreamingApplication
   public static class Collector extends BaseOperator
   {
     private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
+    private static boolean done = false;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+    }
+
+    public static boolean isDone()
+    {
+      return done;
+    }
 
     public static Map<KeyValPair<Long, String>, Long> getResult()
     {
@@ -134,6 +142,9 @@ public class WindowedWordCount implements StreamingApplication
       public void process(PojoEvent tuple)
       {
         result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()),
tuple.getCount());
+        if (tuple.getWord().equals("bye")) {
+          done = true;
+        }
       }
     };
   }
@@ -270,7 +281,7 @@ public class WindowedWordCount implements StreamingApplication
         }, name("count words"))
 
         // Format the output and print out the result.
-        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print();
+        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
 
     wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
index 00c40e7..7ac6621 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
@@ -75,16 +75,11 @@ public class AutoComplete implements StreamingApplication
    */
   public static class TweetsInput extends BaseOperator implements InputOperator
   {
-    private static boolean done = false;
     public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+    private boolean done;
 
     private transient BufferedReader reader;
 
-    public static boolean isDone()
-    {
-      return done;
-    }
-
     @Override
     public void setup(OperatorContext context)
     {
@@ -111,20 +106,21 @@ public class AutoComplete implements StreamingApplication
     @Override
     public void emitTuples()
     {
-      try {
-        String line = reader.readLine();
-        if (line == null) {
-          done = true;
-          reader.close();
-          Thread.sleep(1000);
-        } else {
-          this.output.emit(line);
+      if (!done) {
+        try {
+          String line = reader.readLine();
+          if (line == null) {
+            done = true;
+            reader.close();
+          } else {
+            this.output.emit(line);
+          }
+          Thread.sleep(50);
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        } catch (InterruptedException e) {
+          // Ignore it.
         }
-        Thread.sleep(50);
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      } catch (InterruptedException e) {
-        // Ignore it.
       }
     }
   }
@@ -132,6 +128,19 @@ public class AutoComplete implements StreamingApplication
   public static class Collector extends BaseOperator
   {
     private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
+    private static boolean done = false;
+
+    public static boolean isDone()
+    {
+      return done;
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      done = false;
+    }
 
     public static Map<String, List<CompletionCandidate>> getResult()
     {
@@ -143,6 +152,9 @@ public class AutoComplete implements StreamingApplication
       @Override
       public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>
tuple)
       {
+        if (tuple.getValue().getKey().equals("yarn")) {
+          done = true;
+        }
         result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
       }
     };
@@ -303,7 +315,8 @@ public class AutoComplete implements StreamingApplication
         .flatMap(new ExtractHashtags());
 
     tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-        .addCompositeStreams(ComputeTopCompletions.top(10, true)).endWith(collector, collector.input,
name("collector"))
+        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
+        .endWith(collector, collector.input, name("collector"))
         .populateDag(dag);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
index d7d62fe..a697d52 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
@@ -335,6 +335,7 @@ public class TopWikipediaSessions implements StreamingApplication
     Collector collector = new Collector();
     StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
       .addCompositeStreams(new ComputeTopSessions())
+      .print(name("console"))
       .endWith(collector, collector.input, name("collector")).populateDag(dag);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
index 3045238..08aa8c8 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
@@ -514,7 +514,7 @@ public class TrafficRoutes implements StreamingApplication
         .addCompositeStreams(new TrackSpeed())
 
         // print the result to console.
-        .print()
+        .print(name("console"))
         .endWith(collector, collector.input, name("Collector"))
         .populateDag(dag);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
index 7c16521..653207a 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
@@ -201,11 +201,6 @@ public class CombinePerKeyExamples implements StreamingApplication
     private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
     private static int i;
 
-    public static int getI()
-    {
-      return i;
-    }
-
     @Override
     public void setup(Context.OperatorContext context)
     {
@@ -219,10 +214,10 @@ public class CombinePerKeyExamples implements StreamingApplication
       while (i < 1) {
         for (String word : words) {
           for (String corpus : corpuses) {
-            beanOutput.emit(new SampleBean(word, corpus));
             try {
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
+              Thread.sleep(50);
+              beanOutput.emit(new SampleBean(word, corpus));
+            } catch (Exception e) {
               // Ignore it
             }
           }
@@ -235,12 +230,24 @@ public class CombinePerKeyExamples implements StreamingApplication
 
   public static class Collector extends BaseOperator
   {
-    static List<SampleBean> result;
+    private static List<SampleBean> result;
+    private static boolean done = false;
+
+    public static List<SampleBean> getResult()
+    {
+      return result;
+    }
+
+    public static boolean isDone()
+    {
+      return done;
+    }
 
     @Override
     public void setup(Context.OperatorContext context)
     {
       result = new ArrayList<>();
+      done = false;
     }
 
     public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
@@ -248,6 +255,9 @@ public class CombinePerKeyExamples implements StreamingApplication
       @Override
       public void process(SampleBean tuple)
       {
+        if (tuple.getWord().equals("F")) {
+          done = true;
+        }
         result.add(tuple);
       }
     };
@@ -265,7 +275,7 @@ public class CombinePerKeyExamples implements StreamingApplication
     Collector collector = new Collector();
     StreamFactory.fromInput(input, input.beanOutput, name("input"))
       .addCompositeStreams(new PlaysForWord())
-      .print()
+      .print(name("console"))
       .endWith(collector, collector.input, name("Collector"))
       .populateDag(dag);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
index 0cd7c58..d13e2c3 100644
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
@@ -117,8 +117,9 @@ public class DeDupExample implements StreamingApplication
         new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
 
         // Remove the duplicate words and print out the result.
-        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates")).print().endWith(collector,
collector.input)
-
+        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates"))
+        .print(name("console"))
+        .endWith(collector, collector.input)
         .populateDag(dag);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
index d32da72..c078683 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
@@ -37,7 +37,7 @@ public class MinimalWordCountTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-
+    conf.set("dt.application.MinimalWordCount.operator.console.silent", "true");
     MinimalWordCount app = new MinimalWordCount();
 
     lma.prepareDAG(app, conf);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
index f6270d4..f0c51f6 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
@@ -46,6 +46,7 @@ public class WindowedWordCountTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
+    conf.set("dt.application.WindowedWordCount.operator.console.silent", "true");
     lma.prepareDAG(new WindowedWordCount(), conf);
     LocalMode.Controller lc = lma.getController();
     ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
@@ -53,7 +54,7 @@ public class WindowedWordCountTest
       @Override
       public Boolean call() throws Exception
       {
-        return WindowedWordCount.TextInput.isDone();
+        return WindowedWordCount.Collector.isDone();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
index 97c5ad4..4ed2d5d 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
@@ -39,6 +39,7 @@ public class AutoCompleteTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
+    conf.set("dt.application.AutoComplete.operator.console.silent", "true");
     lma.prepareDAG(new AutoComplete(), conf);
     LocalMode.Controller lc = lma.getController();
 
@@ -47,7 +48,7 @@ public class AutoCompleteTest
       @Override
       public Boolean call() throws Exception
       {
-        return AutoComplete.TweetsInput.isDone();
+        return AutoComplete.Collector.isDone();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
index c0dbaf4..fddf511 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
@@ -39,6 +39,7 @@ public class TopWikipediaSessionsTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
+    conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true");
     lma.prepareDAG(new TopWikipediaSessions(), conf);
     LocalMode.Controller lc = lma.getController();
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
index c532898..766fa60 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
@@ -41,6 +41,7 @@ public class TrafficRoutesTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
+    conf.set("dt.application.TrafficRoutes.operator.console.silent", "true");
     lma.prepareDAG(new TrafficRoutes(), conf);
     LocalMode.Controller lc = lma.getController();
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
index b130808..1e14fff 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
@@ -35,7 +35,7 @@ public class CombinePerKeyExamplesTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-
+    conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true");
     CombinePerKeyExamples app = new CombinePerKeyExamples();
 
     lma.prepareDAG(app, conf);
@@ -46,11 +46,11 @@ public class CombinePerKeyExamplesTest
       @Override
       public Boolean call() throws Exception
       {
-        return CombinePerKeyExamples.SampleInput.getI() >= 1;
+        return CombinePerKeyExamples.Collector.isDone();
       }
     });
     lc.run(100000);
 
-    Assert.assertTrue(CombinePerKeyExamples.Collector.result.get(CombinePerKeyExamples.Collector.result.size()
- 1).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
+    Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size()
- 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
index a175cd7..7f93f50 100644
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
+++ b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
@@ -38,7 +38,7 @@ public class DeDupExampleTest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-
+    conf.set("dt.application.DeDupExample.operator.console.silent", "true");
     DeDupExample app = new DeDupExample();
     lma.prepareDAG(app, conf);
     LocalMode.Controller lc = lma.getController();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
index 6d44534..47f358f 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/ApexStream.java
@@ -106,6 +106,12 @@ public interface ApexStream<T>
    * Add a stdout console output operator
    * @return stream itself
    */
+  <STREAM extends ApexStream<T>> STREAM print(Option... opts);
+
+  /**
+   * Add a stdout console output operator
+   * @return stream itself
+   */
   <STREAM extends ApexStream<T>> STREAM print();
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
index 032cb03..ba399de 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImpl.java
@@ -307,6 +307,15 @@ public class ApexStreamImpl<T> implements ApexStream<T>
 
   @Override
   @SuppressWarnings("unchecked")
+  public ApexStreamImpl<T> print(Option... opts)
+  {
+    ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator();
+    addOperator(consoleOutputOperator, (Operator.InputPort<T>)consoleOutputOperator.input,
null, opts);
+    return this;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
   public ApexStreamImpl<T> print()
   {
     ConsoleOutputOperator consoleOutputOperator = new ConsoleOutputOperator();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
index f65806e..a39ff35 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java
@@ -35,6 +35,8 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.util.KeyValPair;
 
+import static org.apache.apex.malhar.stream.api.Option.Options.name;
+
 /**
  * An application example with stream api
  */
@@ -56,7 +58,7 @@ public class ApplicationWithStreamAPI implements StreamingApplication
             return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
           }
         });
-    stream.print();
+    stream.print(name("WordOutput"));
     stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration
         .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue<String,
String, Long>()
         {
@@ -65,7 +67,7 @@ public class ApplicationWithStreamAPI implements StreamingApplication
           {
             return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
           }
-        }).print();
+        }).print(name("WCOutput"));
     stream.populateDag(dag);
 
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
index 70f26f2..29a2070 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPITest.java
@@ -42,6 +42,8 @@ public class ApplicationWithStreamAPITest
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
+    conf.set("dt.application.WordCountStreamingApiDemo.operator.WCOutput.silent", "true");
+    conf.set("dt.application.WordCountStreamingApiDemo.operator.WordOutput.silent", "true");
     lma.prepareDAG(new ApplicationWithStreamAPI(), conf);
     LocalMode.Controller lc = lma.getController();
     long start = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f006ac6f/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
----------------------------------------------------------------------
diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
index 5e48974..d912117 100644
--- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
+++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java
@@ -162,7 +162,7 @@ public class MyStreamTest
       {
         return new Tuple.PlainTuple(new KeyValPair<>(input, 1L));
       }
-    }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false,
30000, exitCondition);
+    }).addOperator(collector, collector.inputPort, collector.outputPort).runEmbedded(false,
30000, exitCondition);
 
 
     Map<String, Long> dataMap = new HashMap<>();


Mime
View raw message