flume-user mailing list archives

From Xuesong Ding <xd...@appannie.com>
Subject Error occurs when using Flume to put data into S3
Date Mon, 20 Oct 2014 07:19:42 GMT
Dear all,

  We use Flume-NG to put data into both S3 and HDFS, but some errors occur
when the S3 file is closed. Should we adjust Flume parameters or do
something else? Any ideas? Below are the error itself and our Flume-NG
configuration. Thanks a lot.

  BTW, our Flume-NG version is the one from CDH 4.7.0.

  Exception stack trace:
  2014-10-14 12:00:30,158 ERROR org.apache.flume.sink.hdfs.BucketWriter: Unexpected error
com.cloudera.org.apache.http.NoHttpResponseException: The target server failed to respond
        at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:95)
        at com.cloudera.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:62)
        at com.cloudera.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:254)
        at com.cloudera.org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:289)
        at com.cloudera.org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:252)
        at com.cloudera.org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
        at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:300)
        at com.cloudera.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:127)
        at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:715)
        at com.cloudera.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:520)
        at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
        at com.cloudera.org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043)
        at com.cloudera.org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029)
        at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:871)
        at com.cloudera.org.jets3t.service.StorageService.copyObject(StorageService.java:916)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:314)
        at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
        at org.apache.hadoop.fs.s3native.$Proxy18.copy(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:645)
        at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:541)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:589)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:161)
        at org.apache.flume.sink.hdfs.BucketWriter.access$800(BucketWriter.java:57)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:586)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
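
  Reading the trace, the failure happens while BucketWriter closes the file:
on s3n, the final rename of the temporary file becomes a server-side
copyObject through jets3t, and S3 dropped the HTTP connection mid-request.
Would tuning the sink's close retries help? A minimal sketch of what we have
in mind, assuming our Flume build supports the hdfs.callTimeout,
hdfs.closeTries, and hdfs.retryInterval parameters (we have not verified
these against the CDH 4.7 docs; shown for k3 only):

# milliseconds allowed per filesystem call before it is considered failed
a1.sinks.k3.hdfs.callTimeout = 60000
# number of close/rename attempts before giving up (if this build supports it)
a1.sinks.k3.hdfs.closeTries = 5
# seconds between consecutive close attempts
a1.sinks.k3.hdfs.retryInterval = 180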

  Flume-NG configuration:
## THIS FILE CONTAINS FLUME TIER_1 CONFIGURATION

# DEFINE COMPONENTS
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1 c2
a1.sinkgroups = g1 g2

# SOURCE (SPOOLDIR)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /ssd/disk2
a1.sources.r1.deletePolicy = never
a1.sources.r1.ignorePattern = ^.*\\.tmp$
a1.sources.r1.batchSize = 1
a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
a1.sources.r1.deserializer.maxBlobLength = 300000000
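# NOTE (added for clarity): BlobDeserializer reads each spooled file as a
# single event of up to maxBlobLength bytes, which is why batchSize here and
# the channel transactionCapacity below are set to 1.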

# SINK (HDFS)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.filePrefix = packet
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.fileSuffix = .snappy
a1.sinks.k1.hdfs.codeC = snappy
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 500000000
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.path = hdfs://${path}

# SINK (HDFS)
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.filePrefix = packet
a1.sinks.k2.hdfs.batchSize = 1
a1.sinks.k2.hdfs.fileSuffix = .snappy
a1.sinks.k2.hdfs.codeC = snappy
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 500000000
a1.sinks.k2.hdfs.rollInterval = 300
a1.sinks.k2.hdfs.path = hdfs://${path}

# SINK (S3)
a1.sinks.k3.type = hdfs
a1.sinks.k3.hdfs.filePrefix = packet
a1.sinks.k3.hdfs.batchSize = 1
a1.sinks.k3.hdfs.fileSuffix = .snappy
a1.sinks.k3.hdfs.codeC = snappy
a1.sinks.k3.hdfs.fileType = CompressedStream
a1.sinks.k3.hdfs.rollCount = 0
a1.sinks.k3.hdfs.rollSize = 500000000
a1.sinks.k3.hdfs.rollInterval = 300
a1.sinks.k3.hdfs.path = s3n://${path}

# SINK (S3)
a1.sinks.k4.type = hdfs
a1.sinks.k4.hdfs.filePrefix = packet
a1.sinks.k4.hdfs.batchSize = 1
a1.sinks.k4.hdfs.fileSuffix = .snappy
a1.sinks.k4.hdfs.codeC = snappy
a1.sinks.k4.hdfs.fileType = CompressedStream
a1.sinks.k4.hdfs.rollCount = 0
a1.sinks.k4.hdfs.rollSize = 500000000
a1.sinks.k4.hdfs.rollInterval = 300
a1.sinks.k4.hdfs.path = s3n://${path}
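# NOTE (hedged): the ${path} values above are placeholders for our real URIs.
# For s3n, Hadoop takes AWS credentials either inline in the URI or from
# core-site.xml (fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey).
# A purely hypothetical shape, for illustration only:
#   a1.sinks.k3.hdfs.path = s3n://example-bucket/flume/packets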

a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sinkgroups.g2.sinks = k3 k4
a1.sinkgroups.g2.processor.type = load_balance
a1.sinkgroups.g2.processor.backoff = true
a1.sinkgroups.g2.processor.selector = random

# INTERCEPTORS (TIMESTAMP FOR HDFS PATH)
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.preserveExisting = false
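# NOTE (hedged): with the timestamp interceptor attached, hdfs.path can use
# per-event date escape sequences; a hypothetical example (our real ${path}
# is elided above):
#   a1.sinks.k1.hdfs.path = hdfs://namenode/flume/packets/%Y-%m-%d/%H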

# CHANNEL (MEM)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500
a1.channels.c1.transactionCapacity = 1
#a1.channels.c1.byteCapacity = 3000000000

# CHANNEL (MEM)
a1.channels.c2.type = memory
a1.channels.c2.capacity = 500
a1.channels.c2.transactionCapacity = 1
#a1.channels.c2.byteCapacity = 3000000000
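# NOTE (hedged): with events up to ~300 MB each (maxBlobLength above), the
# memory channels' byte limits matter more than the 500-event capacity. If we
# re-enable byteCapacity, its companion knob is byteCapacityBufferPercentage;
# both are standard memory channel parameters:
#   a1.channels.c2.byteCapacity = 3000000000
#   a1.channels.c2.byteCapacityBufferPercentage = 20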

## bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

a1.sinks.k3.channel = c2
a1.sinks.k4.channel = c2

-- 
Thanks!
