Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 96C95200B61 for ; Mon, 25 Jul 2016 15:46:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 958DD160A7D; Mon, 25 Jul 2016 13:46:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E9DC160A98 for ; Mon, 25 Jul 2016 15:46:24 +0200 (CEST) Received: (qmail 98389 invoked by uid 500); 25 Jul 2016 13:46:23 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98141 invoked by uid 99); 25 Jul 2016 13:46:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jul 2016 13:46:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 37CB0E5720; Mon, 25 Jul 2016 13:46:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 25 Jul 2016 13:46:28 -0000 Message-Id: <75b20156d04343c89eca2b321cb993bf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/12] ignite git commit: ignite-3563 Support distributedJoins flag in JDBC driver archived-at: Mon, 25 Jul 2016 13:46:25 -0000 ignite-3563 Support distributedJoins flag in JDBC driver Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e15cf1d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e15cf1d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e15cf1d Branch: refs/heads/ignite-3560 Commit: 0e15cf1df8b161dbc44f8d3810feca3192f05f73 Parents: 58d0544 Author: agura Authored: Mon Jul 25 12:31:24 2016 +0300 Committer: agura Committed: Mon Jul 25 15:11:12 2016 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcDistributedJoinsQueryTest.java | 319 +++++++++++++++++++ .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 2 + .../org/apache/ignite/IgniteJdbcDriver.java | 14 +- .../ignite/internal/jdbc2/JdbcConnection.java | 12 + .../ignite/internal/jdbc2/JdbcQueryTask.java | 9 +- .../ignite/internal/jdbc2/JdbcResultSet.java | 4 +- .../ignite/internal/jdbc2/JdbcStatement.java | 2 +- 7 files changed, 357 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java new file mode 100644 index 0000000..53bfa73 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDistributedJoinsQueryTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.io.Serializable; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests for complex queries with distributed joins enabled (joins, etc.). + */ +public class JdbcDistributedJoinsQueryTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** JDBC URL. */ + private static final String BASE_URL = CFG_URL_PREFIX + "distributedJoins=true@modules/clients/src/test/config/jdbc-config.xml"; + + /** Statement. */ + private Statement stmt; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setAtomicityMode(TRANSACTIONAL); + cache.setIndexedTypes(String.class, Organization.class, String.class, Person.class); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setConnectorConfiguration(new ConnectorConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(3); + + IgniteCache orgCache = grid(0).cache(null); + + assert orgCache != null; + + orgCache.put("o1", new Organization(1, "A")); + orgCache.put("o2", new Organization(2, "B")); + + IgniteCache personCache = grid(0).cache(null); + + assert personCache != null; + + personCache.put("p1", new Person(1, "John White", 25, 1)); + personCache.put("p2", new Person(2, "Joe Black", 35, 1)); + personCache.put("p3", new Person(3, "Mike Green", 40, 2)); + + Class.forName("org.apache.ignite.IgniteJdbcDriver"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stmt = DriverManager.getConnection(BASE_URL).createStatement(); + + assert stmt != null; + assert !stmt.isClosed(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (stmt != null) { + stmt.getConnection().close(); + stmt.close(); + + assert stmt.isClosed(); + } + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + ResultSet rs = stmt.executeQuery( + "select p.id, p.name, o.name as orgName from Person p, Organization o where p.orgId = o.id"); + + assert rs != null; + + int cnt = 0; + + while (rs.next()) { + int id = rs.getInt("id"); + + if (id == 1) { + assert "John White".equals(rs.getString("name")); + assert "A".equals(rs.getString("orgName")); + } + else if (id == 2) { + assert "Joe Black".equals(rs.getString("name")); + assert "A".equals(rs.getString("orgName")); + } + else if (id == 3) { + assert "Mike Green".equals(rs.getString("name")); + assert "B".equals(rs.getString("orgName")); + } + else + assert false : "Wrong ID: " + id; + + cnt++; + } + + assertEquals(3, cnt); + } + + /** + * @throws Exception If failed. + */ + public void testJoinWithoutAlias() throws Exception { + ResultSet rs = stmt.executeQuery( + "select p.id, p.name, o.name from Person p, Organization o where p.orgId = o.id"); + + assert rs != null; + + int cnt = 0; + + while (rs.next()) { + int id = rs.getInt(1); + + if (id == 1) { + assert "John White".equals(rs.getString("name")); + assert "John White".equals(rs.getString(2)); + assert "A".equals(rs.getString(3)); + } + else if (id == 2) { + assert "Joe Black".equals(rs.getString("name")); + assert "Joe Black".equals(rs.getString(2)); + assert "A".equals(rs.getString(3)); + } + else if (id == 3) { + assert "Mike Green".equals(rs.getString("name")); + assert "Mike Green".equals(rs.getString(2)); + assert "B".equals(rs.getString(3)); + } + else + assert false : "Wrong ID: " + id; + + cnt++; + } + + assert cnt == 3; + } + + /** + * @throws Exception If failed. + */ + public void testIn() throws Exception { + ResultSet rs = stmt.executeQuery("select name from Person where age in (25, 35)"); + + assert rs != null; + + int cnt = 0; + + while (rs.next()) { + assert "John White".equals(rs.getString("name")) || + "Joe Black".equals(rs.getString("name")); + + cnt++; + } + + assert cnt == 2; + } + + /** + * @throws Exception If failed. + */ + public void testBetween() throws Exception { + ResultSet rs = stmt.executeQuery("select name from Person where age between 24 and 36"); + + assert rs != null; + + int cnt = 0; + + while (rs.next()) { + assert "John White".equals(rs.getString("name")) || + "Joe Black".equals(rs.getString("name")); + + cnt++; + } + + assert cnt == 2; + } + + /** + * @throws Exception If failed. + */ + public void testCalculatedValue() throws Exception { + ResultSet rs = stmt.executeQuery("select age * 2 from Person"); + + assert rs != null; + + int cnt = 0; + + while (rs.next()) { + assert rs.getInt(1) == 50 || + rs.getInt(1) == 70 || + rs.getInt(1) == 80; + + cnt++; + } + + assert cnt == 3; + } + + /** + * Person. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Person implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** Age. */ + @QuerySqlField + private final int age; + + /** Organization ID. */ + @QuerySqlField(index = true) + private final int orgId; + + /** + * @param id ID. + * @param name Name. + * @param age Age. + * @param orgId Organization ID. + */ + private Person(int id, String name, int age, int orgId) { + assert !F.isEmpty(name); + assert age > 0; + assert orgId > 0; + + this.id = id; + this.name = name; + this.age = age; + this.orgId = orgId; + } + } + + /** + * Organization. + */ + @SuppressWarnings("UnusedDeclaration") + private static class Organization implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** Name. */ + @QuerySqlField(index = false) + private final String name; + + /** + * @param id ID. + * @param name Name. + */ + private Organization(int id, String name) { + this.id = id; + this.name = name; + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 603ee81..b1053b0 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.jdbc.suite; import junit.framework.TestSuite; +import org.apache.ignite.internal.jdbc2.JdbcDistributedJoinsQueryTest; import org.apache.ignite.jdbc.JdbcComplexQuerySelfTest; import org.apache.ignite.jdbc.JdbcConnectionSelfTest; import org.apache.ignite.jdbc.JdbcEmptyCacheSelfTest; @@ -56,6 +57,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcPreparedStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcResultSetSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcComplexQuerySelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDistributedJoinsQueryTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcMetadataSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcEmptyCacheSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcLocalCachesSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java index 567ff9f..d432c1e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java @@ -104,6 +104,11 @@ import org.apache.ignite.logger.java.JavaLogger; * can make significant performance and network optimizations. * Default value is {@code false}. * + *
  • + * {@code distributedJoins} - enables support of distributed joins feature. This flag does not make sense in + * combination with {@code local} and/or {@code collocated} flags with {@code true} value or in case of querying + * of local cache. Default value is {@code false}. + *
  • * * *

    Configuration of Ignite Java client based connection

    @@ -284,6 +289,9 @@ public class IgniteJdbcDriver implements Driver { /** Collocated parameter name. */ private static final String PARAM_COLLOCATED = "collocated"; + /** Distributed joins parameter name. */ + private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins"; + /** Hostname property name. */ public static final String PROP_HOST = PROP_PREFIX + "host"; @@ -302,6 +310,9 @@ public class IgniteJdbcDriver implements Driver { /** Collocated property name. */ public static final String PROP_COLLOCATED = PROP_PREFIX + PARAM_COLLOCATED; + /** Distributed joins property name. */ + public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; + /** Cache name property name. */ public static final String PROP_CFG = PROP_PREFIX + "cfg"; @@ -366,7 +377,8 @@ public class IgniteJdbcDriver implements Driver { new PropertyInfo("Cache name", info.getProperty(PROP_CACHE), ""), new PropertyInfo("Node ID", info.getProperty(PROP_NODE_ID), ""), new PropertyInfo("Local", info.getProperty(PROP_LOCAL), ""), - new PropertyInfo("Collocated", info.getProperty(PROP_COLLOCATED), "") + new PropertyInfo("Collocated", info.getProperty(PROP_COLLOCATED), ""), + new PropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), "") ); if (info.getProperty(PROP_CFG) != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 2d2ce5d..6f0d9c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -69,6 +69,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.IgniteJdbcDriver.PROP_CACHE; import static org.apache.ignite.IgniteJdbcDriver.PROP_CFG; import static org.apache.ignite.IgniteJdbcDriver.PROP_COLLOCATED; +import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS; import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL; import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID; @@ -113,6 +114,9 @@ public class JdbcConnection implements Connection { /** Collocated query flag. */ private boolean collocatedQry; + /** Distributed joins flag. */ + private boolean distributedJoins; + /** Statements. */ final Set statements = new HashSet<>(); @@ -132,6 +136,7 @@ public class JdbcConnection implements Connection { this.cacheName = props.getProperty(PROP_CACHE); this.locQry = Boolean.parseBoolean(props.getProperty(PROP_LOCAL)); this.collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED)); + this.distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS)); String nodeIdProp = props.getProperty(PROP_NODE_ID); @@ -706,6 +711,13 @@ public class JdbcConnection implements Connection { } /** + * @return Distributed joins flag. + */ + boolean isDistributedJoins() { + return distributedJoins; + } + + /** * Ensures that connection is not closed. * * @throws SQLException If connection is closed. http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java index 1a5793a..c4911cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java @@ -92,6 +92,9 @@ class JdbcQueryTask implements IgniteCallable { /** Collocated query flag. */ private final boolean collocatedQry; + /** Distributed joins flag. */ + private final boolean distributedJoins; + /** * @param ignite Ignite. * @param cacheName Cache name. @@ -102,9 +105,11 @@ class JdbcQueryTask implements IgniteCallable { * @param uuid UUID. * @param locQry Local query flag. * @param collocatedQry Collocated query flag. + * @param distributedJoins Distributed joins flag. */ public JdbcQueryTask(Ignite ignite, String cacheName, String sql, - boolean loc, Object[] args, int fetchSize, UUID uuid, boolean locQry, boolean collocatedQry) { + boolean loc, Object[] args, int fetchSize, UUID uuid, + boolean locQry, boolean collocatedQry, boolean distributedJoins) { this.ignite = ignite; this.args = args; this.uuid = uuid; @@ -114,6 +119,7 @@ class JdbcQueryTask implements IgniteCallable { this.loc = loc; this.locQry = locQry; this.collocatedQry = collocatedQry; + this.distributedJoins = distributedJoins; } /** {@inheritDoc} */ @@ -147,6 +153,7 @@ class JdbcQueryTask implements IgniteCallable { qry.setPageSize(fetchSize); qry.setLocal(locQry); qry.setCollocated(collocatedQry); + qry.setDistributedJoins(distributedJoins); QueryCursor> qryCursor = cache.query(qry); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java index 69dddad..8e0e9d0 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java @@ -146,8 +146,8 @@ public class JdbcResultSet implements ResultSet { boolean loc = nodeId == null; - JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), - null, loc, null, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery()); + JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), null, loc, null, + fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins()); try { JdbcQueryTask.QueryResult res = http://git-wip-us.apache.org/repos/asf/ignite/blob/0e15cf1d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java index 6b806f3..e187dc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java @@ -96,7 +96,7 @@ public class JdbcStatement implements Statement { boolean loc = nodeId == null; JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), - sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery()); + sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins()); try { JdbcQueryTask.QueryResult res =