flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [46/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:04:02 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
deleted file mode 100644
index d511796..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.common.PartitionInfo;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- *     <li>A ZooKeeper mini cluster</li>
- *     <li>Three Kafka Brokers (mini clusters)</li>
- *     <li>A Flink mini cluster</li>
- * </ul>
- * 
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase extends TestLogger {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-	
-	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
-	protected static String zookeeperConnectionString;
-
-	protected static File tmpZkDir;
-
-	protected static File tmpKafkaParent;
-
-	protected static TestingServer zookeeper;
-	protected static List<KafkaServer> brokers;
-	protected static String brokerConnectionStrings = "";
-
-	protected static ConsumerConfig standardCC;
-	protected static Properties standardProps;
-	
-	protected static ForkableFlinkMiniCluster flink;
-
-	protected static int flinkPort;
-
-	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-	protected static List<File> tmpKafkaDirs;
-
-	protected static String kafkaHost = "localhost";
-
-	// ------------------------------------------------------------------------
-	//  Setup and teardown of the mini clusters
-	// ------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void prepare() throws IOException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-		
-		LOG.info("Starting KafkaITCase.prepare()");
-		
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-		tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
-		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-		
-		zookeeper = null;
-		brokers = null;
-
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(-1, tmpZkDir);
-			zookeeperConnectionString = zookeeper.getConnectString();
-
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
-			
-			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
-				SocketServer socketServer = brokers.get(i).socketServer();
-				
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
-			}
-
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
-		}
-
-		standardProps = new Properties();
-
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
-		standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-		
-		Properties consumerConfigProps = new Properties();
-		consumerConfigProps.putAll(standardProps);
-		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-		standardCC = new ConsumerConfig(consumerConfigProps);
-		
-		// start also a re-usable Flink mini cluster
-		
-		Configuration flinkConfig = new Configuration();
-		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
-		flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
-
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
-		flink.start();
-
-		flinkPort = flink.getLeaderRPCPort();
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Shut down KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		flinkPort = -1;
-		if (flink != null) {
-			flink.shutdown();
-		}
-		
-		for (KafkaServer broker : brokers) {
-			if (broker != null) {
-				broker.shutdown();
-			}
-		}
-		brokers.clear();
-		
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-			}
-			catch (Exception e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-			zookeeper = null;
-		}
-		
-		// clean up the temp spaces
-		
-		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpKafkaParent);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-		if (tmpZkDir != null && tmpZkDir.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpZkDir);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    KafkaITCase finished"); 
-		LOG.info("-------------------------------------------------------------------------");
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
-												String kafkaHost,
-												String zookeeperConnectionString) throws Exception {
-		Properties kafkaProperties = new Properties();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", kafkaHost);
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
-		
-		// for CI stability, increase zookeeper session timeout
-		kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
-
-		final int numTries = 5;
-		
-		for (int i = 1; i <= numTries; i++) { 
-			int kafkaPort = NetUtils.getAvailablePort();
-			kafkaProperties.put("port", Integer.toString(kafkaPort));
-			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-			try {
-				KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-				server.startup();
-				return server;
-			}
-			catch (KafkaException e) {
-				if (e.getCause() instanceof BindException) {
-					// port conflict, retry...
-					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
-				}
-				else {
-					throw e;
-				}
-			}
-		}
-		
-		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Execution utilities
-	// ------------------------------------------------------------------------
-	
-	protected ZkClient createZookeeperClient() {
-		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-	}
-	
-	protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		}
-		catch (ProgramInvocationException | JobExecutionException root) {
-			Throwable cause = root.getCause();
-			
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (!(cause instanceof SuccessException)) {
-				if (cause == null || depth++ == 20) {
-					root.printStackTrace();
-					fail("Test failed: " + root.getMessage());
-				}
-				else {
-					cause = cause.getCause();
-				}
-			}
-		}
-	}
-
-	protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		}
-		catch (ProgramInvocationException | JobExecutionException root) {
-			Throwable cause = root.getCause();
-
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (!(cause instanceof SuccessException)) {
-				if (cause == null || depth++ == 20) {
-					throw root;
-				}
-				else {
-					cause = cause.getCause();
-				}
-			}
-		}
-	}
-	
-	
-
-	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-		
-		// create topic with one client
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topic);
-
-		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-		
-		AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
-		creator.close();
-		
-		// validate that the topic has been created
-		final long deadline = System.currentTimeMillis() + 30000;
-		do {
-			try {
-				Thread.sleep(100);
-			}
-			catch (InterruptedException e) {
-				// restore interrupted state
-			}
-			List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
-			if (partitions != null && partitions.size() > 0) {
-				return;
-			}
-		}
-		while (System.currentTimeMillis() < deadline);
-		fail ("Test topic could not be created");
-	}
-	
-	protected static void deleteTestTopic(String topic) {
-		LOG.info("Deleting topic {}", topic);
-
-		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
-		AdminUtils.deleteTopic(zk, topic);
-		
-		zk.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 75fdd46..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FixedPartitioner part = new FixedPartitioner();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
-		part.open(1, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc2", partitions.length));
-
-		part.open(2, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc3", partitions.length));
-		Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
-
-		part.open(3, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc4", partitions.length));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FixedPartitioner part = new FixedPartitioner();
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.open(0, 2, partitions);
-		Assert.assertEquals(0, part.partition("abc1", partitions.length));
-		Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
-		part.open(1, 2, partitions);
-		Assert.assertEquals(1, part.partition("abc1", partitions.length));
-		Assert.assertEquals(1, part.partition("abc1", partitions.length));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FixedPartitioner part = new FixedPartitioner();
-		int[] partitions = new int[]{0,1};
-
-		part.open(0, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
-		part.open(1, 3, partitions);
-		Assert.assertEquals(1, part.partition("abc1", partitions.length));
-
-		part.open(2, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", partitions.length));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index 27ad2e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-	
-	@Test
-	public void runOffsetManipulationinZooKeeperTest() {
-		try {
-			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
-			final String groupId = "ZookeeperOffsetHandlerTest-Group";
-			
-			final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
-			ZkClient zkClient = createZookeeperClient();
-			AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
-				
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
-	
-			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
-
-			zkClient.close();
-			
-			assertEquals(offset, fetchedOffset);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
deleted file mode 100644
index 32377ae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-	
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-														String brokerConnection, String topic,
-														int numPartitions,
-														final int from, final int to) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-		
-		DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream.addSink(new FlinkKafkaProducer<>(topic,
-				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
-				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
-				new Tuple2Partitioner(numPartitions)
-		));
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
-															String brokerConnection, String topic,
-															final int numPartitions,
-															final int numElements,
-															final boolean randomizeOrder) throws Exception {
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-
-		DataStream<Integer> stream = env.addSource(
-				new RichParallelSourceFunction<Integer>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Integer> ctx) {
-						// create a sequence
-						int[] elements = new int[numElements];
-						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
-							elements[i] = val;
-						}
-
-						// scramble the sequence
-						if (randomizeOrder) {
-							Random rnd = new Random();
-							for (int i = 0; i < elements.length; i++) {
-								int otherPos = rnd.nextInt(elements.length);
-								
-								int tmp = elements[i];
-								elements[i] = elements[otherPos];
-								elements[otherPos] = tmp;
-							}
-						}
-
-						// emit the sequence
-						int pos = 0;
-						while (running && pos < elements.length) {
-							ctx.collect(elements[pos++]);
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream
-				.rebalance()
-				.addSink(new FlinkKafkaProducer<>(topic,
-						new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
-						FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
-						new KafkaPartitioner() {
-							@Override
-							public int partition(Object key, int numPartitions) {
-								return ((Integer) key) % numPartitions;
-							}
-						}));
-
-		env.execute("Scrambles int sequence generator");
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread {
-
-		private final String kafkaConnectionString;
-		
-		private final String topic;
-		
-		private volatile Throwable error;
-		
-		private volatile boolean running = true;
-
-		
-		public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
-			this.kafkaConnectionString = kafkaConnectionString;
-			this.topic = topic;
-		}
-
-		@Override
-		public void run() {
-			// we manually feed data into the Kafka sink
-			FlinkKafkaProducer<String> producer = null;
-			try {
-				producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
-				producer.setRuntimeContext(new MockRuntimeContext(1,0));
-				producer.open(new Configuration());
-				
-				final StringBuilder bld = new StringBuilder();
-				final Random rnd = new Random();
-				
-				while (running) {
-					bld.setLength(0);
-					
-					int len = rnd.nextInt(100) + 1;
-					for (int i = 0; i < len; i++) {
-						bld.append((char) (rnd.nextInt(20) + 'a') );
-					}
-					
-					String next = bld.toString();
-					producer.invoke(next);
-				}
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-			finally {
-				if (producer != null) {
-					try {
-						producer.close();
-					}
-					catch (Throwable t) {
-						// ignore
-					}
-				}
-			}
-		}
-		
-		public void shutdown() {
-			this.running = false;
-			this.interrupt();
-		}
-		
-		public Throwable getError() {
-			return this.error;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
deleted file mode 100644
index 987e6c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the function.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
-	private static final long serialVersionUID = 2777597566520109843L;
-
-	@Override
-	public void invoke(T value) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 5a8ffaa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-		Checkpointed<Integer>, CheckpointNotifier, Runnable {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-	
-	private static final long serialVersionUID = 6334389850158707313L;
-	
-	public static volatile boolean failedBefore;
-	public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
-	private final int failCount;
-	private int numElementsTotal;
-	private int numElementsThisTime;
-	
-	private boolean failer;
-	private boolean hasBeenCheckpointed;
-	
-	private Thread printer;
-	private volatile boolean printerRunning = true;
-
-	public FailingIdentityMapper(int failCount) {
-		this.failCount = failCount;
-	}
-
-	@Override
-	public void open(Configuration parameters) {
-		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		printer = new Thread(this, "FailingIdentityMapper Status Printer");
-		printer.start();
-	}
-
-	@Override
-	public T map(T value) throws Exception {
-		numElementsTotal++;
-		numElementsThisTime++;
-		
-		if (!failedBefore) {
-			Thread.sleep(10);
-			
-			if (failer && numElementsTotal >= failCount) {
-				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-				failedBefore = true;
-				throw new Exception("Artificial Test Failure");
-			}
-		}
-		return value;
-	}
-
-	@Override
-	public void close() throws Exception {
-		printerRunning = false;
-		if (printer != null) {
-			printer.interrupt();
-			printer = null;
-		}
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) {
-		this.hasBeenCheckpointed = true;
-	}
-
-	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return numElementsTotal;
-	}
-
-	@Override
-	public void restoreState(Integer state) {
-		numElementsTotal = state;
-	}
-
-	@Override
-	public void run() {
-		while (printerRunning) {
-			try {
-				Thread.sleep(5000);
-			}
-			catch (InterruptedException e) {
-				// ignore
-			}
-			LOG.info("============================> Failing mapper  {}: count={}, totalCount={}",
-					getRuntimeContext().getIndexOfThisSubtask(),
-					numElementsThisTime, numElementsTotal);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index e94adb5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-	
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-	
-	
-	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-		
-		// find the jobID
-		Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-		List<JobStatusMessage> jobs;
-		try {
-			Object result = Await.result(listResponse, askTimeout);
-			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-		}
-		catch (Exception e) {
-			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-		}
-		
-		if (jobs.isEmpty()) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		if (jobs.size() != 1) {
-			throw new Exception("Could not cancel job - more than one running job.");
-		}
-		
-		JobStatusMessage status = jobs.get(0);
-		if (status.getJobState().isTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-		
-		JobID jobId = status.getJobId();
-		
-		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
deleted file mode 100644
index b9fc3de..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MockRuntimeContext implements RuntimeContext {
-
-	private final int numberOfParallelSubtasks;
-	private final int indexOfThisSubtask;
-
-	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-		this.indexOfThisSubtask = indexOfThisSubtask;
-	}
-
-
-	@Override
-	public String getTaskName() {
-		return null;
-	}
-
-	@Override
-	public int getNumberOfParallelSubtasks() {
-		return numberOfParallelSubtasks;
-	}
-
-	@Override
-	public int getIndexOfThisSubtask() {
-		return indexOfThisSubtask;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public IntCounter getIntCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public LongCounter getLongCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public DoubleCounter getDoubleCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Histogram getHistogram(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public DistributedCache getDistributedCache() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
deleted file mode 100644
index e105e01..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
-
-	private static final long serialVersionUID = 1088381231244959088L;
-	
-	/* the partitions from which this function received data */
-	private final Set<Integer> myPartitions = new HashSet<>();
-	
-	private final int numPartitions;
-	private final int maxPartitions;
-
-	public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
-		this.numPartitions = numPartitions;
-		this.maxPartitions = maxPartitions;
-	}
-
-	@Override
-	public Integer map(Integer value) throws Exception {
-		// validate that the partitioning is identical
-		int partition = value % numPartitions;
-		myPartitions.add(partition);
-		if (myPartitions.size() > maxPartitions) {
-			throw new Exception("Error: Elements from too many different partitions: " + myPartitions
-					+ ". Expect elements only from " + maxPartitions + " partitions");
-		}
-		return value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
deleted file mode 100644
index 12e3460..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-/**
- * Exception that is thrown to terminate a program and indicate success.
- */
-public class SuccessException extends Exception {
-	private static final long serialVersionUID = -7011865671593955887L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
deleted file mode 100644
index 1d61229..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-/**
- * An identity map function that sleeps between elements, throttling the
- * processing speed.
- * 
- * @param <T> The type mapped.
- */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
-
-	private static final long serialVersionUID = 467008933767159126L;
-
-	private final int sleep;
-
-	public ThrottledMapper(int sleep) {
-		this.sleep = sleep;
-	}
-
-	@Override
-	public T map(T value) throws Exception {
-		Thread.sleep(this.sleep);
-		return value;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index b762e21..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-import java.io.Serializable;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
- */
-public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
-	
-	private static final long serialVersionUID = 1L;
-
-	private final int expectedPartitions;
-
-	
-	public Tuple2Partitioner(int expectedPartitions) {
-		this.expectedPartitions = expectedPartitions;
-	}
-
-	@Override
-	public int partition(Object key, int numPartitions) {
-		if (numPartitions != expectedPartitions) {
-			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
-		}
-		@SuppressWarnings("unchecked")
-		Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
-		
-		return element.f0;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
deleted file mode 100644
index f3cc4fa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-
-public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
-
-	private static final long serialVersionUID = 1748426382527469932L;
-	
-	private final int numElementsTotal;
-	
-	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
-
-	private int numElements; // this is checkpointed
-
-	
-	public ValidatingExactlyOnceSink(int numElementsTotal) {
-		this.numElementsTotal = numElementsTotal;
-	}
-
-	
-	@Override
-	public void invoke(Integer value) throws Exception {
-		numElements++;
-		
-		if (duplicateChecker.get(value)) {
-			throw new Exception("Received a duplicate");
-		}
-		duplicateChecker.set(value);
-		if (numElements == numElementsTotal) {
-			// validate
-			if (duplicateChecker.cardinality() != numElementsTotal) {
-				throw new Exception("Duplicate checker has wrong cardinality");
-			}
-			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
-				throw new Exception("Received sparse sequence");
-			}
-			else {
-				throw new SuccessException();
-			}
-		}
-	}
-
-	@Override
-	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
-		LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
-		return new Tuple2<>(numElements, duplicateChecker);
-	}
-
-	@Override
-	public void restoreState(Tuple2<Integer, BitSet> state) {
-		LOG.info("restoring num elements to {}", state.f0);
-		this.numElements = state.f0;
-		this.duplicateChecker = state.f1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
deleted file mode 100644
index 6bdfb48..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
deleted file mode 100644
index 9168822..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-nifi</artifactId>
-	<name>flink-connector-nifi</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<nifi.version>0.3.0</nifi.version>
-	</properties>
-
-	<dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-site-to-site-client</artifactId>
-            <version>${nifi.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-core</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<rerunFailingTestsCount>3</rerunFailingTestsCount>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-failsafe-plugin</artifactId>
-				<configuration>
-					<rerunFailingTestsCount>3</rerunFailingTestsCount>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
deleted file mode 100644
index c8ceb57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.util.Map;
-
-/**
- * <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
- * a FlowFile's content and its attributes so that they can be processed by Flink.
- * </p>
- */
-public interface NiFiDataPacket {
-
-	/**
-	 * @return the contents of a NiFi FlowFile
-	 */
-	byte[] getContent();
-
-	/**
-	 * @return a Map of attributes that are associated with the NiFi FlowFile
-	 */
-	Map<String, String> getAttributes();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
deleted file mode 100644
index 9bb521b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-
-/**
- * A function that can create a NiFiDataPacket from an incoming instance of the given type.
- *
- * @param <T>
- */
-public interface NiFiDataPacketBuilder<T> extends Function, Serializable {
-
-	NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
deleted file mode 100644
index abc6b35..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-/**
- * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires
- * a NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data.
- */
-public class NiFiSink<T> extends RichSinkFunction<T> {
-
-	private SiteToSiteClient client;
-	private SiteToSiteClientConfig clientConfig;
-	private NiFiDataPacketBuilder<T> builder;
-
-	/**
-	 * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder.
-	 *
-	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-	 * @param builder a builder to produce NiFiDataPackets from incoming data
-	 */
-	public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder<T> builder) {
-		this.clientConfig = clientConfig;
-		this.builder = builder;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		final NiFiDataPacket niFiDataPacket = builder.createNiFiDataPacket(value, getRuntimeContext());
-
-		final Transaction transaction = client.createTransaction(TransferDirection.SEND);
-		if (transaction == null) {
-			throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
-		}
-
-		transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes());
-		transaction.confirm();
-		transaction.complete();
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		client.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
deleted file mode 100644
index a213bb4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
- * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
- */
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
-
-	private static final long DEFAULT_WAIT_TIME_MS = 1000;
-
-	private long waitTimeMs;
-	private SiteToSiteClient client;
-	private SiteToSiteClientConfig clientConfig;
-	private transient volatile boolean running;
-
-	/**
-	 * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
-	 *
-	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-	 */
-	public NiFiSource(SiteToSiteClientConfig clientConfig) {
-		this(clientConfig, DEFAULT_WAIT_TIME_MS);
-	}
-
-	/**
-	 * Constructs a new NiFiSource using the given client config and wait time.
-	 *
-	 * @param clientConfig the configuration for building a NiFi SiteToSiteClient
-	 * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to pull from NiFi
-	 */
-	public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) {
-		this.clientConfig = clientConfig;
-		this.waitTimeMs = waitTimeMs;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-		running = true;
-	}
-
-	@Override
-	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
-		try {
-			while (running) {
-				final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-				if (transaction == null) {
-					LOG.warn("A transaction could not be created, waiting and will try again...");
-					try {
-						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException e) {
-
-					}
-					continue;
-				}
-
-				DataPacket dataPacket = transaction.receive();
-				if (dataPacket == null) {
-					transaction.confirm();
-					transaction.complete();
-
-					LOG.debug("No data available to pull, waiting and will try again...");
-					try {
-						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException e) {
-
-					}
-					continue;
-				}
-
-				final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>();
-				do {
-					// Read the data into a byte array and wrap it along with the attributes
-					// into a NiFiDataPacket.
-					final InputStream inStream = dataPacket.getData();
-					final byte[] data = new byte[(int) dataPacket.getSize()];
-					StreamUtils.fillBuffer(inStream, data);
-
-					final Map<String, String> attributes = dataPacket.getAttributes();
-
-					niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes));
-					dataPacket = transaction.receive();
-				} while (dataPacket != null);
-
-				// Confirm transaction to verify the data
-				transaction.confirm();
-
-				for (NiFiDataPacket dp : niFiDataPackets) {
-					ctx.collect(dp);
-				}
-
-				transaction.complete();
-			}
-		} finally {
-			ctx.close();
-		}
-	}
-
-	@Override
-	public void cancel() {
-		running = false;
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		client.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
deleted file mode 100644
index 5ad4bae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.nifi;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * An implementation of NiFiDataPacket.
- */
-public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
-	private static final long serialVersionUID = 6364005260220243322L;
-
-	private final byte[] content;
-	private final Map<String, String> attributes;
-
-	public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
-		this.content = content;
-		this.attributes = attributes;
-	}
-
-	@Override
-	public byte[] getContent() {
-		return content;
-	}
-
-	@Override
-	public Map<String, String> getAttributes() {
-		return attributes;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
deleted file mode 100644
index 572f949..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder;
-import org.apache.flink.streaming.connectors.nifi.NiFiSink;
-import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.util.HashMap;
-
-/**
- * An example topology that sends data to a NiFi input port named "Data from Flink".
- */
-public class NiFiSinkTopologyExample {
-
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-				.url("http://localhost:8080/nifi")
-				.portName("Data from Flink")
-				.buildConfig();
-
-		DataStreamSink<String> dataStream = env.fromElements("one", "two", "three", "four", "five", "q")
-				.addSink(new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<String>() {
-					@Override
-					public NiFiDataPacket createNiFiDataPacket(String s, RuntimeContext ctx) {
-						return new StandardNiFiDataPacket(s.getBytes(), new HashMap<String,String>());
-					}
-				}));
-
-		env.execute();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
deleted file mode 100644
index 79c9a1c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.nifi.examples;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
-import org.apache.flink.streaming.connectors.nifi.NiFiSource;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-
-import java.nio.charset.Charset;
-
-/**
- * An example topology that pulls data from a NiFi output port named "Data for Flink".
- */
-public class NiFiSourceTopologyExample {
-
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
-				.url("http://localhost:8080/nifi")
-				.portName("Data for Flink")
-				.requestBatchCount(5)
-				.buildConfig();
-
-		SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
-		DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
-
-		DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
-			@Override
-			public String map(NiFiDataPacket value) throws Exception {
-				return new String(value.getContent(), Charset.defaultCharset());
-			}
-		});
-
-		dataStream.print();
-		env.execute();
-	}
-
-}


Mime
View raw message