From: rmetzger@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Mon, 02 Feb 2015 18:42:21 -0000
Message-Id: <1ce5a170f4174e4abc14b328b95f000b@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [43/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
deleted file mode 100644
index 3cfd7d4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.streaming.connectors.flume; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.util.SerializationSchema; -import org.apache.flink.streaming.connectors.util.SimpleStringSchema; - -public class FlumeTopology { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); - - @SuppressWarnings("unused") - DataStream dataStream1 = env.addSource( - new FlumeSource("localhost", 41414, new SimpleStringSchema())).addSink( - new FlumeSink("localhost", 42424, new StringToByteSerializer())); - - env.execute(); - } - - public static class StringToByteSerializer implements SerializationSchema { - - private static final long serialVersionUID = 1L; - - @Override - public byte[] serialize(String element) { - return element.getBytes(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java deleted file mode 100644 index 0f16541..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.json; - -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.sling.commons.json.JSONException; - -/** - * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files. - * - * @param - * Type of the input elements. - * @param - * Type of the returned elements. - */ -public abstract class JSONParseFlatMap extends RichFlatMapFunction { - - private static final long serialVersionUID = 1L; - - // private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class); - - /** - * Get the value object associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. 
- */ - public Object get(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).get("retValue"); - } - - /** - * Get the boolean value associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. - */ - public boolean getBoolean(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).getBoolean("retValue"); - } - - /** - * Get the double value associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. - */ - public double getDouble(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).getDouble("retValue"); - } - - /** - * Get the int value associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. - */ - public int getInt(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).getInt("retValue"); - } - - /** - * Get the long value associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. - */ - public long getLong(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).getLong("retValue"); - } - - /** - * Get the String value associated with a key form a JSON code. It can find - * embedded fields, too. - * - * @param jsonText - * JSON String in which the field is searched. - * @param field - * The key whose value is searched for. - * @return The object associated with the field. - * @throws JSONException - * If the field is not found. 
- */ - public String getString(String jsonText, String field) throws JSONException { - JSONParser parser = new JSONParser(jsonText); - - return parser.parse(field).getString("retValue"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java deleted file mode 100644 index c1eabbd..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.json; - -import java.util.Arrays; -import java.util.List; -import java.util.ListIterator; -import java.util.StringTokenizer; - -import org.apache.sling.commons.json.JSONArray; -import org.apache.sling.commons.json.JSONException; -import org.apache.sling.commons.json.JSONObject; - -/** - * A JSONParser contains a JSONObject and provides opportunity to access - * embedded fields in JSON code. - */ -public class JSONParser { - - private JSONObject originalJO; - private String searchedfield; - private Object temp; - - /** - * Construct a JSONParser from a string. The string has to be a JSON code - * from which we want to get a field. - * - * @param jsonText - * A string which contains a JSON code. String representation of - * a JSON code. - * @throws JSONException - * If there is a syntax error in the source string. - */ - public JSONParser(String jsonText) throws JSONException { - originalJO = new JSONObject(jsonText); - } - - /** - * - * Parse the JSON code passed to the constructor to find the given key. - * - * @param key - * The key whose value is searched for. - * @return A JSONObject which has only one field called "retValue" and the - * value associated to it is the searched value. The methods of - * JSONObject can be used to get the field value in a desired - * format. - * @throws JSONException - * If the key is not found. - */ - public JSONObject parse(String key) throws JSONException { - initializeParser(key); - parsing(); - return putResultInJSONObj(); - } - - /** - * Prepare the fields of the class for the parsing - * - * @param key - * The key whose value is searched for. - * @throws JSONException - * If the key is not found. 
- */ - private void initializeParser(String key) throws JSONException { - searchedfield = key; - temp = new JSONObject(originalJO.toString()); - } - - /** - * This function goes through the given field and calls the appropriate - * functions to treat the units between the punctuation marks. - * - * @throws JSONException - * If the key is not found. - */ - private void parsing() throws JSONException { - StringTokenizer st = new StringTokenizer(searchedfield, "."); - while (st.hasMoreTokens()) { - find(st.nextToken()); - } - } - - /** - * Search for the next part of the field and update the state if it was - * found. - * - * @param nextToken - * The current part of the searched field. - * @throws JSONException - * If the key is not found. - */ - private void find(String nextToken) throws JSONException { - if (endsWithBracket(nextToken)) { - treatAllBracket(nextToken); - } else { - temp = ((JSONObject) temp).get(nextToken); - } - } - - /** - * Determine whether the given string ends with a closing square bracket ']' - * - * @param nextToken - * The current part of the searched field. - * @return True if the given string ends with a closing square bracket ']' - * and false otherwise. - */ - private boolean endsWithBracket(String nextToken) { - return nextToken.substring(nextToken.length() - 1).endsWith("]"); - } - - /** - * Handle (multidimensional) arrays. Treat the square bracket pairs one - * after the other if necessary. - * - * @param nextToken - * The current part of the searched field. - * @throws JSONException - * If the searched element is not found. - */ - private void treatAllBracket(String nextToken) throws JSONException { - List list = Arrays.asList(nextToken.split("\\[")); - ListIterator iter = list.listIterator(); - - temp = ((JSONObject) temp).get(iter.next()); - - while (iter.hasNext()) { - int index = Integer.parseInt(cutBracket(iter.next())); - temp = ((JSONArray) temp).get(index); - } - } - - /** - * Remove the last character of the string. - * - * @param string - * String to modify. - * @return The given string without the last character. - */ - private String cutBracket(String string) { - return string.substring(0, string.length() - 1); - } - - /** - * Save the result of the search into a JSONObject. - * - * @return A special JSONObject which contain only one key. The value - * associated to this key is the result of the search. - * @throws JSONException - * If there is a problem creating the JSONObject. (e.g. invalid - * syntax) - */ - private JSONObject putResultInJSONObj() throws JSONException { - JSONObject jo = new JSONObject(); - jo.put("retValue", temp); - return jo; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java deleted file mode 100644 index 9bb87a0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import java.util.Properties; - -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -import org.apache.flink.streaming.api.function.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.util.SerializationSchema; - -public class KafkaSink extends RichSinkFunction { - private static final long serialVersionUID = 1L; - - private kafka.javaapi.producer.Producer producer; - private Properties props; - private String topicId; - private String brokerAddr; - private boolean initDone = false; - private SerializationSchema scheme; - - public KafkaSink(String topicId, String brokerAddr, - SerializationSchema serializationSchema) { - this.topicId = topicId; - this.brokerAddr = brokerAddr; - this.scheme = serializationSchema; - - } - - /** - * Initializes the connection to Kafka. - */ - public void initialize() { - props = new Properties(); - - props.put("metadata.broker.list", brokerAddr); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("request.required.acks", "1"); - - ProducerConfig config = new ProducerConfig(props); - producer = new Producer(config); - initDone = true; - } - - /** - * Called when new data arrives to the sink, and forwards it to Kafka. - * - * @param next - * The incoming data - */ - @Override - public void invoke(IN next) { - if (!initDone) { - initialize(); - } - - producer.send(new KeyedMessage(topicId, scheme.serialize(next))); - - } - - @Override - public void close() { - producer.close(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java deleted file mode 100644 index 7328500..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.ConnectorSource; -import org.apache.flink.streaming.connectors.util.DeserializationSchema; -import org.apache.flink.util.Collector; - -public class KafkaSource extends ConnectorSource { - private static final long serialVersionUID = 1L; - - private final String zkQuorum; - private final String groupId; - private final String topicId; - private ConsumerConnector consumer; - - OUT outTuple; - - public KafkaSource(String zkQuorum, String groupId, String topicId, - DeserializationSchema deserializationSchema) { - super(deserializationSchema); - this.zkQuorum = zkQuorum; - this.groupId = groupId; - this.topicId = topicId; - } - - /** - * Initializes the connection to Kafka. - */ - private void initializeConnection() { - Properties props = new Properties(); - props.put("zookeeper.connect", zkQuorum); - props.put("group.id", groupId); - props.put("zookeeper.session.timeout.ms", "2000"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); - } - - /** - * Called to forward the data from the source to the {@link DataStream}. 
- * - * @param collector - * The Collector for sending data to the dataStream - */ - @Override - public void invoke(Collector collector) throws Exception { - - Map>> consumerMap = consumer - .createMessageStreams(Collections.singletonMap(topicId, 1)); - - KafkaStream stream = consumerMap.get(topicId).get(0); - ConsumerIterator it = stream.iterator(); - - while (it.hasNext()) { - OUT out = schema.deserialize(it.next().message()); - if (schema.isEndOfStream(out)) { - break; - } - collector.collect(out); - } - consumer.shutdown(); - } - - @Override - public void open(Configuration config) { - initializeConnection(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java deleted file mode 100644 index 7801d56..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.connectors.util.SimpleStringSchema; -import org.apache.flink.util.Collector; - -public class KafkaTopology { - - public static final class MySource implements SourceFunction { - private static final long serialVersionUID = 1L; - - @Override - public void invoke(Collector collector) throws Exception { - for (int i = 0; i < 10; i++) { - collector.collect(new String(Integer.toString(i))); - } - collector.collect(new String("q")); - - } - } - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings("unused") - DataStream stream1 = env - .addSource( - new KafkaSource("localhost:2181", "group", "test", - new SimpleStringSchema())).print(); - - @SuppressWarnings("unused") - DataStream stream2 = env.addSource(new MySource()).addSink( - new KafkaSink("test", "localhost:9092", new SimpleStringSchema())); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java deleted file mode 100644 index 38c4f5f..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.rabbitmq; - -import java.io.IOException; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.function.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.util.SerializationSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -public class RMQSink extends RichSinkFunction { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); - - private String QUEUE_NAME; - private String HOST_NAME; - private transient ConnectionFactory factory; - private transient Connection connection; - private transient Channel channel; - private SerializationSchema scheme; - - public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema schema) { - this.HOST_NAME = HOST_NAME; - this.QUEUE_NAME = QUEUE_NAME; - this.scheme = schema; - } - - /** - * Initializes the connection to RMQ. - */ - public void initializeConnection() { - factory = new ConnectionFactory(); - factory.setHost(HOST_NAME); - try { - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Called when new data arrives to the sink, and forwards it to RMQ. - * - * @param value - * The incoming data - */ - @Override - public void invoke(IN value) { - try { - byte[] msg = scheme.serialize(value); - - channel.basicPublish("", QUEUE_NAME, null, msg); - - } catch (IOException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME); - } - } - - } - - /** - * Closes the connection. - */ - private void closeChannel() { - try { - channel.close(); - connection.close(); - } catch (IOException e) { - throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME - + " at " + HOST_NAME, e); - } - - } - - @Override - public void open(Configuration config) { - initializeConnection(); - } - - @Override - public void close() { - closeChannel(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java deleted file mode 100755 index 7ce864e..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.rabbitmq; - -import java.io.IOException; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.ConnectorSource; -import org.apache.flink.streaming.connectors.util.DeserializationSchema; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; - -public class RMQSource extends ConnectorSource { - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class); - - private final String QUEUE_NAME; - private final String HOST_NAME; - - private transient ConnectionFactory factory; - private transient Connection connection; - private transient Channel channel; - private transient QueueingConsumer consumer; - private transient QueueingConsumer.Delivery delivery; - - OUT out; - - public RMQSource(String HOST_NAME, String QUEUE_NAME, - DeserializationSchema deserializationSchema) { - super(deserializationSchema); - this.HOST_NAME = HOST_NAME; - this.QUEUE_NAME = QUEUE_NAME; - } - - /** - * Initializes the connection to RMQ. - */ - private void initializeConnection() { - factory = new ConnectionFactory(); - factory.setHost(HOST_NAME); - try { - connection = factory.newConnection(); - channel = connection.createChannel(); - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - consumer = new QueueingConsumer(channel); - channel.basicConsume(QUEUE_NAME, true, consumer); - } catch (IOException e) { - throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at " - + HOST_NAME, e); - } - } - - /** - * Called to forward the data from the source to the {@link DataStream}. 
- * - * @param collector - * The Collector for sending data to the dataStream - */ - @Override - public void invoke(Collector collector) throws Exception { - - while (true) { - - try { - delivery = consumer.nextDelivery(); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME); - } - } - - out = schema.deserialize(delivery.getBody()); - if (schema.isEndOfStream(out)) { - break; - } else { - collector.collect(out); - } - } - - } - - @Override - public void open(Configuration config) { - initializeConnection(); - } - - @Override - public void close() { - try { - connection.close(); - } catch (IOException e) { - throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME - + " at " + HOST_NAME, e); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java deleted file mode 100755 index a6ca9ae..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.rabbitmq; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.util.SerializationSchema; -import org.apache.flink.streaming.connectors.util.SimpleStringSchema; - -public class RMQTopology { - - public static void main(String[] args) throws Exception { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); - - @SuppressWarnings("unused") - DataStream dataStream1 = env.addSource( - new RMQSource("localhost", "hello", new SimpleStringSchema())).print(); - - @SuppressWarnings("unused") - DataStream dataStream2 = env.fromElements("one", "two", "three", "four", "five", - "q").addSink( - new RMQSink("localhost", "hello", new StringToByteSerializer())); - - env.execute(); - } - - public static class StringToByteSerializer implements SerializationSchema { - - private static final long serialVersionUID = 1L; - - @Override - public byte[] serialize(String element) { - return element.getBytes(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java deleted file mode 100644 index ddb2538..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.twitter; - -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.twitter.hbc.ClientBuilder; -import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; -import com.twitter.hbc.core.processor.StringDelimitedProcessor; -import com.twitter.hbc.httpclient.BasicClient; -import com.twitter.hbc.httpclient.auth.Authentication; -import com.twitter.hbc.httpclient.auth.OAuth1; - -/** - * Implementation of {@link SourceFunction} specialized to emit tweets from - * Twitter. It can connect to Twitter Streaming API, collect tweets and - */ -public class TwitterSource extends RichParallelSourceFunction { - - private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class); - - private static final long serialVersionUID = 1L; - private String authPath; - private transient BlockingQueue queue; - private int queueSize = 10000; - private transient BasicClient client; - private int waitSec = 5; - - private boolean streaming; - private int numberOfTweets; - - /** - * Create {@link TwitterSource} for streaming - * - * @param authPath - * Location of the properties file containing the required - * authentication information. - */ - public TwitterSource(String authPath) { - this.authPath = authPath; - streaming = true; - } - - /** - * Create {@link TwitterSource} to collect finite number of tweets - * - * @param authPath - * Location of the properties file containing the required - * authentication information. 
- * @param numberOfTweets - * - */ - public TwitterSource(String authPath, int numberOfTweets) { - this.authPath = authPath; - streaming = false; - this.numberOfTweets = numberOfTweets; - } - - @Override - public void open(Configuration parameters) throws Exception { - initializeConnection(); - } - - @Override - public void invoke(Collector collector) throws Exception { - - if (streaming) { - collectMessages(collector); - } else { - collectFiniteMessages(collector); - } - } - - @Override - public void close() throws Exception { - closeConnection(); - } - - /** - * Initialize Hosebird Client to be able to consume Twitter's Streaming API - */ - private void initializeConnection() { - - if (LOG.isInfoEnabled()) { - LOG.info("Initializing Twitter Streaming API connection"); - } - - queue = new LinkedBlockingQueue(queueSize); - - StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); - endpoint.stallWarnings(false); - - Authentication auth = authenticate(); - - initializeClient(endpoint, auth); - - if (LOG.isInfoEnabled()) { - LOG.info("Twitter Streaming API connection established successfully"); - } - } - - private OAuth1 authenticate() { - - Properties authenticationProperties = loadAuthenticationProperties(); - - return new OAuth1(authenticationProperties.getProperty("consumerKey"), - authenticationProperties.getProperty("consumerSecret"), - authenticationProperties.getProperty("token"), - authenticationProperties.getProperty("secret")); - } - - /** - * Reads the given properties file for the authentication data. - * - * @return the authentication data. - */ - private Properties loadAuthenticationProperties() { - Properties properties = new Properties(); - try { - InputStream input = new FileInputStream(authPath); - properties.load(input); - input.close(); - } catch (Exception e) { - throw new RuntimeException("Cannot open .properties file: " + authPath, e); - } - return properties; - } - - private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) { - - client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST) - .endpoint(endpoint).authentication(auth) - .processor(new StringDelimitedProcessor(queue)).build(); - - client.connect(); - } - - /** - * Put tweets into collector - * - * @param collector - * Collector in which the tweets are collected. - */ - protected void collectFiniteMessages(Collector collector) { - - if (LOG.isInfoEnabled()) { - LOG.info("Collecting tweets"); - } - - for (int i = 0; i < numberOfTweets; i++) { - collectOneMessage(collector); - } - - if (LOG.isInfoEnabled()) { - LOG.info("Collecting tweets finished"); - } - } - - /** - * Put tweets into collector - * - * @param collector - * Collector in which the tweets are collected. - */ - protected void collectMessages(Collector collector) { - - if (LOG.isInfoEnabled()) { - LOG.info("Tweet-stream begins"); - } - - while (true) { - collectOneMessage(collector); - } - } - - /** - * Put one tweet into the collector. - * - * @param collector - * Collector in which the tweets are collected. 
- */ - protected void collectOneMessage(Collector collector) { - if (client.isDone()) { - if (LOG.isErrorEnabled()) { - LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent() - .getMessage()); - } - } - - try { - String msg = queue.poll(waitSec, TimeUnit.SECONDS); - if (msg != null) { - collector.collect(msg); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("Did not receive a message in {} seconds", waitSec); - } - } - } catch (InterruptedException e) { - throw new RuntimeException("'Waiting for tweet' thread is interrupted", e); - } - - } - - private void closeConnection() { - - if (LOG.isInfoEnabled()) { - LOG.info("Initiating connection close"); - } - - client.stop(); - - if (LOG.isInfoEnabled()) { - LOG.info("Connection closed successfully"); - } - } - - /** - * Get the size of the queue in which the tweets are contained temporarily. - * - * @return the size of the queue in which the tweets are contained temporarily - */ - public int getQueueSize() { - return queueSize; - } - - /** - * Set the size of the queue in which the tweets are contained temporarily. - * - * @param queueSize - * The desired value. - */ - public void setQueueSize(int queueSize) { - this.queueSize = queueSize; - } - - /** - * This function tells how long TwitterSource waits for the tweets. - * - * @return Number of second. - */ - public int getWaitSec() { - return waitSec; - } - - /** - * This function sets how long TwitterSource should wait for the tweets. - * - * @param waitSec - * The desired value. - */ - public void setWaitSec(int waitSec) { - this.waitSec = waitSec; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java deleted file mode 100644 index a32fe1b..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.twitter; - -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.sink.SinkFunction; -import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; -import org.apache.flink.util.Collector; -import org.apache.sling.commons.json.JSONException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TwitterStreaming { - - private static final int PARALLELISM = 1; - private static final int SOURCE_PARALLELISM = 1; - private static final int NUMBEROFTWEETS = 100; - - private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class); - - public static class TwitterSink implements SinkFunction> { - - private static final long serialVersionUID = 1L; - - @Override - public void invoke(Tuple5 tuple) { - System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2); - System.out.println("NAME: " + tuple.f4); - System.out.println("TEXT: " + tuple.f3); - System.out.println(""); - } - - } - - public static class SelectDataFlatMap extends - JSONParseFlatMap> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(String value, Collector> out) - throws Exception { - try { - out.collect(new Tuple5( - getLong(value, "id"), - getInt(value, "entities.hashtags[0].indices[1]"), - getString(value, "lang"), - getString(value, "text"), - getString(value, "user.name"))); - } catch (JSONException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Field not found"); - } - } - } - } - - public static void main(String[] args) throws Exception { - - String path = new String(); - - if (args != null && args.length == 1) { - path = args[0]; - } else { - System.err.println("USAGE:\nTwitterStreaming "); - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment - .createLocalEnvironment(PARALLELISM); - - DataStream streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS)) - .setParallelism(SOURCE_PARALLELISM); - - DataStream> selectedDataStream = streamSource - .flatMap(new SelectDataFlatMap()); - - selectedDataStream.addSink(new TwitterSink()); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java deleted file mode 100644 index 4bc6df0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.twitter; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; -import org.apache.flink.util.Collector; -import org.apache.sling.commons.json.JSONException; - -/** - * This program demonstrate the use of TwitterSource. - * Its aim is to count the frequency of the languages of tweets - */ -public class TwitterTopology { - - private static final int NUMBEROFTWEETS = 100; - - /** - * FlatMapFunction to determine the language of tweets if possible - */ - public static class SelectLanguageFlatMap extends - JSONParseFlatMap { - - private static final long serialVersionUID = 1L; - - /** - * Select the language from the incoming JSON text - */ - @Override - public void flatMap(String value, Collector out) throws Exception { - try{ - out.collect(getString(value, "lang")); - } - catch (JSONException e){ - out.collect(""); - } - } - - } - - public static void main(String[] args) throws Exception { - - String path = new String(); - - if (args != null && args.length == 1) { - path = args[0]; - } else { - System.err.println("USAGE:\nTwitterLocal "); - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStream streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS)); - - - DataStream> dataStream = streamSource - .flatMap(new SelectLanguageFlatMap()) - .map(new MapFunction>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2 map(String value) throws Exception { - return new Tuple2(value, 1); - } - }) - .groupBy(0) - .sum(1); - - dataStream.print(); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java deleted file mode 100644 index 4507a1d..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -import java.io.Serializable; - -public interface DeserializationSchema extends Serializable { - - /** - * Deserializes the incoming data. - * - * @param message - * The incoming message in a byte array - * @return The deserialized message in the required format. - */ - public T deserialize(byte[] message); - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted - * - * @param nextElement - * The element to test for end signal - * @return The end signal, if true the stream shuts down - */ - public boolean isEndOfStream(T nextElement); -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java deleted file mode 100644 index 29c749a..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.util; - -public class RawSchema implements DeserializationSchema, - SerializationSchema { - - private static final long serialVersionUID = 1L; - - @Override - public byte[] deserialize(byte[] message) { - return message; - } - - @Override - public boolean isEndOfStream(byte[] nextElement) { - return false; - } - - @Override - public byte[] serialize(byte[] element) { - return element; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java deleted file mode 100644 index 7c32312..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.util; - -import java.io.Serializable; - -public interface SerializationSchema extends Serializable { - - /** - * Serializes the incoming element to a specified type. - * - * @param element - * The incoming element to be serialized - * @return The serialized element. - */ - public R serialize(T element); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java deleted file mode 100644 index 4b21580..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
deleted file mode 100644
index 4b21580..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
-        SerializationSchema<String, String> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public String deserialize(byte[] message) {
-        return new String(message);
-    }
-
-    @Override
-    public boolean isEndOfStream(String nextElement) {
-        return false;
-    }
-
-    @Override
-    public String serialize(String element) {
-        return element;
-    }
-
-}
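SimpleStringSchema combines both contracts for plain text: bytes are decoded with the platform default charset and Strings pass through unchanged on the serialization side. A small, self-contained round trip (illustrative only, the class name of the driver is invented):

// Hypothetical usage sketch, not part of this commit.
public class SimpleStringSchemaRoundTrip {

    public static void main(String[] args) {
        SimpleStringSchema schema = new SimpleStringSchema();

        // serialize(...) passes the String through; deserialize(...) rebuilds it
        // from bytes with the platform default charset, as in the class above.
        byte[] bytes = schema.serialize("hello").getBytes();
        String restored = schema.deserialize(bytes);

        System.out.println(restored);                        // hello
        System.out.println(schema.isEndOfStream(restored));  // false
    }
}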
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
deleted file mode 100644
index fe3783b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.db;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class DBStateTest {
-
-    public void stateTest(DBState state) {
-        state.put("k1", 1);
-        assertEquals(Integer.valueOf(1), state.get("k1"));
-
-        state.put("k2", 2);
-        state.put("k3", 3);
-        assertEquals(Integer.valueOf(2), state.get("k2"));
-        assertEquals(Integer.valueOf(3), state.get("k3"));
-        state.remove("k2");
-
-        try {
-            state.get("k2");
-            fail();
-        } catch (Exception e) {
-        }
-    }
-
-    private void iteratorTest(DBStateWithIterator state) {
-        HashMap expected = new HashMap();
-        HashMap result = new HashMap();
-
-        state.put("10", 10);
-        state.put("20", 20);
-        state.put("30", 30);
-
-        expected.put("10", 10);
-        expected.put("20", 20);
-        expected.put("30", 30);
-
-        DBStateIterator iterator = state.getIterator();
-        while (iterator.hasNext()) {
-            String key = iterator.getNextKey();
-            Integer value = iterator.getNextValue();
-            result.put(key, value);
-            iterator.next();
-        }
-        state.close();
-
-        assertEquals(expected, result);
-    }
-
-    // TODO
-    @Ignore("Creates files with no licenses")
-    @Test
-    public void levelDBTest() {
-        LevelDBState state = new LevelDBState("test");
-        stateTest(state);
-        state.close();
-
-        state = new LevelDBState("test");
-        iteratorTest(state);
-        state.close();
-    }
-
-    @Ignore("Needs running Memcached")
-    @Test
-    public void memcachedTest() {
-        MemcachedState state = new MemcachedState();
-        stateTest(state);
-        state.close();
-    }
-
-    @Ignore("Needs running Redis")
-    @Test
-    public void redisTest() {
-        RedisState state = new RedisState();
-        stateTest(state);
-        state.close();
-
-        state = new RedisState();
-        iteratorTest(state);
-        state.close();
-    }
-}
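The test above pins down the contract the key-value backends (LevelDB, Memcached, Redis) are expected to honour: get-after-put returns the stored value, a removed key raises an exception, and the iterator visits every entry. The following stand-in is a hedged sketch of that contract only; the class name is invented, it uses a plain java.util iterator instead of DBStateIterator, and it does not implement the real DBState interfaces deleted in this commit.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical in-memory stand-in that mirrors the put/get/remove/iterator
// calling pattern exercised by DBStateTest, for illustration only.
public class InMemoryKeyValueState {

    private final Map<String, Integer> map = new HashMap<String, Integer>();

    public void put(String key, Integer value) {
        map.put(key, value);
    }

    public Integer get(String key) {
        Integer value = map.get(key);
        if (value == null) {
            // The test expects an exception when a missing or removed key is read.
            throw new RuntimeException("No value stored for key: " + key);
        }
        return value;
    }

    public void remove(String key) {
        map.remove(key);
    }

    public Iterator<Map.Entry<String, Integer>> getIterator() {
        return map.entrySet().iterator();
    }

    public void close() {
        map.clear();
    }
}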
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
deleted file mode 100644
index b1d4115..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class JSONParserTest {
-
-    private String jsonText;
-    private String searchedField;
-
-    public JSONParserTest(String text, String field) {
-        jsonText = text;
-        searchedField = field;
-    }
-
-    @Parameters
-    public static Collection<Object[]> initParameterList() {
-
-        Object[][] parameterList = new Object[][] {
-                { "{\"key\":\"value\"}", "key" },
-                { "{\"key\":[\"value\"]}", "key[0]" },
-                { "{\"key\":[{\"key\":\"value\"}]}", "key[0].key" },
-                { "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", "key[0].key[0].key" },
-                { "{\"key\":[1,[{\"key\":\"value\"}]]}", "key[1][0].key" },
-                { "{\"key\":[1,[[\"key\",2,\"value\"]]]}", "key[1][0][2]" },
-                { "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}", "key.key.key" }
-        };
-
-        return Arrays.asList(parameterList);
-    }
-
-    @Test
-    public void test() {
-        try {
-            JSONParser parser = new JSONParser(jsonText);
-            JSONObject jo = parser.parse(searchedField);
-            String expected = "{\"retValue\":\"value\"}";
-
-            assertTrue(expected.equals(jo.toString()));
-        }
-        catch (JSONException e) {
-            fail();
-        }
-    }
-}
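The parameter list above doubles as documentation for the path syntax JSONParser accepts: dots descend into objects, [n] indexes into arrays, and whatever the path points at comes back wrapped in a {"retValue": ...} object. A hedged usage sketch (the input document, path, and class name are invented for the example):

import org.apache.flink.streaming.connectors.json.JSONParser;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;

// Hypothetical usage sketch, not part of this commit.
public class JSONParserUsage {

    public static void main(String[] args) throws JSONException {
        // Invented sample document: an object holding an array of objects.
        String json = "{\"sensor\":{\"readings\":[{\"value\":42}]}}";

        // Dots descend into objects, [n] indexes into arrays; the result is
        // wrapped in {"retValue": ...}, as the expected string in the test shows.
        JSONParser parser = new JSONParser(json);
        JSONObject result = parser.parse("sensor.readings[0].value");

        System.out.println(result.getInt("retValue")); // prints 42
    }
}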
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
deleted file mode 100644
index 8851086..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-
-
-public class JSONParserTest2 {
-
-    @Test
-    public void testGetBooleanFunction() {
-        String jsonText = "{\"key\":true}";
-        String searchedField = "key";
-        try {
-            JSONParser parser = new JSONParser(jsonText);
-            JSONObject jo = parser.parse(searchedField);
-
-            assertTrue(jo.getBoolean("retValue"));
-        }
-        catch (JSONException e) {
-            fail();
-        }
-    }
-
-    @Test
-    public void testGetDoubleFunction() {
-        double expected = 12345.12345;
-        String jsonText = "{\"key\":" + expected + "}";
-        String searchedField = "key";
-        try {
-            JSONParser parser = new JSONParser(jsonText);
-            JSONObject jo = parser.parse(searchedField);
-
-            assertEquals(expected, jo.getDouble("retValue"), 0.000001);
-        }
-        catch (JSONException e) {
-            fail();
-        }
-    }
-
-    @Test
-    public void testGetIntFunction() {
-        int expected = 15;
-        String jsonText = "{\"key\":" + expected + "}";
-        String searchedField = "key";
-        try {
-            JSONParser parser = new JSONParser(jsonText);
-            JSONObject jo = parser.parse(searchedField);
-
-            assertEquals(expected, jo.getInt("retValue"));
-        }
-        catch (JSONException e) {
-            fail();
-        }
-    }
-
-    @Test
-    public void testGetLongFunction() {
-        long expected = 111111111111L;
-        String jsonText = "{\"key\":" + expected + "}";
-        String searchedField = "key";
-        try {
-            JSONParser parser = new JSONParser(jsonText);
-            JSONObject jo = parser.parse(searchedField);
-
-            assertEquals(expected, jo.getLong("retValue"));
-        }
-        catch (JSONException e) {
-            fail();
-        }
-    }
-
-}
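The second test class pins down the other half of the contract: the wrapped result exposes the usual typed getters (getBoolean, getInt, getDouble, getLong) under the "retValue" key. A short, illustrative sketch (input documents invented, one parser per lookup as in the tests, class name made up):

import org.apache.flink.streaming.connectors.json.JSONParser;
import org.apache.sling.commons.json.JSONException;

// Hypothetical usage sketch, not part of this commit.
public class JSONTypedAccess {

    public static void main(String[] args) throws JSONException {
        // Invented sample documents; a fresh JSONParser per lookup, mirroring the tests.
        boolean enabled = new JSONParser("{\"enabled\":true}")
                .parse("enabled").getBoolean("retValue");
        long count = new JSONParser("{\"stats\":{\"count\":111111111111}}")
                .parse("stats.count").getLong("retValue");

        System.out.println(enabled + " / " + count);
    }
}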
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml b/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-
-
-
-
-
-      %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
-
-
-
-
-
-
-
-
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
deleted file mode 100644
index a60bd01..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ /dev/null
@@ -1,75 +0,0 @@
-
-
-
-
-  4.0.0
-
-
-  org.apache.flink
-  flink-streaming-parent
-  0.9-SNAPSHOT
-  ..
-
-
-  flink-streaming-core
-  flink-streaming-core
-
-  jar
-
-
-
-
-    org.apache.commons
-    commons-math
-    2.2
-
-
-
-    org.apache.flink
-    flink-test-utils
-    ${project.version}
-    test
-
-
-
-    org.apache.sling
-    org.apache.sling.commons.json
-    2.0.6
-
-
-
-
-
-
-      org.apache.maven.plugins
-      maven-jar-plugin
-
-
-
-        test-jar
-
-
-
-
-
-
-