camel-issues mailing list archives

From "Grzegorz Grzybek (JIRA)" <>
Subject [jira] [Commented] (CAMEL-7318) Concurrency on HDFS Consumer not working efficiently
Date Thu, 05 Mar 2015 09:29:38 GMT


Grzegorz Grzybek commented on CAMEL-7318:

Actually the file is not being read twice.
I've set up two consumers and watched the behavior under a debugger.
The code you mentioned (in org.apache.camel.component.hdfs2.HdfsInputStream#createInputStream()):
        info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
doesn't return {{false}} when it can't rename; it throws an exception with the following stack trace
(in the "file://" case):
{noformat}
File file:/data/ggrzybek/sources/
does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(
	at org.apache.hadoop.fs.FileUtil.copy(
	at org.apache.hadoop.fs.FileUtil.copy(
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(
	at org.apache.hadoop.fs.ChecksumFileSystem.rename(
	at org.apache.camel.component.hdfs2.HdfsInputStream.createInputStream(
	at org.apache.camel.component.hdfs2.HdfsConsumer.doPoll(
	at org.apache.camel.component.hdfs2.HdfsConsumer.poll(
	at org.apache.camel.impl.ScheduledPollConsumer.doRun(
	at java.util.concurrent.Executors$
	at java.util.concurrent.FutureTask.runAndReset(
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(
	at java.util.concurrent.ScheduledThreadPoolExecutor$
	at java.util.concurrent.ThreadPoolExecutor.runWorker(
	at java.util.concurrent.ThreadPoolExecutor$
{noformat}
so Camel then invokes org.apache.camel.spi.PollingConsumerPollStrategy#rollback() and simply
skips this file, moving on to the next one.
I'll check how it works in "hdfs://" case.
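The contrast between the two failure modes can be reproduced with plain Java file APIs (an analogy for illustration, not the Hadoop code itself): java.io.File.renameTo() returns a boolean, much like the behavior the reporter describes, while java.nio.file.Files.move() throws on a missing source, like the "file://" stack trace above. The class and method names below are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class RenameFailureModes {

    // Claim a file the way a consumer would: returns true only for the
    // caller that actually renamed it. A missing source simply yields false.
    static boolean claim(File src, File dst) {
        return src.renameTo(dst);  // boolean result, never throws for a missing file
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rename-demo");
        File src = dir.resolve("data.txt").toFile();
        Files.writeString(src.toPath(), "payload");

        // The first "consumer" claims the file; the second sees false, not an exception.
        boolean first = claim(src, new File(dir.toFile(), "data.txt.read"));
        boolean second = claim(src, new File(dir.toFile(), "data.txt.read"));
        System.out.println("first=" + first + " second=" + second);  // first=true second=false

        // Files.move, by contrast, reports the same situation as an exception,
        // analogous to the local-filesystem behavior observed in the stack trace.
        try {
            Files.move(src.toPath(), dir.resolve("data.txt.read2"));
        } catch (NoSuchFileException e) {
            System.out.println("move threw: " + e.getClass().getSimpleName());
        }
    }
}
```

Whether the second consumer observes a boolean false or an exception, the net effect is the same: only one consumer processes the file.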

> Concurrency on HDFS Consumer not working efficiently
> ----------------------------------------------------
>                 Key: CAMEL-7318
>                 URL:
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-hdfs
>    Affects Versions: 2.11.2
>            Reporter: Martha Obrinteschi
>            Assignee: Grzegorz Grzybek
>            Priority: Minor
>              Labels: concurrency, parallel
> If we have two HDFS consumers, the files are processed twice (once by each consumer,
waiting one after the other), so the consumers are not working in parallel. If we add this,
the consumers will work as a team and the transfer will go faster.
> This happens because no exception is thrown (the rename method just returns true
or false and everything carries on as if nothing had happened :).
> In order to fix this we could add in the HdfsInputStream: 49
> boolean tf = info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
> if (!tf) {
>     throw new IOException("This exception is thrown because the rename did not succeed.");
> }
> And also in the HdfsConsumer: 150
> try {
>     this.rwlock.writeLock().lock();
>     this.istream = HdfsInputStream.createInputStream(fileStatuses[i].getPath().toString(),
> } catch (IOException ioe) {
>     // If the rename fails we move on to the next file.
>     continue;
> }
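The guard proposed in the issue can be sketched as a small helper. This is a minimal sketch using java.io.File in place of org.apache.hadoop.fs.FileSystem so it runs without Hadoop; the helper name renameOrFail is hypothetical.

```java
import java.io.File;
import java.io.IOException;

public class RenameOrFail {

    // Turn the boolean result of a rename into an IOException that the
    // consumer's poll loop can catch and treat as "skip this file".
    static void renameOrFail(File actual, File suffixed) throws IOException {
        if (!actual.renameTo(suffixed)) {
            throw new IOException("Rename of " + actual + " to " + suffixed
                    + " did not succeed; another consumer may have claimed it.");
        }
    }

    public static void main(String[] args) throws Exception {
        File dir = java.nio.file.Files.createTempDirectory("claim-demo").toFile();
        File src = new File(dir, "input.txt");
        if (!src.createNewFile()) throw new IllegalStateException();

        renameOrFail(src, new File(dir, "input.txt.read"));  // first claim succeeds
        try {
            renameOrFail(src, new File(dir, "input.txt.read2"));  // src is gone now
        } catch (IOException expected) {
            System.out.println("second claim rejected: " + expected.getMessage());
        }
    }
}
```

The design choice here is to normalize both filesystem behaviors (boolean false and thrown exception) into a single exceptional path, so the consumer's catch-and-continue logic handles them uniformly.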

This message was sent by Atlassian JIRA
