hudi-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] eigakow opened a new issue #1375: [SUPPORT] HoodieDeltaStreamer offset not handled correctly
Date Wed, 04 Mar 2020 16:20:51 GMT
eigakow opened a new issue #1375: [SUPPORT] HoodieDeltaStreamer offset not handled correctly
URL: https://github.com/apache/incubator-hudi/issues/1375
 
 
   **Describe the problem you faced**
   
   I am trying to run a continuous DeltaStreamer (`hudi-0.5.1-incubating`) against a Kafka source with 3 partitions. The topic has been running for a while, so the earliest offsets are no longer available on the Kafka server. When connecting with a new target-base-path:
   - the first message is processed correctly
   - the second one fails with `org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions`
   
   Any subsequent submits give the same OffsetOutOfRange exception unless I provide a new target-base-path.
   
   I have added some logging into KafkaOffsetGen as suggested in [#1335](https://github.com/apache/incubator-hudi/issues/1335) and I see that:
   - for the first message there is no checkpoint present, so it falls back to 'LATEST' as expected. The message is processed and the commit is successful.
   - for the next message the previous checkpoint is detected, but the job tries to pull all earlier messages from the Kafka topic because the fromOffsets value is empty (see the sketch below): `numEvents: 5000000, fromOffsets: {}, toOffsets: {fr-bru-0=338362, fr-bru-1=142427, fr-bru-2=142401}`
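   
   To illustrate what an empty checkpoint means for fromOffsets, here is a minimal, self-contained sketch. It is not Hudi's actual `KafkaOffsetGen`/`CheckpointUtils` code and it assumes a checkpoint string of the form `topic,partition:offset,partition:offset,...`; the point is only that an empty checkpoint value (as in the `Checkpoint to resume from : Option{val=}` log line further down) parses to an empty map, so the read effectively starts at offset 0 and blows up once those offsets have been removed by retention:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Hypothetical sketch, NOT Hudi's CheckpointUtils: shows how a checkpoint string
   // of the assumed form "topic,partition:offset,..." becomes a fromOffsets map.
   public class CheckpointParseSketch {
   
     static Map<Integer, Long> parseFromOffsets(String checkpointStr) {
       Map<Integer, Long> fromOffsets = new HashMap<>();
       if (checkpointStr == null || checkpointStr.isEmpty()) {
         // Empty checkpoint ("Option{val=}") -> empty fromOffsets, as seen in my logs.
         return fromOffsets;
       }
       String[] splits = checkpointStr.split(",");
       // splits[0] is the topic name; the remaining entries are partition:offset pairs.
       for (int i = 1; i < splits.length; i++) {
         String[] pair = splits[i].split(":");
         fromOffsets.put(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
       }
       return fromOffsets;
     }
   
     public static void main(String[] args) {
       System.out.println(parseFromOffsets(""));                         // {}
       System.out.println(parseFromOffsets("fr-bru,0:338362,1:142427")); // {0=338362, 1=142427}
     }
   }
   ```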
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a custom properties file.
   ```
   hoodie.datasource.write.recordkey.field=full_count
   hoodie.datasource.write.partitionpath.field=full_count
   hoodie.deltastreamer.schemaprovider.source.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
   hoodie.deltastreamer.schemaprovider.target.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
   source-class=FR24JsonKafkaSource
   bootstrap.servers=streaming-kafka-broker-1:9092,streaming-kafka-broker-2:9092,streaming-kafka-broker-3:9092
   group.id=hudi_testing
   hoodie.deltastreamer.source.kafka.topic=fr-bru
   enable.auto.commit=false
   schemaprovider-class=org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   auto.offset.reset=latest
   ```
   2. Launch spark-submit with HoodieDeltaStreamer
   `spark-submit --master local --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --jars $(pwd)/../my-app-1-jar-with-dependencies.jar $(pwd)/../hudi-0.5.1-incubating_latest/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.5.1-incubating.jar --props file:///$(pwd)/hudi-fr24.properties --target-base-path file:///tmp/test-hudi_new --table-type MERGE_ON_READ --target-table test_hudi_new --source-class FR24JsonKafkaSource --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --continuous`
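   
   To double-check what checkpoint actually gets committed between runs, the newest commit file under `.hoodie` can be inspected with a rough sketch like the one below. This assumes (as I gather from `DeltaSync`) that the checkpoint is stored under a `deltastreamer.checkpoint.key` entry in the commit metadata JSON; the `/tmp/test-hudi_new` path matches the run above:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Comparator;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   import java.util.stream.Stream;
   
   // Diagnostic sketch (assumption: the checkpoint lives under the
   // "deltastreamer.checkpoint.key" entry of the commit metadata JSON).
   public class PrintLastCheckpoint {
     public static void main(String[] args) throws Exception {
       Path hoodieDir = Paths.get("/tmp/test-hudi_new/.hoodie");
       try (Stream<Path> files = Files.list(hoodieDir)) {
         // Pick the completed instant with the highest timestamp in its file name.
         Path latest = files
             .filter(p -> p.getFileName().toString().endsWith(".commit")
                       || p.getFileName().toString().endsWith(".deltacommit"))
             .max(Comparator.comparing(p -> p.getFileName().toString()))
             .orElseThrow(() -> new IllegalStateException("no completed commits yet"));
         String json = new String(Files.readAllBytes(latest), StandardCharsets.UTF_8);
         Matcher m = Pattern
             .compile("\"deltastreamer\\.checkpoint\\.key\"\\s*:\\s*\"([^\"]*)\"")
             .matcher(json);
         System.out.println(latest.getFileName() + " -> "
             + (m.find() ? "checkpoint='" + m.group(1) + "'" : "no checkpoint key found"));
       }
     }
   }
   ```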
   
   **Expected behavior**
   
   The job handles the offsets correctly and continues to read Kafka messages.
   
   **Environment Description**
   
   * Hudi version : hudi-0.5.1-incubating
   
   * Spark version : 2.4.0-cdh6.1.0
   
   * Hive version : 2.1.1-cdh6.1.0
   
   * Hadoop version : 3.0.0-cdh6.1.0
   
   * Storage (HDFS/S3/GCS..) : tried both HDFS and local
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
    I am using a custom source class (FR24JsonKafkaSource) together with a custom Kafka value deserializer (`fr24.ingest.ZlibDeserializerString`, visible in the consumer config in the stacktrace below). 
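   
   For reference, that value deserializer is presumably just a zlib-inflating String deserializer; a minimal sketch of such a deserializer (my assumption: plain zlib-compressed UTF-8 payloads, not the actual fr24.ingest implementation) looks roughly like this:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.Map;
   import java.util.zip.DataFormatException;
   import java.util.zip.Inflater;
   
   import org.apache.kafka.common.errors.SerializationException;
   import org.apache.kafka.common.serialization.Deserializer;
   
   // Hypothetical sketch of a zlib-inflating String deserializer, along the lines of
   // the fr24.ingest.ZlibDeserializerString referenced in the consumer config below.
   public class ZlibDeserializerStringSketch implements Deserializer<String> {
   
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
       // no configuration needed
     }
   
     @Override
     public String deserialize(String topic, byte[] data) {
       if (data == null) {
         return null;
       }
       Inflater inflater = new Inflater();
       inflater.setInput(data);
       ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(64, data.length * 4));
       byte[] buf = new byte[4096];
       try {
         while (!inflater.finished()) {
           int n = inflater.inflate(buf);
           if (n == 0) {
             break; // truncated or dictionary-requiring input; stop rather than spin
           }
           out.write(buf, 0, n);
         }
       } catch (DataFormatException e) {
         throw new SerializationException("Failed to inflate Kafka message", e);
       } finally {
         inflater.end();
       }
       return new String(out.toByteArray(), StandardCharsets.UTF_8);
     }
   
     @Override
     public void close() {
       // nothing to release
     }
   }
   ```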
   
   **Stacktrace**
   
   ```20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1],
Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf_.xml],
FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1, ugi=iga@BIGDATAPOC.LOCAL
(auth:KERBEROS)]]]
   20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties from /tmp/test-hudi_new/.hoodie/hoodie.properties
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1)
from /tmp/test-hudi_new
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading Active commit timeline for
/tmp/test-hudi_new
   20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200304102036__commit__COMPLETED]]
   20/03/04 10:21:02 INFO table.HoodieCopyOnWriteTable: Nothing to clean here. It is already
clean
   20/03/04 10:21:02 INFO hudi.AbstractHoodieWriteClient: Committed 20200304102036
   20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Commit 20200304102036 successful!
   20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 31 from persistence list
   20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 31
   20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 39 from persistence list
   20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 39
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from
/tmp/test-hudi_new
   20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1],
Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1,
ugi=iga@BIGDATAPOC.LOCAL (auth:KERBEROS)]]]
   20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties from /tmp/test-hudi_new/.hoodie/hoodie.properties
   20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1)
from /tmp/test-hudi_new
   20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200304102036__commit__COMPLETED]]
   20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Checkpoint to resume from : Option{val=}
   20/03/04 10:21:02 INFO consumer.ConsumerConfig: ConsumerConfig values:
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [streaming-kafka-broker-1:9092, streaming-kafka-broker-2:9092,
streaming-kafka-broker-3:9092]
           check.crcs = true
           client.id =
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = hudi_testing
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
           ssl.endpoint.identification.algorithm = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLS
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class fr24.ingest.ZlibDeserializerString
   
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'schemaprovider-class'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.source.schema.file'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.target.schema.file'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.partitionpath.field'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.recordkey.field'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.topic'
was supplied but isn't a known config.
   20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'source-class' was supplied
but isn't a known config.
   20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka version : 2.0.0-cdh6.1.0
   20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka commitId : null
   20/03/04 10:21:02 INFO clients.Metadata: Cluster ID: h7B3MAm8TLumIZQHVT802A
   20/03/04 10:21:02 INFO sources.JsonKafkaSource: About to read 619753 from Kafka for topic
:fr-bru
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for
executor
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-hudi_testing
   20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see
KAFKA-3135
   20/03/04 10:21:02 INFO spark.SparkContext: Starting job: isEmpty at DeltaSync.java:329
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Got job 8 (isEmpty at DeltaSync.java:329)
with 1 output partitions
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 14 (isEmpty at
DeltaSync.java:329)
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Missing parents: List()
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[47]
at map at SourceFormatAdapter.java:62), which has no missing parents
   20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9 stored as values in memory
(estimated size 5.5 KB, free 366.2 MB)
   20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9_piece0 stored as bytes in
memory (estimated size 3.3 KB, free 366.2 MB)
   20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on
biedva-worker-1.bigdatapoc.local:34716 (size: 3.3 KB, free: 366.3 MB)
   20/03/04 10:21:02 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1164
   20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage
14 (MapPartitionsRDD[47] at map at SourceFormatAdapter.java:62) (first 15 tasks are for partitions
Vector(0))
   20/03/04 10:21:02 INFO cluster.YarnScheduler: Adding task set 14.0 with 1 tasks
   20/03/04 10:21:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 14.0 (TID 10502,
biedva-worker-6.bigdatapoc.local, executor 7, partition 0, PROCESS_LOCAL, 7765 bytes)
   20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on
biedva-worker-6.bigdatapoc.local:34194 (size: 3.3 KB, free: 366.3 MB)
   20/03/04 10:21:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 14.0 (TID 10502,
biedva-worker-6.bigdatapoc.local, executor 7): org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset policy for partitions: {fr-bru-0=0}
           at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
           at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
           at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1261)
           at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1189)
           at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
           at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
           at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
           at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
           at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
           at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
           at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
           at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
           at scala.collection.AbstractIterator.to(Iterator.scala:1334)
           at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
           at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
           at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
           at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
           at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
           at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:121)
           at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
