flink-commits mailing list archives

From mbala...@apache.org
Subject [04/27] flink git commit: [storm-compat] Added tests for Storm compatibility wrappers
Date Mon, 15 Jun 2015 09:32:54 GMT
[storm-compat] Added tests for Storm compatibility wrappers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45dbfdde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45dbfdde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45dbfdde

Branch: refs/heads/master
Commit: 45dbfddeaf55f2749d16a44caaea2f197bfa9b3f
Parents: 0a8b63a
Author: mjsax <mjsax@informatik.hu-berlin.de>
Authored: Thu May 14 12:28:59 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Jun 14 22:59:00 2015 +0200

----------------------------------------------------------------------
 .../wrappers/FlinkDummyRichFunctionTest.java    |  41 +++
 .../wrappers/StormBoltWrapperTest.java          | 217 ++++++++++++
 .../wrappers/StormCollectorTest.java            | 125 +++++++
 .../wrappers/StormFiniteSpoutWrapperTest.java   | 111 ++++++
 .../wrappers/StormOutputFieldsDeclarerTest.java |  82 +++++
 .../wrappers/StormSpoutWrapperTest.java         |  68 ++++
 .../wrappers/StormTupleTest.java                | 353 +++++++++++++++++++
 .../wrappers/StormWrapperSetupHelperTest.java   | 149 ++++++++
 8 files changed, 1146 insertions(+)
----------------------------------------------------------------------
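
Most of these tests share one pattern: a StormOutputFieldsDeclarer is pre-populated with the desired output schema and injected via PowerMockito.whenNew(...), so the wrapper constructor (apparently via StormWrapperSetupHelper, which is why that class appears in @PrepareForTest) sees a fixed number of output attributes; the wrapped Storm component itself is a Mockito mock whose callbacks are verified afterwards. The following is a minimal, self-contained sketch of that pattern, condensed from the testOpen() case in StormBoltWrapperTest below. The class and method names of the sketch are illustrative only; the dependencies assumed are the JUnit/Mockito/PowerMock versions this module already uses.

package org.apache.flink.stormcompatibility.wrappers;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

@RunWith(PowerMockRunner.class)
@PrepareForTest(StormWrapperSetupHelper.class)
public class StormBoltWrapperPatternSketch {

	@Test
	public void testOpenForwardsToPrepare() throws Exception {
		// inject a pre-populated declarer so the wrapper constructor sees a one-attribute schema
		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
		declarer.declare(new Fields("dummy"));
		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);

		// wrap a mocked Storm bolt and attach a mocked Flink runtime context
		final IRichBolt bolt = mock(IRichBolt.class);
		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));

		// opening the Flink operator must forward to the bolt's prepare(...) life-cycle hook
		wrapper.open(mock(Configuration.class));
		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
	}
}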


http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
new file mode 100644
index 0000000..7ecc98c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+
+
+public class FlinkDummyRichFunctionTest {
+	
+	@Test
+	public void testRuntimeContext() {
+		final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
+		
+		final RuntimeContext context = mock(RuntimeContext.class);
+		dummy.setRuntimeContext(context);
+		
+		Assert.assertSame(context, dummy.getRuntimeContext());
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
new file mode 100644
index 0000000..50ae763
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
+public class StormBoltWrapperTest {
+	
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperRawType() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy1", "dummy2"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
+	}
+	
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperTooManyAttributes1() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for(int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
+	}
+	
+	@SuppressWarnings("unused")
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrapperTooManyAttributes2() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for(int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
+	}
+	
+	@Test
+	public void testWrapper() throws Exception {
+		for(int i = 0; i < 26; ++i) {
+			this.testWrapper(i);
+		}
+	}
+	
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	private void testWrapper(final int numberOfAttributes) throws Exception {
+		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
+		
+		Tuple flinkTuple = null;
+		String rawTuple = null;
+		
+		if(numberOfAttributes == 0) {
+			rawTuple = new String("test");
+		} else {
+			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+		}
+		
+		String[] schema = new String[numberOfAttributes];
+		if(numberOfAttributes == 0) {
+			schema = new String[1];
+		}
+		for(int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		
+		
+		final StreamRecord record = mock(StreamRecord.class);
+		if(numberOfAttributes == 0) {
+			when(record.getObject()).thenReturn(rawTuple);
+		} else {
+			when(record.getObject()).thenReturn(flinkTuple);
+		}
+		
+		final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class);
+		when(serializer.createInstance()).thenReturn(record);
+		
+		final IndexedReaderIterator reader = mock(IndexedReaderIterator.class);
+		when(reader.next(record)).thenReturn(record).thenReturn(null);
+		
+		final StreamTaskContext taskContext = mock(StreamTaskContext.class);
+		when(taskContext.getInputSerializer(0)).thenReturn(serializer);
+		when(taskContext.getIndexedInput(0)).thenReturn(reader);
+		
+		
+		
+		final IRichBolt bolt = mock(IRichBolt.class);
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
+		wrapper.setup(taskContext);
+		
+		
+		
+		wrapper.callUserFunction();
+		if(numberOfAttributes == 0) {
+			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
+		} else {
+			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
+		}
+		
+		
+		
+		wrapper.run();
+		if(numberOfAttributes == 0) {
+			verify(bolt, times(2)).execute(eq(new StormTuple<String>(rawTuple)));
+		} else {
+			verify(bolt, times(2)).execute(eq(new StormTuple<Tuple>(flinkTuple)));
+		}
+	}
+	
+	@Test
+	public void testOpen() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		wrapper.open(mock(Configuration.class));
+		
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+	}
+	
+	@Test
+	public void testOpenSink() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		wrapper.open(mock(Configuration.class));
+		
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testClose() throws Exception {
+		final IRichBolt bolt = mock(IRichBolt.class);
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
+		
+		final StreamTaskContext<Object> taskContext = mock(StreamTaskContext.class);
+		when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
+		wrapper.setup(taskContext);
+		
+		wrapper.close();
+		verify(bolt).cleanup();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
new file mode 100644
index 0000000..660fcea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.tuple.Values;
+
+
+
+
+
+public class StormCollectorTest extends AbstractTest {
+	
+	@Test
+	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+		for(int i = 0; i < 26; ++i) {
+			this.testStormCollector(true, i);
+		}
+	}
+	
+	@Test
+	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+		for(int i = 0; i < 26; ++i) {
+			this.testStormCollector(false, i);
+		}
+	}
+	
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	private void testStormCollector(final boolean spoutTest, final int numberOfAttributes)
+		throws InstantiationException, IllegalAccessException {
+		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
+		
+		final Collector flinkCollector = mock(Collector.class);
+		Tuple flinkTuple = null;
+		final Values tuple = new Values();
+		
+		StormCollector<?> collector = null;
+		
+		if(numberOfAttributes == 0) {
+			collector = new StormCollector(numberOfAttributes, flinkCollector);
+			tuple.add(new Integer(this.r.nextInt()));
+			
+		} else {
+			collector = new StormCollector(numberOfAttributes, flinkCollector);
+			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+			
+			for(int i = 0; i < numberOfAttributes; ++i) {
+				tuple.add(new Integer(this.r.nextInt()));
+				flinkTuple.setField(tuple.get(i), i);
+			}
+		}
+		
+		final String streamId = "streamId";
+		final Collection anchors = mock(Collection.class);
+		final List<Integer> taskIds;
+		final Object messageId = new Integer(this.r.nextInt());
+		if(spoutTest) {
+			taskIds = collector.emit(streamId, tuple, messageId);
+		} else {
+			taskIds = collector.emit(streamId, anchors, tuple);
+		}
+		
+		Assert.assertNull(taskIds);
+		
+		if(numberOfAttributes == 0) {
+			verify(flinkCollector).collect(tuple.get(0));
+		} else {
+			verify(flinkCollector).collect(flinkTuple);
+		}
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReportError() {
+		new StormCollector<Object>(1, null).reportError(null);
+	}
+	
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Test(expected = UnsupportedOperationException.class)
+	public void testBoltEmitDirect() {
+		new StormCollector<Object>(1, null).emitDirect(0, (String)null, (Collection)null, (List)null);
+	}
+	
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSpoutEmitDirect() {
+		new StormCollector<Object>(1, null).emitDirect(0, (String)null, (List)null, (Object)null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAck() {
+		new StormCollector<Object>(1, null).ack(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFail() {
+		new StormCollector<Object>(1, null).fail(null);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
new file mode 100644
index 0000000..3c4f5aa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class StormFiniteSpoutWrapperTest extends AbstractTest {
+	
+	@Test
+	public void testRunExecuteFixedNumber() throws Exception {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		final IRichSpout spout = mock(IRichSpout.class);
+		final int numberOfCalls = this.r.nextInt(50);
+		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		spoutWrapper.run(null);
+		verify(spout, times(numberOfCalls)).nextTuple();
+	}
+	
+	@Test
+	public void testRunExecute() throws Exception {
+		final int numberOfCalls = this.r.nextInt(50);
+		
+		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+		for(int i = numberOfCalls - 1; i >= 0; --i) {
+			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
+		}
+		
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
+			spout);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		final TestCollector collector = new TestCollector();
+		spoutWrapper.run(collector);
+		
+		Assert.assertEquals(expectedResult, collector.result);
+	}
+	
+	@Test
+	public void testCancel() throws Exception {
+		final int numberOfCalls = 5 + this.r.nextInt(5);
+		
+		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+		expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
+		
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
+			spout);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		spoutWrapper.cancel();
+		final TestCollector collector = new TestCollector();
+		spoutWrapper.run(collector);
+		
+		Assert.assertEquals(expectedResult, collector.result);
+	}
+	
+	@Test
+	public void testClose() throws Exception {
+		final IRichSpout spout = mock(IRichSpout.class);
+		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
+			spout);
+		
+		spoutWrapper.close();
+		
+		verify(spout).close();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..5a4c05b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.ArrayList;
+
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+
+
+
+
+public class StormOutputFieldsDeclarerTest extends AbstractTest {
+	
+	@Test
+	public void testDeclare() {
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		
+		Assert.assertEquals(-1, declarer.getNumberOfAttributes());
+		
+		final int numberOfAttributes = 1 + this.r.nextInt(25);
+		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
+		for(int i = 0; i < numberOfAttributes; ++i) {
+			schema.add("a" + i);
+		}
+		declarer.declare(new Fields(schema));
+		Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
+	}
+	
+	@Test
+	public void testDeclareDirect() {
+		new StormOutputFieldsDeclarer().declare(false, null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirectFail() {
+		new StormOutputFieldsDeclarer().declare(true, null);
+	}
+	
+	@Test
+	public void testDeclareStream() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareStreamFail() {
+		new StormOutputFieldsDeclarer().declareStream(null, null);
+	}
+	
+	@Test
+	public void testDeclareFullStream() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailNonDefaultStream() {
+		new StormOutputFieldsDeclarer().declareStream(null, false, null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareFullStreamFailDirect() {
+		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
new file mode 100644
index 0000000..88d855c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import backtype.storm.topology.IRichSpout;
+
+
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class StormSpoutWrapperTest extends AbstractTest {
+	
+	@Test
+	public void testRunExecuteCancelInfinite() throws Exception {
+		final int numberOfCalls = 5 + this.r.nextInt(5);
+		
+		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
+		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
+		
+		spoutWrapper.cancel();
+		final TestCollector collector = new TestCollector();
+		spoutWrapper.run(collector);
+		
+		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
+	}
+	
+	@Test
+	public void testClose() throws Exception {
+		final IRichSpout spout = mock(IRichSpout.class);
+		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
+		
+		spoutWrapper.close();
+		
+		verify(spout).close();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
new file mode 100644
index 0000000..133407a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormTupleTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+
+
+public class StormTupleTest extends AbstractTest {
+	
+	@Test
+	public void nonTupleTest() {
+		final Object flinkTuple = new Integer(this.r.nextInt());
+		
+		final StormTuple<Object> tuple = new StormTuple<Object>(flinkTuple);
+		Assert.assertSame(flinkTuple, tuple.getValue(0));
+		
+		final List<Object> values = tuple.getValues();
+		Assert.assertEquals(1, values.size());
+		Assert.assertEquals(flinkTuple, values.get(0));
+	}
+	
+	@Test
+	public void tupleTest() throws InstantiationException, IllegalAccessException {
+		final int numberOfAttributes = 1 + this.r.nextInt(25);
+		final Object[] data = new Object[numberOfAttributes];
+		
+		final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+		for(int i = 0; i < numberOfAttributes; ++i) {
+			data[i] = new Integer(this.r.nextInt());
+			flinkTuple.setField(data[i], i);
+		}
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		final List<Object> values = tuple.getValues();
+		
+		Assert.assertEquals(numberOfAttributes, values.size());
+		for(int i = 0; i < numberOfAttributes; ++i) {
+			Assert.assertEquals(flinkTuple.getField(i), values.get(i));
+		}
+		
+		Assert.assertEquals(numberOfAttributes, tuple.size());
+	}
+	
+	@Test
+	public void testBinary() {
+		final byte[] data = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(data);
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
+	}
+	
+	@Test
+	public void testBoolean() {
+		final Boolean flinkTuple = new Boolean(this.r.nextBoolean());
+		
+		final StormTuple<Boolean> tuple = new StormTuple<Boolean>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getBoolean(0));
+	}
+	
+	@Test
+	public void testByte() {
+		final Byte flinkTuple = new Byte((byte)this.r.nextInt());
+		
+		final StormTuple<Byte> tuple = new StormTuple<Byte>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getByte(0));
+	}
+	
+	@Test
+	public void testDouble() {
+		final Double flinkTuple = new Double(this.r.nextDouble());
+		
+		final StormTuple<Double> tuple = new StormTuple<Double>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getDouble(0));
+	}
+	
+	@Test
+	public void testFloat() {
+		final Float flinkTuple = new Float(this.r.nextFloat());
+		
+		final StormTuple<Float> tuple = new StormTuple<Float>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getFloat(0));
+	}
+	
+	@Test
+	public void testInteger() {
+		final Integer flinkTuple = new Integer(this.r.nextInt());
+		
+		final StormTuple<Integer> tuple = new StormTuple<Integer>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getInteger(0));
+	}
+	
+	@Test
+	public void testLong() {
+		final Long flinkTuple = new Long(this.r.nextInt());
+		
+		final StormTuple<Long> tuple = new StormTuple<Long>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getLong(0));
+	}
+	
+	@Test
+	public void testShort() {
+		final Short flinkTuple = new Short((short)this.r.nextInt());
+		
+		final StormTuple<Short> tuple = new StormTuple<Short>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getShort(0));
+	}
+	
+	@Test
+	public void testString() {
+		final byte[] data = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(data);
+		final String flinkTuple = new String(data);
+		
+		final StormTuple<String> tuple = new StormTuple<String>(flinkTuple);
+		Assert.assertEquals(flinkTuple, tuple.getString(0));
+	}
+	
+	@Test
+	public void testBinaryTuple() {
+		final byte[] data = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(data);
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getBinary(index));
+	}
+	
+	@Test
+	public void testBooleanTuple() {
+		final Boolean data = new Boolean(this.r.nextBoolean());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getBoolean(index));
+	}
+	
+	@Test
+	public void testByteTuple() {
+		final Byte data = new Byte((byte)this.r.nextInt());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getByte(index));
+	}
+	
+	@Test
+	public void testDoubleTuple() {
+		final Double data = new Double(this.r.nextDouble());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getDouble(index));
+	}
+	
+	@Test
+	public void testFloatTuple() {
+		final Float data = new Float(this.r.nextFloat());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getFloat(index));
+	}
+	
+	@Test
+	public void testIntegerTuple() {
+		final Integer data = new Integer(this.r.nextInt());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getInteger(index));
+	}
+	
+	@Test
+	public void testLongTuple() {
+		final Long data = new Long(this.r.nextInt());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getLong(index));
+	}
+	
+	@Test
+	public void testShortTuple() {
+		final Short data = new Short((short)this.r.nextInt());
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getShort(index));
+	}
+	
+	@Test
+	public void testStringTuple() {
+		final byte[] rawdata = new byte[this.r.nextInt(15)];
+		this.r.nextBytes(rawdata);
+		final String data = new String(rawdata);
+		
+		final int index = this.r.nextInt(5);
+		final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+		flinkTuple.setField(data, index);
+		
+		final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple);
+		Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testContains() {
+		new StormTuple<Object>(null).contains(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetFields() {
+		new StormTuple<Object>(null).getFields();
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFieldIndex() {
+		new StormTuple<Object>(null).fieldIndex(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSelect() {
+		new StormTuple<Object>(null).select(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetValueByField() {
+		new StormTuple<Object>(null).getValueByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetStringByField() {
+		new StormTuple<Object>(null).getStringByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetIntegerByField() {
+		new StormTuple<Object>(null).getIntegerByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetLongByField() {
+		new StormTuple<Object>(null).getLongByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetBooleanByField() {
+		new StormTuple<Object>(null).getBooleanByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetShortByField() {
+		new StormTuple<Object>(null).getShortByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetByteByField() {
+		new StormTuple<Object>(null).getByteByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetDoubleByField() {
+		new StormTuple<Object>(null).getDoubleByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetFloatByField() {
+		new StormTuple<Object>(null).getFloatByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetBinaryByField() {
+		new StormTuple<Object>(null).getBinaryByField(null);
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceGlobalStreamid() {
+		new StormTuple<Object>(null).getSourceGlobalStreamid();
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceComponent() {
+		new StormTuple<Object>(null).getSourceComponent();
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceTask() {
+		new StormTuple<Object>(null).getSourceTask();
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetSourceStreamId() {
+		new StormTuple<Object>(null).getSourceStreamId();
+	}
+	
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetMessageId() {
+		new StormTuple<Object>(null).getMessageId();
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45dbfdde/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
new file mode 100644
index 0000000..d1451bf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(StormWrapperSetupHelper.class)
+public class StormWrapperSetupHelperTest extends AbstractTest {
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testZeroAttributesDeclarerBolt() throws Exception {
+		IComponent boltOrSpout;
+		
+		
+		if(this.r.nextBoolean()) {
+			boltOrSpout = mock(IRichSpout.class);
+		} else {
+			boltOrSpout = mock(IRichBolt.class);
+		}
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields());
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean());
+	}
+	
+	@Test
+	public void testEmptyDeclarerBolt() {
+		IComponent boltOrSpout;
+		
+		if(this.r.nextBoolean()) {
+			boltOrSpout = mock(IRichSpout.class);
+		} else {
+			boltOrSpout = mock(IRichBolt.class);
+		}
+		
+		Assert.assertEquals(-1, StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, this.r.nextBoolean()));
+	}
+	
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testRawType() throws Exception {
+		IComponent boltOrSpout;
+		
+		if(this.r.nextBoolean()) {
+			boltOrSpout = mock(IRichSpout.class);
+		} else {
+			boltOrSpout = mock(IRichBolt.class);
+		}
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields("dummy1", "dummy2"));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, true);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testTooManyAttributes() throws Exception {
+		IComponent boltOrSpout;
+		
+		if(this.r.nextBoolean()) {
+			boltOrSpout = mock(IRichSpout.class);
+		} else {
+			boltOrSpout = mock(IRichBolt.class);
+		}
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		final String[] schema = new String[26];
+		for(int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, false);
+	}
+	
+	
+	@Test
+	public void testTupleTypes() throws Exception {
+		for(int i = 0; i < 26; ++i) {
+			this.testTupleTypes(i);
+		}
+	}
+	
+	private void testTupleTypes(final int numberOfAttributes) throws Exception {
+		String[] schema = new String[numberOfAttributes];
+		if(numberOfAttributes == 0) {
+			schema = new String[1];
+		}
+		for(int i = 0; i < schema.length; ++i) {
+			schema[i] = "a" + i;
+		}
+		
+		IComponent boltOrSpout;
+		if(this.r.nextBoolean()) {
+			boltOrSpout = mock(IRichSpout.class);
+		} else {
+			boltOrSpout = mock(IRichBolt.class);
+		}
+		
+		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+		declarer.declare(new Fields(schema));
+		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, numberOfAttributes == 0);
+	}
+	
+	// @Test
+	// public void testConvertToTopologyContext() {
+	// Assert.fail();
+	// }
+	
+}

