From: Aljoscha Krettek
To: Lei Chen
Cc: user@flink.apache.org
Subject: Re: problem scale Flink job on YARN
Date: Thu, 19 Oct 2017 18:47:46 +0200


Which version = of Flink would that be? I'm guessing >=3D 1.3.x? In Flink 1.1 the = hash of an operator was tied to the parallelism but starting with 1.2 = that shouldn't happen anymore.

Also, are you changing the parallelism job-wide, or are there operators with differing parallelism? For example, could there be a source with parallelism 1, followed by an operator that also had parallelism 1 but now has a different parallelism?
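The chaining case behind that question can be illustrated with a toy model in plain Java (no Flink dependency). It assumes, as a simplification of Flink's behaviour since 1.2, that an auto-generated operator ID is computed from the operator's position in the graph, whether it is chained to its successor, and the IDs of its inputs, but not from the parallelism itself. It also assumes that two adjacent operators chain only when their parallelism matches; Flink's real chaining rules have further conditions. `ChainAwareIds` and its methods are hypothetical, and MD5 stands in for Flink's actual hash function.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink's hasher: IDs for a linear pipeline where
// parallelism[i] is the parallelism of operator i (0 = source).
class ChainAwareIds {

    // Simplifying assumption: adjacent operators chain iff parallelism matches.
    static boolean chainable(int upstream, int downstream) {
        return upstream == downstream;
    }

    static List<String> autoIds(int[] parallelism) {
        List<String> ids = new ArrayList<>();
        byte[] upstreamHash = null;
        for (int i = 0; i < parallelism.length; i++) {
            boolean chainedToNext = i + 1 < parallelism.length
                    && chainable(parallelism[i], parallelism[i + 1]);
            // Position and chaining enter the hash; the parallelism value does not.
            byte[] hash = md5(i + ":" + chainedToNext);
            if (upstreamHash != null) {
                // Mix in the input's hash so that an upstream change propagates.
                for (int j = 0; j < hash.length; j++) {
                    hash[j] = (byte) (hash[j] * 37 ^ upstreamHash[j]);
                }
            }
            ids.add(toHex(hash));
            upstreamHash = hash;
        }
        return ids;
    }

    static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is required on every JVM
        }
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> before = autoIds(new int[] {1, 1, 1});  // source, map, sink
        List<String> jobWide = autoIds(new int[] {4, 4, 4}); // everything rescaled
        List<String> mapOnly = autoIds(new int[] {1, 4, 4}); // source stays at 1
        System.out.println("job-wide rescale keeps IDs: " + before.equals(jobWide)); // true
        System.out.println("map-only rescale keeps IDs: " + before.equals(mapOnly)); // false
    }
}
```

In this model a job-wide rescale leaves every ID untouched, while rescaling only the map breaks the source-to-map chain: the source's ID changes, and because input hashes are mixed in, so does the ID of every operator downstream of it.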

Best,
Aljoscha

On 16. Oct 2017, at 06:28, Lei Chen <leyncl@gmail.com> wrote:

Hi, 

We're trying to implement a module to help autoscale our pipeline, which is built with Flink on YARN. According to the documentation, the suggested procedure seems to be:
1. Cancel the job with a savepoint.
2. Start a new job with an increased number of YARN TaskManagers and a higher parallelism.
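The two steps map onto Flink CLI calls. The sketch below only assembles the command lines as strings (for logging or for handing to a process launcher); it assumes the Flink 1.2/1.3 CLI flags `cancel -s` (cancel with savepoint), `run -m yarn-cluster` with `-yn`/`-ys` (number of YARN TaskManagers and slots per TaskManager), `-p` (default parallelism) and `-s` (restore from savepoint). `RescaleCommands`, the job ID, paths and jar name are placeholders; check `bin/flink --help` for the version in use.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds (does not run) the CLI commands for
// "cancel with savepoint, then resubmit with more parallelism" on YARN.
class RescaleCommands {

    // Step 1: cancel the job and write a savepoint into targetDir.
    static List<String> cancelWithSavepoint(String jobId, String targetDir) {
        return Arrays.asList("bin/flink", "cancel", "-s", targetDir, jobId);
    }

    // Step 2: resubmit with more TaskManagers and a higher default parallelism,
    // restoring operator state from the savepoint written in step 1.
    static List<String> restartFromSavepoint(String savepointPath, int taskManagers,
                                             int slotsPerTaskManager, int parallelism, String jar) {
        // Sanity check (assumes a single slot sharing group, the default):
        // the job needs as many slots as its highest parallelism.
        if (parallelism < 1 || parallelism > taskManagers * slotsPerTaskManager) {
            throw new IllegalArgumentException("parallelism " + parallelism
                    + " does not fit into " + taskManagers + " x " + slotsPerTaskManager + " slots");
        }
        return Arrays.asList("bin/flink", "run",
                "-m", "yarn-cluster",
                "-yn", Integer.toString(taskManagers),
                "-ys", Integer.toString(slotsPerTaskManager),
                "-p", Integer.toString(parallelism),
                "-s", savepointPath,
                jar);
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", cancelWithSavepoint("<jobId>", "hdfs:///flink/savepoints")));
        System.out.println(String.join(" ", restartFromSavepoint(
                "hdfs:///flink/savepoints/savepoint-<id>", 4, 2, 8, "my-job.jar")));
    }
}
```

Note that `-p` only sets the default: an operator whose parallelism is fixed in the program with `setParallelism(...)` keeps that value, which is exactly how a job ends up with operators of differing parallelism after a rescale.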

However, step 2 always fails with this error:

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
	at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
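The check that fails here can be modelled in a few lines: on restore, every operator ID stored in the savepoint must match an operator in the new job, otherwise submission fails unless non-restored state is explicitly allowed. The sketch below is a simplified, hypothetical model of that rule (`RestoreCheck` is not Flink's `SavepointLoader` code). It also shows why `--allowNonRestoredState` does not fix a rescaling problem: the state of the unmatched operator is simply skipped, so the new job starts without it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of mapping savepoint state onto the operators of a new job
// (hypothetical names, not Flink's implementation).
class RestoreCheck {

    // savepointState: operator ID -> state (a String here for brevity).
    // Returns the state that would actually be restored.
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> newOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            String operatorId = entry.getKey();
            if (newOperatorIds.contains(operatorId)) {
                restored.put(operatorId, entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException("Cannot map savepoint state for operator "
                        + operatorId + " to the new program, because the operator is not "
                        + "available in the new program.");
            }
            // Otherwise this operator's state is skipped and lost for the new job.
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("source-id-before-rescale", "offsets");
        savepoint.put("map-id", "counts");
        Set<String> newJob = new HashSet<>(Arrays.asList("source-id-after-rescale", "map-id"));
        System.out.println(restore(savepoint, newJob, true).keySet()); // [map-id]
        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```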

The procedure works fine if the parallelism is not changed.

I also want to mention that I didn't manually specify operator IDs in my job. The documentation does mention that assigning operator IDs manually is recommended. I'm just curious whether that is mandatory to fix the problem I'm seeing, given that my program doesn't change at all, so the auto-generated operator IDs should stay the same after the parallelism increase?
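Explicit IDs are set in the DataStream API by calling `uid(String)` on each stateful operator, for example `stream.map(new MyMapper()).uid("my-mapper")` (`MyMapper` and the uid strings are placeholders). As far as is known, Flink then derives the operator ID from the uid string alone rather than from the graph structure. The toy model below (hypothetical names; MD5 instead of Flink's real hash function) contrasts the two cases for a source that was chained to its successor before rescaling and is not afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Toy model, NOT Flink's hasher: an explicit uid alone determines the ID;
// without one, the ID depends on topology facts such as chaining.
class UidVsAutoId {

    static String operatorId(String uid, int position, boolean chainedToNext) {
        String hashInput = (uid != null)
                ? "uid:" + uid                              // user-assigned: stable
                : "auto:" + position + ":" + chainedToNext; // structural: fragile
        return toHex(md5(hashInput));
    }

    static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is required on every JVM
        }
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Same source before (chained to a p=1 map) and after (map rescaled to p=4).
        boolean autoStable = operatorId(null, 0, true).equals(operatorId(null, 0, false));
        boolean uidStable = operatorId("my-source", 0, true).equals(operatorId("my-source", 0, false));
        System.out.println("auto-generated ID stable: " + autoStable); // false
        System.out.println("uid-based ID stable: " + uidStable);       // true
    }
}
```

So an unchanged program is not enough by itself: if rescaling alters chaining, an auto-generated ID can change, while a uid-based one cannot. Keep in mind that adding uids to a job changes its operator IDs, so a savepoint taken before the uids were added will not map onto them automatically.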

thanks,
Lei
