From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 1 Nov 2017 12:33:00 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-7943) OptionalDataException when launching Flink jobs concurrently
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8

    [ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16234009#comment-16234009 ]

ASF GitHub Bot commented on FLINK-7943:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4921#discussion_r148247868

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java ---
    @@ -560,9 +591,13 @@ protected Object clone() throws CloneNotSupportedException {
          * @return The Merged {@link ParameterTool}
          */
         public ParameterTool mergeWith(ParameterTool other) {
    -        ParameterTool ret = new ParameterTool(this.data);
    -        ret.data.putAll(other.data);
    -        ret.unrequestedParameters.addAll(other.unrequestedParameters);
    +        Map resultData = new HashMap<>(data.size() + other.data.size());
    +        resultData.putAll(data);
    +        resultData.putAll(other.data);
    +
    +        ParameterTool ret = new ParameterTool(resultData);
    +
    +        ret.unrequestedParameters.putAll(other.unrequestedParameters);
    --- End diff --

    Forgot
to add it.

> OptionalDataException when launching Flink jobs concurrently
> ------------------------------------------------------------
>
>                 Key: FLINK-7943
>                 URL: https://issues.apache.org/jira/browse/FLINK-7943
>             Project: Flink
>          Issue Type: Bug
>          Components: Client
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>
> A user reported that he gets an {{OptionalDataException}} when he launches multiple Flink jobs from the same program concurrently. The problem seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can be found below:
>
> {code}
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>     at java.util.HashMap.readObject(HashMap.java:1407)
>     at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
>
> The user code causing the problem is:
>
> {code}
> @SuppressWarnings("serial")
> public class UnionThreaded {
>     static int ThreadPoolSize = 3;
>     static int JobsPerThread = 2;
>     static ParameterTool params;
>
>     public static class RunSubset implements Runnable {
>         private int start = 0;
>         private int end = 0;
>
>         RunSubset(int start, int end) {
>             this.start = start;
>             this.end = end;
>         }
>
>         @Override
>         public void run() {
>             // set up the execution environment
>             final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             // make parameters available in the web interface
>             env.getConfig().setGlobalJobParameters(params);
>
>             if (params.has("left") && params.has("right")) {
>                 for (int i = start; i < end; i++) {
>                     DataSet<DeviceRecord> l, r;
>                     DataSet<DeviceRecord> j;
>                     DataSet<Tuple2<Integer, Integer>> c1, c2;
>
>                     r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .filter(new MyFilter())
>                         .setParallelism(1);
>
>                     // read the text file from given input path
>                     j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .leftOuterJoin(r)
>                         .where("B")
>                         .equalTo("B")
>                         .with(new MyFlatJoinFunction()).setParallelism(1);
>
>                     j.flatMap(new Mapper(false))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");
>
>                     j.flatMap(new Mapper2(true))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
>                 }
>             }
>
>             try {
>                 System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
>                 env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
>             } catch (Exception e) {
>                 System.err.println("env.execute exception: " + e.getMessage());
>             }
>         }
>     }
>
>     // *************************************************************************
>     // PROGRAM
>     // *************************************************************************
>
>     public static void main(String[] args) throws Exception {
>         params = ParameterTool.fromArgs(args);
>
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>
>         int total_to_do = Integer.decode(params.get("filecount"));
>
>         // number of threads should be <= number of slots
>         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);
>
>         // assumes an even number of jobs
>         for (int i = 0; i < total_to_do; i += JobsPerThread) {
>             int end = i + JobsPerThread;
>             if (end > total_to_do) {
>                 end = total_to_do;
>             }
>             executor.execute(new RunSubset(i, end));
>         }
>
>         executor.shutdown();
>
>         // Many ways of waiting.
>         try {
>             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
>         } catch (InterruptedException e) {
>             System.out.println("Execution interrupted");
>             System.exit(-1);
>         }
>
>         // get input data
>         DataSet<Tuple2<Integer, Integer>> counts;
>         DataSet<Tuple2<Integer, Integer>> counts2;
>
>         counts = env.readCsvFile(params.get("output"))
>             .types(Integer.class, Integer.class);
>         counts2 = env.readCsvFile(params.get("output2"))
>             .types(Integer.class, Integer.class);
>
>         // Count by C
>         counts = counts
>             .groupBy(0)
>             .sum(1);
>
>         // Count by device
>         counts2 = counts2
>             .groupBy(0)
>             .sum(1);
>
>         // emit result
>         if (params.has("output")) {
>             counts.writeAsCsv(params.get("output3"), "\n", ", ");
>         }
>
>         // emit result
>         if (params.has("output2")) {
>             counts2.writeAsCsv(params.get("output4"), "\n", ", ");
>         }
>
>         // execute program
>         env.execute("Union4b");
>     }
> }
> {code}
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
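The {{mergeWith}} change in the diff above boils down to a defensive copy: instead of aliasing {{this.data}} (a map that another thread may still be serializing or mutating), the merged result gets its own fresh {{HashMap}} holding both inputs' entries. A minimal standalone sketch of that pattern follows; {{MergeSketch}} and its members are illustrative stand-ins, not Flink's actual {{ParameterTool}}:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the defensive-copy merge from the diff above: the merged
// object never shares a mutable map with the objects it was built from.
public class MergeSketch {
    private final Map<String, String> data;

    public MergeSketch(Map<String, String> data) {
        this.data = data;
    }

    public Map<String, String> getData() {
        return data;
    }

    // Entries from `other` override entries from `this` (plain
    // Map.putAll semantics), and neither input map is aliased.
    public MergeSketch mergeWith(MergeSketch other) {
        Map<String, String> resultData =
            new HashMap<>(data.size() + other.data.size());
        resultData.putAll(data);
        resultData.putAll(other.data);
        return new MergeSketch(resultData);
    }

    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("input", "/tmp/in");
        Map<String, String> b = new HashMap<>();
        b.put("input", "/tmp/other");
        b.put("output", "/tmp/out");

        MergeSketch merged = new MergeSketch(a).mergeWith(new MergeSketch(b));
        merged.getData().put("extra", "x");

        System.out.println(merged.getData().get("input")); // /tmp/other
        System.out.println(a.containsKey("extra"));        // false
    }
}
```

Because the result owns a fresh map, concurrently running jobs can each serialize their own merged parameters without one thread's mutation corrupting another thread's in-flight {{HashMap.writeObject}}/{{readObject}}, which is the failure mode the stack trace above points at.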