apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyfar...@apache.org
Subject [2/4] incubator-apex-malhar git commit: APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package
Date Mon, 28 Mar 2016 22:39:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptor.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptor.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptor.java
new file mode 100644
index 0000000..f7df583
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptor.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+import com.datatorrent.lib.appdata.schemas.Fields;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.TimeBucket;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * <p>
+ * This class defines a dimensions combination which is used by dimensions
+ * computation operators and stores. A dimension combination is composed of the
+ * names of the fields that constitute the key, as well as the TimeBucket under
+ * which data is stored.
+ * </p>
+ * <p>
+ * This class supports the creation of a dimensions combination from a
+ * {@link TimeBucket} object and a set of fields. It also supports the creation
+ * of a dimensions combination from an aggregation string. An aggregation string
+ * looks like the following: <br/>
+ * <br/>
+ * {@code
+ * "time=MINUTES:publisher:advertiser"
+ * } <br/>
+ * <br/>
+ * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the
+ * other colon separated strings represent the name of fields which comprise the
+ * key for this dimension combination. When specifying a time bucket in an
+ * aggregation string you must use the name of one of the TimeUnit enums.
+ * </p>
+ * <p>
+ * One of the primary uses of a {@link DimensionsDescriptor} is for querying a
+ * dimensional data store. When a query is received for a dimensional data
+ * store, the query must be mapped to many things including a
+ * dimensionDescriptorID. The dimensionDescriptorID is an id assigned to a class
+ * of dimension combinations which share the same keys. This mapping is
+ * performed by creating a {@link DimensionsDescriptor} object from the query,
+ * and then using the {@link DimensionsDescriptor} object to look up the correct
+ * dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is
+ * necessary because a dimensionsDescriptorID is used for storage in order to
+ * prevent key conflicts.
+ * </p>
+ *
+ *
+ * @since 3.3.0
+ */
+public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor>
+{
+  private static final long serialVersionUID = 201506251237L;
+
+  /**
+   * Name of the reserved time field.
+   */
+  public static final String DIMENSION_TIME = "time";
+  /**
+   * Type of the reserved time field.
+   */
+  public static final Type DIMENSION_TIME_TYPE = Type.LONG;
+  /**
+   * Name of the reserved time bucket field.
+   */
+  public static final String DIMENSION_TIME_BUCKET = "timeBucket";
+  /**
+   * Type of the reserved time bucket field.
+   */
+  public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER;
+  /**
+   * The set of fields used for time, which are intended to be queried. Note that
+   * the timeBucket field is not included here because it's not intended to be
+   * queried.
+   */
+  public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME));
+  /**
+   * This set represents the field names which cannot be part of the user
+   * defined field names in a schema for dimensions computation.
+   */
+  public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME, DIMENSION_TIME_BUCKET);
+  /**
+   * This is the equals string separator used when defining a time bucket for a
+   * dimensions combination.
+   */
+  public static final String DELIMETER_EQUALS = "=";
+  /**
+   * This separates dimensions in the dimensions combination.
+   */
+  public static final String DELIMETER_SEPERATOR = ":";
+  /**
+   * A map from a key field to its type.
+   */
+  public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE;
+
+  /**
+   * The time bucket used for this dimension combination.
+   */
+  private TimeBucket timeBucket;
+  /**
+   * The custom time bucket used for this dimension combination.
+   */
+  private CustomTimeBucket customTimeBucket;
+  /**
+   * The set of key fields which compose this dimension combination.
+   */
+  private Fields fields;
+
+  static {
+    Map<String, Type> dimensionFieldToType = Maps.newHashMap();
+
+    dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
+    dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
+
+    DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType);
+  }
+
+  /**
+   * Constructor for kryo serialization.
+   */
+  private DimensionsDescriptor()
+  {
+    //for kryo
+  }
+
+  /**
+   * Creates a dimensions descriptor (dimensions combination) with the given
+   * {@link TimeBucket} and key fields.
+   *
+   * @param timeBucket
+   *          The {@link TimeBucket} that this dimensions combination
+   *          represents.
+   * @param fields
+   *          The key fields included in this dimensions combination.
+   * @deprecated use
+   *             {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket, com.datatorrent.lib.appdata.schemas.Fields)}
+   *             instead.
+   */
+  @Deprecated
+  public DimensionsDescriptor(TimeBucket timeBucket, Fields fields)
+  {
+    setTimeBucket(timeBucket);
+    setFields(fields);
+  }
+
+  /**
+   * Creates a dimensions descriptor (dimensions combination) with the given
+   * {@link CustomTimeBucket} and key fields.
+   *
+   * @param timeBucket
+   *          The {@link CustomTimeBucket} that this dimensions combination
+   *          represents.
+   * @param fields
+   *          The key fields included in this dimensions combination.
+   */
+  public DimensionsDescriptor(CustomTimeBucket timeBucket, Fields fields)
+  {
+    setCustomTimeBucket(timeBucket);
+    setFields(fields);
+  }
+
+  /**
+   * Creates a dimensions descriptor (dimensions combination) with the given key
+   * fields.
+   *
+   * @param fields
+   *          The key fields included in this dimensions combination.
+   */
+  public DimensionsDescriptor(Fields fields)
+  {
+    setFields(fields);
+  }
+
+  /**
+   * This constructor creates a dimensions descriptor (dimensions combination)
+   * from the given aggregation string.
+   *
+   * @param aggregationString
+   *          The aggregation string to use when initializing this dimensions
+   *          combination.
+   */
+  public DimensionsDescriptor(String aggregationString)
+  {
+    initialize(aggregationString);
+  }
+
+  /**
+   * Initializes the dimensions combination with the given aggregation string.
+   *
+   * @param aggregationString
+   *          The aggregation string with which to initialize this dimensions
+   *          combination.
+   */
+  private void initialize(String aggregationString)
+  {
+    String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR);
+    Set<String> fieldSet = Sets.newHashSet();
+
+    for (String field : fieldArray) {
+      String[] fieldAndValue = field.split(DELIMETER_EQUALS);
+      String fieldName = fieldAndValue[0];
+
+      if (fieldName.equals(DIMENSION_TIME_BUCKET)) {
+        throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time.");
+      }
+
+      if (!fieldName.equals(DIMENSION_TIME)) {
+        fieldSet.add(fieldName);
+      }
+
+      if (fieldName.equals(DIMENSION_TIME)) {
+        if (timeBucket != null) {
+          throw new IllegalArgumentException(
+              "Cannot specify time in a dimensions " + "descriptor when a timebucket is also " + "specified.");
+        }
+
+        if (fieldAndValue.length == 2) {
+
+          timeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1]));
+        }
+      }
+    }
+
+    fields = new Fields(fieldSet);
+  }
+
+  /**
+   * This is a helper method which sets and validates the {@link TimeBucket}.
+   *
+   * @param timeBucket
+   *          The {@link TimeBucket} to set and validate.
+   */
+  private void setTimeBucket(TimeBucket timeBucket)
+  {
+    Preconditions.checkNotNull(timeBucket);
+    this.timeBucket = timeBucket;
+    this.customTimeBucket = new CustomTimeBucket(timeBucket);
+  }
+
+  /**
+   * This is a helper method which sets and validates the
+   * {@link CustomTimeBucket}.
+   *
+   * @param customTimeBucket
+   *          The {@link CustomTimeBucket} to set and validate.
+   */
+  private void setCustomTimeBucket(CustomTimeBucket customTimeBucket)
+  {
+    Preconditions.checkNotNull(customTimeBucket);
+    this.customTimeBucket = customTimeBucket;
+    this.timeBucket = customTimeBucket.getTimeBucket();
+  }
+
+  /**
+   * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object.
+   *
+   * @return The {@link TimeBucket} for this {@link DimensionsDescriptor}
+   *         object.
+   * @deprecated use {@link #getCustomTimeBucket()} instead.
+   */
+  @Deprecated
+  public TimeBucket getTimeBucket()
+  {
+    return timeBucket;
+  }
+
+  /**
+   * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor}
+   * object.
+   *
+   * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor}
+   *         object.
+   */
+  public CustomTimeBucket getCustomTimeBucket()
+  {
+    return customTimeBucket;
+  }
+
+  /**
+   * This is a helper method which sets and validates the set of key fields for
+   * this {@link DimensionsDescriptor} object.
+   *
+   * @param fields
+   *          The set of key fields for this {@link DimensionsDescriptor}
+   *          object.
+   */
+  private void setFields(Fields fields)
+  {
+    Preconditions.checkNotNull(fields);
+    this.fields = fields;
+  }
+
+  /**
+   * Returns the set of key fields for this {@link DimensionsDescriptor} object.
+   *
+   * @return The set of key fields for this {@link DimensionsDescriptor} object.
+   */
+  public Fields getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * This method is used to create a new {@link FieldsDescriptor} object
+   * representing this {@link DimensionsDescriptor} object from another
+   * {@link FieldsDescriptor} object which defines the names and types of all
+   * the available key fields.
+   *
+   * @param parentDescriptor
+   *          The {@link FieldsDescriptor} object which defines the name and
+   *          type of all the available key fields.
+   * @return A {@link FieldsDescriptor} object which represents this
+   *         {@link DimensionsDescriptor} (dimensions combination) derived from
+   *         the given {@link FieldsDescriptor} object.
+   */
+  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
+  {
+    Map<String, Type> fieldToType = Maps.newHashMap();
+    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();
+
+    for (String field : this.fields.getFields()) {
+      if (RESERVED_DIMENSION_NAMES.contains(field)) {
+        continue;
+      }
+
+      fieldToType.put(field, parentFieldToType.get(field));
+    }
+
+    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
+      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
+      fieldToType.put(DIMENSION_TIME, Type.LONG);
+    }
+
+    return new FieldsDescriptor(fieldToType);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int hash = 7;
+    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
+    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
+    return hash;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final DimensionsDescriptor other = (DimensionsDescriptor)obj;
+    if (!this.customTimeBucket.equals(other.customTimeBucket)) {
+      return false;
+    }
+    if (this.fields != other.fields && (this.fields == null || !this.fields.equals(other.fields))) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
+  }
+
+  @Override
+  public int compareTo(DimensionsDescriptor other)
+  {
+    if (this == other) {
+      return 0;
+    }
+
+    List<String> thisFieldList = this.getFields().getFieldsList();
+    List<String> otherFieldList = other.getFields().getFieldsList();
+
+    if (thisFieldList != otherFieldList) {
+      int compare = thisFieldList.size() - otherFieldList.size();
+
+      if (compare != 0) {
+        return compare;
+      }
+
+      Collections.sort(thisFieldList);
+      Collections.sort(otherFieldList);
+
+      for (int index = 0; index < thisFieldList.size(); index++) {
+        String thisField = thisFieldList.get(index);
+        String otherField = otherFieldList.get(index);
+
+        int fieldCompare = thisField.compareTo(otherField);
+
+        if (fieldCompare != 0) {
+          return fieldCompare;
+        }
+      }
+    }
+
+    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
+    CustomTimeBucket otherBucket = other.getCustomTimeBucket();
+
+    if (thisBucket == null && otherBucket == null) {
+      return 0;
+    } else if (thisBucket != null && otherBucket == null) {
+      return 1;
+    } else if (thisBucket == null && otherBucket != null) {
+      return -1;
+    } else {
+      return thisBucket.compareTo(otherBucket);
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java
new file mode 100644
index 0000000..de9e096
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsEvent.java
@@ -0,0 +1,848 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.apex.malhar.lib.dimensions.aggregator.AggregateEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+
+/**
+ * <p>
+ * This is the base class for the events that are used for internal processing
+ * in the subclasses of {@link AbstractDimensionsComputationFlexible} and
+ * {@link DimensionsStoreHDHT}.
+ * </p>
+ * <p>
+ * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey}
+ * and a {@link GPOMutable} object which contains the values of aggregate
+ * fields. The {@link EventKey} is used to identify the dimension combination an
+ * event belongs to, and consequently determines what input values should be
+ * aggregated together. The aggregates are the actual data payload of the event
+ * which are to be aggregated.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+public class DimensionsEvent implements Serializable
+{
+  private static final long serialVersionUID = 201503231204L;
+
+  /**
+   * This is the {@link GPOMutable} object which holds all the aggregates.
+   */
+  protected GPOMutable aggregates;
+  /**
+   * This is the event key for the event.
+   */
+  protected EventKey eventKey;
+
+  /**
+   * Constructor for Kryo.
+   */
+  private DimensionsEvent()
+  {
+    //For kryo
+  }
+
+  /**
+   * This creates a {@link DimensionsEvent} from the given event key and
+   * aggregates.
+   *
+   * @param eventKey
+   *          The key from which to create a {@link DimensionsEvent}.
+   * @param aggregates
+   *          The aggregates from which to create {@link DimensionsEvent}.
+   */
+  public DimensionsEvent(EventKey eventKey, GPOMutable aggregates)
+  {
+    setEventKey(eventKey);
+    setAggregates(aggregates);
+  }
+
+  /**
+   * Creates a DimensionsEvent with the given key values, aggregates and ids.
+   *
+   * @param keys
+   *          The values for fields in the key.
+   * @param aggregates
+   *          The values for fields in the aggregate.
+   * @param bucketID
+   *          The bucketID
+   * @param schemaID
+   *          The schemaID.
+   * @param dimensionDescriptorID
+   *          The dimensionsDescriptorID.
+   * @param aggregatorIndex
+   *          The aggregatorIndex assigned to this event by the unifier.
+   */
+  public DimensionsEvent(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID,
+      int aggregatorIndex)
+  {
+    this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+    setAggregates(aggregates);
+  }
+
+  /**
+   * This creates an event with the given data. Note, this constructor assumes
+   * that the bucketID will be 0.
+   *
+   * @param keys
+   *          The value for fields in the key.
+   * @param aggregates
+   *          The value for fields in the aggregate.
+   * @param schemaID
+   *          The schemaID.
+   * @param dimensionDescriptorID
+   *          The dimensionsDescriptorID.
+   * @param aggregatorIndex
+   *          The aggregatorIndex assigned to this event by the unifier.
+   */
+  public DimensionsEvent(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID,
+      int aggregatorIndex)
+  {
+    this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+    setAggregates(aggregates);
+  }
+
+  /**
+   * This is a helper method which sets the {@link EventKey} of the event to be
+   * the same as the given {@link EventKey}.
+   *
+   * @param eventKey
+   *          The {@link EventKey} to set on this event.
+   */
+  protected final void setEventKey(EventKey eventKey)
+  {
+    this.eventKey = new EventKey(eventKey);
+  }
+
+  /**
+   * This is a helper method which sets the aggregates for this event.
+   *
+   * @param aggregates
+   *          The aggregates for this event.
+   */
+  protected final void setAggregates(GPOMutable aggregates)
+  {
+    Preconditions.checkNotNull(aggregates);
+    this.aggregates = aggregates;
+  }
+
+  /**
+   * This is a helper method which returns the aggregates for this event.
+   *
+   * @return The aggregates for this event.
+   */
+  public GPOMutable getAggregates()
+  {
+    return aggregates;
+  }
+
+  /**
+   * Returns the {@link EventKey} for this event.
+   *
+   * @return The {@link EventKey} for this event.
+   */
+  public EventKey getEventKey()
+  {
+    return eventKey;
+  }
+
+  /**
+   * This is a convenience method which returns the values of the key fields in
+   * this event's {@link EventKey}.
+   *
+   * @return The values of the key fields in this event's {@link EventKey}.
+   */
+  public GPOMutable getKeys()
+  {
+    return eventKey.getKey();
+  }
+
+  /**
+   * This is a convenience method which returns the schemaID of this event's
+   * {@link EventKey}.
+   *
+   * @return The schemaID of this event's {@link EventKey}.
+   */
+  public int getSchemaID()
+  {
+    return eventKey.getSchemaID();
+  }
+
+  /**
+   * Returns the id of the dimension descriptor (key combination) for which this
+   * event contains data.
+   *
+   * @return The id of the dimension descriptor (key combination) for which this
+   *         event contains data.
+   */
+  public int getDimensionDescriptorID()
+  {
+    return eventKey.getDimensionDescriptorID();
+  }
+
+  /**
+   * Returns the id of the aggregator which is applied to this event's data.
+   *
+   * @return Returns the id of the aggregator which is applied to this event's
+   *         data.
+   */
+  public int getAggregatorID()
+  {
+    return eventKey.getAggregatorID();
+  }
+
+  /**
+   * Returns the bucketID assigned to this event. The bucketID is useful for
+   * this event in the case that the event is sent to a partitioned HDHT
+   * operator. Each partitioned HDHT operator can use the bucketIDs for the
+   * buckets it writes to as a partition key.
+   *
+   * @return The bucketID assigned to this event.
+   */
+  public int getBucketID()
+  {
+    return eventKey.getBucketID();
+  }
+
+  /**
+   * This is a utility method which copies the given src event to the given
+   * destination event.
+   *
+   * @param aeDest
+   *          The destination event.
+   * @param aeSrc
+   *          The source event.
+   */
+  public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc)
+  {
+    GPOMutable destAggs = aeDest.getAggregates();
+    GPOMutable srcAggs = aeSrc.getAggregates();
+
+    if (srcAggs.getFieldsBoolean() != null) {
+      System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0,
+          srcAggs.getFieldsBoolean().length);
+    }
+
+    if (srcAggs.getFieldsCharacter() != null) {
+      System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0,
+          srcAggs.getFieldsCharacter().length);
+    }
+
+    if (srcAggs.getFieldsString() != null) {
+      System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length);
+    }
+
+    if (srcAggs.getFieldsShort() != null) {
+      System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length);
+    }
+
+    if (srcAggs.getFieldsInteger() != null) {
+      System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0,
+          srcAggs.getFieldsInteger().length);
+    }
+
+    if (srcAggs.getFieldsLong() != null) {
+      System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length);
+    }
+
+    if (srcAggs.getFieldsFloat() != null) {
+      System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length);
+    }
+
+    if (srcAggs.getFieldsDouble() != null) {
+      System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length);
+    }
+  }
+
+  /**
+   * <p>
+   * The {@link EventKey} represents a dimensions combination for a dimensions
+   * event. It contains the keys and values which define a dimensions
+   * combination. It's very similar to a {@link DimensionsDescriptor} which is
+   * also used to define part of a dimensions combination. The difference
+   * between the two is that a {@link DimensionsDescriptor} only contains what
+   * keys are included in the combination, not the values of those keys (which
+   * the {@link EventKey} has).
+   * </p>
+   * <p>
+   * In addition to holding the keys in a dimensions combination and their
+   * values, the event key holds some meta information. The meta information
+   * included and their purposes are the following:
+   * <ul>
+   * <li><b>bucketID:</b> This is set when the dimension store responsible for
+   * storing the data is partitioned. In that case the bucketID is used as the
+   * partitionID.</li>
+   * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that
+   * this {@link EventKey} corresponds to.</li>
+   * <li><b>dimensionDescriptorID:</b> This is the id of the
+   * {@link DimensionsDescriptor} that this {@link EventKey} corresponds to.
+   * </li>
+   * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to
+   * aggregate the values associated with this {@link EventKey} in a
+   * {@link DimensionsEvent}.</li>
+   * </ul>
+   * </p>
+   */
+  public static class EventKey implements Serializable
+  {
+    private static final long serialVersionUID = 201503231205L;
+
+    /**
+     * The bucketID assigned to this event key.
+     */
+    private int bucketID;
+    /**
+     * The schemaID corresponding to the {@link DimensionalSchema} that this
+     * {@link EventKey} corresponds to.
+     */
+    private int schemaID;
+    /**
+     * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the
+     * corresponding {@link DimensionalSchema}.
+     */
+    private int dimensionDescriptorID;
+    /**
+     * The id of the aggregator which should be used to aggregate the values
+     * corresponding to this {@link EventKey}.
+     */
+    private int aggregatorID;
+    /**
+     * The values of the key fields.
+     */
+    private GPOMutable key;
+
+    /**
+     * Constructor for serialization.
+     */
+    private EventKey()
+    {
+      //For kryo
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param eventKey
+     *          The {@link EventKey} whose data will be copied.
+     */
+    public EventKey(EventKey eventKey)
+    {
+      this.bucketID = eventKey.bucketID;
+      this.schemaID = eventKey.schemaID;
+      this.dimensionDescriptorID = eventKey.dimensionDescriptorID;
+      this.aggregatorID = eventKey.aggregatorID;
+
+      this.key = new GPOMutable(eventKey.getKey());
+    }
+
+    /**
+     * Creates an event key with the given data.
+     *
+     * @param bucketID
+     *          The bucketID assigned to this {@link EventKey}.
+     * @param schemaID
+     *          The schemaID of the corresponding {@link DimensionalSchema}.
+     * @param dimensionDescriptorID
+     *          The dimensionDescriptorID of the corresponding
+     *          {@link DimensionDescriptor} in the {@link DimensionalSchema}.
+     * @param aggregatorID
+     *          The id of the aggregator which should be used to aggregate the
+     *          values corresponding to this {@link EventKey}.
+     * @param key
+     *          The values of the keys.
+     */
+    public EventKey(int bucketID, int schemaID, int dimensionDescriptorID, int aggregatorID, GPOMutable key)
+    {
+      setBucketID(bucketID);
+      setSchemaID(schemaID);
+      setDimensionDescriptorID(dimensionDescriptorID);
+      setAggregatorID(aggregatorID);
+      setKey(key);
+    }
+
+    /**
+     * Creates an event key with the given data. This constructor assumes that
+     * the bucketID will be 0.
+     *
+     * @param schemaID
+     *          The schemaID of the corresponding {@link DimensionalSchema}.
+     * @param dimensionDescriptorID
+     *          The dimensionDescriptorID of the corresponding
+     *          {@link DimensionDescriptor}.
+     * @param aggregatorID
+     *          The id of the aggregator which should be used to aggregate the
+     *          values corresponding to this {@link EventKey}.
+     * @param key
+     *          The values of the keys.
+     */
+    public EventKey(int schemaID, int dimensionDescriptorID, int aggregatorID, GPOMutable key)
+    {
+      setSchemaID(schemaID);
+      setDimensionDescriptorID(dimensionDescriptorID);
+      setAggregatorID(aggregatorID);
+      setKey(key);
+    }
+
+    /**
+     * Sets the dimension descriptor ID.
+     *
+     * @param dimensionDescriptorID
+     *          The dimension descriptor ID to set.
+     */
+    private void setDimensionDescriptorID(int dimensionDescriptorID)
+    {
+      this.dimensionDescriptorID = dimensionDescriptorID;
+    }
+
+    /**
+     * Returns the dimension descriptor ID.
+     *
+     * @return The dimension descriptor ID.
+     */
+    public int getDimensionDescriptorID()
+    {
+      return dimensionDescriptorID;
+    }
+
+    /**
+     * Returns the aggregatorID.
+     *
+     * @return The aggregatorID.
+     */
+    public int getAggregatorID()
+    {
+      return aggregatorID;
+    }
+
+    /**
+     * Sets the aggregatorID.
+     *
+     * @param aggregatorID
+     *          The aggregatorID to set.
+     */
+    private void setAggregatorID(int aggregatorID)
+    {
+      this.aggregatorID = aggregatorID;
+    }
+
+    /**
+     * Returns the schemaID.
+     *
+     * @return The schemaID.
+     */
+    public int getSchemaID()
+    {
+      return schemaID;
+    }
+
+    /**
+     * Sets the schemaID.
+     *
+     * @param schemaID
+     *          The schemaID to set.
+     */
+    private void setSchemaID(int schemaID)
+    {
+      this.schemaID = schemaID;
+    }
+
+    /**
+     * Returns the key values.
+     *
+     * @return The key values.
+     */
+    public GPOMutable getKey()
+    {
+      return key;
+    }
+
+    /**
+     * Sets the bucketID.
+     *
+     * @param bucketID
+     *          The bucketID.
+     */
+    private void setBucketID(int bucketID)
+    {
+      this.bucketID = bucketID;
+    }
+
+    /**
+     * Gets the bucketID.
+     *
+     * @return The bucketID.
+     */
+    public int getBucketID()
+    {
+      return bucketID;
+    }
+
+    /**
+     * Sets the key values.
+     *
+     * @param key
+     *          The key values to set.
+     */
+    private void setKey(GPOMutable key)
+    {
+      Preconditions.checkNotNull(key);
+      this.key = key;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // 97-based hash combining every identifying field; the key may be null.
+      int result = 3;
+      result = 97 * result + bucketID;
+      result = 97 * result + schemaID;
+      result = 97 * result + dimensionDescriptorID;
+      result = 97 * result + aggregatorID;
+      result = 97 * result + (key == null ? 0 : key.hashCode());
+      return result;
+    }
+
+    /**
+     * Two {@link EventKey}s are equal when all four IDs and the key payloads match.
+     */
+    @Override
+    public boolean equals(Object obj)
+    {
+      // Reflexive fast path (was missing): avoids the field-by-field walk for self-comparison.
+      if (this == obj) {
+        return true;
+      }
+
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      final EventKey other = (EventKey)obj;
+
+      if (this.bucketID != other.bucketID
+          || this.schemaID != other.schemaID
+          || this.dimensionDescriptorID != other.dimensionDescriptorID
+          || this.aggregatorID != other.aggregatorID) {
+        return false;
+      }
+
+      // Null-safe comparison of the key payloads.
+      return this.key == other.key || (this.key != null && this.key.equals(other.key));
+    }
+
+    @Override
+    public String toString()
+    {
+      // Label now matches the field name (was incorrectly "aggregatorIndex").
+      return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID
+          + ", aggregatorID=" + aggregatorID + ", key=" + key + '}';
+    }
+
+    /**
+     * Creates one {@link EventKey} per given key payload, all sharing the same
+     * schema, dimensions-descriptor, and aggregator IDs.
+     *
+     * @param schemaId
+     *          The schema ID shared by every created event key.
+     * @param dimensionsDescriptorId
+     *          The dimensions descriptor ID shared by every created event key.
+     * @param aggregatorId
+     *          The aggregator ID shared by every created event key.
+     * @param keys
+     *          The key payloads, one per {@link EventKey} to create.
+     * @return The created {@link EventKey}s, in the order of the given keys.
+     */
+    public static List<EventKey> createEventKeys(int schemaId, int dimensionsDescriptorId, int aggregatorId,
+        List<GPOMutable> keys)
+    {
+      List<EventKey> eventKeys = Lists.newArrayListWithCapacity(keys.size());
+
+      for (GPOMutable keyPayload : keys) {
+        eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, keyPayload));
+      }
+
+      return eventKeys;
+    }
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // 79-based hash over the aggregates and the event key; either may be null.
+    int result = 5;
+    result = 79 * result + (aggregates == null ? 0 : aggregates.hashCode());
+    result = 79 * result + (eventKey == null ? 0 : eventKey.hashCode());
+    return result;
+  }
+
+  /**
+   * Two {@link DimensionsEvent}s are equal when their aggregates and event keys match.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    // Reflexive fast path (was missing).
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final DimensionsEvent other = (DimensionsEvent)obj;
+    // Null-safe comparison of the aggregate payloads.
+    if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) {
+      return false;
+    }
+    // Null-safe comparison of the event keys.
+    return this.eventKey == other.eventKey || (this.eventKey != null && this.eventKey.equals(other.eventKey));
+  }
+
+  /**
+   * An {@link InputEvent} is a {@link DimensionsEvent} that has not yet been
+   * aggregated. The public {@link #used} flag records whether an aggregator
+   * has already consumed this event.
+   */
+  public static class InputEvent extends DimensionsEvent
+  {
+    private static final long serialVersionUID = 201506210406L;
+    // Set to true once an aggregator consumes this event.
+    public boolean used = false;
+
+    private InputEvent()
+    {
+      // for serialization
+    }
+
+    /**
+     * Creates an {@link InputEvent} from the given event key and aggregates.
+     *
+     * @param eventKey
+     *          The event key.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     */
+    public InputEvent(EventKey eventKey, GPOMutable aggregates)
+    {
+      setEventKey(eventKey);
+      setAggregates(aggregates);
+    }
+
+    /**
+     * Creates an {@link InputEvent} with the given key values, aggregates, and IDs.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param bucketID
+     *          The bucket ID.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public InputEvent(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID,
+        int aggregatorIndex)
+    {
+      this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+      setAggregates(aggregates);
+    }
+
+    /**
+     * Creates an {@link InputEvent} with the given key values, aggregates, and
+     * IDs; the bucket ID defaults to 0.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public InputEvent(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID,
+        int aggregatorIndex)
+    {
+      this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+      setAggregates(aggregates);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // Hashes only the key fields; equal event keys imply equal key fields,
+      // so this stays consistent with equals below.
+      return GPOUtils.hashcode(getKeys());
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      // Equality is defined by the event key alone; aggregates are ignored.
+      final DimensionsEvent other = (DimensionsEvent)obj;
+      return this.eventKey == other.eventKey || (this.eventKey != null && this.eventKey.equals(other.eventKey));
+    }
+  }
+
+  /**
+   * An {@link Aggregate} is a {@link DimensionsEvent} holding the result of an
+   * aggregation, along with optional aggregator-specific metadata and the
+   * index of the aggregator that produced it.
+   */
+  public static class Aggregate extends DimensionsEvent implements AggregateEvent
+  {
+    private static final long serialVersionUID = 201506190110L;
+
+    /**
+     * The index of the aggregator that produced this aggregate.
+     */
+    protected int aggregatorIndex;
+    // Optional aggregator-specific metadata; may be null.
+    private GPOMutable metaData;
+
+    public Aggregate()
+    {
+      //for kryo and for extending classes
+    }
+
+    /**
+     * Creates an {@link Aggregate} from the given event key and aggregates.
+     *
+     * @param eventKey
+     *          The event key.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     */
+    public Aggregate(EventKey eventKey, GPOMutable aggregates)
+    {
+      setEventKey(eventKey);
+      setAggregates(aggregates);
+    }
+
+    /**
+     * Creates an {@link Aggregate} from the given event key, aggregates, and metadata.
+     *
+     * @param eventKey
+     *          The event key.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param metaData
+     *          The aggregator-specific metadata.
+     */
+    public Aggregate(EventKey eventKey, GPOMutable aggregates, GPOMutable metaData)
+    {
+      super(eventKey, aggregates);
+
+      this.metaData = metaData;
+    }
+
+    /**
+     * Creates an {@link Aggregate} with the given key values, aggregates, and IDs.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param bucketID
+     *          The bucket ID.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public Aggregate(GPOMutable keys, GPOMutable aggregates, int bucketID, int schemaID, int dimensionDescriptorID,
+        int aggregatorIndex)
+    {
+      this.eventKey = new EventKey(bucketID, schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+      setAggregates(aggregates);
+    }
+
+    /**
+     * Creates an {@link Aggregate} with the given key values, aggregates,
+     * metadata, and IDs.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param metaData
+     *          The aggregator-specific metadata.
+     * @param bucketID
+     *          The bucket ID.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public Aggregate(GPOMutable keys, GPOMutable aggregates, GPOMutable metaData, int bucketID, int schemaID,
+        int dimensionDescriptorID, int aggregatorIndex)
+    {
+      this(keys, aggregates, bucketID, schemaID, dimensionDescriptorID, aggregatorIndex);
+
+      this.metaData = metaData;
+    }
+
+    /**
+     * Creates an {@link Aggregate} with the given key values, aggregates, and
+     * IDs; the bucket ID defaults to 0.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public Aggregate(GPOMutable keys, GPOMutable aggregates, int schemaID, int dimensionDescriptorID,
+        int aggregatorIndex)
+    {
+      this.eventKey = new EventKey(schemaID, dimensionDescriptorID, aggregatorIndex, keys);
+      setAggregates(aggregates);
+    }
+
+    /**
+     * Creates an {@link Aggregate} with the given key values, aggregates,
+     * metadata, and IDs; the bucket ID defaults to 0.
+     *
+     * @param keys
+     *          The values of the key fields.
+     * @param aggregates
+     *          The values of the aggregate fields.
+     * @param metaData
+     *          The aggregator-specific metadata.
+     * @param schemaID
+     *          The schema ID.
+     * @param dimensionDescriptorID
+     *          The dimensions descriptor ID.
+     * @param aggregatorIndex
+     *          The aggregator index assigned to this event by the unifier.
+     */
+    public Aggregate(GPOMutable keys, GPOMutable aggregates, GPOMutable metaData, int schemaID,
+        int dimensionDescriptorID, int aggregatorIndex)
+    {
+      this(keys, aggregates, schemaID, dimensionDescriptorID, aggregatorIndex);
+
+      this.metaData = metaData;
+    }
+
+    /**
+     * Sets the aggregator-specific metadata.
+     *
+     * @param metaData
+     *          The metadata to attach to this aggregate.
+     */
+    public void setMetaData(GPOMutable metaData)
+    {
+      this.metaData = metaData;
+    }
+
+    /**
+     * Gets the aggregator-specific metadata.
+     *
+     * @return The metadata attached to this aggregate; may be null.
+     */
+    public GPOMutable getMetaData()
+    {
+      return this.metaData;
+    }
+
+    /**
+     * Sets the index of the aggregator that produced this aggregate.
+     *
+     * @param aggregatorIndex
+     *          The aggregator index.
+     */
+    public void setAggregatorIndex(int aggregatorIndex)
+    {
+      this.aggregatorIndex = aggregatorIndex;
+    }
+
+    @Override
+    public int getAggregatorIndex()
+    {
+      return this.aggregatorIndex;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      // Hashes only the key fields; equal event keys imply equal key fields,
+      // so this stays consistent with equals below.
+      return GPOUtils.hashcode(getKeys());
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      // Equality is defined by the event key alone; aggregates and metadata are ignored.
+      final DimensionsEvent other = (DimensionsEvent)obj;
+      return this.eventKey == other.eventKey || (this.eventKey != null && this.eventKey.equals(other.eventKey));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
new file mode 100644
index 0000000..b25390e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsConversionContext;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+
+/**
+ * <p>
+ * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
+ * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
+ * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
+ * {@link IncrementalAggregator}.
+ * </p>
+ * <p>
+ * {@link IncrementalAggregator}s are intended to be used with subclasses of
+ * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
+ * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
+ * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
+ * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
+ * the sum aggregator. And the {@link Aggregate} event produced by the sum aggregator will contain two fields,
+ * one for cost and one for revenue.
+ * </p>
+ *
+ */
+public abstract class AbstractIncrementalAggregator implements IncrementalAggregator
+{
+  private static final long serialVersionUID = 201506211153L;
+
+  /**
+   * The conversion context for this aggregator.
+   */
+  protected DimensionsConversionContext context;
+
+  /**
+   * Default no-arg constructor.
+   */
+  public AbstractIncrementalAggregator()
+  {
+  }
+
+  /**
+   * Sets the conversion context used to translate {@link InputEvent}s into
+   * {@link Aggregate}s.
+   *
+   * @param context The conversion context; must not be null.
+   */
+  @Override
+  public void setDimensionsConversionContext(DimensionsConversionContext context)
+  {
+    this.context = Preconditions.checkNotNull(context);
+  }
+
+  /**
+   * Marks the given {@link InputEvent} as used and converts it into a new
+   * {@link Aggregate} using this aggregator's conversion context.
+   *
+   * @param src             The source {@link InputEvent}.
+   * @param aggregatorIndex The aggregatorIndex to assign to the created {@link Aggregate}.
+   * @return The created {@link Aggregate}.
+   */
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    src.used = true;
+    Aggregate aggregate = createAggregate(src,
+        context,
+        aggregatorIndex);
+    return aggregate;
+  }
+
+  /**
+   * Hashes the subset of key fields selected by the conversion context. When
+   * the event carries a timestamp, it is temporarily rounded down to its time
+   * bucket so that events falling into the same bucket hash identically, and
+   * restored before returning.
+   *
+   * @param inputEvent The event whose key subset is hashed.
+   * @return The hash code of the key subset.
+   */
+  @Override
+  public int hashCode(InputEvent inputEvent)
+  {
+    long timestamp = -1L;
+    boolean hasTime = this.context.inputTimestampIndex != -1
+        && this.context.outputTimebucketIndex != -1;
+
+    if (hasTime) {
+      // Round the timestamp down in place so bucketed events collide.
+      timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex];
+      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]
+          = this.context.dd.getCustomTimeBucket().roundDown(timestamp);
+    }
+
+    int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys);
+
+    if (hasTime) {
+      // Restore the original timestamp; the input event must not be mutated on exit.
+      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp;
+    }
+
+    return hashCode;
+  }
+
+  /**
+   * Compares the key subsets of two {@link InputEvent}s. When the events carry
+   * timestamps, both are temporarily rounded down to their time buckets for
+   * the comparison and restored before returning, so events in the same
+   * bucket compare equal.
+   *
+   * @param inputEvent1 The first event.
+   * @param inputEvent2 The second event.
+   * @return True if the key subsets (with bucketed timestamps) are equal.
+   */
+  @Override
+  public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2)
+  {
+    long timestamp1 = 0;
+    long timestamp2 = 0;
+
+    if (context.inputTimestampIndex != -1) {
+      // Round both timestamps down in place for a bucket-level comparison.
+      timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex];
+      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] =
+          context.dd.getCustomTimeBucket().roundDown(timestamp1);
+
+      timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex];
+      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] =
+          context.dd.getCustomTimeBucket().roundDown(timestamp2);
+    }
+
+    boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(),
+        inputEvent1.getKeys(),
+        context.indexSubsetKeys);
+
+    if (context.inputTimestampIndex != -1) {
+      // Restore the original timestamps; the input events must not be mutated on exit.
+      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1;
+      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2;
+    }
+
+    return equals;
+  }
+
+  /**
+   * Creates an {@link Aggregate} from the given {@link InputEvent}.
+   *
+   * @param inputEvent      The {@link InputEvent} to unpack into an {@link Aggregate}.
+   * @param context         The conversion context required to transform the {@link InputEvent} into
+   *                        the correct {@link Aggregate}.
+   * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}.
+   * @return The converted {@link Aggregate}.
+   */
+  public static Aggregate createAggregate(InputEvent inputEvent,
+      DimensionsConversionContext context,
+      int aggregatorIndex)
+  {
+    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
+    EventKey eventKey = createEventKey(inputEvent,
+        context,
+        aggregatorIndex);
+
+    Aggregate aggregate = new Aggregate(eventKey,
+        aggregates);
+    aggregate.setAggregatorIndex(aggregatorIndex);
+
+    return aggregate;
+  }
+
+  /**
+   * Creates an {@link EventKey} from the given {@link InputEvent}.
+   *
+   * @param inputEvent      The {@link InputEvent} to extract an {@link EventKey} from.
+   * @param context         The conversion context required to extract the {@link EventKey} from
+   *                        the given {@link InputEvent}.
+   * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}.
+   * @return The {@link EventKey} extracted from the given {@link InputEvent}.
+   */
+  public static EventKey createEventKey(InputEvent inputEvent,
+      DimensionsConversionContext context,
+      int aggregatorIndex)
+  {
+    GPOMutable keys = new GPOMutable(context.keyDescriptor);
+    GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys);
+
+    if (context.outputTimebucketIndex >= 0) {
+      CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket();
+
+      // Store the time bucket ID and the timestamp rounded down to that bucket.
+      keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId(
+          timeBucket);
+      keys.getFieldsLong()[context.outputTimestampIndex] =
+          timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]);
+    }
+
+    EventKey eventKey = new EventKey(context.schemaID,
+        context.dimensionsDescriptorID,
+        context.aggregatorID,
+        keys);
+
+    return eventKey;
+  }
+
+  // NOTE(review): LOG is currently unused in this class.
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
new file mode 100644
index 0000000..6000c90
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregateEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import it.unimi.dsi.fastutil.Hash;
+
+/**
+ * Marker interface for events produced by dimensions aggregators; exposes the
+ * index of the aggregator that produced the event.
+ *
+ * @since 3.3.0
+ */
+public interface AggregateEvent
+{
+  /**
+   * Returns the index of the aggregator that produced this event.
+   *
+   * @return The aggregator index.
+   */
+  int getAggregatorIndex();
+
+  /**
+   * An aggregator that groups {@code EVENT}s and folds them (or other
+   * aggregates) into {@code AGGREGATE}s. Interface members are implicitly
+   * public and static, so the redundant modifiers were dropped.
+   *
+   * @param <EVENT>     The type of the source events.
+   * @param <AGGREGATE> The type of the aggregate events.
+   */
+  interface Aggregator<EVENT, AGGREGATE extends AggregateEvent> extends Hash.Strategy<EVENT>
+  {
+    AGGREGATE getGroup(EVENT src, int aggregatorIndex);
+
+    void aggregate(AGGREGATE dest, EVENT src);
+
+    void aggregate(AGGREGATE dest, AGGREGATE src);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java
new file mode 100644
index 0000000..e5b6d83
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorAverage.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Fields;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This is the average {@link OTFAggregator}. It divides the sums produced by
+ * the SUM child aggregator by the count produced by the COUNT child
+ * aggregator, yielding a double for every aggregated field.
+ *
+ * @since 3.1.0
+ */
+@Name("AVG")
+public class AggregatorAverage implements OTFAggregator
+{
+  private static final long serialVersionUID = 20154301644L;
+
+  /**
+   * The array index of the sum aggregates in the argument list of the {@link #aggregate} function.
+   */
+  public static final int SUM_INDEX = 0;
+  /**
+   * The array index of the count aggregates in the argument list of the {@link #aggregate} function.
+   */
+  public static final int COUNT_INDEX = 1;
+  /**
+   * The singleton instance of this class.
+   */
+  public static final AggregatorAverage INSTANCE = new AggregatorAverage();
+
+  /**
+   * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on.
+   */
+  public static final transient List<Class<? extends IncrementalAggregator>> CHILD_AGGREGATORS =
+      ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(),
+      AggregatorIncrementalType.COUNT.getAggregator().getClass());
+
+  /**
+   * Constructor for singleton pattern.
+   */
+  protected AggregatorAverage()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public List<Class<? extends IncrementalAggregator>> getChildAggregators()
+  {
+    return CHILD_AGGREGATORS;
+  }
+
+  /**
+   * Computes the per-field average by dividing each sum by the count.
+   *
+   * @param aggregates The child aggregations, ordered as {@link #SUM_INDEX} then {@link #COUNT_INDEX}.
+   * @return A {@link GPOMutable} holding a double average for every summed field.
+   */
+  @Override
+  public GPOMutable aggregate(GPOMutable... aggregates)
+  {
+    Preconditions.checkArgument(aggregates.length == getChildAggregators().size(),
+        "The number of arguments " + aggregates.length +
+        " should be the same as the number of child aggregators " + getChildAggregators().size());
+
+    GPOMutable sumAggregation = aggregates[SUM_INDEX];
+    GPOMutable countAggregation = aggregates[COUNT_INDEX];
+
+    FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor();
+    Fields fields = fieldsDescriptor.getFields();
+    GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this));
+
+    long count = countAggregation.getFieldsLong()[0];
+
+    // Divide each field's sum by the count, widening to double.
+    for (String field : fields.getFields()) {
+      Type type = sumAggregation.getFieldDescriptor().getType(field);
+
+      switch (type) {
+        case BYTE: {
+          double val = ((double)sumAggregation.getFieldByte(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case SHORT: {
+          double val = ((double)sumAggregation.getFieldShort(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case INTEGER: {
+          double val = ((double)sumAggregation.getFieldInt(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case LONG: {
+          double val = ((double)sumAggregation.getFieldLong(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case FLOAT: {
+          double val = sumAggregation.getFieldFloat(field) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case DOUBLE: {
+          double val = sumAggregation.getFieldDouble(field) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("The type " + type + " is not supported.");
+        }
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public Type getOutputType()
+  {
+    // Averages are always doubles, regardless of the input field type.
+    return Type.DOUBLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java
new file mode 100644
index 0000000..61d12a8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCount.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered.
+ *
+ * @since 3.1.0
+ */
+@Name("COUNT")
+public class AggregatorCount extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301645L;
+
+  /**
+   * This is a map whose keys represent input types and whose values
+   * represent the corresponding output types.
+   */
+  public static final transient Map<Type, Type> TYPE_CONVERSION_MAP;
+
+  static {
+    Map<Type, Type> typeConversionMap = Maps.newHashMap();
+
+    // A count is always a long, regardless of the input field's type.
+    for (Type type : Type.values()) {
+      typeConversionMap.put(type, Type.LONG);
+    }
+
+    TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap);
+  }
+
+  public AggregatorCount()
+  {
+    //Do nothing
+  }
+
+  /**
+   * Marks the event as used and creates a new {@link Aggregate} with all
+   * counts initialized to zero. (A previously allocated key {@link GPOMutable}
+   * and its indirect copy were dead code — {@link #createEventKey} builds its
+   * own key — and have been removed.)
+   *
+   * @param src             The source {@link InputEvent}.
+   * @param aggregatorIndex The aggregatorIndex to assign to the created {@link Aggregate}.
+   * @return The created {@link Aggregate} with zeroed counts.
+   */
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    src.used = true;
+    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
+
+    EventKey eventKey = createEventKey(src,
+        context,
+        aggregatorIndex);
+
+    // Initialize every count field to zero.
+    long[] longFields = aggregates.getFieldsLong();
+
+    for (int index = 0;
+        index < longFields.length;
+        index++) {
+      longFields[index] = 0;
+    }
+
+    return new Aggregate(eventKey,
+        aggregates);
+  }
+
+  /**
+   * Increments every count field of the destination by one for the incoming event.
+   */
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    long[] fieldsLong = dest.getAggregates().getFieldsLong();
+
+    for (int index = 0;
+        index < fieldsLong.length;
+        index++) {
+      //increment count
+      fieldsLong[index]++;
+    }
+  }
+
+  /**
+   * Adds the source aggregate's counts into the destination aggregate.
+   */
+  @Override
+  public void aggregate(Aggregate destAgg, Aggregate srcAgg)
+  {
+    long[] destLongs = destAgg.getAggregates().getFieldsLong();
+    long[] srcLongs = srcAgg.getAggregates().getFieldsLong();
+
+    for (int index = 0;
+        index < destLongs.length;
+        index++) {
+      //aggregate count
+      destLongs[index] += srcLongs[index];
+    }
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return TYPE_CONVERSION_MAP.get(inputType);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    // Counting requires no aggregator-specific metadata.
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java
new file mode 100644
index 0000000..744cd1c
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorCumSum.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.gpo.Serde;
+import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor;
+import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable;
+import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
/**
 * A cumulative-sum aggregator. It extends {@link AggregatorSum} but additionally records,
 * in each aggregate's metadata, every distinct key (and the values contributed under that
 * key) seen so far, so that a given key contributes to the running sum at most once.
 *
 * @since 3.1.0
 */
@Name("CUM_SUM")
public class AggregatorCumSum extends AggregatorSum
{
  private static final long serialVersionUID = 201506280518L;

  //Positions of the four entries in the metadata GPOMutable's object-field array.
  public static final int KEY_FD_INDEX = 0;
  public static final int AGGREGATE_FD_INDEX = 1;
  public static final int KEYS_INDEX = 2;
  public static final int AGGREGATES_INDEX = 3;

  //Descriptor of the per-aggregate metadata: the key/value FieldsDescriptors plus the
  //lists of previously seen keys and their corresponding values.
  public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR;

  static {
    Map<String, Type> fieldToType = Maps.newHashMap();
    fieldToType.put("fdkeys", Type.OBJECT);
    fieldToType.put("fdvalues", Type.OBJECT);
    fieldToType.put("keys", Type.OBJECT);
    fieldToType.put("values", Type.OBJECT);

    //Custom serializers for each metadata field; the PayloadFix passed below reattaches
    //field descriptors to the lists after deserialization.
    Map<String, Serde> fieldToSerde = Maps.newHashMap();
    fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE);
    fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE);
    fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE);
    fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE);

    META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType,
        fieldToSerde,
        new PayloadFix());
  }

  public AggregatorCumSum()
  {
  }

  /**
   * Creates the aggregate for a new group from its first input event and seeds the
   * metadata with that event's (timestamp-neutralized) key and aggregate values.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    src.used = true;
    Aggregate agg = createAggregate(src,
        context,
        aggregatorIndex);

    GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);

    GPOMutable metaData = new GPOMutable(getMetaDataDescriptor());

    GPOMutable fullKey = new GPOMutable(src.getKeys());

    if (context.inputTimestampIndex >= 0) {
      //Blank the timestamp (-1) so keys differing only in time compare equal.
      fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L;
    }

    List<GPOMutable> keys = Lists.newArrayList(fullKey);

    GPOMutable value = new GPOMutable(agg.getAggregates());
    List<GPOMutable> values = Lists.newArrayList(value);

    metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor();
    metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor();
    metaData.getFieldsObject()[KEYS_INDEX] = keys;
    metaData.getFieldsObject()[AGGREGATES_INDEX] = values;
    agg.setMetaData(metaData);

    return agg;
  }

  /**
   * Folds an input event into the cumulative sum. The event contributes only if its
   * (timestamp-neutralized) key has not already been seen for this aggregate.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    @SuppressWarnings("unchecked")
    List<GPOMutable> destKeys =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> destAggregates =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    long timestamp = 0L;

    if (context.inputTimestampIndex >= 0) {
      //Temporarily blank the timestamp so the contains() check ignores it; the original
      //value is restored before returning because src is owned by the caller.
      timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex];
      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L;
    }

    if (!contains(destKeys, src.getKeys())) {
      destKeys.add(new GPOMutable(src.getKeys()));

      GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
      GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates);

      destAggregates.add(aggregates);

      //aggregateAggs is inherited — presumably the summing step from AggregatorSum;
      //confirm in the parent class.
      this.aggregateAggs(dest.getAggregates(), aggregates);
    }

    if (context.inputTimestampIndex >= 0) {
      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp;
    }
  }

  /**
   * Unifier-side merge: folds the keys/values recorded in {@code src}'s metadata into
   * {@code dest}, summing only the keys {@code dest} has not already seen.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    //Presumably runs PayloadFix to reattach field descriptors stripped during
    //serialization — confirm against GPOMutable.applyObjectPayloadFix.
    dest.getMetaData().applyObjectPayloadFix();
    src.getMetaData().applyObjectPayloadFix();

    @SuppressWarnings("unchecked")
    List<GPOMutable> destKeys =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> srcKeys =
        (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> destAggregates =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> srcAggregates =
        (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    //Collect unseen keys/values first, then append in bulk, so the contains() scan is
    //not run against entries added during this merge.
    List<GPOMutable> newKeys = Lists.newArrayList();
    List<GPOMutable> newAggs = Lists.newArrayList();

    for (int index = 0;
        index < srcKeys.size();
        index++) {
      GPOMutable currentSrcKey = srcKeys.get(index);
      GPOMutable currentSrcAgg = srcAggregates.get(index);

      if (!contains(destKeys, currentSrcKey)) {
        newKeys.add(currentSrcKey);
        newAggs.add(currentSrcAgg);

        this.aggregateAggs(dest.getAggregates(), currentSrcAgg);
      }
    }

    destKeys.addAll(newKeys);
    destAggregates.addAll(newAggs);
  }

  /**
   * Linear scan for {@code mutable} in {@code mutables} using {@link GPOUtils#equals}.
   * NOTE(review): this makes each merge O(n) per key — O(n^2) overall in the number of
   * distinct keys.
   */
  private boolean contains(List<GPOMutable> mutables, GPOMutable mutable)
  {
    for (int index = 0;
        index < mutables.size();
        index++) {
      GPOMutable mutableFromList = mutables.get(index);

      if (GPOUtils.equals(mutableFromList, mutable)) {
        return true;
      }
    }

    return false;
  }

  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return META_DATA_FIELDS_DESCRIPTOR;
  }

  /**
   * Restores the shared {@link FieldsDescriptor} references on the deserialized key and
   * aggregate lists — these are apparently not serialized with each {@link GPOMutable}.
   */
  public static class PayloadFix implements SerdeObjectPayloadFix
  {
    @Override
    public void fix(Object[] objects)
    {
      FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX];
      FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX];

      @SuppressWarnings("unchecked")
      List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX];
      @SuppressWarnings("unchecked")
      List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX];

      fix(keyfd, keyMutables);
      fix(valuefd, aggregateMutables);
    }

    //Reattaches the given descriptor to every GPOMutable in the list.
    private void fix(FieldsDescriptor fd, List<GPOMutable> mutables)
    {
      for (int index = 0;
          index < mutables.size();
          index++) {
        mutables.get(index).setFieldDescriptor(fd);
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java
new file mode 100644
index 0000000..a6ceb65
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorFirst.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * <p>
+ * This aggregator creates an aggregate out of the first {@link InputEvent} encountered by this aggregator. All
+ * subsequent
+ * {@link InputEvent}s are ignored.
+ * </p>
+ * <p>
+ * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
+ * one is picked arbitrarily to be the first.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+@Name("FIRST")
+public class AggregatorFirst extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301646L;
+
+  public AggregatorFirst()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
+
+    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+
+    return aggregate;
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    //Ignore
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    //Ignore
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java
new file mode 100644
index 0000000..c82e6ee
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorIncrementalType.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * @since 3.1.0
+ */
+
+public enum AggregatorIncrementalType
+{
+  SUM(new AggregatorSum()),
+  MIN(new AggregatorMin()),
+  MAX(new AggregatorMax()),
+  COUNT(new AggregatorCount()),
+  LAST(new AggregatorLast()),
+  FIRST(new AggregatorFirst()),
+  CUM_SUM(new AggregatorCumSum());
+
+  public static final Map<String, Integer> NAME_TO_ORDINAL;
+  public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;
+
+  private IncrementalAggregator aggregator;
+
+  static {
+    Map<String, Integer> nameToOrdinal = Maps.newHashMap();
+    Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap();
+
+    for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) {
+      nameToOrdinal.put(aggType.name(), aggType.ordinal());
+      nameToAggregator.put(aggType.name(), aggType.getAggregator());
+    }
+
+    NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal);
+    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
+  }
+
+  AggregatorIncrementalType(IncrementalAggregator aggregator)
+  {
+    setAggregator(aggregator);
+  }
+
+  private void setAggregator(IncrementalAggregator aggregator)
+  {
+    Preconditions.checkNotNull(aggregator);
+    this.aggregator = aggregator;
+  }
+
+  public IncrementalAggregator getAggregator()
+  {
+    return aggregator;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java
new file mode 100644
index 0000000..b679098
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorLast.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * <p>
+ * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous
+ * {@link InputEvent}s are ignored.
+ * </p>
+ * <p>
+ * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
+ * one is picked arbitrarily to be the last.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+@Name("LAST")
+public class AggregatorLast extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301647L;
+
+  public AggregatorLast()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
+
+    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+
+    return aggregate;
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    DimensionsEvent.copy(dest, src);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java
new file mode 100644
index 0000000..8aca886
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMax.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This {@link IncrementalAggregator} takes the max of the fields provided in the {@link InputEvent}.
+ *
+ * @since 3.1.0
+ */
+@Name("MAX")
+public class AggregatorMax extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 201503120332L;
+
+  public AggregatorMax()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
+
+    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+
+    return aggregate;
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    GPOMutable destAggs = dest.getAggregates();
+    GPOMutable srcAggs = src.getAggregates();
+
+    {
+      byte[] destByte = destAggs.getFieldsByte();
+      if (destByte != null) {
+        byte[] srcByte = srcAggs.getFieldsByte();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset;
+        for (int index = 0;
+            index < destByte.length;
+            index++) {
+          byte tempVal = srcByte[srcIndices[index]];
+          if (destByte[index] < tempVal) {
+            destByte[index] = tempVal;
+          }
+        }
+      }
+    }
+
+    {
+      short[] destShort = destAggs.getFieldsShort();
+      if (destShort != null) {
+        short[] srcShort = srcAggs.getFieldsShort();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset;
+        for (int index = 0;
+            index < destShort.length;
+            index++) {
+          short tempVal = srcShort[srcIndices[index]];
+          if (destShort[index] < tempVal) {
+            destShort[index] = tempVal;
+          }
+        }
+      }
+    }
+
+    {
+      int[] destInteger = destAggs.getFieldsInteger();
+      if (destInteger != null) {
+        int[] srcInteger = srcAggs.getFieldsInteger();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset;
+        for (int index = 0;
+            index < destInteger.length;
+            index++) {
+          int tempVal = srcInteger[srcIndices[index]];
+          if (destInteger[index] < tempVal) {
+            destInteger[index] = tempVal;
+          }
+        }
+      }
+    }
+
+    {
+      long[] destLong = destAggs.getFieldsLong();
+      if (destLong != null) {
+        long[] srcLong = srcAggs.getFieldsLong();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset;
+        for (int index = 0;
+            index < destLong.length;
+            index++) {
+          long tempVal = srcLong[srcIndices[index]];
+          if (destLong[index] < tempVal) {
+            destLong[index] = tempVal;
+          }
+        }
+      }
+    }
+
+    {
+      float[] destFloat = destAggs.getFieldsFloat();
+      if (destFloat != null) {
+        float[] srcFloat = srcAggs.getFieldsFloat();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset;
+        for (int index = 0;
+            index < destFloat.length;
+            index++) {
+          float tempVal = srcFloat[srcIndices[index]];
+          if (destFloat[index] < tempVal) {
+            destFloat[index] = tempVal;
+          }
+        }
+      }
+    }
+
+    {
+      double[] destDouble = destAggs.getFieldsDouble();
+      if (destDouble != null) {
+        double[] srcDouble = srcAggs.getFieldsDouble();
+        int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset;
+        for (int index = 0;
+            index < destDouble.length;
+            index++) {
+          double tempVal = srcDouble[srcIndices[index]];
+          if (destDouble[index] < tempVal) {
+            destDouble[index] = tempVal;
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    GPOMutable destAggs = dest.getAggregates();
+    GPOMutable srcAggs = src.getAggregates();
+
+    {
+      byte[] destByte = destAggs.getFieldsByte();
+      if (destByte != null) {
+        byte[] srcByte = srcAggs.getFieldsByte();
+
+        for (int index = 0;
+            index < destByte.length;
+            index++) {
+          if (destByte[index] < srcByte[index]) {
+            destByte[index] = srcByte[index];
+          }
+        }
+      }
+    }
+
+    {
+      short[] destShort = destAggs.getFieldsShort();
+      if (destShort != null) {
+        short[] srcShort = srcAggs.getFieldsShort();
+
+        for (int index = 0;
+            index < destShort.length;
+            index++) {
+          if (destShort[index] < srcShort[index]) {
+            destShort[index] = srcShort[index];
+          }
+        }
+      }
+    }
+
+    {
+      int[] destInteger = destAggs.getFieldsInteger();
+      if (destInteger != null) {
+        int[] srcInteger = srcAggs.getFieldsInteger();
+
+        for (int index = 0;
+            index < destInteger.length;
+            index++) {
+          if (destInteger[index] < srcInteger[index]) {
+            destInteger[index] = srcInteger[index];
+          }
+        }
+      }
+    }
+
+    {
+      long[] destLong = destAggs.getFieldsLong();
+      if (destLong != null) {
+        long[] srcLong = srcAggs.getFieldsLong();
+
+        for (int index = 0;
+            index < destLong.length;
+            index++) {
+          if (destLong[index] < srcLong[index]) {
+            destLong[index] = srcLong[index];
+          }
+        }
+      }
+    }
+
+    {
+      float[] destFloat = destAggs.getFieldsFloat();
+      if (destFloat != null) {
+        float[] srcFloat = srcAggs.getFieldsFloat();
+
+        for (int index = 0;
+            index < destFloat.length;
+            index++) {
+          if (destFloat[index] < srcFloat[index]) {
+            destFloat[index] = srcFloat[index];
+          }
+        }
+      }
+    }
+
+    {
+      double[] destDouble = destAggs.getFieldsDouble();
+      if (destDouble != null) {
+        double[] srcDouble = srcAggs.getFieldsDouble();
+
+        for (int index = 0;
+            index < destDouble.length;
+            index++) {
+          if (destDouble[index] < srcDouble[index]) {
+            destDouble[index] = srcDouble[index];
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}


Mime
View raw message