asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Optimize PartitionWriter
Date Sat, 12 Nov 2016 17:56:05 GMT
abdullah alamoudi has submitted this change and it was merged.

Change subject: Optimize PartitionWriter
......................................................................


Optimize PartitionWriter

Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1347
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
---
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
3 files changed, 22 insertions(+), 11 deletions(-)

Approvals:
  Yingyi Bu: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified



diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 7f518cd..9463982 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -104,6 +104,12 @@
     }
 
     @Override
+    public int getTupleCount() {
+        // if message is set, there is always a message. that message could be a null message
(TODO: optimize)
+        return tupleCount + ((message == null) ? 0 : 1);
+    }
+
+    @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException
{
         if (!initialized) {
             message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index c047567..6d87d89 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -40,13 +40,14 @@
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
     private final IHyracksTaskContext ctx;
-    private boolean allocatedFrame = false;
+    private boolean[] allocatedFrames;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory
pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException
{
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
         isOpen = new boolean[consumerPartitionCount];
+        allocatedFrames = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
@@ -70,7 +71,7 @@
         HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
             if (isOpen[i]) {
-                if (allocatedFrame) {
+                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
                     try {
                         appenders[i].write(pWriters[i], true);
                     } catch (Throwable th) {
@@ -103,9 +104,6 @@
             isOpen[i] = true;
             pWriters[i].open();
         }
-        if (!allocatedFrame) {
-            allocateFrames();
-        }
     }
 
     @Override
@@ -114,15 +112,16 @@
         int tupleCount = tupleAccessor.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
             int h = tpc.partition(tupleAccessor, i, consumerPartitionCount);
+            if (!allocatedFrames[h]) {
+                allocateFrames(h);
+            }
             FrameUtils.appendToWriter(pWriters[h], appenders[h], tupleAccessor, i);
         }
     }
 
-    private void allocateFrames() throws HyracksDataException {
-        for (int i = 0; i < appenders.length; ++i) {
-            appenders[i].reset(new VSizeFrame(ctx), true);
-        }
-        allocatedFrame = true;
+    protected void allocateFrames(int i) throws HyracksDataException {
+        appenders[i].reset(new VSizeFrame(ctx), true);
+        allocatedFrames[i] = true;
     }
 
     @Override
@@ -149,7 +148,9 @@
     @Override
     public void flush() throws HyracksDataException {
         for (int i = 0; i < consumerPartitionCount; i++) {
-            appenders[i].flush(pWriters[i]);
+            if (allocatedFrames[i]) {
+                appenders[i].flush(pWriters[i]);
+            }
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
index 4055fb0..97d5f2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -32,6 +32,10 @@
             IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer
tpc)
                     throws HyracksDataException {
         super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc);
+        // since the message partition writer sends broadcast messages, we allocate frames
when we create the writer
+        for (int i = 0; i < consumerPartitionCount; ++i) {
+            allocateFrames(i);
+        }
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1347
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic36c58b143f3fc2d37b180559c11c1566bcc1a86
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Till Westmann <tillw@apache.org>
Gerrit-Reviewer: Yingyi Bu <buyingyi@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message