flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/2] flink git commit: [FLINK-3248] add constructor params and generic ConnectionFactory
Date Sat, 20 Feb 2016 18:16:14 GMT
[FLINK-3248] add constructor params and generic ConnectionFactory

This adds more default constructor parameters to the RMQSource. In addition,
users may override the setupConnectionFactory() method to return their own
configured factory.

This closes #1670.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdee1b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdee1b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdee1b3

Branch: refs/heads/master
Commit: 9bdee1b355c6113db35ba7476f65deffe5e39ea8
Parents: 7f9ade4
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Feb 16 10:34:01 2016 +0100
Committer: Maximilian Michels <mxm@apache.org>
Committed: Sat Feb 20 19:13:37 2016 +0100

----------------------------------------------------------------------
 .../connectors/rabbitmq/RMQSource.java          | 79 ++++++++++++++--
 .../connectors/rabbitmq/RMQSourceTest.java      | 99 +++++++++++++++++---
 2 files changed, 157 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bdee1b3/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 09bb07c..39f483b 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -60,6 +60,9 @@ import org.slf4j.LoggerFactory;
  *    (correlation id is not set).
  * 3) No strong delivery guarantees (without checkpointing) with RabbitMQ auto-commit mode.
  *
+ * Users may override the setupConnectionFactory() method to set up their own
+ * ConnectionFactory in case the constructor parameters are not sufficient.
+ *
  * @param <OUT> The type of the data read from RabbitMQ.
  */
 public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
@@ -70,6 +73,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
 
 	private final String hostName;
+	private final Integer port;
+	private final String username;
+	private final String password;
 	private final String queueName;
 	private final boolean usesCorrelationId;
 	protected DeserializationSchema<OUT> schema;
@@ -82,7 +88,6 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 
 	private transient volatile boolean running;
 
