flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: unclear exception when writing to elasticsearch
Date Tue, 28 Feb 2017 16:42:27 GMT
Hi!

This could be an Elasticsearch server / client version conflict, or the uber jar of your
code may not have been built properly.

For the first possible issue, we’re currently using Elasticsearch 2.3.5 to build the Flink
Elasticsearch Connector. Could you try overriding this version to 2.4.1 when building your
code and see if the problem remains?

For the second issue, please check out https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.
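
To narrow down which of the two causes applies, a small probe like the one below may help. It is only a sketch, not part of Flink or the connector: the class name ClasspathProbe and the method describe are made up for illustration. It tries to load and initialize a class by name and reports which jar it came from, or why loading failed. A "Could not initialize class" NoClassDefFoundError usually means the static initializer of that class already failed once, so seeing the original cause and the jar location is often more useful than the trace alone.

```java
import java.security.CodeSource;

// Hypothetical helper for illustration; not part of Flink or Elasticsearch.
public class ClasspathProbe {

    /** Tries to load and initialize a class, then reports its origin or the failure. */
    public static String describe(String className) {
        try {
            // initialize=true runs the static initializer, which is the step that
            // fails behind a "Could not initialize class" error.
            Class<?> c = Class.forName(className, true, ClasspathProbe.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK have no code source.
            String where = (src == null || src.getLocation() == null)
                    ? "JDK runtime"
                    : src.getLocation().toString();
            return className + ": loaded from " + where;
        } catch (ClassNotFoundException e) {
            return className + ": not on the classpath";
        } catch (ExceptionInInitializerError e) {
            // The cause is the exception thrown inside the static initializer.
            return className + ": static initializer failed: " + e.getCause();
        } catch (LinkageError e) {
            // Covers NoClassDefFoundError and binary incompatibilities.
            return className + ": linkage problem: " + e;
        }
    }

    public static void main(String[] args) {
        // Both class names are taken from the stack trace in this thread.
        System.out.println(describe("org.elasticsearch.threadpool.ThreadPool"));
        System.out.println(describe("org.elasticsearch.client.transport.TransportClient"));
    }
}
```

Running main with the same uber jar on the classpath that is submitted to Flink should show whether the Elasticsearch classes are packaged at all and which jar they are loaded from. If the reported jar is not the Elasticsearch version you expect, that points to the version conflict; if the classes are missing, that points to the packaging.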

Let me know if the problem remains after trying out the above :-)

Cheers,
Gordon

On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneumann@sics.se) wrote:

Hej,

I'm trying to write to Elasticsearch from a streaming application and I get a weird error
message that I can't decipher. Hopefully, someone here can help me. I'm trying to run the Java
example from the website. I double-checked that I can reach Elasticsearch from the development
machine by putting some data in with curl. Does anyone have an idea what the problem is?

Technical info:
Flink 1.1.3

Elasticsearch 2.4.1

http://bbc2.sics.se:19208/
{
  "name" : "hopsworks",
  "cluster_name" : "hops",
  "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
  "version" : {
    "number" : "2.4.1",
    "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
    "build_timestamp" : "2016-09-27T18:57:55Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.2"
  },
  "tagline" : "You Know, for Search"
}

Changes in the code:
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "hops");

ArrayList<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
 

Exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)