flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: RemoteTransportException when trying to redis in flink code
Date Wed, 06 Apr 2016 13:46:58 GMT
Great to hear that you solved your problem :-)

On Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> Till,
>   Found the issue, it was my bad assumption about GlobalConfiguration,
> what I thought was once the configuration is read from the client machine
> GlobalConfiguration params will passed on to the task manager nodes, as
> well, it was not and values from default was getting pickup, which was
> localhost 6379 and there was no redis running in localhost of task manager.
>
> balaji
>
> On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hmm I'm not a Redis expert, but are you sure that you see a successful
>> ping reply in the logs of the TaskManagers and not only in the client logs?
>>
>> Another thing: Is the redisClient thread safe? Multiple map tasks might
>> be accessing the set and get methods concurrently.
>>
>> Another question: The code of DriverStreamHelper you've just sent is not
>> the code you've used when receiving the stack trace, right? Because in the
>> stack trace it's written that you access a RedisClientPool from the
>> DriverStreamHelper.set method.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <
>> balaji.rajagopalan@olacabs.com> wrote:
>>
>>> Till,
>>>   I have checked from all the taskmanager nodes I am able to establish a
>>> connection by installing a redis-cli on those nodes. The thing is in the
>>> constructor I am able to set and get values, also I am getting PONG for the
>>> ping. But once object is initialized when I try to call DriverStreamHelper.get
>>> and DriverStreamHelper.set from map/apply function I get the connection
>>> refused. This may not be related to flink but rather to some security
>>> setting with Amazon AWS EMR, this is assumption now. I have also tried with
>>> 3 different redis libraries to rule out any errors with libraries the same
>>> exception in all.
>>>
>>> object DriverStreamHelper {
>>>
>>>
>>>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>>>
>>>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>>>
>>>   val p = redisClient.ping()
>>>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>>>
>>>
>>>
>>>   val postFix = System.currentTimeMillis()
>>>   val key = "some-key" + postFix
>>>   val value = "some-value" + postFix
>>>   set(key, value, Some(10000L))
>>>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>>>
>>>   def set(k: String, v: String): Unit = {
>>>     redisClient.set(k,v)
>>>   }
>>>
>>>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>>>       redisClient.set(k,v,exTime)
>>>   }
>>>
>>>
>>> def get(k: String): Option[String] = {
>>> import scala.concurrent.duration._
>>> val f = redisClient.get[String](k)
>>> Await.result(f, 1.seconds) //FIXME - really bad need to return future
>>> here.
>>> }
>>>
>>> }
>>>
>>>
>>> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <trohrmann@apache.org>
>>> wrote:
>>>
>>>> Hi Balaji,
>>>>
>>>> from the stack trace it looks as if you cannot open a connection redis.
>>>> Have you checked that you can access redis from all your TaskManager nodes?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <
>>>> balaji.rajagopalan@olacabs.com> wrote:
>>>>
>>>>> I am trying to use AWS EMR yarn cluster where the flink code runs, in
>>>>> one of apply window function, I try to set some values in redis it fails.
I
>>>>> have tried to access the same redis with no flink code and get/set works,
>>>>> but from the flink I get  into this exception. Any inputs on what might
be
>>>>> going wrong.
>>>>>
>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>> Error at remote task manager 'some-ip'.
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>
>>>>> at
>>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>
>>>>> at
>>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>
>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>
>>>>> at
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by:
>>>>> org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>
>>>>> at
>>>>> io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>>>
>>>>> at
>>>>> io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>>>
>>>>> at
>>>>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>
>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>
>>>>> ... 2 more
>>>>>
>>>>>
>>>>> Caused by: java.lang.RuntimeException: java.net.ConnectException:
>>>>> Connection refused
>>>>>
>>>>>         at com.redis.IO <http://com.redis.io/>
>>>>> $class.connect(IO.scala:37)
>>>>>
>>>>>         at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>>>
>>>>>         at
>>>>> com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>>>
>>>>>         at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>>>
>>>>>         at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>>>
>>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>>>
>>>>>         at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>>>
>>>>>         at
>>>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>>>
>>>>>         at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>>>
>>>>>         at
>>>>> com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Caused by: java.net.ConnectException: Connection refused
>>>>>
>>>>>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>>>
>>>>>         at
>>>>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>>>
>>>>>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>>>
>>>>>         at java.net.Socket.connect(Socket.java:589)
>>>>>
>>>>>         at java.net.Socket.connect(Socket.java:538)
>>>>>
>>>>>         at java.net.Socket.<init>(Socket.java:434)
>>>>>
>>>>>         at java.net.Socket.<init>(Socket.java:211)
>>>>>
>>>>
>>>>
>>>
>>
>

Mime
View raw message