phoenix-commits mailing list archives

From jamestay...@apache.org
Subject git commit: PHOENIX-1188 Performance regression for non-aggregate queries
Date Thu, 21 Aug 2014 07:40:20 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.0 e2922d43c -> 1987fba12


PHOENIX-1188 Performance regression for non-aggregate queries

Conflicts:
	phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1987fba1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1987fba1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1987fba1

Branch: refs/heads/4.0
Commit: 1987fba121dd693c5b26f364dc2081176f15bb38
Parents: e2922d4
Author: James Taylor <jtaylor@salesforce.com>
Authored: Thu Aug 21 00:04:12 2014 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Thu Aug 21 00:44:08 2014 -0700

----------------------------------------------------------------------
 .../phoenix/iterate/ChunkedResultIterator.java  | 85 +++++++-------------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  6 --
 .../phoenix/query/QueryServicesOptions.java     |  9 ++-
 .../apache/phoenix/trace/TraceMetricSource.java |  4 +-
 4 files changed, 38 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1987fba1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 3f6ed81..c702e99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -23,15 +23,19 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
 * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
  * basic scan plans, to avoid loading large quantities of data from HBase in one go.
@@ -40,7 +44,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
     private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
-    private SingleChunkResultIterator singleChunkResultIterator;
+    private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
     private final StatementContext context;
     private final TableRef tableRef;
     private Scan scan;
@@ -70,12 +74,19 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     }
 
     public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
-            StatementContext context, TableRef tableRef, Scan scan, long chunkSize) {
+            StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
         this.scan = scan;
         this.chunkSize = chunkSize;
+        // Instantiate single chunk iterator and the delegate iterator in constructor
+        // to get parallel scans kicked off in separate threads. If we delay this,
+        // we'll get serialized behavior (see PHOENIX-
+        if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
+        ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
+                new TableResultIterator(context, tableRef, scan), chunkSize);
+        resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
     }
 
     @Override
@@ -95,26 +106,16 @@ public class ChunkedResultIterator implements PeekingResultIterator {
 
     @Override
     public void close() throws SQLException {
-        if (resultIterator != null) {
-            resultIterator.close();
-        }
-        if (singleChunkResultIterator != null) {
-            singleChunkResultIterator.close();
-        }
+        resultIterator.close();
     }
 
     private PeekingResultIterator getResultIterator() throws SQLException {
-        if (resultIterator == null) {
-            if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
-            singleChunkResultIterator = new SingleChunkResultIterator(
-                    new TableResultIterator(context, tableRef, scan), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
-        } else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) {
-            singleChunkResultIterator.close();
+        if (resultIterator.peek() == null && lastKey != null) {
+            resultIterator.close();
             scan = ScanUtil.newScan(scan);
-            scan.setStartRow(singleChunkResultIterator.getLastKey());
+            scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
             if (logger.isDebugEnabled()) logger.debug("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
-            singleChunkResultIterator = new SingleChunkResultIterator(
+            ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
                     new TableResultIterator(context, tableRef, scan), chunkSize);
             resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
         }
@@ -124,23 +125,22 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     /**
      * ResultIterator that runs over a single chunk of results (i.e. a portion of a scan).
      */
-    private static class SingleChunkResultIterator implements ResultIterator {
+    private class SingleChunkResultIterator implements ResultIterator {
 
         private int rowCount = 0;
         private boolean chunkComplete;
-        private boolean endOfStreamReached;
-        private Tuple lastTuple;
         private final ResultIterator delegate;
         private final long chunkSize;
 
         private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0);
             this.delegate = delegate;
             this.chunkSize = chunkSize;
         }
 
         @Override
         public Tuple next() throws SQLException {
-            if (isChunkComplete() || isEndOfStreamReached()) {
+            if (chunkComplete || lastKey == null) {
                 return null;
             }
             Tuple next = delegate.next();
@@ -149,15 +149,15 @@ public class ChunkedResultIterator implements PeekingResultIterator {
                 // necessary for (at least) hash joins, as they can return multiple rows with the
                 // same row key. Stopping a chunk at a row key boundary is necessary in order to
                 // be able to start the next chunk on the next row key
-                if (rowCount >= chunkSize && rowKeyChanged(lastTuple, next)) {
+                if (rowCount == chunkSize) {
+                    next.getKey(lastKey);
+                } else if (rowCount > chunkSize && rowKeyChanged(next)) {
                     chunkComplete = true;
-                    lastTuple = next;
                     return null;
                 }
-                lastTuple = next;
                 rowCount++;
             } else {
-                endOfStreamReached = true;
+                lastKey = null;
             }
             return next;
         }
@@ -172,36 +172,13 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             delegate.close();
         }
 
-        /**
-         * Returns true if the current chunk has been fully iterated over.
-         */
-        public boolean isChunkComplete() {
-            return chunkComplete;
-        }
-
-        /**
-         * Returns true if the end of all chunks has been reached.
-         */
-        public boolean isEndOfStreamReached() {
-            return endOfStreamReached;
-        }
-
-        /**
-         * Returns the last-encountered key.
-         */
-        public byte[] getLastKey() {
-            ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
-            lastTuple.getKey(keyPtr);
-            return keyPtr.get();
-        }
-
-        private boolean rowKeyChanged(Tuple lastTuple, Tuple newTuple) {
-            ImmutableBytesWritable oldKeyPtr = new ImmutableBytesWritable();
-            ImmutableBytesWritable newKeyPtr = new ImmutableBytesWritable();
-            lastTuple.getKey(oldKeyPtr);
-            newTuple.getKey(newKeyPtr);
+        private boolean rowKeyChanged(Tuple newTuple) {
+            byte[] currentKey = lastKey.get();
+            int offset = lastKey.getOffset();
+            int length = lastKey.getLength();
+            newTuple.getKey(lastKey);
 
-            return oldKeyPtr.compareTo(newKeyPtr) != 0;
+            return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
         }
     }
 }
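
The heart of the change above is a resume-from-last-key chunking scheme: rather than keeping the exhausted SingleChunkResultIterator alive just to ask it for its last key, the iterator records the last row key in a reusable ImmutableBytesWritable as rows stream past, and when a chunk is drained it opens a fresh Scan starting at that key (lastKey == null marks end of stream). A chunk may only end on a row-key boundary, so duplicate row keys (which hash joins can produce) never straddle two chunks. The sketch below shows the same pattern in self-contained plain Java; the class and method names are illustrative only, not Phoenix API:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the chunk-and-resume pattern, not Phoenix code.
    public class ChunkedScanSketch {
        private final List<String> rows;  // stands in for a sorted HBase table
        private final int chunkSize;
        private String lastKey = "";      // null signals end of stream, as in the patch

        public ChunkedScanSketch(List<String> rows, int chunkSize) {
            if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be > 0");
            this.rows = rows;
            this.chunkSize = chunkSize;
        }

        // Analogous to getResultIterator(): when the previous chunk is drained
        // and lastKey is non-null, start a new "scan" at lastKey.
        public List<String> nextChunk() {
            if (lastKey == null) return new ArrayList<>();
            List<String> chunk = new ArrayList<>();
            int i = firstIndexAtOrAfter(lastKey);
            while (i < rows.size() && chunk.size() < chunkSize) {
                chunk.add(rows.get(i++));
            }
            // Only stop on a row-key boundary, so duplicate keys (e.g. from a
            // hash join) never straddle two chunks.
            while (i < rows.size() && rows.get(i).equals(rows.get(i - 1))) {
                chunk.add(rows.get(i++));
            }
            // Remember where the next chunk starts; null means end of stream.
            lastKey = i < rows.size() ? rows.get(i) : null;
            return chunk;
        }

        private int firstIndexAtOrAfter(String key) {
            int i = 0;
            while (i < rows.size() && rows.get(i).compareTo(key) < 0) i++;
            return i;
        }

        public static void main(String[] args) {
            ChunkedScanSketch scan = new ChunkedScanSketch(
                    List.of("a", "b", "b", "c", "d", "e"), 2);
            for (List<String> c = scan.nextChunk(); !c.isEmpty(); c = scan.nextChunk()) {
                System.out.println(c);  // prints [a, b, b], then [c, d], then [e]
            }
        }
    }

Two other details of the patch follow the same allocation-shy spirit: rowKeyChanged() now compares raw key bytes with Bytes.compareTo instead of allocating two ImmutableBytesWritable wrappers per call, and the first chunk (with its delegate iterator) is created eagerly in the constructor so the parallel scans are kicked off immediately rather than on the first call to next().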

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1987fba1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 5a9dae9..622fc8a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -51,8 +51,6 @@ import java.util.concurrent.Executor;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -75,7 +73,6 @@ import org.apache.phoenix.schema.PMetaData.Pruner;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.trace.TracingCompat;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.JDBCUtil;
@@ -86,7 +83,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.cloudera.htrace.Sampler;
-import org.cloudera.htrace.Trace;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
@@ -108,8 +104,6 @@ import com.google.common.collect.Maps;
  * @since 0.1
  */
 public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jdbc7Shim.Connection, MetaDataMutated  {
-    private static final Log LOG = LogFactory.getLog(PhoenixConnection.class);
-
     private final String url;
     private final ConnectionQueryServices services;
     private final Properties info;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1987fba1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3bfc703..e6cd94e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -101,10 +101,11 @@ public class QueryServicesOptions {
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
-    // We make the default chunk size one row smaller than the default scan cache size because
-    // one extra row is typically read and discarded by the ChunkedResultIterator, and we don't
-    // want to fill up a whole new cache to read a single extra record
-    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = DEFAULT_SCAN_CACHE_SIZE - 1L;
+    // Only the first chunked batches are fetched in parallel, so this default
+    // should be on the relatively bigger side of things. Bigger means more
+    // latency and client-side spooling/buffering. Smaller means less initial
+    // latency and less parallelization.
+    public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999;
     
     // 
     // Spillable GroupBy - SPGBY prefix
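
The new comment spells out the tradeoff: only the first batch of chunks is fetched in parallel, so a bigger default buys more up-front parallelism at the cost of initial latency and client-side spooling. A client that wants a different balance can, in principle, override the chunk size per connection. A minimal sketch, assuming the property key is phoenix.query.scanResultChunkSize (the key conventionally behind QueryServices.SCAN_RESULT_CHUNK_SIZE; verify against your Phoenix version) and using a placeholder JDBC URL and table name:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class ChunkSizeOverride {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed key for QueryServices.SCAN_RESULT_CHUNK_SIZE; larger values
            // mean fewer, bigger chunks (more up-front work and buffering).
            props.setProperty("phoenix.query.scanResultChunkSize", "10000");
            // "jdbc:phoenix:localhost" and MY_TABLE are placeholders.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE LIMIT 100")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }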

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1987fba1/phoenix-hadoop2-compat/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
----------------------------------------------------------------------
diff --git a/phoenix-hadoop2-compat/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java b/phoenix-hadoop2-compat/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
index 5374695..ec40722 100644
--- a/phoenix-hadoop2-compat/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
+++ b/phoenix-hadoop2-compat/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java
@@ -18,11 +18,11 @@
 package org.apache.phoenix.trace;
 
 import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
-import static org.apache.phoenix.metrics.MetricInfo.TAG;
 import static org.apache.phoenix.metrics.MetricInfo.END;
 import static org.apache.phoenix.metrics.MetricInfo.PARENT;
 import static org.apache.phoenix.metrics.MetricInfo.SPAN;
 import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -136,7 +136,7 @@ public class TraceMetricSource implements PhoenixSpanReceiver, MetricsSource {
     // this is also necessary to ensure that we register the metrics source as an MBean (avoiding a
     // runtime warning)
     MetricsRecordBuilder marker = collector.addRecord(TracingCompat.METRICS_MARKER_CONTEXT);
-    marker.add(new MetricsTag((MetricsInfo) new MetricsInfoImpl("stat", "num spans"), Integer
+    marker.add(new MetricsTag(new MetricsInfoImpl("stat", "num spans"), Integer
         .toString(spans.size())));
 
     // actually convert the known spans into metric records as well

