drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject drill git commit: DRILL-2225: Fix missing PartitionSenderRootExec stats.
Date Wed, 11 Mar 2015 01:04:07 GMT
Repository: drill
Updated Branches:
  refs/heads/master 0368adf0c -> b3bdc2790


DRILL-2225: Fix missing PartitionSenderRootExec stats.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b3bdc279
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b3bdc279
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b3bdc279

Branch: refs/heads/master
Commit: b3bdc2790bffcba48ed41f867da3dfb33885445f
Parents: 0368adf
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Tue Mar 10 10:51:25 2015 -0700
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Tue Mar 10 10:51:25 2015 -0700

----------------------------------------------------------------------
 .../partitionsender/PartitionSenderRootExec.java    | 16 +++++++++-------
 .../impl/partitionsender/PartitionerTemplate.java   | 14 --------------
 2 files changed, 9 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b3bdc279/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index ccbd289..7df69a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -98,6 +98,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
     this.statusHandler = new StatusHandler(sendCount, context);
     this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
     this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+
+    stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
   }
 
   @Override
@@ -223,19 +225,18 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
-  public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) {
-    long records = 0;
+  /**
+   * Find min and max record count seen across the outgoing batches and put them in stats.
+   * @param outgoing
+   */
+  private void updateAggregateStats(List<? extends PartitionOutgoingBatch> outgoing)
{
     for (PartitionOutgoingBatch o : outgoing) {
       long totalRecords = o.getTotalRecords();
       minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
       maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
-      records += totalRecords;
     }
-    stats.addLongStat(Metric.BATCHES_SENT, 1);
-    stats.addLongStat(Metric.RECORDS_SENT, records);
     stats.setLongStat(Metric.MIN_RECORDS, minReceiverRecordCount);
     stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
-    stats.setLongStat(Metric.N_RECEIVERS, outgoing.size());
   }
 
   @Override
@@ -254,6 +255,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
     logger.debug("Partition sender stopping.");
     ok = false;
     if (partitioner != null) {
+      updateAggregateStats(partitioner.getOutgoingBatches());
       partitioner.clear();
     }
     sendCount.waitForSendComplete();
@@ -287,6 +289,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
       }
       sendCount.increment();
     }
+    stats.addLongStat(Metric.BATCHES_SENT, 1);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b3bdc279/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 1d9088a..93d719c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -94,9 +93,6 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
 
     for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
-      FragmentHandle opposite = context.getHandle().toBuilder()
-          .setMajorFragmentId(popConfig.getOppositeMajorFragmentId())
-          .setMinorFragmentId(destination.getId()).build();
       outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
           context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(),
destination.getId(), statusHandler));
     }
@@ -187,16 +183,6 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
   }
 
-  private String composeTooBigMsg(int recordId, RecordBatch incoming) {
-    String msg = String.format("Record " + recordId + " is too big to fit into the allocated
memory of ValueVector.");
-    msg += " Schema: ";
-    for (int i = 0; i < incoming.getSchema().getFieldCount(); i++) {
-      MaterializedField f = incoming.getSchema().getColumn(i);
-      msg += f.getPath().getRootSegment().getPath() + " ";
-    }
-    return msg;
-  }
-
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming")
RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
   public abstract int doEval(@Named("inIndex") int inIndex);
 


Mime
View raw message