kafka-dev mailing list archives

From Randall Hauch <rha...@gmail.com>
Subject Re: Kafka Connect and Partitions
Date Fri, 28 Apr 2017 15:07:59 GMT
The source connector creates SourceRecord objects and can set a number of
fields, including the message's key and value, the Kafka topic name and, if
desired, the Kafka topic partition number. If the connector does set the
topic partition to a non-null value, then that's the partition to which
Kafka Connect will write the message; otherwise, the partitioner
(e.g., your custom partitioner) used by the Kafka Connect producer will
choose/compute the partition based purely upon the key and value byte
arrays. Note that if the connector doesn't set the topic partition number
and no special producer partitioner is specified, the default hash-based
Kafka partitioner will be used.
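
To make that decision order concrete, here is a minimal stand-alone sketch. It
is a simplified model and not Kafka's own code: the class and method names
(PartitionChoiceSketch, choosePartition) are made up for illustration, and the
murmur2 routine is transcribed on the assumption that it matches the hash used
by the default partitioner. Null keys are deliberately not modelled, since
their handling differs between Kafka versions.

```java
import java.nio.charset.StandardCharsets;

final class PartitionChoiceSketch {

    /** Murmur2 hash; assumed (not verified here) to match the default partitioner's hash. */
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        // Mix four bytes at a time into the hash.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining one to three bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** An explicit record partition wins; otherwise hash the serialized key bytes. */
    static int choosePartition(Integer recordPartition, byte[] keyBytes, int numPartitions) {
        if (recordPartition != null) {
            return recordPartition;
        }
        if (keyBytes == null) {
            throw new IllegalArgumentException("null keys are not modelled in this sketch");
        }
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50; // same partition count as the topics in this thread
        byte[] emptyKey = new byte[0];
        byte[] fileKey = "d.ser".getBytes(StandardCharsets.UTF_8);
        System.out.println(choosePartition(7, emptyKey, numPartitions));    // explicit partition: 7
        System.out.println(choosePartition(null, emptyKey, numPartitions)); // hashed empty key
        System.out.println(choosePartition(null, fileKey, numPartitions));  // hashed non-empty key
    }
}
```

In this model an empty key always hashes to the same value, so every record
lands on one partition; with 50 partitions that works out to partition 31,
which is the partition reported further down in this thread.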

So, the connector can certainly set the topic partition number, and it may
be easier to do it there since the connector actually has the key and
values before they are serialized. But no matter what, the connector is the
only thing that sets the message key in the source record.

BTW, the SourceRecord's "source partition" and "source offset" are actually
the connector-defined information about the source and where the connector
has read in that source. Don't confuse these with the topic name or topic
partition number.
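
As a rough illustration of how those pieces differ, the sketch below uses a
stand-in class rather than the real Connect SourceRecord. The names
SourceRecordModel and valueOnly are hypothetical; the "_taskFiles" and
"_position" map keys are taken from the output quoted below. As far as I know,
the five-argument SourceRecord constructor used in the quoted code takes only a
value schema and value, so it behaves like valueOnly here and leaves both the
Kafka partition and the key unset. Using the file name as the key is just one
possible choice, not a recommendation from this thread.

```java
import java.util.Collections;
import java.util.Map;

/** Stand-in mirroring the SourceRecord fields discussed here; not the real Connect class. */
final class SourceRecordModel {
    final Map<String, ?> sourcePartition; // connector-defined: which source, e.g. a file
    final Map<String, ?> sourceOffset;    // connector-defined: how far into that source
    final String topic;                   // Kafka topic name
    final Integer kafkaPartition;         // Kafka topic partition, or null to let the partitioner decide
    final Object key;                     // message key, or null if the connector sets none
    final Object value;                   // message value

    SourceRecordModel(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                      String topic, Integer kafkaPartition, Object key, Object value) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
        this.topic = topic;
        this.kafkaPartition = kafkaPartition;
        this.key = key;
        this.value = value;
    }

    /** Value-only form: neither a Kafka partition nor a key is set. */
    static SourceRecordModel valueOnly(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                                       String topic, Object value) {
        return new SourceRecordModel(sourcePartition, sourceOffset, topic, null, null, value);
    }

    public static void main(String[] args) {
        Map<String, ?> file = Collections.singletonMap("_taskFiles", "d.ser");
        Map<String, ?> position = Collections.singletonMap("_position", 1L);

        // The source partition and offset are set, yet the message key is still null.
        SourceRecordModel unkeyed = valueOnly(file, position, "Topic1", "line 1");
        System.out.println("key=" + unkeyed.key + ", kafkaPartition=" + unkeyed.kafkaPartition);

        // Here the connector supplies a key explicitly (the file name, as an example).
        SourceRecordModel keyed =
                new SourceRecordModel(file, position, "Topic1", null, "d.ser", "line 1");
        System.out.println("key=" + keyed.key + ", kafkaPartition=" + keyed.kafkaPartition);
    }
}
```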

Hope that helps.

Randall

On Fri, Apr 28, 2017 at 7:15 AM, <david.franklin@bt.com> wrote:

> Hi Gwen,
>
> Having added a custom partitioner (via the producer.partitioner.class
> property in worker.properties) that simply randomly selects a partition,
> the data is now written evenly across all the partitions :)
>
> The root of my confusion regarding why the default partitioner writes all
> data to the same partition is that I don't understand how the SourceRecords
> returned in the source task poll() method are used by the partitioner.  The
> data that is passed to the partitioner includes a key Object (which is an
> empty byte array - presumably this is a bad idea!), and a value Object
> (which is a non-empty byte array):
>
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes,
>             Object value, byte[] valueBytes, Cluster cluster) {
>         System.out.println(String.format(
>                 "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
>                 key, key.getClass().getSimpleName(), keyBytes.length,
>                 value, value.getClass().getSimpleName(), valueBytes.length));
>
> =>
> ### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][236]
>
> However, I don't understand how the above key and value are derived from
> the SourceRecord attributes which, in my application's case, are as follows:
>
>                     events.add(new SourceRecord(
>                             offsetKey(filename),
>                             offsetValue(++recordIndex),
>                             topicName,
>                             Schema.BYTES_SCHEMA,
>                             line));
>                     System.out.println(String.format(
>                             "### PARTITION SourceRecord key[%s] value[%s] "
>                             + "topic[%s] schema[%s], data[%s]",
>                             offsetKey(filename), offsetValue(recordIndex),
>                             topicName, Schema.BYTES_SCHEMA, line));
>
> =>
> ### PARTITION SourceRecord key[{_taskFiles=e:\a\b\c\d.ser}]
> value[{_position=1}] topic[Topic1] schema[Schema{BYTES}], data[{"field1":
> value1,  …, "fieldN": valueN}]
>
> In worker.properties I use the key.converter and value.converter
> properties to apply an Avro converter to the data written to Kafka.  Hence,
> I assume, the byte[]  format of the key and the value.  Though I don't
> understand why the key is empty and this, presumably, is why all data is
> mapped to the same Kafka partition.
>
> Could you explain how the SourceRecord is used to derive the partition key
> please.  Can you see from the above summary why the partition key is null?
> It defeats me :(
>
> Have a good weekend, thanks,
>
> David
>
> -----Original Message-----
> From: Gwen Shapira [mailto:gwen@confluent.io]
> Sent: 27 April 2017 17:44
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect and Partitions
>
> That's great! So we tracked this down to the source connector not properly
> partitioning data.
>
> Do you set both key and value? It sounds a bit like maybe all your records
> have the exact same key, which means they all get hashed to the same
> partition. Can you check that?
>
> On Thu, Apr 27, 2017 at 3:22 AM, <david.franklin@bt.com> wrote:
>
> > Hi Gwen,
> >
> > Many thanks for your much appreciated offer to help with this.
> >
> > In answer to your questions:
> > * Are you writing a connector or trying to use an existing one?
> > I'm writing a new source/sink connector pipeline: foldersToTopics piped
> > into topicsToFolders.
> > * Is the connector reading from the topic you think you are reading?
> > Yes
> > * Do you actually have 4 tasks? Are they all running? Are there errors?
> > Yes, Yes, No (see log output below)
> > * What happens if you stop the only task doing the work?
> > I'm not sure how to do this but am confident that the single effective
> > task does actually complete the job correctly.
> > * Is the one task subscribed to all partitions? How did you check that?
> > I don't think it is.  In the log output below, the line tagged
> > 'Setting newly assigned partitions' shows the topics and their
> > partitions; the lines tagged 'Kafka Offset/Partition' show the
> > partition that contains the data read.
> > As you will see, only partition 31 is ever read, (interestingly) for
> > all topics.
> > * Do you have data in all 50 partitions?
> > No - only partition 31 contains any data, which therefore explains the
> > 'topicsToFolders' sink task behaviour.
> > This points the finger of suspicion at the 'foldersToTopics' source task.
> > This may be the root of the problem but I'm not clear why the
> > DefaultPartitioner doesn't write the data across all partitions.
> > To simplify things, I've reduced my setup to 2 producer tasks, reading
> > data from files to generate events from, and 2 consumer tasks that
> > read events and serialize them to another set of files.
> > As it stands it's a perverse file copy!
> > * Anything interesting in the log?
> > Not that I can see.
> >
> >
> > I ran the following command across several of the topics and each gave
> > the same result:
> > only partition 31 contains any data!
> > $ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > localhost:9092 -topic topicN --time -1 --offsets 5
> > topicN:23:0
> > topicN:32:0
> > topicN:41:0
> > topicN:17:0
> > topicN:8:0
> > topicN:44:0
> > topicN:35:0
> > topicN:26:0
> > topicN:11:0
> > topicN:29:0
> > topicN:47:0
> > topicN:38:0
> > topicN:20:0
> > topicN:2:0
> > topicN:5:0
> > topicN:14:0
> > topicN:46:0
> > topicN:40:0
> > topicN:49:0
> > topicN:13:0
> > topicN:4:0
> > topicN:31:91169
> > topicN:22:0
> > topicN:16:0
> > topicN:7:0
> > topicN:43:0
> > topicN:25:0
> > topicN:34:0
> > topicN:10:0
> > topicN:37:0
> > topicN:1:0
> > topicN:28:0
> > topicN:19:0
> > topicN:45:0
> > topicN:36:0
> > topicN:27:0
> > topicN:9:0
> > topicN:18:0
> > topicN:21:0
> > topicN:48:0
> > topicN:3:0
> > topicN:12:0
> > topicN:30:0
> > topicN:39:0
> > topicN:15:0
> > topicN:42:0
> > topicN:24:0
> > topicN:33:0
> > topicN:6:0
> > topicN:0:0
> >
> >
> > Below is an edited version of the log output when I launch the Kafka
> > Connect source and sink connectors/tasks.
> >
> > Source Task [2134126231] retrieves 500 events, in 5 batches.
> > Source Task [1714504927] retrieves 500 events, in 5 batches.
> >
> > Sink Task [518962883] {id=topicsToFolders-0} processes none of the
> > events.
> > Sink Task [602953950] {id=topicsToFolders-1} processes all of the 1000
> > events that are written by the source tasks.
> >
> > It seems to me that the consumer tasks are behaving reasonably given
> > that all data for all topics is in a single partition.
> > But I don't understand why the producer tasks write to a single
> > partition.
> > Does it suggest that the DefaultPartitioner is generating the same
> > hash value for the data I'm providing it and that I need to introduce
> > a custom partitioner?
> >
> > Once again, many thanks for your help in resolving this.
> >
> > Best wishes,
> > David
> >
> >
> >
> >         offset.flush.timeout.ms = 10000
> >         offset.flush.interval.ms = 1000
> >         rest.port = 8084
> > [41:24,897] INFO Kafka Connect starting (org.apache.kafka.connect.
> > runtime.Connect:52)
> > [41:24,897] INFO Herder starting (org.apache.kafka.connect.
> > runtime.standalone.StandaloneHerder:71)
> > [41:24,897] INFO Worker starting (org.apache.kafka.connect.
> > runtime.Worker:102)
> > [41:24,912] INFO ProducerConfig values:
> >         partitioner.class = class org.apache.kafka.clients.
> > producer.internals.DefaultPartitioner
> > [41:24,990] INFO ProducerConfig values:
> >         partitioner.class = class org.apache.kafka.clients.
> > producer.internals.DefaultPartitioner
> > [41:24,990] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.
> > utils.AppInfoParser:83)
> > [41:24,990] INFO Starting FileOffsetBackingStore with file
> > \tmp\kafkaConnect\foldersToTopicsToFolders.offsets
> > (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
> > [41:25,006] INFO Worker started (org.apache.kafka.connect.
> > runtime.Worker:124)
> > [41:25,006] INFO Herder started (org.apache.kafka.connect.
> > runtime.standalone.StandaloneHerder:73)
> > [41:25,006] INFO Starting REST server (org.apache.kafka.connect.
> > runtime.rest.RestServer:98)
> >
> > [41:25,600] INFO Started
> > o.e.j.s.ServletContextHandler@14d14731{/,null,AVAILABLE}
> > (org.eclipse.jetty.server.handler.ContextHandler:744)
> > [41:25,615] INFO Started
> > ServerConnector@793138bd{HTTP/1.1}{0.0.0.0:8084}
> > (org.eclipse.jetty.server.ServerConnector:266)
> > [41:25,615] INFO Started @1365ms (org.eclipse.jetty.server.Server:379)
> > [41:25,615] INFO REST server listening at xxxx, advertising URL xxxx
> > (org.apache.kafka.connect.runtime.rest.RestServer:150)
> > [41:25,615] INFO Kafka Connect started (org.apache.kafka.connect.
> > runtime.Connect:58)
> > [41:25,615] INFO ConnectorConfig values:
> >         connector.class = c.b.b.pp.kafkaConnect.connectors.
> > FoldersToTopicsSourceConnector
> >         tasks.max = 2
> >         name = foldersToTopics
> > [41:25,631] INFO Creating connector foldersToTopics of type
> > c.b.b.pp.kafkaConnect.connectors.FoldersToTopicsSourceConnector
> > (org.apache.kafka.connect.runtime.Worker:168)
> > [41:25,631] INFO Instantiated connector foldersToTopics with version
> > 0.10.0.1 of type
> > c.b.b.pp.kafkaConnect.connectors.FoldersToTopicsSourceConnector
> > (org.apache.kafka.connect.runtime.Worker:176)
> > [41:25,631] INFO ### FoldersToTopicsSourceConnector is monitoring
> > inbox [e:/events/sensor-103] and has [2] tasks (c.b.b.pp.kafkaConnect.
> > connectors.FoldersToTopicsSourceConnector:55)
> > [41:25,631] INFO Finished creating connector foldersToTopics
> > (org.apache.kafka.connect.runtime.Worker:181)
> > [41:25,631] INFO SourceConnectorConfig values:
> >         connector.class = c.b.b.pp.kafkaConnect.connectors.
> > FoldersToTopicsSourceConnector
> >         tasks.max = 2
> >         name = foldersToTopics
> >
> > [41:26,162] INFO ### FileLister found [1000] files under
> > [~eventSource103]
> > (c.b.b.pp.InboxMonitor:89)
> > [41:26,162] INFO ### Files in inbox [~eventSource103] have changed to
> > [1000] files - requesting task reconfiguration
> > (c.b.b.pp.InboxMonitor:61)
> > [41:26,162] INFO ### inbox [~eventSource103] contains [1000] files
> > (c.b.b.pp.kafkaConnect.connectors.FoldersToTopicsSourceConnector:76)
> >
> > [41:26,162] INFO Creating task foldersToTopics-0
> > (org.apache.kafka.connect.runtime.Worker:315)
> > [41:26,162] INFO Instantiated task foldersToTopics-0 with version
> > 0.10.0.1 of type c.b.b.pp.kafkaConnect.tasks.FoldersToTopicsSourceTask
> > (org.apache.kafka.connect.runtime.Worker:326)
> >
> > [41:26,162] INFO Creating task foldersToTopics-1
> > (org.apache.kafka.connect.runtime.Worker:315)
> > [41:26,162] INFO Instantiated task foldersToTopics-1 with version
> > 0.10.0.1 of type c.b.b.pp.kafkaConnect.tasks.FoldersToTopicsSourceTask
> > (org.apache.kafka.connect.runtime.Worker:326)
> > [41:26,162] INFO Created connector foldersToTopics
> > (org.apache.kafka.connect.cli.ConnectStandalone:91)
> >
> > [41:26,162] INFO ConnectorConfig values:
> >         connector.class = c.b.b.pp.kafkaConnect.connectors.
> > TopicsToFoldersSinkConnector
> >         tasks.max = 2
> >         name = topicsToFolders
> >  (org.apache.kafka.connect.runtime.ConnectorConfig:178)
> > [41:26,162] INFO Creating connector topicsToFolders of type
> > c.b.b.pp.kafkaConnect.connectors.TopicsToFoldersSinkConnector
> > (org.apache.kafka.connect.runtime.Worker:168)
> > [41:26,162] INFO Instantiated connector topicsToFolders with version
> > 0.10.0.1 of type
> > c.b.b.pp.kafkaConnect.connectors.TopicsToFoldersSinkConnector
> > (org.apache.kafka.connect.runtime.Worker:176)
> > [41:26,162] INFO Finished creating connector topicsToFolders
> > (org.apache.kafka.connect.runtime.Worker:181)
> > [41:26,162] INFO SinkConnectorConfig values:
> >         connector.class = c.b.b.pp.kafkaConnect.connectors.
> > TopicsToFoldersSinkConnector
> >         tasks.max = 2
> >         topics = [topic1, topic2, topic3, topic4, topic5, topic6,
> > topic7, topic8, topic9, topic10, topic11]
> >         name = topicsToFolders
> > [41:26,162] INFO ### Max tasks [2] (c.b.b.pp.kafkaConnect.connectors.
> > TopicsToFoldersSinkConnector:54)
> > [41:26,162] INFO ### Topics for task
> > [topic1,topic2,topic3,topic4,topic5,topic6]
> > (c.b.b.pp.kafkaConnect.connectors.TopicsToFoldersSinkConnector:61)
> > [41:26,162] INFO ### Topics for task
> > [topic7,topic8,topic10,topic9,topic11]
> > (c.b.b.pp.kafkaConnect.connectors.TopicsToFoldersSinkConnector:61)
> >
> > [41:26,162] INFO Creating task topicsToFolders-0
> > (org.apache.kafka.connect.runtime.Worker:315)
> > [41:26,162] INFO Instantiated task topicsToFolders-0 with version
> > 0.10.0.1 of type c.b.b.pp.kafkaConnect.tasks.TopicsToFoldersSinkTask
> > (org.apache.kafka.connect.runtime.Worker:326)
> > [41:26,178] INFO ConsumerConfig values:
> >         partition.assignment.strategy = [org.apache.kafka.clients.
> > consumer.RangeAssignor]
> >         max.partition.fetch.bytes = 1048576
> >         max.poll.records = 1000
> >         session.timeout.ms = 30000
> >
> > [41:26,209] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.
> > utils.AppInfoParser:83)
> > [41:26,209] INFO Kafka commitId : a7a17cdec9eaa6c5
> > (org.apache.kafka.common.utils.AppInfoParser:84)
> > [41:26,209] INFO TaskConfig values:
> >         task.class = class c.b.b.pp.kafkaConnect.tasks.
> > TopicsToFoldersSinkTask
> >  (org.apache.kafka.connect.runtime.TaskConfig:178)
> >
> > [41:26,209] INFO Creating task topicsToFolders-1
> > (org.apache.kafka.connect.runtime.Worker:315)
> > [41:26,209] INFO Instantiated task topicsToFolders-1 with version
> > 0.10.0.1 of type c.b.b.pp.kafkaConnect.tasks.TopicsToFoldersSinkTask
> > (org.apache.kafka.connect.runtime.Worker:326)
> > [41:26,209] INFO ConsumerConfig values:
> >         partition.assignment.strategy = [org.apache.kafka.clients.
> > consumer.RangeAssignor]
> >         max.partition.fetch.bytes = 1048576
> >         max.poll.records = 1000
> >         session.timeout.ms = 30000
> >
> > [41:26,225] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.
> > utils.AppInfoParser:83)
> >
> > [41:26,225] INFO ### Task [518962883] is (ideally) assigned topics
> > [topic1,topic2,topic3,topic4,topic5,topic6] BUT ACTUALLY IT ISN'T!!!
> > (c.b.b.pp.kafkaConnect.tasks.TopicsToFoldersSinkTask:49)
> > [41:26,225] INFO Sink task WorkerSinkTask{id=topicsToFolders-0}
> > finished initialization and start (org.apache.kafka.connect.
> > runtime.WorkerSinkTask:208)
> > [41:26,225] INFO Created connector topicsToFolders
> > (org.apache.kafka.connect.cli.ConnectStandalone:91)
> >
> > [41:26,225] INFO ### Task [602953950] is (ideally) assigned topics
> > [topic7,topic8,topic10,topic9,topic11] BUT ACTUALLY IT ISN'T!!!
> > (c.b.b.pp.kafkaConnect.tasks.TopicsToFoldersSinkTask:49)
> > [41:26,225] INFO Sink task WorkerSinkTask{id=topicsToFolders-1}
> > finished initialization and start (org.apache.kafka.connect.
> > runtime.WorkerSinkTask:208)
> >
> > [41:26,240] INFO Source task WorkerSourceTask{id=foldersToTopics-1}
> > finished initialization and start (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:138)
> > [41:26,240] INFO Source task WorkerSourceTask{id=foldersToTopics-0}
> > finished initialization and start (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:138)
> > [41:26,365] INFO Discovered coordinator localhost:9092 (id: 2147483647
> > rack: null) for group connect-topicsToFolders. (org.apache.kafka.clients.
> > consumer.internals.AbstractCoordinator:505)
> > [41:26,365] INFO Discovered coordinator localhost:9092 (id: 2147483647
> > rack: null) for group connect-topicsToFolders. (org.apache.kafka.clients.
> > consumer.internals.AbstractCoordinator:505)
> > [41:26,365] INFO Revoking previously assigned partitions [] for group
> > connect-topicsToFolders (org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator:292)
> > [41:26,365] INFO Revoking previously assigned partitions [] for group
> > connect-topicsToFolders (org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator:292)
> > [41:26,365] INFO (Re-)joining group connect-topicsToFolders
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
> > [41:26,365] INFO (Re-)joining group connect-topicsToFolders
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
> > [41:26,365] INFO (Re-)joining group connect-topicsToFolders
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
> > [41:26,381] INFO Successfully joined group connect-topicsToFolders
> > with generation 2 (org.apache.kafka.clients.consumer.internals.
> > AbstractCoordinator:434)
> > [41:26,381] INFO Successfully joined group connect-topicsToFolders
> > with generation 2 (org.apache.kafka.clients.consumer.internals.
> > AbstractCoordinator:434)
> > [41:26,381] INFO Setting newly assigned partitions [topic7-18,
> > topic2-11, topic11-17, topic8-0, topic1-11, topic4-15, topic5-17,
> > topic6-0, topic7-2, topic10-16, topic11-1, topic8-16, topic6-16,
> > topic9-9, topic5-0, topic3-12, topic10-0, topic7-19, topic2-12,
> > topic6-1, topic1-12, topic11-16, topic8-1, topic4-14, topic9-24,
> > topic5-16, topic8-18, topic10-17, topic7-3, topic6-17, topic11-0,
> > topic8-17, topic9-8, topic3-11, topic10-1, topic4-17, topic7-20,
> > topic5-15, topic6-14, topic9-23, topic10-18, topic4-1, topic3-14,
> > topic7-4, topic8-14, topic11-15, topic1-13, topic2-9, topic9-7,
> > topic10-2, topic4-16, topic7-21, topic5-14, topic9-22, topic10-19,
> > topic4-0, topic6-15, topic7-5, topic8-15, topic1-14, topic11-14,
> > topic3-13, topic2-10, topic9-6, topic10-3, topic7-22, topic2-15,
> > topic3-0, topic5-13, topic6-12, topic9-21, topic8-12, topic7-6,
> > topic3-16, topic11-13, topic1-15, topic4-11, topic9-5, topic10-20,
> > topic7-23, topic2-16, topic6-13, topic1-0, topic9-20, topic10-4,
> > topic5-12, topic8-13, topic7-7, topic3-15, topic2-0, topic1-16,
> > topic11-12, topic7-24, topic4-10, topic10-21, topic9-4, topic2-13,
> > topic3-2, topic9-19, topic7-8, topic10-5, topic5-11, topic6-10,
> > topic1-1, topic3-18, topic9-3, topic8-10, topic4-13, topic11-11,
> > topic10-22, topic1-17, topic2-14, topic9-18, topic3-1, topic6-11,
> > topic7-9, topic1-2, topic10-6, topic5-10, topic9-2, topic8-11,
> > topic3-17, topic4-12, topic10-23, topic1-18, topic11-10, topic9-17,
> > topic3-4, topic7-10, topic2-19, topic1-3, topic10-7, topic4-23,
> > topic5-9, topic6-8, topic9-1, topic5-8, topic3-20, topic10-24,
> > topic2-3, topic11-9, topic8-8, topic1-19, topic4-7, topic6-24,
> > topic9-16, topic5-24, topic3-3, topic10-8, topic7-11, topic2-20,
> > topic6-9, topic1-4, topic11-24, topic4-22, topic5-7, topic9-0,
> > topic3-19, topic2-4, topic1-20, topic11-8, topic8-9, topic4-6,
> > topic5-23, topic6-6, topic11-23, topic1-5, topic2-17, topic9-15,
> > topic10-9, topic3-23, topic7-12, topic8-23, topic6-22, topic11-7,
> > topic5-6, topic1-21, topic2-1, topic4-9, topic3-6, topic8-6, topic1-6,
> > topic11-22, topic5-22, topic3-5, topic2-18, topic9-14, topic10-10,
> > topic8-24, topic3-22, topic4-24, topic6-7, topic7-13, topic5-5,
> > topic1-22, topic11-6, topic3-21, topic2-2, topic4-8, topic6-23,
> > topic8-7, topic11-21, topic1-7, topic4-19, topic5-21, topic6-4,
> > topic9-13, topic8-21, topic10-11, topic7-14, topic2-23, topic11-5,
> > topic1-23, topic4-3, topic6-20, topic10-12, topic5-4, topic8-4,
> > topic2-7, topic3-8, topic6-5, topic1-8, topic11-20, topic4-18,
> > topic9-12, topic5-20, topic7-15, topic8-22, topic3-24, topic2-24,
> > topic6-21, topic1-24, topic11-4, topic4-2, topic10-13, topic5-3,
> > topic8-5, topic3-7, topic2-8, topic4-21, topic7-16, topic5-19,
> > topic6-2, topic11-19, topic1-9, topic2-21, topic8-19, topic4-5,
> > topic7-0, topic6-18, topic11-3, topic10-14, topic5-2, topic2-5,
> > topic3-10, topic9-11, topic8-2, topic4-20, topic6-3, topic7-17,
> > topic1-10, topic11-18, topic5-18, topic8-20, topic2-22, topic4-4,
> > topic6-19, topic7-1, topic10-15, topic5-1, topic11-2, topic2-6,
> > topic9-10, topic8-3, topic3-9] for group connect-topicsToFolders
> > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:231)
> > [41:26,381] INFO Setting newly assigned partitions [topic10-32,
> > topic2-44, topic6-33, topic1-44, topic4-48, topic8-33, topic9-25,
> > topic3-29, topic10-49, topic7-35, topic2-28, topic6-49, topic4-32,
> > topic8-49, topic1-27, topic5-33, topic11-34, topic9-42, topic3-45,
> > topic10-33, topic2-45, topic11-49, topic6-34, topic4-47, topic5-49,
> > topic3-28, topic1-29, topic2-29, topic11-33, topic1-28, topic7-36,
> > topic4-31, topic9-41, topic5-32, topic8-34, topic3-44, topic1-45,
> > topic10-34, topic11-48, topic3-31, topic6-31, topic8-31, topic5-48,
> > topic11-31, topic1-30, topic2-25, topic4-34, topic11-32, topic3-47,
> > topic6-47, topic7-37, topic8-47, topic5-31, topic9-40, topic6-30,
> > topic1-46, topic2-42, topic10-35, topic2-43, topic4-49, topic8-32,
> > topic3-30, topic6-32, topic5-47, topic1-31, topic11-30, topic2-26,
> > topic2-27, topic7-38, topic4-33, topic8-48, topic3-46, topic6-48,
> > topic1-47, topic11-47, topic5-30, topic9-39, topic8-29, topic2-48,
> > topic3-33, topic6-45, topic11-29, topic1-32, topic4-27, topic5-46,
> > topic10-36, topic8-45, topic7-39, topic2-32, topic3-49, topic1-48,
> > topic4-44, topic5-29, topic6-28, topic11-46, topic9-38, topic2-49,
> > topic8-30, topic3-32, topic6-46, topic11-28, topic7-40, topic5-45,
> > topic4-26, topic10-37, topic1-33, topic2-33, topic8-46, topic3-48,
> > topic6-29, topic11-45, topic4-43, topic9-37, topic5-28, topic1-49,
> > topic2-46, topic3-35, topic8-27, topic4-46, topic4-29, topic6-43,
> > topic7-41, topic5-44, topic11-27, topic1-34, topic10-38, topic2-30,
> > topic8-43, topic4-30, topic11-44, topic7-25, topic5-27, topic9-36,
> > topic6-26, topic8-28, topic3-34, topic2-47, topic7-42, topic4-28,
> > topic6-44, topic5-43, topic1-35, topic10-39, topic11-26, topic8-44,
> > topic9-35, topic2-31, topic7-26, topic4-45, topic6-27, topic11-43,
> > topic5-26, topic3-37, topic10-40, topic7-43, topic6-41, topic11-25,
> > topic1-36, topic8-25, topic5-42, topic9-34, topic7-27, topic2-36,
> > topic4-40, topic8-41, topic5-25, topic11-42, topic9-49, topic8-26,
> > topic3-36, topic1-37, topic10-41, topic6-42, topic7-44, topic5-41,
> > topic9-33, topic5-40, topic8-42, topic10-25, topic2-37, topic6-25,
> > topic11-41, topic7-28, topic4-39, topic9-48, topic1-38, topic10-42,
> > topic4-25, topic6-39, topic7-45, topic5-39, topic9-32, topic2-34,
> > topic10-26, topic4-42, topic11-40, topic3-39, topic7-29, topic8-39,
> > topic1-39, topic9-47, topic10-43, topic7-46, topic6-40, topic11-39,
> > topic5-38, topic10-44, topic9-31, topic10-27, topic2-35, topic7-30,
> > topic4-41, topic8-40, topic3-38, topic6-37, topic1-40, topic10-28,
> > topic9-46, topic7-47, topic3-25, topic4-36, topic5-37, topic11-38,
> > topic10-45, topic9-30, topic8-37, topic7-31, topic2-40, topic3-41,
> > topic6-38, topic7-48, topic9-45, topic10-29, topic1-41, topic11-37,
> > topic7-32, topic4-35, topic9-29, topic5-36, topic10-46, topic2-41,
> > topic8-38, topic3-40, topic6-35, topic7-49, topic9-44, topic1-42,
> > topic10-30, topic3-27, topic9-27, topic11-36, topic7-33, topic5-35,
> > topic9-28, topic10-47, topic1-25, topic2-38, topic3-43, topic8-35,
> > topic4-38, topic6-36, topic1-43, topic10-31, topic3-26, topic9-26,
> > topic7-34, topic4-37, topic10-48, topic11-35, topic1-26, topic5-34,
> > topic8-36, topic3-42, topic9-43, topic2-39] for group
> > connect-topicsToFolders (org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator:231)
> >
> > [41:27,131] INFO ### Task [2134126231] retrieved [100] records
> >
> > [41:27,162] INFO ### [TopicsToFoldersSinkTask]:[518962883] received [0] records
> > [41:27,162] INFO ### Writing [0] events
> > [41:27,162] INFO ### Sink Task [518962883] has processed [0] events
> >
> > [41:27,162] INFO WorkerSinkTask{id=topicsToFolders-0} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> >
> > [41:27,178] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,178] INFO ### Topic [topic1] Kafka Offset/Partition [91152/31]
> > [41:27,178] INFO ### Writing [1] events
> > [41:27,272] INFO ### Sink Task [602953950] has processed [1] events
> >
> > [41:27,272] INFO WorkerSinkTask{id=topicsToFolders-1} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> >
> > [41:27,272] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,287] INFO ### Topic [topic11] Kafka Offset/Partition [90542/31]
> > [41:27,287] INFO ### Writing [1] events
> > [41:27,287] INFO ### Sink Task [602953950] has processed [2] events
> >
> > [41:27,312] INFO Finished WorkerSourceTask{id=foldersToTopics-1}
> > commitOffsets successfully in 134 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> >
> > [41:27,354] INFO ### Task [1714504927] retrieved [100] records
> >
> > [41:27,372] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,372] INFO ### Topic [topic8] Kafka Offset/Partition [91028/31]
> > [41:27,372] INFO ### Writing [1] events
> > [41:27,375] INFO ### Sink Task [602953950] has processed [3] events
> >
> > [41:27,382] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,382] INFO ### Topic [topic5] Kafka Offset/Partition [91169/31]
> > [41:27,383] INFO ### Writing [1] events
> > [41:27,386] INFO ### Sink Task [602953950] has processed [4] events
> >
> > [41:27,459] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,459] INFO ### Topic [topic2] Kafka Offset/Partition [90560/31]
> > [41:27,459] INFO ### Writing [1] events
> > [41:27,459] INFO ### Sink Task [602953950] has processed [5] events
> >
> > [41:27,475] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [2] records
> > [41:27,475] INFO ### Topic [topic2] Kafka Offset/Partition [90561/31]
> > [41:27,475] INFO ### Topic [topic10] Kafka Offset/Partition [91454/31]
> > [41:27,475] INFO ### Writing [2] events
> > [41:27,475] INFO ### Sink Task [602953950] has processed [7] events
> >
> > [41:27,568] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [2] records
> > [41:27,568] INFO ### Topic [topic4] Kafka Offset/Partition [91644/31]
> > [41:27,568] INFO ### Topic [topic4] Kafka Offset/Partition [91645/31]
> > [41:27,568] INFO ### Writing [2] events
> > [41:27,568] INFO ### Sink Task [602953950] has processed [9] events
> >
> > [41:27,584] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [2] records
> > [41:27,584] INFO ### Topic [topic4] Kafka Offset/Partition [91646/31]
> > [41:27,584] INFO ### Topic [topic5] Kafka Offset/Partition [91170/31]
> > [41:27,584] INFO ### Writing [2] events
> > [41:27,584] INFO ### Sink Task [602953950] has processed [11] events
> >
> > [41:27,678] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,678] INFO ### Topic [topic7] Kafka Offset/Partition [90928/31]
> > [41:27,678] INFO ### Writing [1] events
> > [41:27,678] INFO ### Sink Task [602953950] has processed [12] events
> >
> > [41:27,678] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [2] records
> > [41:27,678] INFO ### Topic [topic4] Kafka Offset/Partition [91647/31]
> > [41:27,678] INFO ### Topic [topic9] Kafka Offset/Partition [90943/31]
> > [41:27,678] INFO ### Writing [2] events
> > [41:27,693] INFO ### Sink Task [602953950] has processed [14] events
> >
> > [41:27,787] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
> > [41:27,787] INFO ### Topic [topic6] Kafka Offset/Partition [91126/31]
> > [41:27,787] INFO ### Writing [1] events
> > [41:27,803] INFO ### Sink Task [602953950] has processed [15] events
> >
> > [41:27,803] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [185] records
> > [41:27,803] INFO ### Topic [topic3] Kafka Offset/Partition [90454/31]
> > ...
> > [41:27,818] INFO ### Topic [topic3] Kafka Offset/Partition [90468/31]
> > [41:27,818] INFO ### Topic [topic4] Kafka Offset/Partition [91648/31]
> > ...
> > [41:27,818] INFO ### Topic [topic4] Kafka Offset/Partition [91665/31]
> > [41:27,818] INFO ### Topic [topic6] Kafka Offset/Partition [91127/31]
> > ...
> > [41:27,818] INFO ### Topic [topic6] Kafka Offset/Partition [91147/31]
> > [41:27,818] INFO ### Topic [topic2] Kafka Offset/Partition [90562/31]
> > ...
> > [41:27,818] INFO ### Topic [topic2] Kafka Offset/Partition [90577/31]
> > [41:27,818] INFO ### Topic [topic9] Kafka Offset/Partition [90944/31]
> > ...
> > [41:27,818] INFO ### Topic [topic9] Kafka Offset/Partition [90958/31]
> > [41:27,818] INFO ### Topic [topic10] Kafka Offset/Partition [91455/31]
> > ...
> > [41:27,818] INFO ### Topic [topic10] Kafka Offset/Partition [91466/31]
> > [41:27,818] INFO ### Topic [topic8] Kafka Offset/Partition [91029/31]
> > ...
> > [41:27,818] INFO ### Topic [topic8] Kafka Offset/Partition [91047/31]
> > [41:27,818] INFO ### Topic [topic5] Kafka Offset/Partition [91171/31]
> > ...
> > [41:27,818] INFO ### Topic [topic5] Kafka Offset/Partition [91186/31]
> > [41:27,818] INFO ### Topic [topic1] Kafka Offset/Partition [91153/31]
> > ...
> > [41:27,818] INFO ### Topic [topic1] Kafka Offset/Partition [91163/31]
> > [41:27,818] INFO ### Topic [topic11] Kafka Offset/Partition [90543/31]
> > ...
> > [41:27,834] INFO ### Topic [topic11] Kafka Offset/Partition [90564/31]
> > [41:27,834] INFO ### Topic [topic7] Kafka Offset/Partition [90929/31]
> > ...
> > [41:27,834] INFO ### Topic [topic7] Kafka Offset/Partition [90948/31]
> > [41:27,834] INFO ### Writing [185] events
> > [41:28,053] INFO ### Sink Task [602953950] has processed [200] events
> >
> > [41:28,178] INFO ### [TopicsToFoldersSinkTask]:[518962883] received [0] records
> > [41:28,178] INFO ### Writing [0] events
> > [41:28,178] INFO ### Sink Task [518962883] has processed [0] events
> >
> > [41:28,178] INFO WorkerSinkTask{id=topicsToFolders-0} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> > [41:28,193] INFO Finished WorkerSourceTask{id=foldersToTopics-0}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> >
> > [41:28,209] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [0] records
> > [41:28,209] INFO ### Writing [0] events
> > [41:28,209] INFO ### Sink Task [602953950] has processed [200] events
> >
> > [41:28,209] INFO WorkerSinkTask{id=topicsToFolders-1} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> > [41:28,209] INFO Reflections took 3141 ms to scan 61 urls, producing
> > 6178 keys and 37307 values  (org.reflections.Reflections:229)
> > [41:28,318] INFO Finished WorkerSourceTask{id=foldersToTopics-1}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> >
> > [41:28,521] INFO ### Task [2134126231] retrieved [100] records
> >
> > [41:28,537] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:28,537] INFO ### Topic [topic7] Kafka Offset/Partition
> > [90949/31] [41:28,537] INFO ### Writing [1] events [41:28,537] INFO
> > ### Sink Task [602953950] has processed [201] events
> >
> > [41:28,553] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:28,553] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90469/31] ...
> > [41:28,553] INFO ### Topic [topic3] Kafka Offset/Partition [90476/31]
> > [41:28,553] INFO ### Topic [topic4] Kafka Offset/Partition [91666/31]
> > ...
> > [41:28,553] INFO ### Topic [topic4] Kafka Offset/Partition [91681/31]
> > [41:28,553] INFO ### Topic [topic6] Kafka Offset/Partition [91148/31]
> > ...
> > [41:28,553] INFO ### Topic [topic6] Kafka Offset/Partition [91152/31]
> > [41:28,553] INFO ### Topic [topic2] Kafka Offset/Partition [90578/31]
> > ...
> > [41:28,553] INFO ### Topic [topic2] Kafka Offset/Partition [90587/31]
> > [41:28,553] INFO ### Topic [topic9] Kafka Offset/Partition [90959/31]
> > ...
> > [41:28,553] INFO ### Topic [topic9] Kafka Offset/Partition [90963/31]
> > [41:28,553] INFO ### Topic [topic10] Kafka Offset/Partition [91467/31]
> > ...
> > [41:28,553] INFO ### Topic [topic10] Kafka Offset/Partition [91473/31]
> > [41:28,553] INFO ### Topic [topic8] Kafka Offset/Partition [91048/31]
> > ...
> > [41:28,553] INFO ### Topic [topic8] Kafka Offset/Partition [91054/31]
> > [41:28,553] INFO ### Topic [topic5] Kafka Offset/Partition [91187/31]
> > ...
> > [41:28,568] INFO ### Topic [topic5] Kafka Offset/Partition [91199/31]
> > [41:28,568] INFO ### Topic [topic1] Kafka Offset/Partition [91164/31]
> > ...
> > [41:28,568] INFO ### Topic [topic1] Kafka Offset/Partition [91171/31]
> > [41:28,568] INFO ### Topic [topic11] Kafka Offset/Partition [90565/31]
> > ...
> > [41:28,568] INFO ### Topic [topic11] Kafka Offset/Partition [90572/31]
> > [41:28,568] INFO ### Topic [topic7] Kafka Offset/Partition [90950/31]
> > ...
> > [41:28,568] INFO ### Topic [topic7] Kafka Offset/Partition [90961/31]
> > [41:28,568] INFO ### Writing [99] events [41:28,662] INFO ### Sink
> > Task [602953950] has processed [300] events
> >
> > [41:28,834] INFO ### Task [1714504927] retrieved [100] records
> >
> > [41:28,850] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:28,850] INFO ### Topic [topic11] Kafka
> > Offset/Partition [90573/31] [41:28,850] INFO ### Writing [1] events
> > [41:28,850] INFO ### Sink Task [602953950] has processed [301] events
> >
> > [41:28,850] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:28,850] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90477/31] ...
> > [41:28,850] INFO ### Topic [topic3] Kafka Offset/Partition [90482/31]
> > [41:28,850] INFO ### Topic [topic4] Kafka Offset/Partition [91682/31]
> > ...
> > [41:28,850] INFO ### Topic [topic4] Kafka Offset/Partition [91691/31]
> > [41:28,865] INFO ### Topic [topic6] Kafka Offset/Partition [91153/31]
> > ...
> > [41:28,865] INFO ### Topic [topic6] Kafka Offset/Partition [91157/31]
> > [41:28,865] INFO ### Topic [topic2] Kafka Offset/Partition [90588/31]
> > ...
> > [41:28,865] INFO ### Topic [topic2] Kafka Offset/Partition [90596/31]
> > [41:28,865] INFO ### Topic [topic9] Kafka Offset/Partition [90964/31]
> > ...
> > [41:28,865] INFO ### Topic [topic9] Kafka Offset/Partition [90971/31]
> > [41:28,865] INFO ### Topic [topic10] Kafka Offset/Partition [91474/31]
> > ...
> > [41:28,865] INFO ### Topic [topic10] Kafka Offset/Partition [91485/31]
> > [41:28,865] INFO ### Topic [topic8] Kafka Offset/Partition [91055/31]
> > ...
> > [41:28,865] INFO ### Topic [topic8] Kafka Offset/Partition [91068/31]
> > [41:28,865] INFO ### Topic [topic5] Kafka Offset/Partition [91200/31]
> > ...
> > [41:28,865] INFO ### Topic [topic5] Kafka Offset/Partition [91210/31]
> > [41:28,865] INFO ### Topic [topic1] Kafka Offset/Partition [91172/31]
> > ...
> > [41:28,865] INFO ### Topic [topic1] Kafka Offset/Partition [91176/31]
> > [41:28,865] INFO ### Topic [topic11] Kafka Offset/Partition [90574/31]
> > ...
> > [41:28,865] INFO ### Topic [topic11] Kafka Offset/Partition [90587/31]
> > [41:28,865] INFO ### Topic [topic7] Kafka Offset/Partition [90962/31]
> > ...
> > [41:28,865] INFO ### Topic [topic7] Kafka Offset/Partition [90966/31]
> > [41:28,865] INFO ### Writing [99] events [41:28,943] INFO ### Sink
> > Task [602953950] has processed [400] events
> >
> > [41:29,162] INFO ### [TopicsToFoldersSinkTask]:[518962883] received
> > [0] records [41:29,162] INFO ### Writing [0] events [41:29,162] INFO
> > ### Sink Task [518962883] has processed [0] events
> >
> > [41:29,162] INFO WorkerSinkTask{id=topicsToFolders-0} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> >
> > [41:29,178] INFO ### Task [2134126231] retrieved [100] records
> >
> > [41:29,178] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:29,178] INFO ### Topic [topic11] Kafka
> > Offset/Partition [90588/31] [41:29,178] INFO ### Writing [1] events
> > [41:29,178] INFO ### Sink Task [602953950] has processed [401] events
> >
> > [41:29,193] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:29,193] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90483/31] ...
> > [41:29,193] INFO ### Topic [topic3] Kafka Offset/Partition [90492/31]
> > [41:29,193] INFO ### Topic [topic4] Kafka Offset/Partition [91692/31]
> > ...
> > [41:29,193] INFO ### Topic [topic4] Kafka Offset/Partition [91699/31]
> > [41:29,193] INFO ### Topic [topic6] Kafka Offset/Partition [91158/31]
> > ...
> > [41:29,193] INFO ### Topic [topic6] Kafka Offset/Partition [91161/31]
> > [41:29,193] INFO ### Topic [topic2] Kafka Offset/Partition [90597/31]
> > ...
> > [41:29,193] INFO ### Topic [topic2] Kafka Offset/Partition [90607/31]
> > [41:29,193] INFO ### Topic [topic9] Kafka Offset/Partition [90972/31]
> > ...
> > [41:29,193] INFO ### Topic [topic9] Kafka Offset/Partition [90981/31]
> > [41:29,193] INFO ### Topic [topic10] Kafka Offset/Partition [91486/31]
> > ...
> > [41:29,193] INFO ### Topic [topic10] Kafka Offset/Partition [91492/31]
> > [41:29,193] INFO ### Topic [topic8] Kafka Offset/Partition [91069/31]
> > ...
> > [41:29,193] INFO ### Topic [topic8] Kafka Offset/Partition [91074/31]
> > [41:29,193] INFO ### Topic [topic5] Kafka Offset/Partition [91211/31]
> > ...
> > [41:29,193] INFO ### Topic [topic5] Kafka Offset/Partition [91218/31]
> > [41:29,193] INFO ### Topic [topic1] Kafka Offset/Partition [91177/31]
> > ...
> > [41:29,193] INFO ### Topic [topic1] Kafka Offset/Partition [91186/31]
> > [41:29,193] INFO ### Topic [topic11] Kafka Offset/Partition [90589/31]
> > ...
> > [41:29,193] INFO ### Topic [topic11] Kafka Offset/Partition [90603/31]
> > [41:29,193] INFO ### Topic [topic7] Kafka Offset/Partition [90967/31]
> > ...
> > [41:29,193] INFO ### Topic [topic7] Kafka Offset/Partition [90976/31]
> > [41:29,193] INFO ### Writing [99] events [41:29,271] INFO ### Sink
> > Task [602953950] has processed [500] events
> >
> > [41:29,209] INFO Finished WorkerSourceTask{id=foldersToTopics-0}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> > [41:29,271] INFO WorkerSinkTask{id=topicsToFolders-1} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> > [41:29,334] INFO Finished WorkerSourceTask{id=foldersToTopics-1}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> >
> > [41:29,865] INFO ### Task [1714504927] retrieved [100] records
> >
> > [41:29,865] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:29,865] INFO ### Topic [topic5] Kafka Offset/Partition
> > [91219/31] [41:29,865] INFO ### Writing [1] events [41:29,865] INFO
> > ### Sink Task [602953950] has processed [501] events
> >
> > [41:29,881] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:29,881] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90493/31] ...
> > [41:29,881] INFO ### Topic [topic3] Kafka Offset/Partition [90502/31]
> > [41:29,881] INFO ### Topic [topic4] Kafka Offset/Partition [91700/31]
> > ...
> > [41:29,881] INFO ### Topic [topic4] Kafka Offset/Partition [91707/31]
> > [41:29,881] INFO ### Topic [topic6] Kafka Offset/Partition [91162/31]
> > ...
> > [41:29,881] INFO ### Topic [topic6] Kafka Offset/Partition [91168/31]
> > [41:29,881] INFO ### Topic [topic2] Kafka Offset/Partition [90608/31]
> > ...
> > [41:29,881] INFO ### Topic [topic2] Kafka Offset/Partition [90617/31]
> > [41:29,881] INFO ### Topic [topic9] Kafka Offset/Partition [90982/31]
> > ...
> > [41:29,881] INFO ### Topic [topic9] Kafka Offset/Partition [90991/31]
> > [41:29,881] INFO ### Topic [topic10] Kafka Offset/Partition [91493/31]
> > ...
> > [41:29,881] INFO ### Topic [topic10] Kafka Offset/Partition [91497/31]
> > [41:29,881] INFO ### Topic [topic8] Kafka Offset/Partition [91075/31]
> > ...
> > [41:29,881] INFO ### Topic [topic8] Kafka Offset/Partition [91081/31]
> > [41:29,881] INFO ### Topic [topic5] Kafka Offset/Partition [91220/31]
> > ...
> > [41:29,881] INFO ### Topic [topic5] Kafka Offset/Partition [91228/31]
> > [41:29,881] INFO ### Topic [topic1] Kafka Offset/Partition [91187/31]
> > ...
> > [41:29,881] INFO ### Topic [topic1] Kafka Offset/Partition [91197/31]
> > [41:29,881] INFO ### Topic [topic11] Kafka Offset/Partition [90604/31]
> > ...
> > [41:29,881] INFO ### Topic [topic11] Kafka Offset/Partition [90611/31]
> > [41:29,881] INFO ### Topic [topic7] Kafka Offset/Partition [90977/31]
> > ...
> > [41:29,881] INFO ### Topic [topic7] Kafka Offset/Partition [90990/31]
> > [41:29,881] INFO ### Writing [99] events [41:29,912] INFO ### Task
> > [2134126231] retrieved [100] records [41:29,943] INFO ### Sink Task
> > [602953950] has processed [600] events
> >
> > [41:29,943] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:29,943] INFO ### Topic [topic5] Kafka Offset/Partition
> > [91229/31] [41:29,943] INFO ### Writing [1] events [41:29,943] INFO
> > ### Sink Task [602953950] has processed [601] events
> >
> > [41:29,943] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:29,943] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90503/31] ...
> > [41:29,943] INFO ### Topic [topic3] Kafka Offset/Partition [90513/31]
> > [41:29,943] INFO ### Topic [topic4] Kafka Offset/Partition [91708/31]
> > ...
> > [41:29,943] INFO ### Topic [topic4] Kafka Offset/Partition [91715/31]
> > [41:29,943] INFO ### Topic [topic6] Kafka Offset/Partition [91169/31]
> > ...
> > [41:29,943] INFO ### Topic [topic6] Kafka Offset/Partition [91177/31]
> > [41:29,943] INFO ### Topic [topic2] Kafka Offset/Partition [90618/31]
> > ...
> > [41:29,943] INFO ### Topic [topic2] Kafka Offset/Partition [90627/31]
> > [41:29,943] INFO ### Topic [topic9] Kafka Offset/Partition [90992/31]
> > ...
> > [41:29,943] INFO ### Topic [topic9] Kafka Offset/Partition [90994/31]
> > [41:29,943] INFO ### Topic [topic10] Kafka Offset/Partition [91498/31]
> > ...
> > [41:29,943] INFO ### Topic [topic10] Kafka Offset/Partition [91507/31]
> > [41:29,943] INFO ### Topic [topic8] Kafka Offset/Partition [91082/31]
> > ...
> > [41:29,943] INFO ### Topic [topic8] Kafka Offset/Partition [91093/31]
> > [41:29,943] INFO ### Topic [topic5] Kafka Offset/Partition [91230/31]
> > ...
> > [41:29,943] INFO ### Topic [topic5] Kafka Offset/Partition [91236/31]
> > [41:29,943] INFO ### Topic [topic1] Kafka Offset/Partition [91198/31]
> > ...
> > [41:29,943] INFO ### Topic [topic1] Kafka Offset/Partition [91208/31]
> > [41:29,943] INFO ### Topic [topic11] Kafka Offset/Partition [90612/31]
> > ...
> > [41:29,943] INFO ### Topic [topic11] Kafka Offset/Partition [90620/31]
> > [41:29,943] INFO ### Topic [topic7] Kafka Offset/Partition [90991/31]
> > ...
> > [41:29,943] INFO ### Topic [topic7] Kafka Offset/Partition [90999/31]
> > [41:29,943] INFO ### Writing [99] events [41:30,021] INFO ### Sink
> > Task [602953950] has processed [700] events
> >
> > [41:30,162] INFO ### [TopicsToFoldersSinkTask]:[518962883] received
> > [0] records [41:30,162] INFO ### Writing [0] events [41:30,162] INFO
> > ### Sink Task [518962883] has processed [0] events
> >
> > [41:30,162] INFO WorkerSinkTask{id=topicsToFolders-0} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> >
> > [41:30,225] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [0] records [41:30,225] INFO ### Writing [0] events [41:30,225] INFO
> > ### Sink Task [602953950] has processed [700] events
> >
> > [41:30,225] INFO WorkerSinkTask{id=topicsToFolders-1} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> > [41:30,225] INFO Finished WorkerSourceTask{id=foldersToTopics-0}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> > [41:30,350] INFO Finished WorkerSourceTask{id=foldersToTopics-1}
> > commitOffsets successfully in 0 ms (org.apache.kafka.connect.
> > runtime.WorkerSourceTask:358)
> >
> > [41:30,475] INFO ### Task [2134126231] retrieved [100] records
> > [41:30,490] INFO ### Task [2134126231] has read all of its files
> > (c.b.b.pp.kafkaConnect.tasks.RecordRetriever:65)
> >
> > [41:30,490] INFO ### Task [1714504927] retrieved [100] records
> >
> > [41:30,490] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:30,490] INFO ### Topic [topic11] Kafka
> > Offset/Partition [90621/31] [41:30,490] INFO ### Writing [1] events
> > [41:30,490] INFO ### Sink Task [602953950] has processed [701] events
> >
> > [41:30,506] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [101] records [41:30,506] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90514/31] ...
> > [41:30,506] INFO ### Topic [topic3] Kafka Offset/Partition [90522/31]
> > [41:30,506] INFO ### Topic [topic4] Kafka Offset/Partition [91716/31]
> > ...
> > [41:30,506] INFO ### Topic [topic4] Kafka Offset/Partition [91728/31]
> > [41:30,506] INFO ### Topic [topic6] Kafka Offset/Partition [91178/31]
> > ...
> > [41:30,506] INFO ### Topic [topic6] Kafka Offset/Partition [91186/31]
> > [41:30,506] INFO ### Topic [topic2] Kafka Offset/Partition [90628/31]
> > ...
> > [41:30,521] INFO ### Topic [topic2] Kafka Offset/Partition [90634/31]
> > [41:30,521] INFO ### Topic [topic9] Kafka Offset/Partition [90995/31]
> > ...
> > [41:30,521] INFO ### Topic [topic9] Kafka Offset/Partition [91003/31]
> > [41:30,521] INFO ### Topic [topic10] Kafka Offset/Partition [91508/31]
> > ...
> > [41:30,521] INFO ### Topic [topic10] Kafka Offset/Partition [91516/31]
> > [41:30,521] INFO ### Topic [topic8] Kafka Offset/Partition [91094/31]
> > ...
> > [41:30,521] INFO ### Topic [topic8] Kafka Offset/Partition [91105/31]
> > [41:30,521] INFO ### Topic [topic5] Kafka Offset/Partition [91237/31]
> > ...
> > [41:30,521] INFO ### Topic [topic5] Kafka Offset/Partition [91242/31]
> > [41:30,521] INFO ### Topic [topic1] Kafka Offset/Partition [91209/31]
> > ...
> > [41:30,521] INFO ### Topic [topic1] Kafka Offset/Partition [91212/31]
> > ...
> > [41:30,521] INFO ### Topic [topic11] Kafka Offset/Partition [90633/31]
> > [41:30,521] INFO ### Topic [topic7] Kafka Offset/Partition [91000/31]
> > ...
> > [41:30,521] INFO ### Topic [topic7] Kafka Offset/Partition [91010/31]
> > [41:30,521] INFO ### Writing [101] events [41:30,584] INFO ### Sink
> > Task [602953950] has processed [802] events
> >
> > [41:30,584] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [98] records [41:30,584] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90523/31] ...
> > [41:30,584] INFO ### Topic [topic3] Kafka Offset/Partition [90530/31]
> > [41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91729/31]
> > ...
> > [41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91731/31]
> > [41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91187/31]
> > ...
> > [41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91200/31]
> > [41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90635/31]
> > ...
> > [41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90647/31]
> > [41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91004/31]
> > ...
> > [41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91010/31]
> > [41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91517/31]
> > ...
> > [41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91525/31]
> > [41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91106/31]
> > ...
> > [41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91116/31]
> > [41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91243/31]
> > ...
> > [41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91250/31]
> > [41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91213/31]
> > ...
> > [41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91222/31]
> > [41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90634/31]
> > ...
> > [41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90639/31]
> > [41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91011/31]
> > ...
> > [41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91019/31]
> > [41:30,584] INFO ### Writing [98] events [41:30,646] INFO ### Sink
> > Task [602953950] has processed [900] events
> >
> > [41:30,881] INFO ### Task [1714504927] retrieved [100] records
> > [41:30,881] INFO ### Task [1714504927] has read all of its files
> > (c.b.b.pp.kafkaConnect.tasks.RecordRetriever:65)
> >
> > [41:30,881] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [1] records [41:30,881] INFO ### Topic [topic8] Kafka Offset/Partition
> > [91117/31] [41:30,881] INFO ### Writing [1] events [41:30,896] INFO
> > ### Sink Task [602953950] has processed [901] events
> >
> > [41:30,896] INFO ### [TopicsToFoldersSinkTask]:[602953950] received
> > [99] records [41:30,896] INFO ### Topic [topic3] Kafka
> > Offset/Partition [90531/31] ...
> > [41:30,896] INFO ### Topic [topic3] Kafka Offset/Partition [90539/31]
> > [41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91732/31]
> > ...
> > [41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91737/31]
> > [41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91201/31]
> > ...
> > [41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91208/31]
> > [41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90648/31]
> > ...
> > [41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90657/31]
> > [41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91011/31]
> > ...
> > [41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91020/31]
> > [41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91526/31]
> > ...
> > [41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91534/31]
> > [41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91118/31]
> > ...
> > [41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91122/31]
> > [41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91251/31]
> > ...
> > [41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91262/31]
> > [41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91223/31]
> > ...
> > [41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91231/31]
> > [41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90640/31]
> > ...
> > [41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90650/31]
> > [41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91020/31]
> > ...
> > [41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91029/31]
> > [41:30,896] INFO ### Writing [99] events [41:30,943] INFO ### Sink
> > Task [602953950] has processed [1000] events
> >
> > [41:31,178] INFO ### [TopicsToFoldersSinkTask]:[518962883] received
> > [0] records [41:31,178] INFO ### Writing [0] events [41:31,178] INFO
> > ### Sink Task [518962883] has processed [0] events
> >
> > [41:31,178] INFO WorkerSinkTask{id=topicsToFolders-0} Committing
> > offsets
> > (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
> >
> > -----Original Message-----
> > From: Gwen Shapira [mailto:gwen@confluent.io]
> > Sent: 26 April 2017 18:29
> > To: dev@kafka.apache.org
> > Subject: Re: Kafka Connect and Partitions
> >
> > Hi,
> >
> > I'll need a bit more detail to help :) Are you writing a connector or
> > trying to use an existing one? If existing, which connector? Is it
> > source or sink?
> >
> > Here are a few things I'd look at when debugging:
> >
> > * Is the connector reading from the topic you think it is reading?
> > * Do you actually have 4 tasks? Are they all running? Are there errors?
> > What happens if you stop the only task doing the work?
> > * Is the one task subscribed to all partitions? How did you check that?
> > * Do you have data in all 50 partitions?
> > * Anything interesting in the log?
> >
> > I hope this helps you get started :)
> > In general, if all 50 partitions have data and all 4 tasks are running
> > but only one is actually subscribed to partitions, it sounds like a
> > bug in consumer rebalance - but this also seems highly unlikely, so
> > I'm searching for other causes.
> >
> > Gwen
> >
> > On Wed, Apr 26, 2017 at 8:57 AM, <david.franklin@bt.com> wrote:
> >
> > > I've defined several Kafka Connect tasks via the tasks.max property
> > > to process a set of topics.
> > > Initially I set the partitions on the topics to 1 and partitioned
> > > the topics across the tasks programmatically so that each task
> > > processed a subset of the topics (or so I thought ...).
> > > I then noticed that only 1 of the tasks ever read any Kafka messages
> > > and concluded that the topics property defined in
> > > connector.properties cannot be split across the tasks in this way.
> > >
> > > It then dawned on me that perhaps I ought to be partitioning the
> > > topic at creation time so that each task would be assigned a set of
> > > partitions across the entire set of topics.
> > >
> > > However, that seems not to work either - again only 1 task does any
> > > work - and this task reads from the same partition for every topic
> > > (I have defined 50 partitions and 4 tasks, so would expect (naively
> > > perhaps) each task to get a dozen or so partitions for each topic).
> > >
> > > Could some kind soul please point out the error of my ways and tell
> > > me how to achieve this properly?
> > >
> > > Thanks in advance,
> > > David
> > >
> > >
> > >
> > >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter <https://twitter.com/ConfluentInc> | blog <
> > http://www.confluent.io/blog>
> >
>
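
The quoted log shows every record landing in partition 31, while the original question expected each of 4 tasks to get "a dozen or so" of the 50 partitions. A minimal Java sketch of both points follows, under stated assumptions: `PartitionSketch`, `partitionFor` and `partitionsPerTask` are hypothetical names, `Arrays.hashCode` is only a stand-in for a hash-based partitioner (it is not Kafka's actual hash), and the even split merely illustrates the expectation rather than how Kafka assigns partitions to tasks. If every record carried the same key, a hash-based partitioner would pick the same partition each time, which is one possible reading of the log above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Hypothetical stand-in for a hash-based partitioner: hash the key bytes,
    // then reduce modulo the partition count. Not Kafka's real hash function.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Counts how many partitions each task would own if partitions were
    // dealt out one at a time, round-robin (an illustrative assumption).
    static int[] partitionsPerTask(int numPartitions, int numTasks) {
        int[] counts = new int[numTasks];
        for (int p = 0; p < numPartitions; p++) {
            counts[p % numTasks]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumption: all records share one key, so all hash identically.
        byte[] sameKey = "constant-key".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 3; i++) {
            System.out.println("record " + i + " -> partition "
                    + partitionFor(sameKey, 50));
        }
        // 50 partitions over 4 tasks: prints [13, 13, 12, 12]
        System.out.println(Arrays.toString(partitionsPerTask(50, 4)));
    }
}
```

With 50 partitions and 4 tasks, the even split gives 13, 13, 12 and 12 partitions per task, matching the "dozen or so" in the question; but that only helps if the data itself is spread across the partitions rather than concentrated in one.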
