flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Java Table API and external catalog bug?
Date Thu, 25 Oct 2018 08:04:22 GMT
Any other help here? is this a bug or something wrong in my code?

On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> I've tried with t2, test.t2 and test.test.t2.
>
> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuefu.z@alibaba-inc.com> wrote:
>
>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>> catalog name isn't part of the table name in the table API.
>>
>> Thanks,
>> Xuefu
>>
>> ------------------------------------------------------------------
>> Sender:Flavio Pompermaier <pompermaier@okkam.it>
>> Sent at:2018 Oct 22 (Mon) 23:06
>> Recipient:user <user@flink.apache.org>
>> Subject:Java Table API and external catalog bug?
>>
>> Hi to all,
>> I've tried to register an external catalog and use it with the Table API
>> in Flink 1.6.1.
>> The following (Java) test job cannot write to a sink using insertInto
>> because Flink cannot find the table by id (test.t2). Am I doing something
>> wrong or is this a bug?
>>
>> This is my Java test class:
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.core.fs.FileSystem.WriteMode;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>> import org.apache.flink.table.catalog.ExternalCatalogTable;
>> import org.apache.flink.table.catalog.InMemoryExternalCatalog;
>> import org.apache.flink.table.descriptors.Csv;
>> import org.apache.flink.table.descriptors.FileSystem;
>> import org.apache.flink.table.descriptors.FormatDescriptor;
>> import org.apache.flink.table.descriptors.Schema;
>> import org.apache.flink.table.sinks.CsvTableSink;
>>
>> public class CatalogExperiment {
>>   public static void main(String[] args) throws Exception {
>>     // create an external catalog
>>     final String outPath = "file:/tmp/file2.txt";
>>     InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
>>     FileSystem connDescIn = new
>> FileSystem().path("file:/tmp/file-test.txt");
>>     FileSystem connDescOut = new FileSystem().path(outPath);
>>     FormatDescriptor csvDesc = new Csv()//
>>         .field("a", "string")//
>>         .field("b", "string")//
>>         .field("c", "string")//
>>         .fieldDelimiter("\t");
>>     Schema schemaDesc = new Schema()//
>>         .field("a", "string")//
>>         .field("b", "string")//
>>         .field("c", "string");
>>     ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
>>         .withFormat(csvDesc)//
>>         .withSchema(schemaDesc)//
>>         .asTableSource();
>>     ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
>>         .withFormat(csvDesc)//
>>         .withSchema(schemaDesc)//
>>         .asTableSink();
>>     catalog.createTable("t1", t1, true);
>>     catalog.createTable("t2", t2, true);
>>
>>     final  ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>     final BatchTableEnvironment btEnv =
>> TableEnvironment.getTableEnvironment(env);
>>     btEnv.registerExternalCatalog("test", catalog);
>>     // this does not work ---------------------------------------
>>     btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was
>> registered under the name test.t2
>>     // this works ---------------------------------------
>>     btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t",
>> 1, WriteMode.OVERWRITE));
>>     env.execute();
>>   }
>> }
>>
>>
>> Best,
>> Flavio
>>
>>

Mime
View raw message