beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raghu Angadi <rang...@google.com>
Subject Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"
Date Wed, 13 Jun 2018 15:10:49 GMT
Can you check the logs on the worker?

On Wed, Jun 13, 2018 at 2:26 AM <linrick@itri.org.tw> wrote:

> Dear all,
>
>
>
> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).
>
> My running environment is:
>
> OS: Ubuntn 14.04.3 LTS
>
> The different version for these tools:
>
> JAVA: JDK 1.8
>
> Beam 2.0.0 (Spark runner with Standalone mode)
>
> Spark 1.6.0
>
> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two
> worker nodes: ubuntu8 and ubuntu9
>
> Kafka: 2.10-0.10.1.1
>
>
>
> The java code of my project is:
>
>
> ==============================================================================
>
> SparkPipelineOptions options = PipelineOptionsFactory.*as*
> (SparkPipelineOptions.*class*);
>
> options.setRunner(SparkRunner.*class*);
>
> options.setSparkMaster("spark://ubuntu8:7077");
>
> options.setAppName("App kafkaBeamTest");
>
> options.setJobName("Job kafkaBeamTest");
>
> *options**.setMaxRecordsPerBatch(1000L);*
>
>
>
> Pipeline p = Pipeline.create(options);
>
>
>
> System.out.println("Beamtokafka");
>
> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long,
> String>read()
>
> .withBootstrapServers(ubuntu7:9092)
>
> .withTopic("kafkasink")
>
> .withKeyDeserializer(LongDeserializer.class)
>
> .withValueDeserializer(StringDeserializer.class)
>
>        .withoutMetadata()
>
>        );
>
>
>
> PCollection<KV<Long, String>> readDivideData = readData.
>
>
> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardSeconds(1)))
>
>      .triggering(AfterWatermark.pastEndOfWindow()
>
>
> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
>
>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());
>
>
>
> System.out.println("CountData");
>
>
>
> PCollection<KV<Long, Long>> countData =
> readDivideData.apply(Count.perKey());
>
>
>
> p.run();
>
>
> ==============================================================================
>
>
>
> The message of error is:
>
>
> ==============================================================================
>
> Exception in thread "streaming-job-executor-0" java.lang.Error:
> java.lang.InterruptedException
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.InterruptedException
>
>         at java.lang.Object.wait(Native Method)
>
>         at java.lang.Object.wait(Object.java:502)
>
>         at
> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>
>         at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
>
>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
>
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
>
> …
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> ... 2 more
>
>
> ==============================================================================
>
>
>
> Maven 3.5.0, in which related dependencies are listed in my project’s
> pom.xml:
>
> <dependency>
>
> <groupId>org.apache.beam</groupId>
>
>   <artifactId>beam-sdks-java-core</artifactId>
>
>   <version>2.0.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.beam</groupId>
>
>    <artifactId>beam-sdks-java-io-kafka</artifactId>
>
>    <version>2.0.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.spark</groupId>
>
>   <artifactId>spark-core_2.10</artifactId>
>
>   <version>1.6.0</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.spark</groupId>
>
>   <artifactId>spark-streaming_2.10</artifactId>
>
>   <version>1.6.0</version>
>
> </dependency>
>
>
>
> <dependency>
>
> <groupId>org.apache.kafka</groupId>
>
>   <artifactId>kafka-clients</artifactId>
>
>   <version>0.10.1.1</version>
>
> </dependency>
>
> <dependency>
>
> <groupId>org.apache.kafka</groupId>
>
>   <artifactId>kafka_2.10</artifactId>
>
>   <version>0.10.1.1</version>
>
> </dependency>
>
>
>
>
>
> When I use the above code in Spark Runner (Local [4]), this project worked
> well (2000~4000 data/s). However, if I run it on Standalone mode, it failed
> along with the above error.
>
>
>
> If you have any idea about the error ("streaming-job-executor-0"), I am
> looking forward to hearing from you.
>
>
>
> Note that: perform command line is “./spark-submit --class
> com.itri.beam.kafkatest --master spark:// ubuntu8:7077
> /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”
>
>
>
> Thanks
>
>
>
> Rick
>
>
>
>
>
>
> --
> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。
This email may contain
> confidential information. Please do not use or disclose it in any way and
> delete it if you are not the intended recipient.
>

Mime
View raw message