apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [02/12] apex-malhar git commit: Updated algo & working on math operators
Date Thu, 01 Sep 2016 18:30:46 GMT
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
index 1973bec..712bcdc 100644
--- a/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/AverageTest.java
@@ -21,6 +21,8 @@ package com.datatorrent.lib.math;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.datatorrent.common.util.Pair;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
 /**
@@ -96,7 +98,7 @@ public class AverageTest
 
     Assert.assertEquals("number emitted tuples", 1, averageSink.collectedTuples.size());
     for (Object o : averageSink.collectedTuples) { // count is 12
-      Integer val = ((Number)o).intValue();
+      Number val = ((Pair<? extends Number, Integer>)o).getFirst().intValue();
       Assert.assertEquals("emitted average value was was ", new Integer(1157 / 12), val);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
deleted file mode 100644
index 7d7842e..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertKeyValTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertKeyVal}.
- * <p>
- *
- */
-public class ChangeAlertKeyValTest
-{
-  private static Logger log = LoggerFactory
-      .getLogger(ChangeAlertKeyValTest.class);
-
-  /**
-   * Test node logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Integer>());
-    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Double>());
-    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Float>());
-    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Short>());
-    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public <V extends Number> void testNodeProcessingSchema(
-      ChangeAlertKeyVal<String, V> oper)
-  {
-    CollectorTestSink alertSink = new CollectorTestSink();
-
-    oper.alert.setSink(alertSink);
-    oper.setPercentThreshold(5);
-
-    oper.beginWindow(0);
-    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(200)));
-    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(10)));
-    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(100)));
-
-    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(203)));
-    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
-    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(101)));
-
-    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(210)));
-    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
-    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(102)));
-
-    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(231)));
-    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(18)));
-    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(103)));
-    oper.endWindow();
-
-    // One for a, Two for b
-    Assert.assertEquals("number emitted tuples", 3,
-        alertSink.collectedTuples.size());
-
-    double aval = 0;
-    double bval = 0;
-    log.debug("\nLogging tuples");
-    for (Object o : alertSink.collectedTuples) {
-      KeyValPair<String, KeyValPair<Number, Double>> map = (KeyValPair<String, KeyValPair<Number, Double>>)o;
-
-      log.debug(o.toString());
-      if (map.getKey().equals("a")) {
-        KeyValPair<Number, Double> vmap = map.getValue();
-        if (vmap != null) {
-          aval += vmap.getValue().doubleValue();
-        }
-      } else {
-        KeyValPair<Number, Double> vmap = map.getValue();
-        if (vmap != null) {
-          bval += vmap.getValue().doubleValue();
-        }
-      }
-    }
-    Assert.assertEquals("change in a", 10.0, aval,0);
-    Assert.assertEquals("change in a", 70.0, bval,0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
deleted file mode 100644
index 51f52f0..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertMapTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlertMap}.
- * <p>
- *
- */
-public class ChangeAlertMapTest
-{
-  private static Logger log = LoggerFactory.getLogger(ChangeAlertMapTest.class);
-
-  /**
-   * Test node logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new ChangeAlertMap<String, Integer>());
-    testNodeProcessingSchema(new ChangeAlertMap<String, Double>());
-    testNodeProcessingSchema(new ChangeAlertMap<String, Float>());
-    testNodeProcessingSchema(new ChangeAlertMap<String, Short>());
-    testNodeProcessingSchema(new ChangeAlertMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public <V extends Number> void testNodeProcessingSchema(
-      ChangeAlertMap<String, V> oper)
-  {
-    CollectorTestSink alertSink = new CollectorTestSink();
-
-    oper.alert.setSink(alertSink);
-    oper.setPercentThreshold(5);
-
-    oper.beginWindow(0);
-    HashMap<String, V> input = new HashMap<String, V>();
-    input.put("a", oper.getValue(200));
-    input.put("b", oper.getValue(10));
-    input.put("c", oper.getValue(100));
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", oper.getValue(203));
-    input.put("b", oper.getValue(12));
-    input.put("c", oper.getValue(101));
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", oper.getValue(210));
-    input.put("b", oper.getValue(12));
-    input.put("c", oper.getValue(102));
-    oper.data.process(input);
-
-    input.clear();
-    input.put("a", oper.getValue(231));
-    input.put("b", oper.getValue(18));
-    input.put("c", oper.getValue(103));
-    oper.data.process(input);
-    oper.endWindow();
-
-    // One for a, Two for b
-    Assert.assertEquals("number emitted tuples", 3,
-        alertSink.collectedTuples.size());
-
-    double aval = 0;
-    double bval = 0;
-    log.debug("\nLogging tuples");
-    for (Object o : alertSink.collectedTuples) {
-      HashMap<String, HashMap<Number, Double>> map = (HashMap<String, HashMap<Number, Double>>)o;
-      Assert.assertEquals("map size", 1, map.size());
-      log.debug(o.toString());
-      HashMap<Number, Double> vmap = map.get("a");
-      if (vmap != null) {
-        aval += vmap.get(231.0).doubleValue();
-      }
-      vmap = map.get("b");
-      if (vmap != null) {
-        if (vmap.get(12.0) != null) {
-          bval += vmap.get(12.0).doubleValue();
-        } else {
-          bval += vmap.get(18.0).doubleValue();
-        }
-      }
-    }
-    Assert.assertEquals("change in a", 10.0, aval,0);
-    Assert.assertEquals("change in a", 70.0, bval,0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
deleted file mode 100644
index d127231..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeAlertTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeAlert}. <p>
- *
- */
-public class ChangeAlertTest
-{
-  private static Logger log = LoggerFactory.getLogger(ChangeAlertTest.class);
-
-  /**
-   * Test node logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new ChangeAlert<Integer>());
-    testNodeProcessingSchema(new ChangeAlert<Double>());
-    testNodeProcessingSchema(new ChangeAlert<Float>());
-    testNodeProcessingSchema(new ChangeAlert<Short>());
-    testNodeProcessingSchema(new ChangeAlert<Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public <V extends Number> void testNodeProcessingSchema(ChangeAlert<V> oper)
-  {
-    CollectorTestSink alertSink = new CollectorTestSink();
-
-    oper.alert.setSink(alertSink);
-    oper.setPercentThreshold(5);
-
-    oper.beginWindow(0);
-    oper.data.process(oper.getValue(10));
-    oper.data.process(oper.getValue(12)); // alert
-    oper.data.process(oper.getValue(12));
-    oper.data.process(oper.getValue(18)); // alert
-    oper.data.process(oper.getValue(0));  // alert
-    oper.data.process(oper.getValue(20)); // this will not alert
-    oper.data.process(oper.getValue(30)); // alert
-
-    oper.endWindow();
-
-    // One for a, Two for b
-    Assert.assertEquals("number emitted tuples", 4, alertSink.collectedTuples.size());
-
-    double aval = 0;
-    log.debug("\nLogging tuples");
-    for (Object o: alertSink.collectedTuples) {
-      KeyValPair<Number, Double> map = (KeyValPair<Number, Double>)o;
-      log.debug(o.toString());
-      aval += map.getValue().doubleValue();
-    }
-    Assert.assertEquals("change in a", 220.0, aval,0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
deleted file mode 100644
index 6c2151b..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeKeyValTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.ChangeKeyVal}.
- * <p>
- *
- */
-public class ChangeKeyValTest
-{
-  private static Logger log = LoggerFactory.getLogger(ChangeKeyValTest.class);
-
-  /**
-   * Test node logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new ChangeKeyVal<String, Integer>());
-    testNodeProcessingSchema(new ChangeKeyVal<String, Double>());
-    testNodeProcessingSchema(new ChangeKeyVal<String, Float>());
-    testNodeProcessingSchema(new ChangeKeyVal<String, Short>());
-    testNodeProcessingSchema(new ChangeKeyVal<String, Long>());
-  }
-
-  /**
-   *
-   * @param oper
-   *          key/value pair for comparison.
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public <V extends Number> void testNodeProcessingSchema(
-      ChangeKeyVal<String, V> oper)
-  {
-    CollectorTestSink changeSink = new CollectorTestSink();
-    CollectorTestSink percentSink = new CollectorTestSink();
-
-    oper.change.setSink(changeSink);
-    oper.percent.setSink(percentSink);
-
-    oper.beginWindow(0);
-    oper.base.process(new KeyValPair<String, V>("a", oper.getValue(2)));
-    oper.base.process(new KeyValPair<String, V>("b", oper.getValue(10)));
-    oper.base.process(new KeyValPair<String, V>("c", oper.getValue(100)));
-
-    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(3)));
-    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(2)));
-    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(4)));
-
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 3,
-        changeSink.collectedTuples.size());
-    Assert.assertEquals("number emitted tuples", 3,
-        percentSink.collectedTuples.size());
-
-    log.debug("\nLogging tuples");
-    for (Object o : changeSink.collectedTuples) {
-      KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
-      if (kv.getKey().equals("a")) {
-        Assert.assertEquals("change in a ", 1.0, kv.getValue());
-      }
-      if (kv.getKey().equals("b")) {
-        Assert.assertEquals("change in b ", -8.0, kv.getValue());
-      }
-      if (kv.getKey().equals("c")) {
-        Assert.assertEquals("change in c ", -96.0, kv.getValue());
-      }
-    }
-
-    for (Object o : percentSink.collectedTuples) {
-      KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
-      if (kv.getKey().equals("a")) {
-        Assert.assertEquals("change in a ", 50.0, kv.getValue());
-      }
-      if (kv.getKey().equals("b")) {
-        Assert.assertEquals("change in b ", -80.0, kv.getValue());
-      }
-      if (kv.getKey().equals("c")) {
-        Assert.assertEquals("change in c ", -96.0, kv.getValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java b/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
deleted file mode 100644
index 9595a5d..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ChangeTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.Change}.
- * <p>
- *
- */
-public class ChangeTest
-{
-  private static Logger log = LoggerFactory.getLogger(ChangeTest.class);
-
-  /**
-   * Test node logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new Change<Integer>());
-    testNodeProcessingSchema(new Change<Double>());
-    testNodeProcessingSchema(new Change<Float>());
-    testNodeProcessingSchema(new Change<Short>());
-    testNodeProcessingSchema(new Change<Long>());
-  }
-
-  /**
-   *
-   * @param oper  Data value for comparison.
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public <V extends Number> void testNodeProcessingSchema(Change<V> oper)
-  {
-    CollectorTestSink changeSink = new CollectorTestSink();
-    CollectorTestSink percentSink = new CollectorTestSink();
-
-    oper.change.setSink(changeSink);
-    oper.percent.setSink(percentSink);
-
-    oper.beginWindow(0);
-    oper.base.process(oper.getValue(10));
-    oper.data.process(oper.getValue(5));
-    oper.data.process(oper.getValue(15));
-    oper.data.process(oper.getValue(20));
-    oper.endWindow();
-
-    Assert.assertEquals("number emitted tuples", 3,
-        changeSink.collectedTuples.size());
-    Assert.assertEquals("number emitted tuples", 3,
-        percentSink.collectedTuples.size());
-
-    log.debug("\nLogging tuples");
-    for (Object o : changeSink.collectedTuples) {
-      log.debug(String.format("change %s", o));
-    }
-    for (Object o : percentSink.collectedTuples) {
-      log.debug(String.format("percent change %s", o));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
deleted file mode 100644
index 46b9609..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CompareExceptMapTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CompareExceptMap}<p>
- *
- */
-public class CompareExceptMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new CompareExceptMap<String, Integer>());
-    testNodeProcessingSchema(new CompareExceptMap<String, Double>());
-    testNodeProcessingSchema(new CompareExceptMap<String, Float>());
-    testNodeProcessingSchema(new CompareExceptMap<String, Short>());
-    testNodeProcessingSchema(new CompareExceptMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(CompareExceptMap oper)
-  {
-    CountAndLastTupleTestSink compareSink = new CountAndLastTupleTestSink();
-    CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
-    oper.compare.setSink(compareSink);
-    oper.except.setSink(exceptSink);
-
-    oper.setKey("a");
-    oper.setValue(3.0);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    input.put("b", 21);
-    input.put("c", 30);
-    oper.data.process(input);
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
-    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple).entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0);
-      }
-    }
-
-    Assert.assertEquals("number emitted tuples", 1, compareSink.count);
-    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)compareSink.tuple).entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", new Double(3), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", new Double(21), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", new Double(30), e.getValue().doubleValue(), 0);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java b/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
deleted file mode 100644
index c21019b..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CompareMapTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CompareMap}<p>
- *
- */
-public class CompareMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new CompareMap<String, Integer>());
-    testNodeProcessingSchema(new CompareMap<String, Double>());
-    testNodeProcessingSchema(new CompareMap<String, Float>());
-    testNodeProcessingSchema(new CompareMap<String, Short>());
-    testNodeProcessingSchema(new CompareMap<String, Long>());
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public void testNodeProcessingSchema(CompareMap oper)
-  {
-    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
-    oper.compare.setSink(matchSink);
-    oper.setKey("a");
-    oper.setValue(3.0);
-    oper.setTypeNEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    oper.data.process(input);
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
-    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)matchSink.tuple).entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", new Double(2), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e.getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e.getValue().doubleValue(), 0);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java b/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
deleted file mode 100644
index 0b1d9de..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/CountKeyValTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- *
- * Functional tests for {@link com.datatorrent.lib.math.CountKeyVal}. <p>
- *
- */
-public class CountKeyValTest
-{
-  /**
-   * Test operator logic emits correct results.
-   */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testNodeProcessing()
-  {
-    CountKeyVal<String, Double> oper = new CountKeyVal<String, Double>();
-    CollectorTestSink countSink = new CollectorTestSink();
-    oper.count.setSink(countSink);
-
-    oper.beginWindow(0); //
-
-    oper.data.process(new KeyValPair("a", 2.0));
-    oper.data.process(new KeyValPair("b", 20.0));
-    oper.data.process(new KeyValPair("c", 1000.0));
-    oper.data.process(new KeyValPair("a", 1.0));
-    oper.data.process(new KeyValPair("a", 10.0));
-    oper.data.process(new KeyValPair("b", 5.0));
-    oper.data.process(new KeyValPair("d", 55.0));
-    oper.data.process(new KeyValPair("b", 12.0));
-    oper.data.process(new KeyValPair("d", 22.0));
-    oper.data.process(new KeyValPair("d", 14.2));
-    oper.data.process(new KeyValPair("d", 46.0));
-    oper.data.process(new KeyValPair("e", 2.0));
-    oper.data.process(new KeyValPair("a", 23.0));
-    oper.data.process(new KeyValPair("d", 4.0));
-
-    oper.endWindow(); //
-
-    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-    Assert.assertEquals("number emitted tuples", 5, countSink.collectedTuples.size());
-    for (Object o : countSink.collectedTuples) {
-      KeyValPair<String, Integer> e = (KeyValPair<String, Integer>)o;
-      Integer val = (Integer)e.getValue();
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue());
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
-      } else if (e.getKey().equals("d")) {
-        Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
-      } else if (e.getKey().equals("e")) {
-        Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
index 9fe556f..c7e7c65 100644
--- a/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/DivisionTest.java
@@ -29,6 +29,7 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
  * <p>
  *
  */
+
 public class DivisionTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java b/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
deleted file mode 100644
index a113d5f..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/ExceptMapTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.ExceptMap}
- */
-public class ExceptMapTest
-{
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new ExceptMap<String, Integer>());
-    testNodeProcessingSchema(new ExceptMap<String, Double>());
-    testNodeProcessingSchema(new ExceptMap<String, Float>());
-    testNodeProcessingSchema(new ExceptMap<String, Short>());
-    testNodeProcessingSchema(new ExceptMap<String, Long>());
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public void testNodeProcessingSchema(ExceptMap oper)
-  {
-    CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
-    oper.except.setSink(exceptSink);
-    oper.setKey("a");
-    oper.setValue(3.0);
-    oper.setTypeEQ();
-
-    oper.beginWindow(0);
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    input.put("a", 2);
-    input.put("b", 20);
-    input.put("c", 1000);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 3);
-    oper.data.process(input);
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
-    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)exceptSink.tuple)
-        .entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", new Double(2), e
-            .getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", new Double(20), e
-            .getValue().doubleValue(), 0);
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000), e
-            .getValue().doubleValue(), 0);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
index 68e89eb..4eb6a1b 100644
--- a/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/MultiplyByConstantTest.java
@@ -24,8 +24,10 @@ import org.junit.Test;
 import com.datatorrent.lib.testbench.SumTestSink;
 
 /**
+ *
  * Functional tests for {@link com.datatorrent.lib.math.MultiplyByConstant}
  */
