flink-commits mailing list archives

From mbala...@apache.org
Subject [2/7] flink git commit: [FLINK-1874] [streaming] Connectors separated into individual projects
Date Thu, 28 May 2015 13:36:28 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
deleted file mode 100644
index 032ed08..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api.persistent;
-
-import com.google.common.base.Preconditions;
-import kafka.common.TopicAndPartition;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Source for reading from Kafka using Flink Streaming Fault Tolerance.
- * This source updates the committed offsets in Zookeeper based on Flink's internal checkpointing.
- *
- * Note that Kafka's auto-commit feature needs to be disabled when using this source.
- */
-public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
-		ResultTypeQueryable<OUT>,
-		CheckpointCommitter,
-		CheckpointedAsynchronously<long[]> {
-	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
-
-	protected transient ConsumerConfig consumerConfig;
-	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
-	private transient ConsumerConnector consumer;
-
-	private String topicName;
-	private DeserializationSchema<OUT> deserializationSchema;
-	private boolean running = true;
-
-	private transient long[] lastOffsets;
-	private transient ZkClient zkClient;
-	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
-
-	// We set this in reachedEnd to carry it over to next()
-	private OUT nextElement = null;
-
-	/**
-	 *
-	 * For the @param consumerConfig, specify at least the "group.id" and "zookeeper.connect" settings.
-	 * The config will be passed into the Kafka High Level Consumer.
-	 * For a full list of possible values, check this out: https://kafka.apache.org/documentation.html#consumerconfigs
-	 */
-	public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
-		Preconditions.checkNotNull(topicName);
-		Preconditions.checkNotNull(deserializationSchema);
-		Preconditions.checkNotNull(consumerConfig);
-
-		this.topicName = topicName;
-		this.deserializationSchema = deserializationSchema;
-		this.consumerConfig = consumerConfig;
-		if(consumerConfig.autoCommitEnable()) {
-			throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
-					"This source can only be used with auto commit disabled because the " +
-					"source is committing to zookeeper by itself (not using the KafkaConsumer).");
-		}
-		if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
-			// we can currently only commit to ZK.
-			throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
-		}
-	}
-
-	// ---------------------- ParallelSourceFunction Lifecycle -----------------
-
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
-		this.iteratorToRead = kafkaStreams.get(0).iterator();
-		this.consumer = consumer;
-
-		zkClient = new ZkClient(consumerConfig.zkConnect(),
-			consumerConfig.zkSessionTimeoutMs(),
-			consumerConfig.zkConnectionTimeoutMs(),
-			new KafkaZKStringSerializer());
-
-		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
-		int numPartitions = getNumberOfPartitions();
-		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
-		this.lastOffsets = new long[numPartitions];
-		this.commitedOffsets = new long[numPartitions];
-		Arrays.fill(this.lastOffsets, -1);
-		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
-
-		nextElement = null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		if (nextElement != null) {
-			return false;
-		}
-
-		while (iteratorToRead.hasNext()) {
-			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
-			if(lastOffsets[message.partition()] >= message.offset()) {
-				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
-				continue;
-			}
-			lastOffsets[message.partition()] = message.offset();
-
-			OUT out = deserializationSchema.deserialize(message.message());
-			if (deserializationSchema.isEndOfStream(out)) {
-				LOG.info("DeserializationSchema signaled end of stream for this source");
-				break;
-			}
-
-			nextElement = out;
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
-			}
-			break;
-		}
-
-		return nextElement == null;
-	}
-
-	@Override
-	public OUT next() throws Exception {
-		if (!reachedEnd()) {
-			OUT result = nextElement;
-			nextElement = null;
-			return result;
-		} else {
-			throw new RuntimeException("Source exhausted");
-		}
-	}
-
-	@Override
-	public void close() {
-		LOG.info("Closing Kafka consumer");
-		this.consumer.shutdown();
-		zkClient.close();
-	}
-
-
-	// ---------------------- State / Checkpoint handling  -----------------
-	// this source is keeping the partition offsets in Zookeeper
-
-	private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
-
-	@Override
-	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		if(lastOffsets == null) {
-			LOG.warn("State snapshot requested on not yet opened source. Returning null");
-			return null;
-		}
-		LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
-
-		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
-		pendingCheckpoints.put(checkpointId, currentOffsets);
-		return currentOffsets;
-	}
-
-	@Override
-	public void restoreState(long[] state) {
-		// we maintain the offsets in Kafka, so nothing to do.
-	}
-
-
-	/**
-	 * Notification on completed checkpoints
-	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 */
-	@Override
-	public void commitCheckpoint(long checkpointId) {
-		LOG.info("Commit checkpoint {}", checkpointId);
-		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
-		if(checkpointOffsets == null) {
-			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
-			return;
-		}
-		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
-
-		for(int partition = 0; partition < checkpointOffsets.length; partition++) {
-			long offset = checkpointOffsets[partition];
-			if(offset != -1) {
-				setOffset(partition, offset);
-			}
-		}
-	}
-
-	// --------------------- Zookeeper / Offset handling -----------------------------
-
-	private int getNumberOfPartitions() {
-		scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
-		scala.collection.mutable.Map<String, Seq<Object>> list =  ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
-		Option<Seq<Object>> topicOption = list.get(topicName);
-		if(topicOption.isEmpty()) {
-			throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
-		}
-		Seq<Object> topic = topicOption.get();
-		return topic.size();
-	}
-
-	protected void setOffset(int partition, long offset) {
-		if(commitedOffsets[partition] < offset) {
-			setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
-			commitedOffsets[partition] = offset;
-		} else {
-			LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
-		}
-	}
-
-
-
-	// the following two methods are static to allow access from the outside as well (Testcases)
-
-	/**
-	 * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
-	 */
-	public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
-		LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
-	}
-
-	public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
-		return Long.valueOf(data._1());
-	}
-
-
-	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
-
-	private void writeObject(ObjectOutputStream out)
-			throws IOException, ClassNotFoundException {
-		out.defaultWriteObject();
-		out.writeObject(consumerConfig.props().props());
-	}
-
-	private void readObject(ObjectInputStream in)
-			throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		Properties props = (Properties) in.readObject();
-		consumerConfig = new ConsumerConfig(props);
-	}
-
-
-	@Override
-	public TypeInformation<OUT> getProducedType() {
-		return deserializationSchema.getProducedType();
-	}
-
-
-	// ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there)  -----------------
-
-	public static class KafkaZKStringSerializer implements ZkSerializer {
-
-		@Override
-		public byte[] serialize(Object data) throws ZkMarshallingError {
-			try {
-				return ((String) data).getBytes("UTF-8");
-			} catch (UnsupportedEncodingException e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-			if (bytes == null) {
-				return null;
-			} else {
-				try {
-					return new String(bytes, "UTF-8");
-				} catch (UnsupportedEncodingException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-	}
-}

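For context, the removed PersistentKafkaSource was constructed from a topic name, a DeserializationSchema, and a Kafka high-level ConsumerConfig, and it rejected configs with auto commit enabled or with offsets stored anywhere but Zookeeper. A minimal usage sketch, assuming the SimpleStringSchema used elsewhere in this commit and placeholder topic, group id, and Zookeeper address, could look like this:

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class PersistentKafkaSourceExample {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("group.id", "flink-example-group");     // placeholder consumer group
		props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder Zookeeper address
		props.setProperty("auto.commit.enable", "false");         // required: the source commits offsets itself
		props.setProperty("offsets.storage", "zookeeper");        // required: the source can only commit to Zookeeper

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> stream = env.addSource(
				new PersistentKafkaSource<String>("my-topic", new SimpleStringSchema(),
						new ConsumerConfig(props)));

		stream.print();
		env.execute();
	}
}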
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
deleted file mode 100644
index 661d0bd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-public class KafkaConstantPartitioner implements SerializableKafkaPartitioner {
-
-	private static final long serialVersionUID = 1L;
-	private int partition;
-
-	public KafkaConstantPartitioner(int partition) {
-		this.partition = partition;
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return partition;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
deleted file mode 100644
index 77a774e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import kafka.producer.Partitioner;
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}

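The removed SerializableKafkaPartitioner interface merely combines java.io.Serializable with kafka.producer.Partitioner, and KafkaConstantPartitioner above is its one in-tree implementation. As a sketch of what another implementation could look like (the class name below is illustrative and not part of this commit), a value-hashing partitioner would be:

package org.apache.flink.streaming.connectors.kafka.partitioner;

public class HashingKafkaPartitioner implements SerializableKafkaPartitioner {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(Object value, int numberOfPartitions) {
		// mask the sign bit so the result stays non-negative even for Integer.MIN_VALUE hash codes
		return (value.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
	}
}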
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
deleted file mode 100644
index fa729d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class RMQSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
-
-	private String QUEUE_NAME;
-	private String HOST_NAME;
-	private transient ConnectionFactory factory;
-	private transient Connection connection;
-	private transient Channel channel;
-	private SerializationSchema<IN, byte[]> schema;
-
-	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
-		this.schema = schema;
-	}
-
-	/**
-	 * Initializes the connection to RMQ.
-	 */
-	public void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
-		try {
-			connection = factory.newConnection();
-			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives at the sink and forwards it to RMQ.
-	 * 
-	 * @param value
-	 *            The incoming data
-	 */
-	@Override
-	public void invoke(IN value) {
-		try {
-			byte[] msg = schema.serialize(value);
-
-			channel.basicPublish("", QUEUE_NAME, null, msg);
-
-		} catch (IOException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
-		}
-
-	}
-
-	/**
-	 * Closes the connection.
-	 */
-	private void closeChannel() {
-		try {
-			channel.close();
-			connection.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
-		}
-
-	}
-
-	@Override
-	public void open(Configuration config) {
-		initializeConnection();
-	}
-
-	@Override
-	public void close() {
-		closeChannel();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
deleted file mode 100644
index bc087f8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-public class RMQSource<OUT> extends ConnectorSource<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
-
-	private final String QUEUE_NAME;
-	private final String HOST_NAME;
-
-	private transient ConnectionFactory factory;
-	private transient Connection connection;
-	private transient Channel channel;
-	private transient QueueingConsumer consumer;
-	private transient QueueingConsumer.Delivery delivery;
-
-	private volatile boolean isRunning = false;
-
-	OUT out;
-
-	public RMQSource(String HOST_NAME, String QUEUE_NAME,
-			DeserializationSchema<OUT> deserializationSchema) {
-		super(deserializationSchema);
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
-	}
-
-	/**
-	 * Initializes the connection to RMQ.
-	 */
-	private void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
-		try {
-			connection = factory.newConnection();
-			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-			consumer = new QueueingConsumer(channel);
-			channel.basicConsume(QUEUE_NAME, true, consumer);
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
-		}
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		initializeConnection();
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		try {
-			connection.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		if (out != null) {
-			return true;
-		}
-		try {
-			delivery = consumer.nextDelivery();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot receive RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
-		}
-
-		out = schema.deserialize(delivery.getBody());
-		if (schema.isEndOfStream(out)) {
-			out = null;
-			return false;
-		}
-		return true;
-	}
-
-	@Override
-	public OUT next() throws Exception {
-		if (out != null) {
-			OUT result = out;
-			out = null;
-			return result;
-		}
-
-		try {
-			delivery = consumer.nextDelivery();
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot receive RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
-		}
-
-		out = schema.deserialize(delivery.getBody());
-		if (schema.isEndOfStream(out)) {
-			throw new RuntimeException("RMQ source is at end.");
-		}
-		OUT result = out;
-		out = null;
-		return result;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
deleted file mode 100644
index 0f06235..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-public class RMQTopology {
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream1 = env.addSource(
-				new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
-				"q").addSink(
-				new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
-
-		env.execute();
-	}
-
-	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public byte[] serialize(String element) {
-			return element.getBytes();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
deleted file mode 100644
index 78e4aa5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-
-/**
- * Implementation of {@link SourceFunction} specialized to emit tweets from
- * Twitter. It can connect to the Twitter Streaming API, collect tweets, and emit them as JSON strings.
- */
-public class TwitterSource extends RichParallelSourceFunction<String> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
-
-	private static final long serialVersionUID = 1L;
-	private String authPath;
-	private transient BlockingQueue<String> queue;
-	private int queueSize = 10000;
-	private transient BasicClient client;
-	private int waitSec = 5;
-
-	private int maxNumberOfTweets;
-	private int currentNumberOfTweets;
-
-	private String nextElement = null;
-
-	private volatile boolean isRunning = false;
-
-	/**
-	 * Create {@link TwitterSource} for streaming
-	 * 
-	 * @param authPath
-	 *            Location of the properties file containing the required
-	 *            authentication information.
-	 */
-	public TwitterSource(String authPath) {
-		this.authPath = authPath;
-		maxNumberOfTweets = -1;
-	}
-
-	/**
-	 * Create {@link TwitterSource} to collect finite number of tweets
-	 * 
-	 * @param authPath
-	 *            Location of the properties file containing the required
-	 *            authentication information.
-	 * @param numberOfTweets
-	 * 
-	 */
-	public TwitterSource(String authPath, int numberOfTweets) {
-		this.authPath = authPath;
-		this.maxNumberOfTweets = numberOfTweets;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		initializeConnection();
-		currentNumberOfTweets = 0;
-	}
-
-	/**
-	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
-	 */
-	private void initializeConnection() {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initializing Twitter Streaming API connection");
-		}
-
-		queue = new LinkedBlockingQueue<String>(queueSize);
-
-		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
-		endpoint.stallWarnings(false);
-
-		Authentication auth = authenticate();
-
-		initializeClient(endpoint, auth);
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Twitter Streaming API connection established successfully");
-		}
-	}
-
-	private OAuth1 authenticate() {
-
-		Properties authenticationProperties = loadAuthenticationProperties();
-
-		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
-				authenticationProperties.getProperty("consumerSecret"),
-				authenticationProperties.getProperty("token"),
-				authenticationProperties.getProperty("secret"));
-	}
-
-	/**
-	 * Reads the given properties file for the authentication data.
-	 * 
-	 * @return the authentication data.
-	 */
-	private Properties loadAuthenticationProperties() {
-		Properties properties = new Properties();
-		try {
-			InputStream input = new FileInputStream(authPath);
-			properties.load(input);
-			input.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
-		}
-		return properties;
-	}
-
-	private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
-
-		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
-				.endpoint(endpoint).authentication(auth)
-				.processor(new StringDelimitedProcessor(queue)).build();
-
-		client.connect();
-	}
-
-	/**
-	 * Put tweets into output
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectFiniteMessages(Collector<String> collector) {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Collecting tweets");
-		}
-
-		for (int i = 0; i < maxNumberOfTweets; i++) {
-			collectOneMessage(collector);
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Collecting tweets finished");
-		}
-	}
-
-	/**
-	 * Put tweets into output
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectMessages(Collector<String> collector) {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Tweet-stream begins");
-		}
-
-		while (isRunning) {
-			collectOneMessage(collector);
-		}
-	}
-
-	/**
-	 * Put one tweet into the output.
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectOneMessage(Collector<String> collector) {
-		if (client.isDone()) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
-						.getMessage());
-			}
-		}
-
-		try {
-			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
-			if (msg != null) {
-				collector.collect(msg);
-			} else {
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Did not receive a message in {} seconds", waitSec);
-				}
-			}
-		} catch (InterruptedException e) {
-			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
-		}
-
-	}
-
-	private void closeConnection() {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initiating connection close");
-		}
-
-		client.stop();
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Connection closed successfully");
-		}
-	}
-
-	/**
-	 * Get the size of the queue in which the tweets are contained temporarily.
-	 * 
-	 * @return the size of the queue in which the tweets are contained
-	 *         temporarily
-	 */
-	public int getQueueSize() {
-		return queueSize;
-	}
-
-	/**
-	 * Set the size of the queue in which the tweets are contained temporarily.
-	 * 
-	 * @param queueSize
-	 *            The desired value.
-	 */
-	public void setQueueSize(int queueSize) {
-		this.queueSize = queueSize;
-	}
-
-	/**
-	 * This function tells how long TwitterSource waits for the tweets.
-	 * 
-	 * @return Number of seconds.
-	 */
-	public int getWaitSec() {
-		return waitSec;
-	}
-
-	/**
-	 * This function sets how long TwitterSource should wait for the tweets.
-	 * 
-	 * @param waitSec
-	 *            The desired value.
-	 */
-	public void setWaitSec(int waitSec) {
-		this.waitSec = waitSec;
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		if (currentNumberOfTweets >= maxNumberOfTweets) {
-			return false;
-		}
-
-		if (nextElement != null) {
-			return true;
-		}
-		if (client.isDone()) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
-						.getMessage());
-			}
-			return false;
-		}
-
-		try {
-			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
-			if (msg != null) {
-				nextElement = msg;
-				return true;
-			} else {
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Did not receive a message in {} seconds", waitSec);
-				}
-			}
-		} catch (InterruptedException e) {
-			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
-		}
-		return false;
-	}
-
-	@Override
-	public String next() throws Exception {
-		if (nextElement != null) {
-			String result = nextElement;
-			nextElement = null;
-			return result;
-		}
-		if (reachedEnd()) {
-			throw new RuntimeException("Twitter stream end reached.");
-		} else {
-			String result = nextElement;
-			nextElement = null;
-			return result;
-		}
-	}
-}

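The removed TwitterSource read its OAuth credentials from the properties file passed in as authPath, looking up the keys consumerKey, consumerSecret, token, and secret. A minimal sketch of such a properties file, with placeholder values, would be:

consumerKey=<your consumer key>
consumerSecret=<your consumer secret>
token=<your access token>
secret=<your access token secret>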
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
deleted file mode 100644
index e500fef..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TwitterStreaming {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-	private static final int NUMBEROFTWEETS = 100;
-
-	private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
-
-	public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
-			System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
-			System.out.println("NAME: " + tuple.f4);
-			System.out.println("TEXT: " + tuple.f3);
-			System.out.println("");
-		}
-
-	}
-
-	public static class SelectDataFlatMap extends
-			JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
-				throws Exception {
-			try {
-				out.collect(new Tuple5<Long, Integer, String, String, String>(
-						getLong(value, "id"),
-						getInt(value, "entities.hashtags[0].indices[1]"),
-						getString(value, "lang"),
-						getString(value, "text"),
-						getString(value, "user.name")));
-			} catch (JSONException e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Field not found");
-				}
-			} 
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		String path = new String();
-
-		if (args != null && args.length == 1) {
-			path = args[0];
-		} else {
-			System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
-				.setParallelism(SOURCE_PARALLELISM);
-
-		DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
-				.flatMap(new SelectDataFlatMap());
-
-		selectedDataStream.addSink(new TwitterSink());
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
deleted file mode 100644
index 4bc6df0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * This program demonstrates the use of TwitterSource.
- * Its aim is to count the frequency of the languages of tweets
- */
-public class TwitterTopology {
-
-	private static final int NUMBEROFTWEETS = 100;
-	
-	/**
-	 * FlatMapFunction to determine the language of tweets if possible 
-	 */
-	public static class SelectLanguageFlatMap extends
-			JSONParseFlatMap<String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Select the language from the incoming JSON text
-		 */
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			try{
-				out.collect(getString(value, "lang"));
-			}
-			catch (JSONException e){
-				out.collect("");
-			}
-		}
-
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		String path = new String();
-
-		if (args != null && args.length == 1) {
-			path = args[0];
-		} else {
-			System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
-
-
-		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				.flatMap(new SelectLanguageFlatMap())
-				.map(new MapFunction<String, Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-					
-					@Override
-					public Tuple2<String, Integer> map(String value) throws Exception {
-						return new Tuple2<String, Integer>(value, 1);
-					}
-				})
-				.groupBy(0)
-				.sum(1);
-
-		dataStream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
deleted file mode 100644
index b1d4115..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class JSONParserTest {
-
-	private String jsonText;
-	private String searchedField;
-
-	public JSONParserTest(String text, String field) {
-		jsonText = text;
-		searchedField = field;
-	}
-
-	@Parameters
-	public static Collection<Object[]> initParameterList() {
-
-		Object[][] parameterList = new Object[][] { 
-				{ "{\"key\":\"value\"}", 							"key" },
-				{ "{\"key\":[\"value\"]}", 							"key[0]" },
-				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
-				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
-				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
-				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
-				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
-				};
-
-		return Arrays.asList(parameterList);
-	}
-
-	@Test
-	public void test() {
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-			String expected = "{\"retValue\":\"value\"}";
-
-			assertTrue(expected.equals(jo.toString()));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
deleted file mode 100644
index 8851086..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-
-
-public class JSONParserTest2 {
-	
-	@Test
-	public void testGetBooleanFunction() {
-		String jsonText = "{\"key\":true}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertTrue(jo.getBoolean("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-	@Test
-	public void testGetDoubleFunction() {
-		double expected = 12345.12345;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getDouble("retValue"),0.000001);
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-	@Test
-	public void testGetIntFunction() {
-		int expected = 15;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getInt("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-
-	@Test
-	public void testGetLongFunction() {
-		long expected = 111111111111L;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getLong("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-}

