flink-user mailing list archives

From David Olsen <davidolsen4...@gmail.com>
Subject Re: java.io.IOException: Couldn't access resultSet
Date Mon, 06 Jun 2016 15:37:03 GMT
After recompiling the 1.0.3 source and testing it, I discovered that
InputFormat.open() in FileSourceFunction never gets called because
splitIterator.hasNext() returns false. It looks like getInputSplits()
creates an Iterator<InputSplit> object whose 'exhausted' variable is
initialized to false. The following statement then checks hasNext() and
opens the input format only when it returns true. However, the 'exhausted'
variable always stays false and hasNext() always returns false, so
InputFormat.open() is never called. Is the 'exhausted' variable supposed to
behave that way (initialized to false, followed by a hasNext() check that
unfortunately always returns false)?
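
To make the question concrete, here is a minimal sketch of the iterator
pattern I think I am looking at. It is not the Flink source: SplitProvider,
splitIterator and countOpens are hypothetical names, plain strings stand in
for InputSplit, and I am assuming the iterator pulls splits lazily from a
provider that returns null when it has nothing to hand out. Under those
assumptions, 'exhausted' starting as false is harmless, and hasNext() is
false on the very first call whenever the provider returns no split.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

public class SplitIteratorSketch {

    /** Hypothetical stand-in for whatever hands out splits; returns null when none are left. */
    interface SplitProvider {
        String nextSplit();
    }

    /** Lazily pulls splits; 'exhausted' starts false and flips once the provider returns null. */
    static Iterator<String> splitIterator(final SplitProvider provider) {
        return new Iterator<String>() {
            private String next;
            private boolean exhausted; // fields default to false

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (next != null) {
                    return true;
                }
                next = provider.nextSplit();
                if (next == null) {
                    exhausted = true; // nothing (more) to read
                    return false;
                }
                return true;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String result = next;
                next = null;
                return result;
            }
        };
    }

    /** Counts how often a format would be opened: once per split the iterator yields. */
    static int countOpens(SplitProvider provider) {
        int opens = 0;
        Iterator<String> it = splitIterator(provider);
        while (it.hasNext()) {
            it.next();
            opens++; // stands in for format.open(split)
        }
        return opens;
    }

    public static void main(String[] args) {
        Queue<String> splits = new ArrayDeque<>(Arrays.asList("split-0", "split-1"));
        System.out.println(countOpens(splits::poll)); // provider with two splits
        System.out.println(countOpens(() -> null));   // provider with no splits
    }
}
```

If the real iterator works like this sketch, the thing to check would be why
no split is handed out, rather than the initial value of 'exhausted'.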

I appreciate any suggestions. Thanks.


On 6 June 2016 at 15:46, Chesnay Schepler <chesnay@apache.org> wrote:

> The JDBC InputFormat does not use the configuration and never has.
>
>
> On 06.06.2016 09:27, Aljoscha Krettek wrote:
>
> The problem could be that open() is not called with a proper Configuration
> object in streaming mode.
>
> On Sun, 5 Jun 2016 at 19:33 Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi David!
>>
>> You are using the JDBC format that was written for the batch API in the
>> streaming API.
>>
>> While that should actually work, it is a somewhat new and less tested
>> function. Let's double check that the call to open() is properly forwarded.
>>
>>
>> On Sun, Jun 5, 2016 at 12:47 PM, David Olsen <davidolsen4123@gmail.com>
>> wrote:
>>
>>> After switching to org.apache.flink.api.java.ExecutionEnvironment, my
>>> code successfully reads data from the database through JDBCInputFormat.
>>> But I need streaming mode (and it now seems that DataSet and DataStream
>>> are not interchangeable). Are there any additional functions that must
>>> be called before StreamExecutionEnvironment creates the JDBC input?
>>>
>>> Thanks
>>>
>>>
>>> On 5 June 2016 at 18:26, David Olsen <davidolsen4123@gmail.com> wrote:
>>>
>>>> I removed the open() call when constructing the JDBC input format, but
>>>> I still get the "Couldn't access resultSet" error.
>>>>
>>>> Caused by: java.io.IOException: Couldn't access resultSet
>>>> at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179)
>>>> at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
>>>> at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124)
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.NullPointerException
>>>> at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164)
>>>> ... 7 more
>>>>
>>>> Anything I should check as well?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On 5 June 2016 at 17:26, Chesnay Schepler <chesnay@apache.org> wrote:
>>>>
>>>>> You are not supposed to call open() yourself.
>>>>>
>>>>>
>>>>> On 05.06.2016 11:05, David Olsen wrote:
>>>>>
>>>>>> Following the sample on the Flink website [1] to test JDBC, I
>>>>>> encountered the error "Couldn't access resultSet". It looks like
>>>>>> nextRecord() is called before open(). However, I do call open() when
>>>>>> I construct the JDBC input format. Are there any functions I should
>>>>>> call before job submission?
>>>>>>
>>>>>> def jdbc()= {
>>>>>>   val jdbcif = JDBCInputFormat.buildJDBCInputFormat
>>>>>>     .setDrivername("com.mysql.jdbc.Driver")
>>>>>>     .setDBUrl("jdbc:mysql://localhost/test")
>>>>>>     .setQuery("select name from department")
>>>>>>     .setUsername(...).setPassword(...).finish
>>>>>>   jdbcif.open(null)
>>>>>>   jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
>>>>>> }
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>>     // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>     val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
>>>>>>     val stream = env.createInput(jdbc(), evidence$6)
>>>>>>     stream.map ( new MapFunction[Tuple1[String], String]() {
>>>>>>       override def map(tuple: Tuple1[String]): String = tuple.getField(0)
>>>>>>     }).returns(classOf[String]).writeAsText("/path/to/jdbc")
>>>>>>     env.execute("test-flink")
>>>>>> }
>>>>>>
>>>>>> The version used in this test is flink 1.0.3 and scala 2.11.
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
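
For reference, the quoted trace (an IOException wrapping a
NullPointerException in nextRecord()) is what one would expect from any
format whose read state is only set up in open(). A minimal sketch of that
failure mode follows. SketchFormat and tryRead are hypothetical, an in-memory
iterator stands in for the JDBC ResultSet, and none of this is taken from the
JDBCInputFormat source.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OpenBeforeReadSketch {

    /** Hypothetical format: 'rows' stands in for a ResultSet and is only set in open(). */
    static class SketchFormat {
        private final List<String> data;
        private Iterator<String> rows; // stays null until open() runs

        SketchFormat(List<String> data) {
            this.data = data;
        }

        void open() {
            rows = data.iterator();
        }

        String nextRecord() throws IOException {
            try {
                return rows.next(); // NullPointerException if open() never ran
            } catch (NullPointerException e) {
                throw new IOException("Couldn't access resultSet", e);
            }
        }
    }

    /** Returns the next record, or a short description of the failure. */
    static String tryRead(SketchFormat format) {
        try {
            return format.nextRecord();
        } catch (IOException e) {
            return e.getMessage() + " <- " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        SketchFormat format = new SketchFormat(Arrays.asList("sales", "hr"));
        System.out.println(tryRead(format)); // read attempted before open()
        format.open();
        System.out.println(tryRead(format)); // read after open()
    }
}
```

In the sketch the first read fails and the second succeeds, which is why the
missing open() call on the task side looks like the thing to chase.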
