apex-commits mailing list archives

From hsy...@apache.org
Subject [1/3] apex-malhar git commit: APEXMALHAR-2169 Removed the stuff related to dynamic partition based on load from AbstractKafkaInputOperator.
Date Tue, 23 Aug 2016 17:20:30 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 255bc11c5 -> ef42c52a1


APEXMALHAR-2169 Removed the stuff related to dynamic partition based on load from AbstractKafkaInputOperator.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e4285484
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e4285484
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e4285484

Branch: refs/heads/master
Commit: e4285484f98714436ed29210032c17ffd5a7d639
Parents: 1700725
Author: chaitanya <chaithu@apache.org>
Authored: Sat Aug 20 18:10:02 2016 +0530
Committer: chaitanya <chaithu@apache.org>
Committed: Sat Aug 20 18:44:40 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 180 +------------------
 1 file changed, 5 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
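
For context: the ONE_TO_MANY code removed by this commit bin-packed Kafka partitions onto operator partitions with a first-fit decreasing heuristic, driven by each partition's 1-minute moving-average message and byte rates against the msgRateUpperBound / byteRateUpperBound hard limits. Below is a minimal, self-contained sketch of that heuristic, assuming hypothetical PartitionLoad and Slot types in place of Malhar's KafkaPartition and PartitionInfo; it illustrates the idea only and is not the removed implementation.

----------------------------------------------------------------------
// Hedged sketch (not the Malhar code): first-fit decreasing bin packing of
// Kafka partitions onto operator slots under msgs/s and bytes/s caps.
// PartitionLoad, Slot, and assign() are hypothetical illustration names.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FirstFitDecreasingSketch
{
  // Hypothetical stand-in for a Kafka partition's 1-minute moving-average load.
  static class PartitionLoad
  {
    final int partitionId;
    final long msgsPerSec;
    final long bytesPerSec;

    PartitionLoad(int partitionId, long msgsPerSec, long bytesPerSec)
    {
      this.partitionId = partitionId;
      this.msgsPerSec = msgsPerSec;
      this.bytesPerSec = bytesPerSec;
    }
  }

  // Hypothetical stand-in for one operator partition and its remaining capacity.
  static class Slot
  {
    final List<Integer> partitionIds = new ArrayList<>();
    long msgRateLeft;
    long byteRateLeft;

    Slot(long msgRateLeft, long byteRateLeft)
    {
      this.msgRateLeft = msgRateLeft;
      this.byteRateLeft = byteRateLeft;
    }
  }

  static List<Slot> assign(List<PartitionLoad> loads, long msgCap, long byteCap)
  {
    // (Decreasing) Sort by message rate, heaviest first, overflow-safe.
    loads.sort(Comparator.comparingLong((PartitionLoad p) -> p.msgsPerSec).reversed());

    List<Slot> slots = new ArrayList<>();
    for (PartitionLoad load : loads) {
      Slot fit = null;
      // (First fit) The first existing slot with enough headroom wins.
      for (Slot s : slots) {
        if (s.msgRateLeft > load.msgsPerSec && s.byteRateLeft > load.bytesPerSec) {
          fit = s;
          break;
        }
      }
      if (fit == null) {
        // No slot fits; open a new operator partition at full capacity.
        fit = new Slot(msgCap, byteCap);
        slots.add(fit);
      }
      fit.partitionIds.add(load.partitionId);
      fit.msgRateLeft -= load.msgsPerSec;
      fit.byteRateLeft -= load.bytesPerSec;
    }
    return slots;
  }

  public static void main(String[] args)
  {
    List<PartitionLoad> loads = new ArrayList<>();
    loads.add(new PartitionLoad(0, 800, 80_000));
    loads.add(new PartitionLoad(1, 300, 30_000));
    loads.add(new PartitionLoad(2, 600, 60_000));
    for (Slot s : assign(loads, 1000, 100_000)) {
      System.out.println("operator gets partitions " + s.partitionIds);
    }
  }
}
----------------------------------------------------------------------

Run with a cap of 1000 msgs/s and 100,000 bytes/s, the sketch packs the three sample partitions into two operator slots: the heaviest partition claims its own slot and the remaining two share the second.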


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e4285484/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index d4945ec..9d2e664 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -155,10 +155,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   // By default the partition policy is 1:1
   public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
 
-  // default resource is unlimited in terms of msgs per second
+  // Deprecated: Please don't use this property.
+  @Deprecated
   private long msgRateUpperBound = Long.MAX_VALUE;
 
-  // default resource is unlimited in terms of bytes per second
+  // Deprecated: Please don't use this property.
+  @Deprecated
   private long byteRateUpperBound = Long.MAX_VALUE;
 
   // Store the current operator partition topology
@@ -601,46 +603,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         windowDataManager.partitioned(newManagers, deletedOperators);
         return partitions;
       }
-      else {
-
-        logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
-        // size of the list depends on the load and capacity of each operator
-        newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
-
-        // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
-        // try to balance the load and minimize the number of containers with each container's load under the threshold
-        // the partition based on the latest 1 minute moving average
-        Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
-        // get the offset for all partitions of each consumer
-        Map<KafkaPartition, Long> offsetTrack = new HashMap<KafkaPartition, Long>();
-        for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
-          List<Stats.OperatorStats> opss = partition.getStats().getLastWindowedStats();
-          if (opss == null || opss.size() == 0) {
-            continue;
-          }
-          offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
-          // Get the latest stats
-
-          Stats.OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
-          if (stat.counters instanceof KafkaConsumer.KafkaMeterStats) {
-            KafkaConsumer.KafkaMeterStats kms = (KafkaConsumer.KafkaMeterStats) stat.counters;
-            kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
-          }
-        }
-
-        List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
-
-        // Add the existing partition Ids to the deleted operators
-        for(Partitioner.Partition<AbstractKafkaInputOperator<K>> op : partitions)
-        {
-          deletedOperators.add(op.getPartitionedInstance().operatorId);
-        }
-        for (PartitionInfo r : partitionInfos) {
-          logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
-          newPartitions.add(createPartition(r.kpids, offsetTrack, newManagers));
-        }
-        currentPartitionInfo.addAll(partitionInfos);
-      }
       break;
 
     case ONE_TO_MANY_HEURISTIC:
@@ -674,54 +636,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     return p;
   }
 
-  private List<PartitionInfo> firstFitDecreasingAlgo(final Map<KafkaPartition, long[]> kPIntakeRate)
-  {
-    // (Decreasing) Sort the map by msgs/s and bytes/s in descending order
-    List<Map.Entry<KafkaPartition, long[]>> sortedMapEntry = new LinkedList<Map.Entry<KafkaPartition, long[]>>(kPIntakeRate.entrySet());
-    Collections.sort(sortedMapEntry, new Comparator<Map.Entry<KafkaPartition, long[]>>()
-    {
-      @Override
-      public int compare(Map.Entry<KafkaPartition, long[]> firstEntry, Map.Entry<KafkaPartition, long[]> secondEntry)
-      {
-        long[] firstPair = firstEntry.getValue();
-        long[] secondPair = secondEntry.getValue();
-        if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
-          return (int) (secondPair[1] - firstPair[1]);
-        } else {
-          return (int) (secondPair[0] - firstPair[0]);
-        }
-      }
-    });
-
-    // (First-fit) Look for first fit operator to assign the consumer
-    // Go over all the kafka partitions and look for the right operator to assign to
-      // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
-    List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
-    outer:
-    for (Map.Entry<KafkaPartition, long[]> entry : sortedMapEntry) {
-      long[] resourceRequired = entry.getValue();
-      for (PartitionInfo r : pif) {
-        if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
-          // found first fit operator partition that has enough resource for this consumer
-          // add consumer to the operator partition
-          r.kpids.add(entry.getKey());
-          // update the resource left in this partition
-          r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
-          r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
-          continue outer;
-        }
-      }
-      // didn't find the existing "operator" to assign this consumer
-      PartitionInfo nr = new PartitionInfo();
-      nr.kpids = Sets.newHashSet(entry.getKey());
-      nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
-      nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
-      pif.add(nr);
-    }
-
-    return pif;
-  }
-
   @Override
   public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
   {
@@ -830,97 +744,13 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         }
       }
 
-      if (strategy == PartitionStrategy.ONE_TO_ONE) {
-        return false;
-      }
-
-      // This is expensive part and only every repartitionCheckInterval it will check existing the overall partitions
-      // and see if there is more optimal solution
-      // The decision is made by 2 constraint
-      // Hard constraint which is upper bound overall msgs/s or bytes/s
-      // Soft constraint which is more optimal solution
-
-      boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
-      if (b) {
-        currentPartitionInfo.clear();
-        kafkaStatsHolder.clear();
-      }
-      return b;
+      return false;
     } finally {
       // update last  check time
       lastCheckTime = System.currentTimeMillis();
     }
   }
 
-  /**
-   * Check to see if there is other more optimal(less partition) partition assignment based on current statistics
-   *
-   * @return True if all windowed stats indicate different partition size we need to adjust the partition.
-   */
-  private boolean breakSoftConstraint()
-  {
-    if (kafkaStatsHolder.size() != currentPartitionInfo.size()) {
-      return false;
-    }
-    int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size();
-    for (int j = 0; j < length; j++) {
-      Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
-      for (Integer pid : kafkaStatsHolder.keySet()) {
-        if(kafkaStatsHolder.get(pid).size() <= j)
-          continue;
-        kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j)));
-      }
-      if (kPIntakeRate.size() == 0) {
-        return false;
-      }
-      List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
-      if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
-        return false;
-      }
-    }
-    // if all windowed stats indicate different partition size we need to adjust the partition
-    return true;
-  }
-
-  /**
-   * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s
-   *
-   * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
-   */
-  private boolean breakHardConstraint(List<KafkaConsumer.KafkaMeterStats> kmss)
-  {
-    // Only care about the KafkaMeterStats
-
-    // if there is no kafka meter stats at all, don't repartition
-    if (kmss == null || kmss.size() == 0) {
-      return false;
-    }
-    // if all the stats within the window have msgs/s above the upper bound threshold (hard limit)
-    boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
-      {
-        // If there are more than 1 kafka partition and the total msg/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound;
-      }
-    });
-
-    // or all the stats within the window have bytes/s above the upper bound threshold (hard limit)
-    needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
-      {
-        //If there are more than 1 kafka partition and the total bytes/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound;
-      }
-    });
-
-    return needRP;
-
-  }
-
   public static enum PartitionStrategy
   {
     /**


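The deleted processStats() logic above decided when to repartition using two tests: a hard constraint (every stats window within the check interval exceeds the msgs/s or bytes/s upper bound while the operator consumes more than one Kafka partition) and a soft constraint (re-running the first-fit decreasing packing over each window consistently yields a different partition count than the current layout). A minimal sketch of the hard-constraint check, assuming a hypothetical WindowStats aggregate in place of KafkaConsumer.KafkaMeterStats:

----------------------------------------------------------------------
// Hedged sketch of the removed repartition trigger. WindowStats and
// breakHardConstraint() here are hypothetical illustration names.
import java.util.List;

final class RepartitionCheckSketch
{
  // Hypothetical per-window aggregate across the Kafka partitions one operator consumes.
  static final class WindowStats
  {
    final int partitionCount;    // kafka partitions consumed by this operator
    final long totalMsgsPerSec;  // summed message rate in the window
    final long totalBytesPerSec; // summed byte rate in the window

    WindowStats(int partitionCount, long totalMsgsPerSec, long totalBytesPerSec)
    {
      this.partitionCount = partitionCount;
      this.totalMsgsPerSec = totalMsgsPerSec;
      this.totalBytesPerSec = totalBytesPerSec;
    }
  }

  // Repartition only when EVERY window breaks a cap; a single spike is ignored.
  // An operator already consuming a single kafka partition cannot be split further.
  static boolean breakHardConstraint(List<WindowStats> windows, long msgCap, long byteCap)
  {
    if (windows == null || windows.isEmpty()) {
      return false; // no stats at all: do not repartition
    }
    boolean allOverMsgCap = windows.stream()
        .allMatch(w -> w.partitionCount > 1 && w.totalMsgsPerSec > msgCap);
    boolean allOverByteCap = windows.stream()
        .allMatch(w -> w.partitionCount > 1 && w.totalBytesPerSec > byteCap);
    return allOverMsgCap || allOverByteCap;
  }
}
----------------------------------------------------------------------

Requiring all windows (rather than any one) to exceed the cap is what made the check a hard constraint: it traded reaction speed for stability, so a momentary burst never paid the cost of a full repartition.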