From: fhueske@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Fri, 02 Dec 2016 13:35:21 -0000
Message-Id: <90c08eb21c644e1d8e39d06fcb2d539f@git.apache.org>
In-Reply-To: <627ed550fa9e4aed8e3752516869c22e@git.apache.org>
References: <627ed550fa9e4aed8e3752516869c22e@git.apache.org>
Subject: [31/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
new file mode 100644
index 0000000..2e06160
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class FlinkKafkaProducerBaseTest {
+
+	/**
+	 * Tests that the constructor eagerly checks that the bootstrap servers are set in the config.
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
+		// no bootstrap servers set in props
+		Properties props = new Properties();
+		// should throw IllegalArgumentException
+		new DummyFlinkKafkaProducer<>(props, null);
+	}
+
+	/**
+	 * Tests that the constructor defaults missing key and value serializers in the config to ByteArraySerializer.
+	 */
+	@Test
+	public void testKeyValueDeserializersSetIfMissing() throws Exception {
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
+		// should set missing key and value serializers
+		new DummyFlinkKafkaProducer<>(props, null);
+
+		assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+		assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+		assertTrue(props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+		assertTrue(props.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).equals(ByteArraySerializer.class.getCanonicalName()));
+	}
+
+	/**
+	 * Tests that the partition list is deterministic and correctly provided to the custom partitioner.
+	 */
+	@Test
+	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
+		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
+		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
+		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
+
+		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
+			FakeStandardProducerConfig.get(), mockPartitioner);
+		producer.setRuntimeContext(mockRuntimeContext);
+
+		producer.open(new Configuration());
+
+		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
+		// which should be sorted before provided to the custom partitioner's open() method
+		int[] correctPartitionList = {0, 1, 2, 3};
+		verify(mockPartitioner).open(0, 1, correctPartitionList);
+	}
+
+	/**
+	 * Test ensuring that the producer is not dropping buffered records;
+	 * we set a timeout because the test will not finish if the logic is broken.
+	 */
+	@Test(timeout=5000)
+	public void testAtLeastOnceProducer() throws Throwable {
+		runAtLeastOnceTest(true);
+	}
+
+	/**
+	 * Ensures that the at-least-once test fails if flushing is disabled.
+	 */
+	@Test(expected = AssertionError.class, timeout=5000)
+	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
+		runAtLeastOnceTest(false);
+	}
+
+	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
+		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), null, snapshottingFinished);
+		producer.setFlushOnCheckpoint(flushOnCheckpoint);
+
+		OneInputStreamOperatorTestHarness testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+
+		testHarness.open();
+
+		for (int i = 0; i < 100; i++) {
+			testHarness.processElement(new StreamRecord<>("msg-" + i));
+		}
+
+		// start a thread confirming all pending records
+		final Tuple1 runnableError = new Tuple1<>(null);
+		final Thread threadA = Thread.currentThread();
+
+		Runnable confirmer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					MockProducer mp = producer.getProducerInstance();
+					List pending = mp.getPending();
+
+					// we need to find out if the snapshot() method blocks forever;
+					// this cannot be checked directly, so we wait a moment and then
+					// verify that snapshot() has not yet finished
+					synchronized (threadA) {
+						threadA.wait(500L);
+					}
+					// we now check that no records have been confirmed yet
+					Assert.assertEquals(100, pending.size());
+					Assert.assertFalse("Snapshot method returned before all records were confirmed",
+						snapshottingFinished.get());
+
+					// now confirm all checkpoints
+					for (Callback c: pending) {
+						c.onCompletion(null, null);
+					}
+					pending.clear();
+				} catch (Throwable t) {
+					runnableError.f0 = t;
+				}
+			}
+		};
+		Thread threadB = new Thread(confirmer);
+		threadB.start();
+
+		// this should block:
+		testHarness.snapshot(0, 0);
+
+		synchronized (threadA) {
+			threadA.notifyAll(); // just in case, to let the test fail faster
+		}
+		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+		while (deadline.hasTimeLeft() && threadB.isAlive()) {
+			threadB.join(500);
+		}
+		Assert.assertFalse("Thread B is expected to be finished at this point.
If not, the test is prone to fail", threadB.isAlive()); + if (runnableError.f0 != null) { + throw runnableError.f0; + } + + testHarness.close(); + } + + + // ------------------------------------------------------------------------ + + private static class DummyFlinkKafkaProducer extends FlinkKafkaProducerBase { + private static final long serialVersionUID = 1L; + + private transient MockProducer prod; + private AtomicBoolean snapshottingFinished; + + @SuppressWarnings("unchecked") + public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) { + super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); + this.snapshottingFinished = snapshottingFinished; + } + + // constructor variant for test irrelated to snapshotting + @SuppressWarnings("unchecked") + public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { + super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); + this.snapshottingFinished = new AtomicBoolean(true); + } + + @Override + protected KafkaProducer getKafkaProducer(Properties props) { + this.prod = new MockProducer(); + return this.prod; + } + + @Override + public void snapshotState(FunctionSnapshotContext ctx) throws Exception { + // call the actual snapshot state + super.snapshotState(ctx); + // notify test that snapshotting has been done + snapshottingFinished.set(true); + } + + @Override + protected void flush() { + this.prod.flush(); + } + + public MockProducer getProducerInstance() { + return this.prod; + } + } + + private static class MockProducer extends KafkaProducer { + List pendingCallbacks = new ArrayList<>(); + + public MockProducer() { + super(FakeStandardProducerConfig.get()); + } + + @Override + public Future send(ProducerRecord record) { + throw new UnsupportedOperationException("Unexpected"); + } + + @Override + public Future send(ProducerRecord record, Callback callback) { + pendingCallbacks.add(callback); + return null; + } + + @Override + public List partitionsFor(String topic) { + List list = new ArrayList<>(); + // deliberately return an out-of-order partition list + list.add(new PartitionInfo(topic, 3, null, null, null)); + list.add(new PartitionInfo(topic, 1, null, null, null)); + list.add(new PartitionInfo(topic, 0, null, null, null)); + list.add(new PartitionInfo(topic, 2, null, null, null)); + return list; + } + + @Override + public Map metrics() { + return null; + } + + + public List getPending() { + return this.pendingCallbacks; + } + + public void flush() { + while (pendingCallbacks.size() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException("Unable to flush producer, task was interrupted"); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java new file mode 100644 index 0000000..1882a7e --- /dev/null +++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class JSONDeserializationSchemaTest { + @Test + public void testDeserialize() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("key", 4).put("value", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONDeserializationSchema schema = new JSONDeserializationSchema(); + ObjectNode deserializedValue = schema.deserialize(serializedValue); + + Assert.assertEquals(4, deserializedValue.get("key").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").asText()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java new file mode 100644 index 0000000..86d3105 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class JSONKeyValueDeserializationSchemaTest { + @Test + public void testDeserializeWithoutMetadata() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialKey = mapper.createObjectNode(); + initialKey.put("index", 4); + byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("word", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + + + Assert.assertTrue(deserializedValue.get("metadata") == null); + Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); + } + + @Test + public void testDeserializeWithMetadata() throws IOException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode initialKey = mapper.createObjectNode(); + initialKey.put("index", 4); + byte[] serializedKey = mapper.writeValueAsBytes(initialKey); + + ObjectNode initialValue = mapper.createObjectNode(); + initialValue.put("word", "world"); + byte[] serializedValue = mapper.writeValueAsBytes(initialValue); + + JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true); + ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4); + + Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); + Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); + Assert.assertEquals("topic#1", deserializedValue.get("metadata").get("topic").asText()); + Assert.assertEquals(4, deserializedValue.get("metadata").get("offset").asInt()); + Assert.assertEquals(3, deserializedValue.get("metadata").get("partition").asInt()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java new file mode 100644 index 0000000..68225e2 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JsonRowDeserializationSchemaTest { + + /** + * Tests simple deserialization. + */ + @Test + public void testDeserialization() throws Exception { + long id = 1238123899121L; + String name = "asdlkjasjkdla998y1122"; + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", id); + root.put("name", name); + root.put("bytes", bytes); + + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "id", "name", "bytes" }, + new Class[] { Long.class, String.class, byte[].class }); + + Row deserialized = deserializationSchema.deserialize(serializedJson); + + assertEquals(3, deserialized.productArity()); + assertEquals(id, deserialized.productElement(0)); + assertEquals(name, deserialized.productElement(1)); + assertArrayEquals(bytes, (byte[]) deserialized.productElement(2)); + } + + /** + * Tests deserialization with non-existing field name. + */ + @Test + public void testMissingNode() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + // Root + ObjectNode root = objectMapper.createObjectNode(); + root.put("id", 123123123); + byte[] serializedJson = objectMapper.writeValueAsBytes(root); + + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema( + new String[] { "name" }, + new Class[] { String.class }); + + Row row = deserializationSchema.deserialize(serializedJson); + + assertEquals(1, row.productArity()); + assertNull("Missing field not null", row.productElement(0)); + + deserializationSchema.setFailOnMissingField(true); + + try { + deserializationSchema.deserialize(serializedJson); + fail("Did not throw expected Exception"); + } catch (IOException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + /** + * Tests that number of field names and types has to match. 
+ */ + @Test + public void testNumberOfFieldNamesAndTypesMismatch() throws Exception { + try { + new JsonRowDeserializationSchema( + new String[] { "one", "two", "three" }, + new Class[] { Long.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + + try { + new JsonRowDeserializationSchema( + new String[] { "one" }, + new Class[] { Long.class, String.class }); + fail("Did not throw expected Exception"); + } catch (IllegalArgumentException ignored) { + // Expected + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java new file mode 100644 index 0000000..92af15d --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class JsonRowSerializationSchemaTest { + @Test + public void testRowSerialization() throws IOException { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + Row row = new Row(3); + row.setField(0, 1); + row.setField(1, true); + row.setField(2, "str"); + + Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); + assertEqualRows(row, resultRow); + } + + @Test + public void testSerializationOfTwoRows() throws IOException { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Class[] fieldTypes = new Class[] {Integer.class, Boolean.class, String.class}; + Row row1 = new Row(3); + row1.setField(0, 1); + row1.setField(1, true); + row1.setField(2, "str"); + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row1); + assertEqualRows(row1, deserializationSchema.deserialize(bytes)); + + Row row2 = new Row(3); + row2.setField(0, 10); + row2.setField(1, false); + row2.setField(2, "newStr"); + + bytes = serializationSchema.serialize(row2); + assertEqualRows(row2, deserializationSchema.deserialize(bytes)); + } + + @Test(expected = NullPointerException.class) + public void testInputValidation() { + new JsonRowSerializationSchema(null); + } + + @Test(expected = IllegalStateException.class) + public void testSerializeRowWithInvalidNumberOfFields() { + String[] fieldNames = new String[] {"f1", "f2", "f3"}; + Row row = new Row(1); + row.setField(0, 1); + + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + serializationSchema.serialize(row); + } + + private Row serializeAndDeserialize(String[] fieldNames, Class[] fieldTypes, Row row) throws IOException { + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(fieldNames, fieldTypes); + + byte[] bytes = serializationSchema.serialize(row); + return deserializationSchema.deserialize(bytes); + } + + private void assertEqualRows(Row expectedRow, Row resultRow) { + assertEquals("Deserialized row should have expected number of fields", + expectedRow.productArity(), resultRow.productArity()); + for (int i = 0; i < expectedRow.productArity(); i++) { + assertEquals(String.format("Field number %d should be as in the original row", i), + expectedRow.productElement(i), resultRow.productElement(i)); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java new file mode 100644 index 0000000..9beed22 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.*; + +/** + * Tests that the partition assignment is deterministic and stable. + */ +public class KafkaConsumerPartitionAssignmentTest { + + @Test + public void testPartitionsEqualConsumers() { + try { + List inPartitions = Arrays.asList( + new KafkaTopicPartition("test-topic", 4), + new KafkaTopicPartition("test-topic", 52), + new KafkaTopicPartition("test-topic", 17), + new KafkaTopicPartition("test-topic", 1)); + + for (int i = 0; i < inPartitions.size(); i++) { + List parts = + FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i); + + assertNotNull(parts); + assertEquals(1, parts.size()); + assertTrue(contains(inPartitions, parts.get(0).getPartition())); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private boolean contains(List inPartitions, int partition) { + for (KafkaTopicPartition ktp : inPartitions) { + if (ktp.getPartition() == partition) { + return true; + } + } + return false; + } + + @Test + public void testMultiplePartitionsPerConsumers() { + try { + final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + + final List partitions = new ArrayList<>(); + final Set allPartitions = new HashSet<>(); + + for (int p : partitionIDs) { + KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); + partitions.add(part); + allPartitions.add(part); + } + + final int numConsumers = 3; + final int minPartitionsPerConsumer = partitions.size() / numConsumers; + final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1; + + for (int i = 0; i < numConsumers; i++) { + List parts = + FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i); + + assertNotNull(parts); + assertTrue(parts.size() >= minPartitionsPerConsumer); + assertTrue(parts.size() <= maxPartitionsPerConsumer); + + for (KafkaTopicPartition p : parts) { + // check that the element was actually contained + assertTrue(allPartitions.remove(p)); + } + } + + // all partitions must have been assigned + 
assertTrue(allPartitions.isEmpty()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testPartitionsFewerThanConsumers() { + try { + List inPartitions = Arrays.asList( + new KafkaTopicPartition("test-topic", 4), + new KafkaTopicPartition("test-topic", 52), + new KafkaTopicPartition("test-topic", 17), + new KafkaTopicPartition("test-topic", 1)); + + final Set allPartitions = new HashSet<>(); + allPartitions.addAll(inPartitions); + + final int numConsumers = 2 * inPartitions.size() + 3; + + for (int i = 0; i < numConsumers; i++) { + List parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i); + + assertNotNull(parts); + assertTrue(parts.size() <= 1); + + for (KafkaTopicPartition p : parts) { + // check that the element was actually contained + assertTrue(allPartitions.remove(p)); + } + } + + // all partitions must have been assigned + assertTrue(allPartitions.isEmpty()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testAssignEmptyPartitions() { + try { + List ep = new ArrayList<>(); + List parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2); + assertNotNull(parts1); + assertTrue(parts1.isEmpty()); + + List parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0); + assertNotNull(parts2); + assertTrue(parts2.isEmpty()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGrowingPartitionsRemainsStable() { + try { + final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; + List newPartitions = new ArrayList<>(); + + for (int p : newPartitionIDs) { + KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); + newPartitions.add(part); + } + + List initialPartitions = newPartitions.subList(0, 7); + + final Set allNewPartitions = new HashSet<>(newPartitions); + final Set allInitialPartitions = new HashSet<>(initialPartitions); + + final int numConsumers = 3; + final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers; + final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1; + final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers; + final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1; + + List parts1 = FlinkKafkaConsumerBase.assignPartitions( + initialPartitions, numConsumers, 0); + List parts2 = FlinkKafkaConsumerBase.assignPartitions( + initialPartitions, numConsumers, 1); + List parts3 = FlinkKafkaConsumerBase.assignPartitions( + initialPartitions, numConsumers, 2); + + assertNotNull(parts1); + assertNotNull(parts2); + assertNotNull(parts3); + + assertTrue(parts1.size() >= minInitialPartitionsPerConsumer); + assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer); + assertTrue(parts2.size() >= minInitialPartitionsPerConsumer); + assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer); + assertTrue(parts3.size() >= minInitialPartitionsPerConsumer); + assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer); + + for (KafkaTopicPartition p : parts1) { + // check that the element was actually contained + assertTrue(allInitialPartitions.remove(p)); + } + for (KafkaTopicPartition p : parts2) { + // check that the element was actually contained + assertTrue(allInitialPartitions.remove(p)); + } + for (KafkaTopicPartition p : parts3) { + // check that the element was actually contained + assertTrue(allInitialPartitions.remove(p)); + } + + // all 
partitions must have been assigned + assertTrue(allInitialPartitions.isEmpty()); + + // grow the set of partitions and distribute anew + + List parts1new = FlinkKafkaConsumerBase.assignPartitions( + newPartitions, numConsumers, 0); + List parts2new = FlinkKafkaConsumerBase.assignPartitions( + newPartitions, numConsumers, 1); + List parts3new = FlinkKafkaConsumerBase.assignPartitions( + newPartitions, numConsumers, 2); + + // new partitions must include all old partitions + + assertTrue(parts1new.size() > parts1.size()); + assertTrue(parts2new.size() > parts2.size()); + assertTrue(parts3new.size() > parts3.size()); + + assertTrue(parts1new.containsAll(parts1)); + assertTrue(parts2new.containsAll(parts2)); + assertTrue(parts3new.containsAll(parts3)); + + assertTrue(parts1new.size() >= minNewPartitionsPerConsumer); + assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer); + assertTrue(parts2new.size() >= minNewPartitionsPerConsumer); + assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer); + assertTrue(parts3new.size() >= minNewPartitionsPerConsumer); + assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer); + + for (KafkaTopicPartition p : parts1new) { + // check that the element was actually contained + assertTrue(allNewPartitions.remove(p)); + } + for (KafkaTopicPartition p : parts2new) { + // check that the element was actually contained + assertTrue(allNewPartitions.remove(p)); + } + for (KafkaTopicPartition p : parts3new) { + // check that the element was actually contained + assertTrue(allNewPartitions.remove(p)); + } + + // all partitions must have been assigned + assertTrue(allNewPartitions.isEmpty()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + +}
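
The partition assignment tests above pin down three properties of FlinkKafkaConsumerBase.assignPartitions(): the result is deterministic, every subtask receives between floor(n/k) and ceil(n/k) partitions, and the assignment of existing partitions does not move when new partitions are appended to the list. As a rough illustration only (not the actual Flink implementation; the class name and generic signature below are made up for this sketch), an index-modulo scheme over a consistently ordered partition list satisfies all three properties:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionAssignmentSketch {

	/**
	 * Assigns the i-th element of a consistently ordered partition list to the
	 * subtask with index (i % numConsumers). Because appending new partitions never
	 * changes the position of existing ones, previously assigned partitions stay
	 * with the same subtask, which is the stability asserted in
	 * testGrowingPartitionsRemainsStable().
	 */
	public static <T> List<T> assignPartitions(List<T> allPartitions, int numConsumers, int consumerIndex) {
		List<T> assigned = new ArrayList<>();
		for (int i = 0; i < allPartitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(allPartitions.get(i));
			}
		}
		return assigned;
	}

	public static void main(String[] args) {
		List<String> partitions = Arrays.asList("p0", "p1", "p2", "p3", "p4", "p5", "p6");
		for (int subtask = 0; subtask < 3; subtask++) {
			// each subtask receives either floor(7/3) = 2 or ceil(7/3) = 3 partitions
			System.out.println("subtask " + subtask + " -> " + assignPartitions(partitions, 3, subtask));
		}
	}
}

Re-running main() always prints the same assignment, and extending the partition list only adds partitions to subtasks without moving any existing ones.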