-
 	/**
 	 * Creates a new RabbitMQ source with at-least-once message processing guarantee when
 	 * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
@@ -95,8 +100,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 *               				into Java objects.
 	 */
 	public RMQSource(String hostName, String queueName,
-					DeserializationSchema<OUT> deserializationSchema) {
-		this(hostName, queueName, false, deserializationSchema);
+				DeserializationSchema<OUT> deserializationSchema) {
+		this(hostName, null, null, null, queueName, false, deserializationSchema);
 	}
 
 	/**
@@ -104,7 +109,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
 	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
 	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
-	 * @param hostName The RabbiMQ broker's address to connect to.
+	 * @param hostName The RabbitMQ broker's address to connect to.
 	 * @param queueName The queue to receive messages from.
 	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
 	 *                          id to deduplicate messages (in case of failed acknowledgments).
@@ -113,20 +118,80 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	 *                              into Java objects.
 	 */
 	public RMQSource(String hostName, String queueName, boolean usesCorrelationId,
-			DeserializationSchema<OUT> deserializationSchema) {
+				DeserializationSchema<OUT> deserializationSchema) {
+		this(hostName, null, null, null, queueName, usesCorrelationId, deserializationSchema);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
+	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
+	 * @param hostName The RabbitMQ broker's address to connect to.
+	 * @param port The RabbitMQ broker's port.
+	 * @param queueName The queue to receive messages from.
+	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+	 *                          id to deduplicate messages (in case of failed acknowledgments).
+	 *                          Only used when checkpointing is enabled.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *                              into Java objects.
+	 */
+	public RMQSource(String hostName, Integer port,
+				String queueName, boolean usesCorrelationId,
+				DeserializationSchema<OUT> deserializationSchema) {
+		this(hostName, port, null, null, queueName, usesCorrelationId, deserializationSchema);
+	}
+
+	/**
+	 * Creates a new RabbitMQ source. For exactly-once, you must set the correlation ids of messages
+	 * at the producer. The correlation id must be unique. Otherwise the behavior of the source is
+	 * undefined. In doubt, set {@param usesCorrelationId} to false. When correlation ids are not
+	 * used, this source has at-least-once processing semantics when checkpointing is enabled.
+	 * @param hostName The RabbitMQ broker's address to connect to.
+	 * @param port The RabbitMQ broker's port.
+	 * @param queueName The queue to receive messages from.
+	 * @param usesCorrelationId Whether the messages received are supplied with a <b>unique</b>
+	 *                          id to deduplicate messages (in case of failed acknowledgments).
+	 *                          Only used when checkpointing is enabled.
+	 * @param deserializationSchema A {@link DeserializationSchema} for turning the bytes received
+	 *                              into Java objects.
+	 */
+	public RMQSource(String hostName, Integer port, String username, String password,
+				String queueName, boolean usesCorrelationId,
+				DeserializationSchema<OUT> deserializationSchema) {
 		super(String.class);
 		this.hostName = hostName;
+		this.port = port;
+		this.username = username;
+		this.password = password;
 		this.queueName = queueName;
 		this.usesCorrelationId = usesCorrelationId;
 		this.schema = deserializationSchema;
 	}
 
 	/**
+	 * Initializes the connection to RMQ with a default connection factory. The user may override
+	 * this method to set up and configure their own ConnectionFactory.
+	 */
+	protected ConnectionFactory setupConnectionFactory() {
+		return new ConnectionFactory();
+	}
+
+	/**
 	 * Initializes the connection to RMQ.
 	 */
-	protected void initializeConnection() {
-		ConnectionFactory factory = new ConnectionFactory();
+	private void initializeConnection() {
+		ConnectionFactory factory = setupConnectionFactory();
 		factory.setHost(hostName);
+		if (port != null) {
+			factory.setPort(port);
+		}
+		if (username != null) {
+			factory.setUsername(username);
+		}
+		if (password != null) {
+			factory.setPassword(password);
+		}
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdee1b3/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index e0eed70..21f185f 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -19,8 +19,10 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +30,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.SerializedCheckpointData;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
@@ -72,18 +75,18 @@ public class RMQSourceTest {
 
 	private volatile long messageId;
 
-	private boolean generateCorrelationIds = true;
+	private boolean generateCorrelationIds;
 
 	private volatile Exception exception;
 
 	@Before
 	public void beforeTest() throws Exception {
 
-		source = new RMQTestSource<>("hostDummy", "queueDummy", true, new StringDeserializationScheme());
+		source = new RMQTestSource();
 		source.open(config);
-		source.initializeConnection();
 
 		messageId = 0;
+		generateCorrelationIds = true;
 
 		sourceThread = new Thread(new Runnable() {
 			@Override
@@ -213,6 +216,57 @@ public class RMQSourceTest {
 		assertTrue(exception instanceof NullPointerException);
 	}
 
+	/**
+	 * Tests whether constructor params are passed correctly.
+	 */
+	@Test
+	public void testConstructorParams() {
+		// verify construction params
+		ConstructorTestClass testObj = new ConstructorTestClass(
+			"hostTest", 999, "userTest", "passTest",
+			"queueTest", false, new StringDeserializationScheme());
+
+		try {
+			testObj.open(new Configuration());
+		} catch (Exception e) {
+			// connection fails but check if args have been passed correctly
+		}
+
+		assertEquals("hostTest", testObj.getFactory().getHost());
+		assertEquals(999, testObj.getFactory().getPort());
+		assertEquals("userTest", testObj.getFactory().getUsername());
+		assertEquals("passTest", testObj.getFactory().getPassword());
+	}
+
+	private static class ConstructorTestClass extends RMQSource<String> {
+
+		private ConnectionFactory factory = Mockito.spy(new ConnectionFactory());
+
+		public ConstructorTestClass(String hostName, Integer port,
+				String username,
+				String password,
+				String queueName,
+				boolean usesCorrelationId,
+				DeserializationSchema<String> deserializationSchema) {
+			super(hostName, port, username, password,
+				queueName, usesCorrelationId, deserializationSchema);
+
+			try {
+				Mockito.doThrow(new RuntimeException()).when(factory).newConnection();
+			} catch (IOException e) {
+				fail("Failed to stub connection method");
+			}
+		}
+
+		@Override
+		protected ConnectionFactory setupConnectionFactory() {
+			return factory;
+		}
+
+		public ConnectionFactory getFactory() {
+			return factory;
+		}
+	}
 
 	private static class StringDeserializationScheme implements DeserializationSchema<String> {
 
@@ -238,33 +292,32 @@ public class RMQSourceTest {
 		}
 	}
 
-	private class RMQTestSource<OUT> extends RMQSource<OUT> {
+	private class RMQTestSource extends RMQSource<String> {
 
-		public RMQTestSource(String hostName, String queueName, boolean usesCorrelationIds,
-							 DeserializationSchema<OUT> deserializationSchema) {
-			super(hostName, queueName, usesCorrelationIds, deserializationSchema);
+		public RMQTestSource() {
+			super("hostDummy", -1, "", "", "queueDummy", true, new StringDeserializationScheme());
 		}
 
 		@Override
-		protected void initializeConnection() {
-			connection = Mockito.mock(Connection.class);
-			channel = Mockito.mock(Channel.class);
+		public void open(Configuration config) throws Exception {
+			super.open(config);
+
 			consumer = Mockito.mock(QueueingConsumer.class);
 
 			// Mock for delivery
 			final QueueingConsumer.Delivery deliveryMock = Mockito.mock(QueueingConsumer.Delivery.class);
 			Mockito.when(deliveryMock.getBody()).thenReturn("test".getBytes());
 
-			// Mock for envelope
-			Envelope envelope = Mockito.mock(Envelope.class);
-			Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope);
-
 			try {
 				Mockito.when(consumer.nextDelivery()).thenReturn(deliveryMock);
 			} catch (InterruptedException e) {
 				fail("Couldn't setup up deliveryMock");
 			}
 
+			// Mock for envelope
+			Envelope envelope = Mockito.mock(Envelope.class);
+			Mockito.when(deliveryMock.getEnvelope()).thenReturn(envelope);
+
 			Mockito.when(envelope.getDeliveryTag()).thenAnswer(new Answer<Long>() {
 				@Override
 				public Long answer(InvocationOnMock invocation) throws Throwable {
@@ -286,6 +339,24 @@ public class RMQSourceTest {
 		}
 
 		@Override
+		protected ConnectionFactory setupConnectionFactory() {
+			ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+			Connection connection = Mockito.mock(Connection.class);
+			try {
+				Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+				Mockito.when(connection.createChannel()).thenReturn(Mockito.mock(Channel.class));
+			} catch (IOException e) {
+				fail("Test environment couldn't be created.");
+			}
+			return connectionFactory;
+		}
+
+		@Override
+		public RuntimeContext getRuntimeContext() {
+			return Mockito.mock(StreamingRuntimeContext.class);
+		}
+
+		@Override
 		protected boolean addId(String uid) {
 			assertEquals(false, autoAck);
 			return super.addId(uid);


Mime
View raw message