Date: Thu, 3 Sep 2015 13:11:46 +0000 (UTC)
From: "Arnaud Linz (JIRA)"
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

Arnaud Linz created FLINK-2617:
----------------------------------

             Summary: ConcurrentModificationException when using HCatRecordReader to access a hive table
                 Key: FLINK-2617
                 URL: https://issues.apache.org/jira/browse/FLINK-2617
             Project: Flink
          Issue Type: Bug
          Components: Hadoop Compatibility
            Reporter: Arnaud Linz

I don't know if it is an HCat problem or a Flink problem, but when reading a Hive table on a cluster with many slots (20 threads per container), I systematically run into a {{ConcurrentModificationException}} in the copy constructor of a {{Configuration}} object that changes during the copy.

From what I understand, this object comes from {{TaskAttemptContext.getConfiguration()}}, created by {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());}}

Maybe the {{job.Configuration}} object passed to the constructor of {{HadoopInputFormatBase}} should be cloned somewhere? See the sketch below.
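For illustration, a minimal sketch of that cloning idea (hypothetical: the constructor shape and field names only approximate {{HadoopInputFormatBase}}, and this is not the actual Flink source):

{code}
// Hypothetical sketch, not the actual Flink fix: take a private copy of the
// job Configuration when the input format is constructed, so that concurrent
// open() calls in the same JVM never read a Configuration that another
// thread is still mutating.
public HadoopInputFormatBase(InputFormat<K, V> mapreduceInputFormat,
                             Class<K> keyClass, Class<V> valueClass, Job job) {
    this.mapreduceInputFormat = mapreduceInputFormat;
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    // Hadoop's Configuration(Configuration other) copy constructor duplicates
    // every entry, so later reads in open() are isolated from the caller's
    // instance.
    this.configuration = new org.apache.hadoop.conf.Configuration(job.getConfiguration());
}
{code}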
Stack trace is:
{code}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
java.util.HashMap$KeyIterator.next(HashMap.java:960)
java.util.AbstractCollection.addAll(AbstractCollection.java:341)
java.util.HashSet.<init>(HashSet.java:117)
org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:744)
{code}
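For what it's worth, the race can be demonstrated without Flink or HCatalog at all: copying a Hadoop {{Configuration}} while another thread mutates it can fail the same way. A standalone sketch (illustrative only; assumes a Hadoop 2.x client on the classpath, and the class name is made up):

{code}
import org.apache.hadoop.conf.Configuration;

// Standalone repro sketch: one thread keeps mutating a shared Configuration
// while the main thread copies it, which is essentially what concurrent
// DataSourceTask slots do through instantiateTaskAttemptContext().
public class ConfigurationCopyRace {
    public static void main(String[] args) {
        final Configuration shared = new Configuration();
        final Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; ; i++) {
                    shared.set("test.key." + i, "value"); // mutates internal collections
                }
            }
        });
        writer.setDaemon(true);
        writer.start();
        // Each copy iterates the shared internal collections; with the writer
        // running, this typically dies with a ConcurrentModificationException.
        for (int i = 0; i < 10000; i++) {
            new Configuration(shared);
        }
        System.out.println("No ConcurrentModificationException this time");
    }
}
{code}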
Flink "user" code looks like:

{code}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

(...)

final Job job = Job.getInstance();

@SuppressWarnings({ "unchecked", "rawtypes" })
final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat(
        (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
        NullWritable.class, //
        DefaultHCatRecord.class, //
        job);

final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());

@SuppressWarnings("serial")
final DataSet<T> dataSet = cluster
        .createInput(inputFormat)
        .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
            @Override
            public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out)
                    throws Exception { // NOPMD
                final T record = createBean(value.f1, inputSchema);
                out.collect(record);
            }
        }).returns(beanClass);

(...)
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)