Ryan Kennedy created KAFKA-5773:
-----------------------------------
Summary: ConnectRecord.equals() doesn't properly handle array keys/values
Key: KAFKA-5773
URL: https://issues.apache.org/jira/browse/KAFKA-5773
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.11.0.0, 0.9.0.1
Reporter: Ryan Kennedy
Priority: Minor
ConnectRecord.equals() isn't handling comparison properly when the key or value is an array
(a byte array, for instance). The following test will fail because ConnectRecord is using
.equals() to compare two byte arrays, which is doing an identity check instead of comparing
the arrays themselves.
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
public class SourceRecordTest {
@Test
public void testEquals() {
byte[] firstBytes = "first".getBytes();
byte[] secondBytes = "first".getBytes();
SourceRecord firstRecord = new SourceRecord(Collections.EMPTY_MAP,
Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, firstBytes);
SourceRecord secondRecord = new SourceRecord(Collections.EMPTY_MAP,
Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, firstBytes);
SourceRecord thirdRecord = new SourceRecord(Collections.EMPTY_MAP,
Collections.EMPTY_MAP,"topic", 1, Schema.BYTES_SCHEMA, secondBytes);
assertThat(firstRecord).isEqualTo(secondRecord);
assertThat(firstRecord).isEqualTo(thirdRecord);
}
}
{code}
As a result, I have a failing unit test that should otherwise pass:
{code:java}
List<SourceRecord> sourceRecords = task.poll();
final SourceRecord expectedRecord = new SourceRecord(
ImmutableMap.of(JdbcEventSourceTask.PARTITION_DATABASE, JDBC_URL),
ImmutableMap.of(JdbcEventSourceTask.OFFSET_ID, publishedId),
"topicname",
1,
Schema.BYTES_SCHEMA, "MessageKey".getBytes(),
Schema.BYTES_SCHEMA, "MessageValue".getBytes());
assertThat(sourceRecords).containsOnly(expectedRecord);
{code}
The workaround at the moment is to implement a custom Comparator<SourceRecord> instance
to use with assertj. But I wonder if there's anything in Kafka Connect itself that may be
affected by this issue.
The code causing the issue can be seen [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L108-L109)
(keys) and [here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L112-L113)
(values). The comparison there should use (when arrays are present) java.util.Arrays.equals(byte[],
byte[]) to perform an equality check instead of an identity check, assuming that's the desired
behavior.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
|