flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: Stream sql query in Flink
Date Fri, 16 Sep 2016 10:55:04 GMT
I have opened a PR which should solve your problem. Would be great if 
you could test it.

https://github.com/apache/flink/pull/2506

Timo

Am 06/09/16 um 14:31 schrieb Timo Walther:
> Hi,
>
> this looks like a bug. I created an issue for it 
> (https://issues.apache.org/jira/browse/FLINK-4581). Could you also 
> send us the pom.xml you are using for your project?
>
> Timo
>
> Am 06/09/16 um 13:47 schrieb jiecxy:
>> Hi all,
>>    I want to write a program, a thread read the real-time message from
>> /var/log/messages and write them to kafaka, and it works. Then I want 
>> to use
>> sql of flink to query the messages, and the following are my code:
>>
>> -----------------------------------------------------------------------------------------------------------

>>
>>
>>          // set up the execution environment
>>          final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>          env.setParallelism(2);
>>
>>          StreamTableEnvironment tableEnv =
>> TableEnvironment.getTableEnvironment(env);
>>
>>
>>          DataStream<String> text = env.addSource(new
>> FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(), 
>> properties));
>>          DataStream<Tuple4&lt;Long, String, String, String>> messages
=
>>                  text.flatMap(new Tokenizer());
>>          tableEnv.registerDataStream("Syslogs", messages, "time, user,
>> process, msg");
>>
>>          Table result = tableEnv.sql(
>>                  "SELECT STREAM msg FROM Syslogs WHERE msg LIKE 
>> '%system%'"
>>          );
>>
>>
>>          TableSink sink = new 
>> CsvTableSink("/home/jiecxy/Desktop/test.csv",
>> "|");
>>          result.writeToSink(sink);
>>
>>          // execute program
>>          env.execute();
>> -----------------------------------------------------------------------------------------------------------

>>
>> Note: the class Tokenizer is to transfer the log to four parts. Like 
>> this:
>>     Sep  6 09:28:01 master systemd: Stopping user-988.slice.
>> to
>>    Tuple4<time, master, systemd,  Stopping user-988.slice.>
>>
>>
>> But when I ran it use Flink:
>>    bin/flink run readlog.jar
>>
>> I got the exception...  What should I do?
>>
>>
>> Starting execution of program
>>
>> ------------------------------------------------------------
>>   The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main 
>> method
>> caused an error.
>>     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)

>>
>>     at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)

>>
>>     at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331) 
>>
>>     at 
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>>     at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) 
>>
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: java.lang.RuntimeException: java.sql.SQLException: No 
>> suitable
>> driver found for jdbc:calcite:
>>     at 
>> org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
>>     at 
>> org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
>>     at 
>> org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
>>     at
>> org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56) 
>>
>>     at
>> org.apache.flink.api.table.TableEnvironment.<init>(TableEnvironment.scala:73)

>>
>>     at
>> org.apache.flink.api.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:58)

>>
>>     at
>> org.apache.flink.api.java.table.StreamTableEnvironment.<init>(StreamTableEnvironment.scala:45)

>>
>>     at
>> org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)

>>
>>     at
>> org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)

>>
>>     at 
>> org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
>>
>>     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

>>
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)

>>
>>     ... 6 more
>> Caused by: java.sql.SQLException: No suitable driver found for 
>> jdbc:calcite:
>>     at java.sql.DriverManager.getConnection(DriverManager.java:689)
>>     at java.sql.DriverManager.getConnection(DriverManager.java:208)
>>     at 
>> org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
>>     ... 20 more
>>
>>
>>
>>
>> -- 
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html
>> Sent from the Apache Flink User Mailing List archive. mailing list 
>> archive at Nabble.com.
>
>


-- 
Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr


Mime
View raw message