giraph-user mailing list archives

From Yuanyuan Tian <yt...@us.ibm.com>
Subject Re: wierd communication errors
Date Fri, 29 Jun 2012 18:35:26 GMT

Hi Avery,

I have a better understanding of the problem now. I think I hit the
scalability limit of Giraph. I was running 90 workers on a 16-node cluster
(each node can run 7 concurrent mappers). So, on average, each node ran 5 or
6 workers, with each worker maintaining 89 RPC connections. In addition,
each worker in my job was sending a lot of messages in each iteration. As a
result, the RPC connections became very unstable. When I reduced the
number of workers to 40 and the number of concurrent mappers per node
to 4, the job ran without any problems.

I think my experience revealed two limitations of the current Giraph:
- Scalability: suppose we have n workers in a Giraph job. Each worker
then maintains (n-1) RPC connections, for a total of n*(n-1) RPC
connections in the job. As n increases, the number of RPC connections
quickly outgrows the capacity of the current Giraph system.
- Fault tolerance: when "connection reset by peer" or "broken pipe"
happens, the job just hangs and eventually dies after a timeout. There is
no re-establishment of the connection or automatic restart of the failed
worker.
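
To make the n*(n-1) growth concrete, here is a minimal sketch of the
arithmetic for the two worker counts mentioned above (40 and 90). The class
and method names are hypothetical and are not part of Giraph; the only
assumption is that every worker keeps one connection open to every other
worker.

```java
// Sketch only: RpcConnectionCount is a hypothetical helper, not a Giraph API.
final class RpcConnectionCount {
    /** Connections one worker keeps open when it talks to every other worker. */
    static long perWorker(long workers) {
        return workers - 1;
    }

    /** Directed connections across the whole job: n * (n - 1). */
    static long total(long workers) {
        return workers * (workers - 1);
    }

    public static void main(String[] args) {
        // The two worker counts discussed in this thread.
        for (long n : new long[] {40, 90}) {
            System.out.println(n + " workers: " + perWorker(n)
                    + " connections per worker, " + total(n) + " in total");
        }
    }
}
```

Going from 40 to 90 workers a little more than doubles n, but the total
number of connections grows from 1560 to 8010, roughly a factor of five.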

I am very curious whether there is any plan to address these two
limitations.

BTW: I checked out the latest code from trunk. I did see some code using
netty, but BasicRPCCommunications is still using Hadoop RPC. Is there a
knob or something I need to turn on to use netty?

Yuanyuan



From:	Yuanyuan Tian/Almaden/IBM
To:	user@giraph.apache.org
Cc:	user@giraph.apache.org
Date:	06/28/2012 10:16 AM
Subject:	Re: wierd communication errors


I can try the netty version then. But what causes the reset by peer?
A timeout? And if it happens, how can I reestablish the connection? I could
add some code that checks the connection first and reestablishes it if it
was reset by peer, before calling putVertexIdMessagesList.
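
The kind of check-and-reconnect wrapper described above could look roughly
like the following. This is only a sketch under stated assumptions:
RetryingRpc, Reconnector, and callWithRetry are hypothetical names, not
Giraph or Hadoop APIs, and the sketch only handles a plain IOException. In
the traces from this thread the IOException arrives wrapped in a
RuntimeException or an UndeclaredThrowableException, so real code would have
to unwrap the cause first. Retrying a send can also deliver the same
messages twice if the first attempt partly succeeded.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Sketch only: all names here are hypothetical, not Giraph or Hadoop APIs.
final class RetryingRpc {
    /** Hypothetical hook that tears down and reopens the connection to a peer. */
    interface Reconnector {
        void reconnect() throws IOException;
    }

    /**
     * Runs the call, and on IOException waits, reconnects, and tries again,
     * up to maxAttempts attempts in total. Rethrows the last IOException
     * if every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> call, Reconnector reconnector,
                               int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                last = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts, give up below
                }
                Thread.sleep(backoffMillis * attempt); // simple linear backoff
                reconnector.reconnect();
            }
        }
        throw last;
    }
}
```

A caller would wrap the RPC in the Callable and pass whatever code reopens
the proxy as the Reconnector.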

Yuanyuan




From:	Avery Ching <aching@apache.org>
To:	user@giraph.apache.org
Date:	06/28/2012 01:20 AM
Subject:	Re: wierd communication errors



In my testing, I found the netty implementation of Giraph (trunk) to be
more stable than Hadoop RPC.  But you can't do too much (other than
reestablish the connection) when the connection is reset by peer.

Avery

On 6/28/12 12:29 AM, Yuanyuan Tian wrote:
      I want to make a correction about the errors. The error should be as
      follows. The errors in my previous email came from a debug message I
      added. But the problem is the same: somehow a connection was
      reset by peer. I tried several more times. Occasionally my job
      runs without a problem, but more often it fails because of this
      connection reset problem.  I really don't have a clue what the
      problem is.

      Yuanyuan

      java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
              at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
              at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
              at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
              at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:396)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
              at org.apache.hadoop.mapred.Child.main(Child.java:253)
      Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
              at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1085)
              at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
              at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
              at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
              ... 7 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
              at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
              at java.util.concurrent.FutureTask.get(FutureTask.java:83)
              at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1080)
              ... 10 more
      Caused by: java.lang.RuntimeException: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
              at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:379)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
              at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
              at java.util.concurrent.FutureTask.run(FutureTask.java:138)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:662)
      Caused by: java.io.IOException: Call to idp33.almaden.ibm.com/172.16.0.33:30054 failed on local exception: java.io.IOException: Connection reset by peer
              at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
              at org.apache.hadoop.ipc.Client.call(Client.java:1033)
              at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
              at $Proxy3.putVertexIdMessagesList(Unknown Source)
              at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:339)
              ... 6 more
      Caused by: java.io.IOException: Connection reset by peer
              at sun.nio.ch.FileDispatcher.read0(Native Method)
              at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
              at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
              at sun.nio.ch.IOUtil.read(IOUtil.java:175)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
              at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
              at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
              at java.io.FilterInputStream.read(FilterInputStream.java:116)
              at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
              at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
              at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
              at java.io.DataInputStream.readInt(DataInputStream.java:370)
              at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
              at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)





      From:        Yuanyuan Tian/Almaden/IBM@IBMUS
      To:        user@giraph.apache.org
      Cc:        user@giraph.apache.org
      Date:        06/27/2012 10:02 PM
      Subject:        Re: wierd communication errors



      What do you mean by using netty? I am not aware that Giraph is using
      netty. I am just using whatever the default Giraph release 1.0 is
      using.

      Yuanyuan



      From:        Avery Ching <aching@apache.org>
      To:        user@giraph.apache.org
      Date:        06/27/2012 07:57 PM
      Subject:        Re: wierd communication errors



      Same issue using netty as well?


      On 6/27/12 6:14 PM, Yuanyuan Tian wrote:
      Hi,

      I was running a Giraph job where I constantly got the following
      communication-related errors. The symptom is that in superstep 0,
      most of the workers succeed but a few of the workers produce the
      errors below. The machines that cause the connection reset are
      different in each failed worker. To rule out the possibility of a
      cluster setup error, I also ran a different job and it worked fine.
      So, the error must be caused by this particular Giraph job. My Giraph
      job is just a normal message-propagation type of job, except that the
      messages are not all of one type. Therefore, I defined a special
      message type (also copied in this email) that incorporates two
      different types of messages: integer messages and double array
      messages.  I have tried all day but still couldn't pinpoint the
      source of the bug. Can anyone give me some hints on what may have
      caused this error?

      Thanks a lot,

      java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
              at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:859)
              at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
              at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
              at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:396)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
              at org.apache.hadoop.mapred.Child.main(Child.java:253)
      Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
              at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1082)
              at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:1080)
              at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:806)
              at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:850)
              ... 7 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.reflect.UndeclaredThrowableException
              at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
              at java.util.concurrent.FutureTask.get(FutureTask.java:83)
              at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:1077)
              ... 10 more
      Caused by: java.lang.reflect.UndeclaredThrowableException
              at $Proxy3.getName(Unknown Source)
              at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:335)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
              at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
              at java.util.concurrent.FutureTask.run(FutureTask.java:138)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:662)
      Caused by: java.io.IOException: Call to idp35.almaden.ibm.com/172.16.0.35:30083 failed on local exception: java.io.IOException: Connection reset by peer
              at org.apache.hadoop.ipc.Client.wrapException(Client.java:1065)
              at org.apache.hadoop.ipc.Client.call(Client.java:1033)
              at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:224)
              ... 8 more
      Caused by: java.io.IOException: Connection reset by peer
              at sun.nio.ch.FileDispatcher.read0(Native Method)
              at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
              at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
              at sun.nio.ch.IOUtil.read(IOUtil.java:175)
              at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
              at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:55)
              at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
              at java.io.FilterInputStream.read(FilterInputStream.java:116)
              at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:343)
              at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
              at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
              at java.io.DataInputStream.readInt(DataInputStream.java:370)
              at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:767)
              at org.apache.hadoop.ipc.Client$Connection.run(Client.java:712)

      My special message type:

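      import java.io.DataInput;
      import java.io.DataOutput;
      import java.io.IOException;

      import org.apache.hadoop.io.Writable;

      public class MyMessageWritable implements Writable {

            // Message kind: types 1 and 4 carry an int payload,
            // types 2 and 3 carry a double array of length MyVertex.K.
            public byte msgType=0;
            public long vertexID=-1;
            public double[] arrayMsg=null;
            public int intMsg=-1;

            // No-argument constructor, so an empty instance can be
            // created before readFields() fills it in.
            public MyMessageWritable ()
            {
            }

            // Integer message (types 1 and 4).
            public MyMessageWritable (long id, byte tp, int msg)
            {
                    vertexID=id;
                    msgType=tp;
                    intMsg=msg;
            }

            // Double array message (types 2 and 3).
            public MyMessageWritable (long id, byte tp, double[] arr)
            {
                    vertexID=id;
                    msgType=tp;
                    arrayMsg=arr;
            }

            @Override
            public void readFields(DataInput in) throws IOException {
                    // Field order must match write(): id, type, payload.
                    vertexID=in.readLong();
                    msgType=in.readByte();
                    switch(msgType)
                    {
                    case 1:
                    case 4:
                            intMsg=in.readInt();
                            break;
                    case 2:
                    case 3:
                            if(arrayMsg==null)
                                    arrayMsg=new double[MyVertex.K];
                            for(int i=0; i<MyVertex.K; i++)
                                    arrayMsg[i]=in.readDouble();
                            break;
                    default:
                            throw new IOException("message type invalid: "+msgType);
                    }
            }

            @Override
            public void write(DataOutput out) throws IOException {
                    out.writeLong(vertexID);
                    out.writeByte(msgType);
                    switch(msgType)
                    {
                    case 1:
                    case 4:
                            out.writeInt(intMsg);
                            break;
                    case 2:
                    case 3:
                            if(arrayMsg==null)
                                    throw new IOException("array message is null");
                            // readFields() always reads exactly MyVertex.K
                            // doubles, so reject any other length here.
                            if(arrayMsg.length!=MyVertex.K)
                                    throw new IOException("array message has wrong length: "+arrayMsg.length);
                            for(int i=0; i<MyVertex.K; i++)
                                    out.writeDouble(arrayMsg[i]);
                            break;
                    default:
                            throw new IOException("message type invalid: "+msgType);
                    }

            }
      }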
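
One way to rule the message type in or out as the cause is a local
round-trip check: serialize a message to bytes, read it back, and confirm
that the reader consumes exactly what the writer produced. The sketch below
mirrors the wire format above (id, type, then either one int or K doubles)
using only java.io, so that it runs without Hadoop on the classpath.
MessageRoundTrip, Msg, and the value K = 3 are assumptions made for the
example; they stand in for MyMessageWritable and MyVertex.K.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Sketch only: a Hadoop-free stand-in for the message class in this email.
final class MessageRoundTrip {
    static final int K = 3; // assumed stand-in for MyVertex.K

    static final class Msg {
        long vertexID = -1;
        byte msgType = 0;
        int intMsg = -1;
        double[] arrayMsg = null;
    }

    /** Same field order as write(): id, type, then the payload. */
    static byte[] encode(Msg m) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(m.vertexID);
        out.writeByte(m.msgType);
        switch (m.msgType) {
            case 1: case 4:
                out.writeInt(m.intMsg);
                break;
            case 2: case 3:
                if (m.arrayMsg == null || m.arrayMsg.length != K) {
                    throw new IOException("array message must have length " + K);
                }
                for (double d : m.arrayMsg) out.writeDouble(d);
                break;
            default:
                throw new IOException("message type invalid: " + m.msgType);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Same field order as readFields(); fails if bytes are left over. */
    static Msg decode(byte[] wire) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        Msg m = new Msg();
        m.vertexID = in.readLong();
        m.msgType = in.readByte();
        switch (m.msgType) {
            case 1: case 4:
                m.intMsg = in.readInt();
                break;
            case 2: case 3:
                m.arrayMsg = new double[K];
                for (int i = 0; i < K; i++) m.arrayMsg[i] = in.readDouble();
                break;
            default:
                throw new IOException("message type invalid: " + m.msgType);
        }
        if (in.available() != 0) {
            throw new IOException("trailing bytes: " + in.available());
        }
        return m;
    }

    public static void main(String[] args) throws IOException {
        Msg m = new Msg();
        m.vertexID = 7;
        m.msgType = 2;
        m.arrayMsg = new double[] {0.5, 1.5, 2.5};
        byte[] wire = encode(m);
        Msg copy = decode(wire);
        System.out.println(wire.length + " bytes, payload " + Arrays.toString(copy.arrayMsg));
    }
}
```

If a check like this passes for all four message types, the reader and
writer agree on the format, which would point away from the message class
and toward the connections themselves.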



