apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyfar...@apache.org
Subject [2/3] incubator-apex-malhar git commit: APEXMALHAR-2055 #resolve #comment Move dimensions related feature to Malhar
Date Fri, 15 Apr 2016 01:53:19 GMT
APEXMALHAR-2055 #resolve #comment Move dimensions related feature to Malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5bd1a99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5bd1a99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5bd1a99e

Branch: refs/heads/master
Commit: 5bd1a99e2d8591782404c6477cb5d5d90aa6bc59
Parents: fac986b
Author: brightchen <bright@datatorrent.com>
Authored: Wed Apr 13 18:07:41 2016 -0700
Committer: brightchen <bright@datatorrent.com>
Committed: Thu Apr 14 18:27:06 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |    3 +-
 .../lib/appdata/gpo/SerdeMapPrimitive.java      |   98 ++
 .../schemas/DimensionalConfigurationSchema.java | 1021 +++++++++++++++---
 .../lib/appdata/schemas/DimensionalSchema.java  |   45 +-
 .../aggregator/AbstractCompositeAggregator.java |  166 +++
 .../AbstractCompositeAggregatorFactory.java     |   49 +
 .../aggregator/AbstractTopBottomAggregator.java |  306 ++++++
 .../dimensions/aggregator/AggregatorBottom.java |   28 +
 .../aggregator/AggregatorRegistry.java          |   87 +-
 .../dimensions/aggregator/AggregatorTop.java    |   28 +
 .../aggregator/AggregatorTopBottomType.java     |   26 +
 .../dimensions/aggregator/AggregatorUtils.java  |   20 +
 .../aggregator/CompositeAggregator.java         |   61 ++
 .../aggregator/CompositeAggregatorFactory.java  |   57 +
 .../DefaultCompositeAggregatorFactory.java      |   64 ++
 .../aggregator/TopBottomAggregatorFactory.java  |   96 ++
 .../lib/io/fs/FileSplitterBaseTest.java         |    2 +-
 .../com/datatorrent/lib/util/TestUtils.java     |    5 +
 18 files changed, 1987 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 541471a..d69387d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,5 @@ target/
 npm-debug.log
 nb-configuration.xml
 hadoop.log
-site/
\ No newline at end of file
+site/
+.checkstyle

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
new file mode 100644
index 0000000..3f5ae4a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.appdata.gpo;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.lib.appdata.schemas.Type;
+
+/**
+ * TODO: this class can be moved to Malhar. Put in Megh for implementing TOP/BOTTOM dimension computation.
+ *
+ */
+public class SerdeMapPrimitive  implements Serde
+{
+  public static final SerdeMapPrimitive INSTANCE = new SerdeMapPrimitive();
+
+  private final GPOByteArrayList bytes = new GPOByteArrayList();
+
+  private SerdeMapPrimitive()
+  {
+  }
+
+  @Override
+  public synchronized byte[] serializeObject(Object object)
+  {
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> primitiveMap = (Map<Object, Object>)object;
+
+    for (Map.Entry<Object, Object> entry : primitiveMap.entrySet() ) {
+      serializePrimitive(entry.getKey(), bytes);
+      serializePrimitive(entry.getValue(), bytes);
+    }
+
+    byte[] serializedBytes = bytes.toByteArray();
+    bytes.clear();
+    bytes.add(GPOUtils.serializeInt(serializedBytes.length));
+    bytes.add(serializedBytes);
+    serializedBytes = bytes.toByteArray();
+    bytes.clear();
+    return serializedBytes;
+  }
+
+  protected void serializePrimitive(Object object, GPOByteArrayList bytes)
+  {
+    Type type = Type.CLASS_TO_TYPE.get(object.getClass());
+
+    if (type == null || type == Type.OBJECT) {
+      throw new IllegalArgumentException("Cannot serialize objects of class " + object.getClass());
+    }
+
+    bytes.add(GPOUtils.serializeInt(type.ordinal()));
+    GPOType gpoType = GPOType.GPO_TYPE_ARRAY[type.ordinal()];
+    bytes.add(gpoType.serialize(object));
+  }
+  
+  @Override
+  public synchronized Object deserializeObject(byte[] objectBytes, MutableInt offset)
+  {
+    int length = GPOUtils.deserializeInt(objectBytes, offset);
+    int startIndex = offset.intValue();
+
+    Map<Object, Object> primitiveMap = Maps.newHashMap();
+
+    while (startIndex + length > offset.intValue()) {
+      int typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset);
+      GPOType gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal];
+      Object key = gpoType.deserialize(objectBytes, offset);
+      
+      typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset);
+      gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal];
+      Object value = gpoType.deserialize(objectBytes, offset);
+      primitiveMap.put(key, value);
+    }
+
+    return primitiveMap;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
index 1e048c4..5e82256 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.appdata.schemas;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -28,8 +29,13 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry;
 import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AbstractCompositeAggregator;
+import org.apache.apex.malhar.lib.dimensions.aggregator.AbstractTopBottomAggregator;
 import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry;
 import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorUtils;
+import org.apache.apex.malhar.lib.dimensions.aggregator.CompositeAggregator;
+import org.apache.apex.malhar.lib.dimensions.aggregator.CompositeAggregatorFactory;
+import org.apache.apex.malhar.lib.dimensions.aggregator.DefaultCompositeAggregatorFactory;
 import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator;
 import org.apache.apex.malhar.lib.dimensions.aggregator.OTFAggregator;
 import org.codehaus.jettison.json.JSONArray;
@@ -148,6 +154,10 @@ public class DimensionalConfigurationSchema
    */
   public static final String FIELD_KEYS_TYPE = "type";
   /**
+   * The JSON key string for the expression of a key. It is optional.
+   */
+  public static final String FIELD_KEYS_EXPRESSION = "expression";
+  /**
    * The JSON key string for the enumValues of a key.
    */
   public static final String FIELD_KEYS_ENUMVALUES = "enumValues";
@@ -176,6 +186,11 @@ public class DimensionalConfigurationSchema
    */
   public static final String FIELD_VALUES_TYPE = "type";
   /**
+   * The JSON key string for the expression of a value. It is optional.
+   */
+  public static final String FIELD_VALUES_EXPRESSION = "expression";
+
+  /**
    * The JSON key string for the aggregators applied to a value accross all dimension combinations.
    */
   public static final String FIELD_VALUES_AGGREGATIONS = "aggregators";
@@ -184,6 +199,15 @@ public class DimensionalConfigurationSchema
    */
   //TODO To be removed when Malhar Library 3.3 becomes a dependency.
   private static final String FIELD_TAGS = "tags";
+
+  public static final String FIELD_VALUES_AGGREGATOR = "aggregator";
+  //  public static final String FIELD_VALUES_AGGREGATOR_PROPERTY = "property";
+  //  public static final String FIELD_VALUES_AGGREGATOR_PROPERTY_VALUE = "value";
+
+  public static final String PROPERTY_NAME_EMBEDED_AGGREGATOR = "embededAggregator";
+  public static final String PROPERTY_NAME_COUNT = "count";
+  public static final String PROPERTY_NAME_SUB_COMBINATIONS = "subCombinations";
+
   /**
    * The JSON key string for the dimensions section of the schema.
    */
@@ -235,6 +259,13 @@ public class DimensionalConfigurationSchema
    */
   private List<Map<String, Set<String>>> dimensionsDescriptorIDToValueToOTFAggregator;
   /**
+   * This is a map from a dimensions descriptor id to a value to the set of all composite aggregations performed
+   * on that value under the dimensions combination corresponding to that dimensions descriptor id.
+   * It includes the time bucket combination.
+   */
+  private List<Map<String, Set<String>>> dimensionsDescriptorIDToValueToCompositeAggregator;
+
+  /**
    * This is a map from a dimensions descriptor id to an aggregator to a {@link FieldsDescriptor} object.
    * This is used internally to build dimensionsDescriptorIDToAggregatorIDToInputAggregatorDescriptor and
    * dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor in the
@@ -246,6 +277,10 @@ public class DimensionalConfigurationSchema
    */
   private List<Map<String, FieldsDescriptor>> dimensionsDescriptorIDToOTFAggregatorToAggregateDescriptor;
   /**
+   * This is a map from a dimensions descriptor id to a composite aggregator to a {@link FieldsDescriptor} object.
+   */
+  private List<Map<String, FieldsDescriptor>> dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor;
+  /**
    * This is a map from a {@link DimensionsDescriptor} to its corresponding dimensions descriptor ID.
    */
   private Map<DimensionsDescriptor, Integer> dimensionsDescriptorToID;
@@ -261,9 +296,11 @@ public class DimensionalConfigurationSchema
   private List<Int2ObjectMap<FieldsDescriptor>> dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor;
   /**
    * This is a map from the dimensions descriptor id to the list of all aggregations performed on that dimensions
-   * descriptor id.
+   * descriptor id. This list in fact only keeps the mapping from ddID to incremental aggregator IDs.
    */
-  private List<IntArrayList> dimensionsDescriptorIDToAggregatorIDs;
+  private List<IntArrayList> dimensionsDescriptorIDToIncrementalAggregatorIDs;
+
+  private List<IntArrayList> dimensionsDescriptorIDToCompositeAggregatorIDs;
   /**
    * This is a map from the dimensions descriptor id to field to all the additional value aggregations
    * specified for the dimensions combination.
@@ -271,6 +308,7 @@ public class DimensionalConfigurationSchema
   private List<Map<String, Set<String>>> dimensionsDescriptorIDToFieldToAggregatorAdditionalValues;
   /**
    * This is a map from dimensions descriptor ids to all the keys fields involved in the dimensions combination.
+   * It does not include the time bucket.
    */
   private List<Fields> dimensionsDescriptorIDToKeys;
   /**
@@ -313,8 +351,36 @@ public class DimensionalConfigurationSchema
    */
   private List<String> tags;
 
