Date: Fri, 7 Jul 2017 03:07:00 +0000 (UTC)
From: "James Taylor (JIRA)"
To: dev@phoenix.apache.org
Subject: [jira] [Updated] (PHOENIX-3905) Allow dynamic filtered join queries in UPSERT SELECT to be distributed across cluster

     [ https://issues.apache.org/jira/browse/PHOENIX-3905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Taylor updated PHOENIX-3905:
----------------------------------
    Description: 
Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join), and thus could be distributed across the cluster to improve performance when used in an UPSERT SELECT.
Here's a set of example schemas for which the inner query executes a skip scan:
{code}
CREATE TABLE COMPLETED_BATCHES (
    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
    BATCH_ID BIGINT NOT NULL,
    CONSTRAINT PK PRIMARY KEY
    (
        BATCH_SEQUENCE_NUM,
        BATCH_ID
    )
);

CREATE TABLE ITEMS (
    BATCH_ID BIGINT NOT NULL,
    ITEM_ID BIGINT NOT NULL,
    ITEM_TYPE BIGINT,
    ITEM_VALUE VARCHAR,
    CONSTRAINT PK PRIMARY KEY
    (
        BATCH_ID,
        ITEM_ID
    )
);

CREATE TABLE COMPLETED_ITEMS (
    ITEM_TYPE BIGINT NOT NULL,
    BATCH_SEQUENCE_NUM BIGINT NOT NULL,
    ITEM_ID BIGINT NOT NULL,
    ITEM_VALUE VARCHAR,
    CONSTRAINT PK PRIMARY KEY
    (
        ITEM_TYPE,
        BATCH_SEQUENCE_NUM,
        ITEM_ID
    )
);
{code}
The explain plan indicates that a dynamic filter will be performed, like this:
{code}
UPSERT SELECT
CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS
    SKIP-SCAN-JOIN TABLE 0
        CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2]
            SERVER FILTER BY FIRST KEY ONLY
            SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID]
        CLIENT MERGE SORT
    DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9)
{code}
A complete unit test looks like this:
{code}
@Test
public void testNestedLoopJoin() throws Exception {
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        String t1 = "COMPLETED_BATCHES";
        String ddl1 = "CREATE TABLE " + t1 + " (\n" +
                "    BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
                "    BATCH_ID BIGINT NOT NULL,\n" +
                "    CONSTRAINT PK PRIMARY KEY\n" +
                "    (\n" +
                "        BATCH_SEQUENCE_NUM,\n" +
                "        BATCH_ID\n" +
                "    )\n" +
                ")";
        conn.createStatement().execute(ddl1);

        String t2 = "ITEMS";
        String ddl2 = "CREATE TABLE " + t2 + " (\n" +
                "    BATCH_ID BIGINT NOT NULL,\n" +
                "    ITEM_ID BIGINT NOT NULL,\n" +
                "    ITEM_TYPE BIGINT,\n" +
                "    ITEM_VALUE VARCHAR,\n" +
                "    CONSTRAINT PK PRIMARY KEY\n" +
                "    (\n" +
                "        BATCH_ID,\n" +
                "        ITEM_ID\n" +
                "    )\n" +
                ")";
        conn.createStatement().execute(ddl2);

        String t3 = "COMPLETED_ITEMS";
        String ddl3 = "CREATE TABLE " + t3 + " (\n" +
                "    ITEM_TYPE BIGINT NOT NULL,\n" +
                "    BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" +
                "    ITEM_ID BIGINT NOT NULL,\n" +
                "    ITEM_VALUE VARCHAR,\n" +
                "    CONSTRAINT PK PRIMARY KEY\n" +
                "    (\n" +
                "        ITEM_TYPE,\n" +
                "        BATCH_SEQUENCE_NUM,\n" +
                "        ITEM_ID\n" +
                "    )\n" +
                ")";
        conn.createStatement().execute(ddl3);

        conn.createStatement().execute("UPSERT INTO " + t1 + "(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)");
        conn.createStatement().execute("UPSERT INTO " + t1 + "(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)");
        conn.createStatement().execute("UPSERT INTO " + t2 + "(batch_id, item_id, item_type, item_value) VALUES (1,100, 10, 'a')");
        conn.createStatement().execute("UPSERT INTO " + t2 + "(batch_id, item_id, item_type, item_value) VALUES (2,200, 20, 'a')");
        conn.createStatement().execute("UPSERT INTO " + t2 + "(batch_id, item_id, item_type, item_value) VALUES (3,300, 10, 'a')");
        conn.createStatement().execute("UPSERT INTO " + t2 + "(batch_id, item_id, item_type, item_value) VALUES (4,400, 20, 'a')");
        conn.createStatement().execute("UPSERT INTO " + t2 + "(batch_id, item_id, item_type, item_value) VALUES (5,500, 10, 'a')");
        conn.commit();

        conn.setAutoCommit(true);
        String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" +
                "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE\n" +
                "FROM " + t2 + " i\n" +
                "WHERE EXISTS (" +
                "    SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = i.BATCH_ID AND " +
                "    b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
        conn.createStatement().execute(dml);

        ResultSet rs = conn.createStatement().executeQuery("SELECT ITEM_ID FROM " + t3);
        assertTrue(rs.next());
        assertEquals(200L, rs.getLong(1));
        assertTrue(rs.next());
        assertEquals(400L, rs.getLong(1));
        assertFalse(rs.next());
    }
}
{code}
Currently, for these types of UPSERT SELECT queries, the selected data flows back to the client and then back out to the appropriate server. It's still parallelized, but only on a single client, as opposed to across multiple region servers in the cluster. The benefit would depend on how many region servers would be involved in fetching the data for the select part of the query.
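As a reference point for anyone reproducing the plan above, the explain output can be retrieved at runtime by prefixing the statement with EXPLAIN; Phoenix returns the plan as one row per step. Below is a minimal sketch, not part of the proposed change: it assumes the tables from the unit test already exist and that a Phoenix JDBC URL is supplied by the caller (the class and method names here are illustrative only).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ExplainUpsertSelect {
    // Builds the same semi-join UPSERT SELECT as the unit test above:
    // an EXISTS subquery on the leading part of COMPLETED_BATCHES' primary key.
    static String upsertSelect() {
        return "UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n"
             + "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE\n"
             + "FROM ITEMS i\n"
             + "WHERE EXISTS (SELECT 1 FROM COMPLETED_BATCHES b\n"
             + "              WHERE b.BATCH_ID = i.BATCH_ID\n"
             + "              AND b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < 2)";
    }

    public static void main(String[] args) throws Exception {
        // args[0] is assumed to be a Phoenix JDBC URL, e.g. jdbc:phoenix:<zk-quorum>.
        try (Connection conn = DriverManager.getConnection(args[0])) {
            // EXPLAIN returns one plan step per row; look for the
            // SKIP-SCAN-JOIN and DYNAMIC SERVER FILTER lines.
            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + upsertSelect());
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```

Running this against a cluster with the example schemas loaded should print a plan matching the one shown above, confirming the skip-scan join is chosen.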
was:
Joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join), and thus could be distributed across the cluster to improve performance when used in an UPSERT SELECT.

The explain plan indicates that a dynamic filter will be performed, like this:
{code}
DYNAMIC SERVER FILTER BY (DML.PK1, DML.PK2, DML.PK3) IN ((COM.PK1, COM.PK2, COM.PK3))
{code}
Currently, for these types of UPSERT SELECT queries, the selected data flows back to the client and then back out to the appropriate server. It's still parallelized, but only on a single client, as opposed to across multiple region servers in the cluster. The benefit would depend on how many region servers would be involved in fetching the data for the select part of the query.

> Allow dynamic filtered join queries in UPSERT SELECT to be distributed across cluster
> -------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-3905
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-3905
>             Project: Phoenix
>          Issue Type: Improvement
>            Reporter: James Taylor
>
> Semi joins on the leading part of the primary key end up doing batches of point queries (as opposed to a broadcast hash join), and thus could be distributed across the cluster to improve performance when used in an UPSERT SELECT.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)