phoenix-dev mailing list archives

From "Ethan Wang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PHOENIX-3999) Optimize inner joins as SKIP-SCAN-JOIN when possible
Date Sun, 20 Aug 2017 08:36:00 GMT

    [ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134342#comment-16134342 ]

Ethan Wang commented on PHOENIX-3999:
-------------------------------------

Thanks [~maryannxue]. I think the score-based Calcite optimization is the way to go. For now, here at Salesforce we have some imminent use cases related to this ticket, and I'm taking a look.

As for step one, I'm checking whether the inner join and the semi join versions of the same query (the one [~jamestaylor] posted above) end up executing as their explain plans say. Here are my findings; basically, both behave as [~maryannxue] rightly pointed out.

1. For the inner join, i.e.,
{code:java}
SELECT 
	i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
FROM  
	ITEMS i, COMPLETED_BATCHES b
WHERE 
   b.BATCH_ID = i.BATCH_ID
   AND b.BATCH_SEQUENCE_NUM > 0 
   AND b.BATCH_SEQUENCE_NUM < 2;
{code}


The plan explanation is accurate.

What happens is: the query turns into a HashJoinPlan. The parent LHS is the "ITEMS" scan; the RHS is a hash join subplan for "COMPLETED_BATCHES". This subplan first executes a regular scan (even before the user starts calling rs.next()). When it finishes, its result is turned into a "Dynamic Filter", broadcast to all "ITEMS" regions, and persisted there as a HashCacheImpl object. On the region server I was able to observe that the hash cache object contains the two COMPLETED_BATCHES rows. Later, when the LHS skip-scan starts, HashJoinRegionScanner consults this cached dynamic filter while fetching each result tuple into the resultQueue to send back to the user.
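For anyone who wants to reproduce this, here is a minimal sketch of how the plan can be inspected over JDBC (the connection URL is a placeholder, and it assumes the ITEMS/COMPLETED_BATCHES tables from the issue description below have been created). Phoenix's EXPLAIN returns the plan one line per row in the first column:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ExplainInnerJoin {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; point this at your own cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            String query =
                "SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE " +
                "FROM ITEMS i, COMPLETED_BATCHES b " +
                "WHERE b.BATCH_ID = i.BATCH_ID " +
                "AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2";
            // EXPLAIN emits one plan line per row; look for the
            // "DYNAMIC SERVER FILTER" line described above.
            try (ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query)) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }
}
{code}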


bq. while in the query you mentioned above b.BATCH_SEQUENCE_NUM is from RHS and is not part
of the join key, so SKIP-SCAN-JOIN is not used. Still this query satisfies the child-parent
join optimization conditions and should be fired, which mean "dynamic filter" should appear
in the plan. Does it work that way now?

So the answer should be yes.



2. When written as a semi join, i.e.,
{code:java}
SELECT 
	ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE   
FROM  
	ITEMS i
WHERE EXISTS (  
	SELECT 1 FROM COMPLETED_BATCHES b 
	WHERE b.BATCH_ID = i.BATCH_ID 
	AND   b.BATCH_SEQUENCE_NUM > 0 
	AND   b.BATCH_SEQUENCE_NUM < 2
)
{code}

It still becomes a HashJoinPlan whose parent is the "ITEMS" table scan, but the RHS subplan becomes a point lookup (a TupleProjectionPlan). No ServerCache (dynamic filter) object is ever generated or broadcast. Instead, the RHS subplan becomes a static constant key range that is merged into the parent's WHERE clause.
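In other words (my paraphrase, using the sample data from the unit test in the issue description below, where only BATCH_IDs 2 and 4 satisfy the BATCH_SEQUENCE_NUM range), the parent scan effectively behaves as if the query had been written with a constant IN list on the leading PK column:

{code}
-- Illustrative only: not a textual rewrite Phoenix performs, just the
-- effective shape of the parent scan once the RHS collapses to constants.
SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE
FROM ITEMS
WHERE BATCH_ID IN (2, 4);  -- leading PK column => point lookups
{code}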

So, if I understand correctly, the semi join is supposed to be more "optimized" than the inner join, because less information is required for the final result. Also, "SKIP-SCAN-JOIN" should *always* outperform the regular "DYNAMIC FILTER": at most, a "DYNAMIC FILTER" filters rows without reducing the scan length on the parent, whereas "SKIP-SCAN-JOIN" sometimes can reduce it (e.g., with the unit test data below, a skip scan touches only the two ITEMS rows with BATCH_ID 2 and 4, while a full scan with a dynamic filter still reads all five rows and filters them server-side). As a result, the semi join should "always" perform better, in every scenario. (Right?)
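One way to sanity-check that claim is to print the two explain plans side by side and compare how each scans ITEMS (a full scan plus dynamic filter vs. a point lookup or skip scan). A rough sketch, with the same placeholder URL and table assumptions as above:

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CompareJoinPlans {
    // Collects the EXPLAIN output (one line per row) into a single string.
    static String explain(Statement stmt, String sql) throws Exception {
        StringBuilder plan = new StringBuilder();
        try (ResultSet rs = stmt.executeQuery("EXPLAIN " + sql)) {
            while (rs.next()) {
                plan.append(rs.getString(1)).append('\n');
            }
        }
        return plan.toString();
    }

    public static void main(String[] args) throws Exception {
        String innerJoin =
            "SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE " +
            "FROM ITEMS i, COMPLETED_BATCHES b " +
            "WHERE b.BATCH_ID = i.BATCH_ID " +
            "AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2";
        String semiJoin =
            "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE FROM ITEMS i " +
            "WHERE EXISTS (SELECT 1 FROM COMPLETED_BATCHES b " +
            "WHERE b.BATCH_ID = i.BATCH_ID " +
            "AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
        // Placeholder URL; point this at your own cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {
            System.out.println("Inner join plan:\n" + explain(stmt, innerJoin));
            System.out.println("Semi join plan:\n" + explain(stmt, semiJoin));
        }
    }
}
{code}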

> Optimize inner joins as SKIP-SCAN-JOIN when possible
> ----------------------------------------------------
>
>                 Key: PHOENIX-3999
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-3999
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join), however inner joins do not.
> Here's a set of example schemas that executes a skip scan on the inner query:
> {code}
> CREATE TABLE COMPLETED_BATCHES (
>     BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>     BATCH_ID           BIGINT NOT NULL,
>     CONSTRAINT PK PRIMARY KEY
>     (
>         BATCH_SEQUENCE_NUM,
>         BATCH_ID
>     )
> );
> CREATE TABLE ITEMS (
>    BATCH_ID BIGINT NOT NULL,
>    ITEM_ID BIGINT NOT NULL,
>    ITEM_TYPE BIGINT,
>    ITEM_VALUE VARCHAR,
>    CONSTRAINT PK PRIMARY KEY
>    (
>         BATCH_ID,
>         ITEM_ID
>    )
> );
> CREATE TABLE COMPLETED_ITEMS (
>    ITEM_TYPE          BIGINT NOT NULL,
>    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
>    ITEM_ID            BIGINT NOT NULL,
>    ITEM_VALUE         VARCHAR,
>    CONSTRAINT PK PRIMARY KEY
>    (
>       ITEM_TYPE,
>       BATCH_SEQUENCE_NUM,  
>       ITEM_ID
>    )
> );
> {code}
> The explain plan for these indicates that a dynamic filter will be performed, like this:
> {code}
> UPSERT SELECT
> CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
>     SKIP-SCAN-JOIN TABLE 0
>         CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
>             SERVER FILTER BY FIRST KEY ONLY
>             SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
>         CLIENT MERGE SORT
>     DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
> {code}
> We should also be able to leverage this optimization when an inner join is used such as this:
> {code}
> UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)
>    SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE   
>    FROM  ITEMS i, COMPLETED_BATCHES b
>    WHERE b.BATCH_ID = i.BATCH_ID AND          
>    b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000;
> {code}
> A complete unit test looks like this:
> {code}
>     @Test
>     public void testNestedLoopJoin() throws Exception {
>         try (Connection conn = DriverManager.getConnection(getUrl())) {
>             String t1="COMPLETED_BATCHES";
>             String ddl1 = "CREATE TABLE " + t1 + " (\n" + 
>                     "    BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
>                     "    BATCH_ID           BIGINT NOT NULL,\n" + 
>                     "    CONSTRAINT PK PRIMARY KEY\n" + 
>                     "    (\n" + 
>                     "        BATCH_SEQUENCE_NUM,\n" + 
>                     "        BATCH_ID\n" + 
>                     "    )\n" + 
>                     ")" + 
>                     "";
>             conn.createStatement().execute(ddl1);
>             
>             String t2="ITEMS";
>             String ddl2 = "CREATE TABLE " + t2 + " (\n" + 
>                     "   BATCH_ID BIGINT NOT NULL,\n" + 
>                     "   ITEM_ID BIGINT NOT NULL,\n" + 
>                     "   ITEM_TYPE BIGINT,\n" + 
>                     "   ITEM_VALUE VARCHAR,\n" + 
>                     "   CONSTRAINT PK PRIMARY KEY\n" + 
>                     "   (\n" + 
>                     "        BATCH_ID,\n" + 
>                     "        ITEM_ID\n" + 
>                     "   )\n" + 
>                     ")";
>             conn.createStatement().execute(ddl2);
>             String t3="COMPLETED_ITEMS";
>             String ddl3 = "CREATE TABLE " + t3 + "(\n" + 
>                     "   ITEM_TYPE          BIGINT NOT NULL,\n" + 
>                     "   BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + 
>                     "   ITEM_ID            BIGINT NOT NULL,\n" + 
>                     "   ITEM_VALUE         VARCHAR,\n" + 
>                     "   CONSTRAINT PK PRIMARY KEY\n" + 
>                     "   (\n" + 
>                     "      ITEM_TYPE,\n" + 
>                     "      BATCH_SEQUENCE_NUM,  \n" + 
>                     "      ITEM_ID\n" + 
>                     "   )\n" + 
>                     ")";
>             conn.createStatement().execute(ddl3);
>             conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id)
VALUES (1,2)");
>             conn.createStatement().execute("UPSERT INTO "+t1+"(BATCH_SEQUENCE_NUM, batch_id)
VALUES (1,4)");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type,
item_value) VALUES (1,100, 10, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type,
item_value) VALUES (2,200, 20, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type,
item_value) VALUES (3,300, 10, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type,
item_value) VALUES (4,400, 20, 'a')");
>             conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, item_id, item_type,
item_value) VALUES (5,500, 10, 'a')");
>             conn.commit();
>             
>             conn.setAutoCommit(true);
>             String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" + 
>                     "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE   \n" + 
>                     "FROM  " + t2 + " i\n" + 
>                     "WHERE EXISTS (" +
>                     "  SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = i.BATCH_ID AND " +
>                     "  b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
>             conn.createStatement().execute(dml);
>             ResultSet rs = conn.createStatement().executeQuery("SELECT ITEM_ID FROM " + t3);
>             assertTrue(rs.next());
>             assertEquals(rs.getLong(1), 200L);
>             assertTrue(rs.next());
>             assertEquals(rs.getLong(1), 400L);
>             assertFalse(rs.next());
>         }
>     }
> {code}