+  /**
+   * A map: aggregate name ==> { property name ==> property value}
+   */
+  protected Map<String, Map<String, String>> aggregatorToProperty;
+  protected List<CustomTimeBucket> customTimeBucketsCombination;
+
   private CustomTimeBucketRegistry customTimeBucketRegistry;
 
+  protected CompositeAggregatorFactory compositeAggregatorFactory = DefaultCompositeAggregatorFactory.defaultInst;
+
+  protected static final String[] COMPOSITE_AGGREGATORS = new String[]{"TOPN", "BOTTOMN"};
+  protected static final Map<String, Set<String>> aggregatorToPropertiesMap = Maps.newHashMap();
+
+  static {
+    Set<String> topBottomProperties =
+        Sets.newHashSet(PROPERTY_NAME_COUNT, PROPERTY_NAME_EMBEDED_AGGREGATOR, PROPERTY_NAME_SUB_COMBINATIONS);
+    aggregatorToPropertiesMap.put("TOPN", topBottomProperties);
+    aggregatorToPropertiesMap.put("BOTTOMN", topBottomProperties);
+  }
+
+  /**
+   * Maps each key name to its expression.
+   */
+  private Map<String, String> keyToExpression = Maps.newHashMap();
+
+  /**
+   * Maps each aggregate value name to its expression.
+   */
+  private Map<String, String> valueToExpression = Maps.newHashMap();
+
   /**
    * Constructor for serialization.
    */
@@ -387,6 +453,10 @@ public class DimensionalConfigurationSchema
     return aggregatorRegistry;
   }
 
+  /**
+   * @param ddIDToValueToAggregator
+   * @return a list of AggregatorToAggregateDescriptor. namely ddID to AggregatorToAggregateDescriptor
+   */
   private List<Map<String, FieldsDescriptor>> computeAggregatorToAggregateDescriptor(
       List<Map<String, Set<String>>> ddIDToValueToAggregator)
   {
@@ -415,9 +485,17 @@ public class DimensionalConfigurationSchema
       Map<String, FieldsDescriptor> aggregatorToValuesDescriptor = Maps.newHashMap();
 
       for (Map.Entry<String, Set<String>> entry : aggregatorToValues.entrySet()) {
-        aggregatorToValuesDescriptor.put(
-            entry.getKey(),
-            inputValuesDescriptor.getSubset(new Fields(entry.getValue())));
+        final String aggregatorName = entry.getKey();
+        if (isCompositeAggregator(aggregatorName)) {
+          //for composite aggregator, the input type and output type are different
+          aggregatorToValuesDescriptor.put(aggregatorName,
+              AggregatorUtils.getOutputFieldsDescriptor(inputValuesDescriptor.getSubset(new Fields(entry.getValue())),
+              this.getCompositeAggregatorByName(aggregatorName)));
+        } else {
+          aggregatorToValuesDescriptor.put(
+              aggregatorName,
+              inputValuesDescriptor.getSubset(new Fields(entry.getValue())));
+        }
       }
 
       tempDdIDToAggregatorToAggregateDescriptor.add(aggregatorToValuesDescriptor);
@@ -707,6 +785,9 @@ public class DimensionalConfigurationSchema
 
     //buildDDIDAggID
 
+    //composite aggregators are not supported in this method; add an empty list to avoid unit test errors.
+    dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor = Lists.newArrayList();
+
     buildDimensionsDescriptorIDAggregatorIDMaps();
   }
 
@@ -748,6 +829,16 @@ public class DimensionalConfigurationSchema
 
       String keyName = tempKeyDescriptor.getString(FIELD_KEYS_NAME);
       String typeName = tempKeyDescriptor.getString(FIELD_KEYS_TYPE);
+
+      try {
+        String keyExpression = tempKeyDescriptor.getString(FIELD_KEYS_EXPRESSION);
+        if (keyExpression != null) {
+          keyToExpression.put(keyName, keyExpression);
+        }
+      } catch (JSONException e) {
+        //do nothing
+      }
+
       List<String> keyTags = getTags(tempKeyDescriptor);
 
       keyToTags.put(keyName, keyTags);
@@ -769,8 +860,7 @@ public class DimensionalConfigurationSchema
           Object val = valArray.get(valIndex);
           valuesList.add(val);
 
-          Preconditions.checkState(!(val instanceof JSONArray
-              || val instanceof JSONObject),
+          Preconditions.checkState(!(val instanceof JSONArray || val instanceof JSONObject),
               "The value must be a primitive.");
 
           Type currentType = Type.CLASS_TO_TYPE.get(val.getClass());
@@ -782,8 +872,7 @@ public class DimensionalConfigurationSchema
               maxType = currentType;
             } else {
               Preconditions.checkState(currentType.getHigherTypes().contains(maxType),
-                  "Conficting types: " + currentType.getName()
-                  + " cannot be converted to " + maxType.getName());
+                  "Conficting types: " + currentType.getName() + " cannot be converted to " + maxType.getName());
             }
           }
         }
@@ -863,6 +952,8 @@ public class DimensionalConfigurationSchema
     Map<String, Set<String>> allValueToOTFAggregator = Maps.newHashMap();
     Map<String, Set<String>> valueToAggregators = Maps.newHashMap();
     Map<String, Set<String>> valueToOTFAggregators = Maps.newHashMap();
+    Map<String, Set<String>> allValueToCompositeAggregator = Maps.newHashMap();
+    Map<String, Set<String>> valueToCompositeAggregators = Maps.newHashMap();
 
     Map<String, Type> aggFieldToType = Maps.newHashMap();
     JSONArray valuesArray = jo.getJSONArray(FIELD_VALUES);
@@ -875,6 +966,16 @@ public class DimensionalConfigurationSchema
       JSONObject value = valuesArray.getJSONObject(valueIndex);
       String name = value.getString(FIELD_VALUES_NAME);
       String type = value.getString(FIELD_VALUES_TYPE);
+
+      try {
+        String valueExpression = value.getString(FIELD_VALUES_EXPRESSION);
+        if (valueExpression != null) {
+          valueToExpression.put(name, valueExpression);
+        }
+      } catch (JSONException e) {
+        //Do nothing
+      }
+
       List<String> valueTags = getTags(value);
 
       valueToTags.put(name, valueTags);
@@ -892,6 +993,7 @@ public class DimensionalConfigurationSchema
       aggFieldToType.put(name, typeT);
       Set<String> aggregatorSet = Sets.newHashSet();
       Set<String> aggregatorOTFSet = Sets.newHashSet();
+      Set<String> aggregateCompositeSet = Sets.newHashSet();
 
       if (value.has(FIELD_VALUES_AGGREGATIONS)) {
         JSONArray aggregators = value.getJSONArray(FIELD_VALUES_AGGREGATIONS);
@@ -903,59 +1005,38 @@ public class DimensionalConfigurationSchema
         for (int aggregatorIndex = 0;
             aggregatorIndex < aggregators.length();
             aggregatorIndex++) {
-          String aggregatorName = aggregators.getString(aggregatorIndex);
-
-          if (!aggregatorRegistry.isAggregator(aggregatorName)) {
-            throw new IllegalArgumentException(aggregatorName + " is not a valid aggregator.");
-          }
-
-          if (aggregatorRegistry.isIncrementalAggregator(aggregatorName)) {
-            Set<String> aggregatorNames = allValueToAggregator.get(name);
-
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              allValueToAggregator.put(name, aggregatorNames);
-            }
-
-            aggregatorNames.add(aggregatorName);
 
-            if (!aggregatorSet.add(aggregatorName)) {
-              throw new IllegalArgumentException("An aggregator " + aggregatorName
-                  + " cannot be specified twice for a value");
-            }
-
-            IncrementalAggregator aggregator = aggregatorRegistry.getNameToIncrementalAggregator().get(aggregatorName);
-            aggregatorToType.put(aggregatorName, aggregator.getOutputType(typeT));
+          //the aggregator no longer has only a name; it could be an object or a String
+          //example: {"aggregator":"BOTTOMN","property":"count","value":"20","embededAggregator":"AVG"}
+          String aggregatorType = null;
+          aggregatorType = aggregators.getString(aggregatorIndex);
+          if (isJsonSimpleString(aggregatorType)) {
+            //it is a simple aggregator
+            addNonCompositeAggregator(aggregatorType, allValueToAggregator, allValueToOTFAggregator,
+                name, aggregatorSet, aggregatorToType, typeT, aggregatorOTFSet, true);
           } else {
-            //Check for duplicate on the fly aggregators
-            Set<String> aggregatorNames = allValueToOTFAggregator.get(name);
-
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              allValueToOTFAggregator.put(name, aggregatorNames);
-            }
-
-            if (!aggregatorNames.add(aggregatorName)) {
-              throw new IllegalArgumentException("An aggregator " + aggregatorName +
-                  " cannot be specified twice for a value");
-            }
-
-            aggregatorOTFSet.add(aggregatorName);
-
-            //Add child aggregators
-            aggregatorNames = allValueToAggregator.get(name);
+            //it is a composite aggregate
+            JSONObject jsonAggregator = aggregators.getJSONObject(aggregatorIndex);
+            aggregatorType = jsonAggregator.getString(FIELD_VALUES_AGGREGATOR);
+            Map<String, Object> propertyNameToValue = getPropertyNameToValue(jsonAggregator, aggregatorType);
+
+            //the following steps are for composite aggregators.
+            if (isCompositeAggregator(aggregatorType)) {
+              String embededAggregatorName = (String)propertyNameToValue.get(PROPERTY_NAME_EMBEDED_AGGREGATOR);
+
+              /**
+               * Don't add the embedded aggregator here, as the embedded aggregator has a different dimension, and
+               * that dimension may not have been created yet.
+               */
+              CompositeAggregator aggregator = addCompositeAggregator(aggregatorType, allValueToCompositeAggregator,
+                  aggregateCompositeSet, name,
+                  embededAggregatorName, propertyNameToValue, aggregatorToType);
 
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              allValueToAggregator.put(name, aggregatorNames);
+            } else {
+              throw new IllegalArgumentException(
+                  "Unknow aggregator type: " + aggregatorType + ", please check if it valid.");
             }
 
-            OTFAggregator aggregator = aggregatorRegistry.getNameToOTFAggregators().get(aggregatorName);
-            aggregatorNames.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
-            aggregatorSet.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
-            aggregatorToType.put(aggregatorName, aggregator.getOutputType());
-
-            LOG.debug("field name {} and adding aggregator names {}:", name, aggregatorNames);
           }
         }
       }
