From commits-return-100554-archive-asf-public=cust-asf.ponee.io@beam.apache.org Fri Jan 18 19:41:57 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 5D316180647 for ; Fri, 18 Jan 2019 19:41:56 +0100 (CET) Received: (qmail 35817 invoked by uid 500); 18 Jan 2019 18:41:55 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 35807 invoked by uid 99); 18 Jan 2019 18:41:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jan 2019 18:41:55 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id CDC188762E; Fri, 18 Jan 2019 18:41:54 +0000 (UTC) Date: Fri, 18 Jan 2019 18:41:54 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [SQL] Force cacheless root schema in Jdbc connection MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154783691328.2072.18405420640355215133@gitbox.apache.org> From: anton@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 18a21e0eeb302825341c5da4a0030e2ae760481e X-Git-Newrev: 20d6093e4a4f864091d821d5320a5468a7547b4f X-Git-Rev: 20d6093e4a4f864091d821d5320a5468a7547b4f X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 20d6093 [SQL] Force cacheless root schema in Jdbc connection new 6a59667 Merge pull request #7553 from akedin/cacheless-root-schema 20d6093 is described below commit 20d6093e4a4f864091d821d5320a5468a7547b4f Author: akedin AuthorDate: Thu Jan 17 16:04:34 2019 -0800 [SQL] Force cacheless root schema in Jdbc connection --- .../extensions/sql/impl/CalciteFactoryWrapper.java | 110 ++++++++++++++++++++ .../sdk/extensions/sql/impl/JdbcConnection.java | 26 ++--- .../beam/sdk/extensions/sql/impl/JdbcDriver.java | 68 ++---------- .../beam/sdk/extensions/sql/impl/JdbcFactory.java | 115 +++++++++++++++++++++ .../sdk/extensions/sql/impl/JdbcDriverTest.java | 12 +++ 5 files changed, 262 insertions(+), 69 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java new file mode 100644 index 0000000..a039154 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Properties; +import java.util.TimeZone; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaFactory; +import org.apache.calcite.avatica.AvaticaPreparedStatement; +import org.apache.calcite.avatica.AvaticaResultSet; +import org.apache.calcite.avatica.AvaticaSpecificDatabaseMetaData; +import org.apache.calcite.avatica.AvaticaStatement; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.QueryState; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.jdbc.CalciteFactory; +import org.apache.calcite.jdbc.CalciteSchema; + +/** + * Wrapper for {@link CalciteFactory}. + * + *

