apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [12/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed checkstyle violations of malhar library module
Date Wed, 18 May 2016 20:42:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java
index a312962..7aa86be 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java
@@ -82,7 +82,9 @@ public class WeightedMeanOperator<V extends Number>  extends BaseNumberValueOper
     @Override
     public void process(V tuple)
     {
-      if (tuple.doubleValue() != 0.0) currentWeight = tuple.doubleValue();
+      if (tuple.doubleValue() != 0.0) {
+        currentWeight = tuple.doubleValue();
+      }
     }
   };
 
@@ -101,7 +103,7 @@ public class WeightedMeanOperator<V extends Number>  extends BaseNumberValueOper
   public void endWindow()
   {
     if (weightedCount != 0.0) {
-       mean.emit(getAverage());
+      mean.emit(getAverage());
     }
     weightedSum = 0.0;
     weightedCount = 0.0;
@@ -123,21 +125,21 @@ public class WeightedMeanOperator<V extends Number>  extends BaseNumberValueOper
         val = num.doubleValue() / weightedCount;
         break;
       case INTEGER:
-        val = (int) (num.intValue() / weightedCount);
+        val = (int)(num.intValue() / weightedCount);
         break;
       case FLOAT:
         val = new Float(num.floatValue() / weightedCount);
         break;
       case LONG:
-        val = (long) (num.longValue() / weightedCount);
+        val = (long)(num.longValue() / weightedCount);
         break;
       case SHORT:
-        val = (short) (num.shortValue() / weightedCount);
+        val = (short)(num.shortValue() / weightedCount);
         break;
       default:
         val = num.doubleValue() / weightedCount;
         break;
     }
