flink-user mailing list archives

From Jark Wu <imj...@gmail.com>
Subject Re: Flink cannot recognized catalog set by registerCatalog.
Date Tue, 13 Aug 2019 06:05:05 GMT
Hi Simon,

This is a temporary workaround for the 1.9 release. We will fix the behavior
in 1.10, see FLINK-13461.

Regards,
Jark
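
The behavior discussed in the quoted thread can be illustrated with a toy
model in plain Java. This is only a sketch and not Flink code: the class
CatalogRegistrationSketch and its methods are hypothetical stand-ins named
after the tableEnv calls in the thread, and the built-in catalog and database
names are placeholders. It assumes, as reported in the thread, that
registerTableSource always writes to the built-in catalog, while
registerCatalog and useCatalog only affect which catalogs exist and which one
is current.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model (not Flink code) of catalog registration versus table registration. */
class CatalogRegistrationSketch {

    // Placeholder names for the built-in catalog and database (assumption).
    static final String BUILT_IN_CATALOG = "default_catalog";
    static final String BUILT_IN_DATABASE = "default_database";

    // catalog name -> set of "database.table" entries
    private final Map<String, Set<String>> catalogs = new HashMap<>();
    private String currentCatalog = BUILT_IN_CATALOG;
    private String currentDatabase = BUILT_IN_DATABASE;

    public CatalogRegistrationSketch() {
        catalogs.put(BUILT_IN_CATALOG, new HashSet<>());
    }

    /** Adds a catalog; deliberately leaves the current catalog untouched. */
    public void registerCatalog(String name) {
        catalogs.putIfAbsent(name, new HashSet<>());
    }

    /** Switches the catalog used for name lookup. */
    public void useCatalog(String name) {
        tablesOf(name); // fails if the catalog was never registered
        currentCatalog = name;
    }

    /** Switches the database used for name lookup. */
    public void useDatabase(String name) {
        currentDatabase = name;
    }

    public String currentPath() {
        return currentCatalog + "." + currentDatabase;
    }

    /** Models the reported behavior: the built-in catalog is always the target. */
    public void registerTableSource(String table) {
        tablesOf(BUILT_IN_CATALOG).add(BUILT_IN_DATABASE + "." + table);
    }

    /** Models creating the table through the catalog itself. */
    public void createTable(String catalog, String database, String table) {
        tablesOf(catalog).add(database + "." + table);
    }

    public boolean exists(String catalog, String database, String table) {
        Set<String> tables = catalogs.get(catalog);
        return tables != null && tables.contains(database + "." + table);
    }

    private Set<String> tablesOf(String catalog) {
        Set<String> tables = catalogs.get(catalog);
        if (tables == null) {
            throw new IllegalArgumentException("Unknown catalog: " + catalog);
        }
        return tables;
    }

    public static void main(String[] args) {
        CatalogRegistrationSketch env = new CatalogRegistrationSketch();
        env.registerCatalog("ca1");
        env.useCatalog("ca1");
        env.useDatabase("db1");
        env.registerTableSource("orderstream");

        System.out.println(env.currentPath());                          // ca1.db1
        System.out.println(env.exists("ca1", "db1", "orderstream"));    // false
        System.out.println(env.exists(BUILT_IN_CATALOG, BUILT_IN_DATABASE,
                "orderstream"));                                        // true

        env.createTable("ca1", "db1", "orderstream");
        System.out.println(env.exists("ca1", "db1", "orderstream"));    // true
    }
}
```

In this model, as in the thread, the table becomes visible under ca1.db1 only
when it is created through the catalog itself, which corresponds to the
catalog.createTable or DDL route suggested in the replies.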

On Tue, 13 Aug 2019 at 13:57, Simon Su <barley_ss@163.com> wrote:

> Hi Jark
>
> Thanks for your reply.
>
> It’s odd that tableEnv provides an API called “registerCatalog” that
> does not work in some cases (like mine). Do you think it is feasible to
> unify this behavior? I think the documentation is necessary, but a unified
> way to use tableEnv is also important.
>
> Thanks,
> Simon
>
> On 08/13/2019 12:27, Jark Wu <imjark@gmail.com> wrote:
>
> I think we might need to improve the javadoc of
> tableEnv.registerTableSource/registerTableSink.
> Currently, the comment says
>
> "Registers an external TableSink with already configured field names and
> field types in this TableEnvironment's catalog."
>
> But which catalog? The current one or the default in-memory one?
> I think it would be better to improve the description and add a NOTE to
> it.
>
> Regards,
> Jark
>
> On Tue, 13 Aug 2019 at 10:52, Xuefu Z <usxuefu@gmail.com> wrote:
>
>> Yes, tableEnv.registerTable(_) and similar methods always register in the
>> default catalog.
>> To create a table in your custom catalog, you could use
>> tableEnv.sqlUpdate("create table ....").
>>
>> Thanks,
>> Xuefu
>>
>> On Mon, Aug 12, 2019 at 6:17 PM Simon Su <barley_ss@163.com> wrote:
>>
>> > Hi Xuefu
>> >
>> > Thanks for your reply.
>> >
>> > I have already tried what you advised: I called tableEnv.useCatalog and
>> > useDatabase, and I also tried using
>> > “catalogname.databasename.tableName” in SQL. I think the root cause is
>> > that tableEnv.registerTableSource always uses the built-in catalog and
>> > database rather than the custom one. So if I want to use a custom one,
>> > I have to write code like this:
>> >
>> > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>> >     EnvironmentSettings.newInstance()
>> >         .useBlinkPlanner()
>> >         .inStreamingMode()
>> >         .withBuiltInCatalogName("ca1")
>> >         .withBuiltInDatabaseName("db1")
>> >         .build());
>> >
>> >
>> > As Dawid said, if I want to store tables in my custom catalog, I can
>> > call catalog.createTable or use DDL.
>> >
>> > Thanks,
>> > Simon
>> >
>> > On 08/13/2019 02:55, Xuefu Z <usxuefu@gmail.com> wrote:
>> >
>> > Hi Simon,
>> >
>> > Thanks for reporting the problem. There are some rough edges around the
>> > catalog API and table environments, and we are improving them after the
>> > 1.9 release.
>> >
>> > Nevertheless, tableEnv.registerCatalog() just puts a new catalog in
>> > Flink's CatalogManager. It doesn't change the default catalog/database
>> > as you expected. To switch to your newly registered catalog, you could
>> > call tableEnv.useCatalog() and .useDatabase().
>> >
>> > As an alternative, you could fully qualify your table name with the
>> > "catalog.db.table" syntax without switching the current
>> > catalog/database.
>> >
>> > Please try those and let me know if you find new problems.
>> >
>> > Thanks,
>> > Xuefu
>> >
>> >
>> >
>> > On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley_ss@163.com> wrote:
>> >
>> >> Hi All
>> >>     I want to use a custom catalog named “ca1” and create a
>> >> database under this catalog. When I submit the SQL, it raises an
>> >> error like this:
>> >>
>> >>
>> >> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> >>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> >>     at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> >>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> >>     at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> >>     at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> >> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 98 to line 1, column 116: Object 'orderstream' not found within 'ca1.db1'
>> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> >>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> >>     at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> >>     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> >>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> >>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> >>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> >>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> >>     ... 7 more
>> >> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'orderstream' not found within 'ca1.db1'
>> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> >>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> >>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> >>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> >>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> >>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> >>     ... 26 more
>> >>
>> >> It seems that Calcite cannot find the source object as expected. After
>> >> debugging the code, I found that tableEnv.registerTableSource and
>> >> registerTableSink use a built-in catalog with a hard-coded catalog
>> >> name ( default_catalog ) and database name ( default_database ), and
>> >> tableEnv.registerCatalog cannot change this behavior. Is this
>> >> reasonable behavior? If I don’t want to use the default built-in
>> >> catalog and database, is there any other way to do this?
>> >>
>> >>
>> >> GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> >> // Registering the catalog does not change the built-in catalog!
>> >> tableEnv.registerCatalog(catalog.getName(), catalog);
>> >> tableEnv.useCatalog(catalog.getName());
>> >> catalog.createDatabase("db1",
>> >>     new CatalogDatabaseImpl(new HashMap<>(), "comment"), true);
>> >> tableEnv.useDatabase("db1");
>> >>
>> >> tableEnv.connect(sourceKafka)
>> >>     .withFormat(csv)
>> >>     .withSchema(schema2)
>> >>     .inAppendMode()
>> >>     .registerTableSource("orderstream");
>> >>
>> >> tableEnv.connect(sinkKafka)
>> >>     .withFormat(csv)
>> >>     .withSchema(schema2)
>> >>     .inAppendMode()
>> >>     .registerTableSink("sinkstream");
>> >>
>> >> String sql = "insert into ca1.db1.sinkstream " +
>> >>     "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) " +
>> >>     "from ca1.db1.orderstream " +
>> >>     "group by tumble(ts, INTERVAL '5' SECOND), data";
>> >>
>> >> tableEnv.sqlUpdate(sql);
>> >> tableEnv.execute("test");
>> >>
>> >>
>> >> Thanks,
>> >> Simon
>> >>
>> >>
>> >
>> > --
>> > Xuefu Zhang
>> >
>> > "In Honey We Trust!"
>> >
>> >
>>
>> --
>> Xuefu Zhang
>>
>> "In Honey We Trust!"
>>
>
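
The "catalog.db.table" syntax mentioned in the thread can be sketched as a
name-expansion step. The following is a simplified model and not Flink's
resolver: it assumes a one-part name is completed with the current catalog
and database, a two-part name with the current catalog, and a three-part name
is used as is. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code) of expanding a table name to catalog, database, table. */
class TableNameSketch {

    /** Expands a 1-, 2-, or 3-part dotted name against the current catalog and database. */
    public static List<String> qualify(String name, String currentCatalog, String currentDatabase) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:
                return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2:
                return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3:
                return Arrays.asList(parts[0], parts[1], parts[2]);
            default:
                throw new IllegalArgumentException("Expected 1 to 3 name parts: " + name);
        }
    }

    public static void main(String[] args) {
        // A short name depends on the current catalog and database.
        System.out.println(qualify("orderstream", "ca1", "db1"));
        // prints [ca1, db1, orderstream]

        // A fully qualified name ignores the current catalog and database.
        System.out.println(qualify("ca1.db1.orderstream", "other_catalog", "other_db"));
        // prints [ca1, db1, orderstream]
    }
}
```

Under this model, a fully qualified name only selects where to look. It does
not help if the table was registered somewhere else, which matches the
"not found within 'ca1.db1'" error reported above.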
