flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Elasticsearch Sink - Error
Date Wed, 30 Aug 2017 08:16:00 GMT
That's correct, Flavio.
The issue has been reported as
https://issues.apache.org/jira/browse/FLINK-7386
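
For anyone who lands on this thread with the same NoSuchMethodError: the descriptor in the trace below names BulkProcessor.add(ActionRequest), so the connector was compiled against a client that has that exact signature, and the client jar on the runtime classpath apparently does not. A minimal sketch of a classpath check follows. The class name BulkProcessorCheck and the helper hasMethod are hypothetical names made up for illustration, not part of Flink or Elasticsearch; only the two Elasticsearch class names are taken from the stack trace.

```java
import java.lang.reflect.Method;

public class BulkProcessorCheck {

    /**
     * Hypothetical helper: returns true if the named class can be loaded and
     * exposes a public method with exactly the given parameter types.
     * Returns false if any class is missing or no such method exists.
     */
    public static boolean hasMethod(String className, String methodName, String... paramTypeNames) {
        try {
            Class<?> owner = Class.forName(className);
            Class<?>[] paramTypes = new Class<?>[paramTypeNames.length];
            for (int i = 0; i < paramTypeNames.length; i++) {
                paramTypes[i] = Class.forName(paramTypeNames[i]);
            }
            // getMethod matches parameter types exactly, like the JVM does when linking.
            Method m = owner.getMethod(methodName, paramTypes);
            return m != null;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class names copied from the NoSuchMethodError in this thread.
        boolean present = hasMethod(
                "org.elasticsearch.action.bulk.BulkProcessor",
                "add",
                "org.elasticsearch.action.ActionRequest");
        System.out.println("BulkProcessor.add(ActionRequest) present: " + present);
    }
}
```

Run it with the same classpath as the job. A result of false suggests the Elasticsearch client on the classpath is not the one the connector expects. The check ignores the return type, so treat it as a quick hint rather than a full compatibility test.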

Best, Fabian

2017-08-30 9:21 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> I also had problems with ES 5.4.3 and had to modify the connector
> code. I fear the connector is only compatible with ES up to 5.2 or so.
>
> On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <smallthings1992@gmail.com>
> wrote:
>
>> Hi,
>> I am using Elasticsearch 5.4.3 in my Flink project (Flink version 1.3.1).
>> Details:
>> 1. Maven is the build tool.
>> 2. The job runs from the IntelliJ IDE.
>> 3. Elasticsearch is running on the local machine.
>>
>> I have added the following Maven dependency:
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
>>     <version>1.3.1</version>
>> </dependency>
>>
>>
>> *code added*
>>
>> Map<String, String> config = new HashMap<>();
>> config.put("cluster.name", "elasticsearch");
>> config.put("bulk.flush.max.actions", "1");
>>
>> List<InetSocketAddress> transportAddresses = new ArrayList<>();
>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>>
>> alerts.addSink(new ElasticsearchSink<AggResult>(config, transportAddresses,
>>         new ElasticsearchSinkFunction<AggResult>() {
>>     public IndexRequest createIndexRequest(AggResult aggResult) {
>>         Map<String, Long> json = new HashMap<>();
>>         json.put("totalCount", aggResult.getTotalCount());
>>
>>         return Requests
>>                 .indexRequest()
>>                 .index("logdata")
>>                 .type("consolidatedStreamData")
>>                 .source(json);
>>     }
>>
>>     @Override
>>     public void process(AggResult aggResult, RuntimeContext runtimeContext,
>>             RequestIndexer requestIndexer) {
>>         requestIndexer.add(createIndexRequest(aggResult));
>>     }
>> }));
>>
>>
>>
>> *This results in the following error.*
>>
>> Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
>>         at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
>>         at ECSPrototype$2.process(ECSPrototype.java:148)
>>         at ECSPrototype$2.process(ECSPrototype.java:134)
>>         at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
>>         at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>         at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
>>         at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
>>         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:748)
>>
>>
>> Any idea what is wrong here?
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908
>