-    return (V) val;
+    return (V)val;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java b/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java
index 9a8daa3..abb6be5 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java
@@ -48,97 +48,97 @@ import com.datatorrent.api.annotation.OperatorAnnotation;
 @OperatorAnnotation(partitionable = false)
 public abstract class AbstractAggregator<T> implements Operator
 {
-	/**
-	 * collection of input values.
-	 */
-	protected Collection<T> collection;
-	@Min(0)
-	/**
-	 * size of output collection, all tuples till end window if set to 0.
-	 */
-	private int size = 0;
+  /**
+   * collection of input values.
+   */
+  protected Collection<T> collection;
+  @Min(0)
+  /**
+   * size of output collection, all tuples till end window if set to 0.
+   */
+  private int size = 0;
 
-	/**
-	 * Input port that takes data to be added to a collection.
-	 */
-	public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
-	{
-		@Override
-		public void process(T tuple)
-		{
-			if (collection == null) {
-				collection = getNewCollection(size);
-			}
-			collection.add(tuple);
-			if (collection.size() == size) {
-				output.emit(collection);
-				collection = null;
-			}
-		}
+  /**
+   * Input port that takes data to be added to a collection.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      if (collection == null) {
+        collection = getNewCollection(size);
+      }
+      collection.add(tuple);
+      if (collection.size() == size) {
+        output.emit(collection);
+        collection = null;
+      }
+    }
 
-	};
+  };
 
-	/**
-	 * Output port that emits a collection.
-	 */
-	public final transient DefaultOutputPort<Collection<T>> output = new DefaultOutputPort<Collection<T>>();
+  /**
+   * Output port that emits a collection.
+   */
+  public final transient DefaultOutputPort<Collection<T>> output = new DefaultOutputPort<Collection<T>>();
 
-	/**
-	 * Set the size of the collection.
-	 *
-	 * If set to zero, the collection collects all the tuples within a window and
-	 * emits the collection as 1 output tuple at the end of the window. If set to
-	 * positive value, it collects the collection as soon as the size of the
-	 * collection reaches the size.
-	 *
-	 * @param size
-	 *          the size to set
-	 */
-	public void setSize(int size)
-	{
-		this.size = size;
-	}
+  /**
+   * Set the size of the collection.
+   *
+   * If set to zero, the collection collects all the tuples within a window and
+   * emits the collection as 1 output tuple at the end of the window. If set to
+   * positive value, it collects the collection as soon as the size of the
+   * collection reaches the size.
+   *
+   * @param size
+   *          the size to set
+   */
+  public void setSize(int size)
+  {
+    this.size = size;
+  }
 
-	/**
-	 * Size of collection.
-	 *
-	 * @return size of collection
-	 */
-	@Min(0)
-	public int getSize()
-	{
-		return size;
-	}
+  /**
+   * Size of collection.
+   *
+   * @return size of collection
+   */
+  @Min(0)
+  public int getSize()
+  {
+    return size;
+  }
 
-	/**
-	 * Abstract method to get collection of given size.
-	 *
-	 * @param size
-	 * @return collection
-	 */
-	public abstract Collection<T> getNewCollection(int size);
+  /**
+   * Abstract method to get collection of given size.
+   *
+   * @param size
+   * @return collection
+   */
+  public abstract Collection<T> getNewCollection(int size);
 
-	@Override
-	public void beginWindow(long windowId)
-	{
-	}
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
 
-	@Override
-	public void endWindow()
-	{
-		if (size == 0 && collection != null) {
-			output.emit(collection);
-			collection = null;
-		}
-	}
+  @Override
+  public void endWindow()
+  {
+    if (size == 0 && collection != null) {
+      output.emit(collection);
+      collection = null;
+    }
+  }
 
-	@Override
-	public void setup(OperatorContext context)
-	{
-	}
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
 
-	@Override
-	public void teardown()
-	{
-	}
+  @Override
+  public void teardown()
+  {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java b/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java
index 106d16d..efd807a 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java
@@ -18,11 +18,12 @@
  */
 package com.datatorrent.lib.stream;
 
+import java.util.ArrayList;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.BaseKeyOperator;
-import java.util.ArrayList;
 
 /**
  * An implementation of BaseKeyOperator that breaks up an ArrayList tuple into Objects.
@@ -42,25 +43,25 @@ import java.util.ArrayList;
 @Stateless
 public class ArrayListToItem<K> extends BaseKeyOperator<K>
 {
-	/**
-	 * Input data port that takes an arraylist.
-	 */
-	public final transient DefaultInputPort<ArrayList<K>> data = new DefaultInputPort<ArrayList<K>>()
-	{
-		/**
-		 * Emit one item at a time
-		 */
-		@Override
-		public void process(ArrayList<K> tuple)
-		{
-			for (K k : tuple) {
-				item.emit(cloneKey(k));
-			}
-		}
-	};
+  /**
+   * Input data port that takes an arraylist.
+   */
+  public final transient DefaultInputPort<ArrayList<K>> data = new DefaultInputPort<ArrayList<K>>()
+  {
+    /**
+     * Emit one item at a time
+     */
+    @Override
+    public void process(ArrayList<K> tuple)
+    {
+      for (K k : tuple) {
+        item.emit(cloneKey(k));
+      }
+    }
+  };
 
-	/**
-	 * Output port that emits an array item.
-	 */
-	public final transient DefaultOutputPort<K> item = new DefaultOutputPort<K>();
+  /**
+   * Output port that emits an array item.
+   */
+  public final transient DefaultOutputPort<K> item = new DefaultOutputPort<K>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java b/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java
index 148f0b5..6874796 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java
@@ -50,137 +50,137 @@ import com.datatorrent.lib.util.KeyValPair;
 @OperatorAnnotation(partitionable = false)
 public class ConsolidatorKeyVal<K, V1, V2, V3, V4, V5> implements Operator
 {
-	/**
-	 * key/array values output result.
-	 */
-	protected HashMap<K, ArrayList<Object>> result;
-
-	@Override
-	public void setup(OperatorContext context)
-	{
-	}
-
-	@Override
-	public void teardown()
-	{
-	}
-
-	/**
-	 * <p>
-	 * Class operates on <K,V> pair, stores value in given number position in
-	 * list. <br>
-	 *
-	 * @param <V>
-	 *          value type.
-	 */
-	public class ConsolidatorInputPort<V> extends
-			DefaultInputPort<KeyValPair<K, V>>
-	{
-		/**
-		 * Value position in list.
-		 */
-		private int number;
-
-		/**
-		 * Constructor
-		 *
-		 * @param oper
-		 *          Connected operator.
-		 * @param num
-		 *          Value position in list.
-		 */
-		ConsolidatorInputPort(Operator oper, int num)
-		{
-			super();
-			number = num;
-		}
-
-		/**
-		 * Process key/value pair.
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			K key = tuple.getKey();
-			ArrayList<Object> list = getObject(key);
-			list.set(number, tuple.getValue());
-		}
-
-	}
-
-	/**
-	 * V1 type value input port.
-	 */
-	public final transient ConsolidatorInputPort<V1> in1 = new ConsolidatorInputPort<V1>(
-			this, 0);
-
-	/**
-	 * V2 type value input port.
-	 */
-	public final transient ConsolidatorInputPort<V2> in2 = new ConsolidatorInputPort<V2>(
-			this, 1);
-
-	/**
-	 * V3 type value input port.
-	 */
-	@InputPortFieldAnnotation(optional = true)
-	public final transient ConsolidatorInputPort<V3> in3 = new ConsolidatorInputPort<V3>(
-			this, 2);
-
-	/**
-	 * V4 type value input port.
-	 */
-	@InputPortFieldAnnotation(optional = true)
-	public final transient ConsolidatorInputPort<V4> in4 = new ConsolidatorInputPort<V4>(
-			this, 3);
-
-	/**
-	 * V5 type value input port.
-	 */
-	@InputPortFieldAnnotation(optional = true)
-	public final transient ConsolidatorInputPort<V5> in5 = new ConsolidatorInputPort<V5>(
-			this, 4);
-
-	/**
-	 * Output port that emits a hashmap of &lt;key,arraylist&gt;.
-	 */
-	public final transient DefaultOutputPort<HashMap<K, ArrayList<Object>>> out = new DefaultOutputPort<HashMap<K, ArrayList<Object>>>();
-
-	/**
-	 * Get array list object for given key
-	 *
-	 * @param k  key
-	 * @return array list for key.
-	 */
-	public ArrayList<Object> getObject(K k)
-	{
-		ArrayList<Object> val = result.get(k);
-		if (val == null) {
-			val = new ArrayList<Object>(5);
-			val.add(0, null);
-			val.add(1, null);
-			val.add(2, null);
-			val.add(3, null);
-			val.add(4, null);
-			result.put(k, val);
-		}
-		return val;
-	}
-
-	@Override
-	public void beginWindow(long windowId)
-	{
-		result = new HashMap<K, ArrayList<Object>>();
-	}
-
-	/**
-	 * Emits merged data
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (!result.isEmpty()) {
-			out.emit(result);
-		}
-	}
+  /**
+   * key/array values output result.
+   */
+  protected HashMap<K, ArrayList<Object>> result;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * <p>
+   * Class operates on <K,V> pair, stores value in given number position in
+   * list. <br>
+   *
+   * @param <V>
+   *          value type.
+   */
+  public class ConsolidatorInputPort<V> extends
+      DefaultInputPort<KeyValPair<K, V>>
+  {
+    /**
+     * Value position in list.
+     */
+    private int number;
+
+    /**
+     * Constructor
+     *
+     * @param oper
+     *          Connected operator.
+     * @param num
+     *          Value position in list.
+     */
+    ConsolidatorInputPort(Operator oper, int num)
+    {
+      super();
+      number = num;
+    }
+
+    /**
+     * Process key/value pair.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      ArrayList<Object> list = getObject(key);
+      list.set(number, tuple.getValue());
+    }
+
+  }
+
+  /**
+   * V1 type value input port.
+   */
+  public final transient ConsolidatorInputPort<V1> in1 = new ConsolidatorInputPort<V1>(
+      this, 0);
+
+  /**
+   * V2 type value input port.
+   */
+  public final transient ConsolidatorInputPort<V2> in2 = new ConsolidatorInputPort<V2>(
+      this, 1);
+
+  /**
+   * V3 type value input port.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient ConsolidatorInputPort<V3> in3 = new ConsolidatorInputPort<V3>(
+      this, 2);
+
+  /**
+   * V4 type value input port.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient ConsolidatorInputPort<V4> in4 = new ConsolidatorInputPort<V4>(
+      this, 3);
+
+  /**
+   * V5 type value input port.
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient ConsolidatorInputPort<V5> in5 = new ConsolidatorInputPort<V5>(
+      this, 4);
+
+  /**
+   * Output port that emits a hashmap of &lt;key,arraylist&gt;.
+   */
+  public final transient DefaultOutputPort<HashMap<K, ArrayList<Object>>> out = new DefaultOutputPort<HashMap<K, ArrayList<Object>>>();
+
+  /**
+   * Get array list object for given key
+   *
+   * @param k  key
+   * @return array list for key.
+   */
+  public ArrayList<Object> getObject(K k)
+  {
+    ArrayList<Object> val = result.get(k);
+    if (val == null) {
+      val = new ArrayList<Object>(5);
+      val.add(0, null);
+      val.add(1, null);
+      val.add(2, null);
+      val.add(3, null);
+      val.add(4, null);
+      result.put(k, val);
+    }
+    return val;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    result = new HashMap<K, ArrayList<Object>>();
+  }
+
+  /**
+   * Emits merged data
+   */
+  @Override
+  public void endWindow()
+  {
+    if (!result.isEmpty()) {
+      out.emit(result);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/Counter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/Counter.java b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
index 67aedb4..8de2653 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/Counter.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/Counter.java
@@ -42,59 +42,59 @@ import com.datatorrent.api.Operator.Unifier;
  */
 public class Counter implements Operator, Unifier<Integer>
 {
-        /**
-	 * Input port that takes objects to be counted in each window.
-	 */
-	public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
-	{
-		@Override
-		public void process(Object tuple)
-		{
-			count++;
-		}
+  /**
+   * Input port that takes objects to be counted in each window.
+   */
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void process(Object tuple)
+    {
+      count++;
+    }
 
-	};
+  };
 
-          /**
-	 * Output port that takes emits count in each window.
-	 */
-	public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>()
-	{
-		@Override
-		public Unifier<Integer> getUnifier()
-		{
-			return Counter.this;
-		}
+  /**
+   * Output port that takes emits count in each window.
+   */
+  public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>()
+  {
+    @Override
+    public Unifier<Integer> getUnifier()
+    {
+      return Counter.this;
+    }
 
-	};
+  };
 
-	@Override
-	public void beginWindow(long windowId)
-	{
-		count = 0;
-	}
+  @Override
+  public void beginWindow(long windowId)
+  {
+    count = 0;
+  }
 
-	@Override
-	public void process(Integer tuple)
-	{
-		count += tuple;
-	}
+  @Override
+  public void process(Integer tuple)
+  {
+    count += tuple;
+  }
 
-	@Override
-	public void endWindow()
-	{
-		output.emit(count);
-	}
+  @Override
+  public void endWindow()
+  {
+    output.emit(count);
+  }
 
-	@Override
-	public void setup(OperatorContext context)
-	{
-	}
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
 
-	@Override
-	public void teardown()
-	{
-	}
+  @Override
+  public void teardown()
+  {
+  }
 
-	private transient int count;
+  private transient int count;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/DevNull.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/DevNull.java b/library/src/main/java/com/datatorrent/lib/stream/DevNull.java
index 8ced16b..877b562 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/DevNull.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/DevNull.java
@@ -18,9 +18,9 @@
  */
 package com.datatorrent.lib.stream;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * An implementation of BaseOperator that terminates a stream and does not affect the tuple.
@@ -39,15 +39,15 @@ import com.datatorrent.api.annotation.Stateless;
 @Stateless
 public class DevNull<K> extends BaseOperator
 {
-	/**
-	 * Input any data type port.
-	 */
-	public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-	{
-		@Override
-		public void process(K tuple)
-		{
-			// Does nothing; allows a stream to terminate and therefore be debugged
-		}
-	};
+  /**
+   * Input any data type port.
+   */
+  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+  {
+    @Override
+    public void process(K tuple)
+    {
+      // Does nothing; allows a stream to terminate and therefore be debugged
+    }
+  };
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java b/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java
index e8bea13..87093fe 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.lib.stream;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import javax.validation.constraints.Min;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * An implementation of BaseOperator that is used for logging by counting the tuple and then drops it.
  * <p>
@@ -48,141 +49,140 @@ import org.slf4j.LoggerFactory;
 public class DevNullCounter<K> extends BaseOperator
 {
         /**
-	 * Input port that takes objects to be counted in each window.
-	 */
-	public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-	{
-		/**
-		 * Process each tuple. Expects upstream node to compute number of tuples in
-		 * that window and send it as an int<br>
-		 *
-		 * @param tuple
-		 */
-		@Override
-		public void process(K tuple)
-		{
-			tuple_count++;
-		}
-	};
-	private static Logger log = LoggerFactory.getLogger(DevNullCounter.class);
-	private long windowStartTime = 0;
-	long[] tuple_numbers = null;
-	long[] time_numbers = null;
-	int tuple_index = 0;
-	int count_denominator = 1;
-	long count_windowid = 0;
-	long tuple_count = 1; // so that the first begin window starts the count down
+   * Input port that takes objects to be counted in each window.
+   */
+  public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
+  {
+    /**
+     * Process each tuple. Expects upstream node to compute number of tuples in
+     * that window and send it as an int<br>
+     *
+     * @param tuple
+     */
+    @Override
+    public void process(K tuple)
+    {
+      tuple_count++;
+    }
+  };
+  private static Logger log = LoggerFactory.getLogger(DevNullCounter.class);
+  private long windowStartTime = 0;
+  long[] tuple_numbers = null;
+  long[] time_numbers = null;
+  int tuple_index = 0;
+  int count_denominator = 1;
+  long count_windowid = 0;
+  long tuple_count = 1; // so that the first begin window starts the count down
 
-	private boolean debug = true;
+  private boolean debug = true;
 
-	/**
-	 * getter function for debug state
-	 *
-	 * @return debug state
-	 */
-	public boolean getDebug()
-	{
-		return debug;
-	}
+  /**
+   * getter function for debug state
+   *
+   * @return debug state
+   */
+  public boolean getDebug()
+  {
+    return debug;
+  }
 
-	/**
-	 * setter function for debug state
-	 *
-	 * @param i
-	 *          sets debug to i
-	 */
-	public void setDebug(boolean i)
-	{
-		debug = i;
-	}
+  /**
+   * setter function for debug state
+   *
+   * @param i
+   *          sets debug to i
+   */
+  public void setDebug(boolean i)
+  {
+    debug = i;
+  }
 
-	@Min(1)
-	private int rollingwindowcount = 1;
+  @Min(1)
+  private int rollingwindowcount = 1;
 
-	public void setRollingwindowcount(int val)
-	{
-		rollingwindowcount = val;
-	}
+  public void setRollingwindowcount(int val)
+  {
+    rollingwindowcount = val;
+  }
 
-	/**
-	 * Sets up all the config parameters. Assumes checking is done and has passed
-	 *
-	 * @param context
-	 */
-	@Override
-	public void setup(OperatorContext context)
-	{
-		windowStartTime = 0;
-		if (rollingwindowcount != 1) { // Initialized the tuple_numbers
-			tuple_numbers = new long[rollingwindowcount];
-			time_numbers = new long[rollingwindowcount];
-			for (int i = tuple_numbers.length; i > 0; i--) {
-				tuple_numbers[i - 1] = 0;
-				time_numbers[i - 1] = 0;
-			}
-			tuple_index = 0;
-		}
-	}
+  /**
+   * Sets up all the config parameters. Assumes checking is done and has passed
+   *
+   * @param context
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    windowStartTime = 0;
+    if (rollingwindowcount != 1) { // Initialized the tuple_numbers
+      tuple_numbers = new long[rollingwindowcount];
+      time_numbers = new long[rollingwindowcount];
+      for (int i = tuple_numbers.length; i > 0; i--) {
+        tuple_numbers[i - 1] = 0;
+        time_numbers[i - 1] = 0;
+      }
+      tuple_index = 0;
+    }
+  }
 
-	@Override
-	public void beginWindow(long windowId)
-	{
-		if (tuple_count != 0) { // Do not restart time if no tuples were sent
-			windowStartTime = System.currentTimeMillis();
-			tuple_count = 0;
-		}
-	}
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (tuple_count != 0) { // Do not restart time if no tuples were sent
+      windowStartTime = System.currentTimeMillis();
+      tuple_count = 0;
+    }
+  }
 
-	/**
-	 * convenient method for not sending more than configured number of windows.
-	 */
-	@Override
-	public void endWindow()
-	{
-		if (!debug) {
-			return;
-		}
-		if (tuple_count == 0) {
-			return;
-		}
-		long elapsedTime = System.currentTimeMillis() - windowStartTime;
-		if (elapsedTime == 0) {
-			elapsedTime = 1; // prevent from / zero
-		}
+  /**
+   * convenient method for not sending more than configured number of windows.
+   */
+  @Override
+  public void endWindow()
+  {
+    if (!debug) {
+      return;
+    }
+    if (tuple_count == 0) {
+      return;
+    }
+    long elapsedTime = System.currentTimeMillis() - windowStartTime;
+    if (elapsedTime == 0) {
+      elapsedTime = 1; // prevent from / zero
+    }
 
-		long average;
-		long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as
-																															// elapsedTime is
-																															// in millis
-		if (rollingwindowcount == 1) {
-			average = tuples_per_sec;
-		} else { // use tuple_numbers
-			long slots;
-			if (count_denominator == rollingwindowcount) {
-				tuple_numbers[tuple_index] = tuple_count;
-				time_numbers[tuple_index] = elapsedTime;
-				slots = rollingwindowcount;
-				tuple_index++;
-				if (tuple_index == rollingwindowcount) {
-					tuple_index = 0;
-				}
-			} else {
-				tuple_numbers[count_denominator - 1] = tuple_count;
-				time_numbers[count_denominator - 1] = elapsedTime;
-				slots = count_denominator;
-				count_denominator++;
-			}
-			long time_slot = 0;
-			long numtuples = 0;
-			for (int i = 0; i < slots; i++) {
-				numtuples += tuple_numbers[i];
-				time_slot += time_numbers[i];
-			}
-			average = (numtuples * 1000) / time_slot;
-		}
-		log.debug(String
-				.format(
-						"\nWindowid (%d), Time (%d ms): The rate for %d tuples is %d. This window had %d tuples_per_sec ",
-						count_windowid++, elapsedTime, tuple_count, average, tuples_per_sec));
-	}
+    long average;
+    long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as
+                                                              // elapsedTime is
+                                                              // in millis
+    if (rollingwindowcount == 1) {
+      average = tuples_per_sec;
+    } else { // use tuple_numbers
+      long slots;
+      if (count_denominator == rollingwindowcount) {
+        tuple_numbers[tuple_index] = tuple_count;
+        time_numbers[tuple_index] = elapsedTime;
+        slots = rollingwindowcount;
+        tuple_index++;
+        if (tuple_index == rollingwindowcount) {
+          tuple_index = 0;
+        }
+      } else {
+        tuple_numbers[count_denominator - 1] = tuple_count;
+        time_numbers[count_denominator - 1] = elapsedTime;
+        slots = count_denominator;
+        count_denominator++;
+      }
+      long time_slot = 0;
+      long numtuples = 0;
+      for (int i = 0; i < slots; i++) {
+        numtuples += tuple_numbers[i];
+        time_slot += time_numbers[i];
+      }
+      average = (numtuples * 1000) / time_slot;
+    }
+    log.debug(String.format(
+        "\nWindowid (%d), Time (%d ms): The rate for %d tuples is %d. This window had %d tuples_per_sec ",
+        count_windowid++, elapsedTime, tuple_count, average, tuples_per_sec));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java b/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java
index 03dba6d..29ce727 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.lib.stream;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.BaseKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * An implementation of BaseKeyValueOperator that breaks a HashMap tuple into objects.
@@ -52,47 +53,47 @@ import java.util.Map;
 @Stateless
 public class HashMapToKeyValPair<K, V> extends BaseKeyValueOperator<K, V>
 {
-	/**
-	 * Input port that takes a hashmap of &lt;key,value&rt;.
-	 */
-	public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
-	{
-		/**
-		 * Emits key, key/val pair, and val based on port connections
-		 */
-		@Override
-		public void process(HashMap<K, V> tuple)
-		{
-			for (Map.Entry<K, V> e : tuple.entrySet()) {
-				if (key.isConnected()) {
-					key.emit(cloneKey(e.getKey()));
-				}
-				if (val.isConnected()) {
-					val.emit(cloneValue(e.getValue()));
-				}
-				if (keyval.isConnected()) {
-					keyval.emit(new KeyValPair<K, V>(cloneKey(e.getKey()), cloneValue(e
-							.getValue())));
-				}
-			}
-		}
-	};
+  /**
+   * Input port that takes a hashmap of &lt;key,value&rt;.
+   */
+  public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>()
+  {
+    /**
+     * Emits key, key/val pair, and val based on port connections
+     */
+    @Override
+    public void process(HashMap<K, V> tuple)
+    {
+      for (Map.Entry<K, V> e : tuple.entrySet()) {
+        if (key.isConnected()) {
+          key.emit(cloneKey(e.getKey()));
+        }
+        if (val.isConnected()) {
+          val.emit(cloneValue(e.getValue()));
+        }
+        if (keyval.isConnected()) {
+          keyval.emit(new KeyValPair<K, V>(cloneKey(e.getKey()), cloneValue(e
+              .getValue())));
+        }
+      }
+    }
+  };
 
-	/**
-	 * Key output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<K> key = new DefaultOutputPort<K>();
+  /**
+   * Key output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<K> key = new DefaultOutputPort<K>();
 
-	/**
-	 * key/value pair output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<KeyValPair<K, V>> keyval = new DefaultOutputPort<KeyValPair<K, V>>();
+  /**
+   * key/value pair output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, V>> keyval = new DefaultOutputPort<KeyValPair<K, V>>();
 
-	/**
-	 * Value output port.
-	 */
-	@OutputPortFieldAnnotation(optional = true)
-	public final transient DefaultOutputPort<V> val = new DefaultOutputPort<V>();
+  /**
+   * Value output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<V> val = new DefaultOutputPort<V>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java b/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java
index 73ece79..b33eadc 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java
@@ -22,14 +22,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import com.datatorrent.api.annotation.Stateless;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -70,8 +70,7 @@ public class JsonByteArrayOperator extends BaseOperator
         JSONObject value = jSONObject.optJSONObject(key);
         if (value == null) {
           map.put(insertKey, jSONObject.get(key));
-        }
-        else {
+        } else {
           getFlatMap(value, map, insertKey);
         }
       }
@@ -105,8 +104,7 @@ public class JsonByteArrayOperator extends BaseOperator
           outputFlatMap.emit(flatMap);
         }
 
-      }
-      catch (Throwable ex) {
+      } catch (Throwable ex) {
         DTThrowable.rethrow(ex);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java b/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java
index 29574e6..dfa3ba2 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.lib.stream;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.util.BaseKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
 
 /**
  * An implementation of BaseKeyValueOperator that converts Key Value Pair to a HashMap tuple.
@@ -45,25 +46,25 @@ import java.util.HashMap;
 @Stateless
 public class KeyValPairToHashMap<K, V> extends BaseKeyValueOperator<K, V>
 {
-	/**
-	 * Input port that takes a key value pair.
-	 */
-	public final transient DefaultInputPort<KeyValPair<K, V>> keyval = new DefaultInputPort<KeyValPair<K, V>>()
-	{
-		/**
-		 * Emits key, key/val pair, and val based on port connections
-		 */
-		@Override
-		public void process(KeyValPair<K, V> tuple)
-		{
-			HashMap<K, V> otuple = new HashMap<K, V>(1);
-			otuple.put(tuple.getKey(), tuple.getValue());
-			map.emit(otuple);
-		}
-	};
+  /**
+   * Input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> keyval = new DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Emits key, key/val pair, and val based on port connections
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      HashMap<K, V> otuple = new HashMap<K, V>(1);
+      otuple.put(tuple.getKey(), tuple.getValue());
+      map.emit(otuple);
+    }
+  };
 
-	/**
-	 * key/value map output port.
-	 */
-	public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>();
+  /**
+   * key/value map output port.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java b/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java
index cc47f5c..aee1213 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java
@@ -18,10 +18,11 @@
  */
 package com.datatorrent.lib.stream;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.util.BaseKeyValueOperator;
-import java.util.HashMap;
 
 /**
  * <p>
@@ -49,56 +50,56 @@ import java.util.HashMap;
  */
 public class RoundRobinHashMap<K, V> extends BaseKeyValueOperator<K, V>
 {
-	/**
-	 * Keys for round robin association.
-	 */
-	protected K[] keys;
+  /**
+   * Keys for round robin association.
+   */
+  protected K[] keys;
 
-	/**
-	 * Current key index.
-	 */
-	protected int cursor = 0;
+  /**
+   * Current key index.
+   */
+  protected int cursor = 0;
 
-	private HashMap<K, V> otuple;
+  private HashMap<K, V> otuple;
 
-	/**
-	 * Value input port.
-	 */
-	public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
-	{
-		/**
-		 * Emits key, key/val pair, and val based on port connections
-		 */
-		@Override
-		public void process(V tuple)
-		{
-			if (keys.length == 0) {
-				return;
-			}
-			if (cursor == 0) {
-				otuple = new HashMap<K, V>();
-			}
-			otuple.put(keys[cursor], tuple);
-			if (++cursor >= keys.length) {
-				map.emit(otuple);
-				cursor = 0;
-				otuple = null;
-			}
-		}
-	};
+  /**
+   * Value input port.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Emits key, key/val pair, and val based on port connections
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (keys.length == 0) {
+        return;
+      }
+      if (cursor == 0) {
+        otuple = new HashMap<K, V>();
+      }
+      otuple.put(keys[cursor], tuple);
+      if (++cursor >= keys.length) {
+        map.emit(otuple);
+        cursor = 0;
+        otuple = null;
+      }
+    }
+  };
 
-	/**
-	 * key/value map output port.
-	 */
-	public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>();
+  /**
+   * key/value map output port.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>();
 
-	/**
-	 * Keys for round robin asspociation, set by application.
-	 *
-	 * @param keys
-	 */
-	public void setKeys(K[] keys)
-	{
-		this.keys = keys;
-	}
+  /**
+   * Keys for round robin asspociation, set by application.
+   *
+   * @param keys
+   */
+  public void setKeys(K[] keys)
+  {
+    this.keys = keys;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java b/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java
index f13158e..f08b931 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java
@@ -43,9 +43,9 @@ import com.datatorrent.lib.util.BaseKeyOperator;
 @Stateless
 public class StreamDuplicater<K> extends BaseKeyOperator<K>
 {
-	/**
-	 * Input data port.
-	 */
+  /**
+   * Input data port.
+   */
   public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
   {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java b/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
index 5d4cfa6..8678e9f 100644
--- a/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
+++ b/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.stream;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * An implementation of BaseOperator that merges two streams with identical schema and emits the tuples to the output port in order.
@@ -41,10 +41,10 @@ import com.datatorrent.api.annotation.Stateless;
 @Stateless
 public class StreamMerger<K> extends BaseOperator
 {
-	/**
-	 * Data input port 1.
-	 */
- public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
+  /**
+   * Data input port 1.
+   */
+  public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>()
   {
     /**
      * Emits to port "out"

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
index aefd6cd..e3bba8a 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java
@@ -18,14 +18,14 @@
  */
 package com.datatorrent.lib.streamquery;
 
-import com.datatorrent.common.util.BaseOperator;
+import java.util.ArrayList;
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-
-import java.util.ArrayList;
-import java.util.HashMap;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * A base implementation of a BaseOperator that is a sql stream operator.&nbsp;  Subclasses should provide the
@@ -58,7 +58,8 @@ public abstract class AbstractSqlStreamOperator extends BaseOperator
      */
     public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>();
 
-    public InputSchema() {
+    public InputSchema()
+    {
     }
 
     public InputSchema(String name)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
index 70e4333..77c7522 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java
@@ -20,9 +20,9 @@ package com.datatorrent.lib.streamquery;
 
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.streamquery.condition.Condition;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
index ac05444..2fe8bc3 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java
@@ -18,15 +18,20 @@
  */
 package com.datatorrent.lib.streamquery;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
-
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo;
+
 /**
  * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator.
  * <p>
@@ -38,13 +43,14 @@ import java.util.Map;
 public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
 {
   protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5);
-    protected List<String> execStmtStringList = new ArrayList<String>();
+  protected List<String> execStmtStringList = new ArrayList<String>();
   protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5);
   protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5);
   protected transient Connection db;
 
-  public void addExecStatementString(String stmt) {
-       this.execStmtStringList.add(stmt);
+  public void addExecStatementString(String stmt)
+  {
+    this.execStmtStringList.add(stmt);
   }
 
 
@@ -54,8 +60,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
     System.setProperty("derby.stream.error.file", "/dev/null");
     try {
       Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
 
@@ -74,7 +79,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
         String columnNames = "";
         String insertQuestionMarks = "";
         int j = 0;
-        for (Map.Entry<String, ColumnInfo> entry: inputSchema.columnInfoMap.entrySet()) {
+        for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) {
           if (!columnSpec.isEmpty()) {
             columnSpec += ",";
             columnNames += ",";
@@ -87,21 +92,22 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
           insertQuestionMarks += "?";
           entry.getValue().bindIndex = ++j;
         }
-        String createTempTableStmt = "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
+        String createTempTableStmt =
+            "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED";
         st = db.prepareStatement(createTempTableStmt);
         st.execute();
         st.close();
 
-        String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES (" + insertQuestionMarks + ")";
+        String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES ("
+            + insertQuestionMarks + ")";
 
         insertStatements.add(i, db.prepareStatement(insertStmt));
         deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name));
       }
-        for (String stmtStr: execStmtStringList) {
-            execStatements.add(db.prepareStatement(stmtStr));
-        }
-    }
-    catch (SQLException ex) {
+      for (String stmtStr : execStmtStringList) {
+        execStatements.add(db.prepareStatement(stmtStr));
+      }
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -111,8 +117,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
   {
     try {
       db.setAutoCommit(false);
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -124,18 +129,16 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
 
     PreparedStatement insertStatement = insertStatements.get(tableNum);
     try {
-      for (Map.Entry<String, Object> entry: tuple.entrySet()) {
+      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
         ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey());
         if (t != null && t.bindIndex != 0) {
-          //System.out.println("Binding: "+entry.getValue().toString()+" to "+t.bindIndex);
           insertStatement.setString(t.bindIndex, entry.getValue().toString());
         }
       }
 
       insertStatement.executeUpdate();
       insertStatement.clearParameters();
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }
@@ -147,48 +150,46 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator
       db.commit();
       if (bindings != null) {
         for (int i = 0; i < bindings.size(); i++) {
-            for (PreparedStatement stmt: execStatements) {
-                stmt.setString(i, bindings.get(i).toString());
-            }
+          for (PreparedStatement stmt : execStatements) {
+            stmt.setString(i, bindings.get(i).toString());
+          }
         }
       }
 
-
-     for (PreparedStatement stmt: execStatements) {
-          executePreparedStatement(stmt);
+      for (PreparedStatement stmt : execStatements) {
+        executePreparedStatement(stmt);
       }
-      for (PreparedStatement st: deleteStatements) {
+      for (PreparedStatement st : deleteStatements) {
         st.executeUpdate();
         st.clearParameters();
       }
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
     bindings = null;
   }
 
-    private void executePreparedStatement(PreparedStatement statement) throws SQLException {
-        ResultSet res = statement.executeQuery();
-        ResultSetMetaData resmeta = res.getMetaData();
-        int columnCount = resmeta.getColumnCount();
-        while (res.next()) {
-            HashMap<String, Object> resultRow = new HashMap<String, Object>();
-            for (int i = 1; i <= columnCount; i++) {
-                resultRow.put(resmeta.getColumnName(i), res.getObject(i));
-            }
-            this.result.emit(resultRow);
-        }
-        statement.clearParameters();
+  private void executePreparedStatement(PreparedStatement statement) throws SQLException
+  {
+    ResultSet res = statement.executeQuery();
+    ResultSetMetaData resmeta = res.getMetaData();
+    int columnCount = resmeta.getColumnCount();
+    while (res.next()) {
+      HashMap<String, Object> resultRow = new HashMap<String, Object>();
+      for (int i = 1; i <= columnCount; i++) {
+        resultRow.put(resmeta.getColumnName(i), res.getObject(i));
+      }
+      this.result.emit(resultRow);
     }
+    statement.clearParameters();
+  }
 
   @Override
   public void teardown()
   {
     try {
       db.close();
-    }
-    catch (SQLException ex) {
+    } catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
index 63ad18a..1821953 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java
@@ -24,10 +24,10 @@ import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.condition.HavingCondition;
 import com.datatorrent.lib.streamquery.function.FunctionIndex;
@@ -100,14 +100,14 @@ public class GroupByHavingOperator extends BaseOperator
   {
     columnGroupIndexes.add(index);
   }
+
   public void addHavingCondition(@NotNull HavingCondition condition)
   {
     havingConditions.add(condition);
   }
 
   /**
-   * @param set
-   *          condition
+   * @param condition condition
    */
   public void setCondition(Condition condition)
   {
@@ -123,8 +123,9 @@ public class GroupByHavingOperator extends BaseOperator
     @Override
     public void process(Map<String, Object> tuple)
     {
-      if ((condition != null) && (!condition.isValidRow(tuple)))
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
         return;
+      }
       rows.add(tuple);
     }
   };
@@ -193,8 +194,9 @@ public class GroupByHavingOperator extends BaseOperator
           return;
         }
       }
-      if (isValidHaving)
+      if (isValidHaving) {
         outport.emit(result);
+      }
     }
 
     rows = new ArrayList<Map<String, Object>>();
@@ -215,13 +217,13 @@ public class GroupByHavingOperator extends BaseOperator
     @Override
     public boolean equals(Object other)
     {
-      if (other instanceof MultiKeyCompare)
-        if (compareKeys.size() != ((MultiKeyCompare) other).compareKeys.size()) {
+      if (other instanceof MultiKeyCompare) {
+        if (compareKeys.size() != ((MultiKeyCompare)other).compareKeys.size()) {
           return false;
         }
+      }
       for (int i = 0; i < compareKeys.size(); i++) {
-        if (!(compareKeys.get(i).equals(((MultiKeyCompare) other).compareKeys
-            .get(i)))) {
+        if (!(compareKeys.get(i).equals(((MultiKeyCompare)other).compareKeys.get(i)))) {
           return false;
         }
       }
@@ -241,8 +243,9 @@ public class GroupByHavingOperator extends BaseOperator
     @Override
     public int compareTo(Object other)
     {
-      if (this.equals(other))
+      if (this.equals(other)) {
         return 0;
+      }
       return -1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
index f5eafb4..883329e 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java
@@ -94,38 +94,37 @@ public class InnerJoinOperator implements Operator
     {
       table1.add(tuple);
       for (int j = 0; j < table2.size(); j++) {
-        if ((joinCondition == null)
-                || (joinCondition.isValidJoin(tuple, table2.get(j)))) {
+        if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) {
           joinRows(tuple, table2.get(j));
         }
       }
-		}
-	};
-
-	/**
-	 * Input port 2 that takes a map of &lt;string,object&gt;.
-	 */
-	public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() {
-		@Override
-		public void process(Map<String, Object> tuple)
-		{
-	    table2.add(tuple);
+    }
+  };
+
+  /**
+   * Input port 2 that takes a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>()
+  {
+    @Override
+    public void process(Map<String, Object> tuple)
+    {
+      table2.add(tuple);
       for (int j = 0; j < table1.size(); j++) {
-        if ((joinCondition == null)
-                || (joinCondition.isValidJoin(table1.get(j), tuple))) {
+        if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) {
           joinRows(table1.get(j), tuple);
         }
       }
-		}
-	};
+    }
+  };
 
-	/**
-	 * Output port that emits a map of &lt;string,object&gt;.
-	 */
-	public final transient DefaultOutputPort<Map<String, Object>> outport =
-			new DefaultOutputPort<Map<String, Object>>();
+  /**
+   * Output port that emits a map of &lt;string,object&gt;.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> outport =
+      new DefaultOutputPort<Map<String, Object>>();
 
-	@Override
+  @Override
   public void setup(OperatorContext arg0)
   {
     table1 = new ArrayList<Map<String, Object>>();
@@ -159,7 +158,7 @@ public class InnerJoinOperator implements Operator
 
   /**
    * Pick the supported condition. Currently only equal join is supported.
-   * @param set joinCondition
+   * @param joinCondition joinCondition
    */
   public void setJoinCondition(Condition joinCondition)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
index ebc5d23..18d9928 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java
@@ -21,9 +21,9 @@ package com.datatorrent.lib.streamquery;
 import java.util.ArrayList;
 import java.util.Map;
 
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.Unifier;
 
@@ -49,49 +49,49 @@ import com.datatorrent.api.Operator.Unifier;
  */
 public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
 {
-	/**
-	 * Order by rules.
-	 */
-	ArrayList<OrderByRule<?>>	orderByRules	= new ArrayList<OrderByRule<?>>();
-
-	/**
-	 * Descending flag.
-	 */
-	private boolean isDescending;
-
-	/**
-	 * collected rows.
-	 */
-	private ArrayList<Map<String, Object>> rows;
-
-	/**
-	 * Add order by rule.
-	 */
-	public void addOrderByRule(OrderByRule<?> rule)
-	{
-		orderByRules.add(rule);
-	}
-
-	/**
+  /**
+   * Order by rules.
+   */
+  ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>();
+
+  /**
+   * Descending flag.
+   */
+  private boolean isDescending;
+
+  /**
+   * collected rows.
+   */
+  private ArrayList<Map<String, Object>> rows;
+
+  /**
+   * Add order by rule.
+   */
+  public void addOrderByRule(OrderByRule<?> rule)
+  {
+    orderByRules.add(rule);
+  }
+
+  /**
    * @return isDescending
    */
   public boolean isDescending()
   {
-	  return isDescending;
+    return isDescending;
   }
 
-	/**
-   * @param set isDescending
+  /**
+   * @param isDescending isDescending
    */
   public void setDescending(boolean isDescending)
   {
-	  this.isDescending = isDescending;
+    this.isDescending = isDescending;
   }
 
-	@Override
+  @Override
   public void process(Map<String, Object> tuple)
   {
-	  rows.add(tuple);
+    rows.add(tuple);
   }
 
   @Override
@@ -103,13 +103,17 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
   @Override
   public void endWindow()
   {
-    for (int i=0; i < orderByRules.size(); i++) {
+    for (int i = 0; i < orderByRules.size(); i++) {
       rows = orderByRules.get(i).sort(rows);
     }
     if (isDescending) {
-      for (int i=0; i < rows.size(); i++)  outport.emit(rows.get(i));
+      for (int i = 0; i < rows.size(); i++) {
+        outport.emit(rows.get(i));
+      }
     } else {
-      for (int i=rows.size()-1; i >= 0;  i--)  outport.emit(rows.get(i));
+      for (int i = rows.size() - 1; i >= 0; i--) {
+        outport.emit(rows.get(i));
+      }
     }
   }
 
@@ -130,7 +134,8 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
   /**
    * Input port that takes a map of &lt;string,object&gt;.
    */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() {
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
     @Override
     public void process(Map<String, Object> tuple)
     {
@@ -141,18 +146,19 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>>
   /**
    * Output port that emits a map of &lt;string,object&gt;.
    */
-  public final transient DefaultOutputPort<Map<String, Object>> outport =  new DefaultOutputPort<Map<String, Object>>()
-      {
-         @Override
-         public Unifier<Map<String, Object>> getUnifier() {
-           OrderByOperator unifier = new OrderByOperator();
-           for (int i=0; i < getOrderByRules().size(); i++) {
-             unifier.addOrderByRule(getOrderByRules().get(i));
-           }
-           unifier.setDescending(isDescending);
-           return unifier;
-         }
-      };
+  public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>()
+  {
+    @Override
+    public Unifier<Map<String, Object>> getUnifier()
+    {
+      OrderByOperator unifier = new OrderByOperator();
+      for (int i = 0; i < getOrderByRules().size(); i++) {
+        unifier.addOrderByRule(getOrderByRules().get(i));
+      }
+      unifier.setDescending(isDescending);
+      return unifier;
+    }
+  };
 
   /**
    * @return the orderByRules

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
index 0b16065..8573903 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java
@@ -58,7 +58,7 @@ public class OrderByRule<T extends Comparable>
     for (int i = 0; i < rows.size(); i++) {
       Map<String, Object> row = rows.get(i);
       if (row.containsKey(columnName)) {
-        T value = (T) row.get(columnName);
+        T value = (T)row.get(columnName);
         ArrayList<Map<String, Object>> list;
         if (sorted.containsKey(value)) {
           list = sorted.get(value);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
index 1e1dcfb..0494bfb 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java
@@ -88,8 +88,7 @@ public class OuterJoinOperator extends InnerJoinOperator
       for (int i = 0; i < table2.size(); i++) {
         boolean merged = false;
         for (int j = 0; j < table1.size(); j++) {
-          if ((joinCondition == null)
-              || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
+          if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) {
             merged = true;
           }
         }
@@ -104,6 +103,7 @@ public class OuterJoinOperator extends InnerJoinOperator
   {
     isLeftJoin = true;
   }
+
   public void setRighttJoin()
   {
     isLeftJoin = false;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
index c1c411c..77616f3 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java
@@ -95,7 +95,9 @@ public class SelectFunctionOperator implements Operator
   @Override
   public void endWindow()
   {
-    if (functions.size() == 0) return;
+    if (functions.size() == 0) {
+      return;
+    }
     Map<String, Object>  collect = new HashMap<String, Object>();
     for (FunctionIndex function : functions) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
index b02e40f..4dbc1f0 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.streamquery.condition.Condition;
 import com.datatorrent.lib.streamquery.index.Index;
 
@@ -89,8 +89,9 @@ public class SelectOperator extends BaseOperator
     @Override
     public void process(Map<String, Object> tuple)
     {
-      if ((condition != null) && (!condition.isValidRow(tuple)))
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
         return;
+      }
       if (indexes.size() == 0) {
         outport.emit(tuple);
         return;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
index 365642f..c3ae083 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java
@@ -58,7 +58,8 @@ public class SelectTopOperator implements Operator
   /**
    * Input port that takes a map of &lt;string,object&gt;.
    */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() {
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
     @Override
     public void process(Map<String, Object> tuple)
     {
@@ -89,13 +90,13 @@ public class SelectTopOperator implements Operator
   @Override
   public void endWindow()
   {
-      int numEmits = topValue;
-      if (isPercentage) {
-        numEmits = list.size() * (topValue/100);
-      }
-      for (int i=0; (i < numEmits)&&(i < list.size()); i++) {
-        outport.emit(list.get(i));
-      }
+    int numEmits = topValue;
+    if (isPercentage) {
+      numEmits = list.size() * (topValue / 100);
+    }
+    for (int i = 0; (i < numEmits) && (i < list.size()); i++) {
+      outport.emit(list.get(i));
+    }
   }
 
   public int getTopValue()

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
index e130515..6724a7e 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java
@@ -21,9 +21,9 @@ package com.datatorrent.lib.streamquery;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.streamquery.condition.Condition;
 
 /**
@@ -54,33 +54,36 @@ public class UpdateOperator extends BaseOperator
    */
   Map<String, Object> updates = new HashMap<String, Object>();
 
-	/**
-	 *  condition.
-	 */
-	private Condition condition = null;
+  /**
+   *  condition.
+   */
+  private Condition condition = null;
 
-	/**
-	 * set condition.
-	 */
-	public void setCondition(Condition condition)
-	{
-		this.condition = condition;
-	}
+  /**
+   * set condition.
+   */
+  public void setCondition(Condition condition)
+  {
+    this.condition = condition;
+  }
 
   /**
    * Input port that takes a map of &lt;string,object&gt;.
    */
-  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() {
+  public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>()
+  {
     @Override
     public void process(Map<String, Object> tuple)
     {
-      if ((condition != null)&&(!condition.isValidRow(tuple)))return;
+      if ((condition != null) && (!condition.isValidRow(tuple))) {
+        return;
+      }
       if (updates.size() == 0) {
         outport.emit(tuple);
         return;
       }
       Map<String, Object> result = new HashMap<String, Object>();
-      for(Map.Entry<String, Object> entry : tuple.entrySet()) {
+      for (Map.Entry<String, Object> entry : tuple.entrySet()) {
         if (updates.containsKey(entry.getKey())) {
           result.put(entry.getKey(), updates.get(entry.getKey()));
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
index efcb62c..43cdc72 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java
@@ -74,11 +74,19 @@ public class BetweenCondition  extends Condition
   @Override
   public boolean isValidRow(@NotNull Map<String, Object> row)
   {
-    if (!row.containsKey(column)) return false;
+    if (!row.containsKey(column)) {
+      return false;
+    }
     Object value = row.get(column);
-    if (value == null) return false;
-    if (((Comparable)value).compareTo((Comparable)leftValue) < 0) return false;
-    if (((Comparable)value).compareTo((Comparable)rightValue) > 0) return false;
+    if (value == null) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)leftValue) < 0) {
+      return false;
+    }
+    if (((Comparable)value).compareTo((Comparable)rightValue) > 0) {
+      return false;
+    }
     return true;
   }
 
@@ -88,7 +96,7 @@ public class BetweenCondition  extends Condition
   @Override
   public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2)
   {
-    assert(false);
+    assert (false);
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
index 2caadc6..b4bd3ed 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java
@@ -59,10 +59,12 @@ public class CompoundCondition extends Condition
 
   /**
    * Constructor for logical or metric.
+   *
    * @param leftCondition  Left validate row condition, must be non null. <br>
-   * @param rightCondition  Right validate row condition, must be non null. <br>
+   * @param rightCondition Right validate row condition, must be non null. <br>
    */
-  public CompoundCondition(Condition leftCondition, Condition rightCondition) {
+  public CompoundCondition(Condition leftCondition, Condition rightCondition)
+  {
     this.leftCondition = leftCondition;
     this.rightCondition = rightCondition;
   }
@@ -70,11 +72,13 @@ public class CompoundCondition extends Condition
   /**
    * Constructor for logical and metric if logical and parameter is true.
    * <br>
+   *
    * @param leftCondition  Left validate row condition, must be non null. <br>
-   * @param rightCondition  Right validate row condition, must be non null. <br>
-   * @param isLogicalAnd  Logical AND if true.
+   * @param rightCondition Right validate row condition, must be non null. <br>
+   * @param isLogicalAnd   Logical AND if true.
    */
-  public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd) {
+  public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd)
+  {
     this.leftCondition = leftCondition;
     this.rightCondition = rightCondition;
     logicalOr = !isLogicalAnd;
@@ -84,7 +88,7 @@ public class CompoundCondition extends Condition
   public boolean isValidRow(Map<String, Object> row)
   {
     if (logicalOr) {
-       return leftCondition.isValidRow(row) || rightCondition.isValidRow(row);
+      return leftCondition.isValidRow(row) || rightCondition.isValidRow(row);
     } else {
       return leftCondition.isValidRow(row) && rightCondition.isValidRow(row);
     }
@@ -117,7 +121,8 @@ public class CompoundCondition extends Condition
     this.rightCondition = rightCondition;
   }
 
-  public void setLogicalAnd() {
+  public void setLogicalAnd()
+  {
     this.logicalOr = false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java
index c0a4fde..86d5581 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java
@@ -31,13 +31,14 @@ import javax.validation.constraints.NotNull;
  * @tags sql condition, filter
  * @since 0.3.3
  */
-abstract public class Condition
+public abstract class Condition
 {
-	/**
-	 * Row containing column/value map.
-	 * @return row validation status.
-	 */
-  abstract public boolean isValidRow(@NotNull Map<String, Object> row);
+  /**
+   * Row containing column/value map.
+   *
+   * @return row validation status.
+   */
+  public abstract boolean isValidRow(@NotNull Map<String, Object> row);
 
   /**
    * Filter valid rows only.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
index fbcb9b0..bb478cf 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java
@@ -57,23 +57,28 @@ public class EqualValueCondition extends Condition
   public boolean isValidRow(Map<String, Object> row)
   {
     // no conditions
-    if (equalMap.size() == 0)
+    if (equalMap.size() == 0) {
       return true;
+    }
 
     // compare each condition value
     for (Map.Entry<String, Object> entry : equalMap.entrySet()) {
-      if (!row.containsKey(entry.getKey()))
+      if (!row.containsKey(entry.getKey())) {
         return false;
+      }
       Object value = row.get(entry.getKey());
       if (entry.getValue() == null) {
-        if (value == null)
+        if (value == null) {
           return true;
+        }
         return false;
       }
-      if (value == null)
+      if (value == null) {
         return false;
-      if (!entry.getValue().equals(value))
+      }
+      if (!entry.getValue().equals(value)) {
         return false;
+      }
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
index b0a3127..7877053 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java
@@ -70,8 +70,8 @@ public class HavingCompareValue<T extends Comparable>   extends HavingCondition
   @Override
   public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
   {
-      Object computed = aggregateIndex.compute(rows);
-      return (compareType == compareValue.compareTo(computed));
+    Object computed = aggregateIndex.compute(rows);
+    return (compareType == compareValue.compareTo(computed));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
index 89451e2..6dac690 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java
@@ -42,14 +42,15 @@ public abstract class HavingCondition
   protected FunctionIndex  aggregateIndex = null;
 
   /**
-   * @param aggregateIndex  Aggregate index to be validated.
+   * @param aggregateIndex Aggregate index to be validated.
    */
-  public HavingCondition(FunctionIndex  aggregateIndex) {
+  public HavingCondition(FunctionIndex aggregateIndex)
+  {
     this.aggregateIndex = aggregateIndex;
   }
 
   /**
    *  Check if aggregate is valid.
    */
-  abstract public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
+  public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
index 0d5f5c2..236f3b1 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java
@@ -50,16 +50,19 @@ public class InCondition extends Condition
   private Set<Object> inValues = new HashSet<Object>();
 
   /**
-   * @param  column Column name for which value is checked in values set.
+   * @param column Column name for which value is checked in values set.
    */
-  public InCondition(@NotNull String column) {
+  public InCondition(@NotNull String column)
+  {
     this.column = column;
   }
 
   @Override
   public boolean isValidRow(@NotNull Map<String, Object> row)
   {
-    if (!row.containsKey(column)) return false;
+    if (!row.containsKey(column)) {
+      return false;
+    }
     return inValues.contains(row.get(column));
   }
 
@@ -79,7 +82,8 @@ public class InCondition extends Condition
     this.column = column;
   }
 
-  public void addInValue(Object value) {
+  public void addInValue(Object value)
+  {
     this.inValues.add(value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
index f3b829a..d350edc 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java
@@ -48,7 +48,8 @@ public class JoinColumnEqualCondition  extends Condition
   @NotNull
   private String column2;
 
-  public JoinColumnEqualCondition(@NotNull String column1,@NotNull String column2) {
+  public JoinColumnEqualCondition(@NotNull String column1, @NotNull String column2)
+  {
     this.column1 = column1;
     this.column2 = column2;
   }
@@ -59,7 +60,7 @@ public class JoinColumnEqualCondition  extends Condition
   @Override
   public boolean isValidRow(Map<String, Object> row)
   {
-    assert(false);
+    assert (false);
     return false;
   }
 
@@ -69,7 +70,9 @@ public class JoinColumnEqualCondition  extends Condition
   @Override
   public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
   {
-    if (!row1.containsKey(column1) || !row2.containsKey(column2)) return false;
+    if (!row1.containsKey(column1) || !row2.containsKey(column2)) {
+      return false;
+    }
     Object value1 = row1.get(column1);
     Object value2 = row2.get(column2);
     return value1.equals(value2);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
index f879cd6..b3d7174 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java
@@ -54,7 +54,8 @@ public class LikeCondition extends Condition
    * @param column Column to be matched with regular expression, must be non-null.
    * @param pattern Regular expression pattern, must be non-null.
    */
-  public LikeCondition(@NotNull String column,@NotNull String pattern) {
+  public LikeCondition(@NotNull String column,@NotNull String pattern)
+  {
     setColumn(column);
     setPattern(pattern);
   }
@@ -66,10 +67,11 @@ public class LikeCondition extends Condition
   @Override
   public boolean isValidRow(Map<String, Object> row)
   {
-    if (!row.containsKey(column)) return false;
-    Matcher match = pattern.matcher((CharSequence) row.get(column));
-    if (!match.find()) return false;
-    return true;
+    if (!row.containsKey(column)) {
+      return false;
+    }
+    Matcher match = pattern.matcher((CharSequence)row.get(column));
+    return match.find();
   }
 
   /**
@@ -78,7 +80,7 @@ public class LikeCondition extends Condition
   @Override
   public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2)
   {
-    assert(false);
+    assert (false);
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
index 43223d1..e212ff8 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java
@@ -55,12 +55,14 @@ public class AverageFunction  extends FunctionIndex
   @Override
   public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
   {
-    if (rows.size() == 0) return 0.0;
+    if (rows.size() == 0) {
+      return 0.0;
+    }
     double sum = 0.0;
     for (Map<String, Object> row : rows) {
       sum += ((Number)row.get(column)).doubleValue();
     }
-    return sum/rows.size();
+    return sum / rows.size();
   }
 
   /**
@@ -70,7 +72,9 @@ public class AverageFunction  extends FunctionIndex
   @Override
   protected String aggregateName()
   {
-    if (!StringUtils.isEmpty(alias)) return alias;
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
     return "AVG(" + column + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
index 350a56a..dafe54e 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java
@@ -57,10 +57,14 @@ public class CountFunction extends FunctionIndex
   @Override
   public Object compute(ArrayList<Map<String, Object>> rows) throws Exception
   {
-    if (column.equals("*")) return rows.size();
+    if (column.equals("*")) {
+      return rows.size();
+    }
     long count = 0;
     for (Map<String, Object> row : rows) {
-      if (row.containsKey(column) && (row.get(column) != null)) count++;
+      if (row.containsKey(column) && (row.get(column) != null)) {
+        count++;
+      }
     }
     return count;
   }
@@ -72,7 +76,9 @@ public class CountFunction extends FunctionIndex
   @Override
   protected String aggregateName()
   {
-    if (!StringUtils.isEmpty(alias)) return alias;
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
     return "COUNT(" + column + ")";
   }
 


Mime
View raw message