phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [1/5] PHOENIX-180 Use stats to guide query parallelization (Ramkrishna S Vasudevan)
Date Fri, 19 Sep 2014 23:43:28 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.0 a18862d06 -> 5cdc938e8


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
new file mode 100644
index 0000000..17b5825
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse.Builder;
+import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PhoenixArray;
+import org.apache.phoenix.util.TimeKeeper;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * An endpoint implementation that collects the stats for a given region, grouped per column family. It is also a
+ * RegionObserver that collects the same information during compaction. The user can invoke this endpoint to populate the
+ * Phoenix stats table with the max key, min key and guide posts for the given region. The stats are then consumed through
+ * the stats associated with every PTable and used to parallelize queries.
+ */
+public class StatisticsCollector extends BaseRegionObserver implements CoprocessorService, Coprocessor,
+        StatisticsTracker, StatCollectService.Interface {
+
+    public static void addToTable(HTableDescriptor desc) throws IOException {
+        desc.addCoprocessor(StatisticsCollector.class.getName());
+    }
+
+    private Map<String, byte[]> minMap = new ConcurrentHashMap<String, byte[]>();
+    private Map<String, byte[]> maxMap = new ConcurrentHashMap<String, byte[]>();
+    private long guidepostDepth;
+    private long byteCount = 0;
+    private Map<String, List<byte[]>> guidePostsMap = new ConcurrentHashMap<String, List<byte[]>>();
+    private Map<ImmutableBytesPtr, Boolean> familyMap = new ConcurrentHashMap<ImmutableBytesPtr, Boolean>();
+    private RegionCoprocessorEnvironment env;
+    protected StatisticsTable stats;
+    // Ensures that only one of analyze or compaction runs at any point in time.
+    private ReentrantLock lock = new ReentrantLock();
+    private static final Log LOG = LogFactory.getLog(StatisticsCollector.class);
+
+    @Override
+    public void collectStat(RpcController controller, StatCollectRequest request, RpcCallback<StatCollectResponse> done) {
+        HRegion region = env.getRegion();
+        boolean heldLock = false;
+        int count = 0;
+        Builder newBuilder = StatCollectResponse.newBuilder();
+        try {
+            if (lock.tryLock()) {
+                heldLock = true;
+                // Clear all old stats
+                clear();
+                Scan scan = createScan(env.getConfiguration());
+                if (request.hasStartRow()) {
+                    scan.setStartRow(request.getStartRow().toByteArray());
+                }
+                if (request.hasStopRow()) {
+                    scan.setStopRow(request.getStopRow().toByteArray());
+                }
+                RegionScanner scanner = null;
+                try {
+                    scanner = region.getScanner(scan);
+                    count = scanRegion(scanner, count);
+                } catch (IOException e) {
+                    LOG.error(e);
+                    ResponseConverter.setControllerException(controller, e);
+                } finally {
+                    if (scanner != null) {
+                        try {
+                            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+                            writeStatsToStatsTable(region, scanner, true, mutations, TimeKeeper.SYSTEM.getCurrentTime());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Committing new stats for the region " + region.getRegionInfo());
+                            }
+                            commitStats(mutations);
+                        } catch (IOException e) {
+                            LOG.error(e);
+                            ResponseConverter.setControllerException(controller, e);
+                        } finally {
+                            clear();
+                        }
+                    }
+                }
+            }
+        } finally {
+            if (heldLock) {
+                lock.unlock();
+            }
+            newBuilder.setRowsScanned(count);
+            StatCollectResponse result = newBuilder.build();
+            done.run(result);
+        }
+    }
+
+    private void writeStatsToStatsTable(final HRegion region,
+            final RegionScanner scanner, boolean delete, List<Mutation> mutations, long currentTime) throws IOException {
+        scanner.close();
+        try {
+            // update the statistics table
+            for (ImmutableBytesPtr fam : familyMap.keySet()) {
+                String tableName = region.getRegionInfo().getTable().getNameAsString();
+                if (delete) {
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Deleting the stats for the region "+region.getRegionInfo());
+                    }
+                    stats.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this,
+                            Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
+                }
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Adding new stats for the region "+region.getRegionInfo());
+                }
+                stats.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                        Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to update statistics table!", e);
+            throw e;
+        }
+    }
+
+    private void commitStats(List<Mutation> mutations) throws IOException {
+        stats.commitStats(mutations);
+    }
+
+    private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException {
+        try {
+            // update the statistics table
+            for (ImmutableBytesPtr fam : familyMap.keySet()) {
+                String tableName = region.getRegionInfo().getTable().getNameAsString();
+                stats.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this,
+                        Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime);
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to delete from statistics table!", e);
+            throw e;
+        }
+    }
+
+    private int scanRegion(RegionScanner scanner, int count) throws IOException {
+        List<Cell> results = new ArrayList<Cell>();
+        boolean hasMore = true;
+        while (hasMore) {
+            // TODO: duplicate cells are returned here; need to avoid that
+            hasMore = scanner.next(results);
+            updateStat(results);
+            count += results.size();
+            results.clear();
+        }
+        return count;
+    }
+
+    /**
+     * Update the current statistics based on the latest batch of key-values from the underlying scanner
+     * 
+     * @param results
+     *            next batch of {@link KeyValue}s
+     */
+    protected void updateStat(final List<Cell> results) {
+        for (Cell c : results) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+            updateStatistic(kv);
+        }
+    }
+
+    @Override
+    public Service getService() {
+        return StatCollectorProtos.StatCollectService.newReflectiveService(this);
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            this.env = (RegionCoprocessorEnvironment)env;
+        } else {
+            throw new CoprocessorException("Must be loaded on a table region!");
+        }
+        HTableDescriptor desc = ((RegionCoprocessorEnvironment)env).getRegion().getTableDesc();
+        // Get the stats table associated with the current table on which the CP is
+        // triggered
+        stats = StatisticsTable.getStatisticsTableForCoprocessor(env, desc.getName());
+        guidepostDepth = env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY,
+                QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH);
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            TableName table = ((RegionCoprocessorEnvironment)env).getRegion().getRegionInfo().getTable();
+            // Close only if the table is the SYSTEM.STATS table
+            if(table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+                stats.close();
+            }
+        }
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s)
+            throws IOException {
+        InternalScanner internalScan = s;
+        TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+            boolean heldLock = false;
+            try {
+                if (lock.tryLock()) {
+                    heldLock = true;
+                    // See if this is for Major compaction
+                    if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+                        // this is the first CP accessed, so we need to create a major
+                        // compaction scanner, just like in the compactor
+                        if (s == null) {
+                            Scan scan = new Scan();
+                            scan.setMaxVersions(store.getFamily().getMaxVersions());
+                            long smallestReadPoint = store.getSmallestReadPoint();
+                            internalScan = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+                                    smallestReadPoint, earliestPutTs);
+                        }
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Compaction scanner created for stats");
+                        }
+                        InternalScanner scanner = getInternalScanner(c, store, internalScan,
+                                store.getColumnFamilyName());
+                        if (scanner != null) {
+                            internalScan = scanner;
+                        }
+                    }
+                }
+            } finally {
+                if (heldLock) {
+                    lock.unlock();
+                }
+            }
+        }
+        return internalScan;
+    }
+    
+
+    @Override
+    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegion l, HRegion r) throws IOException {
+        // Invoke collectStat here
+        HRegion region = ctx.getEnvironment().getRegion();
+        TableName table = region.getRegionInfo().getTable();
+        if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) {
+            if (familyMap != null) {
+                familyMap.clear();
+            }
+            // Create a delete operation on the parent region
+            // Then write the new guide posts for individual regions
+            // TODO : Try making this atomic
+            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3);
+            long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+            Configuration conf = ctx.getEnvironment().getConfiguration();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Collecting stats for the daughter region "+l.getRegionInfo());
+            }
+            collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime);
+            clear();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Collecting stats for the daughter region "+r.getRegionInfo());
+            }
+            collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime);
+            clear();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo());
+            }
+            commitStats(mutations);
+        }
+    }
+
+    private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, HRegion parent, boolean delete,
+            List<Mutation> mutations, long currentTime) throws IOException {
+        Scan scan = createScan(conf);
+        RegionScanner scanner = null;
+        int count = 0;
+        try {
+            scanner = daughter.getScanner(scan);
+            count = scanRegion(scanner, count);
+        } catch (IOException e) {
+            LOG.error(e);
+            throw e;
+        } finally {
+            if (scanner != null) {
+                try {
+                    if (delete) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Deleting the stats for the parent region " + parent.getRegionInfo());
+                        }
+                        deleteStatsFromStatsTable(parent, mutations, currentTime);
+                    }
+                    writeStatsToStatsTable(daughter, scanner, false, mutations, currentTime);
+                } catch (IOException e) {
+                    LOG.error(e);
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private Scan createScan(Configuration conf) {
+        Scan scan = new Scan();
+        scan.setCaching(
+                conf.getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE));
+        // do not cache the blocks here
+        scan.setCacheBlocks(false);
+        return scan;
+    }
+
+    protected InternalScanner getInternalScanner(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            InternalScanner internalScan, String family) {
+        return new StatisticsScanner(this, stats, c.getEnvironment().getRegion().getRegionInfo(), internalScan,
+                Bytes.toBytes(family));
+    }
+
+    @Override
+    public void clear() {
+        this.maxMap.clear();
+        this.minMap.clear();
+        this.guidePostsMap.clear();
+        this.familyMap.clear();
+    }
+
+    @Override
+    public void updateStatistic(KeyValue kv) {
+        byte[] cf = kv.getFamily();
+        familyMap.put(new ImmutableBytesPtr(cf), true);
+        
+        String fam = Bytes.toString(cf);
+        byte[] row = new ImmutableBytesPtr(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())
+                .copyBytesIfNecessary();
+        if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) {
+            minMap.put(fam, row);
+            // Ideally the max key also should be added in this case
+            maxMap.put(fam, row);
+        } else {
+            if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0,
+                    minMap.get(fam).length) < 0) {
+                minMap.put(fam, row);
+            }
+            if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0,
+                    maxMap.get(fam).length) > 0) {
+                maxMap.put(fam, row);
+            }
+        }
+        byteCount += kv.getLength();
+        // TODO : This can be moved to an interface so that we could collect guide posts in different ways
+        if (byteCount >= guidepostDepth) {
+            if (guidePostsMap.get(fam) != null) {
+                guidePostsMap.get(fam).add(row);
+            } else {
+                List<byte[]> guidePosts = new ArrayList<byte[]>();
+                guidePosts.add(row);
+                guidePostsMap.put(fam, guidePosts);
+            }
+            // reset the count for the next key
+            byteCount = 0;
+        }
+    }
+
+    @Override
+    public byte[] getMaxKey(String fam) {
+        return maxMap.get(fam);
+    }
+
+    @Override
+    public byte[] getMinKey(String fam) {
+        return minMap.get(fam);
+    }
+
+    @Override
+    public byte[] getGuidePosts(String fam) {
+        if (!guidePostsMap.isEmpty()) {
+            List<byte[]> guidePosts = guidePostsMap.get(fam);
+            if (guidePosts != null) {
+                byte[][] array = guidePosts.toArray(new byte[guidePosts.size()][]);
+                PhoenixArray phoenixArray = new PhoenixArray(PDataType.VARBINARY, array);
+                return PDataType.VARBINARY_ARRAY.toBytes(phoenixArray);
+            }
+        }
+        return null;
+    }
+}
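
For context, a client reaches this endpoint through HTable.coprocessorService(). A minimal client-side sketch, assuming the standard HBase 0.98 endpoint-invocation pattern (the table name and driver class are hypothetical):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;
    import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectRequest;
    import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectResponse;
    import org.apache.phoenix.coprocessor.generated.StatCollectorProtos.StatCollectService;

    public class StatCollectClient {
        public static void main(String[] args) throws Throwable {
            Configuration conf = HBaseConfiguration.create();
            HTable table = new HTable(conf, "MY_TABLE"); // hypothetical table
            try {
                final StatCollectRequest req = StatCollectRequest.newBuilder().build();
                // Run collectStat on every region; null start/stop keys cover the whole table.
                Map<byte[], Long> rowsPerRegion = table.coprocessorService(
                        StatCollectService.class, null, null,
                        new Batch.Call<StatCollectService, Long>() {
                            @Override
                            public Long call(StatCollectService instance) throws IOException {
                                ServerRpcController controller = new ServerRpcController();
                                BlockingRpcCallback<StatCollectResponse> callback =
                                        new BlockingRpcCallback<StatCollectResponse>();
                                instance.collectStat(controller, req, callback);
                                if (controller.failedOnException()) { throw controller.getFailedOn(); }
                                return callback.get().getRowsScanned();
                            }
                        });
                long total = 0;
                for (Long rows : rowsPerRegion.values()) {
                    total += rows;
                }
                System.out.println("Rows scanned: " + total);
            } finally {
                table.close();
            }
        }
    }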

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
new file mode 100644
index 0000000..09174b2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TimeKeeper;
+
+/**
+ * Scanner that collects the stats during major compaction; see {@link StatisticsCollector}.
+ */
+public class StatisticsScanner implements InternalScanner {
+    private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
+    private InternalScanner delegate;
+    private StatisticsTable stats;
+    private HRegionInfo region;
+    private StatisticsTracker tracker;
+    private byte[] family;
+
+    public StatisticsScanner(StatisticsTracker tracker, StatisticsTable stats, HRegionInfo region,
+            InternalScanner delegate, byte[] family) {
+        // should there be only one tracker?
+        this.tracker = tracker;
+        this.stats = stats;
+        this.delegate = delegate;
+        this.region = region;
+        this.family = family;
+        this.tracker.clear();
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        boolean ret = delegate.next(result);
+        updateStat(result);
+        return ret;
+    }
+
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+        boolean ret = delegate.next(result, limit);
+        updateStat(result);
+        return ret;
+    }
+
+    /**
+     * Update the current statistics based on the latest batch of key-values from the underlying scanner
+     * 
+     * @param results
+     *            next batch of {@link KeyValue}s
+     */
+    protected void updateStat(final List<Cell> results) {
+        for (Cell c : results) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(c);
+            if (c.getTypeByte() == KeyValue.Type.Put.getCode()) {
+                tracker.updateStatistic(kv);
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        IOException toThrow = null;
+        try {
+            // update the statistics table
+            // TODO: verify that this is fine
+            String tableName = SchemaUtil.getTableNameFromFullName(region.getTable().getNameAsString());
+            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+            long currentTime = TimeKeeper.SYSTEM.getCurrentTime();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString()
+                        + " as part of major compaction");
+            }
+            stats.deleteStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family),
+                    mutations, currentTime);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding new stats for the region " + region.getRegionNameAsString()
+                        + " as part of major compaction");
+            }
+            stats.addStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations,
+                    currentTime);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Committing new stats for the region " + region.getRegionNameAsString()
+                        + " as part of major compaction");
+            }
+            stats.commitStats(mutations);
+        } catch (IOException e) {
+            LOG.error("Failed to update statistics table!", e);
+            toThrow = e;
+        }
+        // close the delegate scanner
+        try {
+            delegate.close();
+        } catch (IOException e) {
+            LOG.error("Error while closing the scanner");
+            // TODO : We should throw the exception
+            /*if (toThrow == null) { throw e; }
+            throw MultipleIOException.createIOException(Lists.newArrayList(toThrow, e));*/
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
new file mode 100644
index 0000000..fcbbee9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Wrapper around the statistics table SYSTEM.STATS, accessed through an HTable.
+ */
+@SuppressWarnings("deprecation")
+public class StatisticsTable implements Closeable {
+    /** Map of the currently open statistics tables */
+    private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>();
+    /**
+     * @param env
+     *            Environment wherein the coprocessor is attempting to update the stats table.
+     * @param primaryTableName
+     *            name of the primary table on which we should collect stats
+     * @return the {@link StatisticsTable} for the given primary table.
+     * @throws IOException
+     *             if the table cannot be created due to an underlying HTable creation error
+     */
+    public synchronized static StatisticsTable getStatisticsTableForCoprocessor(CoprocessorEnvironment env,
+            byte[] primaryTableName) throws IOException {
+        StatisticsTable table = tableMap.get(Bytes.toString(primaryTableName));
+        if (table == null) {
+            // Map the statistics table to the table with which the statistics are
+            // associated. This is a workaround
+            HTablePool pool = new HTablePool(env.getConfiguration(), 1);
+            HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
+            table = new StatisticsTable(hTable, primaryTableName);
+            tableMap.put(Bytes.toString(primaryTableName), table);
+        }
+        return table;
+    }
+
+    private final HTableInterface statisticsTable;
+    private final byte[] sourceTableName;
+
+    private StatisticsTable(HTableInterface statsTable, byte[] sourceTableName) {
+        this.statisticsTable = statsTable;
+        this.sourceTableName = sourceTableName;
+    }
+
+    public StatisticsTable(Configuration conf, HTableDescriptor source) throws IOException {
+        this(new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), source.getName());
+    }
+
+    /**
+     * Close the connection to the table
+     */
+    @Override
+    public void close() throws IOException {
+        statisticsTable.close();
+    }
+
+    /**
+     * Update a list of statistics for a given region. If the ANALYZE <tablename> query is issued,
+     * we use upsert queries to update the table; if the region splits or a major compaction
+     * occurs, we update using HTable.put().
+     * @param tableName - the table for which the stats are collected
+     * @param regionName - the region of the table for which the stats are collected
+     * @param tracker - the statistics tracker
+     * @param fam - the family for which the stats are being collected
+     * @param mutations - list of mutations that collects all the mutations to commit in a batch
+     * @param currentTime - the current time
+     * @throws IOException
+     *             if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to
+     *             update
+     */
+    public void addStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+            List<Mutation> mutations, long currentTime) throws IOException {
+        if (tracker == null) { return; }
+
+        // Add the timestamp header
+        formLastUpdatedStatsMutation(tableName, currentTime, mutations);
+
+        byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
+                PDataType.VARCHAR.toBytes(regionName));
+        formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix);
+    }
+
+    public void commitStats(List<Mutation> mutations) throws IOException {
+        Object[] res = new Object[mutations.size()];
+        try {
+            statisticsTable.batch(mutations, res);
+        } catch (InterruptedException e) {
+            throw new IOException("Exception while adding deletes and puts", e);
+        }
+    }
+
+    private void formStatsUpdateMutation(StatisticsTracker tracker, String fam, List<Mutation> mutations,
+            long currentTime, byte[] prefix) {
+        Put put = new Put(prefix, currentTime);
+        if (tracker.getGuidePosts(fam) != null) {
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES,
+                    currentTime, (tracker.getGuidePosts(fam)));
+        }
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES,
+                currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam)));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES,
+                currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam)));
+        mutations.add(put);
+    }
+
+    private void formLastUpdatedStatsMutation(String tableName, long currentTime, List<Mutation> mutations) throws IOException {
+        byte[] prefix = StatisticsUtils.getRowKeyForTSUpdate(PDataType.VARCHAR.toBytes(tableName));
+        Put put = new Put(prefix);
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime,
+                PDataType.DATE.toBytes(new Date(currentTime)));
+        mutations.add(put);
+    }
+    
+    public void deleteStats(String tableName, String regionName, StatisticsTracker tracker, String fam,
+            List<Mutation> mutations, long currentTime)
+            throws IOException {
+        byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam),
+                PDataType.VARCHAR.toBytes(regionName));
+        mutations.add(new Delete(prefix, currentTime - 1));
+    }
+
+    /**
+     * @return the underlying {@link HTableInterface} to which this table is writing
+     */
+    HTableInterface getUnderlyingTable() {
+        return statisticsTable;
+    }
+
+    byte[] getSourceTableName() {
+        return this.sourceTableName;
+    }
+}
\ No newline at end of file
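
The intended call pattern is delete-then-add into one mutation list, committed as a single batch, just as StatisticsScanner.close() does. A rough usage sketch (a fragment; the table name is hypothetical, and "0" assumes Phoenix's default column family):

    // Rewrite the stats rows for one region/family in a single batch.
    void rewriteStats(StatisticsTable statsTable, StatisticsTracker tracker, String regionName)
            throws IOException {
        List<Mutation> mutations = new ArrayList<Mutation>();
        long now = TimeKeeper.SYSTEM.getCurrentTime();
        // Delete the old rows first, then add the fresh ones collected by the tracker.
        statsTable.deleteStats("MY_TABLE", regionName, tracker, "0", mutations, now);
        statsTable.addStats("MY_TABLE", regionName, tracker, "0", mutations, now);
        statsTable.commitStats(mutations);
    }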

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
new file mode 100644
index 0000000..e1754f3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTracker.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Tracks statistics for the columns in a given region
+ */
+public interface StatisticsTracker {
+
+    /**
+     * Reset the statistic after the completion of the compaction
+     */
+    public void clear();
+
+    /**
+     * Update the current statistics with the next {@link KeyValue} to be written
+     * 
+     * @param kv
+     *            next {@link KeyValue} to be written.
+     */
+    public void updateStatistic(KeyValue kv);
+
+    /**
+     * Return the max key of the family
+     * @param fam the column family name
+     * @return the max row key seen for the family, or null if none
+     */
+    public byte[] getMaxKey(String fam);
+
+    /**
+     * Return the min key of the family
+     * @param fam the column family name
+     * @return the min row key seen for the family, or null if none
+     */
+    public byte[] getMinKey(String fam);
+
+    /**
+     * Return the guide posts of the family
+     * @param fam the column family name
+     * @return the guide posts serialized as a VARBINARY ARRAY, or null if none
+     */
+    public byte[] getGuidePosts(String fam);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
new file mode 100644
index 0000000..7cb3a38
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stat;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+/**
+ * Simple utility class for managing the multiple key parts of a statistics row key
+ */
+public class StatisticsUtils {
+
+    private StatisticsUtils() {
+        // private ctor for utility classes
+    }
+
+    /** Number of parts in our complex key */
+    protected static final int NUM_KEY_PARTS = 3;
+
+    public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) throws IOException {
+        // always starts with the source table
+        TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length + region.length
+                + fam.length + (NUM_KEY_PARTS - 1));
+        os.write(table);
+        os.write(QueryConstants.SEPARATOR_BYTE_ARRAY);
+        os.write(fam);
+        os.write(QueryConstants.SEPARATOR_BYTE_ARRAY);
+        os.write(region);
+        os.close();
+        return os.getBuffer();
+    }
+    
+    public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException {
+        // always starts with the source table
+        TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length);
+        os.write(table);
+        os.close();
+        return os.getBuffer();
+    }
+
+    public static byte[] getCFFromRowKey(byte[] table, byte[] row, int rowOffset, int rowLength) {
+        // Move past the separator byte that is written after the table name
+        int startOff = Bytes.indexOf(row, table) + (table.length) + 1;
+        int endOff = startOff;
+        while (endOff < rowLength) {
+            // Check for the next separator byte
+            if (row[endOff] != QueryConstants.SEPARATOR_BYTE) {
+                endOff++;
+            } else {
+                break;
+            }
+        }
+        int cfLength = endOff - startOff;
+        byte[] cf = new byte[cfLength];
+        System.arraycopy(row, startOff, cf, 0, cfLength);
+        return cf;
+    }
+
+    public static byte[] copyRow(KeyValue kv) {
+        return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength());
+    }
+}
\ No newline at end of file
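
Concretely, the row key is just the three parts joined by Phoenix's 0x00 separator byte. A small sketch with hypothetical names (getRowKey throws IOException):

    byte[] key = StatisticsUtils.getRowKey(
            Bytes.toBytes("MY_TABLE"),                         // source table
            Bytes.toBytes("0"),                                // column family
            Bytes.toBytes("MY_TABLE,,1400000000000.abcdef.")); // region name
    // key == "MY_TABLE" 0x00 "0" 0x00 "MY_TABLE,,1400000000000.abcdef."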

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index e17e9bf..37285f6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.query;
 
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
 import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
@@ -51,7 +50,6 @@ import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.apache.phoenix.util.TestUtil.MDTEST_NAME;
 import static org.apache.phoenix.util.TestUtil.MILLIS_IN_DAY;
 import static org.apache.phoenix.util.TestUtil.MULTI_CF_NAME;
@@ -125,7 +123,12 @@ import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ConfigUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Assert;
 
 import com.google.common.collect.ImmutableMap;
@@ -548,6 +551,7 @@ public abstract class BaseTest {
         conf.setInt("hbase.hlog.asyncer.number", 2);
         conf.setInt("hbase.assignment.zkevent.workers", 5);
         conf.setInt("hbase.assignment.threads.max", 5);
+        conf.setInt(QueryServices.HISTOGRAM_BYTE_DEPTH_CONF_KEY, 20);
         return conf;
     }
 
@@ -1225,13 +1229,12 @@ public abstract class BaseTest {
         try {
             HTableDescriptor[] tables = admin.listTables();
             for (HTableDescriptor table : tables) {
-                boolean isCatalogTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES) == 0);
-                boolean isSequenceTable = (Bytes.compareTo(table.getName(), PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0);
-                if (!isCatalogTable && !isSequenceTable) {
+                String schemaName = SchemaUtil.getSchemaNameFromFullName(table.getName());
+                if (!QueryConstants.SYSTEM_SCHEMA_NAME.equals(schemaName)) {
                     admin.disableTable(table.getName());
                     admin.deleteTable(table.getName());
                 }
-            }    
+            }
         } finally {
             admin.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 47f5b1b..2e3c8f7 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -32,7 +32,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
  */
 public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
 
-    private static final int DEFAULT_THREAD_POOL_SIZE = 8;
+    private static final int DEFAULT_THREAD_POOL_SIZE = 20;
     private static final int DEFAULT_QUEUE_SIZE = 0;
     // TODO: setting this down to 5mb causes insufficient memory exceptions. Need to investigate why
     private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap
@@ -42,8 +42,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     private static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
     private static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 60000 * 60; // 1HR (to prevent age-out of hash cache during debugging)
     private static final long DEFAULT_MAX_HASH_CACHE_SIZE = 1024*1024*10;  // 10 Mb
-    private static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 4;
-    private static final int DEFAULT_MAX_QUERY_CONCURRENCY = 8;
     private static final boolean DEFAULT_DROP_METADATA = false;
     
     private static final int DEFAULT_MASTER_INFO_PORT = -1;
@@ -69,8 +67,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
-                .setTargetQueryConcurrency(DEFAULT_TARGET_QUERY_CONCURRENCY)
-                .setMaxQueryConcurrency(DEFAULT_MAX_QUERY_CONCURRENCY)
                 .setRowKeyOrderSaltedTable(true)
                 .setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS)
                 .setMasterInfoPort(DEFAULT_MASTER_INFO_PORT)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-protocol/src/main/MetaDataService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index a766674..9174b4d 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -92,6 +92,16 @@ message GetVersionResponse {
   required int64 version = 1;
 }
 
+message ClearCacheForTableRequest {
+  required bytes tenantId = 1;
+  required bytes schemaName  = 2;
+  required bytes tableName = 3;
+  required int64 clientTimestamp = 4;
+}
+
+message ClearCacheForTableResponse {
+}
+
 service MetaDataService {
   rpc getTable(GetTableRequest)
     returns (MetaDataResponse);
@@ -115,5 +125,8 @@ service MetaDataService {
     returns (ClearCacheResponse);
     
    rpc getVersion(GetVersionRequest)
-    returns (GetVersionResponse);           
+    returns (GetVersionResponse);
+   
+   rpc clearCacheForTable(ClearCacheForTableRequest)
+    returns (ClearCacheForTableResponse);
 }
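
For reference, a caller would build the new request along these lines (a sketch only; the generated outer class name and the empty tenantId for a global connection are assumptions):

    import com.google.protobuf.ByteString;
    import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheForTableRequest;

    ClearCacheForTableRequest req = ClearCacheForTableRequest.newBuilder()
            .setTenantId(ByteString.EMPTY)                       // assumed: global (non-tenant) connection
            .setSchemaName(ByteString.copyFromUtf8("MY_SCHEMA")) // hypothetical schema
            .setTableName(ByteString.copyFromUtf8("MY_TABLE"))   // hypothetical table
            .setClientTimestamp(System.currentTimeMillis())
            .build();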

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 20c63e1..3b5f5cf 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -72,6 +72,6 @@ message PTable {
   optional bytes viewStatement = 18;
   repeated bytes physicalNames = 19;
   optional bytes tenantId = 20;
-  optional int32 viewIndexId = 21; 
+  optional int32 viewIndexId = 21;
   optional bytes indexType = 22;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/phoenix-protocol/src/main/StatisticsCollect.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/StatisticsCollect.proto b/phoenix-protocol/src/main/StatisticsCollect.proto
new file mode 100644
index 0000000..c80a756
--- /dev/null
+++ b/phoenix-protocol/src/main/StatisticsCollect.proto
@@ -0,0 +1,20 @@
+option java_package = "org.apache.phoenix.coprocessor.generated";
+option java_outer_classname = "StatCollectorProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+
+message StatCollectRequest {
+  optional bytes startRow = 1;
+  optional bytes stopRow = 2;
+}
+
+message StatCollectResponse {
+  required uint64 rowsScanned = 1;
+}
+
+service StatCollectService {
+  rpc collectStat(StatCollectRequest)
+    returns (StatCollectResponse);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cdc938e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4ffb4ba..f97473f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@
     
     <!-- Plugin options -->
     <numForkedUT>3</numForkedUT>
-    <numForkedIT>7</numForkedIT>
+    <numForkedIT>5</numForkedIT>
     
     <!-- Set default encoding so multi-byte tests work correctly on the Mac -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