@@ -964,6 +1045,9 @@ public class DimensionalConfigurationSchema
         valueToAggregators.put(name, aggregatorSet);
         valueToOTFAggregators.put(name, aggregatorOTFSet);
       }
+      if (!aggregateCompositeSet.isEmpty()) {
+        valueToCompositeAggregators.put(name, aggregateCompositeSet);
+      }
     }
 
     LOG.debug("allValueToAggregator {}", allValueToAggregator);
@@ -975,6 +1059,8 @@ public class DimensionalConfigurationSchema
 
     dimensionsDescriptorIDToValueToAggregator = Lists.newArrayList();
     dimensionsDescriptorIDToValueToOTFAggregator = Lists.newArrayList();
+    dimensionsDescriptorIDToValueToCompositeAggregator = Lists.newArrayList();
+
     dimensionsDescriptorIDToKeyDescriptor = Lists.newArrayList();
     dimensionsDescriptorIDToDimensionsDescriptor = Lists.newArrayList();
     dimensionsDescriptorIDToAggregatorToAggregateDescriptor = Lists.newArrayList();
@@ -1039,8 +1125,14 @@ public class DimensionalConfigurationSchema
       JSONObject dimension = dimensionsArray.getJSONObject(dimensionsIndex);
       //Get the key fields of a descriptor
       JSONArray combinationFields = dimension.getJSONArray(FIELD_DIMENSIONS_COMBINATIONS);
+
+      //valueName to IncrementalAggregator
       Map<String, Set<String>> specificValueToAggregator = Maps.newHashMap();
+      //valueName to OTFAggregator
       Map<String, Set<String>> specificValueToOTFAggregator = Maps.newHashMap();
+      //valueName to CompositeAggregator
+      Map<String, Set<String>> specificValueToCompositeAggregator = Maps.newHashMap();
+      //TODO: need a mechanism to check that the value name is valid.
 
       for (Map.Entry<String, Set<String>> entry : valueToAggregators.entrySet()) {
         Set<String> aggregators = Sets.newHashSet();
@@ -1054,6 +1146,12 @@ public class DimensionalConfigurationSchema
         specificValueToOTFAggregator.put(entry.getKey(), aggregators);
       }
 
