kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Amber Liu (Jira)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster
Date Thu, 10 Jun 2021 16:55:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17361084#comment-17361084
] 

Amber Liu commented on KAFKA-12468:
-----------------------------------

I was using standalone mode with active-passive setup and saw negative offsets in the past
as well. One of the reasons I found was due to consumer request timeout, e.g. org.apache.kafka.common.errors.DisconnectException.
I increased the request timeout and tasks.max and the offsets are synced correctly now.
{code:java}
# consumer, need to set higher timeout
source.admin.request.timeout.ms = 180000
source.consumer.session.timeout.ms = 180000
source.consumer.request.timeout.ms = 180000{code}
 

> Initial offsets are copied from source to target cluster
> --------------------------------------------------------
>
>                 Key: KAFKA-12468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12468
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.7.0
>            Reporter: Bart De Neuter
>            Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 (heartbeat,
checkpoint and source) are running on a dedicated Kafka connect cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets
from the source cluster are initially copied to the target cluster without translation. This
causes a negative lag for all synced consumer groups. Only when we reset the offsets for each
topic/partition on the target cluster and produce a record on the topic/partition in the source,
the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current offsets of the
source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
>     "name": "mm2-mirror-heartbeat",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "1",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
>     "name": "mm2-mirror-checkpoint",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
>     "name": "mm2-mirror-source",
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "source.cluster.alias": "eventador",
>     "target.cluster.alias": "msk",
>     "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>",
>     "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>",
>     "topics": ".*",
>     "groups": ".*",
>     "tasks.max": "40",
>     "replication.policy.class": "CustomReplicationPolicy",
>     "sync.group.offsets.enabled": "true",
>     "sync.group.offsets.interval.seconds": "5",
>     "emit.checkpoints.enabled": "true",
>     "emit.checkpoints.interval.seconds": "30",
>     "emit.heartbeats.interval.seconds": "30",
>     "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Mime
View raw message