pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: more optimizations for sql (#3139)
Date Fri, 07 Dec 2018 21:41:50 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 32d6334  more optimizations for sql (#3139)
32d6334 is described below

commit 32d6334d10e578ea1adff93139ca8efca4afe277
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Fri Dec 7 13:41:45 2018 -0800

    more optimizations for sql (#3139)
    
    * more optimizations for sql
    
    * cleaning up
    
    * adding jctools dependency
    
    * add jctools to license
---
 pulsar-sql/presto-distribution/LICENSE             |   2 +
 pulsar-sql/presto-pulsar/pom.xml                   |   7 +
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 166 ++++++++++++---------
 3 files changed, 106 insertions(+), 69 deletions(-)

diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 201269c..6424768 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -390,6 +390,8 @@ The Apache Software License, Version 2.0
     - simpleclient_servlet-0.5.0.jar
   * LZ4
     - lz4-java-1.5.0.jar
+  * JCTools
+    - jctools-core-2.1.2.jar
 
 Protocol Buffers License
  * Protocol Buffers
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index a46d3ea..e3b2021 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -39,6 +39,7 @@
         <dep.javax-validation.version>1.1.0.Final</dep.javax-validation.version>
         <dep.javax-inject.version>1</dep.javax-inject.version>
         <dep.guava.version>24.1-jre</dep.guava.version>
+        <jctools.version>2.1.2</jctools.version>
     </properties>
 
     <dependencies>
@@ -89,6 +90,12 @@
             <artifactId>managed-ledger</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.jctools</groupId>
+            <artifactId>jctools-core</artifactId>
+            <version>${jctools.version}</version>
+        </dependency>
         <!-- Presto SPI -->
         <dependency>
             <groupId>com.facebook.presto</groupId>
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 4088868..e1791b4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -41,12 +41,12 @@ import org.apache.pulsar.client.impl.MessageParser;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -69,14 +69,14 @@ public class PulsarRecordCursor implements RecordCursor {
     private PulsarSplit pulsarSplit;
     private PulsarConnectorConfig pulsarConnectorConfig;
     private ReadOnlyCursor cursor;
-    private ArrayBlockingQueue<Message> messageQueue;
-    private ArrayBlockingQueue<Entry> entryQueue;
+    private SpscArrayQueue<Message> messageQueue;
+    private SpscArrayQueue<Entry> entryQueue;
     private Object currentRecord;
     private Message currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
     private int maxBatchSize;
-    private AtomicLong completedBytes = new AtomicLong(0L);
+    private long completedBytes = 0;
     private ReadEntries readEntries;
     private DeserializeEntries deserializeEntries;
     private TopicName topicName;
@@ -85,10 +85,18 @@ public class PulsarRecordCursor implements RecordCursor {
     // Stats total execution time of split
     private long startTime;
 
+    // Used to make sure we don't finish before all entries are processed since entries that have been dequeued
+    // but not been deserialized and added messages to the message queue can be missed if we just check if the queues
+    // are empty or not
+    private final long splitSize;
+    private long entriesProcessed = 0;
+
+
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
                               PulsarConnectorConfig pulsarConnectorConfig) {
+        this.splitSize = pulsarSplit.getSplitSize();
         // Set start time for split
         this.startTime = System.nanoTime();
         PulsarConnectorCache pulsarConnectorCache;
@@ -107,6 +115,7 @@ public class PulsarRecordCursor implements RecordCursor {
     // Exposed for testing purposes
     PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
             pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
+        this.splitSize = pulsarSplit.getSplitSize();
         initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
     }
 
@@ -117,8 +126,8 @@ public class PulsarRecordCursor implements RecordCursor {
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
-        this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
-        this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+        this.messageQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+        this.entryQueue = new SpscArrayQueue(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
         this.topicName = TopicName.get("persistent",
                 NamespaceName.get(pulsarSplit.getSchemaName()),
                 pulsarSplit.getTableName());
@@ -168,7 +177,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
     @Override
     public long getCompletedBytes() {
-        return this.completedBytes.get();
+        return this.completedBytes;
     }
 
     @Override
@@ -185,7 +194,7 @@ public class PulsarRecordCursor implements RecordCursor {
     @VisibleForTesting
     class DeserializeEntries implements Runnable {
 
-    protected AtomicBoolean isRunning = new AtomicBoolean(false);
+    protected boolean isRunning = false;
 
         private final Thread thread;
 
@@ -194,7 +203,7 @@ public class PulsarRecordCursor implements RecordCursor {
         }
 
         public void interrupt() {
-            isRunning.set(false);
+            isRunning = false;
             thread.interrupt();
         }
 
@@ -204,62 +213,71 @@ public class PulsarRecordCursor implements RecordCursor {
 
         @Override
         public void run() {
-            isRunning.set(true);
-            while (isRunning.get()) {
-                Entry entry;
-                try {
-                    // start time for entry queue read
-                    metricsTracker.start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
-                    // read from entry queue and block if empty
-                    entry = entryQueue.take();
-                    // record entry queue wait time stats
-                    metricsTracker.end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
-                } catch (InterruptedException e) {
-                    break;
-                }
-                try {
-                    long bytes = entry.getDataBuffer().readableBytes();
-                    completedBytes.addAndGet(bytes);
-                    // register stats for bytes read
-                    metricsTracker.register_BYTES_READ(bytes);
+            isRunning = true;
+            while (isRunning) {
 
-                    // set start time for time deserializing entries for stats
-                    metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+                 int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
+                    @Override
+                    public void accept(Entry entry) {
 
-                    // filter entries that is not part of my split
-                    if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
-                            MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
-                                    entry.getDataBuffer(), (messageId, message, byteBuf) -> {
-                                        try {
-
-                                            // start time for message queue read
-                                            metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-
-                                            // enqueue deserialize message from this entry
-                                            messageQueue.put(message);
-
-                                            // stats for how long a read from message queue took
-                                            metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-                                            // stats for number of messages read
-                                            metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                                        } catch (InterruptedException e) {
-                                            //no-op
-                                        }
-                                    });
-                        } catch (IOException e) {
-                            log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
-                            throw new RuntimeException(e);
+                            long bytes = entry.getDataBuffer().readableBytes();
+                            completedBytes += bytes;
+                            // register stats for bytes read
+                            metricsTracker.register_BYTES_READ(bytes);
+
+                            // check if we have processed all entries in this split
+                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                                return;
+                            }
+
+                            // set start time for time deserializing entries for stats
+                            metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
+                            try {
+                                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
+                                        entry.getDataBuffer(), (messageId, message, byteBuf) -> {
+                                            try {
+                                                // start time for message queue read
+                                                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+                                                // enqueue deserialize message from this entry
+                                                while (!messageQueue.offer(message)) {
+                                                    Thread.sleep(1);
+                                                }
+
+                                                // stats for how long a read from message queue took
+                                                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+                                                // stats for number of messages read
+                                                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                                            } catch (InterruptedException e) {
+                                                //no-op
+                                            }
+                                        });
+                            } catch (IOException e) {
+                                log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
+                                throw new RuntimeException(e);
+                            }
+                            // stats for time spend deserializing entries
+                            metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+                            // stats for num messages per entry
+                            metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                        } finally {
+                            entriesProcessed++;
+                            entry.release();
                         }
-                        // stats for time spend deserializing entries
-                        metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+                    }
+                });
 
-                        // stats for num messages per entry
-                        metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+                if (read <= 0) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        return;
                     }
-                } finally {
-                    entry.release();
                 }
             }
         }
@@ -269,7 +287,7 @@ public class PulsarRecordCursor implements RecordCursor {
     class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
 
         // indicate whether there are any additional entries left to read
-        private final AtomicBoolean isDone = new AtomicBoolean(false);
+        private boolean isDone = false;
 
         //num of outstanding read requests
         // set to 1 because we can only read one batch a time
@@ -281,10 +299,10 @@ public class PulsarRecordCursor implements RecordCursor {
 
                 if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
                         .compareTo(pulsarSplit.getEndPosition()) >= 0) {
-                    isDone.set(true);
+                    isDone = true;
 
                 } else {
-                    int batchSize = Math.min(maxBatchSize, entryQueue.remainingCapacity());
+                    int batchSize = Math.min(maxBatchSize, entryQueue.capacity() - entryQueue.size());
 
                     if (batchSize > 0) {
                         outstandingReadsRequests.decrementAndGet();
@@ -302,7 +320,17 @@ public class PulsarRecordCursor implements RecordCursor {
 
         @Override
         public void readEntriesComplete(List<Entry> entries, Object ctx) {
-            entryQueue.addAll(entries);
+
+            entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
+                private int i = 0;
+                @Override
+                public Entry get() {
+                    Entry entry = entries.get(i);
+                    i++;
+                    return entry;
+                }
+            }, entries.size());
+
             outstandingReadsRequests.incrementAndGet();
 
             //set read latency stats for success
@@ -312,10 +340,9 @@ public class PulsarRecordCursor implements RecordCursor {
         }
 
         public boolean hashFinished() {
-            return messageQueue.isEmpty() && entryQueue.isEmpty() && isDone.get() && outstandingReadsRequests.get() >=1;
+            return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
         }
 
-
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
             log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
@@ -346,7 +373,7 @@ public class PulsarRecordCursor implements RecordCursor {
                 return false;
             }
 
-            if (messageQueue.remainingCapacity() > 0) {
+            if ((messageQueue.capacity() - messageQueue.size()) > 0) {
                 readEntries.run();
             }
 
@@ -355,9 +382,9 @@ public class PulsarRecordCursor implements RecordCursor {
                 break;
             } else {
                 try {
-                    Thread.sleep(5);
+                    Thread.sleep(1);
                     // stats for time spent wait to read from message queue because its empty
-                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(5);
+                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
@@ -486,6 +513,7 @@ public class PulsarRecordCursor implements RecordCursor {
             this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime);
             this.metricsTracker.close();
         }
+
     }
 
     private void checkFieldType(int field, Class<?> expected) {


Mime
View raw message