flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Madhukar Thota <madhukar.th...@gmail.com>
Subject Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Date Fri, 04 Dec 2015 19:55:37 GMT
Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Madhu,
>
> Great. Do you want to contribute it back via a GitHub pull request? If
> not that's also fine. We will try to look into the 2.0 connector next
> week.
>
> Best,
> Max
>
> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.thota@gmail.com>
> wrote:
> > I have created a working connector for Elasticsearch 2.0 based on the
> > elasticsearch-flink connector. I am using it right now but I want an
> official
> > connector from Flink.
> >
> > ElasticsearchSink.java
> >
> >
> > import org.apache.flink.api.java.utils.ParameterTool;
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > import java.net.InetAddress;
> > import java.net.UnknownHostException;
> > import java.util.List;
> > import java.util.Map;
> > import java.util.concurrent.atomic.AtomicBoolean;
> > import java.util.concurrent.atomic.AtomicReference;
> >
> > import org.elasticsearch.action.bulk.BulkItemResponse;
> > import org.elasticsearch.action.bulk.BulkProcessor;
> > import org.elasticsearch.action.bulk.BulkRequest;
> > import org.elasticsearch.action.bulk.BulkResponse;
> > import org.elasticsearch.action.index.IndexRequest;
> > import org.elasticsearch.client.Client;
> > import org.elasticsearch.client.transport.TransportClient;
> > import org.elasticsearch.cluster.node.DiscoveryNode;
> > import org.elasticsearch.common.settings.Settings;
> > import org.elasticsearch.common.transport.InetSocketTransportAddress;
> > import org.elasticsearch.common.unit.ByteSizeUnit;
> > import org.elasticsearch.common.unit.ByteSizeValue;
> > import org.elasticsearch.common.unit.TimeValue;
> >
> >
> > public class ElasticsearchSink<T> extends RichSinkFunction<T> {
> >
> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS =
> > "bulk.flush.max.actions";
> >     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB =
> > "bulk.flush.max.size.mb";
> >     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS =
> > "bulk.flush.interval.ms";
> >
> >     private static final long serialVersionUID = 1L;
> >     private static final int DEFAULT_PORT = 9300;
> >     private static final Logger LOG =
> > LoggerFactory.getLogger(ElasticsearchSink.class);
> >
> >     /**
> >      * The user specified config map that we forward to Elasticsearch
> when
> > we create the Client.
> >      */
> >     private final Map<String, String> userConfig;
> >
> >     /**
> >      * The builder that is used to construct an {@link IndexRequest} from
> > the incoming element.
> >      */
> >     private final IndexRequestBuilder<T> indexRequestBuilder;
> >
> >     /**
> >      * The Client that was either retrieved from a Node or is a
> > TransportClient.
> >      */
> >     private transient Client client;
> >
> >     /**
> >      * Bulk processor that was created using the client
> >      */
> >     private transient BulkProcessor bulkProcessor;
> >
> >     /**
> >      * This is set from inside the BulkProcessor listener if there were
> > failures in processing.
> >      */
> >     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
> >
> >     /**
> >      * This is set from inside the BulkProcessor listener if a Throwable
> was
> > thrown during processing.
> >      */
> >     private final AtomicReference<Throwable> failureThrowable = new
> > AtomicReference<Throwable>();
> >
> >     public ElasticsearchSink(Map<String, String> userConfig,
> > IndexRequestBuilder<T> indexRequestBuilder) {
> >         this.userConfig = userConfig;
> >         this.indexRequestBuilder = indexRequestBuilder;
> >     }
> >
> >
> >     @Override
> >     public void open(Configuration configuration) {
> >
> >         ParameterTool params = ParameterTool.fromMap(userConfig);
> >         Settings settings = Settings.settingsBuilder()
> >                 .put(userConfig)
> >                 .build();
> >
> >         TransportClient transportClient =
> > TransportClient.builder().settings(settings).build();
> >         for (String server : params.get("esHost").split(";"))
> >         {
> >             String[] components = server.trim().split(":");
> >             String host = components[0];
> >             int port = DEFAULT_PORT;
> >             if (components.length > 1)
> >             {
> >                 port = Integer.parseInt(components[1]);
> >             }
> >
> >             try {
> >                 transportClient = transportClient.addTransportAddress(new
> > InetSocketTransportAddress(InetAddress.getByName(host), port));
> >             } catch (UnknownHostException e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >
> >         List<DiscoveryNode> nodes = transportClient.connectedNodes();
> >         if (nodes.isEmpty()) {
> >             throw new RuntimeException("Client is not connected to any
> > Elasticsearch nodes!");
> >         } else {
> >             if (LOG.isDebugEnabled()) {
> >                 LOG.info("Connected to nodes: " + nodes.toString());
> >             }
> >         }
> >         client = transportClient;
> >
> >         BulkProcessor.Builder bulkProcessorBuilder =
> BulkProcessor.builder(
> >                 client,
> >                 new BulkProcessor.Listener() {
> >                     public void beforeBulk(long executionId,
> >                                            BulkRequest request) {
> >
> >                     }
> >
> >                     public void afterBulk(long executionId,
> >                                           BulkRequest request,
> >                                           BulkResponse response) {
> >                         if (response.hasFailures()) {
> >                             for (BulkItemResponse itemResp :
> > response.getItems()) {
> >                                 if (itemResp.isFailed()) {
> >                                     LOG.error("Failed to index document
> in
> > Elasticsearch: " + itemResp.getFailureMessage());
> >                                     failureThrowable.compareAndSet(null,
> new
> > RuntimeException(itemResp.getFailureMessage()));
> >                                 }
> >                             }
> >                             hasFailure.set(true);
> >                         }
> >                     }
> >
> >                     public void afterBulk(long executionId,
> >                                           BulkRequest request,
> >                                           Throwable failure) {
> >                         LOG.error(failure.getMessage());
> >                         failureThrowable.compareAndSet(null, failure);
> >                         hasFailure.set(true);
> >                     }
> >                 });
> >
> >         // This makes flush() blocking
> >         bulkProcessorBuilder.setConcurrentRequests(0);
> >
> >
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
> >
> >
> bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
> >         }
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
> >             bulkProcessorBuilder.setBulkSize(new
> > ByteSizeValue(params.getInt(
> >                     CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB),
> ByteSizeUnit.MB));
> >         }
> >
> >         if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
> >
> >
> bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
> >         }
> >
> >         bulkProcessor = bulkProcessorBuilder.build();
> >     }
> >
> >
> >     @Override
> >     public void invoke(T element) {
> >         IndexRequest indexRequest =
> > indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
> >
> >         if (LOG.isDebugEnabled()) {
> >             LOG.debug("Emitting IndexRequest: {}", indexRequest);
> >         }
> >
> >         bulkProcessor.add(indexRequest);
> >     }
> >
> >     @Override
> >     public void close() {
> >         if (bulkProcessor != null) {
> >             bulkProcessor.close();
> >             bulkProcessor = null;
> >         }
> >
> >         if (client != null) {
> >             client.close();
> >         }
> >
> >         if (hasFailure.get()) {
> >             Throwable cause = failureThrowable.get();
> >             if (cause != null) {
> >                 throw new RuntimeException("An error occured in
> > ElasticsearchSink.", cause);
> >             } else {
> >                 throw new RuntimeException("An error occured in
> > ElasticsearchSink.");
> >
> >             }
> >         }
> >     }
> >
> > }
> >
> >
> > In my Main Class:
> >
> >
> > Map<String, String> config = Maps.newHashMap();
> >
> > //Elasticsearch Parameters
> >
> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
> > parameter.get("elasticsearch.bulk.flush.max.actions","1"));
> > config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
> > parameter.get("elasticsearch.bulk.flush.interval.ms","2"));
> > config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
> > config.put("esHost", parameter.get("elasticsearch.server",
> > "localhost:9300"));
> >
> >
> > DataStreamSink<String> elastic = messageStream.rebalance().addSink(new
> > ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element,
> > runtimeContext) -> {
> >     String[] line = element.toLowerCase().split("
> > +(?=(?:([^\"]*\"){2})*[^\"]*$)");
> >     String measureAndTags = line[0];
> >     String[] kvSplit = line[1].split("=");
> >     String fieldName = kvSplit[0];
> >     String fieldValue = kvSplit[1];
> >     Map<String, String> tags = new HashMap<>();
> >     String measure = parseMeasureAndTags(measureAndTags, tags);
> >     long time = (long) (Double.valueOf(line[2]) / 1000000);
> >
> >     Map<String, Object> test = new HashMap<>();
> >     DateFormat dateFormat = new
> > SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
> >     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
> >
> >     test.put(fieldName, setValue(fieldValue));
> >     test.put("tags", tags);
> >     test.put("measurement", measure);
> >     test.put("@timestamp", dateFormat.format(new Date(time)));
> >
> >     return Requests.indexRequest()
> >             .index("metrics")
> >             .type("test")
> >             .source(new Gson().toJson(test).toLowerCase());
> >
> >
> > }));
> >
> >
> > -Madhu
> >
> >
> > On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <mxm@apache.org>
> wrote:
> >>
> >> Hi Madhu,
> >>
> >> Not yet. The API has changed slightly. We'll add one very soon. In the
> >> meantime I've created an issue to keep track of the status:
> >>
> >> https://issues.apache.org/jira/browse/FLINK-3115
> >>
> >> Thanks,
> >> Max
> >>
> >> On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota
> >> <madhukar.thota@gmail.com> wrote:
> >> > does the current elasticsearch-flink connector support the
> >> > elasticsearch 2.x version?
> >> >
> >> > -Madhu
> >
> >
>

Mime
View raw message