apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyfar...@apache.org
Subject [3/5] incubator-apex-malhar git commit: MLHR-1919 #resolve #comment add dimension schema to malhar
Date Fri, 05 Feb 2016 20:15:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
new file mode 100644
index 0000000..6639d3a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
@@ -0,0 +1,807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.appdata.schemas;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry;
+import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator;
+
+/**
+ * The {@link DimensionalSchema} class represents the App Data dimensions schema. The App Data dimensions
+ * schema is built from two sources: a {@link DimensionalConfigurationSchema} and an optional schema stub. The
+ * {@link DimensionalConfigurationSchema} is responsible for defining the key, values, dimensions combinations,
+ * and the aggregations performed for each dimensions combination. The schema stub defines the from and to
+ * times for the App Data dimensions schema. For details on how to define the {@link DimensionalConfigurationSchema}
+ * schema please see the documentation for the {@link DimensionalConfigurationSchema} class. An example of a valid
+ * schema stub which defines the from and to times is below:
+ * <br/>
+ * <br/>
+ * {@code
+ * {
+ * "time":
+ * {
+ * "from":1123455556656,
+ * "to":382390859384
+ * }
+ * }
+ * }
+ *
+ * @since 3.1.0
+ */
+public class DimensionalSchema implements Schema
+{
+  /**
+   * The type of the schema.
+   */
+  public static final String SCHEMA_TYPE = "dimensions";
+  /**
+   * The version of the schema.
+   */
+  public static final String SCHEMA_VERSION = "1.0";
+  /**
+   * The JSON key string corresponding to the from field.
+   */
+  public static final String FIELD_TIME_FROM = "from";
+  /**
+   * The JSON key string corresponding to the time field.
+   */
+  public static final String FIELD_TIME = "time";
+  /**
+   * The JSON key string corresponding to the to field.
+   */
+  public static final String FIELD_TIME_TO = "to";
+  /**
+   * The JSON key string corresponding to the buckets field.
+   */
+  public static final String FIELD_TIME_BUCKETS = "buckets";
+  /**
+   * The JSON key string corresponding to the slidingAggregateSupported field.
+   */
+  public static final String FIELD_SLIDING_AGGREGATE_SUPPORTED = "slidingAggregateSupported";
+  /**
+   * The JSON key string used to identify the tags.
+   */
+  //TODO To be removed when Malhar Library 3.3 becomes a dependency.
+  private static final String FIELD_TAGS = "tags";
+
+  public static final List<Fields> VALID_KEYS = ImmutableList.of(new Fields(Sets.newHashSet(FIELD_TIME)));
+  public static final List<Fields> VALID_TIME_KEYS = ImmutableList.of(
+      new Fields(Sets.newHashSet(FIELD_TIME_FROM, FIELD_TIME_TO)));
+
+  /**
+   * The from value for the schema. Null if there is no from value.
+   */
+  private Long from;
+  /**
+   * The to value for the schema. Null if there is no to value.
+   */
+  private Long to;
+  /**
+   * boolean flag indicating if any values in the schema have been changed.
+   */
+  private boolean changed = false;
+  /**
+   * boolean flag indicating if the from to fields in the schema have been changed.
+   */
+  private boolean changedFromTo = false;
+  /**
+   * boolean flag indicating if the schema keys have been updated for the schema.
+   */
+  private boolean changedSchemaKeys = false;
+  /**
+   * boolean flag indicating if the enum vals are updated.
+   */
+  private boolean areEnumsUpdated = false;
+  /**
+   * The AppData schema JSON string (which is returned in the schema query).
+   */
+  private String schemaJSON;
+  /**
+   * The {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema} was constructed.
+   */
+  private DimensionalConfigurationSchema configurationSchema;
+  /**
+   * The {@link JSONObject} representing the AppData dimensions schema.
+   */
+  private JSONObject schema;
+  /**
+   * The {@link JSONObject} representing the time section of the AppData dimensions schema.
+   */
+  private JSONObject time;
+  /**
+   * The {@link JSONObject} representing the keys section of the AppData dimensions schema.
+   */
+  private JSONArray keys;
+  /**
+   * This flag is true if there was a from and to time defined for this schema initially.
+   */
+  private boolean predefinedFromTo = false;
+  /**
+   * The schema keys for this schema.
+   */
+  private Map<String, String> schemaKeys;
+  /**
+   * The current enum vals for this schema.
+   */
+  private Map<String, List<Object>> currentEnumVals;
+  /**
+   * The schemaID assigned to this schema. This schemaID is only needed for operators
+   * which need to host multiple schemas.
+   */
+  private int schemaID = Schema.DEFAULT_SCHEMA_ID;
+
+  /**
+   * Private no-arg constructor. Required so Kryo can instantiate this class
+   * during deserialization; never invoked directly.
+   */
+  private DimensionalSchema()
+  {
+    //For kryo
+  }
+
+  /**
+   * This creates a {@link DimensionalSchema} object from the given schema stub,
+   * configuration schema, and schema keys.
+   *
+   * @param schemaStub          The schema stub to use when creating this {@link DimensionalSchema}.
+   *                            May be null; when non-null it predefines the from and to times.
+   * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}.
+   * @param schemaKeys          The schemaKeys to use when creating this {@link DimensionalSchema}.
+   */
+  public DimensionalSchema(String schemaStub,
+      DimensionalConfigurationSchema configurationSchema,
+      Map<String, String> schemaKeys)
+  {
+    this(configurationSchema,
+        schemaKeys);
+
+    if (schemaStub != null) {
+      predefinedFromTo = true;
+      //setSchemaStub throws a checked JSONException; surface it as unchecked.
+      try {
+        setSchemaStub(schemaStub);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * This creates a {@link DimensionalSchema} object from the given schemaID, schemaStub, configurationSchema, and
+   * schemaKeys.
+   *
+   * @param schemaID            The schemaID assigned to this schema.
+   * @param schemaStub          The schema stub to use when creating this {@link DimensionalSchema}. May be null.
+   * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}.
+   * @param schemaKeys          The schemaKeys to use when creating this {@link DimensionalSchema}.
+   */
+  public DimensionalSchema(int schemaID,
+      String schemaStub,
+      DimensionalConfigurationSchema configurationSchema,
+      Map<String, String> schemaKeys)
+  {
+    this(schemaStub,
+        configurationSchema,
+        schemaKeys);
+
+    this.schemaID = schemaID;
+  }
+
+  /**
+   * This creates a {@link DimensionalSchema} from the given schemaStub and configuration schema.
+   * The schema keys default to null.
+   *
+   * @param schemaStub          The schema stub to use when creating this {@link DimensionalSchema}. May be null.
+   * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}.
+   */
+  public DimensionalSchema(String schemaStub,
+      DimensionalConfigurationSchema configurationSchema)
+  {
+    this(schemaStub, configurationSchema, null);
+  }
+
+  /**
+   * This creates a {@link DimensionalSchema} from the given schemaID, schemaStub, and
+   * configurationSchema. The schema keys default to null.
+   *
+   * @param schemaID            The schemaID assigned to this schema.
+   * @param schemaStub          The schema stub to use when creating this {@link DimensionalSchema}. May be null.
+   * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}.
+   */
+  public DimensionalSchema(int schemaID,
+      String schemaStub,
+      DimensionalConfigurationSchema configurationSchema)
+  {
+    this(schemaStub,
+        configurationSchema);
+
+    this.schemaID = schemaID;
+  }
+
+  /**
+   * Creates a {@link DimensionalSchema} from the given configuration schema and schema keys.
+   *
+   * @param configurationSchema The configuration schema from which to construct this {@link DimensionalSchema}.
+   * @param schemaKeys          The schemaKeys assigned to this schema. May be null.
+   */
+  public DimensionalSchema(DimensionalConfigurationSchema configurationSchema,
+      Map<String, String> schemaKeys)
+  {
+    setConfigurationSchema(configurationSchema);
+    setSchemaKeys(schemaKeys);
+
+    //Build the schema JSON eagerly; a malformed configuration surfaces here as unchecked.
+    try {
+      initialize();
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a {@link DimensionalSchema} object from the given schemaID, configurationSchema,
+   * and schemaKeys.
+   *
+   * @param schemaID            The schemaID assigned to this schema.
+   * @param configurationSchema The configuration schema from which this schema was constructed.
+   * @param schemaKeys          The schema keys assigned to this schema. May be null.
+   */
+  public DimensionalSchema(int schemaID,
+      DimensionalConfigurationSchema configurationSchema,
+      Map<String, String> schemaKeys)
+  {
+    this(configurationSchema,
+        schemaKeys);
+
+    this.schemaID = schemaID;
+  }
+
+  /**
+   * Creates a {@link DimensionalSchema} object from the given configuration schema.
+   * The schema keys default to null.
+   *
+   * @param configurationSchema The configuration schema from which to construct this
+   *                            schema.
+   */
+  public DimensionalSchema(DimensionalConfigurationSchema configurationSchema)
+  {
+    this(configurationSchema,
+        null);
+  }
+
+  /**
+   * Creates a {@link DimensionalSchema} object with the given schema ID and
+   * configuration schema. The schema keys default to null.
+   *
+   * @param schemaID            The schemaID assigned to this schema.
+   * @param configurationSchema The configuration schema from which this schema was constructed.
+   */
+  public DimensionalSchema(int schemaID,
+      DimensionalConfigurationSchema configurationSchema)
+  {
+    this(configurationSchema);
+    this.schemaID = schemaID;
+  }
+
+  /**
+   * Returns the aggregator registry assigned to this schema object.
+   * Delegates to the backing {@link DimensionalConfigurationSchema}.
+   *
+   * @return The aggregator registry.
+   */
+  public AggregatorRegistry getAggregatorRegistry()
+  {
+    return configurationSchema.getAggregatorRegistry();
+  }
+
+  /**
+   * Sets the schema keys for this schema and marks the schema-keys portion of the
+   * JSON as stale so it is rebuilt on the next {@link #getSchemaJSON} call.
+   *
+   * @param schemaKeys The new schema keys; null clears the existing keys.
+   */
+  @Override
+  public final void setSchemaKeys(Map<String, String> schemaKeys)
+  {
+    changed = true;
+    changedSchemaKeys = true;
+
+    if (schemaKeys == null) {
+      this.schemaKeys = null;
+    } else {
+      //Reject maps containing null keys or null values before copying.
+      for (Map.Entry<String, String> schemaKeyEntry : schemaKeys.entrySet()) {
+        Preconditions.checkNotNull(schemaKeyEntry.getKey());
+        Preconditions.checkNotNull(schemaKeyEntry.getValue());
+      }
+
+      this.schemaKeys = Maps.newHashMap(schemaKeys);
+    }
+  }
+
+  /**
+   * This is a helper method for setting the configuration schema.
+   *
+   * @param configurationSchema The configuration schema.
+   * @throws NullPointerException if the given configuration schema is null.
+   */
+  private void setConfigurationSchema(DimensionalConfigurationSchema configurationSchema)
+  {
+    //Fix: error message previously said "eventSchema", which does not match the parameter name.
+    this.configurationSchema = Preconditions.checkNotNull(configurationSchema, "configurationSchema");
+  }
+
+  /**
+   * This is a helper method which extracts and validates the information contained in the schema stub
+   * for this schema, setting the from and to times.
+   *
+   * @param schemaStub The schema stub to extract information from and validate.
+   * @throws JSONException This exception is thrown if there is an error processing the provided JSON schemaStub.
+   */
+  private void setSchemaStub(String schemaStub) throws JSONException
+  {
+    JSONObject jo = new JSONObject(schemaStub);
+    SchemaUtils.checkValidKeysEx(jo, VALID_KEYS);
+
+    JSONObject tempTime = jo.getJSONObject(FIELD_TIME);
+    //Fix: validate the inner time object (which holds "from"/"to") against VALID_TIME_KEYS.
+    //Previously the outer object was checked here, so malformed time stubs passed validation.
+    SchemaUtils.checkValidKeys(tempTime, VALID_TIME_KEYS);
+
+    this.from = tempTime.getLong(FIELD_TIME_FROM);
+    this.to = tempTime.getLong(FIELD_TIME_TO);
+  }
+
+  /**
+   * Initializes the schema JSON and schema metadata. Builds the schemaKeys, type/version,
+   * tags, time, keys, values, and dimensions sections of the AppData dimensions schema and
+   * caches the serialized JSON string.
+   *
+   * @throws JSONException This exception is thrown when there is an
+   *                       exception building the schema for the AppData dimensions schema.
+   */
+  private void initialize() throws JSONException
+  {
+    schema = new JSONObject();
+
+    if (schemaKeys != null) {
+      schema.put(Schema.FIELD_SCHEMA_KEYS,
+          SchemaUtils.createJSONObject(schemaKeys));
+    }
+
+    schema.put(SnapshotSchema.FIELD_SCHEMA_TYPE, DimensionalSchema.SCHEMA_TYPE);
+    schema.put(SnapshotSchema.FIELD_SCHEMA_VERSION, DimensionalSchema.SCHEMA_VERSION);
+
+    //Schema-level tags are only included when present.
+    if (!configurationSchema.getTags().isEmpty()) {
+      schema.put(FIELD_TAGS, new JSONArray(configurationSchema.getTags()));
+    }
+
+    //time
+    time = new JSONObject();
+    schema.put(FIELD_TIME, time);
+    JSONArray bucketsArray = new JSONArray(configurationSchema.getBucketsString());
+    time.put(FIELD_TIME_BUCKETS, bucketsArray);
+    time.put(FIELD_SLIDING_AGGREGATE_SUPPORTED, true);
+
+    //keys
+    keys = new JSONArray(configurationSchema.getKeysString());
+
+    //Attach per-key tags where the configuration schema defines them.
+    for (int keyIndex = 0; keyIndex < keys.length(); keyIndex++) {
+      JSONObject keyJo = keys.getJSONObject(keyIndex);
+      String keyName = keyJo.getString(DimensionalConfigurationSchema.FIELD_KEYS_NAME);
+      List<String> tags = configurationSchema.getKeyToTags().get(keyName);
+
+      if (!tags.isEmpty()) {
+        keyJo.put(FIELD_TAGS, new JSONArray(tags));
+      }
+    }
+
+    schema.put(DimensionalConfigurationSchema.FIELD_KEYS, keys);
+
+    //values: one entry per value/aggregator pair, named "value:AGGREGATOR".
+    JSONArray values = new JSONArray();
+    schema.put(SnapshotSchema.FIELD_VALUES, values);
+
+    FieldsDescriptor inputValuesDescriptor = configurationSchema.getInputValuesDescriptor();
+    Map<String, Map<String, Type>> allValueToAggregator = configurationSchema.getSchemaAllValueToAggregatorToType();
+
+    for (Map.Entry<String, Map<String, Type>> entry : allValueToAggregator.entrySet()) {
+      String valueName = entry.getKey();
+
+      for (Map.Entry<String, Type> entryAggType : entry.getValue().entrySet()) {
+        String aggregatorName = entryAggType.getKey();
+        Type outputValueType = entryAggType.getValue();
+
+        JSONObject value = new JSONObject();
+        String combinedName = valueName +
+            DimensionalConfigurationSchema.ADDITIONAL_VALUE_SEPERATOR +
+            aggregatorName;
+        value.put(SnapshotSchema.FIELD_VALUES_NAME, combinedName);
+        value.put(SnapshotSchema.FIELD_VALUES_TYPE, outputValueType.getName());
+
+        List<String> tags = configurationSchema.getValueToTags().get(valueName);
+
+        if (!tags.isEmpty()) {
+          value.put(FIELD_TAGS, new JSONArray(tags));
+        }
+
+        values.put(value);
+      }
+    }
+
+    //dimensions: one combination object per dimensions-descriptor ID.
+    JSONArray dimensions = new JSONArray();
+
+    for (int combinationID = 0;
+        combinationID < configurationSchema.getDimensionsDescriptorIDToKeys().size();
+        combinationID++) {
+
+      Fields fields = configurationSchema.getDimensionsDescriptorIDToKeys().get(combinationID);
+      Map<String, Set<String>> fieldToAggregatorAdditionalValues =
+          configurationSchema.getDimensionsDescriptorIDToFieldToAggregatorAdditionalValues().get(combinationID);
+
+      JSONObject combination = new JSONObject();
+      JSONArray combinationArray = new JSONArray();
+
+      for (String field : fields.getFields()) {
+        combinationArray.put(field);
+      }
+
+      combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_COMBINATIONS, combinationArray);
+
+      if (!fieldToAggregatorAdditionalValues.isEmpty()) {
+        JSONArray additionalValueArray = new JSONArray();
+
+        for (Map.Entry<String, Set<String>> entry : fieldToAggregatorAdditionalValues.entrySet()) {
+          String valueName = entry.getKey();
+
+          for (String aggregatorName : entry.getValue()) {
+            JSONObject additionalValueObject = new JSONObject();
+            String combinedName = valueName
+                + DimensionalConfigurationSchema.ADDITIONAL_VALUE_SEPERATOR
+                + aggregatorName;
+            Type inputValueType = inputValuesDescriptor.getType(valueName);
+
+            //Fix: log every invalid aggregator name. Previously the error was only logged
+            //when the name was null, so invalid non-null names passed silently and failed
+            //later with an obscure NullPointerException in the registry lookups below.
+            if (!configurationSchema.getAggregatorRegistry().isAggregator(aggregatorName)) {
+              LOG.error("{} is not a valid aggregator.", aggregatorName);
+            }
+
+            Type outputValueType;
+
+            if (configurationSchema.getAggregatorRegistry().isIncrementalAggregator(aggregatorName)) {
+              IncrementalAggregator aggregator
+                  = configurationSchema.getAggregatorRegistry().getNameToIncrementalAggregator().get(aggregatorName);
+
+              outputValueType = aggregator.getOutputType(inputValueType);
+            } else {
+              //Not incremental, so it must be an on-the-fly (OTF) aggregator.
+              outputValueType = configurationSchema.getAggregatorRegistry().getNameToOTFAggregators().get(
+                  aggregatorName).getOutputType();
+            }
+
+            additionalValueObject.put(DimensionalConfigurationSchema.FIELD_VALUES_NAME, combinedName);
+            additionalValueObject.put(DimensionalConfigurationSchema.FIELD_VALUES_TYPE, outputValueType.getName());
+            additionalValueArray.put(additionalValueObject);
+          }
+        }
+
+        combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_ADDITIONAL_VALUES, additionalValueArray);
+      }
+
+      dimensions.put(combination);
+    }
+
+    schema.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS, dimensions);
+
+    //Cache the serialized form; getSchemaJSON() returns this until something changes.
+    this.schemaJSON = this.schema.toString();
+  }
+
+  /**
+   * Sets the from time for the schema and marks the time portion of the schema JSON
+   * as stale so it is rebuilt on the next {@link #getSchemaJSON} call.
+   *
+   * @param from The from time for the schema. May be null only if to is also null.
+   */
+  public void setFrom(Long from)
+  {
+    this.from = from;
+    changed = true;
+    changedFromTo = true;
+  }
+
+  /**
+   * Sets the to time for the schema and marks the time portion of the schema JSON
+   * as stale so it is rebuilt on the next {@link #getSchemaJSON} call.
+   *
+   * @param to The to time for the schema. May be null only if from is also null.
+   */
+  public void setTo(Long to)
+  {
+    this.to = to;
+    changed = true;
+    changedFromTo = true;
+  }
+
+  /**
+   * Sets the new enum lists for this schema. The sets in the provided map are converted into lists.
+   *
+   * @param enums The new enum sets for this schema.
+   */
+  public void setEnumsSet(Map<String, Set<Object>> enums)
+  {
+    Preconditions.checkNotNull(enums);
+    areEnumsUpdated = true;
+
+    //Every provided key must be a key defined in the configuration schema.
+    Preconditions.checkArgument(
+        configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()),
+        "The given map doesn't contain valid keys. Valid keys are %s and the provided keys are %s",
+        configurationSchema.getKeyDescriptor().getFields().getFields(),
+        enums.keySet());
+
+    //Todo check the type of the objects, for now just set them on the enum.
+
+    Map<String, List<Object>> convertedEnums = Maps.newHashMap();
+
+    for (Map.Entry<String, Set<Object>> enumEntry : enums.entrySet()) {
+      String keyName = Preconditions.checkNotNull(enumEntry.getKey());
+      Set<Object> enumValues = Preconditions.checkNotNull(enumEntry.getValue());
+
+      //No null enum values allowed.
+      for (Object enumValue : enumValues) {
+        Preconditions.checkNotNull(enumValue);
+      }
+
+      convertedEnums.put(keyName, Lists.newArrayList(enumValues));
+    }
+
+    currentEnumVals = Maps.newHashMap(convertedEnums);
+  }
+
+  /**
+   * Sets the new enum lists for this schema. The sets in the provided map are converted into lists,
+   * and sorted according to their natural ordering.
+   *
+   * @param enums The new enum sets for this schema.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void setEnumsSetComparable(Map<String, Set<Comparable>> enums)
+  {
+    Preconditions.checkNotNull(enums);
+    areEnumsUpdated = true;
+
+    //Every provided key must be a key defined in the configuration schema.
+    Preconditions.checkArgument(
+        configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()),
+        "The given map doesn't contain valid keys. Valid keys are %s and the provided keys are %s",
+        configurationSchema.getKeyDescriptor().getFields().getFields(),
+        enums.keySet());
+
+    //Todo check the type of the objects, for now just set them on the enum.
+    Map<String, List<Object>> sortedEnums = Maps.newHashMap();
+
+    for (Map.Entry<String, Set<Comparable>> enumEntry : enums.entrySet()) {
+      String keyName = Preconditions.checkNotNull(enumEntry.getKey());
+      Set<Comparable> enumValues = Preconditions.checkNotNull(enumEntry.getValue());
+
+      //No null enum values allowed.
+      for (Object enumValue : enumValues) {
+        Preconditions.checkNotNull(enumValue);
+      }
+
+      List<Comparable> sortedValues = Lists.newArrayList(enumValues);
+      Collections.sort(sortedValues);
+      sortedEnums.put(keyName, (List)sortedValues);
+    }
+
+    currentEnumVals = Maps.newHashMap(sortedEnums);
+  }
+
+  /**
+   * Sets the new enum lists for this schema. Each provided list is defensively copied.
+   *
+   * @param enums The new enum lists for this schema.
+   */
+  public void setEnumsList(Map<String, List<Object>> enums)
+  {
+    Preconditions.checkNotNull(enums);
+    areEnumsUpdated = true;
+
+    //Every provided key must be a key defined in the configuration schema.
+    Preconditions.checkArgument(
+        configurationSchema.getKeyDescriptor().getFields().getFields().containsAll(enums.keySet()),
+        "The given map doesn't contain valid keys. Valid keys are %s and the provided keys are %s",
+        configurationSchema.getKeyDescriptor().getFields().getFields(),
+        enums.keySet());
+
+    //Todo check the type of the objects, for now just set them on the enum.
+    //Validate all entries up front before copying anything.
+    for (Map.Entry<String, List<Object>> enumEntry : enums.entrySet()) {
+      Preconditions.checkNotNull(enumEntry.getKey());
+      Preconditions.checkNotNull(enumEntry.getValue());
+    }
+
+    Map<String, List<Object>> copiedEnums = Maps.newHashMap();
+
+    for (Map.Entry<String, List<Object>> enumEntry : enums.entrySet()) {
+      copiedEnums.put(enumEntry.getKey(), Lists.newArrayList(enumEntry.getValue()));
+    }
+
+    currentEnumVals = copiedEnums;
+  }
+
+  @Override
+  public String getSchemaJSON()
+  {
+    if (!changed && schemaJSON != null) {
+      //If there are no changes, return the pre computed JSON
+      return schemaJSON;
+    }
+
+    if (changedSchemaKeys) {
+      //If the schema keys change, recompute the schema keys portion of the JSON
+      changedSchemaKeys = false;
+
+      if (schemaKeys == null) {
+        schema.remove(Schema.FIELD_SCHEMA_KEYS);
+      } else {
+        try {
+          schema.put(Schema.FIELD_SCHEMA_KEYS,
+              SchemaUtils.createJSONObject(schemaKeys));
+        } catch (JSONException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    if (changedFromTo) {
+      //If the from to times have changed then recompute the time portion of the schema.
+      changedFromTo = false;
+      Preconditions.checkState(!(from == null ^ to == null),
+          "Either both from and to should be set or both should be not set.");
+
+      if (from != null) {
+        Preconditions.checkState(to >= from, "to {} must be greater than or equal to from {}.", to, from);
+      }
+
+      if (from == null) {
+        time.remove(FIELD_TIME_FROM);
+        time.remove(FIELD_TIME_TO);
+      } else {
+        try {
+          time.put(FIELD_TIME_FROM, from);
+          time.put(FIELD_TIME_TO, to);
+        } catch (JSONException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
+
+    if (this.areEnumsUpdated) {
+      //If the enums have been updated, recompute the key portion of the schema.
+      for (int keyIndex = 0;
+          keyIndex < keys.length();
+          keyIndex++) {
+        JSONObject keyData;
+        String name;
+
+        try {
+          keyData = keys.getJSONObject(keyIndex);
+          name = keyData.getString(DimensionalConfigurationSchema.FIELD_KEYS_NAME);
+        } catch (JSONException ex) {
+          throw new RuntimeException(ex);
+        }
+
+        List<Object> enumVals = currentEnumVals.get(name);
+
+        if (enumVals == null || enumVals.isEmpty()) {
+          keyData.remove(DimensionalConfigurationSchema.FIELD_KEYS_ENUMVALUES);
+          continue;
+        }
+
+        JSONArray newEnumValues = new JSONArray();
+
+        for (Object enumVal : enumVals) {
+          newEnumValues.put(enumVal);
+        }
+
+        try {
+          keyData.put(DimensionalConfigurationSchema.FIELD_KEYS_ENUMVALUES, newEnumValues);
+        } catch (JSONException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+
+      this.areEnumsUpdated = false;
+    }
+
+    //Rebuild the schema JSON string.
+    schemaJSON = schema.toString();
+    return schemaJSON;
+  }
+
+  /**
+   * Gets the {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema} was constructed.
+   *
+   * @return The {@link DimensionalConfigurationSchema} from which this {@link DimensionalSchema} was
+   * constructed.
+   */
+  public DimensionalConfigurationSchema getDimensionalConfigurationSchema()
+  {
+    return configurationSchema;
+  }
+
+  /**
+   * Returns the type of this schema, which is {@link #SCHEMA_TYPE}.
+   *
+   * @return The schema type.
+   */
+  @Override
+  public String getSchemaType()
+  {
+    return SCHEMA_TYPE;
+  }
+
+  /**
+   * Returns the version of this schema, which is {@link #SCHEMA_VERSION}.
+   *
+   * @return The schema version.
+   */
+  @Override
+  public String getSchemaVersion()
+  {
+    return SCHEMA_VERSION;
+  }
+
+  /**
+   * Returns the schema keys assigned to this schema.
+   *
+   * @return The schema keys, or null if none were set.
+   */
+  @Override
+  public Map<String, String> getSchemaKeys()
+  {
+    return schemaKeys;
+  }
+
+  /**
+   * Returns true if a from and to time were defined for this schema via a schema stub
+   * at construction time.
+   *
+   * @return the predefinedFromTo flag
+   */
+  public boolean isPredefinedFromTo()
+  {
+    return predefinedFromTo;
+  }
+
+  /**
+   * Returns the schema ID for this schema. This is only relevant for operators which
+   * host multiple schemas.
+   *
+   * @return The schema ID for this schema.
+   */
+  @Override
+  public int getSchemaID()
+  {
+    return schemaID;
+  }
+
+  /**
+   * Returns the current enum vals for the schema. The current enum vals for the
+   * schema are expressed in a map whose keys are the names of the keys in the schema, and whose
+   * values are a list of possible values for the key.
+   *
+   * @return The current enum vals for the schema; null if no enums were ever set.
+   */
+  public Map<String, List<Object>> getCurrentEnumVals()
+  {
+    return currentEnumVals;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(DimensionalSchema.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
new file mode 100644
index 0000000..b12b631
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions;
+
+import java.io.Serializable;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.dimensions.aggregator.AggregateEvent;
+
+/**
+ * <p>
+ * This is the base class for the events that are used for internal processing in the subclasses of
+ * {@link AbstractDimensionsComputationFlexible} and {@link DimensionsStoreHDHT}.
+ * </p>
+ * <p>
+ * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} and a {@link GPOMutable} object
+ * which contains the values of aggregate fields. The {@link EventKey} is used to identify the dimension combination
+ * an event belongs to, and consequently determines what input values should be aggregated together. The aggregates
+ * are the actual data payload of the event which are to be aggregated.
+ * </p>
+ *
+ * @since 3.1.0
+ */
public class DimensionsEvent implements Serializable
{
  private static final long serialVersionUID = 201503231204L;

  /**
   * This is the {@link GPOMutable} object which holds all the aggregates.
   */
  protected GPOMutable aggregates;
  /**
   * This is the event key for the event.
   */
  protected EventKey eventKey;

  /**
   * Constructor for Kryo.
   */
  private DimensionsEvent()
  {
    //For kryo
  }

  /**
   * This creates a {@link DimensionsEvent} from the given event key and aggregates.
   *
   * @param eventKey   The key from which to create a {@link DimensionsEvent}.
   * @param aggregates The aggregates from which to create {@link DimensionsEvent}.
   */
  public DimensionsEvent(EventKey eventKey,
      GPOMutable aggregates)
  {
    setEventKey(eventKey);
    setAggregates(aggregates);
  }

  /**
   * Creates a DimensionsEvent with the given key values, aggregates and ids.
   *
   * @param keys                  The values for fields in the key.
   * @param aggregates            The values for fields in the aggregate.
   * @param bucketID              The bucketID
   * @param schemaID              The schemaID.
   * @param dimensionDescriptorID The dimensionsDescriptorID.
   * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
   */
  public DimensionsEvent(GPOMutable keys,
      GPOMutable aggregates,
      int bucketID,
      int schemaID,
      int dimensionDescriptorID,
      int aggregatorIndex)
  {
    this.eventKey = new EventKey(bucketID,
        schemaID,
        dimensionDescriptorID,
        aggregatorIndex,
        keys);
    setAggregates(aggregates);
  }

  /**
   * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
   *
   * @param keys                  The value for fields in the key.
   * @param aggregates            The value for fields in the aggregate.
   * @param schemaID              The schemaID.
   * @param dimensionDescriptorID The dimensionsDescriptorID.
   * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
   */
  public DimensionsEvent(GPOMutable keys,
      GPOMutable aggregates,
      int schemaID,
      int dimensionDescriptorID,
      int aggregatorIndex)
  {
    this.eventKey = new EventKey(schemaID,
        dimensionDescriptorID,
        aggregatorIndex,
        keys);
    setAggregates(aggregates);
  }

  /**
   * This is a helper method which sets the {@link EventKey} of the event to
   * be a copy of the given {@link EventKey}. Note that a defensive copy is made via the
   * {@link EventKey} copy constructor, so later mutation of the argument does not affect this event.
   *
   * @param eventKey The {@link EventKey} to set on this event.
   */
  protected final void setEventKey(EventKey eventKey)
  {
    this.eventKey = new EventKey(eventKey);
  }

  /**
   * This is a helper method which sets the aggregates for this event.
   *
   * @param aggregates The aggregates for this event. Must not be null.
   */
  protected final void setAggregates(GPOMutable aggregates)
  {
    Preconditions.checkNotNull(aggregates);
    this.aggregates = aggregates;
  }

  /**
   * This is a helper method which returns the aggregates for this event.
   *
   * @return The aggregates for this event.
   */
  public GPOMutable getAggregates()
  {
    return aggregates;
  }

  /**
   * Returns the {@link EventKey} for this event.
   *
   * @return The {@link EventKey} for this event.
   */
  public EventKey getEventKey()
  {
    return eventKey;
  }

  /**
   * This is a convenience method which returns the values of the key fields in this event's
   * {@link EventKey}.
   *
   * @return The values of the key fields in this event's {@link EventKey}.
   */
  public GPOMutable getKeys()
  {
    return eventKey.getKey();
  }

  /**
   * This is a convenience method which returns the schemaID of this event's {@link EventKey}.
   *
   * @return The schemaID of this event's {@link EventKey}.
   */
  public int getSchemaID()
  {
    return eventKey.getSchemaID();
  }

  /**
   * Returns the id of the dimension descriptor (key combination) for which this event contains data.
   *
   * @return The id of the dimension descriptor (key combination) for which this event contains data.
   */
  public int getDimensionDescriptorID()
  {
    return eventKey.getDimensionDescriptorID();
  }

  /**
   * Returns the id of the aggregator which is applied to this event's data.
   *
   * @return The id of the aggregator which is applied to this event's data.
   */
  public int getAggregatorID()
  {
    return eventKey.getAggregatorID();
  }

  /**
   * Returns the bucketID assigned to this event. The bucketID is useful for this event in the case that the event
   * is sent to a partitioned HDHT operator. Each partitioned HDHT operator can use the bucketIDs for the buckets it
   * writes to as a partition key.
   *
   * @return The bucketID assigned to this event.
   */
  public int getBucketID()
  {
    return eventKey.getBucketID();
  }

  /**
   * This is a utility method which copies the aggregate payload of the given src event into the given
   * destination event. Only the aggregate field arrays are copied; the destination's {@link EventKey}
   * is left untouched.
   * <p>
   * NOTE(review): this assumes that, for each field type present in the source aggregates, the
   * destination's corresponding array is non-null and at least as long — otherwise
   * {@link System#arraycopy} throws. Callers appear responsible for ensuring matching descriptors;
   * confirm at call sites.
   *
   * @param aeDest The destination event.
   * @param aeSrc  The source event.
   */
  public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc)
  {
    GPOMutable destAggs = aeDest.getAggregates();
    GPOMutable srcAggs = aeSrc.getAggregates();

    if (srcAggs.getFieldsBoolean() != null) {
      System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0,
          srcAggs.getFieldsBoolean().length);
    }

    if (srcAggs.getFieldsCharacter() != null) {
      System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0,
          srcAggs.getFieldsCharacter().length);
    }

    if (srcAggs.getFieldsString() != null) {
      System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length);
    }

    if (srcAggs.getFieldsShort() != null) {
      System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length);
    }

    if (srcAggs.getFieldsInteger() != null) {
      System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0,
          srcAggs.getFieldsInteger().length);
    }

    if (srcAggs.getFieldsLong() != null) {
      System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length);
    }

    if (srcAggs.getFieldsFloat() != null) {
      System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length);
    }

    if (srcAggs.getFieldsDouble() != null) {
      System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length);
    }
  }

  /**
   * <p>
   * The {@link EventKey} represents a dimensions combination for a dimensions event. It contains the keys and values
   * which define a dimensions combination. It's very similar to a {@link DimensionsDescriptor} which is also used to
   * define part of a dimensions combination. The difference between the two is that a {@link DimensionsDescriptor}
   * only contains what
   * keys are included in the combination, not the values of those keys (which the {@link EventKey} has).
   * </p>
   * <p>
   * In addition to holding the keys in a dimensions combination and their values, the event key holds some meta
   * information.
   * The meta information included and their purposes are the following:
   * <ul>
   * <li><b>bucketID:</b> This is set when the dimension store responsible for storing the data is partitioned. In that
   * case the bucketID is used as the partitionID.</li>
   * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that this {@link EventKey} corresponds to
   * .</li>
   * <li><b>dimensionDescriptorID:</b> This is the id of the {@link DimensionsDescriptor} that this {@link EventKey}
   * corresponds to.</li>
   * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to aggregate the values associated with
   * this {@link EventKey}
   * in a {@link DimensionsEvent}.</li>
   * </ul>
   * </p>
   */
  public static class EventKey implements Serializable
  {
    private static final long serialVersionUID = 201503231205L;

    /**
     * The bucketID assigned to this event key.
     */
    private int bucketID;
    /**
     * The schemaID corresponding to the {@link DimensionalSchema} that this {@link EventKey}
     * corresponds to.
     */
    private int schemaID;
    /**
     * The dimensionsDescriptorID of the {@link DimensionsDescriptor} in the corresponding {@link DimensionalSchema}.
     */
    private int dimensionDescriptorID;
    /**
     * The id of the aggregator which should be used to aggregate the values corresponding to this
     * {@link EventKey}.
     */
    private int aggregatorID;
    /**
     * The values of the key fields.
     */
    private GPOMutable key;

    /**
     * Constructor for serialization.
     */
    private EventKey()
    {
      //For kryo
    }

    /**
     * Copy constructor. The id fields are copied by value and the key is deep-copied via the
     * {@link GPOMutable} copy constructor.
     *
     * @param eventKey The {@link EventKey} whose data will be copied.
     */
    public EventKey(EventKey eventKey)
    {
      this.bucketID = eventKey.bucketID;
      this.schemaID = eventKey.schemaID;
      this.dimensionDescriptorID = eventKey.dimensionDescriptorID;
      this.aggregatorID = eventKey.aggregatorID;

      this.key = new GPOMutable(eventKey.getKey());
    }

    /**
     * Creates an event key with the given data.
     *
     * @param bucketID              The bucketID assigned to this {@link EventKey}.
     * @param schemaID              The schemaID of the corresponding {@link DimensionalSchema}.
     * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding
     * {@link DimensionsDescriptor} in the {@link DimensionalSchema}.
     * @param aggregatorID          The id of the aggregator which should be used to aggregate the values
     *                              corresponding to this
     *                              {@link EventKey}.
     * @param key                   The values of the keys.
     */
    public EventKey(int bucketID,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorID,
        GPOMutable key)
    {
      setBucketID(bucketID);
      setSchemaID(schemaID);
      setDimensionDescriptorID(dimensionDescriptorID);
      setAggregatorID(aggregatorID);
      setKey(key);
    }

    /**
     * Creates an event key with the given data. This constructor assumes that the bucketID will be 0.
     *
     * @param schemaID              The schemaID of the corresponding {@link DimensionalSchema}.
     * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding {@link DimensionsDescriptor}.
     * @param aggregatorID          The id of the aggregator which should be used to aggregate the values
     *                              corresponding to this
     *                              {@link EventKey}.
     * @param key                   The values of the keys.
     */
    public EventKey(int schemaID,
        int dimensionDescriptorID,
        int aggregatorID,
        GPOMutable key)
    {
      setSchemaID(schemaID);
      setDimensionDescriptorID(dimensionDescriptorID);
      setAggregatorID(aggregatorID);
      setKey(key);
    }

    /**
     * Sets the dimension descriptor ID.
     *
     * @param dimensionDescriptorID The dimension descriptor ID to set.
     */
    private void setDimensionDescriptorID(int dimensionDescriptorID)
    {
      this.dimensionDescriptorID = dimensionDescriptorID;
    }

    /**
     * Returns the dimension descriptor ID.
     *
     * @return The dimension descriptor ID.
     */
    public int getDimensionDescriptorID()
    {
      return dimensionDescriptorID;
    }

    /**
     * Returns the aggregatorID.
     *
     * @return The aggregatorID.
     */
    public int getAggregatorID()
    {
      return aggregatorID;
    }

    /**
     * Sets the aggregatorID.
     *
     * @param aggregatorID The aggregatorID to set.
     */
    private void setAggregatorID(int aggregatorID)
    {
      this.aggregatorID = aggregatorID;
    }

    /**
     * Returns the schemaID.
     *
     * @return The schemaID.
     */
    public int getSchemaID()
    {
      return schemaID;
    }

    /**
     * Sets the schemaID.
     *
     * @param schemaID The schemaID to set.
     */
    private void setSchemaID(int schemaID)
    {
      this.schemaID = schemaID;
    }

    /**
     * Returns the key values.
     *
     * @return The key values.
     */
    public GPOMutable getKey()
    {
      return key;
    }

    /**
     * Sets the bucketID.
     *
     * @param bucketID The bucketID.
     */
    private void setBucketID(int bucketID)
    {
      this.bucketID = bucketID;
    }

    /**
     * Gets the bucketID.
     *
     * @return The bucketID.
     */
    public int getBucketID()
    {
      return bucketID;
    }

    /**
     * Sets the key values. Unlike {@link #setEventKey}, no defensive copy is made here; the
     * caller's {@link GPOMutable} is stored directly.
     *
     * @param key The key values to set. Must not be null.
     */
    private void setKey(GPOMutable key)
    {
      Preconditions.checkNotNull(key);
      this.key = key;
    }

    @Override
    public int hashCode()
    {
      int hash = 3;
      hash = 97 * hash + this.bucketID;
      hash = 97 * hash + this.schemaID;
      hash = 97 * hash + this.dimensionDescriptorID;
      hash = 97 * hash + this.aggregatorID;
      hash = 97 * hash + (this.key != null ? this.key.hashCode() : 0);
      return hash;
    }

    @Override
    public boolean equals(Object obj)
    {
      if (obj == null) {
        return false;
      }

      if (getClass() != obj.getClass()) {
        return false;
      }

      final EventKey other = (EventKey)obj;

      if (this.bucketID != other.bucketID) {
        return false;
      }

      if (this.schemaID != other.schemaID) {
        return false;
      }

      if (this.dimensionDescriptorID != other.dimensionDescriptorID) {
        return false;
      }

      if (this.aggregatorID != other.aggregatorID) {
        return false;
      }

      if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) {
        return false;
      }

      return true;
    }

    @Override
    public String toString()
    {
      //NOTE(review): the label reads "aggregatorIndex" but the value printed is aggregatorID;
      //confirm whether the label should be renamed (runtime string left unchanged here).
      return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID +
          ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}';
    }

    /**
     * This is a convenience method which creates one {@link EventKey} (with bucketID 0) for each of the
     * given key values, all sharing the same schemaId, dimensionsDescriptorId, and aggregatorId.
     *
     * @param schemaId               The schemaID shared by all the created event keys.
     * @param dimensionsDescriptorId The dimensionsDescriptorID shared by all the created event keys.
     * @param aggregatorId           The aggregatorID shared by all the created event keys.
     * @param keys                   The key values; one {@link EventKey} is created per element.
     * @return The list of created {@link EventKey}s, in the same order as the given keys.
     */
    public static List<EventKey> createEventKeys(int schemaId,
        int dimensionsDescriptorId,
        int aggregatorId,
        List<GPOMutable> keys)
    {
      List<EventKey> eventKeys = Lists.newArrayList();

      for (GPOMutable key : keys) {
        eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key));
      }

      return eventKeys;
    }
  }

  @Override
  public int hashCode()
  {
    int hash = 5;
    hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0);
    hash = 79 * hash + (this.eventKey != null ? this.eventKey.hashCode() : 0);
    return hash;
  }

  @Override
  public boolean equals(Object obj)
  {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final DimensionsEvent other = (DimensionsEvent)obj;
    if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) {
      return false;
    }
    if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
      return false;
    }
    return true;
  }

  /**
   * A {@link DimensionsEvent} subclass used for input tuples. Note that, unlike the base class,
   * {@link #hashCode} is computed from the key field values only and {@link #equals} compares the
   * {@link EventKey} only — the aggregate payload is ignored by both.
   */
  public static class InputEvent extends DimensionsEvent
  {
    private static final long serialVersionUID = 201506210406L;
    //Flag set by aggregators (see AbstractIncrementalAggregator.getGroup) to mark this input as consumed.
    public boolean used = false;

    /**
     * Constructor for Kryo.
     */
    private InputEvent()
    {
    }

    /**
     * This creates an {@link InputEvent} from the given event key and aggregates.
     *
     * @param eventKey   The key from which to create an {@link InputEvent}.
     * @param aggregates The aggregates from which to create an {@link InputEvent}.
     */
    public InputEvent(EventKey eventKey,
        GPOMutable aggregates)
    {
      setEventKey(eventKey);
      setAggregates(aggregates);
    }

    /**
     * Creates an {@link InputEvent} with the given key values, aggregates and ids.
     *
     * @param keys                  The values for fields in the key.
     * @param aggregates            The values for fields in the aggregate.
     * @param bucketID              The bucketID
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public InputEvent(GPOMutable keys,
        GPOMutable aggregates,
        int bucketID,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this.eventKey = new EventKey(bucketID,
          schemaID,
          dimensionDescriptorID,
          aggregatorIndex,
          keys);
      setAggregates(aggregates);
    }

    /**
     * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
     *
     * @param keys                  The value for fields in the key.
     * @param aggregates            The value for fields in the aggregate.
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public InputEvent(GPOMutable keys,
        GPOMutable aggregates,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this.eventKey = new EventKey(schemaID,
          dimensionDescriptorID,
          aggregatorIndex,
          keys);
      setAggregates(aggregates);
    }

    @Override
    public int hashCode()
    {
      //Hash only the key field values; ids and aggregates are excluded.
      return GPOUtils.hashcode(this.getKeys());
    }

    @Override
    public boolean equals(Object obj)
    {
      if (obj == null) {
        return false;
      }
      if (getClass() != obj.getClass()) {
        return false;
      }
      final DimensionsEvent other = (DimensionsEvent)obj;

      //Equality is based solely on the event key; the aggregate payload is ignored.
      if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
        return false;
      }
      return true;
    }
  }

  /**
   * A {@link DimensionsEvent} subclass representing an aggregated result. In addition to the base
   * class state it carries an optional metaData {@link GPOMutable} and the aggregatorIndex required by
   * {@link AggregateEvent}. Like {@link InputEvent}, {@link #hashCode} is computed from the key field
   * values only and {@link #equals} compares the {@link EventKey} only.
   */
  public static class Aggregate extends DimensionsEvent implements AggregateEvent
  {
    private static final long serialVersionUID = 201506190110L;

    /**
     * This is the aggregatorIndex assigned to this event.
     */
    protected int aggregatorIndex;
    //Optional additional aggregate metadata; may be null.
    private GPOMutable metaData;

    public Aggregate()
    {
      //for kryo and for extending classes
    }

    /**
     * This creates an {@link Aggregate} from the given event key and aggregates.
     *
     * @param eventKey   The key from which to create an {@link Aggregate}.
     * @param aggregates The aggregates from which to create an {@link Aggregate}.
     */
    public Aggregate(EventKey eventKey,
        GPOMutable aggregates)
    {
      setEventKey(eventKey);
      setAggregates(aggregates);
    }

    /**
     * This creates an {@link Aggregate} from the given event key, aggregates, and metadata.
     *
     * @param eventKey   The key from which to create an {@link Aggregate}.
     * @param aggregates The aggregates from which to create an {@link Aggregate}.
     * @param metaData   The additional metadata for this aggregate; may be null.
     */
    public Aggregate(EventKey eventKey,
        GPOMutable aggregates,
        GPOMutable metaData)
    {
      super(eventKey,
          aggregates);

      this.metaData = metaData;
    }

    /**
     * Creates an {@link Aggregate} with the given key values, aggregates and ids.
     *
     * @param keys                  The values for fields in the key.
     * @param aggregates            The values for fields in the aggregate.
     * @param bucketID              The bucketID
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public Aggregate(GPOMutable keys,
        GPOMutable aggregates,
        int bucketID,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this.eventKey = new EventKey(bucketID,
          schemaID,
          dimensionDescriptorID,
          aggregatorIndex,
          keys);
      setAggregates(aggregates);
    }

    /**
     * Creates an {@link Aggregate} with the given key values, aggregates, metadata and ids.
     *
     * @param keys                  The values for fields in the key.
     * @param aggregates            The values for fields in the aggregate.
     * @param metaData              The additional metadata for this aggregate; may be null.
     * @param bucketID              The bucketID
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public Aggregate(GPOMutable keys,
        GPOMutable aggregates,
        GPOMutable metaData,
        int bucketID,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this(keys,
          aggregates,
          bucketID,
          schemaID,
          dimensionDescriptorID,
          aggregatorIndex);

      this.metaData = metaData;
    }

    /**
     * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0.
     *
     * @param keys                  The value for fields in the key.
     * @param aggregates            The value for fields in the aggregate.
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public Aggregate(GPOMutable keys,
        GPOMutable aggregates,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this.eventKey = new EventKey(schemaID,
          dimensionDescriptorID,
          aggregatorIndex,
          keys);
      setAggregates(aggregates);
    }

    /**
     * This creates an event with the given data and metadata. Note, this constructor assumes that the
     * bucketID will be 0.
     *
     * @param keys                  The value for fields in the key.
     * @param aggregates            The value for fields in the aggregate.
     * @param metaData              The additional metadata for this aggregate; may be null.
     * @param schemaID              The schemaID.
     * @param dimensionDescriptorID The dimensionsDescriptorID.
     * @param aggregatorIndex       The aggregatorIndex assigned to this event by the unifier.
     */
    public Aggregate(GPOMutable keys,
        GPOMutable aggregates,
        GPOMutable metaData,
        int schemaID,
        int dimensionDescriptorID,
        int aggregatorIndex)
    {
      this(keys,
          aggregates,
          schemaID,
          dimensionDescriptorID,
          aggregatorIndex);

      this.metaData = metaData;
    }

    /**
     * Sets the metadata for this aggregate.
     *
     * @param metaData The metadata to set; may be null.
     */
    public void setMetaData(GPOMutable metaData)
    {
      this.metaData = metaData;
    }

    /**
     * Returns the metadata for this aggregate.
     *
     * @return The metadata for this aggregate; may be null.
     */
    public GPOMutable getMetaData()
    {
      return metaData;
    }

    /**
     * Sets the aggregatorIndex assigned to this event.
     *
     * @param aggregatorIndex The aggregatorIndex to set.
     */
    public void setAggregatorIndex(int aggregatorIndex)
    {
      this.aggregatorIndex = aggregatorIndex;
    }

    @Override
    public int getAggregatorIndex()
    {
      return aggregatorIndex;
    }

    @Override
    public int hashCode()
    {
      //Hash only the key field values; ids, metadata, and aggregates are excluded.
      return GPOUtils.hashcode(this.getKeys());
    }

    @Override
    public boolean equals(Object obj)
    {
      if (obj == null) {
        return false;
      }
      if (getClass() != obj.getClass()) {
        return false;
      }
      final DimensionsEvent other = (DimensionsEvent)obj;

      //Equality is based solely on the event key; metadata and aggregates are ignored.
      if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) {
        return false;
      }
      return true;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
new file mode 100644
index 0000000..f92bfbf
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
+import com.datatorrent.lib.dimensions.DimensionsConversionContext;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
+import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
+
/**
 * <p>
 * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a
 * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for
 * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an
 * {@link IncrementalAggregator}.
 * </p>
 * <p>
 * {@link IncrementalAggregator}s are intended to be used with subclasses of
 * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which
 * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator
 * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which
 * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to
 * the sum aggregator. And the {@link Aggregate} event produced by the sum aggregator will contain two fields,
 * one for cost and one for revenue.
 * </p>
 */
public abstract class AbstractIncrementalAggregator implements IncrementalAggregator
{
  private static final long serialVersionUID = 201506211153L;

  /**
   * The conversion context for this aggregator. Must be set via
   * {@link #setDimensionsConversionContext} before events are processed.
   */
  protected DimensionsConversionContext context;

  public AbstractIncrementalAggregator()
  {
  }

  @Override
  public void setDimensionsConversionContext(DimensionsConversionContext context)
  {
    this.context = Preconditions.checkNotNull(context);
  }

  /**
   * Converts the given {@link InputEvent} into a new {@link Aggregate} group using the current
   * conversion context, marking the input event as consumed ({@code src.used = true}) as a side effect.
   *
   * @param src             The {@link InputEvent} to start a new aggregate group from.
   * @param aggregatorIndex The aggregatorIndex to assign to the created {@link Aggregate}.
   * @return The created {@link Aggregate}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    src.used = true;
    Aggregate aggregate = createAggregate(src,
        context,
        aggregatorIndex);
    return aggregate;
  }

  /**
   * Computes a hash code for the given {@link InputEvent} over the context-selected subset of its
   * key fields. If the context has both an input timestamp and an output time bucket configured, the
   * event's timestamp is temporarily rounded down to its time-bucket boundary (so all events falling
   * in the same bucket hash identically) and restored afterwards.
   *
   * @param inputEvent The {@link InputEvent} to hash.
   * @return The hash code over the subset of key fields.
   */
  @Override
  public int hashCode(InputEvent inputEvent)
  {
    long timestamp = -1L;
    boolean hasTime = this.context.inputTimestampIndex != -1
        && this.context.outputTimebucketIndex != -1;

    if (hasTime) {
      //Temporarily replace the raw timestamp with its bucket-rounded value for hashing.
      timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex];
      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]
          = this.context.dd.getCustomTimeBucket().roundDown(timestamp);
    }

    int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys);

    if (hasTime) {
      //Restore the raw timestamp so the input event is left unmodified.
      inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp;
    }

    return hashCode;
  }

  /**
   * Compares two {@link InputEvent}s for equality over the context-selected subset of their key
   * fields. If the context has an input timestamp configured, both events' timestamps are temporarily
   * rounded down to their time-bucket boundaries (so events in the same bucket compare equal) and
   * restored afterwards.
   *
   * @param inputEvent1 The first {@link InputEvent}.
   * @param inputEvent2 The second {@link InputEvent}.
   * @return True if the two events are equal over the subset of key fields; false otherwise.
   */
  @Override
  public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2)
  {
    long timestamp1 = 0;
    long timestamp2 = 0;

    if (context.inputTimestampIndex != -1) {
      //Temporarily replace both raw timestamps with their bucket-rounded values for comparison.
      timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex];
      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] =
          context.dd.getCustomTimeBucket().roundDown(timestamp1);

      timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex];
      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] =
          context.dd.getCustomTimeBucket().roundDown(timestamp2);
    }

    boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(),
        inputEvent1.getKeys(),
        context.indexSubsetKeys);

    if (context.inputTimestampIndex != -1) {
      //Restore the raw timestamps so the input events are left unmodified.
      inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1;
      inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2;
    }

    return equals;
  }

  /**
   * Creates an {@link Aggregate} from the given {@link InputEvent}. The aggregate's payload is a
   * fresh (uninitialized) {@link GPOMutable} built from the context's aggregate descriptor; its
   * {@link EventKey} is derived via {@link #createEventKey}.
   *
   * @param inputEvent      The {@link InputEvent} to unpack into an {@link Aggregate}.
   * @param context         The conversion context required to transform the {@link InputEvent} into
   *                        the correct {@link Aggregate}.
   * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}.
   * @return The converted {@link Aggregate}.
   */
  public static Aggregate createAggregate(InputEvent inputEvent,
      DimensionsConversionContext context,
      int aggregatorIndex)
  {
    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
    EventKey eventKey = createEventKey(inputEvent,
        context,
        aggregatorIndex);

    Aggregate aggregate = new Aggregate(eventKey,
        aggregates);
    aggregate.setAggregatorIndex(aggregatorIndex);

    return aggregate;
  }

  /**
   * Creates an {@link EventKey} from the given {@link InputEvent}. The context-selected subset of
   * key fields is copied into a fresh {@link GPOMutable}; when the context has an output time bucket
   * configured, the time-bucket id and the bucket-rounded timestamp are written into the key as well.
   * The resulting {@link EventKey} has bucketID 0.
   *
   * @param inputEvent      The {@link InputEvent} to extract an {@link EventKey} from.
   * @param context         The conversion context required to extract the {@link EventKey} from
   *                        the given {@link InputEvent}.
   * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}.
   * @return The {@link EventKey} extracted from the given {@link InputEvent}.
   */
  public static EventKey createEventKey(InputEvent inputEvent,
      DimensionsConversionContext context,
      int aggregatorIndex)
  {
    GPOMutable keys = new GPOMutable(context.keyDescriptor);
    GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys);

    if (context.outputTimebucketIndex >= 0) {
      CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket();

      keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId(
          timeBucket);
      keys.getFieldsLong()[context.outputTimestampIndex] =
          timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]);
    }

    //NOTE(review): aggregatorIndex is passed here but the EventKey is built with context.aggregatorID;
    //the parameter appears unused in this method — confirm intent.
    EventKey eventKey = new EventKey(context.schemaID,
        context.dimensionsDescriptorID,
        context.aggregatorID,
        keys);

    return eventKey;
  }

  private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class);
}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
new file mode 100644
index 0000000..c15bf25
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Fields;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * This is the average {@link OTFAggregator}.
+ *
+ * @since 3.1.0
+ */
+@Name("AVG")
+public class AggregatorAverage implements OTFAggregator
+{
+  private static final long serialVersionUID = 20154301644L;
+
+  /**
+   * The array index of the sum aggregates in the argument list of the {@link #aggregate} function.
+   */
+  public static int SUM_INDEX = 0;
+  /**
+   * The array index of the count aggregates in the argument list of the {@link #aggregate} function.
+   */
+  public static int COUNT_INDEX = 1;
+  /**
+   * The singleton instance of this class.
+   */
+  public static final AggregatorAverage INSTANCE = new AggregatorAverage();
+
+  /**
+   * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on.
+   */
+  public static final transient List<Class<? extends IncrementalAggregator>> CHILD_AGGREGATORS =
+      ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(),
+      AggregatorIncrementalType.COUNT.getAggregator().getClass());
+
+  /**
+   * Constructor for singleton pattern.
+   */
+  protected AggregatorAverage()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public List<Class<? extends IncrementalAggregator>> getChildAggregators()
+  {
+    return CHILD_AGGREGATORS;
+  }
+
+  @Override
+  public GPOMutable aggregate(GPOMutable... aggregates)
+  {
+    Preconditions.checkArgument(aggregates.length == getChildAggregators().size(),
+        "The number of arguments " + aggregates.length +
+        " should be the same as the number of child aggregators " + getChildAggregators().size());
+
+    GPOMutable sumAggregation = aggregates[SUM_INDEX];
+    GPOMutable countAggregation = aggregates[COUNT_INDEX];
+
+    FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor();
+    Fields fields = fieldsDescriptor.getFields();
+    GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this));
+
+    long count = countAggregation.getFieldsLong()[0];
+
+    for (String field : fields.getFields()) {
+      Type type = sumAggregation.getFieldDescriptor().getType(field);
+
+      switch (type) {
+        case BYTE: {
+          double val = ((double)sumAggregation.getFieldByte(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case SHORT: {
+          double val = ((double)sumAggregation.getFieldShort(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case INTEGER: {
+          double val = ((double)sumAggregation.getFieldInt(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case LONG: {
+          double val = ((double)sumAggregation.getFieldLong(field)) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case FLOAT: {
+          double val = sumAggregation.getFieldFloat(field) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        case DOUBLE: {
+          double val = sumAggregation.getFieldDouble(field) /
+              ((double)count);
+          result.setField(field, val);
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("The type " + type + " is not supported.");
+        }
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public Type getOutputType()
+  {
+    return Type.DOUBLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
new file mode 100644
index 0000000..8566e1c
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey;
+import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
+
+/**
+ * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered.
+ *
+ * @since 3.1.0
+ */
+@Name("COUNT")
+public class AggregatorCount extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301645L;
+
+  /**
+   * This is a map whose keys represent input types and whose values
+   * represent the corresponding output types.
+   */
+  public static final transient Map<Type, Type> TYPE_CONVERSION_MAP;
+
+  static {
+    Map<Type, Type> typeConversionMap = Maps.newHashMap();
+
+    for (Type type : Type.values()) {
+      typeConversionMap.put(type, Type.LONG);
+    }
+
+    TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap);
+  }
+
+  public AggregatorCount()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    src.used = true;
+    GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
+    GPOMutable keys = new GPOMutable(context.keyDescriptor);
+    GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys);
+
+    EventKey eventKey = createEventKey(src,
+        context,
+        aggregatorIndex);
+
+    long[] longFields = aggregates.getFieldsLong();
+
+    for (int index = 0;
+        index < longFields.length;
+        index++) {
+      longFields[index] = 0;
+    }
+
+    return new Aggregate(eventKey,
+        aggregates);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    long[] fieldsLong = dest.getAggregates().getFieldsLong();
+
+    for (int index = 0;
+        index < fieldsLong.length;
+        index++) {
+      //increment count
+      fieldsLong[index]++;
+    }
+  }
+
+  @Override
+  public void aggregate(Aggregate destAgg, Aggregate srcAgg)
+  {
+    long[] destLongs = destAgg.getAggregates().getFieldsLong();
+    long[] srcLongs = srcAgg.getAggregates().getFieldsLong();
+
+    for (int index = 0;
+        index < destLongs.length;
+        index++) {
+      //aggregate count
+      destLongs[index] += srcLongs[index];
+    }
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return TYPE_CONVERSION_MAP.get(inputType);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
new file mode 100644
index 0000000..f5924b8
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.gpo.Serde;
+import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor;
+import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable;
+import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
+
/**
 * An {@link AggregatorSum} variant that additionally records, in per-aggregate metadata, the
 * distinct key combinations (with the timestamp field masked to -1) that have already contributed,
 * so each key combination is summed at most once per aggregate.
 *
 * @since 3.1.0
 */
@Name("CUM_SUM")
public class AggregatorCumSum extends AggregatorSum
{
  private static final long serialVersionUID = 201506280518L;

  //Positions of the four entries in the metadata GPOMutable's object-field array.
  public static final int KEY_FD_INDEX = 0;
  public static final int AGGREGATE_FD_INDEX = 1;
  public static final int KEYS_INDEX = 2;
  public static final int AGGREGATES_INDEX = 3;

  //Describes the metadata payload: two FieldsDescriptors plus two lists of GPOMutables,
  //each with its serializer and a post-deserialization fix-up (PayloadFix).
  public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR;

  static {
    Map<String, Type> fieldToType = Maps.newHashMap();
    fieldToType.put("fdkeys", Type.OBJECT);
    fieldToType.put("fdvalues", Type.OBJECT);
    fieldToType.put("keys", Type.OBJECT);
    fieldToType.put("values", Type.OBJECT);

    Map<String, Serde> fieldToSerde = Maps.newHashMap();
    fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE);
    fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE);
    fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE);
    fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE);

    META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType,
        fieldToSerde,
        new PayloadFix());
  }

  public AggregatorCumSum()
  {
  }

  /**
   * Creates the initial aggregate for an input event and seeds its metadata with the event's
   * full key (timestamp masked to -1) and its first aggregate values.
   *
   * @param src             the input event triggering the creation of a new aggregate group.
   * @param aggregatorIndex the aggregatorIndex to assign to the new aggregate.
   * @return the seeded {@link Aggregate}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    src.used = true;
    Aggregate agg = createAggregate(src,
        context,
        aggregatorIndex);

    GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);

    GPOMutable metaData = new GPOMutable(getMetaDataDescriptor());

    GPOMutable fullKey = new GPOMutable(src.getKeys());

    //Mask the timestamp so keys from different time buckets compare equal in contains().
    if (context.inputTimestampIndex >= 0) {
      fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L;
    }

    List<GPOMutable> keys = Lists.newArrayList(fullKey);

    GPOMutable value = new GPOMutable(agg.getAggregates());
    List<GPOMutable> values = Lists.newArrayList(value);

    metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor();
    metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor();
    metaData.getFieldsObject()[KEYS_INDEX] = keys;
    metaData.getFieldsObject()[AGGREGATES_INDEX] = values;
    agg.setMetaData(metaData);

    return agg;
  }

  /**
   * Adds the input event's values to the destination only if its (timestamp-masked) key has not
   * contributed before. The source's timestamp is masked in place for the lookup and restored
   * afterwards.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    @SuppressWarnings("unchecked")
    List<GPOMutable> destKeys =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> destAggregates =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    long timestamp = 0L;

    //Temporarily mask the timestamp in src's own key array so the dedup comparison ignores it.
    if (context.inputTimestampIndex >= 0) {
      timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex];
      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L;
    }

    if (!contains(destKeys, src.getKeys())) {
      destKeys.add(new GPOMutable(src.getKeys()));

      GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor);
      GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates);

      destAggregates.add(aggregates);

      this.aggregateAggs(dest.getAggregates(), aggregates);
    }

    //Restore the original timestamp so the caller's event is left unmodified.
    if (context.inputTimestampIndex >= 0) {
      src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp;
    }
  }

  /**
   * Merges two aggregates, folding in only the source entries whose keys the destination has not
   * seen. Payload fixes are applied first to restore field descriptors on both metadata objects.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    dest.getMetaData().applyObjectPayloadFix();
    src.getMetaData().applyObjectPayloadFix();

    @SuppressWarnings("unchecked")
    List<GPOMutable> destKeys =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> srcKeys =
        (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> destAggregates =
        (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    @SuppressWarnings("unchecked")
    List<GPOMutable> srcAggregates =
        (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX];

    List<GPOMutable> newKeys = Lists.newArrayList();
    List<GPOMutable> newAggs = Lists.newArrayList();

    for (int index = 0;
        index < srcKeys.size();
        index++) {
      GPOMutable currentSrcKey = srcKeys.get(index);
      GPOMutable currentSrcAgg = srcAggregates.get(index);

      if (!contains(destKeys, currentSrcKey)) {
        newKeys.add(currentSrcKey);
        newAggs.add(currentSrcAgg);

        this.aggregateAggs(dest.getAggregates(), currentSrcAgg);
      }
    }

    //Deferred until after the scan so newly-added keys are not re-compared against themselves.
    destKeys.addAll(newKeys);
    destAggregates.addAll(newAggs);
  }

  //Linear membership scan using GPOUtils field-wise equality.
  //NOTE(review): O(n) per lookup, O(n^2) over a merge — presumably acceptable for the expected
  //key cardinality; confirm before using with very large key sets.
  private boolean contains(List<GPOMutable> mutables, GPOMutable mutable)
  {
    for (int index = 0;
        index < mutables.size();
        index++) {
      GPOMutable mutableFromList = mutables.get(index);

      if (GPOUtils.equals(mutableFromList, mutable)) {
        return true;
      }
    }

    return false;
  }

  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return META_DATA_FIELDS_DESCRIPTOR;
  }

  /**
   * Reattaches the key and aggregate {@link FieldsDescriptor}s to every stored {@link GPOMutable}
   * in the metadata payload — presumably needed because the descriptors are serialized once
   * rather than per element (TODO confirm against SerdeListGPOMutable).
   */
  public static class PayloadFix implements SerdeObjectPayloadFix
  {
    @Override
    public void fix(Object[] objects)
    {
      FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX];
      FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX];

      @SuppressWarnings("unchecked")
      List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX];
      @SuppressWarnings("unchecked")
      List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX];

      fix(keyfd, keyMutables);
      fix(valuefd, aggregateMutables);
    }

    //Assigns the given descriptor to every mutable in the list.
    private void fix(FieldsDescriptor fd, List<GPOMutable> mutables)
    {
      for (int index = 0;
          index < mutables.size();
          index++) {
        mutables.get(index).setFieldDescriptor(fd);
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
new file mode 100644
index 0000000..e1bf7d4
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
+
+/**
+ * <p>
+ * This aggregator creates an aggregate out of the first {@link InputEvent} encountered by this aggregator. All
+ * subsequent
+ * {@link InputEvent}s are ignored.
+ * </p>
+ * <p>
+ * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
+ * one is picked arbitrarily to be the first.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+@Name("FIRST")
+public class AggregatorFirst extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301646L;
+
+  public AggregatorFirst()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
+
+    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+
+    return aggregate;
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    //Ignore
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    //Ignore
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
new file mode 100644
index 0000000..09190e1
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * @since 3.1.0
+ */
+
+public enum AggregatorIncrementalType
+{
+  SUM(new AggregatorSum()),
+  MIN(new AggregatorMin()),
+  MAX(new AggregatorMax()),
+  COUNT(new AggregatorCount()),
+  LAST(new AggregatorLast()),
+  FIRST(new AggregatorFirst()),
+  CUM_SUM(new AggregatorCumSum());
+
+  public static final Map<String, Integer> NAME_TO_ORDINAL;
+  public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;
+
+  private IncrementalAggregator aggregator;
+
+  static {
+    Map<String, Integer> nameToOrdinal = Maps.newHashMap();
+    Map<String, IncrementalAggregator> nameToAggregator = Maps.newHashMap();
+
+    for (AggregatorIncrementalType aggType : AggregatorIncrementalType.values()) {
+      nameToOrdinal.put(aggType.name(), aggType.ordinal());
+      nameToAggregator.put(aggType.name(), aggType.getAggregator());
+    }
+
+    NAME_TO_ORDINAL = Collections.unmodifiableMap(nameToOrdinal);
+    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
+  }
+
+  AggregatorIncrementalType(IncrementalAggregator aggregator)
+  {
+    setAggregator(aggregator);
+  }
+
+  private void setAggregator(IncrementalAggregator aggregator)
+  {
+    Preconditions.checkNotNull(aggregator);
+    this.aggregator = aggregator;
+  }
+
+  public IncrementalAggregator getAggregator()
+  {
+    return aggregator;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/fcdd74de/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
new file mode 100644
index 0000000..f727036
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.dimensions.aggregator;
+
+import com.datatorrent.api.annotation.Name;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+import com.datatorrent.lib.dimensions.DimensionsEvent;
+import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
+import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;
+
+/**
+ * <p>
+ * This aggregator creates an aggregate out of the last {@link InputEvent} encountered by this aggregator. All previous
+ * {@link InputEvent}s are ignored.
+ * </p>
+ * <p>
+ * <b>Note:</b> when aggregates are combined in a unifier it is not possible to tell which came first or last, so
+ * one is picked arbitrarily to be the last.
+ * </p>
+ *
+ * @since 3.1.0
+ */
+@Name("LAST")
+public class AggregatorLast extends AbstractIncrementalAggregator
+{
+  private static final long serialVersionUID = 20154301647L;
+
+  public AggregatorLast()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
+  {
+    Aggregate aggregate = super.getGroup(src, aggregatorIndex);
+
+    GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+
+    return aggregate;
+  }
+
+  @Override
+  public Type getOutputType(Type inputType)
+  {
+    return AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, InputEvent src)
+  {
+    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
+  }
+
+  @Override
+  public void aggregate(Aggregate dest, Aggregate src)
+  {
+    DimensionsEvent.copy(dest, src);
+  }
+
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+}



Mime
View raw message