apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [27/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:32 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
deleted file mode 100644
index 327c882..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Beam MinimalWordCount Example
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "MinimalWordCount")
-public class MinimalWordCount implements StreamingApplication
-{
-  public static class Collector extends BaseOperator
-  {
-    static Map<String, Long> result;
-    private static boolean done = false;
-
-    public static boolean isDone()
-    {
-      return done;
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      done = false;
-      result = new HashMap<>();
-    }
-
-    public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>()
-    {
-      @Override
-      public void process(KeyValPair<String, Long> tuple)
-      {
-        if (tuple.getKey().equals("bye")) {
-          done = true;
-        }
-        result.put(tuple.getKey(), tuple.getValue());
-      }
-    };
-  }
-
-  /**
-   * Populate the dag using High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    Collector collector = new Collector();
-    // Create a stream reading from a file line by line using StreamFactory.
-    StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
-        // Use a flatmap transformation to extract words from the incoming stream of lines.
-        .flatMap(new Function.FlatMapFunction<String, String>()
-        {
-          @Override
-          public Iterable<String> f(String input)
-          {
-            return Arrays.asList(input.split("[^a-zA-Z']+"));
-
-          }
-        }, name("ExtractWords"))
-        // Apply windowing to the stream for counting, in this case, the window option is global window.
-        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-        // Count the appearances of every word.
-        .countByKey(new Function.ToKeyValue<String, String, Long>()
-        {
-          @Override
-          public Tuple<KeyValPair<String, Long>> f(String input)
-          {
-            return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L));
-          }
-        }, name("countByKey"))
-        // Format the counting result to a readable format by unwrapping the tuples.
-        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>()
-        {
-          @Override
-          public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-          {
-            return input.getValue();
-          }
-        }, name("FormatResults"))
-        // Print the result.
-        .print(name("console"))
-        // Attach a collector to the stream to collect results.
-        .endWith(collector, collector.input, name("Collector"))
-        // populate the dag using the stream.
-        .populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
deleted file mode 100644
index 5b83bd0..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.joda.time.Duration;
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Throwables;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Beam WindowedWordCount Example.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "WindowedWordCount")
-public class WindowedWordCount implements StreamingApplication
-{
-  static final int WINDOW_SIZE = 1;  // Default window duration in minutes
-
-  /**
-   * A input operator that reads from and output a file line by line to downstream with a time gap between
-   * every two lines.
-   */
-  public static class TextInput extends BaseOperator implements InputOperator
-  {
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-    private boolean done = false;
-
-    private transient BufferedReader reader;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      done = false;
-      initReader();
-    }
-
-    private void initReader()
-    {
-      try {
-        InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt");
-        reader = new BufferedReader(new InputStreamReader(resourceStream));
-      } catch (Exception ex) {
-        throw Throwables.propagate(ex);
-      }
-    }
-
-    @Override
-    public void teardown()
-    {
-      IOUtils.closeQuietly(reader);
-    }
-
-    @Override
-    public void emitTuples()
-    {
-      if (!done) {
-        try {
-          String line = reader.readLine();
-          if (line == null) {
-            done = true;
-            reader.close();
-          } else {
-            this.output.emit(line);
-          }
-          Thread.sleep(50);
-        } catch (IOException ex) {
-          throw new RuntimeException(ex);
-        } catch (InterruptedException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    }
-  }
-
-  public static class Collector extends BaseOperator
-  {
-    private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>();
-    private static boolean done = false;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      done = false;
-    }
-
-    public static boolean isDone()
-    {
-      return done;
-    }
-
-    public static Map<KeyValPair<Long, String>, Long> getResult()
-    {
-      return result;
-    }
-
-    public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>()
-    {
-      @Override
-      public void process(PojoEvent tuple)
-      {
-        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount());
-        if (tuple.getWord().equals("bye")) {
-          done = true;
-        }
-      }
-    };
-  }
-
-  /**
-   * A Pojo Tuple class used for outputting result to JDBC.
-   */
-  public static class PojoEvent
-  {
-    private String word;
-    private long count;
-    private long timestamp;
-
-    @Override
-    public String toString()
-    {
-      return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")";
-    }
-
-    public String getWord()
-    {
-      return word;
-    }
-
-    public void setWord(String word)
-    {
-      this.word = word;
-    }
-
-    public long getCount()
-    {
-      return count;
-    }
-
-    public void setCount(long count)
-    {
-      this.count = count;
-    }
-
-    public long getTimestamp()
-    {
-      return timestamp;
-    }
-
-    public void setTimestamp(long timestamp)
-    {
-      this.timestamp = timestamp;
-    }
-  }
-
-  /**
-   * A map function that wrap the input string with a random generated timestamp.
-   */
-  public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
-  {
-    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
-    private final Long minTimestamp;
-
-    AddTimestampFn()
-    {
-      this.minTimestamp = System.currentTimeMillis();
-    }
-
-    @Override
-    public Tuple.TimestampedTuple<String> f(String input)
-    {
-      // Generate a timestamp that falls somewhere in the past two hours.
-      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
-      long randomTimestamp = minTimestamp + randMillis;
-
-      return new Tuple.TimestampedTuple<>(randomTimestamp, input);
-    }
-  }
-
-  /** A MapFunction that converts a Word and Count into a PojoEvent. */
-  public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent>
-  {
-    @Override
-    public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-    {
-      PojoEvent row = new PojoEvent();
-      row.setTimestamp(input.getTimestamp());
-      row.setCount(input.getValue().getValue());
-      row.setWord(input.getValue().getKey());
-      return row;
-    }
-  }
-
-  /**
-   * Populate dag with High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    TextInput input = new TextInput();
-    Collector collector = new Collector();
-
-    // Create stream from the TextInput operator.
-    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input"))
-
-        // Extract all the words from the input line of text.
-        .flatMap(new Function.FlatMapFunction<String, String>()
-        {
-          @Override
-          public Iterable<String> f(String input)
-          {
-            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
-          }
-        }, name("ExtractWords"))
-
-        // Wrap the word with a randomly generated timestamp.
-        .map(new AddTimestampFn(), name("AddTimestampFn"));
-
-
-    // apply window and trigger option.
-    // TODO: change trigger option to atWaterMark when available.
-    WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream
-        .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)),
-        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1));
-
-
-    WindowedStream<PojoEvent> wordCounts =
-        // Perform a countByKey transformation to count the appearance of each word in every time window.
-        windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
-        {
-          @Override
-          public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
-          {
-            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(),
-              new KeyValPair<String, Long>(input.getValue(), 1L));
-          }
-        }, name("count words"))
-
-        // Format the output and print out the result.
-        .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console"));
-
-    wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
deleted file mode 100644
index 2db59b6..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/AutoComplete.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Throwables;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * An example that computes the most popular hash tags
- * for every prefix, which can be used for auto-completion.
- * This application is identical to TwitterAutoComplete, except it's
- * reading from a file. This application is mainly for local testing
- * purpose.
- *
- * <p>This will update the datastore every 10 seconds based on the last
- * 30 minutes of data received.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "AutoComplete")
-public class AutoComplete implements StreamingApplication
-{
-
-  /**
-   * A dummy Twitter input operator. It reads from a text file containing some tweets and output a line every
-   * half of a second.
-   */
-  public static class TweetsInput extends BaseOperator implements InputOperator
-  {
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-    private boolean done;
-
-    private transient BufferedReader reader;
-
-    @Override
-    public void setup(OperatorContext context)
-    {
-      done = false;
-      initReader();
-    }
-
-    private void initReader()
-    {
-      try {
-        InputStream resourceStream = this.getClass().getResourceAsStream("/sampletweets.txt");
-        reader = new BufferedReader(new InputStreamReader(resourceStream));
-      } catch (Exception ex) {
-        throw Throwables.propagate(ex);
-      }
-    }
-
-    @Override
-    public void teardown()
-    {
-      IOUtils.closeQuietly(reader);
-    }
-
-    @Override
-    public void emitTuples()
-    {
-      if (!done) {
-        try {
-          String line = reader.readLine();
-          if (line == null) {
-            done = true;
-            reader.close();
-          } else {
-            this.output.emit(line);
-          }
-          Thread.sleep(50);
-        } catch (IOException ex) {
-          throw new RuntimeException(ex);
-        } catch (InterruptedException e) {
-          // Ignore it.
-        }
-      }
-    }
-  }
-
-  public static class Collector extends BaseOperator
-  {
-    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();
-    private static boolean done = false;
-
-    public static boolean isDone()
-    {
-      return done;
-    }
-
-    @Override
-    public void setup(OperatorContext context)
-    {
-      super.setup(context);
-      done = false;
-    }
-
-    public static Map<String, List<CompletionCandidate>> getResult()
-    {
-      return result;
-    }
-
-    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
-    {
-      @Override
-      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> tuple)
-      {
-        if (tuple.getValue().getKey().equals("yarn")) {
-          done = true;
-        }
-        result.put(tuple.getValue().getKey(), tuple.getValue().getValue());
-      }
-    };
-  }
-
-  /**
-   * FlapMap Function to extract all hashtags from a text form tweet.
-   */
-  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
-  {
-
-    @Override
-    public Iterable<String> f(String input)
-    {
-      List<String> result = new LinkedList<>();
-      Matcher m = Pattern.compile("#\\S+").matcher(input);
-      while (m.find()) {
-        result.add(m.group().substring(1));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * Lower latency, but more expensive.
-   */
-  private static class ComputeTopFlat
-      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
-  {
-    private final int candidatesPerPrefix;
-    private final int minPrefix;
-
-    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
-    {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.minPrefix = minPrefix;
-    }
-
-    @Override
-    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
-        WindowedStream<CompletionCandidate> input)
-    {
-      return input
-        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix))
-        .accumulateByKey(new TopNByKey(), new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String,
-          CompletionCandidate>()
-        {
-          @Override
-          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
-          {
-            // TODO: Should be removed after Auto-wrapping is supported.
-            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
-          }
-        });
-    }
-  }
-
-  /**
-   * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
-   * KeyValPairs of the prefix and the CompletionCandidate
-   */
-  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
-  {
-    private final int minPrefix;
-    private final int maxPrefix;
-
-    public AllPrefixes()
-    {
-      this(0, Integer.MAX_VALUE);
-    }
-
-    public AllPrefixes(int minPrefix)
-    {
-      this(minPrefix, Integer.MAX_VALUE);
-    }
-
-    public AllPrefixes(int minPrefix, int maxPrefix)
-    {
-      this.minPrefix = minPrefix;
-      this.maxPrefix = maxPrefix;
-    }
-
-    @Override
-    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
-    {
-      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
-      String word = input.getValue();
-      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
-
-        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * A Composite stream transform that takes as input a list of tokens and returns
-   * the most common tokens per prefix.
-   */
-  public static class ComputeTopCompletions
-      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
-  {
-    private final int candidatesPerPrefix;
-    private final boolean recursive;
-
-    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
-    {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.recursive = recursive;
-    }
-
-    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
-    {
-      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
-    {
-      ApexStream<CompletionCandidate> candidates = inputStream
-          .countByKey(new Function.ToKeyValue<String, String, Long>()
-          {
-            @Override
-            public Tuple<KeyValPair<String, Long>> f(String input)
-            {
-              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
-            }
-          }, name("countByKey"))
-          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
-          {
-            @Override
-            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-            {
-              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
-            }
-          }, name("ToCompletionCandidate"));
-
-      return candidates.addCompositeStreams(new ComputeTopFlat(10, 1));
-
-    }
-  }
-
-  /**
-   * Populate the dag with High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    TweetsInput input = new TweetsInput();
-    Collector collector = new Collector();
-
-    WindowOption windowOption = new WindowOption.GlobalWindow();
-
-    ApexStream<String> tags = StreamFactory.fromInput(input, input.output, name("tweetSampler"))
-        .flatMap(new ExtractHashtags());
-
-    tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print(name("console"))
-        .endWith(collector, collector.input, name("collector"))
-        .populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
deleted file mode 100644
index 991424e..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/CompletionCandidate.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-/**
- * Class used to store tag-count pairs in Auto Complete Demo.
- *
- * @since 3.5.0
- */
-public class CompletionCandidate implements Comparable<CompletionCandidate>
-{
-  private long count;
-  private String value;
-
-  public CompletionCandidate(String value, long count)
-  {
-    this.value = value;
-    this.count = count;
-  }
-
-  public long getCount()
-  {
-    return count;
-  }
-
-  public String getValue()
-  {
-    return value;
-  }
-
-  // Empty constructor required for Kryo.
-  public CompletionCandidate()
-  {
-
-  }
-
-  @Override
-  public int compareTo(CompletionCandidate o)
-  {
-    if (this.count < o.count) {
-      return -1;
-    } else if (this.count == o.count) {
-      return this.value.compareTo(o.value);
-    } else {
-      return 1;
-    }
-  }
-
-  @Override
-  public boolean equals(Object other)
-  {
-    if (other instanceof CompletionCandidate) {
-      CompletionCandidate that = (CompletionCandidate)other;
-      return this.count == that.count && this.value.equals(that.value);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Long.valueOf(count).hashCode() ^ value.hashCode();
-  }
-
-  @Override
-  public String toString()
-  {
-    return "CompletionCandidate[" + value + ", " + count + "]";
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
deleted file mode 100644
index ee15d90..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/PojoEvent.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-/**
- * Tuple Class for JdbcOutput of StreamingWordExtract.
- *
- * @since 3.5.0
- */
-public class PojoEvent extends Object
-{
-  private String stringValue;
-
-  @Override
-  public String toString()
-  {
-    return "PojoEvent [stringValue=" + getStringValue() + "]";
-  }
-
-  public void setStringValue(String newString)
-  {
-    this.stringValue = newString;
-  }
-
-  public String getStringValue()
-  {
-    return this.stringValue;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
deleted file mode 100644
index 07f01d0..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtract.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.Option;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
-import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-
-import static java.sql.Types.VARCHAR;
-
-/**
- * Beam StreamingWordExtract Example.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "StreamingWordExtract")
-public class StreamingWordExtract implements StreamingApplication
-{
-  private static int wordCount = 0; // A counter to count number of words have been extracted.
-  private static int entriesMapped = 0; // A counter to count number of entries have been mapped.
-
-  public int getWordCount()
-  {
-    return wordCount;
-  }
-
-  public int getEntriesMapped()
-  {
-    return entriesMapped;
-  }
-
-  /**
-   * A MapFunction that tokenizes lines of text into individual words.
-   */
-  public static class ExtractWords implements Function.FlatMapFunction<String, String>
-  {
-    @Override
-    public Iterable<String> f(String input)
-    {
-      List<String> result = new ArrayList<>(Arrays.asList(input.split("[^a-zA-Z0-9']+")));
-      wordCount += result.size();
-      return result;
-    }
-  }
-
-
-  /**
-   * A MapFunction that uppercases a word.
-   */
-  public static class Uppercase implements Function.MapFunction<String, String>
-  {
-    @Override
-    public String f(String input)
-    {
-      return input.toUpperCase();
-    }
-  }
-
-
-  /**
-   * A filter function to filter out empty strings.
-   */
-  public static class EmptyStringFilter implements Function.FilterFunction<String>
-  {
-    @Override
-    public boolean f(String input)
-    {
-      return !input.isEmpty();
-    }
-  }
-
-
-  /**
-   * A map function to map the result string to a pojo entry.
-   */
-  public static class PojoMapper implements Function.MapFunction<String, Object>
-  {
-
-    @Override
-    public Object f(String input)
-    {
-      PojoEvent pojo = new PojoEvent();
-      pojo.setStringValue(input);
-      entriesMapped++;
-      return pojo;
-    }
-  }
-
-  /**
-   * Add field infos to the {@link JdbcPOJOInsertOutputOperator}.
-   */
-  private static List<JdbcFieldInfo> addFieldInfos()
-  {
-    List<JdbcFieldInfo> fieldInfos = new ArrayList<>();
-    fieldInfos.add(new JdbcFieldInfo("STRINGVALUE", "stringValue", JdbcFieldInfo.SupportType.STRING, VARCHAR));
-    return fieldInfos;
-  }
-
-  /**
-   * Populate dag with High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
-    jdbcOutput.setFieldInfos(addFieldInfos());
-    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
-    jdbcOutput.setStore(outputStore);
-    jdbcOutput.setTablename("TestTable");
-
-    // Create a stream reading from a folder.
-    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/data");
-
-    // Extract all the words from the input line of text.
-    stream.flatMap(new ExtractWords())
-
-        // Filter out the empty strings.
-        .filter(new EmptyStringFilter())
-
-        // Change every word to uppercase.
-        .map(new Uppercase())
-
-        // Map the resulted word to a Pojo entry.
-        .map(new PojoMapper())
-
-        // Output the entries to JdbcOutput and insert them into a table.
-        .endWith(jdbcOutput, jdbcOutput.input, Option.Options.name("jdbcOutput"));
-
-    stream.populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
deleted file mode 100644
index c7ccae3..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopNByKey.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.Accumulation;
-
-/**
- * Specialized TopNByKey accumulation for AutoComplete Demo.
- *
- * @since 3.5.0
- */
-public class TopNByKey implements
-    Accumulation<CompletionCandidate, Map<String, Long>, List<CompletionCandidate>>
-{
-  int n = 10;
-
-  Comparator comparator;
-
-  public void setN(int n)
-  {
-    this.n = n;
-  }
-
-  public void setComparator(Comparator comparator)
-  {
-    this.comparator = comparator;
-  }
-
-  @Override
-  public Map<String, Long> defaultAccumulatedValue()
-  {
-    return new HashMap<>();
-  }
-
-  /**
-   * Accumulate the input. Update the entry in the Accumulation Map if the key of the input is existed, create a
-   * new entry otherwise.
-   * @param accumulatedValue
-   * @param input
-   * @return
-   */
-  @Override
-  public Map<String, Long> accumulate(Map<String, Long> accumulatedValue, CompletionCandidate input)
-  {
-    accumulatedValue.put(input.getValue(), input.getCount());
-    return accumulatedValue;
-  }
-
-  /**
-   * Merge two Maps together. For every key, keep the larger value in the resulted Map.
-   * @param accumulatedValue1
-   * @param accumulatedValue2
-   * @return
-   */
-  @Override
-  public Map<String, Long> merge(Map<String, Long> accumulatedValue1, Map<String, Long> accumulatedValue2)
-  {
-    for (Map.Entry<String, Long> entry : accumulatedValue2.entrySet()) {
-      if (accumulatedValue1.containsKey(entry.getKey()) && accumulatedValue1.get(entry.getKey()) > entry.getValue()) {
-        continue;
-      }
-      accumulatedValue1.put(entry.getKey(), entry.getValue());
-    }
-    return accumulatedValue1;
-  }
-
-  /**
-   * Loop through the Accumulation Map to get the top n entries based on their values, return a list containing
-   * those entries.
-   * @param accumulatedValue
-   * @return
-   */
-  @Override
-  public List<CompletionCandidate> getOutput(Map<String, Long> accumulatedValue)
-  {
-    LinkedList<CompletionCandidate> result = new LinkedList<>();
-    for (Map.Entry<String, Long> entry : accumulatedValue.entrySet()) {
-      int k = 0;
-      for (CompletionCandidate inMemory : result) {
-        if (entry.getValue() > inMemory.getCount()) {
-          break;
-        }
-        k++;
-      }
-      result.add(k, new CompletionCandidate(entry.getKey(), entry.getValue()));
-      if (result.size() > n) {
-        result.remove(result.get(result.size() - 1));
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public List<CompletionCandidate> getRetraction(List<CompletionCandidate> value)
-  {
-    return new LinkedList<>();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
deleted file mode 100644
index 68ec733..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-import javax.annotation.Nullable;
-
-import org.joda.time.Duration;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.accumulation.TopN;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Beam's TopWikipediaSessions Example.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "TopWikipediaSessions")
-public class TopWikipediaSessions implements StreamingApplication
-{
-  /**
-   * A generator that outputs a stream of combinations of some users and some randomly generated edit time.
-   */
-  public static class SessionGen extends BaseOperator implements InputOperator
-  {
-    private String[] names = new String[]{"user1", "user2", "user3", "user4"};
-    public transient DefaultOutputPort<KeyValPair<String, Long>> output = new DefaultOutputPort<>();
-
-    private static final Duration RAND_RANGE = Duration.standardDays(365);
-    private Long minTimestamp;
-    private long sleepTime;
-    private static int tupleCount = 0;
-
-    public static int getTupleCount()
-    {
-      return tupleCount;
-    }
-
-    private String randomName(String[] names)
-    {
-      int index = new Random().nextInt(names.length);
-      return names[index];
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      tupleCount = 0;
-      minTimestamp = System.currentTimeMillis();
-      sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
-    }
-
-    @Override
-    public void emitTuples()
-    {
-      long randMillis = (long)(Math.random() * RAND_RANGE.getMillis());
-      long randomTimestamp = minTimestamp + randMillis;
-      output.emit(new KeyValPair<String, Long>(randomName(names), randomTimestamp));
-      tupleCount++;
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        // Ignore it.
-      }
-    }
-  }
-
-  public static class Collector extends BaseOperator
-  {
-    private final int resultSize = 5;
-    private static List<List<TempWrapper>> result = new ArrayList<>();
-
-    public static List<List<TempWrapper>> getResult()
-    {
-      return result;
-    }
-
-    public final transient DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<TempWrapper>>>()
-    {
-      @Override
-      public void process(Tuple.WindowedTuple<List<TempWrapper>> tuple)
-      {
-        if (result.size() == resultSize) {
-          result.remove(0);
-        }
-        result.add(tuple.getValue());
-      }
-    };
-  }
-
-
-  /**
-   * Convert the upstream (user, time) combination to a timestamped tuple of user.
-   */
-  static class ExtractUserAndTimestamp implements Function.MapFunction<KeyValPair<String, Long>, Tuple.TimestampedTuple<String>>
-  {
-    @Override
-    public Tuple.TimestampedTuple<String> f(KeyValPair<String, Long> input)
-    {
-      long timestamp = input.getValue();
-      String userName = input.getKey();
-
-      // Sets the implicit timestamp field to be used in windowing.
-      return new Tuple.TimestampedTuple<>(timestamp, userName);
-
-    }
-  }
-
-  /**
-   * Computes the number of edits in each user session.  A session is defined as
-   * a string of edits where each is separated from the next by less than an hour.
-   */
-  static class ComputeSessions
-      extends CompositeStreamTransform<ApexStream<Tuple.TimestampedTuple<String>>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>>
-  {
-    @Override
-    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> compose(ApexStream<Tuple.TimestampedTuple<String>> inputStream)
-    {
-      return inputStream
-
-        // Chuck the stream into session windows.
-        .window(new WindowOption.SessionWindows(Duration.standardHours(1)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
-        // Count the number of edits for a user within one session.
-        .countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>()
-        {
-          @Override
-          public Tuple.TimestampedTuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input)
-          {
-            return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), new KeyValPair<String, Long>(input.getValue(), 1L));
-          }
-        }, name("ComputeSessions"));
-    }
-  }
-
-  /**
-   * A comparator class used for comparing two TempWrapper objects.
-   */
-  public static class Comp implements Comparator<TempWrapper>
-  {
-    @Override
-    public int compare(TempWrapper o1, TempWrapper o2)
-    {
-      return Long.compare(o1.getValue().getValue(), o2.getValue().getValue());
-    }
-  }
-
-  /**
-   * A function to extract timestamp from a TempWrapper object.
-   */
-  // TODO: Need to revisit and change back to using TimestampedTuple.
-  public static class TimestampExtractor implements com.google.common.base.Function<TempWrapper, Long>
-  {
-    @Override
-    public Long apply(@Nullable TempWrapper input)
-    {
-      return input.getTimestamp();
-    }
-  }
-
-  /**
-   * A temporary wrapper to wrap a KeyValPair and a timestamp together to represent a timestamped tuple, the reason
-   * for this is that we cannot resolve a type conflict when calling accumulate(). After the issue resolved, we can
-   * remove this class.
-   */
-  public static class TempWrapper
-  {
-    private KeyValPair<String, Long> value;
-    private Long timestamp;
-
-    public TempWrapper()
-    {
-
-    }
-
-    public TempWrapper(KeyValPair<String, Long> value, Long timestamp)
-    {
-      this.value = value;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String toString()
-    {
-      return this.value + "  -  " + this.timestamp;
-    }
-
-    public Long getTimestamp()
-    {
-      return timestamp;
-    }
-
-    public void setTimestamp(Long timestamp)
-    {
-      this.timestamp = timestamp;
-    }
-
-    public KeyValPair<String, Long> getValue()
-    {
-      return value;
-    }
-
-    public void setValue(KeyValPair<String, Long> value)
-    {
-      this.value = value;
-    }
-  }
-
-  /**
-   * Computes the longest session ending in each month, in this case we use 30 days to represent every month.
-   */
-  private static class TopPerMonth
-      extends CompositeStreamTransform<ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
-  {
-
-    @Override
-    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<Tuple.WindowedTuple<KeyValPair<String, Long>>> inputStream)
-    {
-      TopN<TempWrapper> topN = new TopN<>();
-      topN.setN(10);
-      topN.setComparator(new Comp());
-
-      return inputStream
-
-        // Map the input WindowedTuple to a TempWrapper object.
-        .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, TempWrapper>()
-        {
-          @Override
-          public TempWrapper f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-          {
-            Window window = input.getWindows().iterator().next();
-            return new TempWrapper(input.getValue(), window.getBeginTimestamp());
-          }
-        }, name("TempWrapper"))
-
-        // Apply window and trigger option again, this time chuck the stream into fixed time windows.
-        .window(new WindowOption.TimeWindows(Duration.standardDays(30)), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(5)))
-
-        // Compute the top 10 user-sessions with most number of edits.
-        .accumulate(topN, name("TopN")).with("timestampExtractor", new TimestampExtractor());
-    }
-  }
-
-  /**
-   * A map function that combine the user and his/her edit session together to a string and use that string as a key
-   * with number of edits in that session as value to create a new key value pair to send to downstream.
-   */
-  static class SessionsToStringsDoFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, Tuple.WindowedTuple<KeyValPair<String, Long>>>
-  {
-    @Override
-    public Tuple.WindowedTuple<KeyValPair<String, Long>> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-    {
-      Window window = input.getWindows().iterator().next();
-      return new Tuple.WindowedTuple<KeyValPair<String, Long>>(window, new KeyValPair<String, Long>(
-        input.getValue().getKey()  + " : " + window.getBeginTimestamp() + " : " + window.getDurationMillis(),
-        input.getValue().getValue()));
-    }
-  }
-
-  /**
-   * A flatmap function that turns the result into readable format.
-   */
-  static class FormatOutputDoFn implements Function.FlatMapFunction<Tuple.WindowedTuple<List<TempWrapper>>, String>
-  {
-    @Override
-    public Iterable<String> f(Tuple.WindowedTuple<List<TempWrapper>> input)
-    {
-      ArrayList<String> result = new ArrayList<>();
-      for (TempWrapper item : input.getValue()) {
-        String session = item.getValue().getKey();
-        long count = item.getValue().getValue();
-        Window window = input.getWindows().iterator().next();
-        result.add(session + " + " + count + " : " + window.getBeginTimestamp());
-      }
-      return result;
-    }
-  }
-
-  /**
-   * A composite transform that compute the top wikipedia sessions.
-   */
-  public static class ComputeTopSessions extends CompositeStreamTransform<ApexStream<KeyValPair<String, Long>>, WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>>>
-  {
-    @Override
-    public WindowedStream<Tuple.WindowedTuple<List<TempWrapper>>> compose(ApexStream<KeyValPair<String, Long>> inputStream)
-    {
-      return inputStream
-        .map(new ExtractUserAndTimestamp(), name("ExtractUserAndTimestamp"))
-        .addCompositeStreams(new ComputeSessions())
-        .map(new SessionsToStringsDoFn(), name("SessionsToStringsDoFn"))
-        .addCompositeStreams(new TopPerMonth());
-    }
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    SessionGen sg = new SessionGen();
-    Collector collector = new Collector();
-    StreamFactory.fromInput(sg, sg.output, name("sessionGen"))
-      .addCompositeStreams(new ComputeTopSessions())
-      .print(name("console"))
-      .endWith(collector, collector.input, name("collector")).populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
deleted file mode 100644
index e6a53d6..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-import org.joda.time.Duration;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.accumulation.Group;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Beam's TrafficRoutes example.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "TrafficRoutes")
-public class TrafficRoutes implements StreamingApplication
-{
-  static Map<String, String> sdStations = buildStationInfo();
-  static final int WINDOW_DURATION = 3;  // Default sliding window duration in minutes
-  static final int WINDOW_SLIDE_EVERY = 1;  // Default window 'slide every' setting in minutes
-
-  /**
-   * This class holds information about a station reading's average speed.
-   */
-  public static class StationSpeed implements Comparable<StationSpeed>
-  {
-    @Nullable
-    String stationId;
-    @Nullable
-    Double avgSpeed;
-    @Nullable
-    Long timestamp;
-
-    public StationSpeed() {}
-
-    public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
-    {
-      this.stationId = stationId;
-      this.avgSpeed = avgSpeed;
-      this.timestamp = timestamp;
-    }
-
-    public void setAvgSpeed(@Nullable Double avgSpeed)
-    {
-      this.avgSpeed = avgSpeed;
-    }
-
-    public void setStationId(@Nullable String stationId)
-    {
-      this.stationId = stationId;
-    }
-
-    public void setTimestamp(@Nullable Long timestamp)
-    {
-      this.timestamp = timestamp;
-    }
-
-    @Nullable
-    public Long getTimestamp()
-    {
-      return timestamp;
-    }
-
-    public String getStationId()
-    {
-      return this.stationId;
-    }
-
-    public Double getAvgSpeed()
-    {
-      return this.avgSpeed;
-    }
-
-    @Override
-    public int compareTo(StationSpeed other)
-    {
-      return Long.compare(this.timestamp, other.timestamp);
-    }
-  }
-
-  /**
-   * This class holds information about a route's speed/slowdown.
-   */
-  static class RouteInfo
-  {
-    @Nullable
-    String route;
-    @Nullable
-    Double avgSpeed;
-    @Nullable
-    Boolean slowdownEvent;
-
-    public RouteInfo()
-    {
-
-    }
-
-    public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
-    {
-      this.route = route;
-      this.avgSpeed = avgSpeed;
-      this.slowdownEvent = slowdownEvent;
-    }
-
-    public String getRoute()
-    {
-      return this.route;
-    }
-
-    public Double getAvgSpeed()
-    {
-      return this.avgSpeed;
-    }
-
-    public Boolean getSlowdownEvent()
-    {
-      return this.slowdownEvent;
-    }
-  }
-
-  /**
-   * Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
-   * with the extracted timestamp.
-   */
-  static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
-  {
-
-    @Override
-    public Tuple.TimestampedTuple<String> f(String input)
-    {
-      String[] items = input.split(",");
-      String timestamp = tryParseTimestamp(items);
-
-      return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
-    }
-  }
-
-  /**
-   * Filter out readings for the stations along predefined 'routes', and output
-   * (station, speed info) keyed on route.
-   */
-  static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
-  {
-
-    @Override
-    public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
-    {
-
-      ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
-      String[] items = input.getValue().split(",");
-      String stationType = tryParseStationType(items);
-      // For this analysis, use only 'main line' station types
-      if (stationType != null && stationType.equals("ML")) {
-        Double avgSpeed = tryParseAvgSpeed(items);
-        String stationId = tryParseStationId(items);
-        // For this simple example, filter out everything but some hardwired routes.
-        if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
-          StationSpeed stationSpeed =
-              new StationSpeed(stationId, avgSpeed, input.getTimestamp());
-          // The tuple key is the 'route' name stored in the 'sdStations' hash.
-          KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed);
-          result.add(outputValue);
-        }
-      }
-      return result;
-    }
-  }
-
-  /**
-   * For a given route, track average speed for the window. Calculate whether
-   * traffic is currently slowing down, via a predefined threshold. If a supermajority of
-   * speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
-   * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
-   */
-  static class GatherStats
-      implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
-  {
-    @Override
-    public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
-    {
-      ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
-      String route = input.getValue().getKey();
-      double speedSum = 0.0;
-      int speedCount = 0;
-      int speedups = 0;
-      int slowdowns = 0;
-      List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
-      // StationSpeeds sort by embedded timestamp.
-      Collections.sort(infoList);
-      Map<String, Double> prevSpeeds = new HashMap<>();
-      // For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
-      for (StationSpeed item : infoList) {
-        Double speed = item.getAvgSpeed();
-        if (speed != null) {
-          speedSum += speed;
-          speedCount++;
-          Double lastSpeed = prevSpeeds.get(item.getStationId());
-          if (lastSpeed != null) {
-            if (lastSpeed < speed) {
-              speedups += 1;
-            } else {
-              slowdowns += 1;
-            }
-          }
-          prevSpeeds.put(item.getStationId(), speed);
-        }
-      }
-      if (speedCount == 0) {
-        // No average to compute.
-        return result;
-      }
-      double speedAvg = speedSum / speedCount;
-      boolean slowdownEvent = slowdowns >= 2 * speedups;
-      RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
-      result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
-      return result;
-    }
-  }
-
-  /**
-   * Output Pojo class for outputting result to JDBC.
-   */
-  static class OutputPojo
-  {
-    private Double avgSpeed;
-    private Boolean slowdownEvent;
-    private String key;
-    private Long timestamp;
-
-    public OutputPojo()
-    {
-    }
-
-    public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
-    {
-      this.avgSpeed = avgSpeed;
-      this.slowdownEvent = slowdownEvent;
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public String toString()
-    {
-      return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
-    }
-
-    public void setTimestamp(Long timestamp)
-    {
-      this.timestamp = timestamp;
-    }
-
-    public Long getTimestamp()
-    {
-      return timestamp;
-    }
-
-    public void setAvgSpeed(Double avgSpeed)
-    {
-      this.avgSpeed = avgSpeed;
-    }
-
-    public Double getAvgSpeed()
-    {
-      return avgSpeed;
-    }
-
-    public void setKey(String key)
-    {
-      this.key = key;
-    }
-
-    public String getKey()
-    {
-      return key;
-    }
-
-    public void setSlowdownEvent(Boolean slowdownEvent)
-    {
-      this.slowdownEvent = slowdownEvent;
-    }
-
-    public Boolean getSlowdownEvent()
-    {
-      return slowdownEvent;
-    }
-
-  }
-
-  public static class Collector extends BaseOperator
-  {
-    private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
-
-    public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
-    {
-      return result;
-    }
-
-    public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
-    {
-      @Override
-      public void process(OutputPojo tuple)
-      {
-        result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
-      }
-    };
-  }
-
-  /**
-   * Format the results of the slowdown calculations to a OutputPojo.
-   */
-  static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
-  {
-    @Override
-    public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
-    {
-      RouteInfo routeInfo = input.getValue().getValue();
-      OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp());
-      return row;
-    }
-  }
-
-
-  /**
-   * This composite transformation extracts speed info from traffic station readings.
-   * It groups the readings by 'route' and analyzes traffic slowdown for that route.
-   * Lastly, it formats the results for JDBC.
-   */
-  static class TrackSpeed extends
-      CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
-  {
-    @Override
-    public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
-    {
-      // Apply a GroupByKey transform to collect a list of all station
-      // readings for a given route.
-      WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
-          inputStream
-          .accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
-          {
-            @Override
-            public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
-            {
-              return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
-            }
-          }, name("GroupByKey"));
-
-      // Analyze 'slowdown' over the route readings.
-      WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
-          .flatMap(new GatherStats(), name("GatherStats"));
-
-      // Format the results for writing to JDBC table.
-      WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
-
-      return results;
-    }
-  }
-
-
-  private static Double tryParseAvgSpeed(String[] inputItems)
-  {
-    try {
-      return Double.parseDouble(tryParseString(inputItems, 3));
-    } catch (NumberFormatException e) {
-      return null;
-    } catch (NullPointerException e) {
-      return null;
-    }
-  }
-
-  private static String tryParseStationType(String[] inputItems)
-  {
-    return tryParseString(inputItems, 2);
-  }
-
-  private static String tryParseStationId(String[] inputItems)
-  {
-    return tryParseString(inputItems, 1);
-  }
-
-  private static String tryParseTimestamp(String[] inputItems)
-  {
-    return tryParseString(inputItems, 0);
-  }
-
-  private static String tryParseString(String[] inputItems, int index)
-  {
-    return inputItems.length >= index ? inputItems[index] : null;
-  }
-
-  /**
-   * Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
-   */
-  private static Map<String, String> buildStationInfo()
-  {
-    Map<String, String> stations = new Hashtable<String, String>();
-    stations.put("1108413", "SDRoute1"); // from freeway 805 S
-    stations.put("1108699", "SDRoute2"); // from freeway 78 E
-    stations.put("1108702", "SDRoute2");
-    return stations;
-  }
-
-  /**
-   * A dummy generator to generate some traffic information.
-   */
-  public static class InfoGen extends BaseOperator implements InputOperator
-  {
-    public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
-
-    private String[] stationTypes = new String[]{"ML", "BL", "GL"};
-    private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
-    private double ave = 55.0;
-    private long timestamp;
-    private static final Duration RAND_RANGE = Duration.standardMinutes(10);
-    private static int tupleCount = 0;
-
-    public static int getTupleCount()
-    {
-      return tupleCount;
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      tupleCount = 0;
-      timestamp = System.currentTimeMillis();
-    }
-
-    @Override
-    public void emitTuples()
-    {
-      for (String stationType : stationTypes) {
-        for (int stationID : stationIDs) {
-          double speed = Math.random() * 20 + ave;
-          long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
-          try {
-            output.emit(time + "," + stationID + "," + stationType + "," + speed);
-            tupleCount++;
-
-            Thread.sleep(50);
-          } catch (Exception e) {
-            // Ignore it
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    InfoGen infoGen = new InfoGen();
-    Collector collector = new Collector();
-
-    // Create a stream from the input operator.
-    ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
-
-        // Extract the timestamp from the input and wrap it into a TimestampedTuple.
-        .map(new ExtractTimestamps(), name("ExtractTimestamps"));
-
-    stream
-        // Extract the average speed of a station.
-        .flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
-
-        // Apply window and trigger option.
-        .window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
-
-        // Apply TrackSpeed composite transformation to compute the route information.
-        .addCompositeStreams(new TrackSpeed())
-
-        // print the result to console.
-        .print(name("console"))
-        .endWith(collector, collector.input, name("Collector"))
-        .populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
deleted file mode 100644
index 4fc80ea..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoComplete.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.joda.time.Duration;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowOption;
-
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.twitter.TwitterSampleInput;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Auto Complete Hashtag Demo with real time twitter input. In order to run this application, you need to create an app
- * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and enter those information
- * accordingly in /resources/META-INF/properties.xml.
- *
- * The authentication requires following 4 information.
- * Your application consumer key,
- * Your application consumer secret,
- * Your twitter access token, and
- * Your twitter access token secret.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "TwitterAutoComplete")
-public class TwitterAutoComplete implements StreamingApplication
-{
-  /**
-   * Check whether every character in a string is ASCII encoding.
-   */
-  public static class StringUtils
-  {
-    static CharsetEncoder encoder = Charset.forName("US-ASCII").newEncoder();
-
-    public static boolean isAscii(String v)
-    {
-      return encoder.canEncode(v);
-    }
-  }
-
-  /**
-   * FlapMap Function to extract all hashtags from a text form tweet.
-   */
-  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
-  {
-
-    @Override
-    public Iterable<String> f(String input)
-    {
-      List<String> result = new LinkedList<>();
-      Matcher m = Pattern.compile("#\\S+").matcher(input);
-      while (m.find()) {
-        result.add(m.group().substring(1));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * Lower latency, but more expensive.
-   */
-  private static class ComputeTopFlat
-      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
-  {
-    private final int candidatesPerPrefix;
-    private final int minPrefix;
-
-    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
-    {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.minPrefix = minPrefix;
-    }
-
-    @Override
-    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
-        WindowedStream<CompletionCandidate> input)
-    {
-      TopNByKey topNByKey = new TopNByKey();
-      topNByKey.setN(candidatesPerPrefix);
-      return input
-        .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix, 3), name("Extract Prefixes"))
-        .accumulateByKey(topNByKey, new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
-        {
-          @Override
-          public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> tuple)
-          {
-            // TODO: Should be removed after Auto-wrapping is supported.
-            return new Tuple.WindowedTuple<>(Window.GlobalWindow.INSTANCE, tuple);
-          }
-        }, name("TopNByKey"));
-    }
-  }
-
-  /**
-   * FlapMap Function to extract all prefixes of the hashtag in the input CompletionCandidate, and output
-   * KeyValPairs of the prefix and the CompletionCandidate
-   */
-  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
-  {
-    private final int minPrefix;
-    private final int maxPrefix;
-
-    public AllPrefixes()
-    {
-      this(0, Integer.MAX_VALUE);
-    }
-
-    public AllPrefixes(int minPrefix)
-    {
-      this(minPrefix, Integer.MAX_VALUE);
-    }
-
-    public AllPrefixes(int minPrefix, int maxPrefix)
-    {
-      this.minPrefix = minPrefix;
-      this.maxPrefix = maxPrefix;
-    }
-
-    @Override
-    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
-    {
-      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
-      String word = input.getValue();
-      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
-        result.add(new KeyValPair<>(input.getValue().substring(0, i).toLowerCase(), input));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * A Composite stream transform that takes as input a list of tokens and returns
-   * the most common tokens per prefix.
-   */
-  public static class ComputeTopCompletions
-      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
-  {
-    private final int candidatesPerPrefix;
-    private final boolean recursive;
-
-    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
-    {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.recursive = recursive;
-    }
-
-    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
-    {
-      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
-    {
-
-      ApexStream<CompletionCandidate> candidates = inputStream
-          .countByKey(new Function.ToKeyValue<String, String, Long>()
-          {
-            @Override
-            public Tuple<KeyValPair<String, Long>> f(String input)
-            {
-              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
-            }
-          }, name("Hashtag Count"))
-          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
-          {
-            @Override
-            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
-            {
-              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
-            }
-          }, name("KeyValPair to CompletionCandidate"));
-
-      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, 1));
-
-    }
-  }
-
-  /**
-   * FilterFunction to filter out tweets with non-acsii characters.
-   */
-  static class ASCIIFilter implements Function.FilterFunction<String>
-  {
-    @Override
-    public boolean f(String input)
-    {
-      return StringUtils.isAscii(input);
-    }
-  }
-
-  /**
-   * Populate the dag with High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    TwitterSampleInput input = new TwitterSampleInput();
-
-    WindowOption windowOption = new WindowOption.GlobalWindow();
-
-    ApexStream<String> tags = StreamFactory.fromInput(input, input.text, name("tweetSampler"))
-        .filter(new ASCIIFilter(), name("ACSII Filter"))
-        .flatMap(new ExtractHashtags(), name("Extract Hashtags"));
-
-    ApexStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> s =
-        tags.window(windowOption, new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(10)))
-        .addCompositeStreams(ComputeTopCompletions.top(10, true)).print();
-
-    s.populateDag(dag);
-  }
-}


Mime
View raw message