giraph-user mailing list archives

From Avery Ching <ach...@apache.org>
Subject Re: wierd communication errors
Date Sat, 30 Jun 2012 00:41:03 GMT
Please try -Dgiraph.useNetty=true
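
If the job configuration is built in code rather than passed with
-Dgiraph.useNetty=true on the command line, the equivalent should be
setting the same boolean on the job's Hadoop Configuration before
submission. A minimal sketch (the wrapper class name here is made up):

    import org.apache.hadoop.conf.Configuration;

    public class EnableNettyExample {
        /** Returns a Configuration with the netty transport enabled. */
        public static Configuration nettyEnabledConf() {
            Configuration conf = new Configuration();
            // Same effect as -Dgiraph.useNetty=true on the command line.
            conf.setBoolean("giraph.useNetty", true);
            return conf;
        }
    }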

On 6/29/12 11:35 AM, Yuanyuan Tian wrote:
>
> Hi Avery,
>
> I have a better understanding of the problem now. I think I hit the
> scalability limit of Giraph. I was running 90 workers on a 16-node
> cluster (each node can run 7 concurrent mappers). So, on average each
> node ran 5 or 6 workers, with each worker maintaining 89 RPC
> connections. On top of that, each worker in my job was sending a lot
> of messages in each iteration. As a result, the RPC connections became
> very unstable. When I reduced the number of workers to 40 and the
> number of concurrent mappers per node to 4, the job ran without any
> problem.
>
> I think my experience revealed two limitations of the current Giraph:
> - Scalability: suppose we have n workers in a Giraph job; then each
> worker maintains (n-1) RPC connections, so there are n*(n-1) RPC
> connections in the job in total (worked out below). As n increases,
> the number of RPC connections quickly grows beyond what the current
> Giraph system can handle.
> - Fault tolerance: when "connection reset by peer" or "broken pipe"
> happens, the job just hangs and eventually dies after a timeout. There
> is no re-establishment of the connection and no automatic restart of a
> failed worker.
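>
> (Concretely, with the 90-worker run above: each worker holds 89
> connections, so the job as a whole holds 90 * 89 = 8,010, and at 5 or
> 6 workers per node each machine keeps roughly 445 to 534 of them open.)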
>
> I am very curious whether there is any plan to address these two 
> limitations.
>
> BTW: I checked out the latest code from trunk. I did see some code
> using netty, but BasicRPCCommunications is still using Hadoop RPC.
> Is there a knob or something I need to turn on to use netty?
>
> Yuanyuan
>
>
> From: Yuanyuan Tian/Almaden/IBM
> To: user@giraph.apache.org
> Cc: user@giraph.apache.org
> Date: 06/28/2012 10:16 AM
> Subject: Re: wierd communication errors
>
> ------------------------------------------------------------------------
>
>
> I can try the netty version then. But what is the cause of the reset
> by peer? A timeout? And if it happens, how can I reestablish the
> connection? I could add some code to check the connection first and
> reestablish it if it was reset by peer before calling
> putVertexIdMessagesList.
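>
> (For illustration, a generic retry wrapper of this kind could look like
> the sketch below. It is only a sketch: the class and method names are
> made up, and nothing like it exists in BasicRPCCommunications today.)
>
>     import java.io.IOException;
>     import java.util.concurrent.Callable;
>
>     // Illustrative sketch: retry a call a few times when it fails with
>     // an IOException such as "Connection reset by peer" (maxAttempts >= 1).
>     public final class RetryingCall {
>         public static <T> T call(Callable<T> rpc, int maxAttempts,
>                 long backoffMs) throws Exception {
>             IOException last = null;
>             for (int attempt = 1; attempt <= maxAttempts; attempt++) {
>                 try {
>                     return rpc.call();
>                 } catch (IOException e) {
>                     last = e;                          // remember last failure
>                     Thread.sleep(backoffMs * attempt); // simple linear backoff
>                 }
>             }
>             throw last;
>         }
>     }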
>
> Yuanyuan
>
>
>
> From: Avery Ching <aching@apache.org>
> To: user@giraph.apache.org
> Date: 06/28/2012 01:20 AM
> Subject: Re: wierd communication errors
> ------------------------------------------------------------------------
>
>
>
> In my testing, I found the netty implementation of Giraph (trunk) to 
> be more stable than Hadoop RPC.  But you can't do too much (other than 
> reestablish the connection) when the connection is reset by peer.
>
> Avery
>
> On 6/28/12 12:29 AM, Yuanyuan Tian wrote:
>
>     I want to make a correction about the errors. The error should be
>     as follows. The errors in my previous email came from a debug
>     message I had added, but the problem is the same: somehow a
>     connection was reset by peer. I tried a few more times.
>     Occasionally my job actually runs without a problem, but more
>     often it fails because of this connection-reset problem. I really
>     don't have a clue what the cause is.
>
>     Yuanyuan
>
>     java.lang.IllegalStateException: run: Caught an unrecoverable
>     exception flush: Got ExecutionException
>                    at
>     org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
>                    at
>     org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>                    at
>     org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
>                    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>                    at
>     java.security.AccessController.doPrivileged(Native Method)
>                    at javax.security.auth.Subject.doAs(Subject.java:396)
>                    at
>     org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
>                    at org.apache.hadoop.mapred.Child.main(Child.java:253)
>     Caused by: java.lang.IllegalStateException: flush: Got
>     ExecutionException
>                    at
>     org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1085)
>                    at
>     org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
>                    at
>     org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
>                    at
>     org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
>                    ... 7 more
>     Caused by: java.util.concurrent.ExecutionException:
>     java.lang.RuntimeException: java.io.IOException: Call to
>     idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception:
>     java.io.IOException: Connection reset by peer
>                    at
>     java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>                    at
>     java.util.concurrent.FutureTask.get(FutureTask.java:83)
>                    at
>     org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1080)
>                    ... 10 more
>     Caused by: java.lang.RuntimeException: java.io.IOException: Call
>     to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local
>     exception: java.io.IOException: Connection reset by peer
>                    at
>     org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:379)
>                    at
>     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>                    at
>     java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>                    at
>     java.util.concurrent.FutureTask.run(FutureTask.java:138)
>                    at
>     java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>                    at
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>                    at java.lang.Thread.run(Thread.java:662)
>     Caused by: java.io.IOException: Call to
>     idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception:
>     java.io.IOException: Connection reset by peer
>                    at
>     org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
>                    at org.apache.hadoop.ipc.Client.call(Client.java:1033)
>                    at
>     org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
>                    at $Proxy3.putVertexIdMessagesList(Unknown Source)
>                    at
>     org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:339)
>                    ... 6 more
>     Caused by: java.io.IOException: Connection reset by peer
>                    at sun.nio.ch.FileDispatcher.read0(Native Method)
>                    at
>     sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>                    at
>     sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
>                    at sun.nio.ch.IOUtil.read(IOUtil.java:175)
>                    at
>     sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>                    at
>     org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
>                    at
>     org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>                    at
>     org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
>                    at
>     org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
>                    at
>     java.io.FilterInputStream.read(FilterInputStream.java:116)
>                    at
>     org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
>                    at
>     java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
>                    at
>     java.io.BufferedInputStream.read(BufferedInputStream.java:237)
>                    at
>     java.io.DataInputStream.readInt(DataInputStream.java:370)
>                    at
>     org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
>                    at
>     org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)
>
>
>
>
>
>     From: Yuanyuan Tian/Almaden/IBM@IBMUS
>     To: user@giraph.apache.org
>     Cc: user@giraph.apache.org
>     Date: 06/27/2012 10:02 PM
>     Subject: Re: wierd communication errors
>     ------------------------------------------------------------------------
>
>
>
>     What do you mean by using netty? I was not aware that Giraph uses
>     netty. I am just using whatever the default Giraph release 1.0
>     uses.
>
>     Yuanyuan
>
>
>
>     From: Avery Ching <aching@apache.org>
>     To: user@giraph.apache.org
>     Date: 06/27/2012 07:57 PM
>     Subject: Re: wierd communication errors
>     ------------------------------------------------------------------------
>
>
>
>     Same issue using netty as well?
>
>
>     On 6/27/12 6:14 PM, Yuanyuan Tian wrote:
>     Hi,
>
>     I was running a Giraph job where I constantly got the following
>     communication-related errors. The symptom is that in superstep 0
>     most of the workers succeeded, but a few of them produced the
>     errors below; the machine that caused the connection reset is
>     different in each failed worker. To rule out the possibility of a
>     cluster setup error, I also ran a different job, and it worked
>     fine. So the error must be caused by this particular Giraph job.
>     My job is just a normal message-propagation type of job, except
>     that the messages are not all of one type. Therefore, I defined a
>     special message type (also copied in this email) that incorporates
>     two different kinds of messages: an integer message and a
>     double-array message. I have tried all day but still couldn't
>     pinpoint the source of the bug. Can anyone give me some hints on
>     what may have caused this error?
>
>     Thanks a lot,
>
>     java.lang.IllegalStateException: run: Caught an unrecoverable
>     exception flush: Got ExecutionException
>                  at
>     org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
>                  at
>     org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>                  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
>                  at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>                  at java.security.AccessController.doPrivileged(Native
>     Method)
>                  at javax.security.auth.Subject.doAs(Subject.java:396)
>                  at
>     org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
>                  at org.apache.hadoop.mapred.Child.main(Child.java:253)
>     Caused by: java.lang.IllegalStateException: flush: Got
>     ExecutionException
>                  at
>     org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1082)
>                  at
>     org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
>                  at
>     org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
>                  at
>     org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
>                  ... 7 more
>     Caused by: java.util.concurrent.ExecutionException:
>     java.lang.reflect.UndeclaredThrowableException
>                  at
>     java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
>                  at
>     java.util.concurrent.FutureTask.get(FutureTask.java:83)
>                  at
>     org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1077)
>                  ... 10 more
>     Caused by: java.lang.reflect.UndeclaredThrowableException
>                  at $Proxy3.getName(Unknown Source)
>                  at
>     org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:335)
>                  at
>     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>                  at
>     java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>                  at
>     java.util.concurrent.FutureTask.run(FutureTask.java:138)
>                  at
>     java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>                  at
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>                  at java.lang.Thread.run(Thread.java:662)
>     Caused by: java.io.IOException: Call to
>     idp35.almaden.ibm.com/172.16.0.35:30083 failed on local exception:
>     java.io.IOException: Connection reset by peer
>                  at
>     org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
>                  at org.apache.hadoop.ipc.Client.call(Client.java:1033)
>                  at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
>                  ... 8 more
>     Caused by: java.io.IOException: Connection reset by peer
>                  at sun.nio.ch.FileDispatcher.read0(Native Method)
>                  at
>     sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>                  at
>     sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
>                  at sun.nio.ch.IOUtil.read(IOUtil.java:175)
>                  at
>     sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>                  at
>     org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
>                  at
>     org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>                  at
>     org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
>                  at
>     org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
>                  at
>     java.io.FilterInputStream.read(FilterInputStream.java:116)
>                  at
>     org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
>                  at
>     java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
>                  at
>     java.io.BufferedInputStream.read(BufferedInputStream.java:237)
>                  at
>     java.io.DataInputStream.readInt(DataInputStream.java:370)
>                  at
>     org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
>                  at
>     org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)
>
>     My special message type:
>
>     import java.io.DataInput;
>     import java.io.DataOutput;
>     import java.io.IOException;
>
>     import org.apache.hadoop.io.Writable;
>
>     public class MyMessageWritable implements Writable {
>
>         // msgType 1 or 4 carries an integer message; msgType 2 or 3
>         // carries a double array of length MyVertex.K.
>         public byte msgType = 0;
>         public long vertexID = -1;
>         public double[] arrayMsg = null;
>         public int intMsg = -1;
>
>         public MyMessageWritable() {
>         }
>
>         public MyMessageWritable(long id, byte tp, int msg) {
>             vertexID = id;
>             msgType = tp;
>             intMsg = msg;
>         }
>
>         public MyMessageWritable(long id, byte tp, double[] arr) {
>             vertexID = id;
>             msgType = tp;
>             arrayMsg = arr;
>         }
>
>         @Override
>         public void readFields(DataInput in) throws IOException {
>             vertexID = in.readLong();
>             msgType = in.readByte();
>             switch (msgType) {
>             case 1:
>             case 4:
>                 intMsg = in.readInt();
>                 break;
>             case 2:
>             case 3:
>                 if (arrayMsg == null) {
>                     arrayMsg = new double[MyVertex.K];
>                 }
>                 for (int i = 0; i < MyVertex.K; i++) {
>                     arrayMsg[i] = in.readDouble();
>                 }
>                 break;
>             default:
>                 throw new IOException("message type invalid: " + msgType);
>             }
>         }
>
>         @Override
>         public void write(DataOutput out) throws IOException {
>             out.writeLong(vertexID);
>             out.writeByte(msgType);
>             switch (msgType) {
>             case 1:
>             case 4:
>                 out.writeInt(intMsg);
>                 break;
>             case 2:
>             case 3:
>                 if (arrayMsg == null) {
>                     throw new IOException("array message is null");
>                 }
>                 for (int i = 0; i < MyVertex.K; i++) {
>                     out.writeDouble(arrayMsg[i]);
>                 }
>                 break;
>             default:
>                 throw new IOException("message type invalid: " + msgType);
>             }
>         }
>     }
>
>
>


