flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Martin Neumann <mneum...@sics.se>
Subject Re: unclear exception when writing to elasticsearch
Date Thu, 02 Mar 2017 12:59:25 GMT
Hej,

I finally found out what the problem was. I had added another dependency
that was necessary to run things on hops for some reason that broke things.
When I remove it, it works fine. I talking to the hops guys about it to
understand what's going on.

Thanks for the help.
Cheers Martin



On Wed, Mar 1, 2017 at 3:14 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org>
wrote:

> Hi Martin,
>
> I followed your setup:
>
> 1. Maven java quick start archetype (Flink version 1.1.3)
> 2. Added `flink-connector-elasticsearch2_2.10` version 1.1.3 dependency
> 3. Ran the example in the Flink Elasticsearch docs against a Elasticsearch
> 2.4.1 installation
>
> and everything worked fine.
>
> Just to make sure nothing is conflicting, you could also try to do a `mvn
> dependency:purge-local-repository` on your project, and then re-download
> the dependencies with `mvn clean install`, and finally re-importing your
> project in the IDE.
>
> Let me know if this works for you!
>
> Cheers,
> Gordon
>
>
> On March 1, 2017 at 9:23:35 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org)
> wrote:
>
> Hi Martin,
>
> Just letting you know I’m trying your setup right now, and will get back
> to you once I confirm the results.
>
> - Gordon
>
>
> On March 1, 2017 at 9:15:16 PM, Martin Neumann (mneumann@sics.se) wrote:
>
> I created the project using the maven archetype
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/java_api_quickstart.html>
so
> I'm using the packaged version pulled by maven.
>
> At this point, I just try to run it directly from inside the IDE
> (IntelliJ), mostly since I don't want to build it and deploy it on the
> cluster all the time. I tried building it (maven 3.0.5), it builds fine but
> fails to run on the cluster with the same exception that I get if I run
> things from within the IDE.
>
> My guess is that maybe some function names have changed between elastic
> search versions and they are just not compatible anymore.
>
> In the Worst case, I will hack something together that just writes the
> data using HttpURLConnection pushing things to the rest interface. (If
> that works from within flink)
>
>
> cheers Martin
>
> On Wed, Mar 1, 2017 at 12:24 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Did you build Flink from sources or are you using the packeged version?
>> Because I had an annoying problem when compiling Flink with maven > 3.3.
>> From https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> setup/building.html#dependency-shading:
>>
>> Maven 3.0.x, 3.1.x, and 3.2.x It is sufficient to call mvn clean install
>> -DskipTests in the root directory of Flink code base.
>>
>> Maven 3.3.x The build has to be done in two steps: First in the base
>> directory, then in the distribution project:
>>
>> mvn clean install -DskipTestscd flink-dist
>> mvn clean install
>>
>> * Note:* To check your Maven version, run mvn --version.
>>
>> On Wed, Mar 1, 2017 at 12:19 PM, Martin Neumann <mneumann@sics.se> wrote:
>>
>>> I tried to change the elastic search version to 2.4.1 which results in a
>>> new exception:
>>>
>>> Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurr
>>>> ent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
>>>> at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.
>>>> java:192)
>>>> at org.elasticsearch.client.transport.TransportClient$Builder.b
>>>> uild(TransportClient.java:131)
>>>> at org.apache.flink.streaming.connectors.elasticsearch2.Elastic
>>>> searchSink.open(ElasticsearchSink.java:164)
>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>>> nFunction(FunctionUtils.java:38)
>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>>>> erator.open(AbstractUdfStreamOperator.java:91)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>>>> perators(StreamTask.java:376)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>>>> treamTask.java:256)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org
>>> > wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> You can do that by adding a dependency to the Elasticsearch client of
>>>> your desired version in your project.
>>>>
>>>> You can also check what Elasticsearch client version the project is
>>>> using by checking `mvn dependency:tree` from the base directory of your
>>>> project.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneumann@sics.se)
>>>> wrote:
>>>>
>>>> Hej,
>>>>
>>>> thanks for the fast reply.
>>>>
>>>> I'm currently running things from inside my IDE so it should not be a
>>>> packaging problem. That said I added the plugin from the link provided but
>>>> I'm not sure what elastic search library is needed.
>>>>
>>>> Where do I override the elastic search version? The only thing I'm
>>>> currently using is the flink-connector do I have to modify its code?
>>>>
>>>> <dependency>
>>>>> <groupId>org.apache.flink</groupId>
>>>>> <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
>>>>> <version>1.1.3</version>
>>>>> </dependency>
>>>>
>>>>
>>>> One thing I forgot to mention, I can only modify things locally packing
>>>> it into a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm
>>>> running things on top of Hopsworks.
>>>>
>>>> cheers Martin
>>>>
>>>> On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <
>>>> tzulitai@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> This could be a Elasticsearch server / client version conflict, or
>>>>> that the uber jar of your code wasn’t built properly.
>>>>>
>>>>> For the first possible issue, we’re currently using Elasticsearch
>>>>> 2.3.5 to build the Flink Elasticsearch Connector. Could you try overriding
>>>>> this version to 2.4.1 when building your code and see if the problem
>>>>> remains?
>>>>>
>>>>> For the second issue, please check out https://ci.apache.org/proj
>>>>> ects/flink/flink-docs-release-1.3/dev/linking.html#packaging
>>>>> -dependencies-with-your-usercode-with-maven.
>>>>>
>>>>> Let me know if the problem remains after trying out the above :-)
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneumann@sics.se)
>>>>> wrote:
>>>>>
>>>>> Hej,
>>>>>
>>>>> I'm trying to write to elastic search from a streaming application and
>>>>> I get a weird error message I that I can't decipher. Hopefully, someone
>>>>> here can help me. I'm trying to run the java example
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/elasticsearch2.html>
from
>>>>> the website.I doublechecked that I can reach the elastic search from
the
>>>>> development machine by putting some data in with curl. Has anyone an
idea
>>>>> what the problem is?
>>>>>
>>>>> *Technical info:*
>>>>> Flink 1.1.3
>>>>>
>>>>> Elasticsearch 2.4.1
>>>>>
>>>>> http://bbc2.sics.se:19208/
>>>>>> {
>>>>>> "name" : "hopsworks",
>>>>>> "cluster_name" : "hops",
>>>>>> "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
>>>>>> "version" : {
>>>>>> "number" : "2.4.1",
>>>>>> "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
>>>>>> "build_timestamp" : "2016-09-27T18:57:55Z",
>>>>>> "build_snapshot" : false,
>>>>>> "lucene_version" : "5.5.2"
>>>>>> },
>>>>>> "tagline" : "You Know, for Search"
>>>>>> }
>>>>>
>>>>>
>>>>> Changes in the code:
>>>>>
>>>>> Map<String, String> config = new HashMap<>();
>>>>>>
>>>>> // This instructs the sink to emit after every element, otherwise they
>>>>>> would be buffered
>>>>>
>>>>> config.put("bulk.flush.max.actions", "1");
>>>>>>
>>>>> config.put("cluster.name", "hops");
>>>>>>
>>>>>
>>>>>> ArrayList<InetSocketAddress> transports = new ArrayList<>();
>>>>>>
>>>>> transports.add(new InetSocketAddress(InetAddress.getByName("
>>>>>> bbc2.sics.se"), 19208));
>>>>>
>>>>>
>>>>>
>>>>> Exception:
>>>>>
>>>>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>>>>>> org.elasticsearch.threadpool.ThreadPool
>>>>>> at org.elasticsearch.client.transport.TransportClient$Builder.b
>>>>>> uild(TransportClient.java:133)
>>>>>> at org.apache.flink.streaming.connectors.elasticsearch2.Elastic
>>>>>> searchSink.open(ElasticsearchSink.java:164)
>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>>>>> nFunction(FunctionUtils.java:38)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>>>>>> erator.open(AbstractUdfStreamOperator.java:91)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>>>>>> perators(StreamTask.java:376)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>>>>>> treamTask.java:256)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Mime
View raw message