flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brock Noland <br...@cloudera.com>
Subject Re: HDFSsink failover error
Date Tue, 15 Jan 2013 00:30:23 GMT
This is https://issues.apache.org/jira/browse/FLUME-1779

On Mon, Jan 14, 2013 at 4:25 PM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
> Oh alright, found it. What is happening is that the HDFS sink does not throw
> an exception for this write error, but instead returns a Status.BACKOFF, and
> as such the failover processor doesn't think this sink failed.
>
> (What is strange is that the processor deals with the backoff message for
> failed sinks, but not active sinks).
>
> So until that's fixed there isn't a clean way to fix this. The best solution
> I can offer is to get the source code for Flume (either the latest one or
> the 1.3.1 tag), and make the following change:
>
> In the process method of the HDFSEventSink, in the catch-statements, change:
>
> LOG.warn("HDFS IO error", eIO);
> return Status.BACKOFF;
>
>
> to:
>
> LOG.warn("HDFS IO error", eIO);
> throw eIO;
>
>
> (Line 457 in 1.3.1 or line 454 in trunk)
>
> What this will end up doing is if you ever use the sink outside of a
> failover processor, when there is a write error then the sink will throw an
> exception and it will probably stop - so, this change will only make it able
> to work within the failover sink processor. Optionally, you could make a
> copy of the HDFSEventSink (call it FailoverHDFSEventSink if you want) and
> put that change in it, so that you can have both versions of the sink.
>
> (if you want instructions on compiling Flume after this change, look for the
> thread 'custome serializer')
>
> Unfortunate issue, but it will be fixed in 1.4 I'm sure.
>
> - Connor
>
> On Mon, Jan 14, 2013 at 4:00 PM, Rahul Ravindran <rahulrv@yahoo.com> wrote:
>>
>> Here is the entire log file after I restart flume
>>
>> ________________________________
>> From: Connor Woodson <cwoodson.dev@gmail.com>
>> To: "user@flume.apache.org" <user@flume.apache.org>; Rahul Ravindran
>> <rahulrv@yahoo.com>
>> Sent: Monday, January 14, 2013 3:51 PM
>>
>> Subject: Re: HDFSsink failover error
>>
>> Can you look at the full log file and post the above section as well as
>> 5-10 lines above/below it (you don't have to post that stack trace if you
>> don't want)? Because that error, while it should definitely be logged,
>> should be followed by some error lines giving context as to what is going
>> on. And if that is the end of the log file then...well, that just shouldn't
>> happen, as there are several different places that would have produced log
>> messages as that exception propagates
>>
>> - Connor
>>
>> On Mon, Jan 14, 2013 at 3:13 PM, Rahul Ravindran <rahulrv@yahoo.com>
>> wrote:
>>
>> The writes to the backup were successful when I attempted to write to it
>> directly but not via the failover sink processor. I did not see the warning
>> that you mentioned about "Sink hdfs-sink1failed".
>>
>> The full log trace is below:
>>
>> 14 Jan 2013 22:48:24,727 INFO  [hdfs-hdfs-sink2-call-runner-1]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102//event.1358203448551.tmp
>> 14 Jan 2013 22:48:24,739 WARN
>> [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:616)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
>>         at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
>>         at $Proxy11.create(Unknown Source)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
>>         at
>> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
>>         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
>>         at
>> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
>>        at
>> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:209)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:364)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
>>         at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:679)
>>
>> ________________________________
>> From: Connor Woodson <cwoodson.dev@gmail.com>
>> To: Rahul Ravindran <rahulrv@yahoo.com>
>> Cc: "user@flume.apache.org" <user@flume.apache.org>
>> Sent: Monday, January 14, 2013 3:05 PM
>>
>> Subject: Re: HDFSsink failover error
>>
>> So you are able to write normally to the back-up HDFS servers? And that
>> error you got was when you were trying to write to the normal server? Was it
>> supposed to be an error (it looks like it's due to how your Hadoop is set
>> up)?
>>
>> The log lines you pasted make it look like there was a problem with your
>> hdfs-sink1 (like I said above, maybe your Hadoop cluster is set up wrong);
>> what should have happened is that the event was then written to the backup
>> server. Below the stack trace there should probably have been another WARN
>> statement saying "Sink hdfs-sink1 failed and has been sent to the failover
>> list". And if hdfs-sink1-back then was unable to write, you would see a
>> thrown EventDeliveryException in your log.
>>
>> If there isn't anything else in the log, and the event wasn't written to
>> the backup server, then that would be a bug.
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 2:46 PM, Rahul Ravindran <rahulrv@yahoo.com>
>> wrote:
>>
>> Here is the full config. I swapped the priorities on the sink processor
>> after performing the namenode failiver and the writes were successful to the
>> newly active namenode.
>>
>> agent1.channels.ch1.type = FILE
>> agent1.channels.ch1.checkpointDir = /flume_runtime/checkpoint
>> agent1.channels.ch1.dataDirs = /flume_runtime/data
>>
>>
>> agent1.channels.ch2.type = FILE
>> agent1.channels.ch2.checkpointDir = /flume_runtime/checkpoint2
>> agent1.channels.ch2.dataDirs = /flume_runtime/data2
>>
>>
>>
>> # Define an Avro source called avro-source1 on agent1 and tell it
>>
>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>
>> agent1.sources.avro-source1.channels = ch1
>> agent1.sources.avro-source1.type = avro
>> agent1.sources.avro-source1.bind = 0.0.0.0
>> agent1.sources.avro-source1.port = 4545
>>
>>
>>
>> agent1.sources.avro-source2.channels = ch2
>> agent1.sources.avro-source2.type = avro
>> agent1.sources.avro-source2.bind = 0.0.0.0
>> agent1.sources.avro-source2.port = 4546
>>
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2.channel = ch2
>> agent1.sinks.hdfs-sink2.type = hdfs
>> agent1.sinks.hdfs-sink2.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2.hdfs.txnEventSize = 1000
>>
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink2-back.channel = ch2
>> agent1.sinks.hdfs-sink2-back.type = hdfs
>> agent1.sinks.hdfs-sink2-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host102/
>> agent1.sinks.hdfs-sink2-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink2-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink2-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink2-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink2-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink2-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink2-back.hdfs.txnEventSize = 1000
>>
>>
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups.failoverGroup2.sinks = hdfs-sink2 hdfs-sink2-back
>> agent1.sinkgroups.failoverGroup2.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2 = 10
>> agent1.sinkgroups.failoverGroup2.processor.priority.hdfs-sink2-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup2.processor.maxpenalty = 10000
>>
>> # Finally, now that we've defined all of our components, tell
>> # agent1 which ones we want to activate.
>> agent1.sinkgroups = failoverGroup1 failoverGroup2
>> agent1.channels = ch1 ch2
>> agent1.sources = avro-source1 avro-source2
>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink1-back hdfs-sink2-back
>>
>> ________________________________
>> From: Connor Woodson <cwoodson.dev@gmail.com>
>> To: user@flume.apache.org; Rahul Ravindran <rahulrv@yahoo.com>
>> Sent: Monday, January 14, 2013 2:28 PM
>> Subject: Re: HDFSsink failover error
>>
>> I assume that's only part of your config as it's missing a source; if you
>> get rid of the sink processor, can you write to each hdfs sink individually?
>> (comment one out at a time)
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 1:42 PM, Rahul Ravindran <rahulrv@yahoo.com>
>> wrote:
>>
>> Hi,
>>    I am attempting to setup an HDFS sink such that when a namenode
>> failover occurs (active namenode is brought down and the standby namenode
>> switches to active), the failover sink would send events to the new active
>> namenode. I see an error about WRITE not supported in standby state..Does
>> this not count as a failure for the failover sink?
>> Thanks,
>> ~Rahul.
>>
>> My config is as follows:
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>> agent1.sinks.hdfs-sink1.type = hdfs
>> agent1.sinks.hdfs-sink1.hdfs.path =
>> hdfs://ip-10-4-71-187.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1.hdfs.txnEventSize = 1000
>>
>> agent1.sinks.hdfs-sink1-back.channel = ch1
>> agent1.sinks.hdfs-sink1-back.type = hdfs
>> agent1.sinks.hdfs-sink1-back.hdfs.path =
>> hdfs://ip-10-110-69-240.ec2.internal/user/br/shim/eventstream/event/host101/
>> agent1.sinks.hdfs-sink1-back.hdfs.filePrefix = event
>> agent1.sinks.hdfs-sink1-back.hdfs.writeFormat = Text
>> agent1.sinks.hdfs-sink1-back.hdfs.rollInterval = 120
>> agent1.sinks.hdfs-sink1-back.hdfs.rollCount = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.rollSize = 0
>> agent1.sinks.hdfs-sink1-back.hdfs.fileType = DataStream
>> agent1.sinks.hdfs-sink1-back.hdfs.batchSize = 1000
>> agent1.sinks.hdfs-sink1-back.hdfs.txnEventSize = 1000
>>
>> agent1.sinkgroups.failoverGroup1.sinks = hdfs-sink1 hdfs-sink1-back
>> agent1.sinkgroups.failoverGroup1.processor.type = failover
>> #higher number in priority is higher priority
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1 = 10
>> agent1.sinkgroups.failoverGroup1.processor.priority.hdfs-sink1-back = 5
>> #failover if failure detected for 10 seconds
>> agent1.sinkgroups.failoverGroup1.processor.maxpenalty = 10000
>>
>>
>> agent1.sinkgroups = failoverGroup1
>>
>> 14 Jan 2013 21:37:28,819 INFO  [hdfs-hdfs-sink2-call-runner-6]
>> (org.apache.flume.sink.hdfs.BucketWriter.doOpen:208)  - Creating
>> hdfs://ip-10-4-71-187.ec2.internal/....
>> 14 Jan 2013 21:37:28,834 WARN
>> [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:456)  - HDFS IO error
>>
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>> Operation category WRITE is not supported in state standby
>>         at
>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1379)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:762)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:1688)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1669)
>>         at
>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:409)
>>         at
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:205)
>>         at
>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44068)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
>>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:898)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1693)
>>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1689)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:396)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1687)
>>
>>         at org.apache.hadoop.ipc.Client.call(Client.java:1160)
>>         at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
>>         at $Proxy11.create(Unknown Source)
>>         at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:616)
>>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMetho
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Mime
View raw message