phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [25/51] [partial] Initial commit
Date Mon, 27 Jan 2014 19:23:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
new file mode 100644
index 0000000..5f74650
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.job.JobManager.JobCallable;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * Class that parallelizes the scan over a table using the ExecutorService provided. Each region of the table will be scanned in parallel,
+ * with the results accessible through {@link #getIterators()}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ParallelIterators extends ExplainTable implements ResultIterators {
+	private static final Logger logger = LoggerFactory.getLogger(ParallelIterators.class);
+    private final List<KeyRange> splits;
+    private final ParallelIteratorFactory iteratorFactory;
+    
+    public static interface ParallelIteratorFactory {
+        PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException;
+    }
+
+    private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min
+
+    static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
+        @Override
+        public KeyRange apply(HRegionLocation region) {
+            return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+        }
+    };
+
+    public ParallelIterators(StatementContext context, TableRef tableRef, FilterableStatement statement, RowProjector projector, GroupBy groupBy, Integer limit, ParallelIteratorFactory iteratorFactory) throws SQLException {
+        super(context, tableRef, groupBy);
+        this.splits = getSplits(context, tableRef, statement.getHint());
+        this.iteratorFactory = iteratorFactory;
+        Scan scan = context.getScan();
+        PTable table = tableRef.getTable();
+        if (projector.isProjectEmptyKeyValue()) {
+            Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+            // If nothing is projected into the scan and we only have one column family, just allow
+            // everything to be projected and use a FirstKeyOnlyFilter to skip from row to row. This
+            // turns out to be quite a bit faster.
+            if (familyMap.isEmpty() && table.getColumnFamilies().size() == 1) {
+                // Project the one column family. We must project a column family since it's possible
+                // that there are other, undeclared column families that we need to ignore.
+                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+            } else {
+                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table.getColumnFamilies());
+                // Project empty key value unless the column family containing it has
+                // been projected in its entirety.
+                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                }
+            }
+        }
+        if (limit != null) {
+            ScanUtil.andFilterAtEnd(scan, new PageFilter(limit));
+        }
+    }
+
+    /**
+     * Splits the given scan's key range so that each split can be queried in parallel
+     * @param hintNode TODO
+     *
+     * @return the key ranges that should be scanned in parallel
+     */
+    // exposed for tests
+    public static List<KeyRange> getSplits(StatementContext context, TableRef table, HintNode hintNode) throws SQLException {
+        return ParallelIteratorRegionSplitterFactory.getSplitter(context, table, hintNode).getSplits();
+    }
+
+    public List<KeyRange> getSplits() {
+        return splits;
+    }
+
+    /**
+     * Executes the scan in parallel across all regions, blocking until all scans are complete.
+     * @return the result iterators for the scan of each region
+     */
+    @Override
+    public List<PeekingResultIterator> getIterators() throws SQLException {
+        boolean success = false;
+        final ConnectionQueryServices services = context.getConnection().getQueryServices();
+        ReadOnlyProps props = services.getProps();
+        int numSplits = splits.size();
+        List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numSplits);
+        List<Pair<byte[],Future<PeekingResultIterator>>> futures = new ArrayList<Pair<byte[],Future<PeekingResultIterator>>>(numSplits);
+        final UUID scanId = UUID.randomUUID();
+        try {
+            ExecutorService executor = services.getExecutor();
+            for (KeyRange split : splits) {
+                final Scan splitScan = new Scan(this.context.getScan());
+                // Intersect with existing start/stop key if the table is salted
+                // If not salted, we've already intersected it. If salted, we need
+                // to wait until now to intersect, as we're running parallel scans
+                // on all the possible regions here.
+                if (tableRef.getTable().getBucketNum() != null) {
+                    KeyRange minMaxRange = context.getMinMaxRange();
+                    if (minMaxRange != null) {
+                        // Add salt byte based on current split, as minMaxRange won't have it
+                        minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+                        split = split.intersect(minMaxRange);
+                    }
+                }
+                if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), this.context.getScanRanges().useSkipScanFilter())) {
+                    Future<PeekingResultIterator> future =
+                        executor.submit(new JobCallable<PeekingResultIterator>() {
+
+                        @Override
+                        public PeekingResultIterator call() throws Exception {
+                            // TODO: different HTableInterfaces for each thread or the same is better?
+                        	long startTime = System.currentTimeMillis();
+                            ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan);
+                            if (logger.isDebugEnabled()) {
+                            	logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan);
+                            }
+                            return iteratorFactory.newIterator(scanner);
+                        }
+
+                        /**
+                         * Defines the grouping for round robin behavior.  All threads spawned to process
+                         * this scan will be grouped together and time sliced with other simultaneously
+                         * executing parallel scans.
+                         */
+                        @Override
+                        public Object getJobId() {
+                            return ParallelIterators.this;
+                        }
+                    });
+                    futures.add(new Pair<byte[],Future<PeekingResultIterator>>(split.getLowerRange(),future));
+                }
+            }
+
+            int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+            // Sort futures by row key so that we get rows back from the scans in a predictable order.
+            // We're going to wait here until they're finished anyway, and this makes testing much easier.
+            Collections.sort(futures, new Comparator<Pair<byte[],Future<PeekingResultIterator>>>() {
+                @Override
+                public int compare(Pair<byte[], Future<PeekingResultIterator>> o1, Pair<byte[], Future<PeekingResultIterator>> o2) {
+                    return Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                }
+            });
+            for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
+                iterators.add(future.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS));
+            }
+
+            success = true;
+            return iterators;
+        } catch (Exception e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            if (!success) {
+                SQLCloseables.closeAllQuietly(iterators);
+                // Don't call cancel, as it causes the HConnection to get into a funk
+//                for (Pair<byte[],Future<PeekingResultIterator>> future : futures) {
+//                    future.getSecond().cancel(true);
+//                }
+            }
+        }
+    }
+
+    @Override
+    public int size() {
+        return this.splits.size();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        StringBuilder buf = new StringBuilder();
+        buf.append("CLIENT PARALLEL " + size() + "-WAY ");
+        explain(buf.toString(),planSteps);
+    }
+}
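
As a rough illustration of how the output of this class is consumed (not part of this commit; the class and method names below are hypothetical): the futures are sorted by split start key, so draining the list returned by getIterators() in order yields rows region by region.

import java.sql.SQLException;
import java.util.List;

import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.schema.tuple.Tuple;

// Hypothetical consumer of ParallelIterators.getIterators().
public final class ParallelScanConsumer {
    public static long drainInOrder(List<PeekingResultIterator> iterators) throws SQLException {
        long rowCount = 0;
        for (PeekingResultIterator iterator : iterators) {
            try {
                for (Tuple row = iterator.next(); row != null; row = iterator.next()) {
                    rowCount++; // a real caller would project or aggregate the Tuple here
                }
            } finally {
                iterator.close();
            }
        }
        return rowCount;
    }
}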

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/PeekingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/PeekingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/PeekingResultIterator.java
new file mode 100644
index 0000000..994f343
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/PeekingResultIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+
+/**
+ * 
+ * Interface for iterating through results returned from a scan, adding the
+ * ability to peek at the next result.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface PeekingResultIterator extends ResultIterator {
+    public static final PeekingResultIterator EMPTY_ITERATOR = new PeekingResultIterator() {
+
+        @Override
+        public Tuple next() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Tuple peek() {
+            return null;
+        }
+
+        @Override
+        public void close() throws SQLException {
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+    };
+
+    /**
+     * Returns the next result without advancing the iterator
+     * @throws SQLException
+     */
+    public Tuple peek() throws SQLException;
+}
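
For illustration only (not part of this commit): the peek()/next() contract above can be satisfied for any plain ResultIterator by reading one row ahead. The LookAheadAdapter name below is hypothetical.

import java.sql.SQLException;
import java.util.List;

import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.schema.tuple.Tuple;

// Hypothetical adapter: gives any ResultIterator peeking behavior via a one-row look-ahead.
public class LookAheadAdapter implements PeekingResultIterator {
    private final ResultIterator delegate;
    private Tuple next;
    private boolean initialized;

    public LookAheadAdapter(ResultIterator delegate) {
        this.delegate = delegate;
    }

    private void init() throws SQLException {
        if (!initialized) {
            next = delegate.next(); // pre-fetch so peek() can see the upcoming row
            initialized = true;
        }
    }

    @Override
    public Tuple peek() throws SQLException {
        init();
        return next; // the same row next() would return, without advancing
    }

    @Override
    public Tuple next() throws SQLException {
        init();
        Tuple current = next;
        if (current != null) {
            next = delegate.next(); // advance the look-ahead
        }
        return current;
    }

    @Override
    public void close() throws SQLException {
        delegate.close();
    }

    @Override
    public void explain(List<String> planSteps) {
        delegate.explain(planSteps);
    }
}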

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
new file mode 100644
index 0000000..4160174
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+
+
+public class RegionScannerResultIterator extends BaseResultIterator {
+    private final RegionScanner scanner;
+    
+    public RegionScannerResultIterator(RegionScanner scanner) {
+        this.scanner = scanner;
+        MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+    }
+    
+    @Override
+    public Tuple next() throws SQLException {
+        try {
+            // TODO: size
+            List<KeyValue> results = new ArrayList<KeyValue>();
+            // Results are potentially returned even when the return value of scanner.nextRaw() is false,
+            // since the return value only indicates whether or not there are more values after the
+            // ones returned.
+            boolean hasMore = scanner.nextRaw(results, null);
+            if (!hasMore && results.isEmpty()) {
+                return null;
+            }
+            // We instantiate a new tuple because in all cases currently we hang on to it (i.e.
+            // to compute and hold onto the TopN).
+            MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+            tuple.setKeyValues(results);
+            return tuple;
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ResultIterator.java b/src/main/java/org/apache/phoenix/iterate/ResultIterator.java
new file mode 100644
index 0000000..43e4758
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ResultIterator.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseable;
+
+
+public interface ResultIterator extends SQLCloseable {
+    public static final ResultIterator EMPTY_ITERATOR = new ResultIterator() {
+        @Override
+        public void close() throws SQLException {
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+    };
+
+    /**
+     * Grab the next row's worth of values.
+     * @return a Tuple if there is another row, or null if the scanner is
+     * exhausted.
+     * @throws SQLException
+     */
+    public Tuple next() throws SQLException;
+    
+    public void explain(List<String> planSteps);
+}
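
A minimal, hypothetical implementation of the contract above (serve rows until exhausted, then return null), backed by an in-memory list; the ListResultIterator name is illustrative, not part of this commit:

import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.schema.tuple.Tuple;

// Hypothetical minimal ResultIterator over an in-memory list of rows.
public class ListResultIterator implements ResultIterator {
    private final Iterator<Tuple> rows;

    public ListResultIterator(List<Tuple> rows) {
        this.rows = rows.iterator();
    }

    @Override
    public Tuple next() throws SQLException {
        return rows.hasNext() ? rows.next() : null; // null signals exhaustion
    }

    @Override
    public void close() throws SQLException {
        // nothing to release for an in-memory list
    }

    @Override
    public void explain(List<String> planSteps) {
        planSteps.add("IN-MEMORY LIST");
    }
}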

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ResultIterators.java b/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
new file mode 100644
index 0000000..0ee9562
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ResultIterators.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public interface ResultIterators {
+    public List<PeekingResultIterator> getIterators() throws SQLException;
+    public int size();
+    public void explain(List<String> planSteps);
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
new file mode 100644
index 0000000..0c98121
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+
+
+public class ScanningResultIterator implements ResultIterator {
+    private final ResultScanner scanner;
+    
+    public ScanningResultIterator(ResultScanner scanner) {
+        this.scanner = scanner;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        scanner.close();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        try {
+            Result result = scanner.next();
+            // TODO: use ResultTuple.setResult(result)
+            // Need to create a new one if holding on to it (i.e. OrderedResultIterator)
+            return result == null ? null : new ResultTuple(result);
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
new file mode 100644
index 0000000..7324b00
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+
+
+/**
+ * Splits the regions to be scanned according to the information contained in the scan's SkipScanFilter.
+ */
+public class SkipRangeParallelIteratorRegionSplitter extends DefaultParallelIteratorRegionSplitter {
+
+    public static SkipRangeParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) {
+        return new SkipRangeParallelIteratorRegionSplitter(context, table, hintNode);
+    }
+
+    protected SkipRangeParallelIteratorRegionSplitter(StatementContext context, TableRef table, HintNode hintNode) {
+        super(context, table, hintNode);
+    }
+
+    @Override
+    protected List<HRegionLocation> getAllRegions() throws SQLException {
+        List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(tableRef.getTable().getName().getBytes());
+        return filterRegions(allTableRegions, context.getScanRanges());
+    }
+
+    public List<HRegionLocation> filterRegions(List<HRegionLocation> allTableRegions, final ScanRanges ranges) {
+        Iterable<HRegionLocation> regions;
+        if (ranges == ScanRanges.EVERYTHING) {
+            return allTableRegions;
+        } else if (ranges == ScanRanges.NOTHING) { // TODO: why not emptyList?
+            return Lists.<HRegionLocation>newArrayList();
+        } else {
+            regions = Iterables.filter(allTableRegions,
+                    new Predicate<HRegionLocation>() {
+                    @Override
+                    public boolean apply(HRegionLocation region) {
+                        KeyRange minMaxRange = context.getMinMaxRange();
+                        if (minMaxRange != null) {
+                            KeyRange range = KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+                            if (tableRef.getTable().getBucketNum() != null) {
+                                // Add salt byte, as minMaxRange won't have it
+                                minMaxRange = SaltingUtil.addSaltByte(region.getRegionInfo().getStartKey(), minMaxRange);
+                            }
+                            range = range.intersect(minMaxRange);
+                            return ranges.intersect(range.getLowerRange(), range.getUpperRange());
+                        }
+                        return ranges.intersect(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey());
+                    }
+            });
+        }
+        return Lists.newArrayList(regions);
+    }
+
+}
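
Setting aside salting and the skip-scan details, filterRegions keeps a region only when its [startKey, endKey) range overlaps the query's key ranges. A standalone sketch of that overlap test (class and method names are illustrative; empty byte arrays are treated as unbounded boundaries, as with HBase's first and last region keys, and the query range is assumed half-open):

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper: does a region's [regionStart, regionEnd) overlap the query range [queryLower, queryUpper)?
// An empty byte[] is treated as unbounded, matching HBase's first/last region keys.
public final class RangeOverlap {
    public static boolean overlaps(byte[] regionStart, byte[] regionEnd, byte[] queryLower, byte[] queryUpper) {
        boolean regionStartsBeforeQueryEnd = queryUpper.length == 0 || Bytes.compareTo(regionStart, queryUpper) < 0;
        boolean regionEndsAfterQueryStart = regionEnd.length == 0 || Bytes.compareTo(queryLower, regionEnd) < 0;
        return regionStartsBeforeQueryEnd && regionEndsAfterQueryStart;
    }
}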

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/SpoolTooBigToDiskException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/SpoolTooBigToDiskException.java b/src/main/java/org/apache/phoenix/iterate/SpoolTooBigToDiskException.java
new file mode 100644
index 0000000..4dffee0
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/SpoolTooBigToDiskException.java
@@ -0,0 +1,17 @@
+package org.apache.phoenix.iterate;
+
+/**
+ * Thrown by {@link org.apache.phoenix.iterate.SpoolingResultIterator} when the
+ * result is too big to fit into memory and too big to spool to disk.
+ * 
+ * @author haitaoyao
+ * 
+ */
+public class SpoolTooBigToDiskException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public SpoolTooBigToDiskException(String msg) {
+		super(msg);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
new file mode 100644
index 0000000..5acf71c
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.io.output.DeferredFileOutputStream;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+
+
+
+/**
+ * 
+ * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
+ * If the in-memory threshold is not reached, the results are held in memory, with no disk writing performed.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class SpoolingResultIterator implements PeekingResultIterator {
+    private final PeekingResultIterator spoolFrom;
+    
+    public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
+        private final QueryServices services;
+        
+        public SpoolingResultIteratorFactory(QueryServices services) {
+            this.services = services;
+        }
+        @Override
+        public PeekingResultIterator newIterator(ResultIterator scanner) throws SQLException {
+            return new SpoolingResultIterator(scanner, services);
+        }
+        
+    }
+
+    public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException {
+        this (scanner, services.getMemoryManager(), 
+        		services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
+        		services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES));
+    }
+    
+    /**
+    * Create a result iterator by iterating through the results of a scan, spooling them to disk once
+    * a threshold has been reached. The scanner passed in is closed prior to returning.
+    * @param scanner the results of a table scan
+    * @param mm memory manager tracking memory usage across threads.
+    * @param thresholdBytes the requested threshold. Will be dialed down if memory usage (as determined by
+    *  the memory manager) is exceeded.
+    * @param maxSpoolToDisk the maximum number of bytes allowed to be spooled to disk beyond the threshold, or -1 for no limit
+    * @throws SQLException
+    */
+    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk) throws SQLException {
+        boolean success = false;
+        boolean usedOnDiskIterator = false;
+        final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
+        File tempFile = null;
+        try {
+            // Can't be bigger than int, since it's the max of the above allocation
+            int size = (int)chunk.getSize();
+            tempFile = File.createTempFile("ResultSpooler",".bin");
+            DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(size, tempFile) {
+                @Override
+                protected void thresholdReached() throws IOException {
+                    super.thresholdReached();
+                    chunk.close();
+                }
+            };
+            DataOutputStream out = new DataOutputStream(spoolTo);
+            final long maxBytesAllowed = maxSpoolToDisk == -1 ? 
+            		Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk;
+            long bytesWritten = 0L;
+            int maxSize = 0;
+            for (Tuple result = scanner.next(); result != null; result = scanner.next()) {
+                int length = TupleUtil.write(result, out);
+                bytesWritten += length;
+                if(bytesWritten > maxBytesAllowed){
+                		throw new SpoolTooBigToDiskException("result too big, max allowed(bytes): " + maxBytesAllowed);
+                }
+                maxSize = Math.max(length, maxSize);
+            }
+            spoolTo.close();
+            if (spoolTo.isInMemory()) {
+                byte[] data = spoolTo.getData();
+                chunk.resize(data.length);
+                spoolFrom = new InMemoryResultIterator(data, chunk);
+            } else {
+                spoolFrom = new OnDiskResultIterator(maxSize, spoolTo.getFile());
+                usedOnDiskIterator = true;
+            }
+            success = true;
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                scanner.close();
+            } finally {
+                try {
+                    if (!usedOnDiskIterator) {
+                        tempFile.delete();
+                    }
+                } finally {
+                    if (!success) {
+                        chunk.close();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Tuple peek() throws SQLException {
+        return spoolFrom.peek();
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return spoolFrom.next();
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        spoolFrom.close();
+    }
+
+    /**
+     * 
+     * Backing result iterator if it was not necessary to spool results to disk.
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    private static class InMemoryResultIterator implements PeekingResultIterator {
+        private final MemoryChunk memoryChunk;
+        private final byte[] bytes;
+        private Tuple next;
+        private int offset;
+        
+        private InMemoryResultIterator(byte[] bytes, MemoryChunk memoryChunk) throws SQLException {
+            this.bytes = bytes;
+            this.memoryChunk = memoryChunk;
+            advance();
+        }
+
+        private Tuple advance() throws SQLException {
+            if (offset >= bytes.length) {
+                return next = null;
+            }
+            int resultSize = ByteUtil.vintFromBytes(bytes, offset);
+            offset += WritableUtils.getVIntSize(resultSize);
+            ImmutableBytesWritable value = new ImmutableBytesWritable(bytes,offset,resultSize);
+            offset += resultSize;
+            Tuple result = new ResultTuple(new Result(value));
+            return next = result;
+        }
+        
+        @Override
+        public Tuple peek() throws SQLException {
+            return next;
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            Tuple current = next;
+            advance();
+            return current;
+        }
+        
+        @Override
+        public void close() {
+            memoryChunk.close();
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+    }
+    
+    /**
+     * 
+     * Backing result iterator if results were spooled to disk
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    private static class OnDiskResultIterator implements PeekingResultIterator {
+        private final File file;
+        private DataInputStream spoolFrom;
+        private Tuple next;
+        private int maxSize;
+        private int bufferIndex;
+        private byte[][] buffers = new byte[2][];
+        private boolean isClosed;
+        
+        private OnDiskResultIterator (int maxSize, File file) {
+            this.file = file;
+            this.maxSize = maxSize;
+        }
+        
+        private synchronized void init() throws IOException {
+            if (spoolFrom == null) {
+                spoolFrom = new DataInputStream(new BufferedInputStream(new FileInputStream(file)));
+                // We need two so that we can have a current and a next without them stomping on each other
+                buffers[0] = new byte[maxSize];
+                buffers[1] = new byte[maxSize];
+                advance();
+            }
+        }
+    
+        private synchronized void reachedEnd() throws IOException {
+            next = null;
+            isClosed = true;
+            try {
+                if (spoolFrom != null) {
+                    spoolFrom.close();
+                }
+            } finally {
+                file.delete();
+            }
+        }
+        
+        private synchronized Tuple advance() throws IOException {
+            if (isClosed) {
+                return next;
+            }
+            int length;
+            try {
+                length = WritableUtils.readVInt(spoolFrom);
+            } catch (EOFException e) {
+                reachedEnd();
+                return next;
+            }
+            int totalBytesRead = 0;
+            int offset = 0;
+            // Alternate between buffers so that the current one is not affected by advancing
+            bufferIndex = (bufferIndex + 1) % 2;
+            byte[] buffer = buffers [bufferIndex];
+            while(totalBytesRead < length) {
+                int bytesRead = spoolFrom.read(buffer, offset, length - totalBytesRead);
+                if (bytesRead == -1) {
+                    reachedEnd();
+                    return next;
+                }
+                offset += bytesRead;
+                totalBytesRead += bytesRead;
+            }
+            next = new ResultTuple(new Result(new ImmutableBytesWritable(buffer,0,length)));
+            return next;
+        }
+        
+        @Override
+        public synchronized Tuple peek() throws SQLException {
+            try {
+                init();
+                return next;
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+    
+        @Override
+        public synchronized Tuple next() throws SQLException {
+            try {
+                init();
+                Tuple current = next;
+                advance();
+                return current;
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+        
+        @Override
+        public synchronized void close() throws SQLException {
+            try {
+                if (!isClosed) {
+                    reachedEnd();
+                }
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+    }
+}
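
The spill-to-disk mechanism above is delegated to commons-io's DeferredFileOutputStream: writes stay in memory until the configured threshold is reached, after which they are transparently redirected to the temp file. A standalone sketch of just that mechanism (class name and values are illustrative, not from this commit):

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.output.DeferredFileOutputStream;

public class SpoolSketch {
    public static void main(String[] args) throws IOException {
        // analogous to QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; illustrative value
        int thresholdBytes = 1024;
        File tempFile = File.createTempFile("ResultSpooler", ".bin");
        DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(thresholdBytes, tempFile);
        DataOutputStream out = new DataOutputStream(spoolTo);
        for (int i = 0; i < 10000; i++) {
            out.writeInt(i); // stand-in for TupleUtil.write(result, out)
        }
        out.close();
        if (spoolTo.isInMemory()) {
            // under the threshold: results never touched disk
            System.out.println("held in memory: " + spoolTo.getData().length + " bytes");
        } else {
            // over the threshold: rows would be read back from the spool file
            System.out.println("spooled to disk: " + spoolTo.getFile());
        }
        tempFile.delete();
    }
}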

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
new file mode 100644
index 0000000..16ffce4
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.io.Closeables;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * Wrapper for ResultScanner creation that closes HTableInterface
+ * when ResultScanner is closed.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class TableResultIterator extends ExplainTable implements ResultIterator {
+    private final HTableInterface htable;
+    private final ResultIterator delegate;
+
+    public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
+        this(context, tableRef, context.getScan());
+    }
+
+    public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
+        super(context, tableRef);
+        htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getName().getBytes());
+        try {
+            delegate = new ScanningResultIterator(htable.getScanner(scan));
+        } catch (IOException e) {
+            Closeables.closeQuietly(htable);
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+
+    @Override
+    public void close() throws SQLException {
+        try {
+            delegate.close();
+        } finally {
+            try {
+                htable.close();
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        return delegate.next();
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        StringBuilder buf = new StringBuilder();
+        explain(buf.toString(),planSteps);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..347e8eb
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.*;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+
+
+public class UngroupedAggregatingResultIterator extends GroupedAggregatingResultIterator {
+    private boolean hasRows = false;
+
+    public UngroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
+        super(resultIterator, aggregators);
+    }
+    
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple result = super.next();
+        // Ensure ungrouped aggregation always returns a row, even if the underlying iterator doesn't.
+        if (result == null && !hasRows) {
+            // Generate value using unused ClientAggregators
+            byte[] value = aggregators.toBytes(aggregators.getAggregators());
+            result = new SingleKeyValueTuple(
+                    KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
+                            SINGLE_COLUMN_FAMILY, 
+                            SINGLE_COLUMN, 
+                            AGG_TIMESTAMP, 
+                            value));
+        }
+        hasRows = true;
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java b/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
new file mode 100644
index 0000000..7a73a4e
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/jdbc/Jdbc7Shim.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+/**
+ * Interfaces to be implemented by classes that need to be "JDK7" compliant,
+ * but also run in JDK6
+ */
+public final class Jdbc7Shim {
+
+    public interface Statement {  // Note: do not extend the "regular" java.sql.Statement or else Eclipse 3.7 complains
+        void closeOnCompletion() throws SQLException;
+        boolean isCloseOnCompletion() throws SQLException;
+    }
+
+    public interface CallableStatement extends Statement {
+        public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
+        public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
+    }
+
+    public interface Connection {
+         void setSchema(String schema) throws SQLException;
+         String getSchema() throws SQLException;
+         void abort(Executor executor) throws SQLException;
+         void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException;
+         int getNetworkTimeout() throws SQLException;
+    }
+
+    public interface ResultSet {
+         public <T> T getObject(int columnIndex, Class<T> type) throws SQLException;
+         public <T> T getObject(String columnLabel, Class<T> type) throws SQLException;
+    }
+
+    public interface DatabaseMetaData {
+        java.sql.ResultSet getPseudoColumns(String catalog, String schemaPattern,
+                             String tableNamePattern, String columnNamePattern)
+            throws SQLException;
+        boolean  generatedKeyAlwaysReturned() throws SQLException;
+    }
+
+    public interface Driver {
+        public Logger getParentLogger() throws SQLFeatureNotSupportedException;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
new file mode 100644
index 0000000..204acba
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -0,0 +1,639 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.text.Format;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.query.MetaDataMutated;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * 
+ * JDBC Connection implementation of Phoenix.
+ * Currently the following are supported:
+ * - Statement
+ * - PreparedStatement
+ * The connection may only be used with the following options:
+ * - ResultSet.TYPE_FORWARD_ONLY
+ * - Connection.TRANSACTION_READ_COMMITTED
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jdbc7Shim.Connection, MetaDataMutated  {
+    private final String url;
+    private final ConnectionQueryServices services;
+    private final Properties info;
+    private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
+    private final Format[] formatters = new Format[PDataType.values().length];
+    private final MutationState mutationState;
+    private final int mutateBatchSize;
+    private final Long scn;
+    private boolean isAutoCommit = false;
+    private PMetaData metaData;
+    private final byte[] tenantId;
+    private final String datePattern;
+    
+    private boolean isClosed = false;
+    
+    public PhoenixConnection(PhoenixConnection connection) throws SQLException {
+        this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getPMetaData());
+        this.isAutoCommit = connection.isAutoCommit;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
+        this.url = url;
+        // Copy so client cannot change
+        this.info = info == null ? new Properties() : new Properties(info);
+        if (this.info.isEmpty()) {
+            this.services = services;
+        } else {
+            Map<String, String> existingProps = services.getProps().asMap();
+            Map<String, String> tmpAugmentedProps = Maps.newHashMapWithExpectedSize(existingProps.size() + info.size());
+            tmpAugmentedProps.putAll(existingProps);
+            tmpAugmentedProps.putAll((Map)this.info);
+            final ReadOnlyProps augmentedProps = new ReadOnlyProps(tmpAugmentedProps);
+            this.services = new DelegateConnectionQueryServices(services) {
+    
+                @Override
+                public ReadOnlyProps getProps() {
+                    return augmentedProps;
+                }
+            };
+        }
+        this.scn = JDBCUtil.getCurrentSCN(url, this.info);
+        this.tenantId = JDBCUtil.getTenantId(url, this.info);
+        this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, services.getProps());
+        datePattern = services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+        int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        Format dateTimeFormat = DateUtil.getDateFormatter(datePattern);
+        formatters[PDataType.DATE.ordinal()] = dateTimeFormat;
+        formatters[PDataType.TIME.ordinal()] = dateTimeFormat;
+        this.metaData = metaData;
+        this.mutationState = new MutationState(maxSize, this);
+    }
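+
+    // Property layering sketch (illustrative; the SCN property name is an assumption,
+    // read here via JDBCUtil.getCurrentSCN): per-connection Properties are overlaid on
+    // the cluster-wide ReadOnlyProps through DelegateConnectionQueryServices, so a
+    // point-in-time read can be requested for just this connection:
+    //
+    //   Properties props = new Properties();
+    //   props.setProperty("CurrentSCN", Long.toString(timestamp));
+    //   Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);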
+
+    public int executeStatements(Reader reader, List<Object> binds, PrintStream out) throws IOException, SQLException {
+        int bindsOffset = 0;
+        int nStatements = 0;
+        PhoenixStatementParser parser = new PhoenixStatementParser(reader);
+        try {
+            while (true) {
+                PhoenixPreparedStatement stmt = new PhoenixPreparedStatement(this, parser);
+                ParameterMetaData paramMetaData = stmt.getParameterMetaData();
+                for (int i = 0; i < paramMetaData.getParameterCount(); i++) {
+                    stmt.setObject(i+1, binds.get(bindsOffset+i));
+                }
+                long start = System.currentTimeMillis();
+                boolean isQuery = stmt.execute();
+                if (isQuery) {
+                    ResultSet rs = stmt.getResultSet();
+                    if (!rs.next()) {
+                        if (out != null) {
+                            out.println("no rows selected");
+                        }
+                    } else {
+                        int columnCount = 0;
+                        if (out != null) {
+                            ResultSetMetaData md = rs.getMetaData();
+                            columnCount = md.getColumnCount();
+                            for (int i = 1; i <= columnCount; i++) {
+                                int displayWidth = md.getColumnDisplaySize(i);
+                                String label = md.getColumnLabel(i);
+                                if (md.isSigned(i)) {
+                                    out.print(displayWidth < label.length() ? label.substring(0,displayWidth) : Strings.padStart(label, displayWidth, ' '));
+                                    out.print(' ');
+                                } else {
+                                    out.print(displayWidth < label.length() ? label.substring(0,displayWidth) : Strings.padEnd(label, displayWidth, ' '));
+                                    out.print(' ');
+                                }
+                            }
+                            out.println();
+                            for (int i = 1; i <= columnCount; i++) {
+                                int displayWidth = md.getColumnDisplaySize(i);
+                                out.print(Strings.padStart("", displayWidth,'-'));
+                                out.print(' ');
+                            }
+                            out.println();
+                        }
+                        do {
+                            if (out != null) {
+                                ResultSetMetaData md = rs.getMetaData();
+                                for (int i = 1; i <= columnCount; i++) {
+                                    int displayWidth = md.getColumnDisplaySize(i);
+                                    String value = rs.getString(i);
+                                    String valueString = value == null ? QueryConstants.NULL_DISPLAY_TEXT : value;
+                                    if (md.isSigned(i)) {
+                                        out.print(Strings.padStart(valueString, displayWidth, ' '));
+                                    } else {
+                                        out.print(Strings.padEnd(valueString, displayWidth, ' '));
+                                    }
+                                    out.print(' ');
+                                }
+                                out.println();
+                            }
+                        } while (rs.next());
+                    }
+                } else if (out != null){
+                    int updateCount = stmt.getUpdateCount();
+                    if (updateCount >= 0) {
+                        out.println((updateCount == 0 ? "no" : updateCount) + (updateCount == 1 ? " row " : " rows ") + stmt.getUpdateOperation().toString());
+                    }
+                }
+                bindsOffset += paramMetaData.getParameterCount();
+                double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
+                if (out != null) {
+                    out.println("Time: " + elapsedDuration + " sec(s)\n");
+                }
+                nStatements++;
+            }
+        } catch (EOFException e) {
+            // Expected: the statement parser signals the end of the input stream with
+            // an EOFException; this is the normal way to exit the loop above.
+        }
+        return nStatements;
+    }
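+
+    // executeStatements usage sketch (illustrative; the SQL text and bind values are
+    // assumed examples). Statements are read from the Reader until EOF, binds are
+    // consumed positionally across all statements, and results/timing are printed
+    // only when a non-null PrintStream is passed:
+    //
+    //   Reader sql = new StringReader("UPSERT INTO t VALUES(?,?);\nSELECT * FROM t;");
+    //   int n = conn.executeStatements(sql, Arrays.<Object>asList(1, "a"), System.out);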
+
+    public byte[] getTenantId() {
+        return tenantId;
+    }
+    
+    public Long getSCN() {
+        return scn;
+    }
+    
+    public int getMutateBatchSize() {
+        return mutateBatchSize;
+    }
+    
+    public PMetaData getPMetaData() {
+        return metaData;
+    }
+
+    public MutationState getMutationState() {
+        return mutationState;
+    }
+    
+    public String getDatePattern() {
+        return datePattern;
+    }
+    
+    public Format getFormatter(PDataType type) {
+        return formatters[type.ordinal()];
+    }
+    
+    public String getURL() {
+        return url;
+    }
+    
+    public ConnectionQueryServices getQueryServices() {
+        return services;
+    }
+    
+    @Override
+    public void clearWarnings() throws SQLException {
+    }
+
+    private void closeStatements() throws SQLException {
+        List<SQLCloseable> statements = this.statements;
+        // Swap in a new list so that closing the statements below (which remove
+        // themselves from the connection) does not modify the list we iterate over.
+        this.statements = Lists.newArrayList();
+        try {
+            mutationState.rollback(this);
+        } finally {
+            try {
+                SQLCloseables.closeAll(statements);
+            } finally {
+                statements.clear();
+            }
+        }
+    }
+    
+    protected boolean removeStatement(SQLCloseable statement) throws SQLException {
+         return statements.remove(statement);
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        if (isClosed) {
+            return;
+        }
+        try {
+            closeStatements();
+        } finally {
+            isClosed = true;
+        }
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        mutationState.commit();
+    }
+
+    @Override
+    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Blob createBlob() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Clob createClob() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public NClob createNClob() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public SQLXML createSQLXML() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Statement createStatement() throws SQLException {
+        PhoenixStatement statement = new PhoenixStatement(this);
+        statements.add(statement);
+        return statement;
+    }
+
+    /**
+     * Back-door mechanism to inject custom processing into walking through a result set.
+     * @param statementFactory factory used to construct the statement for this connection
+     * @return the newly created PhoenixStatement, registered with this connection
+     * @throws SQLException
+     */
+    public PhoenixStatement createStatement(PhoenixStatementFactory statementFactory) throws SQLException {
+        PhoenixStatement statement = statementFactory.newStatement(this);
+        statements.add(statement);
+        return statement;
+    }
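+
+    // Factory hook sketch (illustrative): a caller can supply a PhoenixStatementFactory
+    // whose newStatement(PhoenixConnection) returns a PhoenixStatement (or a decorating
+    // subclass) in order to observe rows as a result set is walked:
+    //
+    //   PhoenixStatement stmt = conn.createStatement(new PhoenixStatementFactory() {
+    //       @Override
+    //       public PhoenixStatement newStatement(PhoenixConnection connection) {
+    //           return new PhoenixStatement(connection); // swap in a custom subclass here
+    //       }
+    //   });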
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+        if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
+            throw new SQLFeatureNotSupportedException();
+        }
+        return createStatement();
+    }
+
+    @Override
+    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+            throws SQLException {
+        if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) {
+            throw new SQLFeatureNotSupportedException();
+        }
+        return createStatement(resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean getAutoCommit() throws SQLException {
+        return isAutoCommit;
+    }
+
+    @Override
+    public String getCatalog() throws SQLException {
+        return "";
+    }
+
+    @Override
+    public Properties getClientInfo() throws SQLException { 
+        // Defensive copy so the client cannot change this connection's properties.
+        // The entries are copied (rather than passed as Properties defaults) so that
+        // iterating over the returned Properties exposes all of the values.
+        Properties copy = new Properties();
+        copy.putAll(info);
+        return copy;
+    }
+
+    @Override
+    public String getClientInfo(String name) {
+        return info.getProperty(name);
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+    }
+
+    @Override
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return new PhoenixDatabaseMetaData(this);
+    }
+
+    @Override
+    public int getTransactionIsolation() throws SQLException {
+        return Connection.TRANSACTION_READ_COMMITTED;
+    }
+
+    @Override
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return isClosed;
+    }
+
+    @Override
+    public boolean isReadOnly() throws SQLException {
+        return true;
+    }
+
+    @Override
+    public boolean isValid(int timeout) throws SQLException {
+        // TODO: run query here or ping
+        return !isClosed;
+    }
+
+    @Override
+    public String nativeSQL(String sql) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+            int resultSetHoldability) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        PhoenixPreparedStatement statement = new PhoenixPreparedStatement(this, sql);
+        statements.add(statement);
+        return statement;
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+            throws SQLException {
+        if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
+            throw new SQLFeatureNotSupportedException();
+        }
+        return prepareStatement(sql);
+    }
+
+    @Override
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
+            int resultSetHoldability) throws SQLException {
+        if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) {
+            throw new SQLFeatureNotSupportedException();
+        }
+        return prepareStatement(sql, resultSetType, resultSetConcurrency);
+    }
+
+    @Override
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void rollback() throws SQLException {
+        mutationState.rollback(this);
+    }
+
+    @Override
+    public void rollback(Savepoint savepoint) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setAutoCommit(boolean isAutoCommit) throws SQLException {
+        this.isAutoCommit = isAutoCommit;
+    }
+
+    @Override
+    public void setCatalog(String catalog) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setClientInfo(Properties properties) throws SQLClientInfoException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setClientInfo(String name, String value) throws SQLClientInfoException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setHoldability(int holdability) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setReadOnly(boolean readOnly) throws SQLException {
+        if (readOnly) {
+            throw new SQLFeatureNotSupportedException();
+        }
+    }
+
+    @Override
+    public Savepoint setSavepoint() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Savepoint setSavepoint(String name) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setTransactionIsolation(int level) throws SQLException {
+        if (level != Connection.TRANSACTION_READ_COMMITTED) {
+            throw new SQLFeatureNotSupportedException();
+        }
+    }
+
+    @Override
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return iface.isInstance(this);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!iface.isInstance(this)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName())
+                .build().buildException();
+        }
+        return (T)this;
+    }
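+
+    // unwrap() usage sketch (illustrative): code holding a plain java.sql.Connection
+    // can recover the Phoenix-specific API when the instance is a PhoenixConnection:
+    //
+    //   PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+    //   byte[] tenantId = pconn.getTenantId();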
+
+    @Override
+    public void setSchema(String schema) throws SQLException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public PMetaData addTable(PTable table) throws SQLException {
+        // TODO: since a connection is only used by one thread at a time,
+        // we could modify this metadata in place, as it is not shared.
+        if (scn == null || scn > table.getTimeStamp()) {
+            metaData = metaData.addTable(table);
+        }
+        //Cascade through to connectionQueryServices too
+        getQueryServices().addTable(table);
+        return metaData;
+    }
+
+    @Override
+    public PMetaData addColumn(String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows)
+            throws SQLException {
+        metaData = metaData.addColumn(tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows);
+        //Cascade through to connectionQueryServices too
+        getQueryServices().addColumn(tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows);
+        return metaData;
+    }
+
+    @Override
+    public PMetaData removeTable(String tableName) throws SQLException {
+        metaData = metaData.removeTable(tableName);
+        //Cascade through to connectionQueryServices too
+        getQueryServices().removeTable(tableName);
+        return metaData;
+    }
+
+    @Override
+    public PMetaData removeColumn(String tableName, String familyName, String columnName, long tableTimeStamp,
+            long tableSeqNum) throws SQLException {
+        metaData = metaData.removeColumn(tableName, familyName, columnName, tableTimeStamp, tableSeqNum);
+        //Cascade through to connectionQueryServices too
+        getQueryServices().removeColumn(tableName, familyName, columnName, tableTimeStamp, tableSeqNum);
+        return metaData;
+    }
+}

