flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Date Fri, 04 Dec 2015 20:04:18 GMT
Wouldn't it be better to have both connectors for ES? One for 1.x and another
for 2.x?
On 4 Dec 2015 20:55, "Madhukar Thota" <madhukar.thota@gmail.com> wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <mxm@apache.org>
> wrote:
>
>> Hi Madhu,
>>
>> Great. Do you want to contribute it back via a GitHub pull request? If
>> not, that's also fine. We will try to look into the 2.0 connector next
>> week.
>>
>> Best,
>> Max
>>
>> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.thota@gmail.com>
>> wrote:
>> > I have created a working connector for Elasticsearch 2.0 based on the
>> > elasticsearch-flink connector. I am using it right now, but I want an
>> official
>> > connector from Flink.
>> >
>> > ElasticsearchSink.java
>> >
>> >
>> > import org.apache.flink.api.java.utils.ParameterTool;
>> > import org.apache.flink.configuration.Configuration;
>> > import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > import java.net.InetAddress;
>> > import java.net.UnknownHostException;
>> > import java.util.List;
>> > import java.util.Map;
>> > import java.util.concurrent.atomic.AtomicBoolean;
>> > import java.util.concurrent.atomic.AtomicReference;
>> >
>> > import org.elasticsearch.action.bulk.BulkItemResponse;
>> > import org.elasticsearch.action.bulk.BulkProcessor;
>> > import org.elasticsearch.action.bulk.BulkRequest;
>> > import org.elasticsearch.action.bulk.BulkResponse;
>> > import org.elasticsearch.action.index.IndexRequest;
>> > import org.elasticsearch.client.Client;
>> > import org.elasticsearch.client.transport.TransportClient;
>> > import org.elasticsearch.cluster.node.DiscoveryNode;
>> > import org.elasticsearch.common.settings.Settings;
>> > import org.elasticsearch.common.transport.InetSocketTransportAddress;
>> > import org.elasticsearch.common.unit.ByteSizeUnit;
>> > import org.elasticsearch.common.unit.ByteSizeValue;
>> > import org.elasticsearch.common.unit.TimeValue;
>> >
>> >
>> > public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>> >
>> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
>> > "bulk.flush.max.actions";
>> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
>> > "bulk.flush.max.size.mb";
>> >     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
>> > "bulk.flush.interval.ms";
>> >
>> >     private static final long serialVersionUID = 1L;
>> >     private static final int DEFAULT_PORT = 9300;
>> >     private static final Logger LOG =
>> > LoggerFactory.getLogger(ElasticsearchSink.class);
>> >
>> >     /**
>> >      * The user specified config map that we forward to Elasticsearch
>> when
>> > we create the Client.
>> >      */
>> >     private final Map<String, String> userConfig;
>> >
>> >     /**
>> >      * The builder that is used to construct an {@link IndexRequest}
>> from
>> > the incoming element.
>> >      */
>> >     private final IndexRequestBuilder<T> indexRequestBuilder;
>> >
>> >     /**
>> >      * The Client that was either retrieved from a Node or is a
>> > TransportClient.
>> >      */
>> >     private transient Client client;
>> >
>> >     /**
>> >      * Bulk processor that was created using the client
>> >      */
>> >     private transient BulkProcessor bulkProcessor;
>> >
>> >     /**
>> >      * This is set from inside the BulkProcessor listener if there where
>> > failures in processing.
>> >      */
>> >     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>> >
>> >     /**
>> >      * This is set from inside the BulkProcessor listener if a
>> Throwable was
>> > thrown during processing.
>> >      */
>> >     private final AtomicReference<Throwable> failureThrowable = new
>> > AtomicReference<Throwable>();
>> >
>> >     public ElasticsearchSink(Map<String, String> userConfig,
>> > IndexRequestBuilder<T> indexRequestBuilder) {
>> >         this.userConfig = userConfig;
>> >         this.indexRequestBuilder = indexRequestBuilder;
>> >     }
>> >
>> >
>> >     @Override
>> >     public void open(Configuration configuration) {
>> >
>> >         ParameterTool params = ParameterTool.fromMap(userConfig);
>> >         Settings settings = Settings.settingsBuilder()
>> >                 .put(userConfig)
>> >                 .build();
>> >
>> >         TransportClient transportClient =
>> > TransportClient.builder().settings(settings).build();
>> >         for (String server : params.get("esHost").split(";"))
>> >         {
>> >             String[] components = server.trim().split(":");
>> >             String host = components[0];
>> >             int port = DEFAULT_PORT;
>> >             if (components.length > 1)
>> >             {
>> >                 port = Integer.parseInt(components[1]);
>> >             }
>> >
>> >             try {
>> >                 transportClient =
>> transportClient.addTransportAddress(new
>> > InetSocketTransportAddress(InetAddress.getByName(host), port));
>> >             } catch (UnknownHostException e) {
>> >                 e.printStackTrace();
>> >             }
>> >         }
>> >
>> >         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>> >         if (nodes.isEmpty()) {
>> >             throw new RuntimeException("Client is not connected to any
>> > Elasticsearch nodes!");
>> >         } else {
>> >             if (LOG.isDebugEnabled()) {
>> >                 LOG.info("Connected to nodes: " + nodes.toString());
>> >             }
>> >         }
>> >         client = transportClient;
>> >
>> >         BulkProcessor.Builder bulkProcessorBuilder =
>> BulkProcessor.builder(
>> >                 client,
>> >                 new BulkProcessor.Listener() {
>> >                     public void beforeBulk(long executionId,
>> >                                            BulkRequest request) {
>> >
>> >                     }
>> >
>> >                     public void afterBulk(long executionId,
>> >                                           BulkRequest request,
>> >                                           BulkResponse response) {
>> >                         if (response.hasFailures()) {
>> >                             for (BulkItemResponse itemResp :
>> > response.getItems()) {
>> >                                 if (itemResp.isFailed()) {
>> >                                     LOG.error("Failed to index document
>> in
>> > Elasticsearch: " + itemResp.getFailureMessage());
>> >
>>  failureThrowable.compareAndSet(null, new
>> > RuntimeException(itemResp.getFailureMessage()));
>> >                                 }
>> >                             }
>> >                             hasFailure.set(true);
>> >                         }
>> >                     }
>> >
>> >                     public void afterBulk(long executionId,
>> >                                           BulkRequest request,
>> >                                           Throwable failure) {
>> >                         LOG.error(failure.getMessage());
>> >                         failureThrowable.compareAndSet(null, failure);
>> >                         hasFailure.set(true);
>> >                     }
>> >                 });
>> >
>> >         // This makes flush() blocking
>> >         bulkProcessorBuilder.setConcurrentRequests(0);
>> >
>> >
>> >
>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
>> >
>> >
>> bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
>> >         }
>> >
>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
>> >             bulkProcessorBuilder.setBulkSize(new
>> > ByteSizeValue(params.getInt(
>> >                     CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB),
>> ByteSizeUnit.MB));
>> >         }
>> >
>> >         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
>> >
>> >
>> bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
>> >         }
>> >
>> >         bulkProcessor = bulkProcessorBuilder.build();
>> >     }
>> >
>> >
>> >     @Override
>> >     public void invoke(T element) {
>> >         IndexRequest indexRequest =
>> > indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
>> >
>> >         if (LOG.isDebugEnabled()) {
>> >             LOG.debug("Emitting IndexRequest: {}", indexRequest);
>> >         }
>> >
>> >         bulkProcessor.add(indexRequest);
>> >     }
>> >
>> >     @Override
>> >     public void close() {
>> >         if (bulkProcessor != null) {
>> >             bulkProcessor.close();
>> >             bulkProcessor = null;
>> >         }
>> >
>> >         if (client != null) {
>> >             client.close();
>> >         }
>> >
>> >         if (hasFailure.get()) {
>> >             Throwable cause = failureThrowable.get();
>> >             if (cause != null) {
>> >                 throw new RuntimeException("An error occured in
>> > ElasticsearchSink.", cause);
>> >             } else {
>> >                 throw new RuntimeException("An error occured in
>> > ElasticsearchSink.");
>> >
>> >             }
>> >         }
>> >     }
>> >
>> > }
>> >
>> >
>> > In my Main Class:
>> >
>> >
>> > Map<String, String> config = Maps.newHashMap();
>> >
>> > //Elasticsearch Parameters
>> >
>> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
>> > parameter.get("elasticsearch.bulk.flush.max.actions","1"));
>> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
>> > parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
>> > config.put("cluster.name", parameter.get("elasticsearch.cluster.name
>> "));
>> > config.put("esHost", parameter.get("elasticsearch.server",
>> > "localhost:9300"));
>> >
>> >
>> > DataStreamSink<String> elastic = messageStream.rebalance().addSink(new
>> > ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element,
>> > runtimeContext) -> {
>> >     String[] line = element.toLowerCase().split("
>> > +(?=(?:([^\"]*\"){2})*[^\"]*$)");
>> >     String measureAndTags = line[0];
>> >     String[] kvSplit = line[1].split("=");
>> >     String fieldName = kvSplit[0];
>> >     String fieldValue = kvSplit[1];
>> >     Map<String, String> tags = new HashMap<>();
>> >     String measure = parseMeasureAndTags(measureAndTags, tags);
>> >     long time = (long) (Double.valueOf(line[2]) / 1000000);
>> >
>> >     Map<String, Object> test = new HashMap<>();
>> >     DateFormat dateFormat = new
>> > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>> >     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>> >
>> >     test.put(fieldName, setValue(fieldValue));
>> >     test.put("tags", tags);
>> >     test.put("measurement", measure);
>> >     test.put("@timestamp", dateFormat.format(new Date(time)));
>> >
>> >     return Requests.indexRequest()
>> >             .index("metrics")
>> >             .type("test")
>> >             .source(new Gson().toJson(test).toLowerCase());
>> >
>> >
>> > }));
>> >
>> >
>> > -Madhu
>> >
>> >
>> > On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <mxm@apache.org>
>> wrote:
>> >>
>> >> Hi Madhu,
>> >>
>> >> Not yet. The API has changed slightly. We'll add one very soon. In the
>> >> meantime I've created an issue to keep track of the status:
>> >>
>> >> https://issues.apache.org/jira/browse/FLINK-3115
>> >>
>> >> Thanks,
>> >> Max
>> >>
>> >> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
>> >> <madhukar.thota@gmail.com> wrote:
>> >> > does the current elasticsearch-flink connector support the
>> >> > elasticsearch 2.x version?
>> >> >
>> >> > -Madhu
>> >
>> >
>>
>
>

Mime
View raw message