From: fhueske@apache.org
To: commits@flink.apache.org
Date: Fri, 02 Dec 2016 13:35:07 -0000
Subject: [17/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
deleted file mode 100644
index 51f55b3..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	<artifactId>flink-connector-elasticsearch2_2.10</artifactId>
-	<name>flink-connector-elasticsearch2</name>
-
-	<packaging>jar</packaging>
-
-	<properties>
-		<elasticsearch.version>2.3.5</elasticsearch.version>
-	</properties>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
-			<version>${elasticsearch.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-core</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
deleted file mode 100644
index 650931f..0000000
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/BulkProcessorIndexer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.elasticsearch2;
-
-import org.elasticsearch.action.ActionRequest;
-import org.elasticsearch.action.bulk.BulkProcessor;
-
-public class BulkProcessorIndexer implements RequestIndexer {
-	private final BulkProcessor bulkProcessor;
-
-	public BulkProcessorIndexer(BulkProcessor bulkProcessor) {
-		this.bulkProcessor = bulkProcessor;
-	}
-
-	@Override
-	public void add(ActionRequest...
actionRequests) { - for (ActionRequest actionRequest : actionRequests) { - this.bulkProcessor.add(actionRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java deleted file mode 100644 index e839589..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import com.google.common.collect.ImmutableList; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.Preconditions; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Sink that emits its input elements in bulk to an Elasticsearch cluster. - * - *

- * When using the second constructor
- * {@link #ElasticsearchSink(java.util.Map, java.util.List, ElasticsearchSinkFunction)} a {@link TransportClient} will
- * be used.
- *
- * Attention: When using the {@code TransportClient} the sink will fail if no cluster
- * can be connected to.
- *
- * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
- * the {@link TransportClient}. The config keys can be found in the Elasticsearch
- * documentation. An important setting is {@code cluster.name}, which should be set to the name
- * of the cluster that the sink should emit to.
- *
- * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
- * This will buffer elements before sending a request to the cluster. The behaviour of the
- * {@code BulkProcessor} can be configured using these config keys:
- * <ul>
- *   <li>{@code bulk.flush.max.actions}: Maximum number of elements to buffer
- *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
- *   <li>{@code bulk.flush.interval.ms}: Interval in milliseconds at which to flush data regardless of the other two
- *       settings
- * </ul>
- *
- * You also have to provide an {@link RequestIndexer}. This is used to create an - * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See - * {@link RequestIndexer} for an example. - * - * @param Type of the elements emitted by this sink - */ -public class ElasticsearchSink extends RichSinkFunction { - - public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions"; - public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb"; - public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms"; - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class); - - /** - * The user specified config map that we forward to Elasticsearch when we create the Client. - */ - private final Map userConfig; - - /** - * The list of nodes that the TransportClient should connect to. This is null if we are using - * an embedded Node to get a Client. - */ - private final List transportAddresses; - - /** - * The builder that is used to construct an {@link IndexRequest} from the incoming element. - */ - private final ElasticsearchSinkFunction elasticsearchSinkFunction; - - /** - * The Client that was either retrieved from a Node or is a TransportClient. - */ - private transient Client client; - - /** - * Bulk processor that was created using the client - */ - private transient BulkProcessor bulkProcessor; - - /** - * Bulk {@link org.elasticsearch.action.ActionRequest} indexer - */ - private transient RequestIndexer requestIndexer; - - /** - * This is set from inside the BulkProcessor listener if there where failures in processing. - */ - private final AtomicBoolean hasFailure = new AtomicBoolean(false); - - /** - * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing. - */ - private final AtomicReference failureThrowable = new AtomicReference<>(); - - /** - * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient. - * - * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor - * @param transportAddresses The Elasticsearch Nodes to which to connect using a {@code TransportClient} - * @param elasticsearchSinkFunction This is used to generate the ActionRequest from the incoming element - * - */ - public ElasticsearchSink(Map userConfig, List transportAddresses, ElasticsearchSinkFunction elasticsearchSinkFunction) { - this.userConfig = userConfig; - this.elasticsearchSinkFunction = elasticsearchSinkFunction; - Preconditions.checkArgument(transportAddresses != null && transportAddresses.size() > 0); - this.transportAddresses = transportAddresses; - } - - /** - * Initializes the connection to Elasticsearch by creating a - * {@link org.elasticsearch.client.transport.TransportClient}. 
- */ - @Override - public void open(Configuration configuration) { - List transportNodes; - transportNodes = new ArrayList<>(transportAddresses.size()); - for (InetSocketAddress address : transportAddresses) { - transportNodes.add(new InetSocketTransportAddress(address)); - } - - Settings settings = Settings.settingsBuilder().put(userConfig).build(); - - TransportClient transportClient = TransportClient.builder().settings(settings).build(); - for (TransportAddress transport: transportNodes) { - transportClient.addTransportAddress(transport); - } - - // verify that we actually are connected to a cluster - ImmutableList nodes = ImmutableList.copyOf(transportClient.connectedNodes()); - if (nodes.isEmpty()) { - throw new RuntimeException("Client is not connected to any Elasticsearch nodes!"); - } - - client = transportClient; - - if (LOG.isInfoEnabled()) { - LOG.info("Created Elasticsearch TransportClient {}", client); - } - - BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse itemResp : response.getItems()) { - if (itemResp.isFailed()) { - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage()); - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage())); - } - } - hasFailure.set(true); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error(failure.getMessage()); - failureThrowable.compareAndSet(null, failure); - hasFailure.set(true); - } - }); - - // This makes flush() blocking - bulkProcessorBuilder.setConcurrentRequests(0); - - ParameterTool params = ParameterTool.fromMap(userConfig); - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) { - bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) { - bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt( - CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB)); - } - - if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) { - bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS))); - } - - bulkProcessor = bulkProcessorBuilder.build(); - requestIndexer = new BulkProcessorIndexer(bulkProcessor); - } - - @Override - public void invoke(T element) { - elasticsearchSinkFunction.process(element, getRuntimeContext(), requestIndexer); - } - - @Override - public void close() { - if (bulkProcessor != null) { - bulkProcessor.close(); - bulkProcessor = null; - } - - if (client != null) { - client.close(); - } - - if (hasFailure.get()) { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occured in ElasticsearchSink.", cause); - } else { - throw new RuntimeException("An error occured in ElasticsearchSink."); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java deleted file mode 100644 index 55ba720..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkFunction.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.io.Serializable; - -/** - * Method that creates an {@link org.elasticsearch.action.ActionRequest} from an element in a Stream. - * - *

- * This is used by {@link ElasticsearchSink} to prepare elements for sending them to Elasticsearch.
- *
- * Example:
- *
- * {@code
- *					private static class TestElasticSearchSinkFunction implements
- *						ElasticsearchSinkFunction<Tuple2<Integer, String>> {
- *
- *					public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
- *						Map<String, Object> json = new HashMap<>();
- *						json.put("data", element.f1);
- *
- *						return Requests.indexRequest()
- *							.index("my-index")
- *							.type("my-type")
- *							.id(element.f0.toString())
- *							.source(json);
- *						}
- *
- *				public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
- *					indexer.add(createIndexRequest(element));
- *				}
- *		}
- *
- * }
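For reference, the deleted connector was typically wired up as follows. This is a minimal sketch assembled from the example and test code elsewhere in this commit; it assumes the pre-FLINK-4676 package org.apache.flink.streaming.connectors.elasticsearch2 and an Elasticsearch 2.x cluster reachable on localhost:9300.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchSinkUsage {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Tuple2<Integer, String>> stream = env.fromElements(
				Tuple2.of(0, "message #0"),
				Tuple2.of(1, "message #1"));

		// Forwarded to the TransportClient and the BulkProcessor.
		Map<String, String> config = new HashMap<>();
		config.put("cluster.name", "my-cluster"); // must match the target cluster
		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "100");
		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1000");

		// Nodes the TransportClient connects to (TransportAddress is not Serializable in ES 2.x).
		List<InetSocketAddress> transports = new ArrayList<>();
		transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

		stream.addSink(new ElasticsearchSink<>(config, transports,
				new ElasticsearchSinkFunction<Tuple2<Integer, String>>() {
					@Override
					public void process(Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
						// Build one IndexRequest per element and hand it to the indexer.
						Map<String, Object> json = new HashMap<>();
						json.put("data", element.f1);

						IndexRequest request = Requests.indexRequest()
								.index("my-index")
								.type("my-type")
								.id(element.f0.toString())
								.source(json);

						indexer.add(request);
					}
				}));

		env.execute("Elasticsearch 2.x sink example");
	}
}

The bulk.flush.* keys used in the config map correspond to the BulkProcessor settings listed in the ElasticsearchSink javadoc above.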
- * - * @param The type of the element handled by this {@code ElasticsearchSinkFunction} - */ -public interface ElasticsearchSinkFunction extends Serializable, Function { - void process(T element, RuntimeContext ctx, RequestIndexer indexer); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java deleted file mode 100644 index 144a87b..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/RequestIndexer.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.elasticsearch.action.ActionRequest; - -import java.io.Serializable; - -public interface RequestIndexer extends Serializable { - void add(ActionRequest... actionRequests); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java deleted file mode 100644 index bc9bedc..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.elasticsearch2; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { - - private static final int NUM_ELEMENTS = 20; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource> source = env.addSource(new TestSourceFunction()); - - Map config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - // Can't use {@link TransportAddress} as its not Serializable in Elasticsearch 2.x - List transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = IllegalArgumentException.class) - public void testNullTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); 
- - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource> source = env.addSource(new TestSourceFunction()); - - Map config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, null, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = IllegalArgumentException.class) - public void testEmptyTransportClient() throws Exception { - - File dataDir = tempFolder.newFolder(); - - Node node = NodeBuilder.nodeBuilder() - .settings(Settings.settingsBuilder() - .put("path.home", dataDir.getParent()) - .put("http.enabled", false) - .put("path.data", dataDir.getAbsolutePath())) - // set a custom cluster name to verify that user config works correctly - .clusterName("my-transport-client-cluster") - .node(); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource> source = env.addSource(new TestSourceFunction()); - - Map config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-transport-client-cluster"); - - source.addSink(new ElasticsearchSink<>(config, new ArrayList(), new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch TransportClient Test"); - - // verify the results - Client client = node.client(); - for (int i = 0; i < NUM_ELEMENTS; i++) { - GetResponse response = client.get(new GetRequest("my-index", - "my-type", Integer.toString(i))).actionGet(); - Assert.assertEquals("message #" + i, response.getSource().get("data")); - } - - node.close(); - } - - @Test(expected = JobExecutionException.class) - public void testTransportClientFails() throws Exception{ - // this checks whether the TransportClient fails early when there is no cluster to - // connect to. 
There isn't a similar test for the Node Client version since that - // one will block and wait for a cluster to come online - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource> source = env.addSource(new TestSourceFunction()); - - Map config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - config.put("cluster.name", "my-node-client-cluster"); - - List transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction())); - - env.execute("Elasticsearch Node Client Test"); - } - - private static class TestSourceFunction implements SourceFunction> { - private static final long serialVersionUID = 1L; - - private volatile boolean running = true; - - @Override - public void run(SourceContext> ctx) throws Exception { - for (int i = 0; i < NUM_ELEMENTS && running; i++) { - ctx.collect(Tuple2.of(i, "message #" + i)); - } - } - - @Override - public void cancel() { - running = false; - } - } - - private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction> { - private static final long serialVersionUID = 1L; - - public IndexRequest createIndexRequest(Tuple2 element) { - Map json = new HashMap<>(); - json.put("data", element.f1); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element.f0.toString()) - .source(json); - } - - @Override - public void process(Tuple2 element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java deleted file mode 100644 index 05760e8..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/examples/ElasticsearchExample.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.elasticsearch2.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that - * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. - */ -public class ElasticsearchExample { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SingleOutputStreamOperator source = - env.generateSequence(0, 20).map(new MapFunction() { - /** - * The mapping method. Takes an element from the input data set and transforms - * it into exactly one element. - * - * @param value The input value. - * @return The transformed value - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public String map(Long value) throws Exception { - return "message #" + value; - } - }); - - Map config = new HashMap<>(); - // This instructs the sink to emit after every element, otherwise they would be buffered - config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); - - List transports = new ArrayList<>(); - transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300)); - - source.addSink(new ElasticsearchSink<>(config, transports, new ElasticsearchSinkFunction(){ - @Override - public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - })); - - env.execute("Elasticsearch Example"); - } - - private static IndexRequest createIndexRequest(String element) { - Map json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index("my-index") - .type("my-type") - .id(element) - .source(json); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties deleted file mode 100644 index dc20726..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml deleted file mode 100644 index 7a077c2..0000000 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n - - - - - - - - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/pom.xml b/flink-streaming-connectors/flink-connector-filesystem/pom.xml deleted file mode 100644 index 20c48c6..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/pom.xml +++ /dev/null @@ -1,163 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-streaming-connectors - 1.2-SNAPSHOT - .. 
- - - flink-connector-filesystem_2.10 - flink-connector-filesystem - - jar - - - - - - - - - org.apache.flink - flink-streaming-java_2.10 - ${project.version} - provided - - - - org.apache.flink - flink-shaded-hadoop2 - ${project.version} - provided - - - - - - org.apache.flink - flink-test-utils-junit - ${project.version} - test - - - - org.apache.flink - flink-test-utils_2.10 - ${project.version} - test - - - - org.apache.flink - flink-hadoop-compatibility_2.10 - ${project.version} - test - - - - org.apache.flink - flink-streaming-java_2.10 - ${project.version} - test - test-jar - - - - org.apache.flink - flink-tests_2.10 - ${project.version} - test - test-jar - - - - org.apache.flink - flink-runtime_2.10 - ${project.version} - test - test-jar - - - - - org.apache.hadoop - hadoop-hdfs - test - test-jar - ${hadoop.version} - - - - org.apache.hadoop - hadoop-common - test - test-jar - ${hadoop.version} - - - - org.apache.hadoop - hadoop-minikdc - ${minikdc.version} - test - - - - - - - - - - org.apache.felix - maven-bundle-plugin - 3.0.1 - true - true - - - - org.apache.maven.plugins - maven-surefire-plugin - - - 1 - false - - - - - - http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java deleted file mode 100644 index 3e3c86b..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ /dev/null @@ -1,309 +0,0 @@ -package org.apache.flink.streaming.connectors.fs; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileConstants; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** -* Implementation of AvroKeyValue writer that can be used in Sink. -* Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib) -
-Usage:
-{@code
-		BucketingSink> sink = new BucketingSink>("/tmp/path");
-		sink.setBucketer(new DateTimeBucketer>("yyyy-MM-dd/HH/mm/"));
-		sink.setPendingSuffix(".avro");
-		Map properties = new HashMap<>();
-		Schema longSchema = Schema.create(Type.LONG);
-		String keySchema = longSchema.toString();
-		String valueSchema = longSchema.toString();
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
-		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
-		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
-		
-		sink.setWriter(new AvroSinkWriter(properties));
-		sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
-}
-
-*/ -public class AvroKeyValueSinkWriter extends StreamWriterBase> implements Writer>, InputTypeConfigurable { - private static final long serialVersionUID = 1L; - public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key"; - public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value"; - public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS; - public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC; - public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level"; - public static final String CONF_XZ_LEVEL = "avro.xz.level"; - - private transient AvroKeyValueWriter keyValueWriter; - - private final Map properties; - - /** - * C'tor for the writer - *

- * You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above) - * @param properties - */ - @SuppressWarnings("deprecation") - public AvroKeyValueSinkWriter(Map properties) { - this.properties = properties; - - String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA); - if (keySchemaString == null) { - throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property"); - } - Schema.parse(keySchemaString);//verifying that schema valid - - String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA); - if (valueSchemaString == null) { - throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property"); - } - Schema.parse(valueSchemaString);//verifying that schema valid - } - - private boolean getBoolean(Map conf, String key, boolean def) { - String value = conf.get(key); - if (value == null) { - return def; - } - return Boolean.parseBoolean(value); - } - - private int getInt(Map conf, String key, int def) { - String value = conf.get(key); - if (value == null) { - return def; - } - return Integer.parseInt(value); - } - - //this derived from AvroOutputFormatBase.getCompressionCodec(..) - private CodecFactory getCompressionCodec(Map conf) { - if (getBoolean(conf, CONF_COMPRESS, false)) { - int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL); - int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL); - - String outputCodec = conf.get(CONF_COMPRESS_CODEC); - - if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) { - return CodecFactory.deflateCodec(deflateLevel); - } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) { - return CodecFactory.xzCodec(xzLevel); - } else { - return CodecFactory.fromString(outputCodec); - } - } - return CodecFactory.nullCodec(); - } - - @Override - @SuppressWarnings("deprecation") - public void open(FileSystem fs, Path path) throws IOException { - super.open(fs, path); - - CodecFactory compressionCodec = getCompressionCodec(properties); - Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA)); - Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA)); - keyValueWriter = new AvroKeyValueWriter(keySchema, valueSchema, compressionCodec, getStream()); - } - - @Override - public void close() throws IOException { - super.close();//the order is important since super.close flushes inside - if (keyValueWriter != null) { - keyValueWriter.close(); - } - } - - @Override - public long flush() throws IOException { - if (keyValueWriter != null) { - keyValueWriter.sync(); - } - return super.flush(); - } - - @Override - public void write(Tuple2 element) throws IOException { - getStream(); // Throws if the stream is not open - keyValueWriter.write(element.f0, element.f1); - } - - @Override - public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { - if (!type.isTupleType()) { - throw new IllegalArgumentException("Input TypeInformation is not a tuple type."); - } - - TupleTypeInfoBase tupleType = (TupleTypeInfoBase) type; - - if (tupleType.getArity() != 2) { - throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type."); - } - } - - @Override - public Writer> duplicate() { - return new AvroKeyValueSinkWriter(properties); - } - - // taken from m/r avro lib to remove dependency on it - private static final class AvroKeyValueWriter { - /** A writer for the Avro container file. 
*/ - private final DataFileWriter mAvroFileWriter; - - /** - * The writer schema for the generic record entries of the Avro - * container file. - */ - private final Schema mKeyValuePairSchema; - - /** - * A reusable Avro generic record for writing key/value pairs to the - * file. - */ - private final AvroKeyValue mOutputRecord; - - AvroKeyValueWriter(Schema keySchema, Schema valueSchema, - CodecFactory compressionCodec, OutputStream outputStream, - int syncInterval) throws IOException { - // Create the generic record schema for the key/value pair. - mKeyValuePairSchema = AvroKeyValue - .getSchema(keySchema, valueSchema); - - // Create an Avro container file and a writer to it. - DatumWriter genericDatumWriter = new GenericDatumWriter( - mKeyValuePairSchema); - mAvroFileWriter = new DataFileWriter( - genericDatumWriter); - mAvroFileWriter.setCodec(compressionCodec); - mAvroFileWriter.setSyncInterval(syncInterval); - mAvroFileWriter.create(mKeyValuePairSchema, outputStream); - - // Create a reusable output record. - mOutputRecord = new AvroKeyValue( - new GenericData.Record(mKeyValuePairSchema)); - } - - AvroKeyValueWriter(Schema keySchema, Schema valueSchema, - CodecFactory compressionCodec, OutputStream outputStream) - throws IOException { - this(keySchema, valueSchema, compressionCodec, outputStream, - DataFileConstants.DEFAULT_SYNC_INTERVAL); - } - - void write(K key, V value) throws IOException { - mOutputRecord.setKey(key); - mOutputRecord.setValue(value); - mAvroFileWriter.append(mOutputRecord.get()); - } - - void close() throws IOException { - mAvroFileWriter.close(); - } - - long sync() throws IOException { - return mAvroFileWriter.sync(); - } - } - - // taken from AvroKeyValue avro-mapr lib - public static class AvroKeyValue { - /** The name of the key value pair generic record. */ - public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair"; - - /** The namespace of the key value pair generic record. */ - public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce"; - - /** The name of the generic record field containing the key. */ - public static final String KEY_FIELD = "key"; - - /** The name of the generic record field containing the value. */ - public static final String VALUE_FIELD = "value"; - - /** The key/value generic record wrapped by this class. */ - public final GenericRecord mKeyValueRecord; - - /** - * Wraps a GenericRecord that is a key value pair. - */ - public AvroKeyValue(GenericRecord keyValueRecord) { - mKeyValueRecord = keyValueRecord; - } - - public GenericRecord get() { - return mKeyValueRecord; - } - - public void setKey(K key) { - mKeyValueRecord.put(KEY_FIELD, key); - } - - public void setValue(V value) { - mKeyValueRecord.put(VALUE_FIELD, value); - } - - @SuppressWarnings("unchecked") - public K getKey() { - return (K) mKeyValueRecord.get(KEY_FIELD); - } - - @SuppressWarnings("unchecked") - public V getValue() { - return (V) mKeyValueRecord.get(VALUE_FIELD); - } - - /** - * Creates a KeyValuePair generic record schema. - * - * @return A schema for a generic record with two fields: 'key' and - * 'value'. 
- */ - public static Schema getSchema(Schema keySchema, Schema valueSchema) { - Schema schema = Schema.createRecord(KEY_VALUE_PAIR_RECORD_NAME, - "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false); - schema.setFields(Arrays.asList(new Schema.Field(KEY_FIELD, - keySchema, "The key", null), new Schema.Field(VALUE_FIELD, - valueSchema, "The value", null))); - return schema; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java deleted file mode 100644 index 24ad6ab..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.hadoop.fs.Path; - -import java.io.Serializable; - -/** - * A bucketer is used with a {@link RollingSink} - * to put emitted elements into rolling files. - * - *

- * The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever - * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and - * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets - * based on system time. - * - * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead. - */ -@Deprecated -public interface Bucketer extends Serializable { - - /** - * Returns {@code true} when a new bucket should be started. - * - * @param currentBucketPath The bucket {@code Path} that is currently being used. - */ - boolean shouldStartNewBucket(Path basePath, Path currentBucketPath); - - /** - * Returns the {@link Path} of a new bucket file. - * - * @param basePath The base path containing all the buckets. - * - * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath} - * and also the {@code subtaskIndex} tp avoid clashes with parallel sinks. - */ - Path getNextBucketPath(Path basePath); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java deleted file mode 100644 index 174707c..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - - -/** - * A clock that can provide the current time. - * - *

- * Normally this would be system time, but for testing a custom {@code Clock} can be provided. - */ -public interface Clock { - - /** - * Return the current system time in milliseconds. - */ - public long currentTimeMillis(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java deleted file mode 100644 index 0df8998..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * A {@link Bucketer} that assigns to buckets based on current system time. - * - *

- * The {@code DateTimeBucketer} will create directories of the following form: - * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path - * that was specified as a base path when creating the - * {@link RollingSink}. The {@code dateTimePath} - * is determined based on the current system time and the user provided format string. - * - *

- * {@link SimpleDateFormat} is used to derive a date string from the current system time and - * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling - * files will have a granularity of hours. - * - * - *

- * Example: - * - *

- *     {@code
- *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
- * }
- * - * This will create for example the following bucket path: - * {@code /base/1976-12-31-14/} - * - * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead. - */ -@Deprecated -public class DateTimeBucketer implements Bucketer { - - private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class); - - private static final long serialVersionUID = 1L; - - private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; - - // We have this so that we can manually set it for tests. - private static Clock clock = new SystemClock(); - - private final String formatString; - - private transient SimpleDateFormat dateFormatter; - - /** - * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. - */ - public DateTimeBucketer() { - this(DEFAULT_FORMAT_STRING); - } - - /** - * Creates a new {@code DateTimeBucketer} with the given date/time format string. - * - * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine - * the bucket path. - */ - public DateTimeBucketer(String formatString) { - this.formatString = formatString; - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - - @Override - public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); - return !(new Path(basePath, newDateTimeString).equals(currentBucketPath)); - } - - @Override - public Path getNextBucketPath(Path basePath) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); - return new Path(basePath + "/" + newDateTimeString); - } - - @Override - public String toString() { - return "DateTimeBucketer{" + - "formatString='" + formatString + '\'' + - '}'; - } - - /** - * This sets the internal {@link Clock} implementation. This method should only be used for testing - * - * @param newClock The new clock to set. - */ - public static void setClock(Clock newClock) { - clock = newClock; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java deleted file mode 100644 index 6854596..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer; -import org.apache.hadoop.fs.Path; - -/** - * A {@link Bucketer} that does not perform any - * rolling of files. All files are written to the base path. - * - * @deprecated use {@link BasePathBucketer} instead. - */ -@Deprecated -public class NonRollingBucketer implements Bucketer { - private static final long serialVersionUID = 1L; - - @Override - public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) { - return false; - } - - @Override - public Path getNextBucketPath(Path basePath) { - return basePath; - } - - @Override - public String toString() { - return "NonRollingBucketer"; - } -}
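To make the deprecated Bucketer contract above concrete, here is a minimal sketch of a custom implementation that starts a new bucket whenever the calendar day changes. It assumes the pre-FLINK-4676 package org.apache.flink.streaming.connectors.fs shown in this diff; the class name DailyBucketer is hypothetical.

import org.apache.flink.streaming.connectors.fs.Bucketer;
import org.apache.hadoop.fs.Path;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DailyBucketer implements Bucketer {
	private static final long serialVersionUID = 1L;

	// SimpleDateFormat is kept transient and re-created after deserialization,
	// mirroring what DateTimeBucketer does.
	private transient SimpleDateFormat dayFormat;

	private SimpleDateFormat format() {
		if (dayFormat == null) {
			dayFormat = new SimpleDateFormat("yyyy-MM-dd");
		}
		return dayFormat;
	}

	@Override
	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
		// Start a new bucket as soon as the expected path for "now" differs from the current one.
		return !getNextBucketPath(basePath).equals(currentBucketPath);
	}

	@Override
	public Path getNextBucketPath(Path basePath) {
		return new Path(basePath, format().format(new Date()));
	}
}

A (likewise deprecated) RollingSink would pick this up through its setBucketer(...) setter, analogous to the DateTimeBucketer example in the javadoc above.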