Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Reply-To: user@flink.apache.org
Date: Wed, 6 Apr 2016 15:46:58 +0200
Subject: Re: RemoteTransportException when trying to redis in flink code
From: Till Rohrmann
To: user@flink.apache.org
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary=089e0102e9741376ce052fd13203

--089e0102e9741376ce052fd13203
Content-Type: text/plain; charset=UTF-8

Great to hear that you solved your problem :-)

On
Wed, Apr 6, 2016 at 2:29 PM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:

> Till,
>   Found the issue: it was my bad assumption about GlobalConfiguration. I
> thought that once the configuration is read on the client machine, the
> GlobalConfiguration params would be passed on to the task manager nodes as
> well. They were not, so the default values were picked up, which were
> localhost 6379, and there was no redis running on localhost of the task
> manager.
>
> balaji
>
> On Wed, Apr 6, 2016 at 3:29 PM, Till Rohrmann <trohrmann@apache.org> wrote:
>
>> Hmm I'm not a Redis expert, but are you sure that you see a successful
>> ping reply in the logs of the TaskManagers and not only in the client logs?
>>
>> Another thing: Is the redisClient thread safe? Multiple map tasks might
>> be accessing the set and get methods concurrently.
>>
>> Another question: The code of DriverStreamHelper you've just sent is not
>> the code you've used when receiving the stack trace, right? Because in the
>> stack trace it's written that you access a RedisClientPool from the
>> DriverStreamHelper.set method.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 6, 2016 at 11:42 AM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:
>>
>>> Till,
>>>   I have checked from all the taskmanager nodes: I am able to establish a
>>> connection by installing redis-cli on those nodes. The thing is, in the
>>> constructor I am able to set and get values, and I also get PONG for the
>>> ping. But once the object is initialized, when I try to call
>>> DriverStreamHelper.get and DriverStreamHelper.set from a map/apply function
>>> I get connection refused. This may not be related to flink but rather to
>>> some security setting with Amazon AWS EMR; this is an assumption for now. I
>>> have also tried 3 different redis libraries to rule out library errors, and
>>> I get the same exception in all.
>>>
>>> object DriverStreamHelper {
>>>
>>>   implicit val akkaSystem = akka.actor.ActorSystem("flink-actorsystem")
>>>
>>>   val redisClient = RedisClient(host=redisHost, port=redisPort)
>>>
>>>   val p = redisClient.ping()
>>>   p.map{ res => LOG.info(s"Reply from Redis client : $res") }
>>>
>>>   val postFix = System.currentTimeMillis()
>>>   val key = "some-key" + postFix
>>>   val value = "some-value" + postFix
>>>   set(key, value, Some(10000L))
>>>   LOG.info(s"Going to get the value from Redis ${get(key)}")
>>>
>>>   def set(k: String, v: String): Unit = {
>>>     redisClient.set(k,v)
>>>   }
>>>
>>>   def set(k: String, v: String, exTime: Option[Long]): Unit = {
>>>     redisClient.set(k,v,exTime)
>>>   }
>>>
>>>   def get(k: String): Option[String] = {
>>>     import scala.concurrent.duration._
>>>     val f = redisClient.get[String](k)
>>>     Await.result(f, 1.seconds) //FIXME - really bad need to return future here.
>>>   }
>>>
>>> }
>>>
>>> On Wed, Apr 6, 2016 at 2:42 PM, Till Rohrmann <trohrmann@apache.org> wrote:
>>>
>>>> Hi Balaji,
>>>>
>>>> from the stack trace it looks as if you cannot open a connection to redis.
>>>> Have you checked that you can access redis from all your TaskManager nodes?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 6, 2016 at 7:46 AM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:
>>>>
>>>>> I am trying to use an AWS EMR yarn cluster where the flink code runs. In
>>>>> one of the apply window functions I try to set some values in redis and
>>>>> it fails. I have tried to access the same redis without flink code and
>>>>> get/set works, but from flink I run into this exception. Any inputs on
>>>>> what might be going wrong?
>>>>>
>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'some-ip'.
>>>>>   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
>>>>>   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>   at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>>>   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>>>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>>>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>>   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>>>>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
>>>>>   at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>   at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>   at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>>>>>   at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
>>>>>   at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:299)
>>>>>   at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>>>>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>>>>   ... 2 more
>>>>> Caused by: java.lang.RuntimeException: java.net.ConnectException: Connection refused
>>>>>   at com.redis.IO$class.connect(IO.scala:37)
>>>>>   at com.redis.RedisClient.connect(RedisClient.scala:94)
>>>>>   at com.redis.RedisCommand$class.initialize(RedisClient.scala:71)
>>>>>   at com.redis.RedisClient.initialize(RedisClient.scala:94)
>>>>>   at com.redis.RedisClient.<init>(RedisClient.scala:98)
>>>>>   at com.redis.RedisClientFactory.makeObject(Pool.scala:12)
>>>>>   at com.redis.RedisClientFactory.makeObject(Pool.scala:7)
>>>>>   at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:149)
>>>>>   at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>>>>   at com.olacabs.peakpricing.utils.DriverStreamHelper$.set(DriverStreamHelper.scala:57)
>>>>>   at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:35)
>>>>>   at com.olacabs.peakpricing.datastream.TotalMappedFunction.join(TotalMappedFunction.scala:29)
>>>>>   at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:312)
>>>>>   at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:583)
>>>>>   at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:44)
>>>>>   at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:33)
>>>>>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processTriggerResult(WindowOperator.java:256)
>>>>>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:287)
>>>>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.net.ConnectException: Connection refused
>>>>>   at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>>>   at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>>>   at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>>>   at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>>>   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>>>   at java.net.Socket.connect(Socket.java:589)
>>>>>   at java.net.Socket.connect(Socket.java:538)
>>>>>   at java.net.Socket.<init>(Socket.java:434)
>>>>>   at java.net.Socket.<init>(Socket.java:211)

--089e0102e9741376ce052fd13203
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
--089e0102e9741376ce052fd13203--
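
Editorial sketch, not part of the original thread: the root cause reported above was that settings read through GlobalConfiguration on the client never reached the TaskManagers, so the helper object fell back to localhost 6379 there. One way to avoid this, sketched below under stated assumptions, is to resolve the Redis host and port on the client and hand them to the function through its constructor, creating the client lazily on first use. All names here (`RedisSettings`, `StubRedisClient`, `RedisWriter`) are hypothetical, and the stub stands in for a real Redis client so that the sketch runs without a server or Flink on the classpath.

```scala
// Hypothetical names throughout; this is an illustration, not the code from the thread.

// Immutable settings, resolved once on the client and shipped with the function.
final case class RedisSettings(host: String, port: Int)

// Stand-in for a real Redis client so the sketch runs without a server.
final class StubRedisClient(val host: String, val port: Int) {
  def set(key: String, value: String): String = s"SET $key $value -> $host:$port"
}

// Stand-in for a user function. The settings are an ordinary serializable
// field, so they travel with the function instance instead of being looked
// up again from JVM-global state on the worker.
class RedisWriter(settings: RedisSettings) extends Serializable {
  // Built lazily on first use and excluded from serialization, so the
  // connection is created in the JVM that actually runs the function.
  @transient private lazy val client =
    new StubRedisClient(settings.host, settings.port)

  def write(key: String, value: String): String = client.set(key, value)
}
```

For example, `new RedisWriter(RedisSettings("redis.internal", 6379)).write("some-key", "some-value")` talks to the configured host rather than a default. In a Flink job the same idea is usually expressed with a rich function that creates its client in `open`, since Flink serializes the function instance together with its fields when it ships the job to the TaskManagers.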