edgent-dev mailing list archives

From "Dale LaBossiere (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (EDGENT-368) Implement Elasticsearch output connector
Date Fri, 20 Jan 2017 22:23:26 GMT

    [ https://issues.apache.org/jira/browse/EDGENT-368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832504#comment-15832504 ]

Dale LaBossiere commented on EDGENT-368:

It seems pretty straightforward to utilize the Elasticsearch RestClient API from an Edgent application,
or at least not much harder than whatever you'd have to write to utilize the RestClient from
any application.

Are sample code / recipes all that's needed?

Here's a sample that adds the JsonObject tuples of a TStream to ES.
The class {{ESPutTuples}} implements {{Function<JsonObject,Response>}} so it can be passed to
{{TStream.map()}}.
Its {{Response apply(JsonObject tuple)}} method is called for each tuple. The implementation uses
{{RestClient.performRequest()}} to PUT the tuple as a document and returns the resulting Response
object, which Edgent adds to the {{TStream<Response>}} returned by {{map()}}.

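import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.edgent.function.Function;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.json.JsonFunctions;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import com.google.gson.JsonObject;

public class ESPutSample {

  public static void main(String[] args) {
    DirectProvider dp = new DirectProvider();
    Topology top = dp.newTopology();

    // Parse the sample JSON strings into JsonObject tuples.
    TStream<String> json = top.of(
          "{ \"user\": \"dlaboss\", \"message\": \"Tuple 1 message\" }",
          "{ \"user\": \"dlaboss\", \"message\": \"Tuple 2 message\" }");
    TStream<JsonObject> tuples = json.map(JsonFunctions.fromString());
    // PUT each tuple into ES; the result is a stream of ES Responses.
    TStream<Response> responses = tuples.map(new ESPutTuples());
    responses.print();

    dp.submit(top);

    // prints
    // apply() sending endpoint=/sample/tuple/1 tuple={"user":"dlaboss","message":"Tuple 1 message"}
    // Response{requestLine=PUT /sample/tuple/1 HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
    // apply() sending endpoint=/sample/tuple/2 tuple={"user":"dlaboss","message":"Tuple 2 message"}
    // Response{requestLine=PUT /sample/tuple/2 HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 201 Created}
    // Use curl to query elasticsearch to see them:
    // curl -XGET 'http://localhost:9200/sample/tuple/1?pretty=true'
    // curl -XGET 'http://localhost:9200/sample/tuple/2?pretty=true'
  }

  // Maps each JsonObject tuple to the ES Response for a PUT of that tuple.
  static class ESPutTuples implements Function<JsonObject,Response>, AutoCloseable {
    private static final long serialVersionUID = 1L;
    private final RestClient client;
    private final Map<String,String> params = new HashMap<>();  // no query parameters
    private long tupCnt;  // document id counter: 1, 2, ...

    public ESPutTuples() {
      client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
    }

    @Override
    public void close() throws Exception {
      if (client != null) {
        client.close();  // release the client's HTTP resources
      }
    }

    @Override
    public Response apply(JsonObject tuple) {
      try {
        String endpoint = "/sample/tuple/" + ++tupCnt;
        HttpEntity entity = new StringEntity(tuple.toString(), ContentType.APPLICATION_JSON);
        System.out.println("apply() sending endpoint="+endpoint+" tuple="+tuple);
        return client.performRequest("PUT", endpoint, params, entity);
      }
      catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}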
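One detail of {{apply()}} worth isolating is how document ids are generated: {{++tupCnt}} increments before use, so the first tuple is stored at {{/sample/tuple/1}} and the second at {{/sample/tuple/2}}. The minimal sketch below reproduces just that logic so it runs without Edgent or an ES server. It swaps in {{java.util.function.Function}} and plain JSON strings for Edgent's {{Function}} and Gson's {{JsonObject}}, and the names {{EndpointSketch}}, {{NextEndpoint}} and {{endpointsFor}} are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: java.util.function.Function and String tuples stand in
// for Edgent's Function and Gson's JsonObject so it runs without Edgent or ES.
public class EndpointSketch {

  // Stateful mapper mirroring the id logic in ESPutTuples.apply():
  // the counter is incremented before use, so ids start at 1.
  static class NextEndpoint implements Function<String, String> {
    private long tupCnt;

    @Override
    public String apply(String tuple) {
      return "/sample/tuple/" + ++tupCnt;
    }
  }

  // Maps each tuple, in order, to the endpoint it would be PUT to.
  public static List<String> endpointsFor(List<String> tuples) {
    NextEndpoint f = new NextEndpoint();
    return tuples.stream().map(f).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> tuples = Arrays.asList(
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 1 message\"}",
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 2 message\"}");
    System.out.println(endpointsFor(tuples));
  }
}
```

Running {{main()}} prints {{[/sample/tuple/1, /sample/tuple/2]}}. Because the counter lives in the function instance, ids restart at 1 whenever a new {{ESPutTuples}} is created, so rerunning the sample replaces the same two documents rather than adding new ones.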

> Implement Elasticsearch output connector
> ----------------------------------------
>                 Key: EDGENT-368
>                 URL: https://issues.apache.org/jira/browse/EDGENT-368
>             Project: Edgent
>          Issue Type: New Feature
>          Components: Connectors
>            Reporter: Otis Gospodnetic
>              Labels: connector, elasticsearch
>             Fix For: Apache Edgent 1.1.0
> It would be great to be able to write data to the Elasticsearch (bulk) API.
> This connector should use the ES HTTP client, not the TransportClient (TC has issues
> with things like different client vs. server library versions, so it should be avoided; the
> ES HTTP client, written in Java, was written to avoid these TC issues).
> Once such a connector exists, one could use it not only for writing to one's own ES cluster,
> but also to services that expose the ES API, like Logsene (http://sematext.com/logsene) and others.
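The issue asks for the Elasticsearch bulk API, whereas the sample above issues one PUT per tuple. A bulk request body is newline-delimited JSON: for each document, an action/metadata line followed by the document source on its own line, with the final line also terminated by a newline. The sketch below builds such a body for a batch of JSON document strings. The index {{sample}}, type {{tuple}} and 1-based ids are carried over from {{ESPutSample}}, the {{_type}} field assumes an ES 5.x-era server that still uses mapping types, and {{BulkBodySketch}} / {{bulkBody}} are hypothetical names. Sending the body (e.g., as the entity of a {{POST}} to {{/_bulk}} via {{client.performRequest()}}) and batching tuples in Edgent are not shown.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: builds an Elasticsearch _bulk request body
// (newline-delimited JSON) for a batch of single-line JSON documents.
public class BulkBodySketch {

  // index and type are assumed to need no JSON escaping.
  public static String bulkBody(String index, String type, List<String> docs, long firstId) {
    StringBuilder sb = new StringBuilder();
    long id = firstId;
    for (String doc : docs) {
      // Action/metadata line: index this document under the given id.
      sb.append("{\"index\":{\"_index\":\"").append(index)
        .append("\",\"_type\":\"").append(type)
        .append("\",\"_id\":\"").append(id++).append("\"}}\n");
      // Source line: the document itself, which must fit on one line.
      sb.append(doc).append('\n');
    }
    // Every line, including the last, ends with a newline.
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> docs = Arrays.asList(
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 1 message\"}",
        "{\"user\":\"dlaboss\",\"message\":\"Tuple 2 message\"}");
    System.out.print(bulkBody("sample", "tuple", docs, 1));
  }
}
```

Gson's {{JsonObject.toString()}} emits compact single-line JSON, so tuples from the sample could be passed in directly as the documents.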

This message was sent by Atlassian JIRA
