flink-dev mailing list archives

From "Kashmar, Ali" <Ali.Kash...@emc.com>
Subject Re: Task Parallelism in a Cluster
Date Mon, 14 Dec 2015 15:47:45 GMT
Hi Stephan,

I figured it out. The problem was that the date/time was different on all
3 nodes. Zookeeper thought that it hadn’t heard from the other nodes for
longer than the allowed period and dropped them, therefore causing the
other two task managers in the cluster to fail. I synchronized the time
between the 3 nodes and reran the test. It’s running very smoothly now.
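
A minimal sketch of the kind of check that would have caught this earlier, assuming each node's clock reading (epoch milliseconds) has already been collected at roughly the same moment, for example with GNU `date +%s%3N` over SSH. The class name, the readings, and the tolerance are all hypothetical and only illustrate the comparison:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: compares clock readings taken on each node at
// (approximately) the same moment and reports the largest disagreement.
final class ClockSkewCheck {

    /** Returns the largest difference, in milliseconds, between any two readings. */
    static long maxSkewMillis(Map<String, Long> epochMillisByNode) {
        if (epochMillisByNode.isEmpty()) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long reading : epochMillisByNode.values()) {
            min = Math.min(min, reading);
            max = Math.max(max, reading);
        }
        return max - min;
    }

    public static void main(String[] args) {
        // Illustrative readings only; real values would come from the nodes.
        Map<String, Long> readings = new LinkedHashMap<>();
        readings.put("192.168.200.173", 1_450_000_000_000L);
        readings.put("192.168.200.174", 1_450_000_045_000L);
        readings.put("192.168.200.175", 1_450_000_090_000L);

        long toleranceMillis = 1_000L; // assumed tolerance, not a ZooKeeper setting
        long skew = maxSkewMillis(readings);
        System.out.println("max skew = " + skew + " ms"
                + (skew > toleranceMillis ? " (clocks need synchronizing)" : " (ok)"));
    }
}
```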

Thanks again for your help.

-Ali

On 2015-12-11, 12:03 PM, "Kashmar, Ali" <Ali.Kashmar@emc.com> wrote:

>Hi Stephan,
>
>I’m using DataStream.writeAsText(String path, WriteMode writemode) for my
>sink. The data is written to disk and there’s plenty of space available.
>
>I looked deeper into the logs and found out that the jobs on 174 and 175
>are not actually stuck, but they’re moving extremely slowly. This is an
>excerpt from the task manager log on 175:
>
>03:44:43,307 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
>03:44:43,315 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
>03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.173/192.168.200.173:2181. Will not attempt to authenticate using SASL (unknown error)
>03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 86223ms for sessionid 0x25181a544860091, closing socket connection and attempting reconnect
>03:46:09,362 WARN  org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection attempt unsuccessful after 86221 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
>03:46:09,391 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86222 to read a 1000 lines
>03:46:09,394 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86243 to read a 1000 lines
>03:46:09,439 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86224 to read a 1000 lines
>03:46:09,445 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
>03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25181a544860091 closed
>03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a1967
>03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
>03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.174/192.168.200.174:2181. Will not attempt to authenticate using SASL (unknown error)
>03:46:09,463 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
>org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
>        at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>        at java.lang.Thread.run(Thread.java:745)
>03:46:09,464 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
>org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
>        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>        at java.lang.Thread.run(Thread.java:745)
>03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.200.174/192.168.200.174:2181, initiating session
>03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094, negotiated timeout = 40000
>03:46:09,469 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
>03:46:09,475 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86212 to read a 1000 lines
>03:46:09,523 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
>
>
>
>You’ll notice that at some point it takes 254 milliseconds to process
>1000 lines of input, and then it jumps to 86 seconds! I also see some
>ZooKeeper exceptions that lead me to believe that it’s a networking
>problem. I have 4 VMs running on 4 different hosts, connected via a
>10G NIC.
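
The jump described above can be picked out of a task manager log mechanically. The following is a minimal sketch, assuming each log entry has been joined onto a single line and that the message keeps the "It took N to read" wording seen in the excerpt. The class name and the threshold are hypothetical; the 60000 used in `main` simply mirrors the `sessionTimeout=60000` value from the log:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts per-batch read times from log lines and
// returns those that exceed a threshold.
final class ReadTimeGaps {

    // Matches the duration in messages such as "It took 254 to read a 1000 lines".
    private static final Pattern TOOK = Pattern.compile("It took (\\d+) to read");

    /** Returns, in log order, every reported read time above thresholdMillis. */
    static List<Long> slowBatches(List<String> logLines, long thresholdMillis) {
        List<Long> slow = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = TOOK.matcher(line);
            if (m.find()) {
                long millis = Long.parseLong(m.group(1));
                if (millis > thresholdMillis) {
                    slow.add(millis);
                }
            }
        }
        return slow;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "03:44:43,307 INFO  ParallelSocketSource - It took 254 to read a 1000 lines",
                "03:46:09,391 INFO  ParallelSocketSource - It took 86222 to read a 1000 lines",
                "03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25181a544860091 closed");
        // Flag batches that took longer than the 60 s ZooKeeper session timeout.
        System.out.println(slowBatches(lines, 60_000L));
    }
}
```

Batches whose read time exceeds the session timeout line up with the ZooKeeper reconnects in the excerpt, which suggests looking at what stalled the whole process rather than at the source itself.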
>
>Thanks,
>Ali
>
>
>On 2015-12-11, 11:23 AM, "Stephan Ewen" <sewen@apache.org> wrote:
>
>>Hi Ali!
>>
>>I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do
>>not make progress and do not even recognize the end-of-stream point.
>>
>>I expect that the streams on 192.168.200.174 and 192.168.200.175 are
>>back-pressured to a standstill. Since no network is involved, the reason
>>for the back pressure is probably the sinks.
>>
>>What kind of data sink are you using (in the addSink() function)?
>>Can you check whether that one starts to fully block on machines
>>192.168.200.174 and 192.168.200.175?
>>
>>Greetings,
>>Stephan
>>
>>
>>
>>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> I got a request to share the image with someone and I assume it was you.
>>> You should be able to see it now. This seems to be the main issue I have
>>> at this time. I've tried running the job on the cluster with a parallelism
>>> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
>>> working for a bit and then some of them just stop. I’m not sure if they’re
>>> stuck or not. Here’s another screenshot:
>>> http://postimg.org/image/gr6ogxqjj/
>>>
>>> Two things you’ll notice:
>>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing
>>> anything at one point and only 192.168.200.173 is doing all the work.
>>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
>>> even though the job should be finished (the screenshot was taken after the
>>> source was closed).
>>>
>>> I’m not sure if this helps or not, but here are some properties from the
>>> flink-conf.yaml:
>>>
>>> jobmanager.heap.mb: 8192
>>> taskmanager.heap.mb: 49152
>>> taskmanager.numberOfTaskSlots: 16
>>> parallelism.default: 1
>>>
>>> state.backend: filesystem
>>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>>
>>> taskmanager.network.numberOfBuffers: 3072
>>>
>>> recovery.mode: zookeeper
>>> recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>>
>>> I appreciate all the help.
>>>
>>>
>>> Thanks,
>>> Ali
>>>
>>>
>>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <sewen@apache.org> wrote:
>>>
>>> >Hi Ali!
>>> >
>>> >Seems like the Google Doc has restricted access. It tells me I have no
>>> >permission to view it...
>>> >
>>> >Stephan
>>> >
>>> >
>>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>> >
>>> >> Hi Stephan,
>>> >>
>>> >> Here’s a link to the screenshot I tried to attach earlier:
>>> >>
>>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>>> >>
>>> >> It looks to me like the distribution is fairly skewed across the nodes,
>>> >> even though they’re executing the same pipeline.
>>> >>
>>> >> Thanks,
>>> >> Ali
>>> >>
>>> >>
>>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>>> >>
>>> >> >Hi!
>>> >> >
>>> >> >The parallel socket source looks good.
>>> >> >I think you forgot to attach the screenshot, or the mailing list dropped
>>> >> >the attachment...
>>> >> >
>>> >> >Not sure if I can diagnose that without more details. The sources all do
>>> >> >the same thing. Assuming that the server distributes the data evenly across
>>> >> >all connected sockets, and that the network bandwidth ends up being divided
>>> >> >in a fair way, all pipelines should be similarly "eager".
>>> >> >
>>> >> >Greetings,
>>> >> >Stephan
>>> >> >
>>> >> >
>>> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>> >> >
>>> >> >> Hi Stephan,
>>> >> >>
>>> >> >> That was my original understanding, until I realized that I was not using
>>> >> >> a parallel socket source. I had a custom source that extended
>>> >> >> SourceFunction, which always runs with parallelism = 1. I looked through
>>> >> >> the API and found the ParallelSourceFunction interface, so I implemented
>>> >> >> that and voila, now all 3 nodes in the cluster are actually receiving
>>> >> >> traffic on socket connections.
>>> >> >>
>>> >> >> Now that I’m running it successfully end to end, I’m trying to improve the
>>> >> >> performance. Can you take a look at the attached screenshot and tell me
>>> >> >> if the distribution of work amongst the pipelines is normal? I feel like
>>> >> >> some pipelines are a lot lazier than others, even though the cluster nodes
>>> >> >> are exactly the same.
>>> >> >>
>>> >> >> By the way, here’s the class I wrote. It would be useful to have this
>>> >> >> available in the Flink distro:
>>> >> >>
>>> >> >> import java.io.BufferedReader;
>>> >> >> import java.io.IOException;
>>> >> >> import java.io.InputStreamReader;
>>> >> >> import java.net.Socket;
>>> >> >>
>>> >> >> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
>>> >> >> import org.slf4j.Logger;
>>> >> >> import org.slf4j.LoggerFactory;
>>> >> >>
>>> >> >> // Each parallel instance opens its own socket connection and emits one record per line.
>>> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>>> >> >>
>>> >> >>         private static final long serialVersionUID = -271094428915640892L;
>>> >> >>         private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);
>>> >> >>
>>> >> >>         // Cleared by cancel() to end the read loop.
>>> >> >>         private volatile boolean running = true;
>>> >> >>         private String host;
>>> >> >>         private int port;
>>> >> >>
>>> >> >>         public ParallelSocketSource(String host, int port) {
>>> >> >>                 this.host = host;
>>> >> >>                 this.port = port;
>>> >> >>         }
>>> >> >>
>>> >> >>         @Override
>>> >> >>         public void run(SourceContext<String> ctx) throws Exception {
>>> >> >>                 // try-with-resources closes the reader and the socket on exit.
>>> >> >>                 try (Socket socket = new Socket(host, port);
>>> >> >>                         BufferedReader reader = new BufferedReader(
>>> >> >>                                 new InputStreamReader(socket.getInputStream()))) {
>>> >> >>                         String line = null;
>>> >> >>                         // Stop on cancel() or when the server closes the stream.
>>> >> >>                         while (running && ((line = reader.readLine()) != null)) {
>>> >> >>                                 ctx.collect(line);
>>> >> >>                         }
>>> >> >>                 } catch (IOException ex) {
>>> >> >>                         LOG.error("error reading from socket", ex);
>>> >> >>                 }
>>> >> >>         }
>>> >> >>
>>> >> >>         @Override
>>> >> >>         public void cancel() {
>>> >> >>                 running = false;
>>> >> >>         }
>>> >> >> }
>>> >> >>
>>> >> >> Regards,
>>> >> >> Ali
>>> >> >>
>>> >> >>
>>> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>>> >> >>
>>> >> >> >Hi Ali!
>>> >> >> >
>>> >> >> >In the case you have, the sequence of source-map-filter ... forms a
>>> >> >> >pipeline.
>>> >> >> >
>>> >> >> >You mentioned that you set the parallelism to 16, so there should be 16
>>> >> >> >pipelines. These pipelines should be completely independent.
>>> >> >> >
>>> >> >> >Looking at the way the scheduler is implemented, independent pipelines
>>> >> >> >should be spread across machines. But when you execute that in parallel,
>>> >> >> >you say all 16 pipelines end up on the same machine?
>>> >> >> >
>>> >> >> >Can you share with us the rough code of your program? Or a screenshot from
>>> >> >> >the runtime dashboard that shows the program graph?
>>> >> >> >
>>> >> >> >
>>> >> >> >If your cluster is basically for that one job only, you could try and set
>>> >> >> >the number of slots to 4 for each machine. Then you have 16 slots in total
>>> >> >> >and each node would run one of the 16 pipelines.
>>> >> >> >
>>> >> >> >
>>> >> >> >Greetings,
>>> >> >> >Stephan
>>> >> >> >
>>> >> >> >
>>> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>> >> >> >
>>> >> >> >> There is no shuffle operation in my flow. Mine actually looks like this:
>>> >> >> >>
>>> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
>>> >> >> >>
>>> >> >> >>
>>> >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it to a
>>> >> >> >> slot. What I really wanted was for the custom source I built to have
>>> >> >> >> running instances on all nodes. I’m not really sure if that’s the right
>>> >> >> >> approach, but if we could add this as a feature that’d be great, since
>>> >> >> >> having more than one node running the same pipeline guarantees the
>>> >> >> >> pipeline is never offline.
>>> >> >> >>
>>> >> >> >> -Ali
>>> >> >> >>
>>> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrmann@apache.org> wrote:
>>> >> >> >>
>>> >> >> >> >If I'm not mistaken, the scheduler already has a preference to spread
>>> >> >> >> >independent pipelines out across the cluster. At least it uses a queue of
>>> >> >> >> >instances from which it pops the first element when it allocates a new slot.
>>> >> >> >> >This instance is then appended to the queue again if it has some resources
>>> >> >> >> >(slots) left.
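
The queue behaviour described here can be sketched as follows. This is not Flink's scheduler code, only a minimal model of the description above under stated assumptions: the class and method names are hypothetical, an "instance" is reduced to a name plus a free-slot count, and the 3 x 16 slot layout in `main` is taken from the cluster configuration quoted elsewhere in this thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the described queue: pop the first instance, take a
// slot from it, and re-append it only if it still has free slots.
final class SlotQueueSketch {

    private static final class Instance {
        final String name;
        int freeSlots;

        Instance(String name, int freeSlots) {
            this.name = name;
            this.freeSlots = freeSlots;
        }
    }

    private final Deque<Instance> queue = new ArrayDeque<>();

    /** Registers an instance (task manager) with the given number of free slots. */
    void addInstance(String name, int slots) {
        if (slots > 0) {
            queue.addLast(new Instance(name, slots));
        }
    }

    /** Allocates one slot and returns the instance name, or null if none is left. */
    String allocateSlot() {
        Instance head = queue.pollFirst();
        if (head == null) {
            return null;
        }
        head.freeSlots--;
        if (head.freeSlots > 0) {
            queue.addLast(head); // still has capacity: back to the end of the queue
        }
        return head.name;
    }

    public static void main(String[] args) {
        SlotQueueSketch scheduler = new SlotQueueSketch();
        scheduler.addInstance("192.168.200.173", 16);
        scheduler.addInstance("192.168.200.174", 16);
        scheduler.addInstance("192.168.200.175", 16);

        // Count how many of 16 independent pipelines land on each instance.
        Map<String, Integer> perInstance = new LinkedHashMap<>();
        for (int pipeline = 0; pipeline < 16; pipeline++) {
            perInstance.merge(scheduler.allocateSlot(), 1, Integer::sum);
        }
        System.out.println(perInstance);
    }
}
```

Under this model, slot requests rotate through the instances, so independent pipelines end up spread roughly evenly as long as every instance still has a free slot.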
>>> >> >> >> >
>>> >> >> >> >I would assume that you have a shuffle operation involved in your job such
>>> >> >> >> >that it makes sense for the scheduler to deploy all pipelines to the same
>>> >> >> >> >machine.
>>> >> >> >> >
>>> >> >> >> >Cheers,
>>> >> >> >> >Till
>>> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>>> >> >> >> >
>>> >> >> >> >> Slots are like "resource groups" which execute entire pipelines. They
>>> >> >> >> >> frequently have more than one operator.
>>> >> >> >> >>
>>> >> >> >> >> What you can try as a workaround is to decrease the number of slots per
>>> >> >> >> >> machine to cause the operators to be spread across more machines.
>>> >> >> >> >>
>>> >> >> >> >> If this is a crucial issue for your use case, it should be simple to add a
>>> >> >> >> >> "preference to spread out" to the scheduler...
>>> >> >> >> >>
>>> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>> >> >> >> >>
>>> >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e. make sure the
>>> >> >> >> >> > parallel instances of the task are distributed across the cluster. When I
>>> >> >> >> >> > run my Flink job with a parallelism of 16, all the parallel tasks are
>>> >> >> >> >> > assigned to the first task manager.
>>> >> >> >> >> >
>>> >> >> >> >> > - Ali
>>> >> >> >> >> >
>>> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <uce@apache.org> wrote:
>>> >> >> >> >> >
>>> >> >> >> >> > >
>>> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <Ali.Kashmar@emc.com> wrote:
>>> >> >> >> >> > >> Do the parallel instances of each task get distributed across the
>>> >> >> >> >> > >> cluster or is it possible that they all run on the same node?
>>> >> >> >> >> > >
>>> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep in mind
>>> >> >> >> >> > >that multiple tasks (forming a local pipeline) can be scheduled to the
>>> >> >> >> >> > >same slot (1 slot can hold many tasks).
>>> >> >> >> >> > >
>>> >> >> >> >> > >Have you seen this?
>>> >> >> >> >> > >
>>> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>>> >> >> >> >> > >
>>> >> >> >> >> > >> If they can all run on the same node, what happens when that node
>>> >> >> >> >> > >> crashes? Does the job manager recreate them using the remaining open
>>> >> >> >> >> > >> slots?
>>> >> >> >> >> > >
>>> >> >> >> >> > >What happens: The job manager tries to restart the program with the same
>>> >> >> >> >> > >parallelism. Thus if you have enough free slots available in your
>>> >> >> >> >> > >cluster, this works smoothly (so yes, the remaining/available slots are
>>> >> >> >> >> > >used).
>>> >> >> >> >> > >
>>> >> >> >> >> > >With a YARN cluster the task manager containers are restarted
>>> >> >> >> >> > >automatically. In standalone mode, you have to take care of this yourself.
>>> >> >> >> >> > >
>>> >> >> >> >> > >
>>> >> >> >> >> > >Does this help?
>>> >> >> >> >> > >
>>> >> >> >> >> > >– Ufuk
>>> >> >> >> >> > >
