flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again
Date Thu, 13 Oct 2016 08:24:15 GMT
Repository: flink
Updated Branches:
  refs/heads/master 15df71ba9 -> 3d5bca0ab


[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/744f8ebb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/744f8ebb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/744f8ebb

Branch: refs/heads/master
Commit: 744f8ebb66b2a7288942be139cd7a7e6d1170c80
Parents: 15df71b
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Oct 11 15:48:32 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Oct 12 14:03:14 2016 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         |   3 -
 .../connectors/kafka/Kafka09ITCase.java         |   9 -
 .../connectors/kafka/KafkaConsumerTestBase.java | 242 +------------------
 .../kafka/testutils/DataGenerators.java         | 165 ++++++-------
 4 files changed, 72 insertions(+), 347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af6d254..78fc1c6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -115,9 +115,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);
-	/*	FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
-		sink.setFlushOnCheckpoint(true);
-		return sink; */
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index fd167a0..b9ec18a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -35,15 +35,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumer() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//	}
 
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3c967ba..0810a3e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -31,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.table.StreamTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -92,7 +85,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -116,7 +108,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -517,7 +508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// launch a producer thread
 		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
 		generator.start();
 
 		// launch a consumer asynchronously
@@ -571,7 +562,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
 		if (generator.isAlive()) {
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
 			generator.shutdown();
 			generator.join();
 		}
@@ -1723,234 +1713,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state;
 		}
 	}
-
-	/////////////			Testing the Kafka consumer with embeded watermark generation functionality
		///////////////
