apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From timothyfar...@apache.org
Subject [1/3] incubator-apex-malhar git commit: APEXMALHAR-2055 #resolve #comment Move dimensions related feature to Malhar
Date Fri, 15 Apr 2016 01:53:18 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master f1d70673d -> 8b2e6a297


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
new file mode 100644
index 0000000..ed3c577
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.appdata.gpo.GPOMutable;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+
+public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggregator
+{
+  public static final String PROP_COUNT = "count";
+  protected int count;
+  protected SortedSet<String> subCombinations = Sets.newTreeSet();
+  
+  public AbstractTopBottomAggregator withEmbedAggregatorName(String embedAggregatorName)
+  {
+    this.setEmbedAggregatorName(embedAggregatorName);
+    return this;
+  }
+
+  public AbstractTopBottomAggregator withSubCombinations(String[] subCombinations)
+  {
+    this.setSubCombinations(subCombinations);
+    return this;
+  }
+  
+  public AbstractTopBottomAggregator withCount(int count)
+  {
+    this.setCount(count);
+    return this;
+  }
+
+  public int getCount()
+  {
+    return count;
+  }
+
+  public void setCount(int count)
+  {
+    this.count = count;
+  }
+  
+  public void setSubCombinations(Set<String> subCombinations)
+  {
+    this.subCombinations.clear();
+    this.subCombinations.addAll(subCombinations);
+  }
+
+  public void setSubCombinations(String[] subCombinations)
+  {
+    setSubCombinations(Sets.newHashSet(subCombinations));
+  }
+
+  public Set<String> getSubCombinations()
+  {
+    return subCombinations;
+  }
+
+  /**
+   * TOP/BOTTOM return a list of value
+   */
+  public Type getOutputType()
+  {
+    return Type.OBJECT;
+  }
+  
+  @Override
+  public int hashCode()
+  {
+    return (embedAggregatorName.hashCode() * 31 + count) * 31 + subCombinations.hashCode();
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    
+    AbstractTopBottomAggregator other = (AbstractTopBottomAggregator)obj;
+    if (embedAggregatorName != other.embedAggregatorName
+        && (embedAggregatorName == null || !embedAggregatorName.equals(other.embedAggregatorName)))
{
+      return false;
+    }
+    if (count != other.count) {
+      return false;
+    }
+    if (subCombinations != other.subCombinations
+        && (subCombinations == null || !subCombinations.equals(other.subCombinations)))
{
+      return false;
+    }
+
+    return true;
+  }
+  
+  
+  /**
+   * The result keep a list of object for each aggregate value
+   * The value of resultAggregate should keep a list of inputEventKey(the value can be get
from cache or load) or a map
+   * from inputEventKey to the value instead of just a list of aggregate value. As the value
could be changed in
+   * current window, and this change should be applied.
+   *
+   * precondition: resultAggregate.eventKey matches with inputSubEventKeys
+   * notes: this algorithm only support TOP for positive values and BOTTOM for negative values
+   */
+  @Override
+  public void aggregate(Aggregate resultAggregate, Set<EventKey> inputSubEventKeys,
+      Map<EventKey, Aggregate> inputAggregatesRepo)
+  {
+    //there are problem for composite's value field descriptor, just ignore now.
+    GPOMutable resultGpo = resultAggregate.getAggregates();
+    final List<String> compositeFieldList = resultAggregate.getEventKey().getKey().getFieldDescriptor().getFieldList();
+    
+    //Map<EventKey, Aggregate> existedSubEventKeyToAggregate = Maps.newHashMap();
+    for (String valueField : resultGpo.getFieldDescriptor().getFieldList()) {
+      //the resultGpo keep a list of sub aggregates
+      updateAggregate(resultAggregate, valueField, inputSubEventKeys, inputAggregatesRepo);
+
+      //compare the existed sub aggregates with the new input aggregates to update the list
+      for (EventKey eventKey : inputSubEventKeys) {
+        aggregate(compositeFieldList, resultGpo, eventKey, inputAggregatesRepo.get(eventKey).getAggregates());
+      }
+    }
+
+  }
+
+  protected transient List<String> tmpStoreFieldList = Lists.newArrayList();
+  protected static final String KEY_VALUE_SEPERATOR = "-";
+
+  /**
+   * get store map key from the eventKey
+   * 
+   * @param eventKey
+   * @return
+   */
+  protected String getStoreMapKey(EventKey subEventKey, List<String> compositeEventFieldList)
+  {
+    tmpStoreFieldList.clear();
+    tmpStoreFieldList.addAll(subEventKey.getKey().getFieldDescriptor().getFieldList());
+    tmpStoreFieldList.removeAll(compositeEventFieldList);
+    Collections.sort(tmpStoreFieldList);
+    StringBuilder key = new StringBuilder();
+    for (String field: tmpStoreFieldList) {
+      key.append(subEventKey.getKey().getField(field)).append(KEY_VALUE_SEPERATOR);
+    }
+    key.deleteCharAt(key.length() - 1);
+    
+    return key.toString();
+  }
+  
+
+  /**
+   * update existed sub aggregate. 
+   * The sub aggregates which kept in composite aggregate as candidate could be changed.
synchronize the value with
+   * input aggregates.
+   * 
+   * @param resultAggregate
+   * @param valueField
+   * @param inputSubEventKeys
+   * @param inputAggregatesRepo
+   */
+  @SuppressWarnings("unchecked")
+  protected void updateAggregate(Aggregate resultAggregate, String valueField,
+      Set<EventKey> inputSubEventKeys, Map<EventKey, Aggregate> inputAggregatesRepo)
+  {
+    Map<String, Object> resultAggregateFieldToValue =
+        (Map<String, Object>)resultAggregate.getAggregates().getFieldObject(valueField);
+    if (resultAggregateFieldToValue == null) {
+      return;
+    }
+
+    for (EventKey inputSubEventKey : inputSubEventKeys) {
+      Aggregate inputSubAggregate = inputAggregatesRepo.get(inputSubEventKey);
+      String mapKey = getStoreMapKey(inputSubAggregate.getEventKey(),
+          resultAggregate.getEventKey().getKey().getFieldDescriptor().getFieldList());
+      //Aggregate existedAggregate = existedSubEventKeyToAggregate.get(inputSubEventKey);
+      if (resultAggregateFieldToValue.get(mapKey) != null) {
+        resultAggregateFieldToValue.put(mapKey, inputSubAggregate.getAggregates().getField(valueField));
+      }
+    }
+  }
+  
+  /**
+   * need a map of value field from the inputGpo to resultGpo, use the index of Fields as
the index
+   * @param resultGpo
+   * @param inputGpo
+   */
+  @SuppressWarnings("unchecked")
+  protected void aggregate(final List<String> compositeFieldList, GPOMutable resultGpo,
+      EventKey subEventKey, GPOMutable inputGpo)
+  {
+    //the field and type should get from resultGpo instead of inputGpo as inputGpo maybe
shared by other value fields
+    List<String> aggregateFields = resultGpo.getFieldDescriptor().getFieldList();
+    Map<String, Type> fieldToType = resultGpo.getFieldDescriptor().getFieldToType();
+    for (String aggregateField : aggregateFields) {
+      Map<String, Object> fieldValue = (Map<String, Object>)resultGpo.getFieldObject(aggregateField);
+      if (fieldValue == null) {
+        fieldValue = createAggregateValueForField(aggregateField, fieldToType.get(aggregateField));
+        resultGpo.setFieldObject(aggregateField, fieldValue);
+      }
+      aggregate(compositeFieldList, fieldValue, subEventKey, inputGpo.getField(aggregateField),
+          fieldToType.get(aggregateField));
+    }
+  }
+  
+  /**
+   * seperate it in case sub class override it.
+   * @param fieldName
+   * @param fieldElementType
+   * @return
+   */
+  protected Map<String, Object> createAggregateValueForField(String fieldName, Type
fieldElementType)
+  {
+    return Maps.newHashMap();
+  }
+  
+  /**
+   * compare the result(resultMap) with input(inputFieldName, inputFieldValue)
+   * @param resultMap
+   * @param inputFieldValue
+   * @param type
+   */
+  protected void aggregate(final List<String> compositeFieldList, Map<String, Object>
resultMap,
+      EventKey subEventKey, Object inputFieldValue, Type type)
+  {
+    if (resultMap.size() < count) {
+      resultMap.put(getStoreMapKey(subEventKey, compositeFieldList), inputFieldValue);
+      return;
+    }
+    for (String key : resultMap.keySet()) {
+      Object resultValue = resultMap.get(key);
+      if (shouldReplaceResultElement(resultValue, inputFieldValue, type)) {
+        resultMap.put(key, inputFieldValue);
+        break;
+      }
+    }
+
+  }
+  
+  /**
+   * shoud the result element replaced by input element.
+   * the inputElement and resultElement should be same type
+   * @param resultElement
+   * @param inputElement
+   * @param type
+   * @return
+   */
+  protected boolean shouldReplaceResultElement(Object resultElement, Object inputElement,
Type type)
+  {
+    if (inputElement == null) {
+      return false;
+    }
+
+    if (resultElement == null) {
+      return true;
+    }
+
+    if (resultElement instanceof Comparable) {
+      @SuppressWarnings("unchecked")
+      int compareResult = ((Comparable<Object>)resultElement).compareTo(inputElement);
+      return shouldReplaceResultElement(compareResult);
+    }
+    
+    //handle other cases
+    throw new RuntimeException("Should NOT come here.");
+    
+  }
+  
+  protected abstract boolean shouldReplaceResultElement(int resultCompareToInput);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorBottom.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorBottom.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorBottom.java
new file mode 100644
index 0000000..955a466
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorBottom.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+/**
+ * BOTTOM composite aggregator: it prefers the smaller values of the embedded aggregator.
+ */
+public class AggregatorBottom extends AbstractTopBottomAggregator
+{
+  /**
+   * A kept element is replaced by the input when it is greater than the input, so the smaller values are kept.
+   * (Returning {@code resultCompareToInput < 0} would keep the larger values, i.e. behave like TOP.)
+   */
+  @Override
+  protected boolean shouldReplaceResultElement(int resultCompareToInput)
+  {
+    return resultCompareToInput > 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
index fd9fc56..6482c3b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java
@@ -21,6 +21,7 @@ package org.apache.apex.malhar.lib.dimensions.aggregator;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,6 +76,20 @@ public class AggregatorRegistry implements Serializable
       AggregatorIncrementalType.NAME_TO_ORDINAL);
 
   /**
+   * Creates a new AggregatorRegistry with the default aggregators, instead of sharing the same instance, in case
+   * one application has multiple schemas. The returned registry has already been set up.
+   * @return the newly created AggregatorRegistry instance
+   */
+  public static final AggregatorRegistry newDefaultAggregatorRegistry()
+  {
+    //every call builds its own registry from the default aggregator definitions and sets it up before returning it
+    AggregatorRegistry registry = new AggregatorRegistry(DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR,
+        DEFAULT_NAME_TO_OTF_AGGREGATOR, AggregatorIncrementalType.NAME_TO_ORDINAL);
+    registry.setup();
+    return registry;
+  }
+
+  /**
    * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup
before or not.
    */
   private transient boolean setup = false;
@@ -93,6 +108,9 @@ public class AggregatorRegistry implements Serializable
    * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}.
    */
   private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
+  
+  /**
+   * Map from a TOP/BOTTOM aggregator ID to that aggregator. It is transient and is (re)built from
+   * {@code topBottomAggregatorNameToID} and {@code nameToTopBottomAggregator} by
+   * {@link #buildTopBottomAggregatorIDToAggregator()}.
+   */
+  protected transient Map<Integer, AbstractTopBottomAggregator> topBottomAggregatorIDToAggregator;
+  
   /**
    * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link
IncrementalAggregator}.
    */
@@ -101,10 +119,21 @@ public class AggregatorRegistry implements Serializable
    * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
    */
   private Map<String, OTFAggregator> nameToOTFAggregator;
+  
+  /**
+   * The map from the name of a TOP/BOTTOM aggregator to that aggregator.
+   */
+  private Map<String, AbstractTopBottomAggregator> nameToTopBottomAggregator = Maps.newHashMap();
+  
   /**
    * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
    */
   private Map<String, Integer> incrementalAggregatorNameToID;
+  
+  /**
+   * The map from the name of a TOP/BOTTOM aggregator to its ID; the transient ID-to-aggregator map is built from it
+   * by {@link #buildTopBottomAggregatorIDToAggregator()}.
+   */
+  protected Map<String, Integer> topBottomAggregatorNameToID = Maps.newHashMap();
+  
+  /**
+   * NOTE(review): this mutable static set is shared by all registries and is not read or written anywhere in the
+   * code shown here - confirm whether it is needed.
+   */
+  protected static Set<String> topBottomAggregatorNames;
+
 
   /**
    * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}
@@ -240,6 +269,16 @@ public class AggregatorRegistry implements Serializable
       Preconditions.checkNotNull(entry.getKey());
       Preconditions.checkNotNull(entry.getValue());
     }
+    
+    for (Map.Entry<String, Integer> entry : topBottomAggregatorNameToID.entrySet())
{
+      Preconditions.checkNotNull(entry.getKey());
+      Preconditions.checkNotNull(entry.getValue());
+    }
+    
+    for (Map.Entry<String, AbstractTopBottomAggregator> entry : nameToTopBottomAggregator.entrySet())
{
+      Preconditions.checkNotNull(entry.getKey());
+      Preconditions.checkNotNull(entry.getValue());
+    }
   }
 
   /**
@@ -287,6 +326,18 @@ public class AggregatorRegistry implements Serializable
     }
   }
 
+  /**
+   * Rebuilds the transient ID-to-aggregator lookup of the TOP/BOTTOM aggregators by resolving every entry of
+   * {@code topBottomAggregatorNameToID} against {@code nameToTopBottomAggregator}.
+   */
+  public void buildTopBottomAggregatorIDToAggregator()
+  {
+    topBottomAggregatorIDToAggregator = Maps.newHashMap();
+    for (Map.Entry<String, Integer> nameAndID : topBottomAggregatorNameToID.entrySet()) {
+      String name = nameAndID.getKey();
+      int id = nameAndID.getValue();
+      AbstractTopBottomAggregator aggregator = nameToTopBottomAggregator.get(name);
+      topBottomAggregatorIDToAggregator.put(id, aggregator);
+    }
+  }
+  
   /**
    * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s
name
    * to an {@link IncrementalAggregator}.
@@ -320,10 +371,17 @@ public class AggregatorRegistry implements Serializable
    */
   public boolean isAggregator(String aggregatorName)
   {
-    return classToIncrementalAggregatorName.values().contains(aggregatorName) ||
-        nameToOTFAggregator.containsKey(aggregatorName);
+    if ( classToIncrementalAggregatorName.values().contains(aggregatorName) ||
+        nameToOTFAggregator.containsKey(aggregatorName)) {
+      return true;
+    }
+    
+    //the composite probably send whole aggregator name
+    String aggregatorType = aggregatorName.split("-")[0];
+    return (AggregatorTopBottomType.valueOf(aggregatorType) != null);
   }
 
+
   /**
    * Checks if the given aggregator name is the name of an {@link IncrementalAggregator}
registered
    * to this registry.
@@ -337,6 +395,16 @@ public class AggregatorRegistry implements Serializable
     return classToIncrementalAggregatorName.values().contains(aggregatorName);
   }
 
+  /**
+   * Checks whether the given name is registered in this registry as an {@link OTFAggregator}.
+   */
+  public boolean isOTFAggregator(String aggregatorName)
+  {
+    return nameToOTFAggregator.keySet().contains(aggregatorName);
+  }
+  
+  /**
+   * Checks whether the given string is the name of an {@link AggregatorTopBottomType} constant.
+   * Unknown names and {@code null} yield {@code false}.
+   */
+  public boolean isTopBottomAggregatorType(String aggregatorType)
+  {
+    //Enum.valueOf() throws IllegalArgumentException for unknown names (NullPointerException for null) and never
+    //returns null, so compare the constant names explicitly
+    for (AggregatorTopBottomType topBottomType : AggregatorTopBottomType.values()) {
+      if (topBottomType.name().equals(aggregatorType)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
   /**
    * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
    *
@@ -357,6 +425,11 @@ public class AggregatorRegistry implements Serializable
     return incrementalAggregatorIDToAggregator;
   }
 
+  /**
+   * @return the map from TOP/BOTTOM aggregator ID to aggregator, filled by
+   *     {@link #buildTopBottomAggregatorIDToAggregator()}
+   */
+  public Map<Integer, AbstractTopBottomAggregator> getTopBottomAggregatorIDToAggregator()
+  {
+    return this.topBottomAggregatorIDToAggregator;
+  }
+
   /**
    * This a helper method which sets and validates the mapping from {@link IncrementalAggregator}
name to
    * {@link IncrementalAggregator} ID.
@@ -388,6 +461,11 @@ public class AggregatorRegistry implements Serializable
     return incrementalAggregatorNameToID;
   }
 
+  /**
+   * @return the (mutable) map from TOP/BOTTOM aggregator name to aggregator ID
+   */
+  public Map<String, Integer> getTopBottomAggregatorNameToID()
+  {
+    return this.topBottomAggregatorNameToID;
+  }
+
   /**
    * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the
{@link OTFAggregator}.
    *
@@ -398,6 +476,11 @@ public class AggregatorRegistry implements Serializable
     return nameToOTFAggregator;
   }
 
+  /**
+   * @return the (mutable) map from TOP/BOTTOM aggregator name to aggregator
+   */
+  public Map<String, AbstractTopBottomAggregator> getNameToTopBottomAggregator()
+  {
+    return this.nameToTopBottomAggregator;
+  }
+
   /**
    * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child
aggregators of
    * that {@link OTFAggregator}.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTop.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTop.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTop.java
new file mode 100644
index 0000000..a47050b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTop.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+/**
+ * TOP composite aggregator: it prefers the larger values of the embedded aggregator.
+ */
+public class AggregatorTop extends AbstractTopBottomAggregator
+{
+  /**
+   * A kept element is replaced by the input when it is smaller than the input, so the larger values are kept.
+   * (Returning {@code resultCompareToInput > 0} would keep the smaller values, i.e. behave like BOTTOM.)
+   */
+  @Override
+  protected boolean shouldReplaceResultElement(int resultCompareToInput)
+  {
+    return resultCompareToInput < 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTopBottomType.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTopBottomType.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTopBottomType.java
new file mode 100644
index 0000000..bca2aac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorTopBottomType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+/**
+ * The types of TOP/BOTTOM composite aggregators. The constant names are used as the aggregator type names.
+ */
+public enum AggregatorTopBottomType
+{
+  /** Keeps the top N values. */
+  TOPN,
+  /** Keeps the bottom N values. */
+  BOTTOMN
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
index 6085254..d9ad83d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java
@@ -23,6 +23,8 @@ import java.util.Map;
 
 import com.google.common.collect.Maps;
 
+import com.datatorrent.lib.appdata.gpo.Serde;
+import com.datatorrent.lib.appdata.gpo.SerdeMapPrimitive;
 import com.datatorrent.lib.appdata.schemas.Fields;
 import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
 import com.datatorrent.lib.appdata.schemas.Type;
@@ -145,4 +147,22 @@ public final class AggregatorUtils
 
     return new FieldsDescriptor(fieldToType);
   }
+
+  /**
+   * Creates the output {@link FieldsDescriptor} of a composite aggregator: every input field keeps its name, gets
+   * the output type of the composite aggregator and is serialized with {@link SerdeMapPrimitive}.
+   *
+   * @param inputFieldsDescriptor the descriptor of the input value fields
+   * @param compositeAggregator the composite aggregator which produces the output
+   * @return the output fields descriptor
+   */
+  public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor,
+      CompositeAggregator compositeAggregator)
+  {
+    Map<String, Type> fieldToType = Maps.newHashMap();
+    Map<String, Serde> fieldToSerde = Maps.newHashMap();
+    //getOutputType() takes no field argument, so it is looked up once instead of once per field
+    Type outputType = compositeAggregator.getOutputType();
+
+    for (String fieldName : inputFieldsDescriptor.getFieldToType().keySet()) {
+      fieldToType.put(fieldName, outputType);
+      fieldToSerde.put(fieldName, SerdeMapPrimitive.INSTANCE);
+    }
+
+    return new FieldsDescriptor(fieldToType, fieldToSerde);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
new file mode 100644
index 0000000..64ce2bd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate;
+import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey;
+
+import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
+import com.datatorrent.lib.appdata.schemas.Type;
+
+public interface CompositeAggregator
+{
+  public int getSchemaID();
+
+  public int getDimensionDescriptorID();
+
+  public int getAggregatorID();
+  
+  public Set<Integer> getEmbedAggregatorDdIds();
+  
+  public Set<String> getFields();
+
+  public FieldsDescriptor getAggregateDescriptor();
+  
+  public FieldsDescriptor getMetaDataDescriptor();
+
+  /**
+   * Returns the output type of the {@link CompositeAggregator}. <b>Note<b> that
any combination of input types
+   * will produce the same output type for {@link CompositeAggregator}s.
+   * @return The output type of the {@link CompositeAggregator}.
+   */
+  public Type getOutputType();
+  
+  /**
+   * 
+   * @param resultAggregate the aggregate to put the result
+   * @param inputEventKeys The input(incremental) event keys, used to locate the input aggregates
+   * @param inputAggregatesRepo: the map of the EventKey to Aggregate keep the super set
of aggregate required
+   */
+  public void aggregate(Aggregate resultAggregate, Set<EventKey> inputEventKeys, Map<EventKey,
+      Aggregate> inputAggregatesRepo);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
new file mode 100644
index 0000000..9fb9b74
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Map;
+
+/**
+ * Factory of {@link CompositeAggregator}s. It is currently implemented to support TOPN and BOTTOMN; as it is not
+ * yet clear which other composite aggregators will be needed, only the interface is defined here.
+ * It assumes that a composite aggregator embeds exactly one aggregator and is configured by some properties.
+ */
+public interface CompositeAggregatorFactory
+{
+  /**
+   * Gets the composite aggregator name based on the composite aggregator information.
+   *
+   * @param aggregatorType the composite aggregator type, e.g. TOPN
+   * @param embedAggregatorName the name of the embedded aggregator
+   * @param properties the properties of the composite aggregator
+   * @return the composite aggregator name
+   */
+  public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName,
+      Map<String, Object> properties);
+
+  /**
+   * Creates a composite aggregator based on the composite aggregator information.
+   * NOTE(review): the type parameter {@code T} is unused; it is kept so existing implementations still override
+   * this method.
+   *
+   * @param aggregatorType the composite aggregator type, e.g. TOPN
+   * @param embedAggregatorName the name of the embedded aggregator
+   * @param properties the properties of the composite aggregator
+   * @return the created composite aggregator
+   */
+  public <T> CompositeAggregator createCompositeAggregator(String aggregatorType, String embedAggregatorName,
+      Map<String, Object> properties);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
new file mode 100644
index 0000000..ccfd6cd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * The DefaultCompositeAggregatorFactory find the specific factory according to the aggregator
type
+ * and delegate to the specific factory.
+ * 
+ */
+public class DefaultCompositeAggregatorFactory implements CompositeAggregatorFactory
+{
+  public static final DefaultCompositeAggregatorFactory defaultInst = new DefaultCompositeAggregatorFactory()
+      .addFactory(AggregatorTopBottomType.TOPN.name(), TopBottomAggregatorFactory.defaultInstance)
+      .addFactory(AggregatorTopBottomType.BOTTOMN.name(), TopBottomAggregatorFactory.defaultInstance);
+  
+  protected Map<String, CompositeAggregatorFactory> factoryRepository = Maps.newHashMap();
+  
+  @Override
+  public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName,
+      Map<String, Object> properties)
+  {
+    return findSpecificFactory(aggregatorType).getCompositeAggregatorName(aggregatorType,
+        embedAggregatorName, properties);
+  }
+
+  @Override
+  public <T> CompositeAggregator createCompositeAggregator(String aggregatorType, String
embedAggregatorName,
+      Map<String, Object> properties)
+  {
+    return findSpecificFactory(aggregatorType).createCompositeAggregator(aggregatorType,
+        embedAggregatorName, properties);
+  }
+
+  protected CompositeAggregatorFactory findSpecificFactory(String aggregatorType)
+  {
+    return factoryRepository.get(aggregatorType);
+  }
+  
+  public DefaultCompositeAggregatorFactory addFactory(String aggregatorType, CompositeAggregatorFactory
factory)
+  {
+    factoryRepository.put(aggregatorType, factory);
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
new file mode 100644
index 0000000..461faff
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dimensions.aggregator;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFactory
+{
+  public static final String PROPERTY_NAME_EMBEDED_AGGREGATOR = "embededAggregator";
+  public static final String PROPERTY_NAME_COUNT = "count";
+  public static final String PROPERTY_NAME_SUB_COMBINATIONS = "subCombinations";
+
+  public static final TopBottomAggregatorFactory defaultInstance = new TopBottomAggregatorFactory();
+  
+  @Override
+  public <T> AbstractTopBottomAggregator createCompositeAggregator(String aggregatorType,
String embedAggregatorName,
+      Map<String, Object> properties)
+  {
+    return createTopBottomAggregator(aggregatorType, embedAggregatorName, getCount(properties),
+        getSubCombinations(properties));
+  }
+  
+  public <T> AbstractTopBottomAggregator createTopBottomAggregator(String aggregatorType,
String embedAggregatorName,
+      int count, String[] subCombinations)
+  {
+    AbstractTopBottomAggregator aggregator = null;
+    if (AggregatorTopBottomType.TOPN == AggregatorTopBottomType.valueOf(aggregatorType))
{
+      aggregator = new AggregatorTop();
+    }
+    if (AggregatorTopBottomType.BOTTOMN == AggregatorTopBottomType.valueOf(aggregatorType))
{
+      aggregator = new AggregatorBottom();
+    }
+    if (aggregator == null) {
+      throw new IllegalArgumentException("Invalid composite type: " + aggregatorType);
+    }
+    aggregator.setEmbedAggregatorName(embedAggregatorName);
+    aggregator.setCount(count);
+    aggregator.setSubCombinations(subCombinations);
+    
+    return aggregator;
+  }
+
+  protected int getCount(Map<String, Object> properties)
+  {
+    return Integer.valueOf((String)properties.get(PROPERTY_NAME_COUNT));
+  }
+  
+  protected String[] getSubCombinations(Map<String, Object> properties)
+  {
+    return (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS);
+  }
+  
+  /**
+   * The properties of TOP or BOTTOM are count and subCombinations.
+   * count only have one value and subCombinations is a set of string, we can order combinations
to simplify the name
+   */
+  @Override
+  protected String getNamePartialForProperties(Map<String, Object> properties)
+  {
+    StringBuilder sb = new StringBuilder();
+    String count = (String)properties.get(PROPERTY_NAME_COUNT);
+    sb.append(count).append(PROPERTY_SEPERATOR);
+    
+    String[] subCombinations =  (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS);
+    Set<String> sortedSubCombinations = Sets.newTreeSet();
+    for (String subCombination : subCombinations) {
+      sortedSubCombinations.add(subCombination);
+    }
+    
+    for (String subCombination : sortedSubCombinations) {
+      sb.append(subCombination).append(PROPERTY_SEPERATOR);
+    }
+
+    //delete the last one (PROPERTY_SEPERATOR)
+    return sb.deleteCharAt(sb.length() - 1).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index ac8c7e2..859cd75 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -97,7 +97,7 @@ public class FileSplitterBaseTest
     /**
      * Removes the folder {@code target/<test class name>} once the test finishes.
      * This uses the quiet variant, which delegates to {@code FileUtils.deleteQuietly}. Presumably the
      * intent is that a failed cleanup no longer fails the test.
      */
     @Override
     protected void finished(Description description)
     {
-      TestUtils.deleteTargetTestClassFolder(description);
+      TestUtils.deleteTargetTestClassFolderQuietly(description);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5bd1a99e/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 37aa7e7..9af15b4 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -63,6 +63,11 @@ public class TestUtils
     }
   }
 
+  public static void deleteTargetTestClassFolderQuietly(Description description)
+  {
+    FileUtils.deleteQuietly(new File("target/" + description.getClassName()));
+  }
+  
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
   {



Mime
View raw message