flink-user mailing list archives

From rafal green <rafal.gree...@gmail.com>
Subject Re: Local Cluster have problem with connect to elasticsearch
Date Thu, 12 May 2016 22:50:45 GMT
...by the way, I found this in
"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml":

<!-- Allow users to pass custom connector versions -->
<properties>
<elasticsearch.version>2.2.1</elasticsearch.version>
</properties>

I changed it to version 2.3.2 and, of course, rebuilt with
"mvn clean install -DskipTests"...

...but nothing changed.


2016-05-12 22:39 GMT+02:00 rafal green <rafal.green17@gmail.com>:

> Sorry, I meant the jar from the twitter connector, not the Elasticsearch
> connector: ".m2/org/apache/flink/flink-connector-twitter_2.11/1.1-SNAPSHOT".
> It works fine.
>
> 2016-05-12 22:35 GMT+02:00 rafal green <rafal.green17@gmail.com>:
>
>> This is my working jar, which I took from
>> .m2/org/apache/flink/flink-connector-elasticsearch2_2.11/1.1-SNAPSHOT
>>
>> 2016-05-12 22:26 GMT+02:00 rafal green <rafal.green17@gmail.com>:
>>
>>> Hi Gordon,
>>>
>>> Thanks for the advice. It works perfectly, but only for Elasticsearch.
>>>
>>> This pom configuration works with Elasticsearch 2.2.1:
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.elasticsearch</groupId>
>>>    <artifactId>elasticsearch</artifactId>
>>>    <version>2.2.1</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/elasticsearch/**</includes>
>>> </artifactItem>
>>>
>>>
>>>
>>> Why 2.2.1? Because if you check
>>> "flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml",
>>> you will see this line:
>>> <elasticsearch.version>2.2.1</elasticsearch.version>
>>>
>>>
>>> But Gordon, your approach does not work with the twitter connector. I
>>> tried adding this to the pom, and it does not work:
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>hbc-core</artifactId>
>>>    <version>2.2.0</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>>
>>>
>>>
>>> or this:
>>>
>>> <artifactItem>
>>>    <groupId>org.apache.flink</groupId>
>>>    <artifactId>flink-connector-twitter_${scala.version}</artifactId>
>>>    <version>1.1-SNAPSHOT</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/flink/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>hbc-core</artifactId>
>>>    <version>2.2.0</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.apache.httpcomponents</groupId>
>>>    <artifactId>httpclient</artifactId>
>>>    <version>4.2.5</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/httpcomponents/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.twitter</groupId>
>>>    <artifactId>joauth</artifactId>
>>>    <version>6.0.2</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/twitter/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>org.apache.httpcomponents</groupId>
>>>    <artifactId>httpcore</artifactId>
>>>    <version>4.2.4</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>org/apache/httpcomponents/**</includes>
>>> </artifactItem>
>>> <artifactItem>
>>>    <groupId>com.google.guava</groupId>
>>>    <artifactId>guava</artifactId>
>>>    <version>14.0.1</version>
>>>    <type>jar</type>
>>>    <overWrite>false</overWrite>
>>>    <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>    <includes>com/google/guava/**</includes>
>>> </artifactItem>
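A note on the attempts above: the <includes> patterns of the maven-dependency-plugin are matched against file paths inside each jar, not against the Maven groupId. HttpClient and HttpCore keep their classes under org/apache/http/, and Guava keeps its classes under com/google/common/, so org/apache/httpcomponents/** and com/google/guava/** most likely unpack nothing from those three jars. A sketch of the same entries with package-based patterns, keeping the versions from the original attempt (the com/twitter/** entries for hbc-core and joauth already match their packages):

```xml
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.5</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- class files live under org/apache/http, not org/apache/httpcomponents -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpcore</artifactId>
   <version>4.2.4</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- httpcore uses the same org.apache.http root package as httpclient -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- Guava's packages are com.google.common.*, not com.google.guava.* -->
   <includes>com/google/common/**</includes>
</artifactItem>
```

Even with matching patterns, the error shown in the log may persist if a different HttpClient version is loaded first from Flink's lib folder, so treat this as one thing to check rather than a confirmed fix.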
>>>
>>>
>>>
>>> And if I run the job, I see this error:
>>>
>>> 2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins - [node-1] modules [], plugins [], sites []
>>> 2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
>>> 2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
>>> 2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Initializing Twitter Streaming API connection
>>> 2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
>>> 2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Twitter Streaming API connection established successfully
>>> 2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Uncaught exception
>>> java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
>>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
>>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
>>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
>>> 	at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
>>> 	at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
>>> 	at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>> 2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
>>> 2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Shutting down httpclient connection manager
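Reading the trace: PoolingClientConnectionManager was found, but the DefaultClientConnectionOperator class it calls has no constructor taking (SchemeRegistry, DnsResolver). As far as I know, that constructor, DnsResolver and PoolingClientConnectionManager all arrived in HttpClient 4.2, so this pattern usually means classes from two different HttpClient versions are visible at runtime. One common way to isolate the job's own copy is to relocate it with the maven-shade-plugin. A sketch, assuming plugin version 2.4.3, the hypothetical target package myjob.shaded.org.apache.http, and that hbc-core and httpclient are ordinary compile-scope dependencies of the job (the shade goal bundles those and rewrites references to the relocated package):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <!-- assumed version -->
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- move the bundled HttpClient/HttpCore classes and rewrite the
               references to them (e.g. in hbc-core) inside the job jar -->
          <relocation>
            <pattern>org.apache.http</pattern>
            <!-- hypothetical prefix; use one unique to the job -->
            <shadedPattern>myjob.shaded.org.apache.http</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

The relocated copy can then only be reached from code inside the job jar, so another HttpClient in lib/ no longer matters to hbc-core. Whether this fully resolves the error in this setup has not been verified.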
>>>
>>>
>>>
>>>
>>> ...and finally: if I add "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar" to
>>> flink/build-target/lib/, it works. No idea why :P
>>>
>>>
>>> 2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@gmail.com>:
>>>
>>>> Hi Rafal,
>>>>
>>>> From your description, it seems like Flink is complaining because it
>>>> cannot access the Elasticsearch API dependencies either. You'd also
>>>> have to include the following in your Maven build, under <artifactItems>:
>>>>
>>>> <artifactItem>
>>>>     <groupId>org.elasticsearch</groupId>
>>>>     <artifactId>elasticsearch</artifactId>
>>>>     <version>2.3.2</version>
>>>>     <type>jar</type>
>>>>     <overWrite>false</overWrite>
>>>>     <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>>>     <includes>org/elasticsearch/**</includes>
>>>> </artifactItem>
>>>>
>>>> Now your built jar should correctly include all required dependencies
>>>> (the
>>>> connector & Elasticsearch API).
>>>>
>>>> As explained in "Linking with modules not contained in the binary
>>>> distribution"
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>,
>>>> packaging the dependencies along with your code is enough for Flink to
>>>> access them, so you don't need to copy the jar to the lib folder. I would
>>>> recommend removing the jars you previously copied into the lib folder and
>>>> following this approach from now on, in case they interfere with the
>>>> classloader.
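For reference, the <artifactItems> list mentioned here sits inside the unpack goal of the maven-dependency-plugin, along the lines of the example on the linked documentation page. A minimal sketch, assuming plugin version 2.9, an execution id of unpack and the prepare-package phase, with only the Elasticsearch entry shown:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <!-- assumed version; any release with the unpack goal should work -->
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <!-- runs just before packaging, so the unpacked classes land in the job jar -->
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <artifactItem>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>2.3.2</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/elasticsearch/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```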
>>>>
>>>> As for your first attempt, where Flink cannot find any Elasticsearch
>>>> nodes when executed in the IDE, I suspect the reason is that the
>>>> elasticsearch2 connector uses version 2.2.1 by default, which is lower
>>>> than your cluster version 2.3.2. In the past I have seen Elasticsearch
>>>> strangely complain that it cannot find any nodes when the client version
>>>> is lower than the deployed version. Can you try compiling the
>>>> elasticsearch2 connector with the option -Delasticsearch.version=2.3.2,
>>>> and use the newly built connector jar, following the same method
>>>> mentioned above?
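If the connector is rebuilt with -Delasticsearch.version=2.3.2, the Elasticsearch jar unpacked into the job jar should presumably be the same version. One way to keep the two in step is a single property in the job's own pom. A sketch; the property name simply mirrors the connector's elasticsearch.version, and its value still has to be kept equal by hand to the -D flag used for the connector build:

```xml
<properties>
  <!-- keep equal to the -Delasticsearch.version=... used to build the connector -->
  <elasticsearch.version>2.3.2</elasticsearch.version>
</properties>

<!-- inside the maven-dependency-plugin <artifactItems> list -->
<artifactItem>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
  <type>jar</type>
  <overWrite>false</overWrite>
  <outputDirectory>${project.build.directory}/classes</outputDirectory>
  <includes>org/elasticsearch/**</includes>
</artifactItem>
```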
>>>>
>>>> Hope this helps!
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Local-Cluster-have-problem-with-connect-to-elasticsearch-tp6788p6838.html
>>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>>> archive at Nabble.com.
>>>>
>>>
>>>
>>
>
