flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Madhukar Thota <madhukar.th...@gmail.com>
Subject Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Date Fri, 04 Dec 2015 15:16:45 GMT
i have created working connector for Elasticsearch 2.0 based on
elasticsearch-flink connector. I am using it right now but i want official
connector from flink.

ElasticsearchSink.java


import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;


public class ElasticsearchSink<T> extends RichSinkFunction<T> {

    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
"bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
"bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
"bulk.flush.interval.ms";

    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_PORT = 9300;
    private static final Logger LOG =
LoggerFactory.getLogger(ElasticsearchSink.class);

    /**
     * The user specified config map that we forward to Elasticsearch
when we create the Client.
     */
    private final Map<String, String> userConfig;

    /**
     * The builder that is used to construct an {@link IndexRequest}
from the incoming element.
     */
    private final IndexRequestBuilder<T> indexRequestBuilder;

    /**
     * The Client that was either retrieved from a Node or is a TransportClient.
     */
    private transient Client client;

    /**
     * Bulk processor that was created using the client
     */
    private transient BulkProcessor bulkProcessor;

    /**
     * This is set from inside the BulkProcessor listener if there
where failures in processing.
     */
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);

    /**
     * This is set from inside the BulkProcessor listener if a
Throwable was thrown during processing.
     */
    private final AtomicReference<Throwable> failureThrowable = new
AtomicReference<Throwable>();

    public ElasticsearchSink(Map<String, String> userConfig,
IndexRequestBuilder<T> indexRequestBuilder) {
        this.userConfig = userConfig;
        this.indexRequestBuilder = indexRequestBuilder;
    }


    @Override
    public void open(Configuration configuration) {

        ParameterTool params = ParameterTool.fromMap(userConfig);
        Settings settings = Settings.settingsBuilder()
                .put(userConfig)
                .build();

        TransportClient transportClient =
TransportClient.builder().settings(settings).build();
        for (String server : params.get("esHost").split(";"))
        {
            String[] components = server.trim().split(":");
            String host = components[0];
            int port = DEFAULT_PORT;
            if (components.length > 1)
            {
                port = Integer.parseInt(components[1]);
            }

            try {
                transportClient =
transportClient.addTransportAddress(new
InetSocketTransportAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        List<DiscoveryNode> nodes = transportClient.connectedNodes();
        if (nodes.isEmpty()) {
            throw new RuntimeException("Client is not connected to any
Elasticsearch nodes!");
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.info("Connected to nodes: " + nodes.toString());
            }
        }
        client = transportClient;

        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {

                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (response.hasFailures()) {
                            for (BulkItemResponse itemResp :
response.getItems()) {
                                if (itemResp.isFailed()) {
                                    LOG.error("Failed to index
document in Elasticsearch: " + itemResp.getFailureMessage());

failureThrowable.compareAndSet(null, new
RuntimeException(itemResp.getFailureMessage()));
                                }
                            }
                            hasFailure.set(true);
                        }
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        LOG.error(failure.getMessage());
                        failureThrowable.compareAndSet(null, failure);
                        hasFailure.set(true);
                    }
                });

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);



        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
                    CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }

        bulkProcessor = bulkProcessorBuilder.build();
    }


    @Override
    public void invoke(T element) {
        IndexRequest indexRequest =
indexRequestBuilder.createIndexRequest(element, getRuntimeContext());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", indexRequest);
        }

        bulkProcessor.add(indexRequest);
    }

    @Override
    public void close() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
        }

        if (hasFailure.get()) {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occured in
ElasticsearchSink.", cause);
            } else {
                throw new RuntimeException("An error occured in
ElasticsearchSink.");

            }
        }
    }

}


In my Main Class:


Map<String, String> config = Maps.newHashMap();

//Elasticsearch Parameters

config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
parameter.get("elasticsearch.bulk.flush.max.actions","1"));
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));


DataStreamSink<String> elastic = messageStream.rebalance().addSink(new
ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element,
runtimeContext) -> {
    String[] line = element.toLowerCase().split("
+(?=(?:([^\"]*\"){2})*[^\"]*$)");
    String measureAndTags = line[0];
    String[] kvSplit = line[1].split("=");
    String fieldName = kvSplit[0];
    String fieldValue = kvSplit[1];
    Map<String, String> tags = new HashMap<>();
    String measure = parseMeasureAndTags(measureAndTags, tags);
    long time = (long) (Double.valueOf(line[2]) / 1000000);

    Map<String, Object> test = new HashMap<>();
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

    test.put(fieldName, setValue(fieldValue));
    test.put("tags", tags);
    test.put("measurement", measure);
    test.put("@timestamp", dateFormat.format(new Date(time)));

    return Requests.indexRequest()
            .index("metrics")
            .type("test")
            .source(new Gson().toJson(test).toLowerCase());


}));


-Madhu


On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Madhu,
>
> Not yet. The API has changed slightly. We'll add one very soon. In the
> meantime I've created an issue to keep track of the status:
>
> https://issues.apache.org/jira/browse/FLINK-3115
>
> Thanks,
> Max
>
> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
> <madhukar.thota@gmail.com> wrote:
> > is current elasticsearch-flink connector support elasticsearch 2.x
> version?
> >
> > -Madhu
>

Mime
View raw message