flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-4053] Add tests for RMQ sink and check connection for null
Date Fri, 15 Jul 2016 10:18:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master 45edafda5 -> a0c3b879b


[FLINK-4053] Add tests for RMQ sink and check connection for null

This closes #2128


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0c3b879
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0c3b879
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0c3b879

Branch: refs/heads/master
Commit: a0c3b879b25b5e935e9540422b50bbc031a20735
Parents: 45edafd
Author: Ivan Mushketyk <ivan.mushketik@gmail.com>
Authored: Fri Jun 17 21:45:25 2016 +0100
Committer: zentol <chesnay@apache.org>
Committed: Fri Jul 15 12:16:53 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/rabbitmq/RMQSink.java  |   5 +-
 .../connectors/rabbitmq/RMQSource.java          |   3 +
 .../connectors/rabbitmq/RMQSourceTest.java      |  18 +++
 .../connectors/rabbitmq/common/RMQSinkTest.java | 125 +++++++++++++++++++
 4 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 6473164..be7e946 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -47,7 +47,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	private boolean logFailuresOnly = false;
 
 	/**
-	 * @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}.
+	 * @param rmqConnectionConfig The RabbitMQ connection configuration {@link RMQConnectionConfig}.
 	 * @param queueName The queue to publish messages to.
 	 * @param schema A {@link SerializationSchema} for turning the Java objects received into
bytes
      */
@@ -76,6 +76,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
+			if (channel == null) {
+				throw new RuntimeException("None of RabbitMQ channels are available");
+			}
 			channel.queueDeclare(queueName, false, false, false, null);
 		} catch (IOException e) {
 			throw new RuntimeException("Error while creating the channel", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 0892d61..33cf52c 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -148,6 +148,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
+			if (channel == null) {
+				throw new RuntimeException("None of RabbitMQ channels are available");
+			}
 			setupQueue();
 			consumer = new QueueingConsumer(channel);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 31128a9..b63c835 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -108,6 +108,24 @@ public class RMQSourceTest {
 	}
 
 	@Test
+	public void throwExceptionIfConnectionFactoryReturnNull() throws Exception {
+		RMQConnectionConfig connectionConfig = Mockito.mock(RMQConnectionConfig.class);
+		ConnectionFactory connectionFactory = Mockito.mock(ConnectionFactory.class);
+		Connection connection = Mockito.mock(Connection.class);
+		Mockito.when(connectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+		Mockito.when(connectionFactory.newConnection()).thenReturn(connection);
+		Mockito.when(connection.createChannel()).thenReturn(null);
+
+		RMQSource<String> rmqSource = new RMQSource<>(
+			connectionConfig, "queueDummy", true, new StringDeserializationScheme());
+		try {
+			rmqSource.open(new Configuration());
+		} catch (RuntimeException ex) {
+			assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+		}
+	}
+
+	@Test
 	public void testCheckpointing() throws Exception {
 		source.autoAck = false;
 		sourceThread.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a0c3b879/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
new file mode 100644
index 0000000..199cd1e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+public class RMQSinkTest {
+
+	private static final String QUEUE_NAME = "queue";
+	private static final String MESSAGE_STR = "msg";
+	private static final byte[] MESSAGE = new byte[1];
+
+	private RMQConnectionConfig rmqConnectionConfig;
+	private ConnectionFactory connectionFactory;
+	private Connection connection;
+	private Channel channel;
+	private SerializationSchema<String> serializationSchema;
+
+
+	@Before
+	public void before() throws Exception {
+		serializationSchema = spy(new DummySerializationSchema());
+		rmqConnectionConfig = mock(RMQConnectionConfig.class);
+		connectionFactory = mock(ConnectionFactory.class);
+		connection = mock(Connection.class);
+		channel = mock(Channel.class);
+
+		when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
+		when(connectionFactory.newConnection()).thenReturn(connection);
+		when(connection.createChannel()).thenReturn(channel);
+	}
+
+	@Test
+	public void openCallDeclaresQueue() throws Exception {
+		createRMQSink();
+
+		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
+	}
+
+	@Test
+	public void throwExceptionIfChannelIsNull() throws Exception {
+		when(connection.createChannel()).thenReturn(null);
+		try {
+			createRMQSink();
+		} catch (RuntimeException ex) {
+			assertEquals("None of RabbitMQ channels are available", ex.getMessage());
+		}
+	}
+
+	private RMQSink<String> createRMQSink() throws Exception {
+		RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+		rmqSink.open(new Configuration());
+		return rmqSink;
+	}
+
+	@Test
+	public void invokePublishBytesToQueue() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		rmqSink.invoke(MESSAGE_STR);
+		verify(serializationSchema).serialize(MESSAGE_STR);
+		verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void exceptionDuringPublishingIsNotIgnored() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+		rmqSink.invoke("msg");
+	}
+
+	@Test
+	public void exceptionDuringPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+		rmqSink.setLogFailuresOnly(true);
+
+		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
+		rmqSink.invoke("msg");
+	}
+
+	@Test
+	public void closeAllResources() throws Exception {
+		RMQSink<String> rmqSink = createRMQSink();
+
+		rmqSink.close();
+
+		verify(channel).close();
+		verify(connection).close();
+	}
+
+	private class DummySerializationSchema implements SerializationSchema<String> {
+		@Override
+		public byte[] serialize(String element) {
+			return MESSAGE;
+		}
+	}
+}


Mime
View raw message