flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Simon Su" <barley...@163.com>
Subject Re: Flink cannot recognized catalog set by registerCatalog.
Date Tue, 13 Aug 2019 01:17:05 GMT
Hi Xuefu


Thanks for you reply. 


Actually I have tried it as your advises. I have tried to call tableEnv.useCatalog and useDatabase.
Also I have tried to use “catalogname.databasename.tableName”  in SQL. I think the root
cause is that when I call tableEnv.registerTableSource, it’s always use a “build-in”
Catalog and Database rather than the custom one. So if I want to use a custom one, I have
to write code like this:


StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.withBuiltInCatalogName("ca1")
.withBuiltInDatabaseName("db1")
.build());


As Dawid said, if I want to store in my custom catalog, I can call catalog.createTable or
using DDL.


Thanks,
SImon


On 08/13/2019 02:55,Xuefu Z<usxuefu@gmail.com> wrote:
Hi Simon,


Thanks for reporting the problem. There is some rough edges around catalog API and table environments,
and we are improving post 1.9 release.


Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in Flink's CatalogManager,
It doens't change the default catalog/database as you expected. To switch to your newly registered
catalog, you could call tableEnv.useCatalog() and .useDatabase().


As an alternative, you could fully qualify your table name with a "catalog.db.table" syntax
without switching current catalog/database.


Please try those and let me know if you find new problems.


Thanks,
Xuefu







On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley_ss@163.com> wrote:

Hi All
    I want to use a custom catalog by setting the name “ca1” and create a database under
this catalog. When I submit the
SQL, and it raises the error like :


    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation
failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within
'ca1.db1'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line
1, column 116: Object 'orderstream' not found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not
found within 'ca1.db1'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 26 more

It seems that Calcite cannot find the source object as expected, After I debug the code I
found that when using tableEnv.registerTableSource or registerTableSink, It will use a build-in
catalog with a hard-code catalog name ( default-catalog ) and database name ( default_database
) while tableEnv.registerCatalog here cannot change this behaviros, So is this a reasonable
behaviors ? If I don’t want to use default build-in catalog and database, is there any other
ways to do this ?


   GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to change build-in catalog
!!
tableEnv.useCatalog(catalog.getName());
catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
tableEnv.useDatabase("db1");

tableEnv.connect(sourceKafka)
.withFormat(csv)
.withSchema(schema2)
.inAppendMode()
.registerTableSource("orderstream");

tableEnv.connect(sinkKafka)
.withFormat(csv)
.withSchema(schema2)
.inAppendMode()
.registerTableSink("sinkstream");;

String sql = "insert into ca1.db1.sinkstream " +
"select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
"group by tumble(ts, INTERVAL '5' SECOND), data";

tableEnv.sqlUpdate(sql);
tableEnv.execute("test");


Thanks,
SImon




--

Xuefu Zhang

"In Honey We Trust!"
Mime
View raw message