flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Elasticsearch Sink - Error
Date Wed, 30 Aug 2017 07:21:49 GMT
I also had problems with ES 5.4.3 and had to modify the connector code.
I fear that the connector is compatible only up to ES 5.2 or so.
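
One way to confirm this kind of mismatch is to check which Elasticsearch
client actually ends up on the job's classpath. The NoSuchMethodError quoted
below names BulkProcessor.add(ActionRequest), so a small reflection check can
report whether that overload exists at runtime. This is only a diagnostic
sketch: the class name ClasspathMethodCheck and the helper hasSingleArgMethod
are hypothetical, and the two Elasticsearch class names are taken from the
stack trace.

```java
import java.lang.reflect.Method;

// Hypothetical diagnostic helper; not part of Flink or Elasticsearch.
final class ClasspathMethodCheck {

    /**
     * Returns true if the named class can be loaded and exposes a public
     * method with the given name and exactly one parameter of the given type.
     * Returns false if the class is missing or cannot be linked.
     */
    public static boolean hasSingleArgMethod(String className,
                                             String methodName,
                                             String paramTypeName) {
        try {
            Class<?> clazz = Class.forName(className);
            for (Method m : clazz.getMethods()) {
                Class<?>[] params = m.getParameterTypes();
                if (m.getName().equals(methodName)
                        && params.length == 1
                        && params[0].getName().equals(paramTypeName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class not on the classpath, or it failed to link/initialize.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names as they appear in the NoSuchMethodError message.
        String bulkProcessor = "org.elasticsearch.action.bulk.BulkProcessor";
        String actionRequest = "org.elasticsearch.action.ActionRequest";
        boolean present = hasSingleArgMethod(bulkProcessor, "add", actionRequest);
        System.out.println("BulkProcessor.add(ActionRequest) on classpath: " + present);
    }
}
```

Run with the same classpath as the Flink job. If it prints false, the
Elasticsearch client that was resolved is probably not the one the connector
was compiled against; `mvn dependency:tree` shows which version Maven picked.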

On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <smallthings1992@gmail.com>
wrote:

> Hi,
> I am using Elasticsearch 5.4.3 in my Flink project (Flink 1.3.1).
> Details:
> 1. Using Maven as the build tool.
> 2. Running from the IntelliJ IDE.
> 3. Elasticsearch is running on the local machine.
>
> I have added the following Maven dependency:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
>     <version>1.3.1</version>
> </dependency>
>
>
> *code added*
>
> Map<String, String> config = new HashMap<>();
> config.put("cluster.name", "elasticsearch");
> config.put("bulk.flush.max.actions", "1");
>
> List<InetSocketAddress> transportAddresses = new ArrayList<>();
> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>
> alerts.addSink(new ElasticsearchSink<AggResult>(config, transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
>     public IndexRequest createIndexRequest(AggResult aggResult) {
>         Map<String, Long> json = new HashMap<>();
>         json.put("totalCount", aggResult.getTotalCount());
>
>         return Requests
>                 .indexRequest()
>                 .index("logdata")
>                 .type("consolidatedStreamData")
>                 .source(json);
>     }
>
>     @Override
>     public void process(AggResult aggResult, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
>         requestIndexer.add(createIndexRequest(aggResult));
>     }
> }));
>
>
>
> *This results in the following error.*
>
> Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
>         at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
>         at ECSPrototype$2.process(ECSPrototype.java:148)
>         at ECSPrototype$2.process(ECSPrototype.java:134)
>         at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
>         at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
>         at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
>
>
> Any idea what is wrong here?
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>



-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 1823908
