flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From James Isaac <dataenginee...@gmail.com>
Subject Re: Issue while running integration test using AbstractTestBase
Date Thu, 11 Oct 2018 06:27:12 GMT
I'm using flink 1.5.0. The test gives the same error even with flink-1.6.0.
Also, I introduced a Thread.sleep(30000); before the assert statement. That
didn't help either.

Regards,
James

On Thu, Oct 11, 2018 at 11:11 AM James Isaac <dataengineer21@gmail.com>
wrote:

> Hi,
>
> I'm trying to run an integration test of my flink application. My test
> code looks like this:
>
> public class HttpsCsvIngestorTest extends AbstractTestBase {
>
>     private final static Logger LOG =
> LoggerFactory.getLogger(HttpsCsvIngestorTest.class);
>
>     @Test
>     public void testHttpsCsvIngestion() throws Exception {
>
>         Thread flinkJob = new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 String[] args = new String[] { "--configFile",
> "src/test/resources/config.properties", "--secretKey",
>                         "12345" };
>                 JobExecutionResult execResult =
> CsvProcessorFlinkDriver.runFlinkJob(args);
>             }
>         });
>
>         flinkJob.start();
>         LOG.info("Starting flink job");
>
>         Thread.sleep(10000);
>         String[] args2 = new String[] { "localhost", filename };
>         FileUploadClient.main(args2);
>
>         assertTrue(new File(System.getProperty("user.dir") +
> File.separator + "src/main/resources/Result.csv")
>                 .exists());
>         System.out.println("Test completed. Going to shutdown flink job");
>     }
>
> }
>
>
> Here I'm starting my flink application from a child thread, and uploading
> a file for processing from the main thread. The test runs fine, and I get
> the expected result file.
> However, I get the following error at the end, when the application is
> being shut down:
>
> 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map ->
> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error
> during disposal of stream operator.
> java.lang.NoSuchMethodError:
> org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map ->
> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error
> during disposal of stream operator.
> java.lang.NoSuchMethodError:
> org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-10 16:24:46,670 ERROR Source: JettyServerFileSource -> Map ->
> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error
> during disposal of stream operator.
> java.lang.NoSuchMethodError:
> org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map ->
> Process -> Sink: Unnamed (1/1) StreamTask StreamTask.java:477  - Error
> during disposal of stream operator.
> java.lang.NoSuchMethodError:
> org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.dispose(DefaultOperatorStateBackend.java:174)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:330)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-10 16:24:46,671 ERROR Source: JettyServerFileSource -> Map ->
> Process -> Sink: Unnamed (1/1) Task Task.java:843  - FATAL - exception in
> resource cleanup of task Source: JettyServerFileSource -> Map -> Process ->
> Sink: Unnamed (1/1) (12d3e0627e62ad44c57c45b720682e56).
> java.lang.IllegalStateException: Memory manager has been shut down.
> at
> org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:470)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:824)
> at java.lang.Thread.run(Thread.java:745)
> org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve
> the JobExecutionResult from the JobManager.
> at
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
> at
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:566)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:540)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.executeJobBlocking(FlinkMiniCluster.scala:714)
> at
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
> at
> mycode.CsvProcessorFlinkDriver.flinkJettyExecution(CsvProcessorFlinkDriver.java:132)
> at
> mycode.CsvProcessorFlinkDriver.runFlinkJob(CsvProcessorFlinkDriver.java:56)
> at com.demo.code.HttpsCsvIngestorTest$1.run(HttpsCsvIngestorTest.java:30)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/$a#-1711434410]] after [21474835000 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.messages.JobClientMessages$SubmitJobAndWait".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.run(LightArrayRevolverScheduler.scala:338)
> at
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:142)
> at
> akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(LightArrayRevolverScheduler.scala:141)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> akka.actor.LightArrayRevolverScheduler.close(LightArrayRevolverScheduler.scala:140)
> at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:892)
> at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:826)
> at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
> at
> akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:826)
> at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:842)
> at
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
> at
> akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$addRec$1$1.applyOrElse(ActorSystem.scala:1021)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Here CsvProcessorFlinkDriver.java:132 is the executionResult = env.execute
> line.
>
> Is there something I'm doing wrong? I also notice that if I start the
> flink application in the main thread of the test class(instead of from a
> child thread), execution does not progress to the lines starting from
> LOG.info("Starting flink job");
>
> Regards,
> James
>
>

Mime
View raw message