+      for (Map.Entry<String, Set<String>> entry : valueToCompositeAggregators.entrySet()) {
+        Set<String> aggregators = Sets.newHashSet();
+        aggregators.addAll(entry.getValue());
+        specificValueToCompositeAggregator.put(entry.getKey(), aggregators);
+      }
+
       List<String> keyNames = Lists.newArrayList();
       //loop through the key fields of a descriptor
       for (int keyIndex = 0;
@@ -1075,7 +1173,7 @@ public class DimensionalConfigurationSchema
       dimensionsDescriptorIDToFieldToAggregatorAdditionalValues.add(fieldToAggregatorAdditionalValues);
 
       Set<CustomTimeBucket> customTimeBucketsCombinationSet = Sets.newHashSet(customTimeBucketsAllSet);
-      List<CustomTimeBucket> customTimeBucketsCombination = Lists.newArrayList(customTimeBucketsAll);
+      customTimeBucketsCombination = Lists.newArrayList(customTimeBucketsAll);
 
       if (dimension.has(DimensionalConfigurationSchema.FIELD_DIMENSIONS_TIME_BUCKETS)) {
         JSONArray timeBuckets = dimension.getJSONArray(DimensionalConfigurationSchema.FIELD_DIMENSIONS_TIME_BUCKETS);
@@ -1116,108 +1214,135 @@ public class DimensionalConfigurationSchema
             additionalValueIndex < additionalValues.length();
             additionalValueIndex++) {
           String additionalValue = additionalValues.getString(additionalValueIndex);
-          String[] components = additionalValue.split(ADDITIONAL_VALUE_SEPERATOR);
-
-          if (components.length != ADDITIONAL_VALUE_NUM_COMPONENTS) {
-            throw new IllegalArgumentException("The number of component values "
-                + "in an additional value must be "
-                + ADDITIONAL_VALUE_NUM_COMPONENTS
-                + " not " + components.length);
-          }
-
-          String valueName = components[ADDITIONAL_VALUE_VALUE_INDEX];
-          String aggregatorName = components[ADDITIONAL_VALUE_AGGREGATOR_INDEX];
 
-          {
-            Set<String> aggregators = fieldToAggregatorAdditionalValues.get(valueName);
+          if (isJsonSimpleString(additionalValue)) {
+            String[] components = additionalValue.split(ADDITIONAL_VALUE_SEPERATOR);
 
-            if (aggregators == null) {
-              aggregators = Sets.newHashSet();
-              fieldToAggregatorAdditionalValues.put(valueName, aggregators);
+            if (components.length != ADDITIONAL_VALUE_NUM_COMPONENTS) {
+              throw new IllegalArgumentException("The number of component values "
+                  + "in an additional value must be "
+                  + ADDITIONAL_VALUE_NUM_COMPONENTS
+                  + " not " + components.length);
             }
 
-            aggregators.add(aggregatorName);
-          }
-
-          if (!aggregatorRegistry.isAggregator(aggregatorName)) {
-            throw new IllegalArgumentException(aggregatorName + " is not a valid aggregator.");
-          }
+            String valueName = components[ADDITIONAL_VALUE_VALUE_INDEX];
+            verifyValueDefined(valueName, aggFieldToType.keySet());
 
-          if (aggregatorRegistry.isIncrementalAggregator(aggregatorName)) {
-            Set<String> aggregatorNames = allValueToAggregator.get(valueName);
-
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              allValueToAggregator.put(valueName, aggregatorNames);
-            }
+            String aggregatorName = components[ADDITIONAL_VALUE_AGGREGATOR_INDEX];
 
-            aggregatorNames.add(aggregatorName);
+            {
+              Set<String> aggregators = fieldToAggregatorAdditionalValues.get(valueName);
 
-            Set<String> aggregators = specificValueToAggregator.get(valueName);
+              if (aggregators == null) {
+                aggregators = Sets.newHashSet();
+                fieldToAggregatorAdditionalValues.put(valueName, aggregators);
+              }
 
-            if (aggregators == null) {
-              aggregators = Sets.newHashSet();
-              specificValueToAggregator.put(valueName, aggregators);
+              aggregators.add(aggregatorName);
             }
 
-            if (aggregators == null) {
-              throw new IllegalArgumentException("The additional value " + additionalValue
-                  + "Does not have a corresponding value " + valueName
-                  + " defined in the " + FIELD_VALUES + " section.");
+            if (!aggregatorRegistry.isAggregator(aggregatorName)) {
+              throw new IllegalArgumentException(aggregatorName + " is not a valid aggregator.");
             }
 
-            if (!aggregators.add(aggregatorName)) {
-              throw new IllegalArgumentException("The aggregator " + aggregatorName
-                  + " was already defined in the " + FIELD_VALUES
-                  + " section for the value " + valueName);
-            }
-          } else {
-            //Check for duplicate on the fly aggregators
-            Set<String> aggregatorNames = specificValueToOTFAggregator.get(valueName);
+            if (aggregatorRegistry.isIncrementalAggregator(aggregatorName)) {
+              Set<String> aggregatorNames = allValueToAggregator.get(valueName);
+
+              if (aggregatorNames == null) {
+                aggregatorNames = Sets.newHashSet();
+                allValueToAggregator.put(valueName, aggregatorNames);
+              }
+
+              aggregatorNames.add(aggregatorName);
+
+              Set<String> aggregators = specificValueToAggregator.get(valueName);
+              if (aggregators == null) {
+                aggregators = Sets.newHashSet();
+                specificValueToAggregator.put(valueName, aggregators);
+              }
+
+              if (!aggregators.add(aggregatorName)) {
+                throw new IllegalArgumentException("The aggregator " + aggregatorName
+                    + " was already defined in the " + FIELD_VALUES
+                    + " section for the value " + valueName);
+              }
+            } else {
+              //Check for duplicate on the fly aggregators
+              Set<String> aggregatorNames = specificValueToOTFAggregator.get(valueName);
 
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              specificValueToOTFAggregator.put(valueName, aggregatorNames);
-            }
+              if (aggregatorNames == null) {
+                aggregatorNames = Sets.newHashSet();
+                specificValueToOTFAggregator.put(valueName, aggregatorNames);
+              }
 
-            if (!aggregatorNames.add(aggregatorName)) {
-              throw new IllegalArgumentException("The aggregator " + aggregatorName +
-                  " cannot be specified twice for the value " + valueName);
-            }
+              if (!aggregatorNames.add(aggregatorName)) {
+                throw new IllegalArgumentException("The aggregator " + aggregatorName +
+                    " cannot be specified twice for the value " + valueName);
+              }
 
-            aggregatorNames = allValueToOTFAggregator.get(valueName);
+              aggregatorNames = allValueToOTFAggregator.get(valueName);
 
-            if (aggregatorNames == null) {
-              aggregatorNames = Sets.newHashSet();
-              allValueToOTFAggregator.put(valueName, aggregatorNames);
-            }
+              if (aggregatorNames == null) {
+                aggregatorNames = Sets.newHashSet();
+                allValueToOTFAggregator.put(valueName, aggregatorNames);
+              }
 
-            if (!aggregatorNames.add(aggregatorName)) {
-              throw new IllegalArgumentException("The aggregator " + aggregatorName +
-                  " cannot be specified twice for the value " + valueName);
-            }
+              if (!aggregatorNames.add(aggregatorName)) {
+                throw new IllegalArgumentException("The aggregator " + aggregatorName +
+                    " cannot be specified twice for the value " + valueName);
+              }
 
-            //
+              Set<String> aggregators = specificValueToAggregator.get(valueName);
 
-            Set<String> aggregators = specificValueToAggregator.get(valueName);
+              if (aggregators == null) {
+                aggregators = Sets.newHashSet();
+                specificValueToAggregator.put(valueName, aggregators);
+              }
 
-            if (aggregators == null) {
-              aggregators = Sets.newHashSet();
-              specificValueToAggregator.put(valueName, aggregators);
-            }
+              if (aggregators == null) {
+                throw new IllegalArgumentException("The additional value " + additionalValue
+                    + "Does not have a corresponding value " + valueName
+                    + " defined in the " + FIELD_VALUES + " section.");
+              }
 
-            if (aggregators == null) {
-              throw new IllegalArgumentException("The additional value " + additionalValue
-                  + "Does not have a corresponding value " + valueName
-                  + " defined in the " + FIELD_VALUES + " section.");
+              aggregators.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
+            }
+          } else {
+            //it is a composite aggregate
+            JSONObject jsonAddition = additionalValues.getJSONObject(additionalValueIndex);
+            String valueName = (String)jsonAddition.keys().next();
+            verifyValueDefined(valueName, aggFieldToType.keySet());
+
+            JSONObject jsonAggregator = jsonAddition.getJSONObject(valueName);
+            String aggregatorName = jsonAggregator.getString(FIELD_VALUES_AGGREGATOR);
+            Map<String, Object> propertyNameToValue = getPropertyNameToValue(jsonAggregator, aggregatorName);
+
+            //the steps following is for composite aggregator.
+            if (isCompositeAggregator(aggregatorName)) {
+              String embededAggregatorName = (String)propertyNameToValue.get(PROPERTY_NAME_EMBEDED_AGGREGATOR);
+
+              /**
+               * Don't add the embedded aggregator here, as the embedded aggregator belongs to a different dimension
+               * which may not have been created yet. The subCombination should be part of the combination.
+               */
+              Set<String> compositeAggregators = specificValueToCompositeAggregator.get(valueName);
+              if (compositeAggregators == null) {
+                compositeAggregators = Sets.newHashSet();
+                specificValueToCompositeAggregator.put(valueName, compositeAggregators);
+              }
+              CompositeAggregator aggregator = addCompositeAggregator(aggregatorName, allValueToCompositeAggregator,
+                  compositeAggregators,
+                  valueName, embededAggregatorName, propertyNameToValue, null);
+            } else {
+              throw new IllegalArgumentException(
+                  "Unknow aggregator name: " + aggregatorName + ", please check if it valid.");
             }
-
-            aggregators.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
           }
+
         }
       }
 
-      if (specificValueToAggregator.isEmpty()) {
+      if (specificValueToAggregator.isEmpty() && specificValueToCompositeAggregator.isEmpty()) {
         throw new IllegalArgumentException("No aggregations defined for the " +
             "following field combination " +
             combinationFields.toString());
@@ -1226,6 +1351,7 @@ public class DimensionalConfigurationSchema
       for (CustomTimeBucket customTimeBucket : customTimeBucketsCombination) {
         dimensionsDescriptorIDToValueToAggregator.add(specificValueToAggregator);
         dimensionsDescriptorIDToValueToOTFAggregator.add(specificValueToOTFAggregator);
+        dimensionsDescriptorIDToValueToCompositeAggregator.add(specificValueToCompositeAggregator);
       }
     }
 
@@ -1243,18 +1369,21 @@ public class DimensionalConfigurationSchema
       }
     }
 
-    //DD ID To Aggregator To Aggregate Descriptor
+    //compute addition dimension and aggregator for composite aggregator
+    computeAdditionalDimensionForCompositeAggregators();
 
+    //DD ID To Aggregator To Aggregate Descriptor
     dimensionsDescriptorIDToAggregatorToAggregateDescriptor = computeAggregatorToAggregateDescriptor(
         dimensionsDescriptorIDToValueToAggregator);
 
     //DD ID To OTF Aggregator To Aggregator Descriptor
-
     dimensionsDescriptorIDToOTFAggregatorToAggregateDescriptor = computeAggregatorToAggregateDescriptor(
         dimensionsDescriptorIDToValueToOTFAggregator);
 
-    //Dimensions Descriptor To ID
+    dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor = computeAggregatorToAggregateDescriptor(
+        dimensionsDescriptorIDToValueToCompositeAggregator);
 
+    //Dimensions Descriptor To ID
     dimensionsDescriptorToID = Maps.newHashMap();
 
     for (int index = 0;
@@ -1264,8 +1393,64 @@ public class DimensionalConfigurationSchema
     }
 
     //Build id maps
-
     buildDimensionsDescriptorIDAggregatorIDMaps();
+
+    aggregatorRegistry.buildTopBottomAggregatorIDToAggregator();
+
+    //fulfill the embed ddids of composite aggregators
+    fulfillCompositeAggregatorExtraInfo();
+  }
+
+  protected Map<String, Object> getPropertyNameToValue(JSONObject jsonAggregator, String aggregatorName)
+      throws JSONException
+  {
+    //aggregatorName = jsonAggregator.getString(FIELD_VALUES_AGGREGATOR);
+
+    Set<String> propertyNames = aggregatorToPropertiesMap.get(aggregatorName);
+
+    if (propertyNames == null) {
+      return Collections.emptyMap();
+    }
+
+    Map<String, Object> propertyNameToValue = Maps.newHashMap();
+    for (String propertyName : propertyNames) {
+      String propertyValue = jsonAggregator.getString(propertyName);
+      if (propertyValue == null) {
+        continue;
+      }
+
+      if (isJsonSimpleString(propertyValue)) {
+        propertyNameToValue.put(propertyName, propertyValue);
+      } else {
+        JSONArray propertyValues = jsonAggregator.getJSONArray(propertyName);
+        if (propertyValues != null) {
+          final int valueLength = propertyValues.length();
+          String[] values = new String[valueLength];
+          for (int i = 0; i < valueLength; ++i) {
+            values[i] = propertyValues.getString(i);
+          }
+          propertyNameToValue.put(propertyName, values);
+        }
+      }
+    }
+    return propertyNameToValue;
+  }
+
+  /**
+   * Checks whether the given aggregator name denotes a composite aggregator. The name may carry a
+   * suffix separated by '-', so only the prefix is compared against the known composite aggregator types.
+   *
+   * @param aggregatorName the aggregator name, possibly with a suffix
+   * @return true if the name denotes a composite aggregator
+   */
+  protected boolean isCompositeAggregator(String aggregatorName)
+  {
+    aggregatorName = aggregatorName.split("-")[0];
+    for (int index = 0; index < COMPOSITE_AGGREGATORS.length; ++index) {
+      if (COMPOSITE_AGGREGATORS[index].equals(aggregatorName)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
@@ -1287,6 +1472,350 @@ public class DimensionalConfigurationSchema
   }
 
   /**
+   * The composite aggregators could add additional dimensions and aggregators.
+   * This method parses all composite aggregators to generate additional dimensions and aggregators (embedded
+   * aggregators).
+   */
+  protected void computeAdditionalDimensionForCompositeAggregators()
+  {
+    //NOTES: dimensionsDescriptorIDToKeys doesn't have time bucket combination while
+    // dimensionsDescriptorIDToValueToCompositeAggregator has.
+    //the fieldsCombinations generated just check if the keys already existed.
+    Map<Set<String>, Integer> keysToCombinationId = getKeysToCombinationId();
+
+    //the dimensionsDescriptorIDToKeys will be changed during the process; only iterate over the initial entries
+    // for composite aggregators.
+    final int initialKeysCombinationsSize = dimensionsDescriptorIDToKeys.size();
+    for (int keysIndex = 0; keysIndex < initialKeysCombinationsSize; ++keysIndex) {
+      Set<String> keys = dimensionsDescriptorIDToKeys.get(keysIndex).getFields();
+
+      int ddId = keysIndex * customTimeBucketsCombination.size();
+      Map<String, Set<String>> valueToAggregators = dimensionsDescriptorIDToValueToCompositeAggregator.get(ddId);
+      Map<String, Set<String>> compositeAggregatorToValues = getAggregatorToValues(valueToAggregators);
+
+      // for the embed aggregator, the value is composite's value; the key is composite's key combined with
+      // subCombination
+      for (Map.Entry<String, Set<String>> aggregatorToValuesEntry : compositeAggregatorToValues.entrySet()) {
+        AbstractTopBottomAggregator aggregator = getCompositeAggregatorByName(aggregatorToValuesEntry.getKey());
+        Set<String> subCombination = aggregator.getSubCombinations();
+        addSubKeysAndAggregator(aggregatorToValuesEntry.getValue(), keys, subCombination,
+            aggregator.getEmbedAggregatorName(), keysToCombinationId);
+      }
+    }
+  }
+
+  protected Map<String, Set<String>> getAggregatorToValues(Map<String, Set<String>> valueToAggregators)
+  {
+    Map<String, Set<String>> aggregatorToValues = Maps.newHashMap();
+    for (Map.Entry<String, Set<String>> entry : valueToAggregators.entrySet()) {
+      for (String aggregator : entry.getValue()) {
+        Set<String> values = aggregatorToValues.get(aggregator);
+        if (values == null) {
+          values = Sets.newHashSet();
+          aggregatorToValues.put(aggregator, values);
+        }
+        values.add(entry.getKey());
+      }
+    }
+    return aggregatorToValues;
+  }
+
+  protected AbstractTopBottomAggregator getCompositeAggregatorByName(String compositeAggregatorName)
+  {
+    return aggregatorRegistry.getNameToTopBottomAggregator().get(compositeAggregatorName);
+  }
+
+  /**
+   * NOTES: dimensionsDescriptorIDToKeys doesn't have time bucket combinations, so each key set maps to exactly one combination id.
+   *
+   * @return map from field keys to CombinationId
+   */
+  protected Map<Set<String>, Integer> getKeysToCombinationId()
+  {
+    Map<Set<String>, Integer> keysToDdid = Maps.newHashMap();
+    for (int index = 0; index < dimensionsDescriptorIDToKeys.size(); ++index) {
+      Set<String> keys = Sets.newHashSet();
+      keys.addAll(dimensionsDescriptorIDToKeys.get(index).getFieldsList());
+
+      Integer orgIndex = keysToDdid.put(keys, index);
+      if (orgIndex != null) {
+        throw new RuntimeException("The keys" + keys + "already have a index " + index + " associated with it.");
+      }
+    }
+    return keysToDdid;
+  }
+
+  /**
+   * Add sub-keys and aggregator to the dimension description.
+   * Add the aggregator to the dimension if it already exists, else add a new dimension.
+   * Precondition: neither keys nor subKeys are empty. Empty keys means all keys; composite aggregators should not
+   * apply to this case.
+   * NOTES: keep the data integrity
+   *
+   * @param values
+   * @param keysOfCompositeAggregator
+   * @param subKeys
+   * @param aggregatorName            the name of the aggregator. it should be an incremental aggregator only
+   * @param keysToCombinationId       keep the fields combinations, don't include time bucket combination.
+   */
+  protected void addSubKeysAndAggregator(Set<String> values, Set<String> keysOfCompositeAggregator, Set<String> subKeys,
+      String aggregatorName, Map<Set<String>, Integer> keysToCombinationId)
+  {
+    if (keysOfCompositeAggregator == null || subKeys == null || keysOfCompositeAggregator.isEmpty() ||
+        subKeys.isEmpty()) {
+      throw new IllegalArgumentException("Both keys and subKeys can't be null or empty");
+    }
+
+    Set<String> allKeys = Sets.newHashSet();
+    allKeys.addAll(keysOfCompositeAggregator);
+    allKeys.addAll(subKeys);
+    if (allKeys.size() != keysOfCompositeAggregator.size() + subKeys.size()) {
+      throw new IllegalArgumentException(
+          "Should NOT have overlap between keys " + keysOfCompositeAggregator.toString() + " and subKeys " + subKeys);
+    }
+
+    Integer combinationId = keysToCombinationId.get(allKeys);
+    if (combinationId == null) {
+      //this fields combination not existed yet, add new dimension
+      //dimensionsDescriptorIDToKeys don't keep the time bucket combination.
+      if (dimensionsDescriptorIDToKeys.add(new Fields(allKeys))) {
+        combinationId = dimensionsDescriptorIDToKeys.size() - 1;
+      } else {
+        throw new RuntimeException("The keys " + allKeys + " already existed.");
+      }
+
+      keysToCombinationId.put(allKeys, combinationId);
+      addValueToAggregatorToCombination(values, allKeys, aggregatorName);
+    } else {
+      //if the combination existed, check the aggregator and add the aggregator if not added.
+      Set<String> incrementalAggregatorNames;
+      boolean isOTFAggregator = false;
+      if (!isIncrementalAggregator(aggregatorName)) {
+        //For an OTF aggregator, need to add its depended incremental aggregators
+        incrementalAggregatorNames = getOTFDependedIncrementalAggregatorNames(aggregatorName);
+        isOTFAggregator = true;
+      } else {
+        incrementalAggregatorNames = Sets.newHashSet();
+        incrementalAggregatorNames.add(aggregatorName);
+      }
+
+      Map<String, Set<String>> newValueToIncrementalAggregators = Maps.newHashMap();
+      Map<String, Set<String>> newValueToOTFAggregators = Maps.newHashMap();
+      for (String value : values) {
+        newValueToIncrementalAggregators.put(value, incrementalAggregatorNames);
+        if (isOTFAggregator) {
+          newValueToOTFAggregators.put(value, Sets.newHashSet(aggregatorName));
+        }
+      }
+
+      int ddid = combinationId * customTimeBucketsCombination.size();
+      for (int index = 0; index < customTimeBucketsCombination.size(); ++index, ++ddid) {
+        //for incremental aggregator, newValueToOTFAggregators is empty;
+        //for OTF, both newValueToIncrementalAggregators and newValueToOTFAggregators should be merged.
+        mergeMaps(dimensionsDescriptorIDToValueToAggregator.get(ddid), newValueToIncrementalAggregators);
+        mergeMaps(dimensionsDescriptorIDToValueToOTFAggregator.get(ddid), newValueToOTFAggregators);
+      }
+    }
+  }
+
+  /**
+   * @param values         the fields of value to be computed
+   * @param allKeys        the allKeys represents an combination
+   * @param aggregatorName the name of the aggregator(incremental or OTF) to be added to this combination
+   */
+  protected void addValueToAggregatorToCombination(Set<String> values, Set<String> allKeys, String aggregatorName)
+  {
+    Map<String, Set<String>> valueToIncrementalAggregators = Maps.newHashMap();
+    Map<String, Set<String>> valueToOTFAggregators = Maps.newHashMap();
+
+    Set<String> incrementalAggregatorNames;
+    boolean isOTFAggregator = false;
+    if (!isIncrementalAggregator(aggregatorName)) {
+      //For an OTF aggregator, need to add its depended incremental aggregators
+      incrementalAggregatorNames = getOTFDependedIncrementalAggregatorNames(aggregatorName);
+      isOTFAggregator = true;
+    } else {
+      incrementalAggregatorNames = Sets.newHashSet();
+      incrementalAggregatorNames.add(aggregatorName);
+    }
+    for (String value : values) {
+      valueToIncrementalAggregators.put(value, incrementalAggregatorNames);
+      if (isOTFAggregator) {
+        valueToOTFAggregators.put(value, Sets.newHashSet(aggregatorName));
+      }
+    }
+
+    for (CustomTimeBucket customTimeBucket : customTimeBucketsCombination) {
+      dimensionsDescriptorIDToValueToAggregator.add(valueToIncrementalAggregators);
+      dimensionsDescriptorIDToValueToOTFAggregator.add(valueToOTFAggregators);
+
+      //add empty information just for alignment
+      dimensionsDescriptorIDToValueToCompositeAggregator.add(Collections.<String, Set<String>>emptyMap());
+      dimensionsDescriptorIDToFieldToAggregatorAdditionalValues.add(Collections.<String, Set<String>>emptyMap());
+
+      //key descriptor
+      DimensionsDescriptor dimensionsDescriptor = new DimensionsDescriptor(customTimeBucket, new Fields(allKeys));
+      dimensionsDescriptorIDToDimensionsDescriptor.add(dimensionsDescriptor);
+      dimensionsDescriptorIDToKeyDescriptor.add(dimensionsDescriptor.createFieldsDescriptor(keyDescriptor));
+    }
+  }
+
+  protected boolean isIncrementalAggregator(String aggregatorName)
+  {
+    return aggregatorRegistry.getNameToIncrementalAggregator().get(aggregatorName) != null;
+  }
+
+  protected boolean isOTFAggregator(String aggregatorName)
+  {
+    return aggregatorRegistry.getNameToOTFAggregators().get(aggregatorName) != null;
+  }
+
+  protected Set<String> getOTFDependedIncrementalAggregatorNames(String oftAggregatorName)
+  {
+    return Sets.newHashSet(
+        aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(oftAggregatorName).iterator());
+  }
+
+  /**
+   * The value must be defined in the FIELD_VALUES section.
+   * The additional section doesn't have value type information and can't define values.
+   *
+   * @param valueName
+   * @param valueNames
+   */
+  protected void verifyValueDefined(String valueName, Set<String> valueNames)
+  {
+    if (valueNames == null || !valueNames.contains(valueName)) {
+      throw new IllegalArgumentException("The additional value " + valueName + "Does not have a corresponding value "
+          + valueName + " defined in the " + FIELD_VALUES + " section.");
+    }
+  }
+
+  protected boolean isJsonSimpleString(String string)
+  {
+    return !string.contains("{") && !string.contains("[");
+  }
+
+  protected Object addNonCompositeAggregator(
+      String aggregatorName,
+      Map<String, Set<String>> allValueToAggregator,
+      Map<String, Set<String>> allValueToOTFAggregator,
+      String valueName,
+      Set<String> aggregatorSet,
+      Map<String, Type> aggregatorToType,
+      Type typeT,
+      Set<String> aggregatorOTFSet,
+      boolean checkDuplicate
+  )
+  {
+    if (aggregatorRegistry.isIncrementalAggregator(aggregatorName)) {
+      Set<String> aggregatorNames = allValueToAggregator.get(valueName);
+
+      if (aggregatorNames == null) {
+        aggregatorNames = Sets.newHashSet();
+        allValueToAggregator.put(valueName, aggregatorNames);
+      }
+
+      aggregatorNames.add(aggregatorName);
+
+      if (!aggregatorSet.add(aggregatorName) && checkDuplicate) {
+        throw new IllegalArgumentException("An aggregator " + aggregatorName
+            + " cannot be specified twice for a value");
+      }
+
+      IncrementalAggregator aggregator = aggregatorRegistry.getNameToIncrementalAggregator().get(aggregatorName);
+      aggregatorToType.put(aggregatorName, aggregator.getOutputType(typeT));
+      return aggregator;
+
+    }
+
+    if (aggregatorRegistry.isOTFAggregator(aggregatorName)) {
+      //Check for duplicate on the fly aggregators
+      Set<String> aggregatorNames = allValueToOTFAggregator.get(valueName);
+
+      if (aggregatorNames == null) {
+        aggregatorNames = Sets.newHashSet();
+        allValueToOTFAggregator.put(valueName, aggregatorNames);
+      }
+
+      if (!aggregatorNames.add(aggregatorName) && checkDuplicate) {
+        throw new IllegalArgumentException("An aggregator " + aggregatorName +
+            " cannot be specified twice for a value");
+      }
+
+      aggregatorOTFSet.add(aggregatorName);
+
+      //Add child aggregators
+      aggregatorNames = allValueToAggregator.get(valueName);
+
+      if (aggregatorNames == null) {
+        aggregatorNames = Sets.newHashSet();
+        allValueToAggregator.put(valueName, aggregatorNames);
+      }
+
+      OTFAggregator aggregator = aggregatorRegistry.getNameToOTFAggregators().get(aggregatorName);
+      aggregatorNames.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
+      aggregatorSet.addAll(aggregatorRegistry.getOTFAggregatorToIncrementalAggregators().get(aggregatorName));
+      aggregatorToType.put(aggregatorName, aggregator.getOutputType());
+      LOG.debug("field name {} and adding aggregator names {}:", valueName, aggregatorNames);
+
+      return aggregator;
+    }
+
+    throw new IllegalArgumentException(aggregatorName + " is not a valid non-composit aggregator.");
+  }
+
+  protected CompositeAggregator addCompositeAggregator(
+      String aggregatorType,
+      Map<String, Set<String>> allValueToCompositeAggregator,
+      Set<String> aggregateCompositeSet,
+      String valueName,
+      String embededAggregatorName,
+      Map<String, Object> properties,
+      Map<String, Type> aggregatorToType
+  )
+  {
+    if (!aggregatorRegistry.isTopBottomAggregatorType(aggregatorType)) {
+      throw new IllegalArgumentException(aggregatorType + " is not a valid composite aggregator.");
+    }
+
+    final String aggregatorName = compositeAggregatorFactory.getCompositeAggregatorName(aggregatorType,
+        embededAggregatorName, properties);
+    final CompositeAggregator aggregator = compositeAggregatorFactory.createCompositeAggregator(aggregatorType,
+        embededAggregatorName, properties);
+
+    //Check for duplicate
+    Set<String> aggregatorNames = allValueToCompositeAggregator.get(valueName);
+
+    if (aggregatorNames == null) {
+      aggregatorNames = Sets.newHashSet();
+      allValueToCompositeAggregator.put(valueName, aggregatorNames);
+    }
+
+    if (!aggregatorNames.add(aggregatorName)) {
+      throw new IllegalArgumentException("An aggregator " + aggregatorName +
+          " cannot be specified twice for value '" + valueName + "'");
+    }
+    allValueToCompositeAggregator.put(valueName, aggregatorNames);
+
+    aggregateCompositeSet.add(aggregatorName);
+
+    //we don't know how to handle a generic composite aggregator, handle Top/Bottom aggregator here only
+    //Add aggregator to the repository
+    if (aggregator instanceof AbstractTopBottomAggregator) {
+      aggregatorRegistry.getNameToTopBottomAggregator().put(aggregatorName, (AbstractTopBottomAggregator)aggregator);
+    }
+
+    LOG.debug("field name {} and adding aggregator names {}:", valueName, aggregatorNames);
+
+    if (aggregatorToType != null) {
+      aggregatorToType.put(aggregatorName, aggregator.getOutputType());
+    }
+
+    return aggregator;
+  }
+
+  /**
    * This is a helper method which retrieves the schema tags from the {@link JSONObject} if they are present.
    *
    * @param jo The {@link JSONObject} to retrieve schema tags from.
@@ -1327,9 +1856,14 @@ public class DimensionalConfigurationSchema
     return newCombinations;
   }
 
-  private void buildDimensionsDescriptorIDAggregatorIDMaps()
+  /**
+   * Precondition: all depended aggregators (for example AVG depends on SUM and COUNT, composite aggregators
+   * depend on embedded aggregators)
+   * should already be resolved. This function will not handle these dependencies.
+   */
+  protected void buildDimensionsDescriptorIDAggregatorIDMaps()
   {
-    dimensionsDescriptorIDToAggregatorIDs = Lists.newArrayList();
+    dimensionsDescriptorIDToIncrementalAggregatorIDs = Lists.newArrayList();
     dimensionsDescriptorIDToAggregatorIDToInputAggregatorDescriptor = Lists.newArrayList();
     dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor = Lists.newArrayList();
 
@@ -1340,24 +1874,162 @@ public class DimensionalConfigurationSchema
       Int2ObjectMap<FieldsDescriptor> inputMap = new Int2ObjectOpenHashMap<>();
       Int2ObjectMap<FieldsDescriptor> outputMap = new Int2ObjectOpenHashMap<>();
 
-      dimensionsDescriptorIDToAggregatorIDs.add(aggIDList);
+      dimensionsDescriptorIDToIncrementalAggregatorIDs.add(aggIDList);
       dimensionsDescriptorIDToAggregatorIDToInputAggregatorDescriptor.add(inputMap);
       dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor.add(outputMap);
 
       for (Map.Entry<String, FieldsDescriptor> entry :
           dimensionsDescriptorIDToAggregatorToAggregateDescriptor.get(index).entrySet()) {
+        buildNonCompositeAggregatorIDMap(entry.getKey(), entry.getValue(), aggIDList, inputMap, outputMap);
+      }
+    }
+
+    //get the max aggregator id for generating the composite aggregator id
+    int maxAggregatorID = getLargestNonCompositeAggregatorID();
+
+    //assign aggregatorID to composite aggregators
+    dimensionsDescriptorIDToCompositeAggregatorIDs = Lists.newArrayList();
+    for (int index = 0;
+        index < dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor.size();
+        index++) {
+      IntArrayList aggIDList = new IntArrayList();
+      //NOTE: share the same map with incremental aggregators. As the input FD and output FD will be looked up by
+      // aggregatorID, it should be ok to share the same map.
+      Int2ObjectMap<FieldsDescriptor> inputMap = dimensionsDescriptorIDToAggregatorIDToInputAggregatorDescriptor.get(
+          index);
+      Int2ObjectMap<FieldsDescriptor> outputMap = dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor.get(
+          index);
+
+      dimensionsDescriptorIDToCompositeAggregatorIDs.add(aggIDList);
+
+      for (Map.Entry<String, FieldsDescriptor> entry :
+          dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor.get(index).entrySet()) {
         String aggregatorName = entry.getKey();
         FieldsDescriptor inputDescriptor = entry.getValue();
-        IncrementalAggregator incrementalAggregator = aggregatorRegistry.getNameToIncrementalAggregator().get(
+        AbstractCompositeAggregator compositeAggregator = aggregatorRegistry.getNameToTopBottomAggregator().get(
             aggregatorName);
-        int aggregatorID = aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggregatorName);
+
+        //simply use ++ to assign the aggregator id
+        int aggregatorID;
+        Integer objAggregatorID = aggregatorRegistry.getTopBottomAggregatorNameToID().get(aggregatorName);
+        if (objAggregatorID == null) {
+          aggregatorID = ++maxAggregatorID;
+          aggregatorRegistry.getTopBottomAggregatorNameToID().put(aggregatorName, aggregatorID);
+        } else {
+          aggregatorID = objAggregatorID;
+        }
         aggIDList.add(aggregatorID);
         inputMap.put(aggregatorID, inputDescriptor);
-        outputMap.put(aggregatorID,
-            AggregatorUtils.getOutputFieldsDescriptor(inputDescriptor,
-            incrementalAggregator));
+        //buildNonCompositeAggregatorIDMap(getEmbededAggregatorName(aggregatorName), entry.getValue(), aggIDList,
+        // inputMap, outputMap);
+
+        outputMap.put(aggregatorID, AggregatorUtils.getOutputFieldsDescriptor(inputDescriptor, compositeAggregator));
+      }
+    }
+  }
+
+  protected int getLargestNonCompositeAggregatorID()
+  {
+    int maxAggregatorID = 0;
+    Collection<Integer> aggregatorIDs = aggregatorRegistry.getIncrementalAggregatorNameToID().values();
+    for (int aggregatorID : aggregatorIDs) {
+      if (aggregatorID > maxAggregatorID) {
+        maxAggregatorID = aggregatorID;
       }
     }
+    return maxAggregatorID;
+  }
+
+  protected void buildNonCompositeAggregatorIDMap(String aggregatorName, FieldsDescriptor inputDescriptor,
+      IntArrayList aggIDList, Int2ObjectMap<FieldsDescriptor> inputMap, Int2ObjectMap<FieldsDescriptor> outputMap)
+  {
+    IncrementalAggregator incrementalAggregator = aggregatorRegistry.getNameToIncrementalAggregator().get(
+        aggregatorName);
+    //don't need to build OTF aggregate
+    if (incrementalAggregator == null) {
+      return;
+    }
+    int aggregatorID = aggregatorRegistry.getIncrementalAggregatorNameToID().get(aggregatorName);
+    mergeAggregatorID(aggIDList, aggregatorID);
+    inputMap.put(aggregatorID, inputDescriptor);
+    outputMap.put(aggregatorID,
+        AggregatorUtils.getOutputFieldsDescriptor(inputDescriptor,
+        incrementalAggregator));
+  }
+
+  /**
+   * Fulfill the embedded ddids of composite aggregators:
+   * get the dimensional descriptor of each composite aggregator and generate field keys of the embedded aggregator.
+   */
+  protected void fulfillCompositeAggregatorExtraInfo()
+  {
+    Map<Set<String>, Integer> keysToCombinationId = getKeysToCombinationId();
+    final int timeBucketSize = customTimeBuckets.size();
+
+    for (int index = 0; index < dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor.size(); ++index) {
+      Map<String, FieldsDescriptor> compositeAggregatorNameToDescriptor =
+          dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor.get(index);
+      for (String compositeAggregatorName : compositeAggregatorNameToDescriptor.keySet()) {
+        AbstractTopBottomAggregator compositeAggregator = aggregatorRegistry.getNameToTopBottomAggregator().get(
+            compositeAggregatorName);
+
+        //set DimensionDescriptorID
+        compositeAggregator.setDimensionDescriptorID(index);
+
+        //aggregator id
+        compositeAggregator.setAggregatorID(
+            aggregatorRegistry.getTopBottomAggregatorNameToID().get(compositeAggregatorName));
+
+        //keys for embed aggregator
+        Set<String> keys = Sets.newHashSet();
+        DimensionsDescriptor dd = dimensionsDescriptorIDToDimensionsDescriptor.get(index);
+
+        keys.addAll(dd.getFields().getFieldsList());
+
+        {
+          Set<String> compositeKeys = Sets.newHashSet();
+          compositeKeys.addAll(keys);
+          compositeAggregator.setFields(compositeKeys);
+
+          compositeAggregator.setAggregateDescriptor(compositeAggregatorNameToDescriptor.get(compositeAggregatorName));
+        }
+
+        keys.addAll(compositeAggregator.getSubCombinations());
+
+        Integer combinationId = keysToCombinationId.get(keys);
+        if (combinationId == null) {
+          throw new RuntimeException("Can't find combination id for keys: " + keys);
+        }
+        for (int ddid = combinationId * timeBucketSize; ddid < (combinationId + 1) * timeBucketSize; ++ddid) {
+          compositeAggregator.addEmbedAggregatorDdId(ddid);
+        }
+      }
+    }
+  }
+
+  protected String getEmbededAggregatorName(String compositeAggregatorName)
+  {
+    try {
+      return compositeAggregatorName.split("-")[1];
+    } catch (Exception e) {
+      throw new RuntimeException("Invalid Composite Aggregator Name: " + compositeAggregatorName);
+    }
+  }
+
+  /**
+   * Add the aggregatorID into the list if not already present.
+   *
+   * @param aggIDList
+   * @param aggregatorID
+   */
+  protected void mergeAggregatorID(IntArrayList aggIDList, int aggregatorID)
+  {
+    for (int index = 0; index < aggIDList.size(); ++index) {
+      if (aggIDList.get(index) == aggregatorID) {
+        return;
+      }
+    }
+    aggIDList.add(aggregatorID);
   }
 
   private void mergeMaps(Map<String, Set<String>> destmap, Map<String, Set<String>> srcmap)
@@ -1514,11 +2186,27 @@ public class DimensionalConfigurationSchema
    *
    * @return The dimensionsDescriptorIDToAggregatorIDs map.
    */
+  public List<IntArrayList> getDimensionsDescriptorIDToIncrementalAggregatorIDs()
+  {
+    return dimensionsDescriptorIDToIncrementalAggregatorIDs;
+  }
+
+  /**
+   * This is the old interface; it is kept as-is for backward compatibility.
+   *
+   * @return The dimensionsDescriptorIDToIncrementalAggregatorIDs map.
+   */
   public List<IntArrayList> getDimensionsDescriptorIDToAggregatorIDs()
   {
-    return dimensionsDescriptorIDToAggregatorIDs;
+    return getDimensionsDescriptorIDToIncrementalAggregatorIDs();
+  }
+  
+  public List<IntArrayList> getDimensionsDescriptorIDToCompositeAggregatorIDs()
+  {
+    return dimensionsDescriptorIDToCompositeAggregatorIDs;
   }
 
+
   /**
    * Returns the dimensionsDescriptorIDToKeys map.
    *
@@ -1593,6 +2281,12 @@ public class DimensionalConfigurationSchema
     return dimensionsDescriptorIDToOTFAggregatorToAggregateDescriptor;
   }
 
+  @VisibleForTesting
+  public List<Map<String, FieldsDescriptor>> getDimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor()
+  {
+    return dimensionsDescriptorIDToCompositeAggregatorToAggregateDescriptor;
+  }
+
   @Override
   public int hashCode()
   {
@@ -1619,7 +2313,8 @@ public class DimensionalConfigurationSchema
     hash = 97 * hash + (this.dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor != null ?
         this.dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor.hashCode() : 0);
     hash = 97 * hash +
-        (this.dimensionsDescriptorIDToAggregatorIDs != null ? this.dimensionsDescriptorIDToAggregatorIDs.hashCode() :
+        (this.dimensionsDescriptorIDToIncrementalAggregatorIDs != null ?
+            this.dimensionsDescriptorIDToIncrementalAggregatorIDs.hashCode() :
             0);
     hash = 97 * hash + (this.dimensionsDescriptorIDToFieldToAggregatorAdditionalValues != null ?
         this.dimensionsDescriptorIDToFieldToAggregatorAdditionalValues.hashCode() : 0);
@@ -1656,14 +2351,14 @@ public class DimensionalConfigurationSchema
       return false;
     }
     if (this.dimensionsDescriptorIDToKeyDescriptor != other.dimensionsDescriptorIDToKeyDescriptor &&
-        (this.dimensionsDescriptorIDToKeyDescriptor == null || !this.dimensionsDescriptorIDToKeyDescriptor.equals(
-        other.dimensionsDescriptorIDToKeyDescriptor))) {
+        (this.dimensionsDescriptorIDToKeyDescriptor == null ||
+        !this.dimensionsDescriptorIDToKeyDescriptor.equals(other.dimensionsDescriptorIDToKeyDescriptor))) {
       return false;
     }
     if (this.dimensionsDescriptorIDToDimensionsDescriptor != other.dimensionsDescriptorIDToDimensionsDescriptor &&
         (this.dimensionsDescriptorIDToDimensionsDescriptor == null ||
-        !this.dimensionsDescriptorIDToDimensionsDescriptor.equals(
-        other.dimensionsDescriptorIDToDimensionsDescriptor))) {
+        !this.dimensionsDescriptorIDToDimensionsDescriptor
+        .equals(other.dimensionsDescriptorIDToDimensionsDescriptor))) {
       return false;
     }
     if (this.dimensionsDescriptorIDToValueToAggregator != other.dimensionsDescriptorIDToValueToAggregator &&
@@ -1673,8 +2368,8 @@ public class DimensionalConfigurationSchema
     }
     if (this.dimensionsDescriptorIDToValueToOTFAggregator != other.dimensionsDescriptorIDToValueToOTFAggregator &&
         (this.dimensionsDescriptorIDToValueToOTFAggregator == null ||
-        !this.dimensionsDescriptorIDToValueToOTFAggregator.equals(
-        other.dimensionsDescriptorIDToValueToOTFAggregator))) {
+        !this.dimensionsDescriptorIDToValueToOTFAggregator
+        .equals(other.dimensionsDescriptorIDToValueToOTFAggregator))) {
       return false;
     }
     if (this.dimensionsDescriptorIDToAggregatorToAggregateDescriptor !=
@@ -1710,9 +2405,11 @@ public class DimensionalConfigurationSchema
         other.dimensionsDescriptorIDToAggregatorIDToOutputAggregatorDescriptor))) {
       return false;
     }
-    if (this.dimensionsDescriptorIDToAggregatorIDs != other.dimensionsDescriptorIDToAggregatorIDs &&
-        (this.dimensionsDescriptorIDToAggregatorIDs == null || !this.dimensionsDescriptorIDToAggregatorIDs.equals(
-        other.dimensionsDescriptorIDToAggregatorIDs))) {
+    if (this.dimensionsDescriptorIDToIncrementalAggregatorIDs !=
+        other.dimensionsDescriptorIDToIncrementalAggregatorIDs &&
+        (this.dimensionsDescriptorIDToIncrementalAggregatorIDs == null ||
+        !this.dimensionsDescriptorIDToIncrementalAggregatorIDs.equals(
+        other.dimensionsDescriptorIDToIncrementalAggregatorIDs))) {
       return false;
     }
     if (this.dimensionsDescriptorIDToFieldToAggregatorAdditionalValues !=
@@ -1787,6 +2484,16 @@ public class DimensionalConfigurationSchema
     return tags;
   }
 
+  public Map<String, String> getKeyToExpression()
+  {
+    return keyToExpression;
+  }
+
+  public Map<String, String> getValueToExpression()
+  {
+    return valueToExpression;
+  }
+
   /**
    * This class represents a value in the {@link DimensionalConfigurationSchema}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
index 30f2c1e..9ac76c8 100644
--- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
+++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java
@@ -98,6 +98,8 @@ public class DimensionalSchema implements Schema
   public static final List<Fields> VALID_TIME_KEYS = ImmutableList.of(
       new Fields(Sets.newHashSet(FIELD_TIME_FROM, FIELD_TIME_TO)));
 
+  public static final String FIELD_RESPONSE_DELAY_MILLS = "responseDelayMillis";
+  
   /**
    * The from value for the schema. Null if there is no from value.
    */
@@ -160,6 +162,8 @@ public class DimensionalSchema implements Schema
    */
   private int schemaID = Schema.DEFAULT_SCHEMA_ID;
 
+  protected long responseDelayMillis;
+  
   /**
    * Constructor for serialization
    */
@@ -178,10 +182,11 @@ public class DimensionalSchema implements Schema
    */
   public DimensionalSchema(String schemaStub,
       DimensionalConfigurationSchema configurationSchema,
-      Map<String, String> schemaKeys)
+      Map<String, String> schemaKeys,
+      long responseDelayMillis)
   {
     this(configurationSchema,
-        schemaKeys);
+        schemaKeys, responseDelayMillis);
 
     if (schemaStub != null) {
       predefinedFromTo = true;
@@ -209,7 +214,7 @@ public class DimensionalSchema implements Schema
   {
     this(schemaStub,
         configurationSchema,
-        schemaKeys);
+        schemaKeys, 0);
 
     this.schemaID = schemaID;
   }
@@ -221,9 +226,12 @@ public class DimensionalSchema implements Schema
    * @param configurationSchema The configuration schema to use when creating this {@link DimensionalSchema}.
    */
   public DimensionalSchema(String schemaStub,
-      DimensionalConfigurationSchema configurationSchema)
+      DimensionalConfigurationSchema configurationSchema,
+      long responseDelayMillis)
   {
-    this(schemaStub, configurationSchema, null);
+    this(schemaStub,
+        configurationSchema,
+        null, responseDelayMillis);
   }
 
   /**
@@ -236,11 +244,12 @@ public class DimensionalSchema implements Schema
    */
   public DimensionalSchema(int schemaID,
       String schemaStub,
-      DimensionalConfigurationSchema configurationSchema)
+      DimensionalConfigurationSchema configurationSchema,
+      long responseDelayMillis)
   {
     this(schemaStub,
-        configurationSchema);
-
+        configurationSchema, 
+        responseDelayMillis);
     this.schemaID = schemaID;
   }
 