+
 public class MultiplyByConstantTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
deleted file mode 100644
index 92c0e77..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientMapTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.QuotientMap}
- */
-public class QuotientMapTest
-{
-  private static Logger LOG = LoggerFactory.getLogger(QuotientMap.class);
-
-  /**
-   * Test node logic emits correct results
-   */
-  @Test
-  public void testNodeProcessing() throws Exception
-  {
-    testNodeProcessingSchema(new QuotientMap<String, Integer>());
-    testNodeProcessingSchema(new QuotientMap<String, Double>());
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void testNodeProcessingSchema(QuotientMap oper) throws Exception
-  {
-    CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink();
-
-    oper.quotient.setSink(quotientSink);
-    oper.setMult_by(2);
-
-    oper.beginWindow(0); //
-    HashMap<String, Number> input = new HashMap<String, Number>();
-    int numtuples = 100;
-    for (int i = 0; i < numtuples; i++) {
-      input.clear();
-      input.put("a", 2);
-      input.put("b", 20);
-      input.put("c", 1000);
-      oper.numerator.process(input);
-      input.clear();
-      input.put("a", 2);
-      input.put("b", 40);
-      input.put("c", 500);
-      oper.denominator.process(input);
-    }
-
-    oper.endWindow();
-
-    // One for each key
-    Assert.assertEquals("number emitted tuples", 1, quotientSink.count);
-    HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple;
-    for (Map.Entry<String, Number> e : output.entrySet()) {
-      if (e.getKey().equals("a")) {
-        Assert.assertEquals("emitted value for 'a' was ", 2d,
-            e.getValue());
-      } else if (e.getKey().equals("b")) {
-        Assert.assertEquals("emitted tuple for 'b' was ", 1d,
-            e.getValue());
-      } else if (e.getKey().equals("c")) {
-        Assert.assertEquals("emitted tuple for 'c' was ", 4d,
-            e.getValue());
-      } else {
-        LOG.debug(String.format("key was %s", e.getKey()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java b/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
deleted file mode 100644
index 604e45f..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/QuotientTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.api.Sink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.Quotient}
- */
-public class QuotientTest
-{
-
-  class TestSink implements Sink<Object>
-  {
-    List<Object> collectedTuples = new ArrayList<Object>();
-
-    @Override
-    public void put(Object payload)
-    {
-      collectedTuples.add(payload);
-    }
-
-    @Override
-    public int getCount(boolean reset)
-    {
-      throw new UnsupportedOperationException("Not supported yet.");
-    }
-  }
-
-  /**
-   * Test oper logic emits correct results.
-   */
-  @Test
-  public void testNodeSchemaProcessing()
-  {
-    Quotient<Double> oper = new Quotient<Double>();
-    TestSink quotientSink = new TestSink();
-    oper.quotient.setSink(quotientSink);
-
-    oper.setMult_by(2);
-
-    oper.beginWindow(0); //
-    Double a = 30.0;
-    Double b = 20.0;
-    Double c = 100.0;
-    oper.denominator.process(a);
-    oper.denominator.process(b);
-    oper.denominator.process(c);
-
-    a = 5.0;
-    oper.numerator.process(a);
-    a = 1.0;
-    oper.numerator.process(a);
-    b = 44.0;
-    oper.numerator.process(b);
-
-    b = 10.0;
-    oper.numerator.process(b);
-    c = 22.0;
-    oper.numerator.process(c);
-    c = 18.0;
-    oper.numerator.process(c);
-
-    a = 0.5;
-    oper.numerator.process(a);
-    b = 41.5;
-    oper.numerator.process(b);
-    a = 8.0;
-    oper.numerator.process(a);
-    oper.endWindow(); //
-
-    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-    Assert.assertEquals("number emitted tuples", 1,
-        quotientSink.collectedTuples.size());
-    for (Object o : quotientSink.collectedTuples) { // sum is 1157
-      Double val = (Double)o;
-      Assert.assertEquals("emitted quotient value was ", new Double(2.0), val);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
index e968dba..f74e0c9 100644
--- a/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
+++ b/library/src/test/java/com/datatorrent/lib/math/SigmaTest.java
@@ -26,11 +26,12 @@ import org.junit.Test;
 import com.datatorrent.lib.testbench.SumTestSink;
 
 /**
- * 
+ *
  * Functional tests for {@link com.datatorrent.lib.math.Sigma}
  * <p>
  * 
  */
+
 public class SigmaTest
 {
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java b/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
deleted file mode 100644
index b0c7b38..0000000
--- a/library/src/test/java/com/datatorrent/lib/math/SumCountMapTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.math;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional tests for {@link com.datatorrent.lib.math.SumCountMap}.
- */
-public class SumCountMapTest
-{
-  /**
-   * Test operator logic emits correct results.
-   */
-  @Test
-  public void testNodeProcessing()
-  {
-    testNodeSchemaProcessing(true, true);
-    testNodeSchemaProcessing(true, false);
-    testNodeSchemaProcessing(false, true);
-    testNodeSchemaProcessing(false, false);
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void testNodeSchemaProcessing(boolean sum, boolean count)
-  {
-    SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
-    oper.setType(Double.class);
-    CollectorTestSink sumSink = new CollectorTestSink();
-    CollectorTestSink countSink = new CollectorTestSink();
-    if (sum) {
-      oper.sum.setSink(sumSink);
-    }
-    if (count) {
-      oper.count.setSink(countSink);
-    }
-
-    oper.beginWindow(0); //
-
-    HashMap<String, Double> input = new HashMap<String, Double>();
-
-    input.put("a", 2.0);
-    input.put("b", 20.0);
-    input.put("c", 1000.0);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 1.0);
-    oper.data.process(input);
-    input.clear();
-    input.put("a", 10.0);
-    input.put("b", 5.0);
-    oper.data.process(input);
-    input.clear();
-    input.put("d", 55.0);
-    input.put("b", 12.0);
-    oper.data.process(input);
-    input.clear();
-    input.put("d", 22.0);
-    oper.data.process(input);
-    input.clear();
-    input.put("d", 14.2);
-    oper.data.process(input);
-    input.clear();
-
-    // Mix integers and doubles
-    HashMap<String, Double> inputi = new HashMap<String, Double>();
-    inputi.put("d", 46.0);
-    inputi.put("e", 2.0);
-    oper.data.process(inputi);
-    inputi.clear();
-    inputi.put("a", 23.0);
-    inputi.put("d", 4.0);
-    oper.data.process(inputi);
-    inputi.clear();
-
-    oper.endWindow(); //
-
-    if (sum) {
-      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-      Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size());
-
-      for (Object o : sumSink.collectedTuples) {
-        HashMap<String, Object> output = (HashMap<String, Object>)o;
-        for (Map.Entry<String, Object> e : output.entrySet()) {
-          Double val = (Double)e.getValue();
-          if (e.getKey().equals("a")) {
-            Assert.assertEquals("emitted value for 'a' was ", new Double(36),
-                val);
-          } else if (e.getKey().equals("b")) {
-            Assert.assertEquals("emitted tuple for 'b' was ", new Double(37),
-                val);
-          } else if (e.getKey().equals("c")) {
-            Assert.assertEquals("emitted tuple for 'c' was ", new Double(1000),
-                val);
-          } else if (e.getKey().equals("d")) {
-            Assert.assertEquals("emitted tuple for 'd' was ",
-                new Double(141.2), val);
-          } else if (e.getKey().equals("e")) {
-            Assert.assertEquals("emitted tuple for 'e' was ", new Double(2),
-                val);
-          }
-        }
-      }
-    }
-    if (count) {
-      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
-      Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size());
-      for (Object o : countSink.collectedTuples) {
-        HashMap<String, Object> output = (HashMap<String, Object>)o;
-        for (Map.Entry<String, Object> e : output.entrySet()) {
-          Integer val = (Integer)e.getValue();
-          if (e.getKey().equals("a")) {
-            Assert
-                .assertEquals("emitted value for 'a' was ", 4, val.intValue());
-          } else if (e.getKey().equals("b")) {
-            Assert
-                .assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
-          } else if (e.getKey().equals("c")) {
-            Assert
-                .assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
-          } else if (e.getKey().equals("d")) {
-            Assert
-                .assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
-          } else if (e.getKey().equals("e")) {
-            Assert
-                .assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
deleted file mode 100644
index 1f29d1d..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/DeleteOperatorTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.DeleteOperator}.
- */
-public class DeleteOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    DeleteOperator oper = new DeleteOperator();
-
-    EqualValueCondition  condition = new EqualValueCondition();
-    condition.addEqualValue("a", 1);
-    oper.setCondition(condition);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
deleted file mode 100644
index 728fb96..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/FullOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class FullOuterJoinOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    OuterJoinOperator oper = new OuterJoinOperator();
-    oper.setFullJoin(true);
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    // set column join condition  
-    Condition cond = new JoinColumnEqualCondition("a", "a");
-    oper.setJoinCondition(cond);
-    
-    // add columns  
-    oper.selectTable1Column(new ColumnIndex("b", null));
-    oper.selectTable2Column(new ColumnIndex("c", null));
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 11);
-    tuple.put("c", 12);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 7);
-    tuple.put("c", 8);
-    oper.inport2.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport2.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
deleted file mode 100644
index 714f93b..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/GroupByOperatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.function.SumFunction;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.GroupByOperatorTest}.
- */
-public class GroupByOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlGroupBy()
-  {
-    // create operator
-    GroupByHavingOperator oper = new GroupByHavingOperator();
-    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
-    try {
-      oper.addAggregateIndex(new SumFunction("c", null));
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-      return;
-    }
-
-    EqualValueCondition  condition = new EqualValueCondition();
-    condition.addEqualValue("a", 1);
-    oper.setCondition(condition);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 1);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 2);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 2);
-    tuple.put("c", 7);
-    oper.inport.process(tuple);
-    
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
deleted file mode 100644
index e11723d..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/HavingOperatorTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.condition.HavingCompareValue;
-import com.datatorrent.lib.streamquery.condition.HavingCondition;
-import com.datatorrent.lib.streamquery.function.FunctionIndex;
-import com.datatorrent.lib.streamquery.function.SumFunction;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.HavingOperatorTest}.
- */
-public class HavingOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlGroupBy() throws Exception
-  {
-    // create operator
-    GroupByHavingOperator oper = new GroupByHavingOperator();
-    oper.addColumnGroupByIndex(new ColumnIndex("b", null));
-    FunctionIndex sum = new SumFunction("c", null);
-    oper.addAggregateIndex(sum);
-
-    // create having condition
-    HavingCondition having = new HavingCompareValue<Double>(sum, 6.0, 0);
-    oper.addHavingCondition(having);
-
-    EqualValueCondition  condition = new EqualValueCondition();
-    condition.addEqualValue("a", 1);
-    oper.setCondition(condition);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 1);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 2);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 2);
-    tuple.put("c", 7);
-    oper.inport.process(tuple);
-    
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(HavingOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
deleted file mode 100644
index 8a022ee..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/InnerJoinOperatorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * 
- * Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }.
- *
- */
-public class InnerJoinOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    InnerJoinOperator oper = new InnerJoinOperator();
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    // set column join condition
-    Condition cond = new JoinColumnEqualCondition("a", "a");
-    oper.setJoinCondition(cond);
-
-    // add columns
-    oper.selectTable1Column(new ColumnIndex("b", null));
-    oper.selectTable2Column(new ColumnIndex("c", null));
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 7);
-    tuple.put("c", 8);
-    oper.inport2.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport2.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(InnerJoinOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
deleted file mode 100644
index aa25e87..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/LeftOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class LeftOuterJoinOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    OuterJoinOperator oper = new OuterJoinOperator();
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    // set column join condition  
-    Condition cond = new JoinColumnEqualCondition("a", "a");
-    oper.setJoinCondition(cond);
-    
-    // add columns  
-    oper.selectTable1Column(new ColumnIndex("b", null));
-    oper.selectTable2Column(new ColumnIndex("c", null));
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 11);
-    tuple.put("c", 12);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 7);
-    tuple.put("c", 8);
-    oper.inport2.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport2.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(LeftOuterJoinOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
deleted file mode 100644
index 2d7ba87..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/OrderByOperatorTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- *  Functional test for {@link com.datatorrent.lib.streamquery.OrderByOperatorTest}.
- */
-public class OrderByOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // craete operator
-    OrderByOperator oper = new OrderByOperator();
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-    oper.addOrderByRule(new OrderByRule<Integer>("b"));
-    oper.setDescending(true);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("c", 2);
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 6);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 4);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 8);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(OrderByOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
deleted file mode 100644
index 3a57427..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/RightOuterJoinOperatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.Condition;
-import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class RightOuterJoinOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    OuterJoinOperator oper = new OuterJoinOperator();
-    oper.setRighttJoin();
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    // set column join condition  
-    Condition cond = new JoinColumnEqualCondition("a", "a");
-    oper.setJoinCondition(cond);
-    
-    // add columns  
-    oper.selectTable1Column(new ColumnIndex("b", null));
-    oper.selectTable2Column(new ColumnIndex("c", null));
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport1.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport1.process(tuple);
-
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 7);
-    tuple.put("c", 8);
-    oper.inport2.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport2.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 11);
-    tuple.put("c", 12);
-    oper.inport2.process(tuple);
-    
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(RightOuterJoinOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
deleted file mode 100644
index 8e6620e..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectOperatorTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.SelectOperatorTest}.
- */
-public class SelectOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    SelectOperator oper = new SelectOperator();
-    oper.addIndex(new ColumnIndex("b", null));
-    oper.addIndex(new ColumnIndex("c", null));
-
-    EqualValueCondition  condition = new EqualValueCondition();
-    condition.addEqualValue("a", 1);
-    oper.setCondition(condition);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(SelectOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
deleted file mode 100644
index c92c6c1..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/SelectTopOperatorTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class SelectTopOperatorTest
-{
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  @Test
-  public void testOperator() throws Exception
-  {
-    SelectTopOperator oper = new SelectTopOperator();
-    oper.setTopValue(2);
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-    
-    oper.beginWindow(1);
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-    
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-    
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-    oper.endWindow();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(SelectTopOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
deleted file mode 100644
index 42af56b..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/UpdateOperatorTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.condition.EqualValueCondition;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-public class UpdateOperatorTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    UpdateOperator oper = new UpdateOperator();
-
-    EqualValueCondition  condition = new EqualValueCondition();
-    condition.addEqualValue("a", 1);
-    oper.setCondition(condition);
-    oper.addUpdate("c", 100);
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(UpdateOperatorTest.class);
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java b/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
deleted file mode 100644
index b0500eb..0000000
--- a/library/src/test/java/com/datatorrent/lib/streamquery/advanced/BetweenConditionTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.streamquery.advanced;
-
-import java.util.HashMap;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.lib.streamquery.SelectOperator;
-import com.datatorrent.lib.streamquery.condition.BetweenCondition;
-import com.datatorrent.lib.streamquery.index.ColumnIndex;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-
-/**
- * Functional test for {@link com.datatorrent.lib.streamquery.advanced.BetweenConditionTest}.
- */
-public class BetweenConditionTest
-{
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  @Test
-  public void testSqlSelect()
-  {
-    // create operator
-    SelectOperator oper = new SelectOperator();
-    oper.addIndex(new ColumnIndex("b", null));
-    oper.addIndex(new ColumnIndex("c", null));
-
-    BetweenCondition cond = new BetweenCondition("a", 0, 2);
-    oper.setCondition(cond);
-
-
-    CollectorTestSink sink = new CollectorTestSink();
-    oper.outport.setSink(sink);
-
-    oper.setup(null);
-    oper.beginWindow(1);
-
-    HashMap<String, Object> tuple = new HashMap<String, Object>();
-    tuple.put("a", 0);
-    tuple.put("b", 1);
-    tuple.put("c", 2);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 1);
-    tuple.put("b", 3);
-    tuple.put("c", 4);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 2);
-    tuple.put("b", 5);
-    tuple.put("c", 6);
-    oper.inport.process(tuple);
-
-    tuple = new HashMap<String, Object>();
-    tuple.put("a", 3);
-    tuple.put("b", 7);
-    tuple.put("c", 8);
-    oper.inport.process(tuple);
-    
-    oper.endWindow();
-    oper.teardown();
-
-    LOG.debug("{}", sink.collectedTuples);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(BetweenConditionTest.class);
-}


Mime
View raw message