flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raj Kumar <smallthings1...@gmail.com>
Subject Elasticsearch Sink - Error
Date Wed, 30 Aug 2017 03:40:54 GMT
Hi, 
I am using elasticsearch 5.4.3 version in my flink project(flink version
1.3.1) 
Details
1. Using Maven build tool.
2. Running from intellij IDE.
3. Elasticsearch is running on the local machine.

Have added the following maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3.1</version>
</dependency>


*code added*

Map<String, String> config = new HashMap<>();
            config.put("cluster.name", "elasticsearch");
            config.put("bulk.flush.max.actions", "1");

            List<InetSocketAddress> transportAddresses = new ArrayList<>();
            transportAddresses.add(new
InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

            alerts.addSink(new ElasticsearchSink<AggResult>(config,
transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
                public IndexRequest createIndexRequest(AggResult aggResult){
                    Map<String, Long> json = new HashMap<>();
                    json.put("totalCount", aggResult.getTotalCount());

                    return Requests
                            .indexRequest()
                            .index("logdata")
                            .type("consolidatedStreamData")
                            .source(json);

                }
                @Override
                public void process(AggResult aggResult, RuntimeContext
runtimeContext, RequestIndexer requestIndexer) {
                    requestIndexer.add(createIndexRequest(aggResult));
                }
            }));



*This results in the following error.*

Caused by: java.lang.NoSuchMethodError:
org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
	at
org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
	at ECSPrototype$2.process(ECSPrototype.java:148)
	at ECSPrototype$2.process(ECSPrototype.java:134)
	at
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
	at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
	at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
	at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
	at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
	at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
	at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
	at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)


Anyidea what is wrong here ?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message