spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: kafka structured streaming source refuses to read
Date Sat, 28 Jan 2017 18:13:39 GMT
it seems the bug is:
https://issues.apache.org/jira/browse/KAFKA-4547

i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
0.10.1.1
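
for sbt users, one way to pin the client is a dependency override. this is only a sketch: the spark connector version shown is an assumption (the thread does not state which spark release was used), and 0.10.0.1 is simply the client version reported to work further down in this thread.

```scala
// build.sbt (sketch). the spark version is an assumption, not taken from the thread.
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"

// force the kafka client version that was reported to work in this thread,
// regardless of what other dependencies pull in transitively
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
```

after changing the build it is worth checking the resolved dependency tree to confirm that only one kafka-clients version ends up on the classpath.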

On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <koert@tresata.com> wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group
>> spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets
>> latest offset to 0 for that partition? or am i misunderstanding?
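
a way to check this outside of spark is to ask a plain kafka consumer for the end offsets. the sketch below is hypothetical and not the code spark runs: the object name EndOffsetCheck is made up, the broker address and topic are the ones from the code in this thread, and it uses assign where the spark source subscribes. it only calls seekToEnd and position, the same two steps that show up in the debug log above.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// hypothetical helper, not part of spark or kafka
object EndOffsetCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // broker address taken from the code in this thread
    props.setProperty("bootstrap.servers", "somenode:9092")
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // look up all partitions of the topic and assign them manually
      val partitions = consumer.partitionsFor("wikipedia").asScala
        .map(p => new TopicPartition(p.topic, p.partition))
        .asJava
      consumer.assign(partitions)

      // seekToEnd is lazy; position() forces the offset lookup
      consumer.seekToEnd(partitions)
      partitions.asScala.foreach { tp =>
        println(s"$tp -> ${consumer.position(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}
```

running it once against each kafka-clients version makes it possible to compare the offset printed for wikipedia-2 with what the console consumer shows.
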
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
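>>> code:
>>>       import org.apache.spark.sql.functions.col
>>>       import org.apache.spark.sql.streaming.OutputMode
>>>       import org.apache.spark.sql.types.StringType
>>>
>>>       // read the "wikipedia" topic and print each message value to the console
>>>       val query = spark.readStream
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "somenode:9092")
>>>         .option("subscribe", "wikipedia")
>>>         .load()
>>>         .select(col("value") cast StringType)
>>>         .writeStream
>>>         .format("console")
>>>         .outputMode(OutputMode.Append)
>>>         .start()
>>>
>>>       // print the most recent progress report every 10 seconds
>>>       while (true) {
>>>         Thread.sleep(10000)
>>>         println(query.lastProgress)
>>>       }
>>>     }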
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>> alonsoir@gmail.com> wrote:
>>>
>>>> lets see the code...
>>>>
>>>> Alonso Isidoro Roman
>>>>
>>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <koert@tresata.com>:
>>>>
>>>>> my little program prints out query.lastProgress every 10 seconds, and
>>>>> this is what it shows:
>>>>>
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 9,
>>>>>     "triggerExecution" : 10
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 5,
>>>>>     "triggerExecution" : 5
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 5,
>>>>>     "triggerExecution" : 5
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : {
>>>>>     "getOffset" : 4,
>>>>>     "triggerExecution" : 4
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "endOffset" : {
>>>>>       "wikipedia" : {
>>>>>         "2" : 0,
>>>>>         "4" : 0,
>>>>>         "1" : 0,
>>>>>         "3" : 0,
>>>>>         "0" : 0
>>>>>       }
>>>>>     },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <koert@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> hey,
>>>>>> i am just getting started with kafka + spark structured streaming. so
>>>>>> this is probably a pretty dumb mistake.
>>>>>>
>>>>>> i wrote a little program in spark to read messages from a kafka topic
>>>>>> and display them in the console, using the kafka source and console
>>>>>> sink. i run it in spark local mode.
>>>>>>
>>>>>> i hooked it up to a test topic that i send messages to using the
>>>>>> kafka console producer, and everything works great. i type a message
>>>>>> in the console producer, and it pops up in my spark program. very neat!
>>>>>>
>>>>>> next i point it to another topic instead on which a kafka-connect
>>>>>> program is writing lots of irc messages. i can see kafka connect to the
>>>>>> topic successfully, the partitions are discovered etc., and then...
>>>>>> nothing. it just stays stuck at offset 0 for all partitions. at the
>>>>>> same time in another terminal i can see messages coming in just fine
>>>>>> using the kafka console consumer.
>>>>>>
>>>>>> i dont get it. why doesnt kafka want to consume from this topic in
>>>>>> spark structured streaming?
>>>>>>
>>>>>> thanks! koert
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