@@ -251,11 +260,11 @@ public class DimensionalSchema implements Schema
    * @param schemaKeys          The schemaKeys assigned to this schema.
    */
   public DimensionalSchema(DimensionalConfigurationSchema configurationSchema,
-      Map<String, String> schemaKeys)
+      Map<String, String> schemaKeys, long responseDelayMillis)
   {
     setConfigurationSchema(configurationSchema);
     setSchemaKeys(schemaKeys);
-
+    this.responseDelayMillis = responseDelayMillis;
     try {
       initialize();
     } catch (JSONException e) {
@@ -276,7 +285,7 @@ public class DimensionalSchema implements Schema
       Map<String, String> schemaKeys)
   {
     this(configurationSchema,
-        schemaKeys);
+        schemaKeys, 0);
 
     this.schemaID = schemaID;
   }
@@ -290,7 +299,7 @@ public class DimensionalSchema implements Schema
   public DimensionalSchema(DimensionalConfigurationSchema configurationSchema)
   {
     this(configurationSchema,
-        null);
+        null, 0);
   }
 
   /**
@@ -381,6 +390,11 @@ public class DimensionalSchema implements Schema
 
     schema.put(SnapshotSchema.FIELD_SCHEMA_TYPE, DimensionalSchema.SCHEMA_TYPE);
     schema.put(SnapshotSchema.FIELD_SCHEMA_VERSION, DimensionalSchema.SCHEMA_VERSION);
+    
+    //responseDelayMillis
+    if (responseDelayMillis > 0) {
+      schema.put(FIELD_RESPONSE_DELAY_MILLS, responseDelayMillis);
+    }
 
     if (!configurationSchema.getTags().isEmpty()) {
       schema.put(FIELD_TAGS, new JSONArray(configurationSchema.getTags()));
@@ -444,7 +458,10 @@ public class DimensionalSchema implements Schema
     for (int combinationID = 0;
         combinationID < configurationSchema.getDimensionsDescriptorIDToKeys().size();
         combinationID++) {
-
+      
+      //TODO: the combination auto-generated for the computation of the composite aggregator
+      //will be added here; it should be removed.
+      
       Fields fields = configurationSchema.getDimensionsDescriptorIDToKeys().get(combinationID);
       Map<String, Set<String>> fieldToAggregatorAdditionalValues =
           configurationSchema.getDimensionsDescriptorIDToFieldToAggregatorAdditionalValues().get(combinationID);
@@ -497,7 +514,7 @@ public class DimensionalSchema implements Schema
 
         combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_ADDITIONAL_VALUES, additionalValueArray);
       }
-
+      
       dimensions.put(combination);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
new file mode 100644
index 0000000..bcbb223
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsConversionContext;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+
+/**
+ * AbstractCompositeAggregator is the base class for aggregators
+ * that embed another aggregator.
+ *
+ */
+public abstract class AbstractCompositeAggregator implements CompositeAggregator
+{
+  private static final long serialVersionUID = 661710563764433621L;
+
+  protected String embedAggregatorName;
+  protected int dimensionDescriptorID;
+  protected int aggregatorID;
+  protected FieldsDescriptor aggregateDescriptor;
+  //protected int embedAggregatorID;
+  protected Set<Integer> embedAggregatorDdIds = Sets.newHashSet();
+  protected Set<String> fields = Sets.newHashSet();
+  
+  protected DimensionsConversionContext dimensionsConversionContext;
+  
+  public DimensionsConversionContext getDimensionsConversionContext()
+  {
+    return dimensionsConversionContext;
+  }
+
+  public void setDimensionsConversionContext(DimensionsConversionContext dimensionsConversionContext)
+  {
+    this.dimensionsConversionContext = dimensionsConversionContext;
+  }
+
+  public AbstractCompositeAggregator withDimensionsConversionContext(
+      DimensionsConversionContext dimensionsConversionContext)
+  {
+    this.setDimensionsConversionContext(dimensionsConversionContext);
+    return this;
+  }
+  
+  public String getEmbedAggregatorName()
+  {
+    return embedAggregatorName;
+  }
+
+  public void setEmbedAggregatorName(String embedAggregatorName)
+  {
+    this.embedAggregatorName = embedAggregatorName;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((dimensionsConversionContext == null) ? 0 : dimensionsConversionContext.hashCode());
+//    result = prime * result + ((embedAggregator == null) ? 0 : embedAggregator.hashCode());
+    result = prime * result + ((embedAggregatorName == null) ? 0 : embedAggregatorName.hashCode());
+    return result;
+  }
+
+
+  @Override
+  public int getDimensionDescriptorID()
+  {
+    return dimensionDescriptorID;
+  }
+
+  public void setDimensionDescriptorID(int dimensionDescriptorID)
+  {
+    this.dimensionDescriptorID = dimensionDescriptorID;
+  }
+  
+  @Override
+  public int getAggregatorID()
+  {
+    return aggregatorID;
+  }
+
+  public void setAggregatorID(int aggregatorID)
+  {
+    this.aggregatorID = aggregatorID;
+  }
+
+  @Override
+  public FieldsDescriptor getAggregateDescriptor()
+  {
+    return aggregateDescriptor;
+  }
+
+  public void setAggregateDescriptor(FieldsDescriptor aggregateDescriptor)
+  {
+    this.aggregateDescriptor = aggregateDescriptor;
+  }
+  
+  @Override
+  public Set<String> getFields()
+  {
+    return fields;
+  }
+
+  public void setFields(Set<String> fields)
+  {
+    this.fields = fields;
+  }
+
+  @Override
+  public int getSchemaID()
+  {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  //TODO: the ddid should in fact be a set or list; alternatively, return the first ddid and use the time bucket
+  //to derive the other ddids, or consider removing this method from this class and implementing it outside. If the
+  //embedded aggregator is OTF, keep the OTF ddid, since the dependent incremental aggregators share the same ddid.
+  @Override
+  public Set<Integer> getEmbedAggregatorDdIds()
+  {
+    return embedAggregatorDdIds;
+  }
+
+  public void addEmbedAggregatorDdId(int ddid)
+  {
+    embedAggregatorDdIds.add(ddid);
+  }
+
+  public void addEmbedAggregatorDdIds(Set<Integer> ddids)
+  {
+    embedAggregatorDdIds.addAll(ddids);
+  }
+  
+  /**
+   * TODO: verify whether a meta-data descriptor is needed here; currently returns null.
+   */
+  @Override
+  public FieldsDescriptor getMetaDataDescriptor()
+  {
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
new file mode 100644
index 0000000..9a71e30
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Map;
+
+public abstract class AbstractCompositeAggregatorFactory implements CompositeAggregatorFactory
+{
+  protected static final String NAME_TEMPLATE = "%s-%s-%s";
+  protected static final String PROPERTY_SEPERATOR = "_";
+  protected static final String PROPERTY_VALUE_SEPERATOR = "|";
+  
+  @Override
+  public String getCompositeAggregatorName(String aggregatorType, String embededAggregatorName,
+      Map<String, Object> properties)
+  {
+    return String.format(NAME_TEMPLATE, aggregatorType, embededAggregatorName, getNamePartialForProperties(properties));
+  }
+
+  protected String getNamePartialForProperties(Map<String, Object> properties)
+  {
+    if (properties.size() == 1) {
+      return properties.values().iterator().next().toString();
+    }
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, Object> entry : properties.entrySet()) {
+      sb.append(entry.getKey()).append(PROPERTY_VALUE_SEPERATOR).append(entry.getValue()).append(PROPERTY_SEPERATOR);
+    }
+    //delete the last one (PROPERTY_SEPERATOR)
+    return sb.deleteCharAt(sb.length() - 1).toString();
+  }
+}
+


Mime
View raw message