flink-dev mailing list archives

From "Kashmar, Ali" <Ali.Kash...@emc.com>
Subject Re: Task Parallelism in a Cluster
Date Fri, 11 Dec 2015 17:03:50 GMT
Hi Stephan,

I’m using DataStream.writeAsText(String path, WriteMode writemode) for my
sink. The data is written to disk and there’s plenty of space available.

I looked deeper into the logs and found out that the jobs on 174 and 175
are not actually stuck, but they’re moving extremely slowly. This is an
excerpt from the task manager log on 175:

03:44:43,307 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:44:43,315 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.173/192.168.200.173:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 86223ms for sessionid 0x25181a544860091, closing socket connection and attempting reconnect
03:46:09,362 WARN  org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection attempt unsuccessful after 86221 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
03:46:09,391 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86222 to read a 1000 lines
03:46:09,394 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86243 to read a 1000 lines
03:46:09,439 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86224 to read a 1000 lines
03:46:09,445 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25181a544860091 closed
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a1967
03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.174/192.168.200.174:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,463 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.200.174/192.168.200.174:2181, initiating session
03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094, negotiated timeout = 40000
03:46:09,469 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
03:46:09,475 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86212 to read a 1000 lines
03:46:09,523 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
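
To find such jumps in a longer log without reading it by eye, something like the following might help. It is only a sketch: the class ReadTimeScan and its methods are hypothetical helpers (not part of Flink or of the job), and it assumes the "It took <N> to read" wording of the messages in the excerpt, with N in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the reported read times from a task manager log. */
final class ReadTimeScan {

    // Matches "It took <N> to read"; \s+ also tolerates a line break
    // between the words, as in mail-wrapped log excerpts.
    private static final Pattern READ_TIME =
            Pattern.compile("It took\\s+(\\d+)\\s+to read");

    /** Returns every reported read time in log order. */
    static List<Long> readTimes(String log) {
        List<Long> times = new ArrayList<>();
        Matcher m = READ_TIME.matcher(log);
        while (m.find()) {
            times.add(Long.parseLong(m.group(1)));
        }
        return times;
    }

    /**
     * Returns the index of the first entry that is at least {@code factor}
     * times its (non-zero) predecessor, or -1 if there is no such entry.
     */
    static int firstJump(List<Long> times, long factor) {
        for (int i = 1; i < times.size(); i++) {
            long previous = times.get(i - 1);
            if (previous > 0 && times.get(i) >= previous * factor) {
                return i;
            }
        }
        return -1;
    }
}
```

Feeding the whole log file to readTimes() and then calling firstJump(times, 10) would point at the first batch that took at least ten times longer than the one before it.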



You’ll notice that at some point it takes 254 milliseconds to process
1000 lines of input, and then it jumps to 86 seconds! I also see some
ZooKeeper exceptions that lead me to believe that it’s a networking
problem. I have 4 VMs running on 4 different hosts, connected via a
10G NIC.
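
Since every source thread and the ZooKeeper client report roughly the same 86 seconds at the same moment, it may also be worth ruling out a stall of the whole JVM or VM before blaming the network. Below is a minimal sketch of one way to check that, under the assumption that a background thread can be started in the task manager JVM. PauseMonitor is a hypothetical class, not something Flink provides: it sleeps for a fixed interval and reports how much longer than requested the sleep actually took.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical diagnostic: reports when this JVM did not get to run for a while. */
final class PauseMonitor implements Runnable {

    private final long intervalMs;   // requested length of each sleep
    private final long thresholdMs;  // overshoot that is reported as a stall
    private volatile long worstOvershootMs = 0;
    private volatile boolean running = true;

    PauseMonitor(long intervalMs, long thresholdMs) {
        this.intervalMs = intervalMs;
        this.thresholdMs = thresholdMs;
    }

    /** Elapsed time of one sleep minus the requested interval, never negative. */
    static long overshootMs(long startNanos, long endNanos, long intervalMs) {
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
        return Math.max(0L, elapsedMs - intervalMs);
    }

    @Override
    public void run() {
        while (running) {
            long start = System.nanoTime();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            long overshoot = overshootMs(start, System.nanoTime(), intervalMs);
            if (overshoot > worstOvershootMs) {
                worstOvershootMs = overshoot;
            }
            if (overshoot >= thresholdMs) {
                System.err.println("JVM stalled for about " + overshoot + " ms");
            }
        }
    }

    void stop() {
        running = false;
    }

    long worstOvershootMs() {
        return worstOvershootMs;
    }
}
```

It could be started with new Thread(new PauseMonitor(100, 1000)).start(). If it reports a stall of about the same length at the same time as the slow reads, the process itself was paused (for example by garbage collection or by the hypervisor), which would point away from the network. If it stays quiet, the network remains the more likely suspect.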

Thanks,
Ali


On 2015-12-11, 11:23 AM, "Stephan Ewen" <sewen@apache.org> wrote:

>Hi Ali!
>
>I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do not
>make progress and do not even recognize the end-of-stream point.
>
>I expect that the streams on 192.168.200.174 and 192.168.200.175 are
>back-pressured to a standstill. Since no network is involved, the reason
>for the back pressure is probably the sinks.
>
>What kind of data sink are you using (in the addSink() function)?
>Can you check if that one starts to fully block on machines
>192.168.200.174 and 192.168.200.175?
>
>Greetings,
>Stephan
>
>
>
>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>
>> Hi Stephan,
>>
>> I got a request to share the image with someone and I assume it was you.
>> You should be able to see it now. This seems to be the main issue I have
>> at this time. I've tried running the job on the cluster with a parallelism
>> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
>> working for a bit and then some of them just stop. I’m not sure if they’re
>> stuck or not. Here’s another screenshot:
>> http://postimg.org/image/gr6ogxqjj/
>>
>> Two things you’ll notice:
>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 stopped doing
>> anything at some point, and only 192.168.200.173 is doing all the work.
>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
>> even though the job should be finished (the screenshot was taken after the
>> source was closed).
>>
>> I’m not sure if this helps or not, but here are some properties from the
>> flink-conf.yaml:
>>
>> jobmanager.heap.mb: 8192
>> taskmanager.heap.mb: 49152
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> state.backend: filesystem
>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>
>> taskmanager.network.numberOfBuffers: 3072
>>
>> recovery.mode: zookeeper
>> recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>
>> I appreciate all the help.
>>
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <sewen@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >Seems like the Google Doc has restricted access; it tells me I have no
>> >permission to view it...
>> >
>> >Stephan
>> >
>> >
>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>> >
>> >> Hi Stephan,
>> >>
>> >> Here’s a link to the screenshot I tried to attach earlier:
>> >>
>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>> >>
>> >> It looks to me like the distribution is fairly skewed across the nodes,
>> >> even though they’re executing the same pipeline.
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >>
>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>> >>
>> >> >Hi!
>> >> >
>> >> >The parallel socket source looks good.
>> >> >I think you forgot to attach the screenshot, or the mailing list dropped
>> >> >the attachment...
>> >> >
>> >> >Not sure if I can diagnose that without more details. The sources all do
>> >> >the same thing. Assuming that the server distributes the data evenly across
>> >> >all connected sockets, and that the network bandwidth ends up being divided
>> >> >in a fair way, all pipelines should be similarly "eager".
>> >> >
>> >> >Greetings,
>> >> >Stephan
>> >> >
>> >> >
>> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>> >> >
>> >> >> Hi Stephan,
>> >> >>
>> >> >> That was my original understanding, until I realized that I was not using
>> >> >> a parallel socket source. I had a custom source that extended
>> >> >> SourceFunction, which always runs with parallelism = 1. I looked through
>> >> >> the API and found the ParallelSourceFunction interface, so I implemented
>> >> >> that and voila, now all 3 nodes in the cluster are actually receiving
>> >> >> traffic on socket connections.
>> >> >>
>> >> >> Now that I’m running it successfully end to end, I’m trying to improve the
>> >> >> performance. Can you take a look at the attached screenshot and tell me
>> >> >> if the distribution of work amongst the pipelines is normal? I feel like
>> >> >> some pipelines are a lot lazier than others, even though the cluster nodes
>> >> >> are exactly the same.
>> >> >>
>> >> >> By the way, here’s the class I wrote. It would be useful to have this
>> >> >> available in the Flink distro:
>> >> >>
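>> >> >> import java.io.BufferedReader;
>> >> >> import java.io.IOException;
>> >> >> import java.io.InputStreamReader;
>> >> >> import java.net.Socket;
>> >> >>
>> >> >> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
>> >> >> import org.slf4j.Logger;
>> >> >> import org.slf4j.LoggerFactory;
>> >> >>
>> >> >> // Each parallel instance opens its own socket and emits one record per line.
>> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>> >> >>
>> >> >>         private static final long serialVersionUID = -271094428915640892L;
>> >> >>         private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);
>> >> >>
>> >> >>         // Set to false by cancel(); volatile so the reading thread sees the change.
>> >> >>         private volatile boolean running = true;
>> >> >>         private final String host;
>> >> >>         private final int port;
>> >> >>
>> >> >>         public ParallelSocketSource(String host, int port) {
>> >> >>                 this.host = host;
>> >> >>                 this.port = port;
>> >> >>         }
>> >> >>
>> >> >>         @Override
>> >> >>         public void run(SourceContext<String> ctx) throws Exception {
>> >> >>                 // try-with-resources closes the reader and the socket on exit
>> >> >>                 try (Socket socket = new Socket(host, port);
>> >> >>                         BufferedReader reader = new BufferedReader(
>> >> >>                                 new InputStreamReader(socket.getInputStream()))) {
>> >> >>                         String line;
>> >> >>                         // readLine() returns null once the server closes the connection
>> >> >>                         while (running && (line = reader.readLine()) != null) {
>> >> >>                                 ctx.collect(line);
>> >> >>                         }
>> >> >>                 } catch (IOException ex) {
>> >> >>                         LOG.error("error reading from socket", ex);
>> >> >>                 }
>> >> >>         }
>> >> >>
>> >> >>         @Override
>> >> >>         public void cancel() {
>> >> >>                 running = false;
>> >> >>         }
>> >> >> }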
>> >> >>
>> >> >> Regards,
>> >> >> Ali
>> >> >>
>> >> >>
>> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>> >> >>
>> >> >> >Hi Ali!
>> >> >> >
>> >> >> >In the case you have, the sequence of source-map-filter ... forms a
>> >> >> >pipeline.
>> >> >> >
>> >> >> >You mentioned that you set the parallelism to 16, so there should be 16
>> >> >> >pipelines. These pipelines should be completely independent.
>> >> >> >
>> >> >> >Looking at the way the scheduler is implemented, independent pipelines
>> >> >> >should be spread across machines. But when you execute that in parallel,
>> >> >> >you say all 16 pipelines end up on the same machine?
>> >> >> >
>> >> >> >Can you share with us the rough code of your program? Or a screenshot from
>> >> >> >the runtime dashboard that shows the program graph?
>> >> >> >
>> >> >> >
>> >> >> >If your cluster is basically for that one job only, you could try and set
>> >> >> >the number of slots to 4 for each machine. Then you have 16 slots in total
>> >> >> >and each slot would run one of the 16 pipelines.
>> >> >> >
>> >> >> >
>> >> >> >Greetings,
>> >> >> >Stephan
>> >> >> >
>> >> >> >
>> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>> >> >> >
>> >> >> >> There is no shuffle operation in my flow. Mine actually looks like this:
>> >> >> >>
>> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
>> >> >> >>
>> >> >> >>
>> >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it to a
>> >> >> >> slot. What I really wanted was for the custom source I built to have
>> >> >> >> running instances on all nodes. I’m not really sure if that’s the right
>> >> >> >> approach, but if we could add this as a feature that’d be great, since
>> >> >> >> having more than one node running the same pipeline guarantees the
>> >> >> >> pipeline is never offline.
>> >> >> >>
>> >> >> >> -Ali
>> >> >> >>
>> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrmann@apache.org> wrote:
>> >> >> >>
>> >> >> >> >If I'm not mistaken, the scheduler already has a preference to spread
>> >> >> >> >independent pipelines out across the cluster. At least it uses a queue of
>> >> >> >> >instances from which it pops the first element when it allocates a new slot.
>> >> >> >> >This instance is then appended to the queue again if it has some
>> >> >> >> >resources (slots) left.
>> >> >> >> >
>> >> >> >> >I would assume that you have a shuffle operation involved in your job, such
>> >> >> >> >that it makes sense for the scheduler to deploy all pipelines to the same
>> >> >> >> >machine.
>> >> >> >> >
>> >> >> >> >Cheers,
>> >> >> >> >Till
>> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>> >> >> >> >
>> >> >> >> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> >> >> >> frequently have more than one operator.
>> >> >> >> >>
>> >> >> >> >> What you can try as a workaround is to decrease the number of slots per
>> >> >> >> >> machine to cause the operators to be spread across more machines.
>> >> >> >> >>
>> >> >> >> >> If this is a crucial issue for your use case, it should be simple to add a
>> >> >> >> >> "preference to spread out" to the scheduler...
>> >> >> >> >>
>> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>> >> >> >> >>
>> >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e., make sure the
>> >> >> >> >> > parallel instances of the task are distributed across the cluster. When I
>> >> >> >> >> > run my Flink job with a parallelism of 16, all the parallel tasks are
>> >> >> >> >> > assigned to the first task manager.
>> >> >> >> >> >
>> >> >> >> >> > - Ali
>> >> >> >> >> >
>> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <uce@apache.org> wrote:
>> >> >> >> >> >
>> >> >> >> >> > >
>> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>> >> >> >> >> > >> Do the parallel instances of each task get distributed across the
>> >> >> >> >> > >> cluster or is it possible that they all run on the same node?
>> >> >> >> >> > >
>> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep in mind
>> >> >> >> >> > >that multiple tasks (forming a local pipeline) can be scheduled to the
>> >> >> >> >> > >same slot (1 slot can hold many tasks).
>> >> >> >> >> > >
>> >> >> >> >> > >Have you seen this?
>> >> >> >> >> > >
>> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>> >> >> >> >> > >
>> >> >> >> >> > >> If they can all run on the same node, what happens when that node
>> >> >> >> >> > >> crashes? Does the job manager recreate them using the remaining open
>> >> >> >> >> > >> slots?
>> >> >> >> >> > >
>> >> >> >> >> > >What happens: The job manager tries to restart the program with the same
>> >> >> >> >> > >parallelism. Thus if you have enough free slots available in your
>> >> >> >> >> > >cluster, this works smoothly (so yes, the remaining/available slots are
>> >> >> >> >> > >used).
>> >> >> >> >> > >
>> >> >> >> >> > >With a YARN cluster the task manager containers are restarted
>> >> >> >> >> > >automatically. In standalone mode, you have to take care of this yourself.
>> >> >> >> >> > >
>> >> >> >> >> > >
>> >> >> >> >> > >Does this help?
>> >> >> >> >> > >
>> >> >> >> >> > >- Ufuk
>> >> >> >> >> > >
