crunch-dev mailing list archives

From "Attila Sasvari (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CRUNCH-636) Make replication factor for temporary files configurable
Date Wed, 08 Mar 2017 17:05:38 GMT

     [ https://issues.apache.org/jira/browse/CRUNCH-636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Attila Sasvari updated CRUNCH-636:
----------------------------------
    Attachment: test.WordCount_2017-03-08_16.31.55.737.log
                test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png

I believe I have found a way to handle the issue. Previously, I set the replication factor in the
wrong place, in {{MSCROutputHandler}}. We cannot rely on the target path alone: we need to make sure
that a job has only temporary output files before we apply the user-supplied replication factor.

If a job has both temporary and final files, then we should use the default (initial) {{dfs.replication}}
in the MR job. We can make this decision in the {{build()}} method of {{JobPrototype}}. Before
 https://github.com/apache/crunch/blob/7f85ee5816a19eca0e87ce503ea0b03ea294433c/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java#L155
, we create a boolean that tracks whether the job at hand has only temporary outputs. If there
is any non-temporary output, we set it to false inside the loop. Based on this flag, we can set
the proper replication factor after the loop.
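
A minimal sketch of that decision, written as standalone Java rather than as a patch to {{JobPrototype}}. The class {{TempReplicationSketch}}, the {{Output}} holder and the method {{chooseReplication}} are hypothetical names used only for illustration; the values 3 and 9 and the paths are taken from the attached log.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: none of these names are Crunch classes. */
final class TempReplicationSketch {

    /** Hypothetical stand-in for one job output and whether it is temporary. */
    static final class Output {
        final String path;
        final boolean temporary;

        Output(String path, boolean temporary) {
            this.path = path;
            this.temporary = temporary;
        }
    }

    /**
     * Returns tmpReplication only when every output of the job is temporary;
     * otherwise falls back to the initial dfs.replication value.
     */
    static int chooseReplication(List<Output> outputs, int initialReplication, int tmpReplication) {
        boolean onlyTemporary = true;
        for (Output output : outputs) {
            if (!output.temporary) {
                onlyTemporary = false; // one final output is enough to disqualify the job
            }
        }
        // The decision is made once, after the loop has seen every output.
        return onlyTemporary ? tmpReplication : initialReplication;
    }

    public static void main(String[] args) {
        int initial = 3; // dfs.replication in the attached log
        int tmp = 9;     // crunch.tmp.dir.replication in the attached log

        List<Output> job3 = Arrays.asList(new Output("/tmp/crunch-1543087915/p2", true));
        List<Output> job2 = Arrays.asList(
                new Output("/user/asasvari/5FXm2", false),
                new Output("/tmp/crunch-1543087915/p1", true));

        System.out.println("job 3: " + chooseReplication(job3, initial, tmp));
        System.out.println("job 2: " + chooseReplication(job2, initial, tmp));
    }
}
```

In this sketch the flag starts as true and can only be cleared, so the order of the outputs does not matter.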

Regarding user configuration, I guess we should allow whatever they set. If MapReduce allows
it, we allow it. 

I have attached an example jobplan and a debug log to show what is happening right now.

Following the execution: 
- At the beginning we see Crunch creates all the temporary directories.
{code}
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p1
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p2
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p3
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p4
17/03/08 17:05:56 INFO dist.DistributedPipeline: temporary directory created: /tmp/crunch-1708944155/p5
{code}

- We remember these paths so that we can decide whether an output file is temporary or
not.

- Then we check the current replication factor set for the job and adjust it if we are dealing
with a temporary job output:
{code}
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: Text(/user/asasvari/5FXm_union_count)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: SeqFile(/tmp/crunch-1543087915/p2)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting replication factor to: 9 
Replication factor: 3
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: Text(/user/asasvari/5FXm2)
17/03/08 16:31:55 INFO plan.JobPrototype: --- target :SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.MSCROutputHandler: Target: SeqFile(/tmp/crunch-1543087915/p1)
17/03/08 16:31:55 INFO plan.JobPrototype: Setting initial replication factor (3)
{code}

* Crunch Job 3 creates only a temporary file, {{/tmp/crunch-1543087915/p2}}, so we set the replication
factor to 9 (the user-specified value of {{crunch.tmp.dir.replication}}).
{code}
$ hdfs dfs -ls  /tmp/crunch-1543087915/p2        
17/03/08 17:44:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 16:32 /tmp/crunch-1543087915/p2/_SUCCESS
-rw-r--r--   9 asasvari supergroup       1183 2017-03-08 16:32 /tmp/crunch-1543087915/p2/part-r-00000
{code}
* Crunch Job 2 produces a temporary file and a non-temporary file, so we must set the replication
factor to the original {{dfs.replication}}.
* Crunch Job 1 has 1 non-temporary output file, so {{dfs.replication}} is used.
{code}
hdfs dfs -ls  /user/asasvari/iwTV_union_count 
17/03/08 17:45:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 asasvari supergroup          0 2017-03-08 17:06 /user/asasvari/iwTV_union_count/_SUCCESS
-rw-r--r--   3 asasvari supergroup        921 2017-03-08 17:06 /user/asasvari/iwTV_union_count/part-r-00000
{code}
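
The bookkeeping described in the walkthrough above (remembering the temporary directories so that an output path can later be classified) could look roughly like the following. This is only a sketch: {{TempPathRegistry}} and its methods are hypothetical names, not existing Crunch code, and paths are handled as plain strings instead of Hadoop {{Path}} objects to keep the example self-contained.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Sketch only: a hypothetical registry, not an existing Crunch class. */
final class TempPathRegistry {

    private final Set<String> tempDirs = new LinkedHashSet<>();

    /** Records a temporary directory at the moment the pipeline creates it. */
    void remember(String dir) {
        tempDirs.add(stripTrailingSlashes(dir));
    }

    /** True if path is a remembered directory or lies underneath one. */
    boolean isTemporary(String path) {
        String candidate = stripTrailingSlashes(path);
        for (String dir : tempDirs) {
            // Compare on a "/" boundary so that .../p1 does not match .../p10.
            if (candidate.equals(dir) || candidate.startsWith(dir + "/")) {
                return true;
            }
        }
        return false;
    }

    private static String stripTrailingSlashes(String path) {
        int end = path.length();
        while (end > 1 && path.charAt(end - 1) == '/') {
            end--;
        }
        return path.substring(0, end);
    }

    public static void main(String[] args) {
        TempPathRegistry registry = new TempPathRegistry();
        registry.remember("/tmp/crunch-1543087915/p1");
        registry.remember("/tmp/crunch-1543087915/p2");

        System.out.println(registry.isTemporary("/tmp/crunch-1543087915/p2"));
        System.out.println(registry.isTemporary("/user/asasvari/5FXm2"));
    }
}
```

Checking against the remembered directories, rather than against a {{/tmp}} prefix, keeps the classification independent of where the user points the temporary directory.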

I also had some questions regarding the failure of a big pipeline.
- Let's assume the replication factor is set to 1 for temporary files, and we have a huge pipeline
with hundreds of nodes. A node that stores temporary data fails very near the end of pipeline
execution, so the last job cannot finish and the pipeline fails. No rollback actions
are performed to delete the outputs of earlier jobs. With checkpointing enabled, the previously
successful jobs do not need to be rerun.

- Does it make sense to allow setting replication higher than initial for temporary files?
Probably not, but we can allow it. 



> Make replication factor for temporary files configurable
> --------------------------------------------------------
>
>                 Key: CRUNCH-636
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-636
>             Project: Crunch
>          Issue Type: New Feature
>            Reporter: Attila Sasvari
>            Assignee: Attila Sasvari
>         Attachments: test.WordCount_2017-03-08_16.31.55.737_jobplan.dot.png, test.WordCount_2017-03-08_16.31.55.737.log
>
>
> As of now, Crunch does not allow a different replication factor for temporary files
and non-temporary files (e.g. final output data of leaf nodes) at the same time. If a user
has a large amount of data (say hundreds of gigabytes) to process, they might want to have
a lower replication factor for large temporary files between Crunch jobs. 
> We could make this configurable via a new setting (e.g. {{crunch.tmp.dir.replication}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
