flume-user mailing list archives

From Ahmed Vila <av...@devlogic.eu>
Subject Re: Error occurs when use flume to put data into s3
Date Mon, 20 Oct 2014 09:01:06 GMT
I'd guess this is the reason:
a1.sinks.k4.hdfs.path = *s3n*://${path}

It should be s3://
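
For example, the two S3 sink paths would then read as follows. This is just a
sketch, not something I have tried on cdh4.7.0 myself; ${path} stays whatever
you already substitute in:

# sketch only: the same change applies to both S3 sinks
a1.sinks.k3.hdfs.path = s3://${path}
a1.sinks.k4.hdfs.path = s3://${path}

Note that the s3:// scheme takes its credentials from the fs.s3.* properties
in core-site.xml rather than the fs.s3n.* ones, so those need to be set as
well. And since NoHttpResponseException can also just be a transient drop on
the S3 side during the rename that finalizes the file, raising the JetS3t
retry count (httpclient.retry-max in a jets3t.properties on the classpath,
default 5) might be worth a try too.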

Regards,
Ahmed

On Mon, Oct 20, 2014 at 9:19 AM, Xuesong Ding <xding@appannie.com> wrote:

> Dear all,
>
>   We use flume-ng to put data into both s3 and hdfs, but some errors occur
> when the s3 file is closed. Should we adjust any flume parameters, or do
> something else? Any ideas? The error and our flume-ng configuration are
> below. Thanks a lot.
>
>   BTW, our flume-ng version is cdh4.7.0
>
>   Exception stack trace:
>   2014-10-14 12:00:30,158 ERROR org.apache.flume.sink.hdfs.BucketWriter: Unexpected error
> com.cloudera.org.apache.http.NoHttpResponseException: The target server failed to respond
>         at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
>         at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
>         at com.cloudera.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
>         at com.cloudera.org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
>         at com.cloudera.org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
>         at com.cloudera.org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
>         at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
>         at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
>         at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
>         at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
>         at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
>         at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
>         at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
>         at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
>         at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043)
>         at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029)
>         at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:871)
>         at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:916)
>         at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:314)
>         at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>         at org.apache.hadoop.fs.s3native.$Proxy18.copy(Unknown Source)
>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:645)
>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:541)
>         at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:589)
>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
>         at org.apache.flume.sink.hdfs.BucketWriter.access$800(BucketWriter.java:57)
>         at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:586)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>     Flume-ng configuration:
>   ## THIS FILE CONTAINS FLUME TIER_1 CONFIGURATION
>
> # DEFINE COMPONENTS
> a1.sources = r1
> a1.sinks =  k1 k2 k3 k4
> a1.channels = c1 c2
> a1.sinkgroups = g1 g2
>
> # SOURCE(CUSTOM)
> a1.sources.r1.type = spooldir
> a1.sources.r1.spoolDir = /ssd/disk2
> a1.sources.r1.deletePolicy = never
> a1.sources.r1.ignorePattern = ^.*\\.tmp$
> a1.sources.r1.batchSize = 1
> a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
> a1.sources.r1.deserializer.maxBlobLength = 300000000
>
> # SINK (HDFS)
> a1.sinks.k1.type = hdfs
> a1.sinks.k1.hdfs.filePrefix = packet
> a1.sinks.k1.hdfs.batchSize = 1
> a1.sinks.k1.hdfs.fileSuffix = .snappy
> a1.sinks.k1.hdfs.codeC = snappy
> a1.sinks.k1.hdfs.fileType = CompressedStream
> a1.sinks.k1.hdfs.rollCount = 0
> a1.sinks.k1.hdfs.rollSize = 500000000
> a1.sinks.k1.hdfs.rollInterval = 300
> a1.sinks.k1.hdfs.path = hdfs://${path}
>
> # SINK (HDFS)
> a1.sinks.k2.type = hdfs
> a1.sinks.k2.hdfs.filePrefix = packet
> a1.sinks.k2.hdfs.batchSize = 1
> a1.sinks.k2.hdfs.fileSuffix = .snappy
> a1.sinks.k2.hdfs.codeC = snappy
> a1.sinks.k2.hdfs.fileType = CompressedStream
> a1.sinks.k2.hdfs.rollCount = 0
> a1.sinks.k2.hdfs.rollSize = 500000000
> a1.sinks.k2.hdfs.rollInterval = 300
> a1.sinks.k2.hdfs.path = hdfs://${path}
>
> #SINK (S3)
> a1.sinks.k3.type = hdfs
> a1.sinks.k3.hdfs.filePrefix = packet
> a1.sinks.k3.hdfs.batchSize = 1
> a1.sinks.k3.hdfs.fileSuffix = .snappy
> a1.sinks.k3.hdfs.codeC = snappy
> a1.sinks.k3.hdfs.fileType = CompressedStream
> a1.sinks.k3.hdfs.rollCount = 0
> a1.sinks.k3.hdfs.rollSize = 500000000
> a1.sinks.k3.hdfs.rollInterval = 300
> a1.sinks.k3.hdfs.path = s3n://${path}
>
> #SINK (S3)
> a1.sinks.k4.type = hdfs
> a1.sinks.k4.hdfs.filePrefix = packet
> a1.sinks.k4.hdfs.batchSize = 1
> a1.sinks.k4.hdfs.fileSuffix = .snappy
> a1.sinks.k4.hdfs.codeC = snappy
> a1.sinks.k4.hdfs.fileType = CompressedStream
> a1.sinks.k4.hdfs.rollCount = 0
> a1.sinks.k4.hdfs.rollSize = 500000000
> a1.sinks.k4.hdfs.rollInterval = 300
> a1.sinks.k4.hdfs.path = s3n://${path}
>
> a1.sinkgroups.g1.sinks = k1 k2
> a1.sinkgroups.g1.processor.type = load_balance
> a1.sinkgroups.g1.processor.backoff = true
> a1.sinkgroups.g1.processor.selector = random
>
> a1.sinkgroups.g2.sinks = k3 k4
> a1.sinkgroups.g2.processor.type = load_balance
> a1.sinkgroups.g2.processor.backoff = true
> a1.sinkgroups.g2.processor.selector = random
>
> # INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
> a1.sources.r1.interceptors = i1 i2
> a1.sources.r1.interceptors.i1.type = timestamp
> a1.sources.r1.interceptors.i2.type = host
> a1.sources.r1.interceptors.i2.preserveExisting = false
>
> # CHANNEL (MEM)
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 500
> a1.channels.c1.transactionCapacity = 1
> #a1.channels.c1.byteCapacity = 3000000000
>
> # CHANNEL (MEM)
> a1.channels.c2.type = memory
> a1.channels.c2.capacity = 500
> a1.channels.c2.transactionCapacity = 1
> #a1.channels.c2.byteCapacity = 3000000000
>
> ## bind the source and sink to the channel
> a1.sources.r1.channels = c1 c2
> a1.sources.r1.selector.type = replicating
>
> a1.sinks.k1.channel = c1
> a1.sinks.k2.channel = c1
>
> a1.sinks.k3.channel = c2
> a1.sinks.k4.channel = c2
>
> --
> *Thanks!*
>