-
-//	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-//	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws
Exception {
-//
-//		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-//		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-//
-//		final Map<String, Boolean> topics = new HashMap<>();
-//		topics.put(topic1, false);
-//		topics.put(topic2, emptyPartition);
-//
-//		final int noOfTopcis = topics.size();
-//		final int partitionsPerTopic = 1;
-//		final int elementsPerPartition = 100 + 1;
-//
-//		final int totalElements = emptyPartition ?
-//			partitionsPerTopic * elementsPerPartition :
-//			noOfTopcis * partitionsPerTopic * elementsPerPartition;
-//
-//		createTestTopic(topic1, partitionsPerTopic, 1);
-//		createTestTopic(topic2, partitionsPerTopic, 1);
-//
-//		final StreamExecutionEnvironment env =
-//			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-//		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-//		env.setParallelism(partitionsPerTopic);
-//		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-//		env.getConfig().disableSysoutLogging();
-//
-//		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long,
Integer>");
-//
-//		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-//		producerProperties.setProperty("retries", "0");
-//
-//		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-//
-//		List<String> topicTitles = new ArrayList<>(topics.keySet());
-//		runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-//
-//		executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-//
-//		for(String topic: topicTitles) {
-//			deleteTestTopic(topic);
-//		}
-//	}
-//
-//	private void executeAndCatchException(StreamExecutionEnvironment env, String execName)
throws Exception {
-//		try {
-//			tryExecutePropagateExceptions(env, execName);
-//		}
-//		catch (ProgramInvocationException | JobExecutionException e) {
-//			// look for NotLeaderForPartitionException
-//			Throwable cause = e.getCause();
-//
-//			// search for nested SuccessExceptions
-//			int depth = 0;
-//			while (cause != null && depth++ < 20) {
-//				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-//					throw (Exception) cause;
-//				}
-//				cause = cause.getCause();
-//			}
-//			throw e;
-//		}
-//	}
-//
-//	private void putDataInTopics(StreamExecutionEnvironment env,
-//								Properties producerProperties,
-//								final int elementsPerPartition,
-//								Map<String, Boolean> topics,
-//								TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-//		if(topics.size() != 2) {
-//			throw new RuntimeException("This method accepts two topics as arguments.");
-//		}
-//
-//		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-//			new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-//
-//		DataStream<Tuple2<Long, Integer>> stream = env
-//			.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-//				private boolean running = true;
-//
-//				@Override
-//				public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException
{
-//					int topic = 0;
-//					int currentTs = 1;
-//
-//					while (running && currentTs < elementsPerPartition) {
-//						long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-//						ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-//						currentTs++;
-//					}
-//
-//					Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-//					ctx.collect(toWrite2);
-//				}
-//
-//				@Override
-//				public void cancel() {
-//				running = false;
-//			}
-//			}).setParallelism(1);
-//
-//		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-//
-//		stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>()
{
-//
-//			@Override
-//			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception
{
-//				return value;
-//			}
-//		}).setParallelism(1);
-//		kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
-//			new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
-//
-//		if(!topicsL.get(1).getValue()) {
-//			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>()
{
-//
-//				@Override
-//				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception
{
-//					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-//					return new Tuple2<>(timestamp, 1);
-//				}
-//			}).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
-//				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-//		}
-//	}
-
-	private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment
env,
-																		List<String> topics,
-																		final int totalElementsToExpect,
-																		TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
-
-		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
-			new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-			.getConsumer(topics, sourceSchema, props)
-			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
-
-		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
-
-		return consuming
-			.transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
-			.addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
-
-				private int elementCount = 0;
-
-				@Override
-				public void invoke(Tuple2<Long, Integer> value) throws Exception {
-					elementCount++;
-					if (elementCount == totalElementsToExpect) {
-						throw new SuccessException();
-					}
-				}
-
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			});
-	}
-
-	/** An extractor that emits a Watermark whenever the timestamp <b>in the record</b>
is equal to {@code -1}. */
-	private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long,
Integer>> {
-
-		@Override
-		public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long
extractedTimestamp) {
-			return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
-		}
-
-		@Override
-		public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp)
{
-			return element.f0;
-		}
-	}
-
-	private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long,
Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long,
Integer>> {
-
-		private long lastReceivedWatermark = Long.MIN_VALUE;
-
-		private Map<Integer, Boolean> isEligible = new HashMap<>();
-		private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();
-
-		WMTestingOperator() {
-			isEligible = new HashMap<>();
-			perPartitionMaxTs = new HashMap<>();
-		}
-
-		@Override
-		public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws
Exception {
-			int partition = element.getValue().f1;
-			Long maxTs = perPartitionMaxTs.get(partition);
-			if(maxTs == null || maxTs < element.getValue().f0) {
-				perPartitionMaxTs.put(partition, element.getValue().f0);
-				isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			int partition = -1;
-			long minTS = Long.MAX_VALUE;
-			for (Integer part : perPartitionMaxTs.keySet()) {
-				Long ts = perPartitionMaxTs.get(part);
-				if (ts < minTS && isEligible.get(part)) {
-					partition = part;
-					minTS = ts;
-					lastReceivedWatermark = ts;
-				}
-			}
-			isEligible.put(partition, false);
-
-			assertEquals(minTS, mark.getTimestamp());
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			perPartitionMaxTs.clear();
-			isEligible.clear();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 3f035fd..ba75212 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,74 +18,35 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.io.Serializable;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.Random;
 
 @SuppressWarnings("serial")
 public class DataGenerators {
-	
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-													   KafkaTestEnvironment testServer, String topic,
-													   int numPartitions,
-													   final int from, final int to) throws Exception {
 
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer,
Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		
-		DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception
{
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		testServer.produceIntoKafka(stream, topic,
-				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType,
env.getConfig())),
-				FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
-				new Tuple2Partitioner(numPartitions)
-		);
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
 	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
 														 KafkaTestEnvironment testServer, String topic,
 														 final int numPartitions,
@@ -105,9 +66,9 @@ public class DataGenerators {
 						// create a sequence
 						int[] elements = new int[numElements];
 						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
+							 i < numElements;
+							 i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
 							elements[i] = val;
 						}
 
@@ -116,7 +77,7 @@ public class DataGenerators {
 							Random rnd = new Random();
 							for (int i = 0; i < elements.length; i++) {
 								int otherPos = rnd.nextInt(elements.length);
-								
+
 								int tmp = elements[i];
 								elements[i] = elements[otherPos];
 								elements[otherPos] = tmp;
@@ -142,7 +103,7 @@ public class DataGenerators {
 		if(secureProps != null) {
 			props.putAll(testServer.getSecureProperties());
 		}
-		
+
 		stream = stream.rebalance();
 		testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
@@ -156,63 +117,55 @@ public class DataGenerators {
 
 		env.execute("Scrambles int sequence generator");
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread implements Serializable {
 
-		private transient KafkaTestEnvironment server;
-		
-		private final String topic;
+	public static class InfiniteStringsGenerator extends Thread {
 
-		private final int flinkPort;
+		private final KafkaTestEnvironment server;
+
+		private final String topic;
 
 		private volatile Throwable error;
-		
+
 		private volatile boolean running = true;
 
-		
-		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort)
{
+
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
 			this.server = server;
 			this.topic = topic;
-			this.flinkPort = flinkPort;
 		}
 
 		@Override
 		public void run() {
 			// we manually feed data into the Kafka sink
-			FlinkKafkaProducerBase<String> producer = null;
+			RichFunction producer = null;
 			try {
-				final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
flinkPort);
-				DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-					@Override
-					public void run(SourceContext<String> ctx) throws Exception {
-						final StringBuilder bld = new StringBuilder();
-						final Random rnd = new Random();
-						while (running) {
-							bld.setLength(0);
-							int len = rnd.nextInt(100) + 1;
-							for (int i = 0; i < len; i++) {
-								bld.append((char) (rnd.nextInt(20) + 'a'));
-							}
+				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+				producerProperties.setProperty("retries", "3");
+				StreamTransformation<String> mockTransform = new MockStreamTransformation();
+				DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+				DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						producerProperties, new FixedPartitioner<String>());
+				StreamSink<String> producerOperator = sink.getTransformation().getOperator();
+				producer = (RichFunction) producerOperator.getUserFunction();
+				producer.setRuntimeContext(new MockRuntimeContext(1,0));
+				producer.open(new Configuration());
 
-							String next = bld.toString();
-							ctx.collect(next);
-						}
-					}
+				final StringBuilder bld = new StringBuilder();
+				final Random rnd = new Random();
 
-					@Override
-					public void cancel() {
-						running = false;
+				while (running) {
+					bld.setLength(0);
+
+					int len = rnd.nextInt(100) + 1;
+					for (int i = 0; i < len; i++) {
+						bld.append((char) (rnd.nextInt(20) + 'a') );
 					}
-				});
 
-				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
-				producerProperties.setProperty("retries", "3");
-				server.produceIntoKafka(stream, topic,
-						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
-						producerProperties, new FixedPartitioner<String>());
-				env.execute("String generator");
+					String next = bld.toString();
+					producerOperator.processElement(new StreamRecord<>(next));
+				}
 			}
 			catch (Throwable t) {
 				this.error = t;
@@ -228,14 +181,38 @@ public class DataGenerators {
 				}
 			}
 		}
-		
+
 		public void shutdown() {
 			this.running = false;
 			this.interrupt();
 		}
-		
+
 		public Throwable getError() {
 			return this.error;
 		}
+
+		private static class MockStreamTransformation extends StreamTransformation<String> {
+			public MockStreamTransformation() {
+				super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+			}
+
+			@Override
+			public void setChainingStrategy(ChainingStrategy strategy) {
+
+			}
+
+			@Override
+			public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+				return null;
+			}
+		}
+
+		public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+			@Override
+			public JobExecutionResult execute(String jobName) throws Exception {
+				return null;
+			}
+		}
 	}
-}
+}
\ No newline at end of file


Mime
View raw message