Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Precedence: bulk
Delivered-To: mailing list user@flink.apache.org
MIME-Version: 1.0
From: Fabian Hueske
Date: Fri, 20 Oct 2017 23:28:41 +0200
Subject: Re: problem with increase job parallelism
To: Lei Chen
Cc: user
Content-Type: multipart/alternative; boundary="001a11458462868b88055c012a89"
archived-at: Fri, 20 Oct 2017 21:29:27 -0000

--001a11458462868b88055c012a89
Content-Type: text/plain; charset="UTF-8"

Hi Lei,

setting explicit operator IDs should solve this issue.
As far as I know, the auto-generated operator ID also depended on the operator parallelism in previous versions of Flink (I'm not sure up to which version).

Which version are you running?

Best, Fabian

2017-10-17 3:15 GMT+02:00 Lei Chen <leyncl2@gmail.com>:

> Hi,
>
> We're trying to implement a module to help autoscale our pipeline, which
> is built with Flink on YARN. According to the documentation, the suggested
> procedure seems to be:
>
> 1. cancel the job with a savepoint
> 2. start a new job with an increased number of YARN TMs and increased parallelism.
>
> However, step 2 always gave this error:
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The procedure worked fine if the parallelism was not changed.
>
> I also want to mention that I didn't manually specify operator IDs in my job.
> The documentation does mention that manually assigning operator IDs is
> suggested. I'm just curious whether that is mandatory in my case to fix the
> problem I'm seeing, given that my program doesn't change at all, so the
> auto-generated operator IDs should be unchanged after the parallelism increase?
>
> thanks,
> Lei
>

--001a11458462868b88055c012a89
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
--001a11458462868b88055c012a89--
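
The point made in the reply, that an auto-generated operator ID which depends on parallelism changes when the job is rescaled while an explicitly assigned one does not, can be illustrated with a small toy model. This is only a sketch under loud assumptions: the class OperatorIdSketch, both methods, and the use of MD5 over a "position|name|parallelism" string are hypothetical and are not Flink's actual ID generation. In a real DataStream job the explicit ID would be assigned by calling uid(String) on each stateful operator.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class OperatorIdSketch {

    // Hypothetical stand-in for an auto-generated ID: it hashes the operator's
    // position in the job, its name, AND its parallelism. This is an assumption
    // made for illustration, not the hash Flink really computes.
    static String autoGeneratedId(int position, String name, int parallelism) {
        return md5Hex(position + "|" + name + "|" + parallelism);
    }

    // Hypothetical stand-in for an explicitly assigned ID: only the user-chosen
    // string is hashed, so the parallelism argument is deliberately ignored.
    static String explicitId(String uid, int parallelism) {
        return md5Hex(uid);
    }

    // Hex-encoded MD5 digest of the input (32 hex characters).
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Same operator, before and after increasing parallelism from 2 to 4.
        System.out.println("auto, p=2:     " + autoGeneratedId(0, "map", 2));
        System.out.println("auto, p=4:     " + autoGeneratedId(0, "map", 4));
        System.out.println("explicit, p=2: " + explicitId("my-map", 2));
        System.out.println("explicit, p=4: " + explicitId("my-map", 4));
    }
}
```

In this toy model the two "auto" lines differ, which mirrors the "operator is not available in the new program" symptom: the ID stored in the savepoint no longer matches any operator in the rescaled job. The two "explicit" lines are identical, so state could be mapped back regardless of parallelism.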