beam-user mailing list archives

From: Jianfeng Qian <qianjianf...@outlook.com>
Subject: spark wordcount case (local mode, empty result file)
Date: Tue, 12 Apr 2016 10:41:29 GMT
Hi,

I have just started using Beam.

I installed Scala 2.11, Hadoop 2.7.2, and Spark 1.6.1, and started Hadoop and Spark.

I downloaded and built Beam.

I downloaded the input file with:

curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /home/jeff/test/beam/input/kinglear.txt

jeff@T:~/test/beam$ ls -l input
total 184
-rw-rw-r-- 1 jeff jeff 185965 Apr 11 17:16 kinglear.txt

Then I ran the example in local mode:

jeff@T:~/git/incubator-beam/runners/spark$ mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
>   -Dinput=~/test/beam/input/kinglear.txt -Doutput=/test/beam/output -Drunner=SparkPipelineRunner \
>   -DsparkMaster=local
However, the result file is empty. Has anyone run into the same problem?

jeff@T:~/test/beam$ ls -l
total 4
drwxrwxr-x 2 jeff jeff 4096 Apr 12 17:32 input
-rw-r--r-- 1 jeff jeff    0 Apr 12 18:20 output-00000-of-00001
-rw-r--r-- 1 jeff jeff    0 Apr 12 18:20 _SUCCESS
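
For reference, the pipeline this example builds is roughly the following. This is only a minimal sketch reconstructed from the transform names in the log below (ReadLines, ExtractWords, Count.PerElement, WriteCounts), written against the com.google.cloud.dataflow SDK classes the Spark runner wraps; the class and DoFn names here are illustrative, not the exact example source.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;

public class WordCountSketch {
  // Splits each line into words, mirroring the example's ExtractWords step.
  static class ExtractWordsFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) {
      for (String word : c.element().split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  // Formats each (word, count) pair as one line of output text.
  static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
    @Override
    public void processElement(ProcessContext c) {
      c.output(c.element().getKey() + ": " + c.element().getValue());
    }
  }

  public static void main(String[] args) {
    // The mvn command above passes runner/input/output through system
    // properties; here the paths are hard-coded for brevity.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(TextIO.Read.named("ReadLines").from("/home/jeff/test/beam/input/kinglear.txt"))
     .apply(ParDo.named("ExtractWords").of(new ExtractWordsFn()))
     .apply(Count.<String>perElement())
     .apply(ParDo.of(new FormatCountsFn()))
     .apply(TextIO.Write.named("WriteCounts").to("/home/jeff/test/beam/output"));
    p.run();
  }
}

Given the 185,965-byte kinglear.txt input, the output should contain one "word: count" line per distinct word, so a 0-byte output-00000-of-00001 next to a _SUCCESS marker looks wrong.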


The local-mode log is as follows:

[INFO] Scanning for projects...
[WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
[WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Apache Beam :: Runners :: Spark 0.1.0-incubating-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
[WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:exec (default-cli) @ spark-runner ---
log4j:WARN No appenders could be found for logger (com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/04/12 18:20:29 INFO SparkContext: Running Spark version 1.5.2
16/04/12 18:20:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/04/12 18:20:29 WARN Utils: Your hostname, T resolves to a loopback address: 127.0.1.1; using 192.168.1.119 instead (on interface wlp3s0)
16/04/12 18:20:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/04/12 18:20:29 INFO SecurityManager: Changing view acls to: jeff
16/04/12 18:20:29 INFO SecurityManager: Changing modify acls to: jeff
16/04/12 18:20:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jeff); users with modify permissions: Set(jeff)
16/04/12 18:20:31 INFO Slf4jLogger: Slf4jLogger started
16/04/12 18:20:31 INFO Remoting: Starting remoting
16/04/12 18:20:32 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.119:40821]
16/04/12 18:20:32 INFO Utils: Successfully started service 'sparkDriver' on port 40821.
16/04/12 18:20:32 INFO SparkEnv: Registering MapOutputTracker
16/04/12 18:20:32 INFO SparkEnv: Registering BlockManagerMaster
16/04/12 18:20:32 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-12aaa425-2b1b-4182-865d-5d231ee10cda
16/04/12 18:20:33 INFO MemoryStore: MemoryStore started with capacity 441.7 MB
16/04/12 18:20:33 INFO HttpFileServer: HTTP File server directory is /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b/httpd-bf2fd43c-b3c9-4840-bc72-e2f16965df9a
16/04/12 18:20:33 INFO HttpServer: Starting HTTP Server
16/04/12 18:20:33 INFO Utils: Successfully started service 'HTTP file server' on port 46840.
16/04/12 18:20:33 INFO SparkEnv: Registering OutputCommitCoordinator
16/04/12 18:20:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/04/12 18:20:33 INFO SparkUI: Started SparkUI at http://192.168.1.119:4040
16/04/12 18:20:34 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/04/12 18:20:34 INFO Executor: Starting executor ID driver on host localhost
16/04/12 18:20:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43549.
16/04/12 18:20:34 INFO NettyBlockTransferService: Server created on 43549
16/04/12 18:20:34 INFO BlockManagerMaster: Trying to register BlockManager
16/04/12 18:20:34 INFO BlockManagerMasterEndpoint: Registering block manager localhost:43549 with 441.7 MB RAM, BlockManagerId(driver, localhost, 43549)
16/04/12 18:20:34 INFO BlockManagerMaster: Registered BlockManager
16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'ReadLines'
16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Skipping 'ReadLines/Read'; already in composite transform.
16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'ReadLines'
16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Evaluating ReadLines [TextIO.Read]
16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(110248) called with curMem=0, maxMem=463176990
16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 441.6 MB)
16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(10056) called with curMem=110248, maxMem=463176990
16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.8 KB, free 441.6 MB)
16/04/12 18:20:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43549 (size: 9.8 KB, free: 441.7 MB)
16/04/12 18:20:35 INFO SparkContext: Created broadcast 0 from textFile at TransformTranslator.java:471
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating ParDo(ExtractWords)
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating Init [AnonymousParDo]
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'WordCount.CountWords/Count.PerElement/Count.PerKey'
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows/ParDo(ReifyTimestampAndWindows)'; already in composite transform.
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly'; already in composite transform.
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.SortValuesByTimestamp/AnonymousParDo'; already in composite transform.
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow/ParDo(GroupAlsoByWindowsViaOutputBuffer)'; already in composite transform.
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping 'WordCount.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo'; already in composite transform.
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'WordCount.CountWords/Count.PerElement/Count.PerKey'
16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating Count.PerKey [Combine.PerKey]
16/04/12 18:20:36 INFO FileInputFormat: Total input paths to process : 1
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating Map [AnonymousParDo]
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Entering directly-translatable composite transform: 'WriteCounts'
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Create.Values'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Initialize'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/View.AsSingleton/View.CreatePCollectionView'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/WriteBundles'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Window.Into()'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/View.AsIterable/View.CreatePCollectionView'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping 'WriteCounts/Write/Finalize'; already in composite transform.
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Post-visiting directly-translatable composite transform: 'WriteCounts'
16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating WriteCounts [TextIO.Write]
16/04/12 18:20:36 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
16/04/12 18:20:36 INFO SparkContext: Starting job: saveAsNewAPIHadoopFile at TransformTranslator.java:660
16/04/12 18:20:36 INFO DAGScheduler: Registering RDD 6 (mapToPair at TransformTranslator.java:304)
16/04/12 18:20:36 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile at TransformTranslator.java:660) with 1 output partitions
16/04/12 18:20:36 INFO DAGScheduler: Final stage: ResultStage 1(saveAsNewAPIHadoopFile at TransformTranslator.java:660)
16/04/12 18:20:36 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/04/12 18:20:36 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/04/12 18:20:36 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at mapToPair at TransformTranslator.java:304), which has no missing parents
16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(10368) called with curMem=120304, maxMem=463176990
16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.1 KB, free 441.6 MB)
16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(5001) called with curMem=130672, maxMem=463176990
16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KB, free 441.6 MB)
16/04/12 18:20:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43549 (size: 4.9 KB, free: 441.7 MB)
16/04/12 18:20:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
16/04/12 18:20:36 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[6] at mapToPair at TransformTranslator.java:304)
16/04/12 18:20:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/04/12 18:20:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2142 bytes)
16/04/12 18:20:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/04/12 18:20:36 INFO HadoopRDD: Input split: file:/home/jeff/test/beam/input/kinglear.txt:0+185965
16/04/12 18:20:36 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/04/12 18:20:36 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/04/12 18:20:36 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/04/12 18:20:36 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/04/12 18:20:36 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 4316 bytes result sent to driver
16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 477 ms on localhost (1/1)
16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/04/12 18:20:37 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at TransformTranslator.java:304) finished in 0.511 s
16/04/12 18:20:37 INFO DAGScheduler: looking for newly runnable stages
16/04/12 18:20:37 INFO DAGScheduler: running: Set()
16/04/12 18:20:37 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/04/12 18:20:37 INFO DAGScheduler: failed: Set()
16/04/12 18:20:37 INFO DAGScheduler: Missing parents for ResultStage 1: List()
16/04/12 18:20:37 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:486), which is now runnable
16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(54976) called with curMem=135673, maxMem=463176990
16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 53.7 KB, free 441.5 MB)
16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(19334) called with curMem=190649, maxMem=463176990
16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 18.9 KB, free 441.5 MB)
16/04/12 18:20:37 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:43549 (size: 18.9 KB, free: 441.7 MB)
16/04/12 18:20:37 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
16/04/12 18:20:37 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:486)
16/04/12 18:20:37 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/04/12 18:20:37 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1901 bytes)
16/04/12 18:20:37 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/04/12 18:20:37 INFO deprecation: mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
16/04/12 18:20:37 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
16/04/12 18:20:37 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
16/04/12 18:20:37 INFO FileOutputCommitter: Saved output of task 'attempt_201604121820_0014_r_000000_0' to file:/home/jeff/test/beam/_temporary/0/task_201604121820_0014_r_000000
16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1326 bytes result sent to driver
16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 126 ms on localhost (1/1)
16/04/12 18:20:37 INFO DAGScheduler: ResultStage 1 (saveAsNewAPIHadoopFile at TransformTranslator.java:660) finished in 0.127 s
16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/04/12 18:20:37 INFO DAGScheduler: Job 0 finished: saveAsNewAPIHadoopFile at TransformTranslator.java:660, took 0.881797 s
16/04/12 18:20:37 INFO SparkPipelineRunner: Pipeline execution complete.
16/04/12 18:20:37 INFO SparkContext: Invoking stop() from shutdown hook
16/04/12 18:20:37 INFO SparkUI: Stopped Spark web UI at http://192.168.1.119:4040
16/04/12 18:20:37 INFO DAGScheduler: Stopping DAGScheduler
16/04/12 18:20:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/04/12 18:20:37 INFO MemoryStore: MemoryStore cleared
16/04/12 18:20:37 INFO BlockManager: BlockManager stopped
16/04/12 18:20:37 INFO BlockManagerMaster: BlockManagerMaster stopped
16/04/12 18:20:37 INFO SparkContext: Successfully stopped SparkContext
16/04/12 18:20:37 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/04/12 18:20:37 INFO ShutdownHookManager: Shutdown hook called
16/04/12 18:20:37 INFO ShutdownHookManager: Deleting directory /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.206 s
[INFO] Finished at: 2016-04-12T18:20:37+08:00
[INFO] Final Memory: 23M/167M
[INFO] ------------------------------------------------------------------------


