kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector
Date Fri, 06 Oct 2017 16:29:00 GMT

https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194806#comment-16194806

Randall Hauch commented on KAFKA-4794:

This KIP didn't get enough votes to make it into the 1.0 release, so I think it's worth taking
the time to update and improve the Motivation section of the KIP to explain why this is important.

I have another use case that requires this capability, but it doesn't actually require the
connector to do any kind of monitoring or actions. I'll first describe it in a general way,
and then give a concrete example.

In many cases, it is possible to determine the tasks based only on the external system. But
in other cases, the number of tasks and their configuration may also depend upon the offsets
persisted by the tasks run previously. After all, the offsets are an excellent and natural
way for the tasks to record their progress.

Consider a source connector that needs to perform a set of actions in parallel. Ideally the connector
could use a separate task for each of these actions, and then have Connect manage and distribute
these tasks for the connector. The tasks cannot coordinate directly, but they can record
their progress in offsets (as they generate records) and can call for a task reconfiguration.
The problem is that upon reconfiguration, the connector can't read the offsets to learn the
state of those tasks, and therefore can't configure the new tasks appropriately.
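The shape of the problem can be sketched with plain Java collections standing in for Connect's offset maps. This is only an illustration of why the connector needs the persisted offsets when computing task configs; the `done` offset key, class name, and method names are all hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrates why a SourceConnector needs the persisted offsets when it
 * computes task configurations: the right set of tasks depends on how far
 * the previous tasks got. All names here are hypothetical stand-ins.
 */
public class OffsetAwareTaskConfigs {

    /**
     * Returns one task config per unit of remaining work. A unit whose
     * persisted offset reports "done" no longer needs a task.
     */
    public static List<Map<String, String>> taskConfigs(
            List<String> workUnits,
            Map<String, Map<String, Object>> persistedOffsets) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (String unit : workUnits) {
            Map<String, Object> offset = persistedOffsets.get(unit);
            boolean done = offset != null && Boolean.TRUE.equals(offset.get("done"));
            if (!done) {
                Map<String, String> config = new HashMap<>();
                config.put("work.unit", unit);
                configs.add(config);
            }
        }
        return configs;
    }
}
```

Without reader access, the connector cannot populate `persistedOffsets` at all, which is exactly the gap this issue describes.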

A concrete example of this use case involves recent work in the Debezium community. The Debezium
MySQL CDC connector can be configured with a set of tables that are to be captured/copied.
When the connector starts up, it performs a consistent snapshot of these tables and then reads
the MySQL binlog to capture the changes committed to those tables after the snapshot was started.
However, the connector might be restarted with a different table filter, such that several
existing tables now need to be captured that were not captured before. The developers are changing the connector
to be able to detect this case and asynchronously perform a snapshot of those additional tables
and then read the binlog to capture subsequent changes. Snapshots of very large tables can
take a long time (e.g., days) to run, so the connector shouldn't stop reading the binlog for
the original set of tables. This could be implemented as two instances of the existing task
each doing the same thing but just on different sets of tables (the original set and the recently
added tables). However, at some point both tasks approach the current head of the binlog,
and at that point the connector only needs one of the tasks. The snapshotting task could request
the tasks be reconfigured, and Connect would stop the two tasks and ask the Connector implementation
to compute the configs for the new task(s). *If the Connector had access to the offsets, it
could detect that the snapshot task had completed, and could know enough to return the configuration
for only the main task*, which would then do a bit of reconciliation before continuing as before.

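That connector-side decision could look roughly like this, with a plain map standing in for the snapshot task's persisted offset. The `snapshot.completed` key and task names are hypothetical, not Debezium's actual offset schema:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the decision described above: with access to the tasks'
 * persisted offsets, the connector can collapse the snapshot task and
 * the binlog task back into a single task once the snapshot offset
 * reports completion. Offset keys and task names are hypothetical.
 */
public class SnapshotAwareReconfiguration {

    public static List<String> tasksToRun(Map<String, Object> snapshotTaskOffset) {
        List<String> tasks = new ArrayList<>();
        tasks.add("binlog-reader");          // always keep reading the binlog
        boolean snapshotComplete = snapshotTaskOffset != null
                && Boolean.TRUE.equals(snapshotTaskOffset.get("snapshot.completed"));
        if (!snapshotComplete) {
            tasks.add("snapshot-reader");    // still catching up on the added tables
        }
        return tasks;
    }
}
```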
> Add access to OffsetStorageReader from SourceConnector
> ------------------------------------------------------
>                 Key: KAFKA-4794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4794
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions:
>            Reporter: Florian Hussonnois
>            Priority: Minor
>              Labels: needs-kip
>             Fix For: 1.1.0
> Currently the offset storage is only accessible from SourceTask, to be able to properly
initialize tasks after a restart, a crash, or a reconfiguration request.
> To implement more complex connectors that need to track the progress of each task, it
would be helpful to have access to an OffsetStorageReader instance from the SourceConnector.
> That way, we could have a background thread that requests a task reconfiguration based
on source offsets.
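The background-thread idea might be sketched like this. Both nested interfaces are hypothetical stand-ins: one for the offset reader access this issue proposes, the other for Connect's existing ConnectorContext.requestTaskReconfiguration():

```java
import java.util.Map;

/**
 * Sketch of a connector-side monitor thread: periodically read the
 * persisted source offsets and ask Connect to reconfigure tasks when
 * they change. The interfaces are hypothetical stand-ins.
 */
public class OffsetMonitor implements Runnable {

    /** Stand-in for the proposed OffsetStorageReader access. */
    public interface OffsetReader { Map<String, Object> readOffsets(); }
    /** Stand-in for ConnectorContext.requestTaskReconfiguration(). */
    public interface Reconfigurable { void requestTaskReconfiguration(); }

    private final OffsetReader reader;
    private final Reconfigurable context;
    private final long pollMillis;
    private Map<String, Object> lastSeen;
    private volatile boolean running = true;

    public OffsetMonitor(OffsetReader reader, Reconfigurable context, long pollMillis) {
        this.reader = reader;
        this.context = context;
        this.pollMillis = pollMillis;
    }

    /** Reconfigure only when previously seen offsets have changed. */
    public static boolean shouldReconfigure(Map<String, Object> last, Map<String, Object> current) {
        return last != null && !last.equals(current);
    }

    public void stop() { running = false; }

    @Override
    public void run() {
        while (running) {
            Map<String, Object> current = reader.readOffsets();
            if (shouldReconfigure(lastSeen, current)) {
                context.requestTaskReconfiguration();
            }
            lastSeen = current;
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```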
> This improvement proposal comes from a customer project that needs to periodically scan
directories on a shared storage to detect new files and stream them into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When new input
files are detected, a task reconfiguration is requested. Then the connector assigns a file
subset to each task.
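The per-task file assignment can be illustrated with a simple round-robin split. This is a sketch, not the project's actual code; the class and method names are made up:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Round-robin assignment of detected files across at most maxTasks tasks,
 * as the connector described above would do after each directory rescan.
 */
public class FileAssignment {

    public static List<List<String>> assign(List<String> files, int maxTasks) {
        // Never create more groups than there are files, and at least one group.
        int groups = Math.min(maxTasks, Math.max(1, files.size()));
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            assignments.get(i % groups).add(files.get(i));
        }
        return assignments;
    }
}
```

Each inner list would then be serialized into one task's configuration map.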
> Each task stores source offsets for the last sent record. The source offset data are:
>  - the file size in bytes
>  - the byte offset of the record
>  - the byte size of the record
> Tasks become idle when their assigned files are completed (i.e., recordBytesOffset + recordBytesSize
= fileBytesSize).
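The idle check above is just arithmetic on those three offset fields; a minimal sketch with hypothetical names:

```java
/**
 * A file is fully read when the last record's byte offset plus its byte
 * size reaches the file's total size, per the formula above.
 */
public class FileProgress {

    public static boolean fileComplete(long recordBytesOffset,
                                       long recordBytesSize,
                                       long fileBytesSize) {
        return recordBytesOffset + recordBytesSize == fileBytesSize;
    }
}
```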
> The connector should then be able to track the offsets for each assigned file.
> When all tasks have finished, the connector can stop them or assign new files by requesting
a task reconfiguration.
> Moreover, another advantage of monitoring source offsets from the connector is being able
to detect slow or failed tasks and, if necessary, restart all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,

This message was sent by Atlassian JIRA
