apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [05/12] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package
Date Fri, 01 Apr 2016 03:54:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java
new file mode 100644
index 0000000..1f53011
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * An {@link IncrementalAggregator} which retains, per aggregated field, the smallest value among
+ * the values folded into it from {@link InputEvent}s or from other {@link Aggregate}s.
+ *
+ * @since 3.1.0
+ */
+@Name("MIN")
+public class AggregatorMin extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301648L;
+
+  public AggregatorMin()
+  {
+    //Nothing to initialize
+  }
+
+  /**
+   * Creates the group through the parent class and then fills its aggregates from the event's aggregates
+   * by calling {@code GPOUtils.indirectCopy} with {@code context.indexSubsetAggregates}.
+   *
+   * @param src             The event the group is created from.
+   * @param aggregatorIndex The aggregator index forwarded to the parent implementation.
+   * @return The newly created {@link Aggregate}.
+   */
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate group = super.getGroup(src, aggregatorIndex);
+    GPOUtils.indirectCopy(group.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+    return group;
+  }
+
+  /**
+   * Folds an input event into an aggregate. For each primitive type present in the destination, the
+   * source value is looked up through the matching index subset of {@code context.indexSubsetAggregates}
+   * and replaces the destination value when it is strictly smaller.
+   *
+   * @param dest The aggregate updated in place.
+   * @param src  The event whose values are folded into {@code dest}.
+   */
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    GPOMutable target = dest.getAggregates();
+    GPOMutable input = src.getAggregates();
+
+    byte[] targetBytes = target.getFieldsByte();
+    if (targetBytes != null) {
+      byte[] inputBytes = input.getFieldsByte();
+      int[] byteSubset = context.indexSubsetAggregates.fieldsByteIndexSubset;
+      for (int i = 0; i < targetBytes.length; i++) {
+        byte candidate = inputBytes[byteSubset[i]];
+        if (candidate < targetBytes[i]) {
+          targetBytes[i] = candidate;
+        }
+      }
+    }
+
+    short[] targetShorts = target.getFieldsShort();
+    if (targetShorts != null) {
+      short[] inputShorts = input.getFieldsShort();
+      int[] shortSubset = context.indexSubsetAggregates.fieldsShortIndexSubset;
+      for (int i = 0; i < targetShorts.length; i++) {
+        short candidate = inputShorts[shortSubset[i]];
+        if (candidate < targetShorts[i]) {
+          targetShorts[i] = candidate;
+        }
+      }
+    }
+
+    int[] targetInts = target.getFieldsInteger();
+    if (targetInts != null) {
+      int[] inputInts = input.getFieldsInteger();
+      int[] intSubset = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
+      for (int i = 0; i < targetInts.length; i++) {
+        int candidate = inputInts[intSubset[i]];
+        if (candidate < targetInts[i]) {
+          targetInts[i] = candidate;
+        }
+      }
+    }
+
+    long[] targetLongs = target.getFieldsLong();
+    if (targetLongs != null) {
+      long[] inputLongs = input.getFieldsLong();
+      int[] longSubset = context.indexSubsetAggregates.fieldsLongIndexSubset;
+      for (int i = 0; i < targetLongs.length; i++) {
+        long candidate = inputLongs[longSubset[i]];
+        if (candidate < targetLongs[i]) {
+          targetLongs[i] = candidate;
+        }
+      }
+    }
+
+    float[] targetFloats = target.getFieldsFloat();
+    if (targetFloats != null) {
+      float[] inputFloats = input.getFieldsFloat();
+      int[] floatSubset = context.indexSubsetAggregates.fieldsFloatIndexSubset;
+      for (int i = 0; i < targetFloats.length; i++) {
+        float candidate = inputFloats[floatSubset[i]];
+        if (candidate < targetFloats[i]) {
+          targetFloats[i] = candidate;
+        }
+      }
+    }
+
+    double[] targetDoubles = target.getFieldsDouble();
+    if (targetDoubles != null) {
+      double[] inputDoubles = input.getFieldsDouble();
+      int[] doubleSubset = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
+      for (int i = 0; i < targetDoubles.length; i++) {
+        double candidate = inputDoubles[doubleSubset[i]];
+        if (candidate < targetDoubles[i]) {
+          targetDoubles[i] = candidate;
+        }
+      }
+    }
+  }
+
+  /**
+   * Merges one aggregate into another. Both aggregates are read position by position (no index
+   * subset is involved), and a destination value is replaced when the source value at the same
+   * position is strictly smaller.
+   *
+   * @param dest The aggregate updated in place.
+   * @param src  The aggregate whose values are merged into {@code dest}.
+   */
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    GPOMutable target = dest.getAggregates();
+    GPOMutable other = src.getAggregates();
+
+    byte[] targetBytes = target.getFieldsByte();
+    if (targetBytes != null) {
+      byte[] otherBytes = other.getFieldsByte();
+      for (int i = 0; i < targetBytes.length; i++) {
+        byte candidate = otherBytes[i];
+        if (candidate < targetBytes[i]) {
+          targetBytes[i] = candidate;
+        }
+      }
+    }
+
+    short[] targetShorts = target.getFieldsShort();
+    if (targetShorts != null) {
+      short[] otherShorts = other.getFieldsShort();
+      for (int i = 0; i < targetShorts.length; i++) {
+        short candidate = otherShorts[i];
+        if (candidate < targetShorts[i]) {
+          targetShorts[i] = candidate;
+        }
+      }
+    }
+
+    int[] targetInts = target.getFieldsInteger();
+    if (targetInts != null) {
+      int[] otherInts = other.getFieldsInteger();
+      for (int i = 0; i < targetInts.length; i++) {
+        int candidate = otherInts[i];
+        if (candidate < targetInts[i]) {
+          targetInts[i] = candidate;
+        }
+      }
+    }
+
+    long[] targetLongs = target.getFieldsLong();
+    if (targetLongs != null) {
+      long[] otherLongs = other.getFieldsLong();
+      for (int i = 0; i < targetLongs.length; i++) {
+        long candidate = otherLongs[i];
+        if (candidate < targetLongs[i]) {
+          targetLongs[i] = candidate;
+        }
+      }
+    }
+
+    float[] targetFloats = target.getFieldsFloat();
+    if (targetFloats != null) {
+      float[] otherFloats = other.getFieldsFloat();
+      for (int i = 0; i < targetFloats.length; i++) {
+        float candidate = otherFloats[i];
+        if (candidate < targetFloats[i]) {
+          targetFloats[i] = candidate;
+        }
+      }
+    }
+
+    double[] targetDoubles = target.getFieldsDouble();
+    if (targetDoubles != null) {
+      double[] otherDoubles = other.getFieldsDouble();
+      for (int i = 0; i < targetDoubles.length; i++) {
+        double candidate = otherDoubles[i];
+        if (candidate < targetDoubles[i]) {
+          targetDoubles[i] = candidate;
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the output type for the given input type in {@code AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP}.
+   *
+   * @param inputType The type of the field being aggregated.
+   * @return The value the map holds for {@code inputType}.
+   */
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    Type outputType = AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
+    return outputType;
+  }
+
+  /**
+   * This aggregator provides no metadata descriptor.
+   *
+   * @return Always {@code null}.
+   */
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorOTFType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorOTFType.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorOTFType.java
new file mode 100644
index 0000000..b46ae38
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorOTFType.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * This is a convenience enum to store all the information about default {@link OTFAggregator}s
+ * in one place. Each constant carries exactly one {@link OTFAggregator}, fixed at construction time.
+ *
+ * @since 3.1.0
+ */
+public enum AggregatorOTFType
+{
+  /**
+   * The average {@link OTFAggregator}.
+   */
+  AVG(AggregatorAverage.INSTANCE);
+
+  /**
+   * An unmodifiable map from the name of each enum constant to the {@link OTFAggregator} it carries.
+   */
+  public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR;
+
+  static {
+    Map<String, OTFAggregator> nameToAggregator = Maps.newHashMap();
+
+    for (AggregatorOTFType aggType : AggregatorOTFType.values()) {
+      nameToAggregator.put(aggType.name(), aggType.getAggregator());
+    }
+
+    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
+  }
+
+  /**
+   * The {@link OTFAggregator} assigned to this enum. Final, since enum constants are shared singletons
+   * and must not change after construction.
+   */
+  private final OTFAggregator aggregator;
+
+  /**
+   * Creates an {@link OTFAggregator} enum with the given aggregator.
+   *
+   * @param aggregator The {@link OTFAggregator} assigned to this enum. Must not be null.
+   * @throws NullPointerException if {@code aggregator} is null.
+   */
+  AggregatorOTFType(OTFAggregator aggregator)
+  {
+    this.aggregator = Preconditions.checkNotNull(aggregator);
+  }
+
+  /**
+   * Gets the {@link OTFAggregator} assigned to this enum.
+   *
+   * @return The {@link OTFAggregator} assigned to this enum.
+   */
+  public OTFAggregator getAggregator()
+  {
+    return aggregator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
new file mode 100644
index 0000000..fd9fc56
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * <p>
+ * This registry is used by generic dimensions computation operators and dimension stores in order to support
+ * plugging different aggregators into the operator. Subclasses of
+ * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry
+ * to support pluggable aggregators when doing dimensions computation, and subclasses of
+ * AppDataSingleSchemaDimensionStoreHDHT use this class as well.
+ * </p>
+ * <p>
+ * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators,
+ * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly
+ * process schemas, App Data queries, and store aggregated data.
+ * </p>
+ * <p>
+ * The name based maps are regular fields; the maps derived from them are transient and are only available
+ * after {@link #setup()} has been called on the instance.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+public class AggregatorRegistry implements Serializable
+{
+  private static final long serialVersionUID = 20154301642L;
+
+  /**
+   * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the
+   * default {@link AggregatorRegistry}.
+   */
+  private static final Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR;
+  /**
+   * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default
+   * {@link AggregatorRegistry}.
+   */
+  private static final Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR;
+
+  //Build the default maps
+  static {
+    DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR);
+    DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR);
+  }
+
+  /**
+   * This is a default aggregator registry that can be used in operators.
+   */
+  public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry(
+      DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
+      AggregatorIncrementalType.NAME_TO_ORDINAL);
+
+  /**
+   * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not.
+   */
+  private transient boolean setup = false;
+  /**
+   * This is a map from the class of an {@link IncrementalAggregator} to the name of that
+   * {@link IncrementalAggregator}. Built by {@link #setup()}.
+   */
+  private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName;
+  /**
+   * This is a map from the name of an {@link OTFAggregator} to the list of the names of all
+   * {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}.
+   * Built by {@link #setup()}.
+   */
+  private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators;
+  /**
+   * This is a map from the aggregator ID of an
+   * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}. Built by {@link #setup()}.
+   */
+  private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
+  /**
+   * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
+   */
+  private Map<String, IncrementalAggregator> nameToIncrementalAggregator;
+  /**
+   * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
+   */
+  private Map<String, OTFAggregator> nameToOTFAggregator;
+  /**
+   * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
+   */
+  private Map<String, Integer> incrementalAggregatorNameToID;
+
+  /**
+   * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}. The ID is
+   * the {@link #stringHash(String)} of the fully qualified class name of the aggregator.
+   *
+   * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the
+   *                         {@link IncrementalAggregator}.
+   * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that
+   * {@link IncrementalAggregator}.
+   */
+  private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator)
+  {
+    Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap();
+
+    for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) {
+      staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName()));
+    }
+
+    return staticAggregatorNameToID;
+  }
+
+  /**
+   * This is a helper method for computing the hash of the string. This is intended to be a static unchanging
+   * method since the computed hash is used for aggregator IDs which are used for persistence.
+   * <p>
+   * <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores.
+   * </p>
+   *
+   * @param string The string to hash.
+   * @return The hash of the given string.
+   */
+  private static int stringHash(String string)
+  {
+    int hash = 5381;
+
+    for (int index = 0;
+        index < string.length();
+        index++) {
+      int character = (int)string.charAt(index);
+      hash = hash * 33 + character;
+    }
+
+    return hash;
+  }
+
+  /**
+   * This constructor is present for Kryo serialization
+   */
+  private AggregatorRegistry()
+  {
+    //for kryo
+  }
+
+  /**
+   * <p>
+   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
+   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates
+   * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the
+   * fully qualified class name of each {@link IncrementalAggregator}.
+   * </p>
+   * <p>
+   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
+   * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
+   * </p>
+   *
+   * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
+   *                                    where the string is the name of an
+   *                                    {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator}
+   *                                    with that name.
+   * @param nameToOTFAggregator         This is a map from {@link String} to {@link OTFAggregator}, where the string
+   *                                    is the name of
+   *                                    an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that
+   *                                    name.
+   */
+  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
+      Map<String, OTFAggregator> nameToOTFAggregator)
+  {
+    this(nameToIncrementalAggregator,
+        nameToOTFAggregator,
+        autoGenIds(nameToIncrementalAggregator));
+  }
+
+  /**
+   * <p>
+   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
+   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each
+   * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs.
+   * </p>
+   * <p>
+   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
+   * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
+   * </p>
+   *
+   * @param nameToIncrementalAggregator   This is a map from {@link String} to {@link IncrementalAggregator},
+   *                                      where the string is the name of an
+   *                                      {@link IncrementalAggregator} and the value is the
+   *                                      {@link IncrementalAggregator}
+   *                                      with that name.
+   * @param nameToOTFAggregator           This is a map from {@link String} to {@link OTFAggregator}, where the
+   *                                      string is the name of
+   *                                      an {@link OTFAggregator} and the value is the {@link OTFAggregator} with
+   *                                      that name.
+   * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID
+   *                                      for that
+   *                                      {@link IncrementalAggregator}.
+   * @throws NullPointerException if any of the maps is null or contains a null key or a null value.
+   */
+  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
+      Map<String, OTFAggregator> nameToOTFAggregator,
+      Map<String, Integer> incrementalAggregatorNameToID)
+  {
+    setNameToIncrementalAggregator(nameToIncrementalAggregator);
+    setNameToOTFAggregator(nameToOTFAggregator);
+
+    setIncrementalAggregatorNameToID(incrementalAggregatorNameToID);
+
+    validate();
+  }
+
+  /**
+   * This is a helper method which is used to do validation on the aggregator maps provided to the constructor
+   * of this class: neither map may contain a null key or a null value.
+   */
+  private void validate()
+  {
+    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
+      Preconditions.checkNotNull(entry.getKey());
+      Preconditions.checkNotNull(entry.getValue());
+    }
+
+    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
+      Preconditions.checkNotNull(entry.getKey());
+      Preconditions.checkNotNull(entry.getValue());
+    }
+
+    //The name to ID map is checked entry by entry in setIncrementalAggregatorNameToID, so it is not repeated here.
+  }
+
+  /**
+   * This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}.
+   * This method should be called before the {@link AggregatorRegistry} is used. Calling it again after a
+   * successful setup is a no-op.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void setup()
+  {
+    if (setup) {
+      //If the AggregatorRegistry was already setup. Don't set it up again.
+      return;
+    }
+
+    //Build everything in locals first so that a failure part way through does not leave this
+    //registry flagged as setup with only some of its maps populated.
+    Map<Class<? extends IncrementalAggregator>, String> classToName = Maps.newHashMap();
+
+    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
+      classToName.put((Class)entry.getValue().getClass(), entry.getKey());
+    }
+
+    Map<Integer, IncrementalAggregator> idToAggregator = Maps.newHashMap();
+
+    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
+      String aggregatorName = entry.getKey();
+      int aggregatorID = entry.getValue();
+      idToAggregator.put(aggregatorID, nameToIncrementalAggregator.get(aggregatorName));
+    }
+
+    Map<String, List<String>> otfToIncremental = Maps.newHashMap();
+
+    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
+      List<String> childNames = Lists.newArrayList();
+
+      for (Class clazz : entry.getValue().getChildAggregators()) {
+        childNames.add(classToName.get(clazz));
+      }
+
+      otfToIncremental.put(entry.getKey(), childNames);
+    }
+
+    classToIncrementalAggregatorName = classToName;
+    incrementalAggregatorIDToAggregator = idToAggregator;
+    otfAggregatorToIncrementalAggregators = otfToIncremental;
+    setup = true;
+  }
+
+  /**
+   * This is a helper method which sets the given mapping from an {@link IncrementalAggregator}'s name
+   * to an {@link IncrementalAggregator}. A copy of the given map is stored.
+   *
+   * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an
+   *                                    {@link IncrementalAggregator}. Must not be null.
+   */
+  private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator)
+  {
+    this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator));
+  }
+
+  /**
+   * This is a helper method which sets the given mapping from an {@link OTFAggregator}'s name to
+   * an {@link OTFAggregator}. A copy of the given map is stored.
+   *
+   * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}.
+   *                            Must not be null.
+   */
+  private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator)
+  {
+    this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator));
+  }
+
+  /**
+   * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator}
+   * registered to this registry. This only consults the name keyed maps, so it does not require
+   * {@link #setup()} to have been called.
+   *
+   * @param aggregatorName The aggregator name to check.
+   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} or an
+   * {@link OTFAggregator} registered to this registry. False otherwise.
+   */
+  public boolean isAggregator(String aggregatorName)
+  {
+    return nameToIncrementalAggregator.containsKey(aggregatorName) ||
+        nameToOTFAggregator.containsKey(aggregatorName);
+  }
+
+  /**
+   * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered
+   * to this registry. This only consults the name keyed map, so it does not require
+   * {@link #setup()} to have been called.
+   *
+   * @param aggregatorName The aggregator name to check.
+   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered
+   * to this registry. False otherwise.
+   */
+  public boolean isIncrementalAggregator(String aggregatorName)
+  {
+    return nameToIncrementalAggregator.containsKey(aggregatorName);
+  }
+
+  /**
+   * Gets the mapping from an {@link IncrementalAggregator}'s class to the name of that
+   * {@link IncrementalAggregator}.
+   *
+   * @return The mapping from an {@link IncrementalAggregator}'s class to its name, or null if
+   * {@link #setup()} has not been called yet.
+   */
+  public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName()
+  {
+    return classToIncrementalAggregatorName;
+  }
+
+  /**
+   * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
+   *
+   * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator},
+   * or null if {@link #setup()} has not been called yet.
+   */
+  public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator()
+  {
+    return incrementalAggregatorIDToAggregator;
+  }
+
+  /**
+   * This is a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to
+   * {@link IncrementalAggregator} ID. A copy of the given map is stored.
+   *
+   * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to
+   *                                      {@link IncrementalAggregator} ID. Neither the map, its keys nor its
+   *                                      values may be null.
+   */
+  private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID)
+  {
+    Preconditions.checkNotNull(incrementalAggregatorNameToID);
+
+    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
+      Preconditions.checkNotNull(entry.getKey());
+      Preconditions.checkNotNull(entry.getValue());
+    }
+
+    this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID);
+  }
+
+  /**
+   * This returns a map from the name of an {@link IncrementalAggregator} to the corresponding ID of the
+   * {@link IncrementalAggregator}.
+   *
+   * @return Returns a map from the name of an {@link IncrementalAggregator} to the corresponding ID of the
+   * {@link IncrementalAggregator}.
+   */
+  public Map<String, Integer> getIncrementalAggregatorNameToID()
+  {
+    return incrementalAggregatorNameToID;
+  }
+
+  /**
+   * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
+   *
+   * @return The name to {@link OTFAggregator} mapping.
+   */
+  public Map<String, OTFAggregator> getNameToOTFAggregators()
+  {
+    return nameToOTFAggregator;
+  }
+
+  /**
+   * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
+   * that {@link OTFAggregator}.
+   *
+   * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
+   * that {@link OTFAggregator}, or null if {@link #setup()} has not been called yet.
+   */
+  public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators()
+  {
+    return otfAggregatorToIncrementalAggregators;
+  }
+
+  /**
+   * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the
+   * {@link IncrementalAggregator}.
+   *
+   * @return The name to {@link IncrementalAggregator} mapping.
+   */
+  public Map<String, IncrementalAggregator> getNameToIncrementalAggregator()
+  {
+    return nameToIncrementalAggregator;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AggregatorRegistry.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorSum.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorSum.java
new file mode 100644
index 0000000..9c79247
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorSum.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}.
+ *
+ * @since 3.1.0
+ */
+@Name("SUM")
+public class AggregatorSum extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301649L;
+
+  public AggregatorSum()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    src.used = true;
+    Aggregate aggregate = createAggregate(src,
+        context,
+        aggregatorIndex);
+
+    GPOMutable value = aggregate.getAggregates();
+    GPOUtils.zeroFillNumeric(value);
+
+    return aggregate;
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    GPOMutable destAggs = dest.getAggregates();
+    GPOMutable srcAggs = src.getAggregates();
+
+    aggregateAggs(destAggs, srcAggs);
+  }
+
+  public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs)
+  {
+    {
+      byte[] destByte = destAggs.getFieldsByte();
+      if (destByte != null) {
+        byte[] srcByte = srcAggs.getFieldsByte();
+
+        for (int index = 0;
+            index < destByte.length;
+            index++) {
+          destByte[index] += srcByte[index];
+        }
+      }
+    }
+
+    {
+      short[] destShort = destAggs.getFieldsShort();
+      if (destShort != null) {
+        short[] srcShort = srcAggs.getFieldsShort();
+
+        for (int index = 0;
+            index < destShort.length;
+            index++) {
+          destShort[index] += srcShort[index];
+        }
+      }
+    }
+
+    {
+      int[] destInteger = destAggs.getFieldsInteger();
+      if (destInteger != null) {
+        int[] srcInteger = srcAggs.getFieldsInteger();
+
+        for (int index = 0;
+            index < destInteger.length;
+            index++) {
+          destInteger[index] += srcInteger[index];
+        }
+      }
+    }
+
+    {
+      long[] destLong = destAggs.getFieldsLong();
+      if (destLong != null) {
+        long[] srcLong = srcAggs.getFieldsLong();
+
+        for (int index = 0;
+            index < destLong.length;
+            index++) {
+          destLong[index] += srcLong[index];
+        }
+      }
+    }
+
+    {
+      float[] destFloat = destAggs.getFieldsFloat();
+      if (destFloat != null) {
+        float[] srcFloat = srcAggs.getFieldsFloat();
+
+        for (int index = 0;
+            index < destFloat.length;
+            index++) {
+          destFloat[index] += srcFloat[index];
+        }
+      }
+    }
+
+    {
+      double[] destDouble = destAggs.getFieldsDouble();
+      if (destDouble != null) {
+        double[] srcDouble = srcAggs.getFieldsDouble();
+
+        for (int index = 0;
+            index < destDouble.length;
+            index++) {
+          destDouble[index] += srcDouble[index];
+        }
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    GPOMutable destAggs = dest.getAggregates();
+    GPOMutable srcAggs = src.getAggregates();
+
+    aggregateInput(destAggs, srcAggs);
+  }
+
+  public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs)
+  {
+    {
+      byte[] destByte = destAggs.getFieldsByte();
+      if (destByte != null) {
+        byte[] srcByte = srcAggs.getFieldsByte();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
+        for (int index = 0;
+            index < destByte.length;
+            index++) {
+          destByte[index] += srcByte[srcIndices[index]];
+        }
+      }
+    }
+
+    {
+      short[] destShort = destAggs.getFieldsShort();
+      if (destShort != null) {
+        short[] srcShort = srcAggs.getFieldsShort();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
+        for (int index = 0;
+            index < destShort.length;
+            index++) {
+          destShort[index] += srcShort[srcIndices[index]];
+        }
+      }
+    }
+
+    {
+      int[] destInteger = destAggs.getFieldsInteger();
+      if (destInteger != null) {
+        int[] srcInteger = srcAggs.getFieldsInteger();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
+        for (int index = 0;
+            index < destInteger.length;
+            index++) {
+          destInteger[index] += srcInteger[srcIndices[index]];
+        }
+      }
+    }
+
+    {
+      long[] destLong = destAggs.getFieldsLong();
+      if (destLong != null) {
+        long[] srcLong = srcAggs.getFieldsLong();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
+        for (int index = 0;
+            index < destLong.length;
+            index++) {
+          destLong[index] += srcLong[srcIndices[index]];
+        }
+      }
+    }
+
+    {
+      float[] destFloat = destAggs.getFieldsFloat();
+      if (destFloat != null) {
+        float[] srcFloat = srcAggs.getFieldsFloat();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
+        for (int index = 0;
+            index < destFloat.length;
+            index++) {
+          destFloat[index] += srcFloat[srcIndices[index]];
+        }
+      }
+    }
+
+    {
+      double[] destDouble = destAggs.getFieldsDouble();
+      if (destDouble != null) {
+        double[] srcDouble = srcAggs.getFieldsDouble();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
+        for (int index = 0;
+            index < destDouble.length;
+            index++) {
+          destDouble[index] += srcDouble[srcIndices[index]];
+        }
+      }
+    }
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
new file mode 100644
index 0000000..6085254
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.lib.appdata.schemas.Fields;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This class contains utility methods which are useful for aggregators.
+ *
+ * @since 3.1.0
+ */
+public final class AggregatorUtils
+{
+  /**
+   * This is an identity type map, which maps input types to the same output types.
+   */
+  public static final transient Map<Type, Type> IDENTITY_TYPE_MAP;
+  /**
+   * This is an identity type map, for numeric types only. This is
+   * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an
+   * int and the sum of floats is a float.
+   */
+  public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP;
+
+  static {
+    Map<Type, Type> identityTypeMap = Maps.newHashMap();
+
+    for (Type type : Type.values()) {
+      identityTypeMap.put(type, type);
+    }
+
+    IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap);
+
+    Map<Type, Type> identityNumberTypeMap = Maps.newHashMap();
+
+    for (Type type : Type.NUMERIC_TYPES) {
+      identityNumberTypeMap.put(type, type);
+    }
+
+    IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap);
+  }
+
+  /**
+   * Don't instantiate this class.
+   */
+  private AggregatorUtils()
+  {
+    //Don't instantiate this class.
+  }
+
+  /**
+   * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields
+   * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator}
+   * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced
+   * by the given
+   * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}.
+   *
+   * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input
+   *                              data received by an aggregator.
+   * @param incrementalAggregator This is the
+   * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs
+   *                              to be computed.
+   * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as
+   * the specified input {@link FieldsDescriptor}.
+   */
+  public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
+      IncrementalAggregator incrementalAggregator)
+  {
+    Map<String, Type> fieldToType = Maps.newHashMap();
+
+    for (Map.Entry<String, Type> entry :
+        inputFieldsDescriptor.getFieldToType().entrySet()) {
+      String fieldName = entry.getKey();
+      Type fieldType = entry.getValue();
+      Type outputType = incrementalAggregator.getOutputType(fieldType);
+      fieldToType.put(fieldName, outputType);
+    }
+
+    return new FieldsDescriptor(fieldToType);
+  }
+
+  /**
+   * This is a utility method which creates an output {@link FieldsDescriptor} using the field names
+   * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}.
+   *
+   * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used
+   *                              for the output fields descriptor.
+   * @param otfAggregator         The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}.
+   * @return The output {@link FieldsDescriptor}.
+   */
+  public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
+      OTFAggregator otfAggregator)
+  {
+    Map<String, Type> fieldToType = Maps.newHashMap();
+
+    for (Map.Entry<String, Type> entry :
+        inputFieldsDescriptor.getFieldToType().entrySet()) {
+      String fieldName = entry.getKey();
+      Type outputType = otfAggregator.getOutputType();
+      fieldToType.put(fieldName, outputType);
+    }
+
+    return new FieldsDescriptor(fieldToType);
+  }
+
+  /**
+   * This is a utility method which creates an output {@link FieldsDescriptor} from the
+   * given field names and the given {@link OTFAggregator}.
+   *
+   * @param fields        The names of the fields to be included in the output {@link FieldsDescriptor}.
+   * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}.
+   * @return The output {@link FieldsDescriptor}.
+   */
+  public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields,
+      OTFAggregator otfAggregator)
+  {
+    Map<String, Type> fieldToType = Maps.newHashMap();
+
+    for (String field : fields.getFields()) {
+      fieldToType.put(field, otfAggregator.getOutputType());
+    }
+
+    return new FieldsDescriptor(fieldToType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java
new file mode 100644
index 0000000..33e868f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsConversionContext;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregateEvent.Aggregator;
+
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * <p>
+ * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
+ * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
+ * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
+ * {@link IncrementalAggregator}.
+ * </p>
+ * <p>
+ * {@link IncrementalAggregator}s are intended to be used with subclasses of
+ * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
+ * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
+ * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
+ * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
+ * the sum aggregator. And the {@link Aggregate} event produced by the sum aggregator will contain two fields,
+ * one for cost and one for revenue.
+ * </p>
+ *
+ */
+public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate>
+{
+  /**
+   * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the
+   * relationship between the type of an input field and the type of its aggregate. For example if the aggregator takes
+   * a field of type int and produces an aggregate of type float, then this method would return a type of float when
+   * the given input type is an int.
+   * @param inputType The type of a field to be aggregated.
+   * @return The type of the aggregate corresponding to an input field of the given type.
+   */
+  public Type getOutputType(Type inputType);
+
+  /**
+   * This sets the {@link DimensionsConversionContext} for this aggregator.
+   */
+  public void setDimensionsConversionContext(DimensionsConversionContext context);
+
+  /**
+   * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
+   * This method returns null if this aggregator stores no metadata.
+   * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations.
+   * This method returns null if this aggregator stores no metadata.
+   */
+  public FieldsDescriptor getMetaDataDescriptor();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java
new file mode 100644
index 0000000..cef32db
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.io.Serializable;
+
+import java.util.List;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * <p>
+ * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class
+ * of aggregations which use the results of incremental aggregators, which implement the
+ * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which
+ * needs to be performed on the fly is average. Average needs to be performed on the fly because average cannot be
+ * computed with just an existing average and a new data item, an average requires the sum of all data items, and the
+ * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note
+ * that unlike {@link IncrementalAggregator}s an {@link OTFAggregator} only has one output type. This is done
+ * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type.
+ * For example, average is a computation that you will almost always want to produce a double. But if you require
+ * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}.
+ * </p>
+ * <p>
+ * The primary usage for {@link OTFAggregator}s is in store operators which respond to queries. Currently,
+ * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator.
+ * </p>
+ * <p>
+ * This interface extends {@link Serializable} because On The Fly aggregators may be set
+ * as properties on some operators and operator properties are required to be java serializable.
+ * </p>
+ * @since 3.1.0
+ */
+public interface OTFAggregator extends Serializable
+{
+  public static final long serialVersionUID = 201505251039L;
+
+  /**
+   * This method returns all the incremental aggregators on which this aggregator depends
+   * to compute its result. In the case of {@link AggregatorAverage} its child aggregators are
+   * {@link AggregatorCount} and {@link AggregatorSum}.
+   * @return All the incremental aggregators on which this aggregator depends to compute its
+   * result.
+   */
+
+  public List<Class<? extends IncrementalAggregator>> getChildAggregators();
+  /**
+   * This method performs an on the fly aggregation from the given aggregates. The aggregates
+   * provided to this aggregator are each the result of one of this aggregator's child aggregators.
+   * The order in which the aggregates are passed to this method is the same as the order in
+   * which the child aggregators are listed in the result of the {@link #getChildAggregators} method.
+   * Also note that this aggregator does not aggregate one field at a time. This aggregator receives
+   * a batch of fields from each child aggregator, and the result of the method is also a batch of fields.
+   * @param aggregates These are the results of all the child aggregators. The results are in the same
+   * order as the child aggregators specified in the result of the {@link #getChildAggregators} method.
+   * @return The result of the on the fly aggregation.
+   */
+
+  public GPOMutable aggregate(GPOMutable... aggregates);
+  /**
+   * Returns the output type of the {@link OTFAggregator}. <b>Note</b> that any combination of input types
+   * will produce the same output type for {@link OTFAggregator}s.
+   * @return The output type of the {@link OTFAggregator}.
+   */
+
+  public Type getOutputType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java
new file mode 100644
index 0000000..acee645
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+package org.apache.apex.malhar.lib.dimensions;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java
index 67f8c76..00a925b 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java
@@ -18,13 +18,13 @@
  */
 package com.datatorrent.lib.appdata.dimensions;
 
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
+
 import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
 import com.datatorrent.lib.appdata.schemas.TimeBucket;
-import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
 
 public class CustomTimeBucketRegistryTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java
index 43b71ef..fd69676 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java
@@ -20,15 +20,16 @@ package com.datatorrent.lib.appdata.dimensions;
 
 import java.util.Map;
 
-import com.google.common.collect.Maps;
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+
+import com.google.common.collect.Maps;
+
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
 import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
 
 public class DimensionsEventTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java
index 26a06bd..b3669bc 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java
@@ -24,23 +24,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.codehaus.jettison.json.JSONArray;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.codehaus.jettison.json.JSONArray;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorIncrementalType;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.DimensionsCombination;
 import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.Key;
 import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.Value;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorIncrementalType;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
 
 public class DimensionalConfigurationSchemaTest
 {
@@ -56,9 +57,9 @@ public class DimensionalConfigurationSchemaTest
   public void noEnumTest()
   {
     //Test if loading of no enums works
-    DimensionalConfigurationSchema des =
-    new DimensionalConfigurationSchema(SchemaUtils.jarResourceFileToString("adsGenericEventSchemaNoEnums.json"),
-    AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY);
+    DimensionalConfigurationSchema des = new DimensionalConfigurationSchema(
+        SchemaUtils.jarResourceFileToString("adsGenericEventSchemaNoEnums.json"),
+        AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY);
 
     DimensionalSchema dimensionalSchema = new DimensionalSchema(des);
     dimensionalSchema.getSchemaJSON();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java
index a98d346..50b539e 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java
@@ -24,20 +24,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
-import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
 
 public class DimensionalSchemaTest
 {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
deleted file mode 100644
index 5c4feed..0000000
--- a/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.appdata.schemas.TimeBucket;
-import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry;
-
-
-public class CustomTimeBucketRegistryTest
-{
-  @Test
-  public void testBuildingRegistry()
-  {
-    CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry();
-
-    CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE);
-    CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR);
-    CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY);
-
-    timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal());
-    timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal());
-    timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal());
-
-    CustomTimeBucket customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.MINUTE.ordinal());
-    Assert.assertTrue(customTimeBucket.isUnit());
-    Assert.assertEquals(TimeBucket.MINUTE, customTimeBucket.getTimeBucket());
-
-    customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.HOUR.ordinal());
-    Assert.assertTrue(customTimeBucket.isUnit());
-    Assert.assertEquals(TimeBucket.HOUR, customTimeBucket.getTimeBucket());
-
-    customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.DAY.ordinal());
-    Assert.assertTrue(customTimeBucket.isUnit());
-    Assert.assertEquals(TimeBucket.DAY, customTimeBucket.getTimeBucket());
-
-    Assert.assertEquals(TimeBucket.MINUTE.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1m));
-    Assert.assertEquals(TimeBucket.HOUR.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1h));
-    Assert.assertEquals(TimeBucket.DAY.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1d));
-  }
-
-  @Test
-  public void testRegister()
-  {
-    CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry();
-
-    CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE);
-    CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR);
-    CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY);
-
-    timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal());
-    timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal());
-    timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal());
-
-    int max = Integer.MIN_VALUE;
-    max = Math.max(max, TimeBucket.MINUTE.ordinal());
-    max = Math.max(max, TimeBucket.HOUR.ordinal());
-    max = Math.max(max, TimeBucket.DAY.ordinal());
-
-    CustomTimeBucket c5m = new CustomTimeBucket(TimeBucket.MINUTE, 5L);
-
-    timeBucketRegistry.register(c5m);
-    int timeBucketId = timeBucketRegistry.getTimeBucketId(c5m);
-
-    Assert.assertEquals(max + 1, timeBucketId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java b/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
deleted file mode 100644
index 54682b1..0000000
--- a/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.dimensions;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Sets;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
-import com.datatorrent.lib.appdata.schemas.Fields;
-import com.datatorrent.lib.appdata.schemas.TimeBucket;
-import com.datatorrent.lib.appdata.schemas.Type;
-import com.datatorrent.lib.dimensions.DimensionsDescriptor;
-
-public class DimensionsDescriptorTest
-{
-  public static final String KEY_1_NAME = "key1";
-  public static final Type KEY_1_TYPE = Type.INTEGER;
-  public static final String KEY_2_NAME = "key2";
-  public static final Type KEY_2_TYPE = Type.STRING;
-
-  public static final String AGG_1_NAME = "agg1";
-  public static final Type AGG_1_TYPE = Type.INTEGER;
-  public static final String AGG_2_NAME = "agg2";
-  public static final Type AGG_2_TYPE = Type.STRING;
-
-  @Test
-  public void simpleTest1()
-  {
-    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);
-
-    Set<String> fields = Sets.newHashSet();
-    fields.add(KEY_1_NAME);
-
-    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
-    Assert.assertEquals("The timeunit should be null.", null, ad.getTimeBucket());
-  }
-
-  @Test
-  public void simpleTest2()
-  {
-    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
-                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
-                                                       KEY_2_NAME);
-
-    Set<String> fields = Sets.newHashSet();
-    fields.add(KEY_1_NAME);
-    fields.add(KEY_2_NAME);
-
-    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
-    Assert.assertEquals("The timeunit should be null.", null, ad.getTimeBucket());
-  }
-
-  @Test
-  public void simpleTimeTest()
-  {
-    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
-                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
-                                                       DimensionsDescriptor.DIMENSION_TIME +
-                                                       DimensionsDescriptor.DELIMETER_EQUALS +
-                                                       "DAYS");
-
-    Set<String> fields = Sets.newHashSet();
-    fields.add(KEY_1_NAME);
-
-    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
-    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
-  }
-
-  @Test
-  public void equalsAndHashCodeTest()
-  {
-    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
-                                                        new Fields(Sets.newHashSet("a", "b")));
-
-    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
-                                                        new Fields(Sets.newHashSet("a", "b")));
-
-    Assert.assertTrue(ddB.equals(ddA));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
new file mode 100644
index 0000000..c9524b1
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistryTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
+
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+import com.datatorrent.lib.appdata.schemas.TimeBucket;
+
+public class CustomTimeBucketRegistryTest
+{
+  @Test
+  public void testBuildingRegistry()
+  {
+    CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry();
+
+    CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE);
+    CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR);
+    CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY);
+
+    timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal());
+    timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal());
+    timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal());
+
+    CustomTimeBucket customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.MINUTE.ordinal());
+    Assert.assertTrue(customTimeBucket.isUnit());
+    Assert.assertEquals(TimeBucket.MINUTE, customTimeBucket.getTimeBucket());
+
+    customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.HOUR.ordinal());
+    Assert.assertTrue(customTimeBucket.isUnit());
+    Assert.assertEquals(TimeBucket.HOUR, customTimeBucket.getTimeBucket());
+
+    customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.DAY.ordinal());
+    Assert.assertTrue(customTimeBucket.isUnit());
+    Assert.assertEquals(TimeBucket.DAY, customTimeBucket.getTimeBucket());
+
+    Assert.assertEquals(TimeBucket.MINUTE.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1m));
+    Assert.assertEquals(TimeBucket.HOUR.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1h));
+    Assert.assertEquals(TimeBucket.DAY.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1d));
+  }
+
+  @Test
+  public void testRegister()
+  {
+    CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry();
+
+    CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE);
+    CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR);
+    CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY);
+
+    timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal());
+    timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal());
+    timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal());
+
+    int max = Integer.MIN_VALUE;
+    max = Math.max(max, TimeBucket.MINUTE.ordinal());
+    max = Math.max(max, TimeBucket.HOUR.ordinal());
+    max = Math.max(max, TimeBucket.DAY.ordinal());
+
+    CustomTimeBucket c5m = new CustomTimeBucket(TimeBucket.MINUTE, 5L);
+
+    timeBucketRegistry.register(c5m);
+    int timeBucketId = timeBucketRegistry.getTimeBucketId(c5m);
+
+    Assert.assertEquals(max + 1, timeBucketId);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
new file mode 100644
index 0000000..3101577
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+import com.datatorrent.lib.appdata.schemas.Fields;
+import com.datatorrent.lib.appdata.schemas.TimeBucket;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+public class DimensionsDescriptorTest
+{
+  public static final String KEY_1_NAME = "key1";
+  public static final Type KEY_1_TYPE = Type.INTEGER;
+  public static final String KEY_2_NAME = "key2";
+  public static final Type KEY_2_TYPE = Type.STRING;
+
+  public static final String AGG_1_NAME = "agg1";
+  public static final Type AGG_1_TYPE = Type.INTEGER;
+  public static final String AGG_2_NAME = "agg2";
+  public static final Type AGG_2_TYPE = Type.STRING;
+
+  @Test
+  public void simpleTest1()
+  {
+    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);
+
+    Set<String> fields = Sets.newHashSet();
+    fields.add(KEY_1_NAME);
+
+    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
+    Assert.assertEquals("The timeunit should be null.", null, ad.getTimeBucket());
+  }
+
+  @Test
+  public void simpleTest2()
+  {
+    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
+                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
+                                                       KEY_2_NAME);
+
+    Set<String> fields = Sets.newHashSet();
+    fields.add(KEY_1_NAME);
+    fields.add(KEY_2_NAME);
+
+    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
+    Assert.assertEquals("The timeunit should be null.", null, ad.getTimeBucket());
+  }
+
+  @Test
+  public void simpleTimeTest()
+  {
+    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME +
+                                                       DimensionsDescriptor.DELIMETER_SEPERATOR +
+                                                       DimensionsDescriptor.DIMENSION_TIME +
+                                                       DimensionsDescriptor.DELIMETER_EQUALS +
+                                                       "DAYS");
+
+    Set<String> fields = Sets.newHashSet();
+    fields.add(KEY_1_NAME);
+
+    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
+    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
+  }
+
+  @Test
+  public void equalsAndHashCodeTest()
+  {
+    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
+                                                        new Fields(Sets.newHashSet("a", "b")));
+
+    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
+                                                        new Fields(Sets.newHashSet("a", "b")));
+
+    Assert.assertTrue(ddB.equals(ddA));
+  }
+}


Mime
View raw message