Hi Simon,

This is a temporary workaround for 1.9 release. We will fix the behavior in 1.10, see FLINK-13461.

Regards,
Jark

On Tue, 13 Aug 2019 at 13:57, Simon Su <barley_ss@163.com> wrote:
Hi Jark

Thanks for your reply. 

It’s weird that In this case the tableEnv provide the api called “registerCatalog”, but it does not work in some cases ( like my cases ).
Do you think it’s feasible to unify this behaviors ? I think the document is necessary, but a unify way to use tableEnv is also important.

Thanks,
SImon

On 08/13/2019 12:27Jark Wu<imjark@gmail.com> wrote:
I think we might need to improve the javadoc of tableEnv.registerTableSource/registerTableSink. 
Currently, the comment says 

"Registers an external TableSink with already configured field names and field types in this TableEnvironment's catalog."

But, what catalog? The current one or default in-memory one? 
I think, it would be better to improve the description and add a NOTE on it. 

Regards,
Jark

On Tue, 13 Aug 2019 at 10:52, Xuefu Z <usxuefu@gmail.com> wrote:
Yes, tableEnv.registerTable(_) etc always registers in the default catalog.
To create table in your custom catalog, you could use
tableEnv.sqlUpdate("create table ....").

Thanks,
Xuefu

On Mon, Aug 12, 2019 at 6:17 PM Simon Su <barley_ss@163.com> wrote:

> Hi Xuefu
>
> Thanks for you reply.
>
> Actually I have tried it as your advises. I have tried to call
> tableEnv.useCatalog and useDatabase. Also I have tried to use
> “catalogname.databasename.tableName”  in SQL. I think the root cause is
> that when I call tableEnv.registerTableSource, it’s always use a “build-in”
> Catalog and Database rather than the custom one. So if I want to use a
> custom one, I have to write code like this:
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .withBuiltInCatalogName("ca1")
>         .withBuiltInDatabaseName("db1")
>         .build());
>
>
> As Dawid said, if I want to store in my custom catalog, I can call
> catalog.createTable or using DDL.
>
> Thanks,
> SImon
>
> On 08/13/2019 02:55,Xuefu Z<usxuefu@gmail.com> <usxuefu@gmail.com> wrote:
>
> Hi Simon,
>
> Thanks for reporting the problem. There is some rough edges around catalog
> API and table environments, and we are improving post 1.9 release.
>
> Nevertheless, tableEnv.registerCatalog() is just to put a new catalog in
> Flink's CatalogManager, It doens't change the default catalog/database as
> you expected. To switch to your newly registered catalog, you could call
> tableEnv.useCatalog() and .useDatabase().
>
> As an alternative, you could fully qualify your table name with a
> "catalog.db.table" syntax without switching current catalog/database.
>
> Please try those and let me know if you find new problems.
>
> Thanks,
> Xuefu
>
>
>
> On Mon, Aug 12, 2019 at 12:38 AM Simon Su <barley_ss@163.com> wrote:
>
>> Hi All
>>     I want to use a custom catalog by setting the name “ca1” and create a
>> database under this catalog. When I submit the
>> SQL, and it raises the error like :
>>
>>
>>     Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From
>> line 1, column 98 to line 1, column 116: Object 'orderstream' not found
>> within 'ca1.db1'
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
>> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line
>> 1, column 98 to line 1, column 116: Object 'orderstream' not found within
>> 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
>> at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
>> at
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> at
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
>> at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
>> ... 7 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object
>> 'orderstream' not found within 'ca1.db1'
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>> ... 26 more
>>
>> It seems that Calcite cannot find the source object as expected, After I
>> debug the code I found that when using tableEnv.registerTableSource or
>> registerTableSink, It will use a build-in catalog with a hard-code catalog
>> name ( default-catalog ) and database name ( default_database ) while
>> tableEnv.registerCatalog here cannot change this behaviros, So is this a
>> reasonable behaviors ? If I don’t want to use default build-in catalog and
>> database, is there any other ways to do this ?
>>
>>
>>    GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
>> tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to
>> change build-in catalog !!
>> tableEnv.useCatalog(catalog.getName());
>> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(),
>> "comment"), true);
>> tableEnv.useDatabase("db1");
>>
>> tableEnv.connect(sourceKafka)
>> .withFormat(csv)
>> .withSchema(schema2)
>> .inAppendMode()
>> .registerTableSource("orderstream");
>>
>> tableEnv.connect(sinkKafka)
>> .withFormat(csv)
>> .withSchema(schema2)
>> .inAppendMode()
>> .registerTableSink("sinkstream");;
>>
>> String sql = "insert into ca1.db1.sinkstream " +
>> "select tumble_start(ts, INTERVAL '5' SECOND) as t, max(data) from
>> ca1.db1.orderstream " +
>> "group by tumble(ts, INTERVAL '5' SECOND), data";
>>
>> tableEnv.sqlUpdate(sql);
>> tableEnv.execute("test");
>>
>>
>> Thanks,
>> SImon
>>
>>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>
>

--
Xuefu Zhang

"In Honey We Trust!"