This is a non-functional class to delegate to the underlying {@link CalciteFactory}. The + * purpose is to hide the delegation logic from the implementation ({@link JdbcFactory}). + */ +public abstract class CalciteFactoryWrapper extends CalciteFactory { + + protected CalciteFactory factory; + + CalciteFactoryWrapper(CalciteFactory factory) { + super(4, 1); + this.factory = factory; + } + + @Override + public AvaticaConnection newConnection( + UnregisteredDriver driver, + AvaticaFactory avaticaFactory, + String url, + Properties info, + CalciteSchema rootSchema, + JavaTypeFactory typeFactory) { + + return this.factory.newConnection(driver, avaticaFactory, url, info, rootSchema, typeFactory); + } + + @Override + public AvaticaStatement newStatement( + AvaticaConnection connection, + Meta.StatementHandle h, + int resultSetType, + int resultSetConcurrency, + int resultSetHoldability) + throws SQLException { + return this.factory.newStatement( + connection, h, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public AvaticaPreparedStatement newPreparedStatement( + AvaticaConnection connection, + Meta.StatementHandle h, + Meta.Signature signature, + int resultSetType, + int resultSetConcurrency, + int resultSetHoldability) + throws SQLException { + return this.factory.newPreparedStatement( + connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public AvaticaResultSet newResultSet( + AvaticaStatement statement, + QueryState state, + Meta.Signature signature, + TimeZone timeZone, + Meta.Frame firstFrame) + throws SQLException { + return this.factory.newResultSet(statement, state, signature, timeZone, firstFrame); + } + + @Override + public AvaticaSpecificDatabaseMetaData newDatabaseMetaData(AvaticaConnection connection) { + return this.factory.newDatabaseMetaData(connection); + } + + @Override + public ResultSetMetaData newResultSetMetaData( + AvaticaStatement statement, Meta.Signature 
signature) throws SQLException { + return this.factory.newResultSetMetaData(statement, signature); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java index 318d90d..e3c2aa6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java @@ -57,17 +57,20 @@ public class JdbcConnection extends CalciteConnectionWrapper { *

Sets the pipeline options, replaces the initial non-functional top-level schema with schema * created by {@link BeamCalciteSchemaFactory}. */ - static @Nullable JdbcConnection initialize(CalciteConnection connection) throws SQLException { - if (connection == null) { - return null; - } + static @Nullable JdbcConnection initialize(CalciteConnection connection) { + try { + if (connection == null) { + return null; + } - JdbcConnection jdbcConnection = new JdbcConnection(connection); - jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection)); - jdbcConnection.getRootSchema().setCacheEnabled(false); - jdbcConnection.setSchema( - connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection)); - return jdbcConnection; + JdbcConnection jdbcConnection = new JdbcConnection(connection); + jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection)); + jdbcConnection.setSchema( + connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection)); + return jdbcConnection; + } catch (SQLException e) { + throw new RuntimeException(e); + } } /** @@ -120,7 +123,6 @@ public class JdbcConnection extends CalciteConnectionWrapper { */ void setSchema(String name, TableProvider tableProvider) { BeamCalciteSchema beamCalciteSchema = new BeamCalciteSchema(this, tableProvider); - SchemaPlus addedSchemaPlus = getRootSchema().add(name, beamCalciteSchema); - addedSchemaPlus.setCacheEnabled(false); + getRootSchema().add(name, beamCalciteSchema); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java index 0cf8423..bb7cc42 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java @@ -17,12 +17,7 @@ */ package 
org.apache.beam.sdk.extensions.sql.impl; -import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE; -import static org.apache.calcite.config.CalciteConnectionProperty.LEX; -import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY; -import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA; import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY; -import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM; import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory; import com.google.auto.service.AutoService; @@ -33,15 +28,12 @@ import java.util.List; import java.util.Properties; import java.util.function.Consumer; import org.apache.beam.sdk.extensions.sql.SqlTransform; -import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; -import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.ReleaseInfo; -import org.apache.calcite.avatica.ConnectionProperty; -import org.apache.calcite.config.Lex; +import org.apache.calcite.avatica.AvaticaFactory; import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteFactory; import org.apache.calcite.jdbc.Driver; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; @@ -109,6 +101,11 @@ public class JdbcDriver extends Driver { } @Override + protected AvaticaFactory createFactory() { + return JdbcFactory.wrap((CalciteFactory) super.createFactory()); + } + + @Override protected String getConnectStringPrefix() { return CONNECT_STRING_PREFIX; } @@ -127,54 +124,10 @@ public class JdbcDriver extends Driver { * CalciteConnection}. 
*/ @Override - public Connection connect(String url, Properties originalConnectionProperties) - throws SQLException { - - // do this check before even looking into properties - // do not remove this, please - if (!acceptsURL(url)) { - return null; - } - - Properties connectionProps = ensureDefaultProperties(originalConnectionProperties); - CalciteConnection calciteConnection = (CalciteConnection) super.connect(url, connectionProps); - + public Connection connect(String url, Properties info) throws SQLException { // calciteConnection is initialized with an empty Beam schema, // we need to populate it with pipeline options, load table providers, etc - return JdbcConnection.initialize(calciteConnection); - } - - /** - * Make sure required default properties are set. - * - *

Among other things sets up the parser class name, rel data type system and default schema - * factory. - * - *

The specified Beam schema factory will be used by Calcite to create the initial top level - * Beam schema. It can be later overridden by setting the schema via {@link - * JdbcConnection#setSchema(String, TableProvider)}. - */ - private Properties ensureDefaultProperties(Properties originalInfo) { - Properties info = new Properties(); - info.putAll(originalInfo); - - setIfNull(info, TIME_ZONE, "UTC"); - setIfNull(info, LEX, Lex.JAVA.name()); - setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY"); - setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName()); - setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA); - setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName()); - - info.put("beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()); - - return info; - } - - private static void setIfNull(Properties info, ConnectionProperty key, String value) { - // A null value indicates the default. We want to override defaults only. 
- if (info.getProperty(key.camelName()) == null) { - info.setProperty(key.camelName(), value); - } + return JdbcConnection.initialize((CalciteConnection) super.connect(url, info)); } /** @@ -191,7 +144,8 @@ public class JdbcDriver extends Driver { public static JdbcConnection connect(TableProvider tableProvider) { try { Properties properties = new Properties(); - setIfNull(properties, SCHEMA_FACTORY, BeamCalciteSchemaFactory.Empty.class.getName()); + properties.setProperty( + SCHEMA_FACTORY.camelName(), BeamCalciteSchemaFactory.Empty.class.getName()); JdbcConnection connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties); connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java new file mode 100644 index 0000000..70c1229 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl; + +import static org.apache.beam.sdk.extensions.sql.impl.JdbcDriver.TOP_LEVEL_BEAM_SCHEMA; +import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE; +import static org.apache.calcite.config.CalciteConnectionProperty.LEX; +import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY; +import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA; +import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY; +import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM; + +import java.util.Properties; +import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem; +import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaFactory; +import org.apache.calcite.avatica.ConnectionProperty; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteFactory; +import org.apache.calcite.jdbc.CalciteSchema; + +/** + * Implements {@link CalciteFactory} that is used by the Calcite JDBC driver to instantiate different + * JDBC objects, like connections, result sets, etc. + * + *

The purpose of this class is to intercept the connection creation and force a cache-less root + * schema ({@link org.apache.calcite.jdbc.SimpleCalciteSchema}). Otherwise Calcite uses {@link + * org.apache.calcite.jdbc.CachingCalciteSchema} that eagerly caches table information. This + * behavior does not work well for dynamic table providers. + */ +class JdbcFactory extends CalciteFactoryWrapper { + + JdbcFactory(CalciteFactory factory) { + super(factory); + } + + static JdbcFactory wrap(CalciteFactory calciteFactory) { + return new JdbcFactory(calciteFactory); + } + + @Override + public AvaticaConnection newConnection( + UnregisteredDriver driver, + AvaticaFactory avaticaFactory, + String url, + Properties info, + CalciteSchema rootSchema, + JavaTypeFactory typeFactory) { + + Properties connectionProps = ensureDefaultProperties(info); + CalciteSchema actualRootSchema = rootSchema; + if (rootSchema == null) { + actualRootSchema = CalciteSchema.createRootSchema(true, false, ""); + } + + return super.newConnection( + driver, avaticaFactory, url, connectionProps, actualRootSchema, typeFactory); + } + + /** + * Make sure required default properties are set. + * + *

Among other things sets up the parser class name, rel data type system and default schema + * factory. + * + *

The specified Beam schema factory will be used by Calcite to create the initial top level + * Beam schema. It can be later overridden by setting the schema via {@link + * JdbcConnection#setSchema(String, TableProvider)}. + */ + private Properties ensureDefaultProperties(Properties originalInfo) { + Properties info = new Properties(); + info.putAll(originalInfo); + + setIfNull(info, TIME_ZONE, "UTC"); + setIfNull(info, LEX, Lex.JAVA.name()); + setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY"); + setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName()); + setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA); + setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName()); + setIfNull(info, "beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()); + + return info; + } + + private static void setIfNull(Properties info, ConnectionProperty key, String value) { + setIfNull(info, key.camelName(), value); + } + + private static void setIfNull(Properties info, String key, String value) { + // A null value indicates the default. We want to override defaults only. 
+ if (info.getProperty(key) == null) { + info.setProperty(key, value); + } + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java index a7525d2..6f36173 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.calcite.jdbc.CalciteConnection; @@ -120,6 +121,17 @@ public class JdbcDriverTest { assertThat(pipelineOptions.get("userAgent"), containsString("BeamSQL")); } + /** Tests that userAgent is set. */ + @Test + public void testDriverManager_hasUserAgent() throws Exception { + JdbcConnection connection = + (JdbcConnection) DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX); + BeamCalciteSchema schema = connection.getCurrentBeamSchema(); + assertThat( + schema.getPipelineOptions().get("userAgent"), + equalTo("BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion())); + } + /** Tests that userAgent can be overridden on the querystring. */ @Test public void testDriverManager_setUserAgent() throws Exception {