apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [26/30] apex-malhar git commit: Renamed demos to examples. Packages and artifactid names are changed as suggested.
Date Tue, 07 Mar 2017 06:58:31 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
deleted file mode 100644
index bfdb268..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * An example that reads the public 'Shakespeare' data, and for each word in
- * the dataset that is over a given length, generates a string containing the
- * list of play names in which that word appears
- *
- * <p>Concepts: the combine transform, which lets you combine the values in a
- * key-grouped Collection
- *
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "CombinePerKeyExamples")
-public class CombinePerKeyExamples implements StreamingApplication
-{
-  // Use the shakespeare public BigQuery sample
-  private static final String SHAKESPEARE_TABLE = "publicdata:samples.shakespeare";
-  // We'll track words >= this word length across all plays in the table.
-  private static final int MIN_WORD_LENGTH = 0;
-
-  /**
-   * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
-   * outputs word, play_name.
-   */
-  static class ExtractLargeWordsFn implements Function.MapFunction<SampleBean, KeyValPair<String, String>>
-  {
-
-    @Override
-    public KeyValPair<String, String> f(SampleBean input)
-    {
-      String playName = input.getCorpus();
-      String word = input.getWord();
-      if (word.length() >= MIN_WORD_LENGTH) {
-        return new KeyValPair<>(word, playName);
-      } else {
-        return null;
-      }
-    }
-  }
-
-
-  /**
-   * Prepares the output data which is in the same bean
-   */
-  static class FormatShakespeareOutputFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, String>>, SampleBean>
-  {
-    @Override
-    public SampleBean f(Tuple.WindowedTuple<KeyValPair<String, String>> input)
-    {
-      return new SampleBean(input.getValue().getKey(), input.getValue().getValue());
-    }
-  }
-
-  /**
-   * A reduce function to concat two strings together.
-   */
-  public static class Concat extends ReduceFn<String>
-  {
-    @Override
-    public String reduce(String input1, String input2)
-    {
-      return input1 + ", " + input2;
-    }
-  }
-
-  /**
-   * Reads the public 'Shakespeare' data, and for each word in the dataset
-   * over a given length, generates a string containing the list of play names
-   * in which that word appears.
-   */
-  private static class PlaysForWord extends CompositeStreamTransform<ApexStream<SampleBean>, WindowedStream<SampleBean>>
-  {
-
-    @Override
-    public WindowedStream<SampleBean> compose(ApexStream<SampleBean> inputStream)
-    {
-      return inputStream
-          // Extract words from the input SampleBean stream.
-          .map(new ExtractLargeWordsFn(), name("ExtractLargeWordsFn"))
-
-          // Apply window and trigger option to the streams.
-          .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
-          // Apply reduceByKey transformation to concat the names of all the plays that a word has appeared in together.
-          .reduceByKey(new Concat(), new Function.ToKeyValue<KeyValPair<String,String>, String, String>()
-          {
-            @Override
-            public Tuple<KeyValPair<String, String>> f(KeyValPair<String, String> input)
-            {
-              return new Tuple.PlainTuple<KeyValPair<String, String>>(input);
-            }
-          }, name("Concat"))
-
-          // Format the output back to a SampleBean object.
-          .map(new FormatShakespeareOutputFn(), name("FormatShakespeareOutputFn"));
-    }
-  }
-
-
-  /**
- * A Java Bean class that contains information about a word that appears in a corpus written by Shakespeare.
-   */
-  public static class SampleBean
-  {
-
-    public SampleBean()
-    {
-
-    }
-
-    public SampleBean(String word, String corpus)
-    {
-      this.word = word;
-      this.corpus = corpus;
-    }
-
-    @Override
-    public String toString()
-    {
-      return this.word + " : "  + this.corpus;
-    }
-
-    private String word;
-
-    private String corpus;
-
-    public void setWord(String word)
-    {
-      this.word = word;
-    }
-
-    public String getWord()
-    {
-      return word;
-    }
-
-    public void setCorpus(String corpus)
-    {
-      this.corpus = corpus;
-    }
-
-    public String getCorpus()
-    {
-      return corpus;
-    }
-  }
-
-  /**
-   * A dummy info generator to generate {@link SampleBean} objects to mimic reading from real 'Shakespeare'
-   * data.
-   */
-  public static class SampleInput extends BaseOperator implements InputOperator
-  {
-
-    public final transient DefaultOutputPort<SampleBean> beanOutput = new DefaultOutputPort();
-    private String[] words = new String[]{"A", "B", "C", "D", "E", "F", "G"};
-    private String[] corpuses = new String[]{"1", "2", "3", "4", "5", "6", "7", "8"};
-    private static int i;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      i = 0;
-    }
-
-    @Override
-    public void emitTuples()
-    {
-      while (i < 1) {
-        for (String word : words) {
-          for (String corpus : corpuses) {
-            try {
-              Thread.sleep(50);
-              beanOutput.emit(new SampleBean(word, corpus));
-            } catch (Exception e) {
-              // Ignore it
-            }
-          }
-        }
-        i++;
-      }
-
-    }
-  }
-
-  public static class Collector extends BaseOperator
-  {
-    private static List<SampleBean> result;
-    private static boolean done = false;
-
-    public static List<SampleBean> getResult()
-    {
-      return result;
-    }
-
-    public static boolean isDone()
-    {
-      return done;
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      result = new ArrayList<>();
-      done = false;
-    }
-
-    public final transient DefaultInputPort<SampleBean> input = new DefaultInputPort<SampleBean>()
-    {
-      @Override
-      public void process(SampleBean tuple)
-      {
-        if (tuple.getWord().equals("F")) {
-          done = true;
-        }
-        result.add(tuple);
-      }
-    };
-  }
-
-  /**
-   * Populate dag using High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    SampleInput input = new SampleInput();
-    Collector collector = new Collector();
-    StreamFactory.fromInput(input, input.beanOutput, name("input"))
-      .addCompositeStreams(new PlaysForWord())
-      .print(name("console"))
-      .endWith(collector, collector.input, name("Collector"))
-      .populateDag(dag);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
deleted file mode 100644
index 4df5fe7..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.joda.time.Duration;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * Beam DeDupExample.
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "DeDupExample")
-public class DeDupExample implements StreamingApplication
-{
-
-  public static class Collector extends BaseOperator
-  {
-    private static Tuple.WindowedTuple<List<String>> result;
-    private static boolean done = false;
-
-    public static Tuple.WindowedTuple<List<String>> getResult()
-    {
-      return result;
-    }
-
-    public static boolean isDone()
-    {
-      return done;
-    }
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      result = new Tuple.WindowedTuple<>();
-      done = false;
-    }
-
-    public transient DefaultInputPort<Tuple.WindowedTuple<List<String>>> input = new DefaultInputPort<Tuple.WindowedTuple<List<String>>>()
-    {
-      @Override
-      public void process(Tuple.WindowedTuple<List<String>> tuple)
-      {
-        result = tuple;
-        if (result.getValue().contains("bye")) {
-          done = true;
-        }
-      }
-    };
-  }
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    Collector collector = new Collector();
-
-    // Create a stream that reads from files in a local folder and output lines one by one to downstream.
-    ApexStream<String> stream = StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput"))
-
-        // Extract all the words from the input line of text.
-        .flatMap(new Function.FlatMapFunction<String, String>()
-        {
-          @Override
-          public Iterable<String> f(String input)
-          {
-            return Arrays.asList(input.split("[\\p{Punct}\\s]+"));
-          }
-        }, name("ExtractWords"))
-
-        // Change the words to lower case, also shutdown the app when the word "bye" is detected.
-        .map(new Function.MapFunction<String, String>()
-        {
-          @Override
-          public String f(String input)
-          {
-            return input.toLowerCase();
-          }
-        }, name("ToLowerCase"));
-
-    // Apply window and trigger option.
-    stream.window(new WindowOption.GlobalWindow(),
-        new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.standardSeconds(1)))
-
-        // Remove the duplicate words and print out the result.
-        .accumulate(new RemoveDuplicates<String>(), name("RemoveDuplicates"))
-        .print(name("console"))
-        .endWith(collector, collector.input)
-        .populateDag(dag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
deleted file mode 100644
index 834964c..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/InputPojo.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-/**
- * Tuple class for JDBC input of {@link MaxPerKeyExamples}.
- *
- * @since 3.5.0
- */
-public class InputPojo extends Object
-{
-  private int month;
-  private int day;
-  private int year;
-  private double meanTemp;
-
-  @Override
-  public String toString()
-  {
-    return "PojoEvent [month=" + getMonth() + ", day=" + getDay() + ", year=" + getYear() + ", meanTemp=" + getMeanTemp() + "]";
-  }
-
-  public void setMonth(int month)
-  {
-    this.month = month;
-  }
-
-  public int getMonth()
-  {
-    return this.month;
-  }
-
-  public void setDay(int day)
-  {
-    this.day = day;
-  }
-
-  public int getDay()
-  {
-    return day;
-  }
-
-  public void setYear(int year)
-  {
-    this.year = year;
-  }
-
-  public int getYear()
-  {
-    return year;
-  }
-
-  public void setMeanTemp(double meanTemp)
-  {
-    this.meanTemp = meanTemp;
-  }
-
-  public double getMeanTemp()
-  {
-    return meanTemp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
deleted file mode 100644
index 9fd9495..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.List;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.lib.window.accumulation.Max;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import static java.sql.Types.DOUBLE;
-import static java.sql.Types.INTEGER;
-
-import com.google.common.collect.Lists;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.db.jdbc.JdbcFieldInfo;
-import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator;
-import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator;
-import com.datatorrent.lib.db.jdbc.JdbcStore;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.datatorrent.lib.util.FieldInfo;
-import com.datatorrent.lib.util.KeyValPair;
-
-import static org.apache.apex.malhar.stream.api.Option.Options.name;
-
-/**
- * MaxPerKeyExamples Application from Beam
- *
- * @since 3.5.0
- */
-@ApplicationAnnotation(name = "MaxPerKeyExamples")
-public class MaxPerKeyExamples implements StreamingApplication
-{
-
-  /**
-   *  A map function to extract the mean temperature from {@link InputPojo}.
-   */
-  public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>>
-  {
-    @Override
-    public KeyValPair<Integer, Double> f(InputPojo row)
-    {
-      Integer month = row.getMonth();
-      Double meanTemp = row.getMeanTemp();
-      return new KeyValPair<Integer, Double>(month, meanTemp);
-    }
-  }
-
-
-  /**
-   * A map function to format output to {@link OutputPojo}.
-   */
-  public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo>
-  {
-    @Override
-    public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input)
-    {
-      OutputPojo row = new OutputPojo();
-      row.setMonth(input.getValue().getKey());
-      row.setMeanTemp(input.getValue().getValue());
-      return row;
-    }
-  }
-
-  /**
-   * A composite transformation to perform three tasks:
-   * 1. extract the month and its mean temperature from input pojo.
-   * 2. find the maximum mean temperature for every month.
- *   3. format the result to an output pojo object.
-   */
-  public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>>
-  {
-    @Override
-    public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows)
-    {
-      // InputPojo... => <month, meanTemp> ...
-      WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn"));
-
-      // month, meanTemp... => <month, max mean temp>...
-      WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes =
-          temps.accumulateByKey(new Max<Double>(),
-          new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>()
-            {
-              @Override
-              public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input)
-              {
-                return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GlobalWindow.INSTANCE, input);
-              }
-            }, name("MaxPerMonth"));
-
-      // <month, max>... => OutputPojo...
-      WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn"));
-
-      return results;
-    }
-  }
-
-  /**
-   * Method to set field info for {@link JdbcPOJOInputOperator}.
-   * @return
-   */
-  private List<FieldInfo> addInputFieldInfos()
-  {
-    List<FieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER));
-    fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER));
-    fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER));
-    fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE));
-    return fieldInfos;
-  }
-
-  /**
-   * Method to set field info for {@link JdbcPOJOInsertOutputOperator}.
-   * @return
-   */
-  private List<JdbcFieldInfo> addOutputFieldInfos()
-  {
-    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER));
-    fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE));
-    return fieldInfos;
-  }
-
-
-  /**
-   * Populate the dag using High-Level API.
-   * @param dag
-   * @param conf
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator();
-    jdbcInput.setFieldInfos(addInputFieldInfos());
-
-    JdbcStore store = new JdbcStore();
-    jdbcInput.setStore(store);
-
-    JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator();
-    jdbcOutput.setFieldInfos(addOutputFieldInfos());
-    JdbcTransactionalStore outputStore = new JdbcTransactionalStore();
-    jdbcOutput.setStore(outputStore);
-
-    // Create stream that reads from a Jdbc Input.
-    ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput"))
-
-        // Apply window and trigger option to the stream.
-        .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1))
-
-        // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo.
-        .map(new Function.MapFunction<Object, InputPojo>()
-        {
-          @Override
-          public InputPojo f(Object input)
-          {
-            return (InputPojo)input;
-          }
-        }, name("ObjectToInputPojo"))
-
-        // Plug in the composite transformation to the stream to calculate the maximum temperature for each month.
-        .addCompositeStreams(new MaxMeanTemp())
-
-        // Cast the resulted OutputPojo to Object for Jdbc Output to consume.
-        .map(new Function.MapFunction<OutputPojo, Object>()
-        {
-          @Override
-          public Object f(OutputPojo input)
-          {
-            return (Object)input;
-          }
-        }, name("OutputPojoToObject"))
-
-        // Output the result to Jdbc Output.
-        .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput"));
-
-    stream.populateDag(dag);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
deleted file mode 100644
index f3d0c64..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/OutputPojo.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-/**
- * OutputPojo Tuple Class for jdbcOutput of {@link MaxPerKeyExamples}.
- *
- * @since 3.5.0
- */
-public class OutputPojo
-{
-  private int month;
-  private double meanTemp;
-
-  @Override
-  public String toString()
-  {
-    return "PojoEvent [month=" + getMonth() + ", meanTemp=" + getMeanTemp() + "]";
-  }
-
-  public void setMonth(int month)
-  {
-    this.month = month;
-  }
-
-  public int getMonth()
-  {
-    return this.month;
-  }
-
-  public void setMeanTemp(double meanTemp)
-  {
-    this.meanTemp = meanTemp;
-  }
-
-  public double getMeanTemp()
-  {
-    return meanTemp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
deleted file mode 100644
index 962faa5..0000000
--- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/TriggerExample.java
+++ /dev/null
@@ -1,577 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.Date;
-import java.util.Objects;
-
-import org.joda.time.Duration;
-
-import org.apache.apex.malhar.lib.window.TriggerOption;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowOption;
-import org.apache.apex.malhar.stream.api.ApexStream;
-import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
-import org.apache.apex.malhar.stream.api.WindowedStream;
-import org.apache.apex.malhar.stream.api.function.Function;
-import org.apache.apex.malhar.stream.api.impl.StreamFactory;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * This example illustrates the basic concepts behind triggering. It shows how to use different
- * trigger definitions to produce partial (speculative) results before all the data is processed and
- * to control when updated results are produced for late data. The example performs a streaming
- * analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
- * data into {@link Window windows} to be processed, and demonstrates using various kinds of
- * {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
- * each window are emitted.
- *
- * <p> This example uses a portion of real traffic data from San Diego freeways. It contains
- * readings from sensor stations set up along each freeway. Each sensor reading includes a
- * calculation of the 'total flow' across all lanes in that freeway direction.
- *
- * <p> Concepts:
- * <pre>
- *   1. The default triggering behavior
- *   2. Late data with the default trigger
- *   3. How to get speculative estimates
- *   4. Combining late data and speculative estimates
- * </pre>
- *
- * <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
- * and understand the concept of 'late data',
- * See:  <a href="https://cloud.google.com/dataflow/model/triggers">
- * https://cloud.google.com/dataflow/model/triggers </a> and
- * <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
- * https://cloud.google.com/dataflow/model/windowing#Advanced </a>
- *
- * <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
- * also run an auxiliary pipeline to inject data from the default {@code --input} file to the
- * {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
- * example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
- * pipeline also randomly simulates late data, by setting the timestamps of some of the data
- * elements to be in the past. You may override the default {@code --input} with the file of your
- * choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
- * you to use a separate tool to publish to the given topic.
- *
- * <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
- * from the example common package (there are no defaults for a general Dataflow pipeline).
- * You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
- * {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
- * the example will try to create them.
- *
- * <p> The pipeline outputs its results to a BigQuery table.
- * Here are some queries you can use to see interesting results:
- * Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
- * Replace {@code <enter_window_interval>} in the query below with the window interval.
- *
- * <p> To see the results of the default trigger,
- * Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
- * the window duration, until the first pane of non-late data has been emitted, to see more
- * interesting results.
- * {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC}
- *
- * <p> To see the late data i.e. dropped by the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
- * (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
- *
- * <p>To see the the difference between accumulation mode and discarding mode,
- * {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
- * (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
- * window DESC, processingTime}
- *
- * <p> To see speculative results every minute,
- * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
- * ORDER BY window DESC, processingTime}
- *
- * <p> To see speculative results every five minutes after the end of the window
- * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
- * and freeway = "5" ORDER BY window DESC, processingTime}
- *
- * <p> To see the first and the last pane for a freeway in a window for all the trigger types,
- * {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
- *
- * <p> To reduce the number of results for each query we can add additional where clauses.
- * For examples, To see the results of the default trigger,
- * {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
- * window = "<enter_window_interval>"}
- *
- * <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
- * and then exits.
- *
- * @since 3.5.0
- */
-
-public class TriggerExample
-{
-  //Numeric value of fixed window duration, in minutes
-  public static final int WINDOW_DURATION = 30;
-  // Constants used in triggers.
-  // Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
-  // ONE_MINUTE is used only with processing time before the end of the window
-  public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
-  // FIVE_MINUTES is used only with processing time after the end of the window
-  public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
-  // ONE_DAY is used to specify the amount of lateness allowed for the data elements.
-  public static final Duration ONE_DAY = Duration.standardDays(1);
-
-  /**
-   * This transform demonstrates using triggers to control when data is produced for each window
-   * Consider an example to understand the results generated by each type of trigger.
-   * The example uses "freeway" as the key. Event time is the timestamp associated with the data
-   * element and processing time is the time when the data element gets processed in the pipeline.
-   * For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
-   * Key (freeway) | Value (totalFlow) | event time | processing time
-   * 5             | 50                 | 10:00:03   | 10:00:47
-   * 5             | 30                 | 10:01:00   | 10:01:03
-   * 5             | 30                 | 10:02:00   | 11:07:00
-   * 5             | 20                 | 10:04:10   | 10:05:15
-   * 5             | 60                 | 10:05:00   | 11:03:00
-   * 5             | 20                 | 10:05:01   | 11.07:30
-   * 5             | 60                 | 10:15:00   | 10:27:15
-   * 5             | 40                 | 10:26:40   | 10:26:43
-   * 5             | 60                 | 10:27:20   | 10:27:25
-   * 5             | 60                 | 10:29:00   | 11:11:00
-   *
-   * <p> Dataflow tracks a watermark which records up to what point in event time the data is
-   * complete. For the purposes of the example, we'll assume the watermark is approximately 15m
-   * behind the current processing time. In practice, the actual value would vary over time based
-   * on the systems knowledge of the current PubSub delay and contents of the backlog (data
-   * that has not yet been processed).
-   *
-   * <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
-   * close at 10:44:59, when the watermark passes 10:30:00.
-   */
-  static class CalculateTotalFlow
-      extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
-  {
-    private int windowDuration;
-
-    CalculateTotalFlow(int windowDuration)
-    {
-      this.windowDuration = windowDuration;
-    }
-
-    @Override
-    public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
-    {
-      // Concept #1: The default triggering behavior
-      // By default Dataflow uses a trigger which fires when the watermark has passed the end of the
-      // window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
-
-      // The system also defaults to dropping late data -- data which arrives after the watermark
-      // has passed the event timestamp of the arriving element. This means that the default trigger
-      // will only fire once.
-
-      // Each pane produced by the default trigger with no allowed lateness will be the first and
-      // last pane in the window, and will be ON_TIME.
-
-      // The results for the example above with the default trigger and zero allowed lateness
-      // would be:
-      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
-      // 5             | 260                | 6                 | true    | true   | ON_TIME
-
-      // At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
-      // result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
-      // late, and dropped.
-
-      WindowedStream<SampleBean> defaultTriggerResults = inputStream
-          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
-          new TriggerOption().discardingFiredPanes())
-          .addCompositeStreams(new TotalFlow("default"));
-
-      // Concept #2: Late data with the default trigger
-      // This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
-      // leads to each window staying open for ONE_DAY after the watermark has passed the end of the
-      // window. Any late data will result in an additional pane being fired for that same window.
-
-      // The first pane produced will be ON_TIME and the remaining panes will be LATE.
-      // To definitely get the last pane when the window closes, use
-      // .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
-
-      // The results for the example above with the default trigger and ONE_DAY allowed lateness
-      // would be:
-      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
-      // 5             | 260                | 6                 | true    | false  | ON_TIME
-      // 5             | 60                 | 1                 | false   | false  | LATE
-      // 5             | 30                 | 1                 | false   | false  | LATE
-      // 5             | 20                 | 1                 | false   | false  | LATE
-      // 5             | 60                 | 1                 | false   | false  | LATE
-      WindowedStream<SampleBean> withAllowedLatenessResults = inputStream
-          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
-          new TriggerOption().discardingFiredPanes(),
-          Duration.standardDays(1))
-          .addCompositeStreams(new TotalFlow("withAllowedLateness"));
-
-      // Concept #3: How to get speculative estimates
-      // We can specify a trigger that fires independent of the watermark, for instance after
-      // ONE_MINUTE of processing time. This allows us to produce speculative estimates before
-      // all the data is available. Since we don't have any triggers that depend on the watermark
-      // we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
-
-      // We also use accumulatingFiredPanes to build up the results across each pane firing.
-
-      // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
-      // 5             | 80                 | 2                 | true    | false  | EARLY
-      // 5             | 100                | 3                 | false   | false  | EARLY
-      // 5             | 260                | 6                 | false   | false  | EARLY
-      // 5             | 320                | 7                 | false   | false  | LATE
-      // 5             | 370                | 9                 | false   | false  | LATE
-      // 5             | 430                | 10                | false   | false  | LATE
-
-      ApexStream<SampleBean> speculativeResults = inputStream
-          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
-              //Trigger fires every minute
-          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
-                  // After emitting each pane, it will continue accumulating the elements so that each
-                  // approximation includes all of the previous data in addition to the newly arrived
-                  // data.
-          .accumulatingFiredPanes(),
-          Duration.standardDays(1))
-          .addCompositeStreams(new TotalFlow("speculative"));
-
-      // Concept #4: Combining late data and speculative estimates
-      // We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
-      // and LATE updates based on late data.
-
-      // Each time a triggering condition is satisfied it advances to the next trigger.
-      // If there are new elements this trigger emits a window under following condition:
-      // > Early approximations every minute till the end of the window.
-      // > An on-time firing when the watermark has passed the end of the window
-      // > Every five minutes of late data.
-
-      // Every pane produced will either be EARLY, ON_TIME or LATE.
-
-      // The results for the example above for this trigger would be:
-      // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
-      // 5             | 80                 | 2                 | true    | false  | EARLY
-      // 5             | 100                | 3                 | false   | false  | EARLY
-      // 5             | 260                | 6                 | false   | false  | EARLY
-      // [First pane fired after the end of the window]
-      // 5             | 320                | 7                 | false   | false  | ON_TIME
-      // 5             | 430                | 10                | false   | false  | LATE
-
-      // For more possibilities of how to build advanced triggers, see {@link Trigger}.
-      WindowedStream<SampleBean> sequentialResults = inputStream
-          .window(new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration)),
-              // Speculative every ONE_MINUTE
-          new TriggerOption().withEarlyFiringsAtEvery(Duration.standardMinutes(1))
-          .withLateFiringsAtEvery(Duration.standardMinutes(5))
-                  // After emitting each pane, it will continue accumulating the elements so that each
-                  // approximation includes all of the previous data in addition to the newly arrived
-                  // data.
-          .accumulatingFiredPanes(),
-          Duration.standardDays(1))
-          .addCompositeStreams(new TotalFlow("sequential"));
-
-      return sequentialResults;
-    }
-
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The remaining parts of the pipeline are needed to produce the output for each
-  // concept above. Not directly relevant to understanding the trigger examples.
-
-  /**
-   * Calculate total flow and number of records for each freeway and format the results to TableRow
-   * objects, to save to BigQuery.
-   */
-  static class TotalFlow extends
-      CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
-  {
-    private String triggerType;
-
-    public TotalFlow(String triggerType)
-    {
-      this.triggerType = triggerType;
-    }
-
-    @Override
-    public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
-    {
-
-      WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
-          .groupByKey(new ExtractFlowInfo());
-
-      return flowPerFreeway
-          .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
-          {
-            @Override
-            public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
-            {
-              Iterable<Integer> flows = input.getValue();
-              Integer sum = 0;
-              Long numberOfRecords = 0L;
-              for (Integer value : flows) {
-                sum += value;
-                numberOfRecords++;
-              }
-              return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
-            }
-          })
-          .map(new FormatTotalFlow(triggerType));
-    }
-  }
-
-  /**
-   * Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
-   * Adds the triggerType, pane information, processing time and the window timestamp.
-   */
-  static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
-  {
-    private String triggerType;
-
-    public FormatTotalFlow(String triggerType)
-    {
-      this.triggerType = triggerType;
-    }
-
-    @Override
-    public SampleBean f(KeyValPair<String, String> input)
-    {
-      String[] values = input.getValue().split(",");
-      //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
-      return new SampleBean(triggerType, input.getKey(), Integer.parseInt(values[0]), Long
-          .parseLong(values[1]), null, false, false, null, null, new Date());
-    }
-  }
-
-  public static class SampleBean
-  {
-    public SampleBean()
-    {
-    }
-
-    private String triggerType;
-
-    private String freeway;
-
-    private int totalFlow;
-
-    private long numberOfRecords;
-
-    private String window;
-
-    private boolean isFirst;
-
-    private boolean isLast;
-
-    private Date timing;
-
-    private Date eventTime;
-
-    private Date processingTime;
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SampleBean that = (SampleBean)o;
-      return totalFlow == that.totalFlow &&
-          numberOfRecords == that.numberOfRecords &&
-          isFirst == that.isFirst &&
-          isLast == that.isLast &&
-          Objects.equals(triggerType, that.triggerType) &&
-          Objects.equals(freeway, that.freeway) &&
-          Objects.equals(window, that.window) &&
-          Objects.equals(timing, that.timing) &&
-          Objects.equals(eventTime, that.eventTime) &&
-          Objects.equals(processingTime, that.processingTime);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return Objects
-          .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
-            processingTime);
-    }
-
-    public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
-    {
-
-      this.triggerType = triggerType;
-      this.freeway = freeway;
-      this.totalFlow = totalFlow;
-      this.numberOfRecords = numberOfRecords;
-      this.window = window;
-      this.isFirst = isFirst;
-      this.isLast = isLast;
-      this.timing = timing;
-      this.eventTime = eventTime;
-      this.processingTime = processingTime;
-    }
-
-    public String getTriggerType()
-    {
-      return triggerType;
-    }
-
-    public void setTriggerType(String triggerType)
-    {
-      this.triggerType = triggerType;
-    }
-
-    public String getFreeway()
-    {
-      return freeway;
-    }
-
-    public void setFreeway(String freeway)
-    {
-      this.freeway = freeway;
-    }
-
-    public int getTotalFlow()
-    {
-      return totalFlow;
-    }
-
-    public void setTotalFlow(int totalFlow)
-    {
-      this.totalFlow = totalFlow;
-    }
-
-    public long getNumberOfRecords()
-    {
-      return numberOfRecords;
-    }
-
-    public void setNumberOfRecords(long numberOfRecords)
-    {
-      this.numberOfRecords = numberOfRecords;
-    }
-
-    public String getWindow()
-    {
-      return window;
-    }
-
-    public void setWindow(String window)
-    {
-      this.window = window;
-    }
-
-    public boolean isFirst()
-    {
-      return isFirst;
-    }
-
-    public void setFirst(boolean first)
-    {
-      isFirst = first;
-    }
-
-    public boolean isLast()
-    {
-      return isLast;
-    }
-
-    public void setLast(boolean last)
-    {
-      isLast = last;
-    }
-
-    public Date getTiming()
-    {
-      return timing;
-    }
-
-    public void setTiming(Date timing)
-    {
-      this.timing = timing;
-    }
-
-    public Date getEventTime()
-    {
-      return eventTime;
-    }
-
-    public void setEventTime(Date eventTime)
-    {
-      this.eventTime = eventTime;
-    }
-
-    public Date getProcessingTime()
-    {
-      return processingTime;
-    }
-
-    public void setProcessingTime(Date processingTime)
-    {
-      this.processingTime = processingTime;
-    }
-  }
-
-  /**
-   * Extract the freeway and total flow in a reading.
-   * Freeway is used as key since we are calculating the total flow for each freeway.
-   */
-  static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
-  {
-    @Override
-    public Tuple<KeyValPair<String, Integer>> f(String input)
-    {
-      String[] laneInfo = input.split(",");
-      if (laneInfo[0].equals("timestamp")) {
-        // Header row
-        return null;
-      }
-      if (laneInfo.length < 48) {
-        //Skip the invalid input.
-        return null;
-      }
-      String freeway = laneInfo[2];
-      Integer totalFlow = tryIntegerParse(laneInfo[7]);
-      // Ignore the records with total flow 0 to easily understand the working of triggers.
-      // Skip the records with total flow -1 since they are invalid input.
-      if (totalFlow == null || totalFlow <= 0) {
-        return null;
-      }
-      return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
-    }
-  }
-
-  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
-
-  public static void main(String[] args) throws Exception
-  {
-    StreamFactory.fromFolder("some folder")
-        .addCompositeStreams(new CalculateTotalFlow(60));
-
-  }
-
-  private static Integer tryIntegerParse(String number)
-  {
-    try {
-      return Integer.parseInt(number);
-    } catch (NumberFormatException e) {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/main/resources/META-INF/properties.xml b/demos/highlevelapi/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index ead0460..0000000
--- a/demos/highlevelapi/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,141 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <!-- 
-  <property>
-    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
-    <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
-  </property>
-  -->
-
-  <!-- Properties for TwitterAutoComplete, please fill out all of them to make the application work -->
-  <property>
-    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey</name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret</name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken</name>
-    <value></value>
-  </property>
-  <property>
-    <name>dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret</name>
-    <value></value>
-  </property>
-
-  <!-- Properties for StreamingWordExtract -->
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.userName</name>
-    <value>root</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.password</name>
-    <value>password</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseDriver</name>
-    <value>org.hsqldb.jdbcDriver</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.batchSize</name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
-    <value>org.apache.apex.malhar.stream.sample.complete.PojoEvent</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.store.databaseUrl</name>
-    <value>jdbc:hsqldb:mem:test</value>
-  </property>
-  <property>
-    <name>dt.application.StreamingWordExtract.operator.jdbcOutput.prop.tablename</name>
-    <value>Test</value>
-  </property>
-
-  <!-- Properties for MaxPerKeyExamples -->
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.userName</name>
-    <value>root</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.password</name>
-    <value>password</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseDriver</name>
-    <value>org.hsqldb.jdbcDriver</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.batchSize</name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.port.outputPort.attr.TUPLE_CLASS</name>
-    <value>org.apache.apex.malhar.stream.sample.cookbook.InputPojo</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.store.databaseUrl</name>
-    <value>jdbc:hsqldb:mem:test</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.tableName</name>
-    <value>InputTable</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcInput.prop.query</name>
-    <value>SELECT * FROM InputTable;</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.userName</name>
-    <value>root</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.password</name>
-    <value>password</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseDriver</name>
-    <value>org.hsqldb.jdbcDriver</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.batchSize</name>
-    <value>5</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.port.input.attr.TUPLE_CLASS</name>
-    <value>org.apache.apex.malhar.stream.sample.cookbook.OutputPojo</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.store.databaseUrl</name>
-    <value>jdbc:hsqldb:mem:test</value>
-  </property>
-  <property>
-    <name>dt.application.MaxPerKeyExamples.operator.jdbcOutput.prop.tablename</name>
-    <value>OutputTable</value>
-  </property>
-
-</configuration>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
deleted file mode 100644
index c078683..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/MinimalWordCountTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample;
-
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Test for {@link MinimalWordCount}.
- */
-public class MinimalWordCountTest
-{
-  @Test
-  public void MinimalWordCountTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.MinimalWordCount.operator.console.silent", "true");
-    MinimalWordCount app = new MinimalWordCount();
-
-    lma.prepareDAG(app, conf);
-
-    LocalMode.Controller lc = lma.getController();
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return MinimalWordCount.Collector.isDone();
-      }
-    });
-
-    lc.run(10000);
-
-    Assert.assertTrue(MinimalWordCount.Collector.result.get("error") == 7);
-    Assert.assertTrue(MinimalWordCount.Collector.result.get("word") == 119);
-    Assert.assertTrue(MinimalWordCount.Collector.result.get("bye") == 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
deleted file mode 100644
index f0c51f6..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/WindowedWordCountTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Test for {@link WindowedWordCount}. Runs the application in local mode until the
- * collector reports completion, then verifies the windowed word counts it gathered:
- * the sum of all counts across windows, and the per-word totals for
- * "word2", "error", "word9", and "bye".
- * (An earlier version of this comment was copy-pasted from the TwitterAutoComplete
- * test and described Twitter API credential setup, which does not apply here;
- * see TwitterAutoCompleteTest for those instructions.)
- */
-public class WindowedWordCountTest
-{
-  @Test
-  public void WindowedWordCountTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.WindowedWordCount.operator.console.silent", "true");
-    lma.prepareDAG(new WindowedWordCount(), conf);
-    LocalMode.Controller lc = lma.getController();
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return WindowedWordCount.Collector.isDone();
-      }
-    });
-
-    lc.run(60000);
-
-    Assert.assertEquals(127, countSum(WindowedWordCount.Collector.getResult()));
-    Assert.assertEquals(28, countSumWord(WindowedWordCount.Collector.getResult(), "word2"));
-    Assert.assertEquals(7, countSumWord(WindowedWordCount.Collector.getResult(), "error"));
-    Assert.assertEquals(21, countSumWord(WindowedWordCount.Collector.getResult(), "word9"));
-    Assert.assertEquals(1, countSumWord(WindowedWordCount.Collector.getResult(), "bye"));
-  }
-
-  public long countSum(Map<KeyValPair<Long, String>, Long> map)
-  {
-    long sum = 0;
-    for (long count : map.values()) {
-      sum += count;
-    }
-    return sum;
-  }
-
-  public long countSumWord(Map<KeyValPair<Long, String>, Long> map, String word)
-  {
-    long sum = 0;
-    for (Map.Entry<KeyValPair<Long, String>, Long> entry : map.entrySet()) {
-      if (entry.getKey().getValue().equals(word)) {
-        sum += entry.getValue();
-      }
-    }
-    return sum;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
deleted file mode 100644
index 4ed2d5d..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/AutoCompleteTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Testing the AutoComplete Application
- */
-public class AutoCompleteTest
-{
-
-  @Test
-  public void AutoCompleteTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.AutoComplete.operator.console.silent", "true");
-    lma.prepareDAG(new AutoComplete(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return AutoComplete.Collector.isDone();
-      }
-    });
-
-    lc.run(200000);
-
-    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("had"));
-    Assert.assertTrue(AutoComplete.Collector.getResult().containsKey("hadoop"));
-    Assert.assertEquals(2, AutoComplete.Collector.getResult().get("mapreduce").get(0).getCount());
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
deleted file mode 100644
index dc9cdec..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/StreamingWordExtractTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.Callable;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Throwables;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Testing StreamingWordExtract application
- */
-public class StreamingWordExtractTest
-{
-  private static final String TUPLE_CLASS = "org.apache.apex.malhar.stream.sample.complete.PojoEvent";
-  private static final String DB_DRIVER = "org.h2.Driver";
-  private static final String DB_URL = "jdbc:h2:~/test";
-  private static final String TABLE_NAME = "Test";
-  private static final String USER_NAME = "root";
-  private static final String PSW = "password";
-
-  @BeforeClass
-  public static void setup()
-  {
-    try {
-      Class.forName(DB_DRIVER).newInstance();
-
-      Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
-      Statement stmt = con.createStatement();
-
-      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
-          + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
-          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
-          + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
-          + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-          + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
-          + ")";
-      stmt.executeUpdate(createMetaTable);
-
-      String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
-          + "(STRINGVALUE VARCHAR(255))";
-      stmt.executeUpdate(createTable);
-
-    } catch (Throwable e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @After
-  public void cleanTable()
-  {
-    try {
-      Connection con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
-      Statement stmt = con.createStatement();
-      String dropTable = "drop table " + TABLE_NAME;
-      stmt.executeUpdate(dropTable);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void setConfig(Configuration conf)
-  {
-    conf.set("dt.operator.jdbcOutput.prop.store.userName", USER_NAME);
-    conf.set("dt.operator.jdbcOutput.prop.store.password", PSW);
-    conf.set("dt.operator.jdbcOutput.prop.store.databaseDriver", DB_DRIVER);
-    conf.set("dt.operator.jdbcOutput.prop.batchSize", "5");
-    conf.set("dt.operator.jdbcOutput.port.input.attr.TUPLE_CLASS", TUPLE_CLASS);
-    conf.set("dt.operator.jdbcOutput.prop.store.databaseUrl", DB_URL);
-    conf.set("dt.operator.jdbcOutput.prop.tablename", TABLE_NAME);
-  }
-
-  public int getNumOfEventsInStore()
-  {
-    Connection con;
-    try {
-      con = DriverManager.getConnection(DB_URL,USER_NAME,PSW);
-      Statement stmt = con.createStatement();
-
-      String countQuery = "SELECT count(*) from " + TABLE_NAME;
-      ResultSet resultSet = stmt.executeQuery(countQuery);
-      resultSet.next();
-      return resultSet.getInt(1);
-    } catch (SQLException e) {
-      throw new RuntimeException("fetching count", e);
-    }
-  }
-
-  @Test
-  public void StreamingWordExtractTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    setConfig(conf);
-    StreamingWordExtract app = new StreamingWordExtract();
-    lma.prepareDAG(app, conf);
-    LocalMode.Controller lc = lma.getController();
-
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return getNumOfEventsInStore() == 36;
-      }
-    });
-
-    lc.run(10000);
-
-    Assert.assertEquals(app.getWordCount(), getNumOfEventsInStore());
-    Assert.assertEquals(app.getEntriesMapped(), getNumOfEventsInStore());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
deleted file mode 100644
index fddf511..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessionsTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Testing the {@link TopWikipediaSessions} Application.
- */
-public class TopWikipediaSessionsTest
-{
-  @Test
-  public void TopWikipediaSessionsTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.TopWikipediaSessions.operator.console.silent", "true");
-    lma.prepareDAG(new TopWikipediaSessions(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return TopWikipediaSessions.SessionGen.getTupleCount() >= 250;
-      }
-    });
-
-    lc.run(30000);
-
-    for (int i = 0; i < TopWikipediaSessions.Collector.getResult().size(); i++) {
-      Assert.assertTrue(isInOrder(TopWikipediaSessions.Collector.getResult().get(i)));
-    }
-  }
-
-  public boolean isInOrder(List<TopWikipediaSessions.TempWrapper> input)
-  {
-    if (input.size() == 0 || input.size() == 1) {
-      return true;
-    }
-    for (int i = 0; i < input.size() - 2; i++) {
-      if (input.get(i).getValue().getValue() < input.get(i + 1).getValue().getValue()) {
-        return false;
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
deleted file mode 100644
index 766fa60..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutesTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Testing the {@link TrafficRoutes} Application.
- */
-public class TrafficRoutesTest
-{
-
-  @Test
-  public void TrafficRoutesTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.TrafficRoutes.operator.console.silent", "true");
-    lma.prepareDAG(new TrafficRoutes(), conf);
-    LocalMode.Controller lc = lma.getController();
-
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return TrafficRoutes.InfoGen.getTupleCount() >= 100;
-      }
-    });
-
-    lc.run(60000);
-
-    Assert.assertTrue(!TrafficRoutes.Collector.getResult().isEmpty());
-    for (Map.Entry<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> entry : TrafficRoutes.Collector.getResult().entrySet()) {
-      Assert.assertTrue(entry.getValue().getKey() <= 75);
-      Assert.assertTrue(entry.getValue().getKey() >= 55);
-      Assert.assertTrue(entry.getKey().getValue().equals("SDRoute1") || entry.getKey().getValue().equals("SDRoute2"));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
deleted file mode 100644
index 9ba2f25..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/complete/TwitterAutoCompleteTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.complete;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-
-/**
- * Testing the TwitterAutoComplete Application. In order to run this test, you need to create an app
- * at https://apps.twitter.com, then generate your consumer and access keys and tokens, and set the following properties
- * for the application before running it:
- * Your application consumer key,
- * Your application consumer secret,
- * Your twitter access token, and
- * Your twitter access token secret.
- *
- * This test is mainly for local demonstration purpose. Default time to run the application is 1 minute, please
- * set the time you need to run the application before you run.
- */
-public class TwitterAutoCompleteTest
-{
-  private static final Logger logger = LoggerFactory.getLogger(org.apache.apex.malhar.stream.sample.complete.AutoCompleteTest.class);
-
-  @Test
-  @Ignore
-  public void TwitterAutoCompleteTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    //uncomment the following lines and change YOUR_XXX to the corresponding information needed.
-    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerKey", "YOUR_CONSUMERKEY");
-    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.consumerSecret", "YOUR_CONSUMERSECRET");
-    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessToken", "YOUR_ACCESSTOKEN");
-    //conf.set("dt.application.TwitterAutoComplete.operator.tweetSampler.accessTokenSecret", "YOUR_TOKENSECRET");
-    lma.prepareDAG(new TwitterAutoComplete(), conf);
-    LocalMode.Controller lc = lma.getController();
-    long start = System.currentTimeMillis();
-    lc.run(60000); // Set your desired time to run the application here.
-    long end = System.currentTimeMillis();
-    long time = end - start;
-    logger.info("Test used " + time + " ms");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
deleted file mode 100644
index 1e14fff..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamplesTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.concurrent.Callable;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * Test for {@link CombinePerKeyExamples}.
- */
-public class CombinePerKeyExamplesTest
-{
-  @Test
-  public void CombinePerKeyExamplesTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.CombinePerKeyExamples.operator.console.silent", "true");
-    CombinePerKeyExamples app = new CombinePerKeyExamples();
-
-    lma.prepareDAG(app, conf);
-
-    LocalMode.Controller lc = lma.getController();
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return CombinePerKeyExamples.Collector.isDone();
-      }
-    });
-    lc.run(100000);
-
-    Assert.assertTrue(CombinePerKeyExamples.Collector.getResult().get(CombinePerKeyExamples.Collector.getResult().size() - 2).getCorpus().contains("1, 2, 3, 4, 5, 6, 7, 8"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
----------------------------------------------------------------------
diff --git a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java b/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
deleted file mode 100644
index 7f93f50..0000000
--- a/demos/highlevelapi/src/test/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExampleTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.stream.sample.cookbook;
-
-import java.util.concurrent.Callable;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import com.datatorrent.api.LocalMode;
-
-import com.datatorrent.stram.StramLocalCluster;
-
-
-/**
- * Test for {@link DeDupExample}.
- */
-public class DeDupExampleTest
-{
-  @Test
-  public void DeDupExampleTest() throws Exception
-  {
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.application.DeDupExample.operator.console.silent", "true");
-    DeDupExample app = new DeDupExample();
-    lma.prepareDAG(app, conf);
-    LocalMode.Controller lc = lma.getController();
-    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return DeDupExample.Collector.isDone();
-      }
-    });
-    lc.run(50000);
-
-    Assert.assertEquals(9, DeDupExample.Collector.getResult().getValue().size());
-
-  }
-
-}


Mime
View raw message