phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-2480 SQL Query with multiple projection selections over multiple tables having LEFT OUTER JOINS returns completely null for random columns even when data is present
Date Thu, 03 Dec 2015 02:20:02 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 76df6f2f0 -> a57553c1a


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a57553c1/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
new file mode 100644
index 0000000..d9016d0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SortMergeJoinMoreIT extends BaseHBaseManagedTimeIT {
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Forces server cache to be used
+        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testJoinOverSaltedTables() throws Exception {
+        String tempTableNoSalting = "TEMP_TABLE_NO_SALTING";
+        String tempTableWithSalting = "TEMP_TABLE_WITH_SALTING";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE TABLE " + tempTableNoSalting 
+                    + "   (mypk INTEGER NOT NULL PRIMARY KEY, " 
+                    + "    col1 INTEGER)");
+            conn.createStatement().execute("CREATE TABLE " + tempTableWithSalting 
+                    + "   (mypk INTEGER NOT NULL PRIMARY KEY, " 
+                    + "    col1 INTEGER) SALT_BUCKETS=4");
+            
+            PreparedStatement upsertStmt = conn.prepareStatement(
+                    "upsert into " + tempTableNoSalting + "(mypk, col1) " + "values (?, ?)");
+            for (int i = 0; i < 3; i++) {
+                upsertStmt.setInt(1, i + 1);
+                upsertStmt.setInt(2, 3 - i);
+                upsertStmt.execute();
+            }
+            conn.commit();
+            
+            upsertStmt = conn.prepareStatement(
+                    "upsert into " + tempTableWithSalting + "(mypk, col1) " + "values (?, ?)");
+            for (int i = 0; i < 6; i++) {
+                upsertStmt.setInt(1, i + 1);
+                upsertStmt.setInt(2, 3 - (i % 3));
+                upsertStmt.execute();
+            }
+            conn.commit();
+            
+            // LHS=unsalted JOIN RHS=salted
+            String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableNoSalting + " lhs JOIN "
+                    + tempTableWithSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 1);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 3);
+            assertEquals(rs.getInt(4), 1);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 2);
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getInt(3), 2);
+            assertEquals(rs.getInt(4), 2);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getInt(3), 1);
+            assertEquals(rs.getInt(4), 3);
+
+            assertFalse(rs.next());
+            
+            // LHS=salted JOIN RHS=salted
+            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableWithSalting + " lhs JOIN "
+                    + tempTableNoSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 1);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 3);
+            assertEquals(rs.getInt(4), 1);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 2);
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getInt(3), 2);
+            assertEquals(rs.getInt(4), 2);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getInt(3), 1);
+            assertEquals(rs.getInt(4), 3);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 4);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 3);
+            assertEquals(rs.getInt(4), 1);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 5);
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getInt(3), 2);
+            assertEquals(rs.getInt(4), 2);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 6);
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getInt(3), 1);
+            assertEquals(rs.getInt(4), 3);
+
+            assertFalse(rs.next());
+            
+            // LHS=salted JOIN RHS=salted
+            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableWithSalting + " lhs JOIN "
+                    + tempTableWithSalting + " rhs ON rhs.mypk = (lhs.col1 + 3) ORDER BY lhs.mypk";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 1);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 6);
+            assertEquals(rs.getInt(4), 1);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 2);
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getInt(3), 5);
+            assertEquals(rs.getInt(4), 2);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 3);
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getInt(3), 4);
+            assertEquals(rs.getInt(4), 3);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 4);
+            assertEquals(rs.getInt(2), 3);
+            assertEquals(rs.getInt(3), 6);
+            assertEquals(rs.getInt(4), 1);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 5);
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getInt(3), 5);
+            assertEquals(rs.getInt(4), 2);
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 6);
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getInt(3), 4);
+            assertEquals(rs.getInt(4), 3);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testJoinOnDynamicColumns() throws Exception {
+        String tableA = "tableA";
+        String tableB = "tableB";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            String ddlA = "CREATE TABLE " + tableA + "   (pkA INTEGER NOT NULL, " + "    colA1 INTEGER, "
+                    + "        colA2 VARCHAR " + "CONSTRAINT PK PRIMARY KEY" + "(pkA)" + ")";
+
+            String ddlB = "CREATE TABLE " + tableB + "   (pkB INTEGER NOT NULL PRIMARY KEY, " + "    colB INTEGER)";
+            stmt = conn.prepareStatement(ddlA);
+            stmt.execute();
+            stmt.close();
+
+            stmt = conn.prepareStatement(ddlB);
+            stmt.execute();
+            stmt.close();
+
+            String upsertA = "UPSERT INTO TABLEA (pkA, colA1, colA2) VALUES(?, ?, ?)";
+            stmt = conn.prepareStatement(upsertA);
+            int i = 0;
+            for (i = 0; i < 5; i++) {
+                stmt.setInt(1, i);
+                stmt.setInt(2, i + 10);
+                stmt.setString(3, "00" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+            stmt.close();
+
+            // upsert select dynamic columns in tableB
+            conn.createStatement().execute("CREATE SEQUENCE SEQB");
+            String upsertBSelectA = "UPSERT INTO TABLEB (pkB, pkA INTEGER)"
+                    + "SELECT NEXT VALUE FOR SEQB, pkA FROM TABLEA";
+            stmt = conn.prepareStatement(upsertBSelectA);
+            stmt.executeUpdate();
+            stmt.close();
+            conn.commit();
+            conn.createStatement().execute("DROP SEQUENCE SEQB");
+
+            // perform a join between tableB and tableA by joining on the dynamic column that we upserted in
+            // tableB. This join should return all the rows from table A.
+            String joinSql = "SELECT /*+ USE_SORT_MERGE_JOIN*/ A.pkA, A.COLA1, A.colA2 FROM TABLEB B(pkA INTEGER) JOIN TABLEA A ON a.pkA = b.pkA";
+            stmt = conn.prepareStatement(joinSql);
+            ResultSet rs = stmt.executeQuery();
+            i = 0;
+            while (rs.next()) {
+                // check that we get back all the rows that we upserted for tableA above.
+                assertEquals(rs.getInt(1), i);
+                assertEquals(rs.getInt(2), i + 10);
+                assertEquals(rs.getString(3), "00" + i);
+                i++;
+            }
+            assertEquals(5,i);
+        } finally {
+            if (stmt != null) {
+                stmt.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+
+        }
+
+    }
+
+    @Test
+    public void testSubqueryWithoutData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+
+        try {
+            String GRAMMAR_TABLE = "CREATE TABLE IF NOT EXISTS GRAMMAR_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String LARGE_TABLE = "CREATE TABLE IF NOT EXISTS LARGE_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String SECONDARY_LARGE_TABLE = "CREATE TABLE IF NOT EXISTS SECONDARY_LARGE_TABLE (SEC_ID INTEGER PRIMARY KEY," +
+                    "sec_unsig_id UNSIGNED_INT, sec_big_id BIGINT, sec_usnig_long_id UNSIGNED_LONG, sec_tiny_id TINYINT," +
+                    "sec_unsig_tiny_id UNSIGNED_TINYINT, sec_small_id SMALLINT, sec_unsig_small_id UNSIGNED_SMALLINT," +
+                    "sec_float_id FLOAT, sec_unsig_float_id UNSIGNED_FLOAT, sec_double_id DOUBLE, sec_unsig_double_id UNSIGNED_DOUBLE," +
+                    "sec_decimal_id DECIMAL, sec_boolean_id BOOLEAN, sec_time_id TIME, sec_date_id DATE," +
+                    "sec_timestamp_id TIMESTAMP, sec_unsig_time_id TIME, sec_unsig_date_id DATE, sec_unsig_timestamp_id TIMESTAMP," +
+                    "sec_varchar_id VARCHAR (30), sec_char_id CHAR (30), sec_binary_id BINARY (100), sec_varbinary_id VARBINARY (100))";
+            createTestTable(getUrl(), GRAMMAR_TABLE);
+            createTestTable(getUrl(), LARGE_TABLE);
+            createTestTable(getUrl(), SECONDARY_LARGE_TABLE);
+
+            String ddl = "SELECT /*+USE_SORT_MERGE_JOIN*/ * FROM (SELECT ID, BIG_ID, DATE_ID FROM LARGE_TABLE AS A WHERE (A.ID % 5) = 0) AS A " +
+                    "INNER JOIN (SELECT SEC_ID, SEC_TINY_ID, SEC_UNSIG_FLOAT_ID FROM SECONDARY_LARGE_TABLE AS B WHERE (B.SEC_ID % 5) = 0) AS B " +
+                    "ON A.ID=B.SEC_ID WHERE A.DATE_ID > ALL (SELECT SEC_DATE_ID FROM SECONDARY_LARGE_TABLE LIMIT 100) " +
+                    "AND B.SEC_UNSIG_FLOAT_ID = ANY (SELECT sec_unsig_float_id FROM SECONDARY_LARGE_TABLE " +
+                    "WHERE SEC_ID > ALL (SELECT MIN (ID) FROM GRAMMAR_TABLE WHERE UNSIG_ID IS NULL) AND " +
+                    "SEC_UNSIG_ID < ANY (SELECT DISTINCT(UNSIG_ID) FROM LARGE_TABLE WHERE UNSIG_ID<2500) LIMIT 1000) " +
+                    "AND A.ID < 10000";
+            ResultSet rs = conn.createStatement().executeQuery(ddl);
+            assertFalse(rs.next());  
+        } finally {
+            Statement statement = conn.createStatement();
+            String query = "drop table GRAMMAR_TABLE";
+            statement.executeUpdate(query);
+            query = "drop table LARGE_TABLE";
+            statement.executeUpdate(query);
+            query = "drop table SECONDARY_LARGE_TABLE";
+            statement.executeUpdate(query);
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a57553c1/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8a9abc2..ad65c1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -285,23 +285,28 @@ public class QueryCompiler {
             JoinType[] joinTypes = new JoinType[count];
             PTable[] tables = new PTable[count];
             int[] fieldPositions = new int[count];
-            HashSubPlan[] subPlans = new HashSubPlan[count];
+            StatementContext[] subContexts = new StatementContext[count];
+            QueryPlan[] subPlans = new QueryPlan[count];
+            HashSubPlan[] hashPlans = new HashSubPlan[count];
             fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
             for (int i = 0; i < count; i++) {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 Scan subScan = ScanUtil.newScan(originalScan);
-                StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null);
+                subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+                subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
                 boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                 if (hasPostReference) {
-                    tables[i] = subContext.getResolver().getTables().get(0).getTable();
+                    tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
                     projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
                 } else {
                     tables[i] = null;
                 }
+            }
+            for (int i = 0; i < count; i++) {
+                JoinSpec joinSpec = joinSpecs.get(i);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
+                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
                 joinExpressions[i] = joinConditions.getFirst();
                 List<Expression> hashExpressions = joinConditions.getSecond();
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -312,7 +317,7 @@ public class QueryCompiler {
                 if (i < count - 1) {
                     fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
                 }
-                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+                hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
             }
             TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
             QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
@@ -322,7 +327,7 @@ public class QueryCompiler {
                 limit = plan.getLimit();
             }
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit);
-            return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans);
+            return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
         }
 
         JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);


Mime
View raw message