flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrew Jones <andrewjone...@gmail.com>
Subject Re: Custom sink - "close() called when transaction is OPEN" error
Date Fri, 16 Nov 2012 16:17:44 GMT
OK, got it. I had this:

Event event = channel.take();
if (event == null) {
return Status.BACKOFF;
}

I changed it to:

if (event == null) {
        transaction.commit();
return Status.BACKOFF;
}

And it looks like it has fixed the issue.

Thanks a lot for your help. Much appreciated!

Andrew


On 16 November 2012 16:02, Brock Noland <brock@cloudera.com> wrote:

> If channel.take() returns null, no commit or rollback is called....
>
> Checkout how logger sink handles this:
>
>
> https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD
>
> brock
>
> On Fri, Nov 16, 2012 at 9:45 AM, Andrew Jones <andrewjones86@gmail.com>
> wrote:
> > Sure.
> >
> > Sink: http://pastebin.com/N6zh73hU
> > Config: http://pastebin.com/Tc2MH9iV
> >
> > Thanks.
> >
> >
> > On 16 November 2012 15:32, Brock Noland <brock@cloudera.com> wrote:
> >>
> >> Would you be able to send the source of your sink via pastbin in
> >> addition to your config?
> >>
> >> On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <andrewjones86@gmail.com>
> >> wrote:
> >> > I tried logging the first throwable, but now that is just the
> >> > IllegalStateException.
> >> >
> >> > Today I have been looking at Flume-1.3.0rc3 and I have noticed the
> same
> >> > problem. This is using the Avro source, File channel and our custom
> >> > sink.
> >> > After Flume reloads its config, the first error message comes when the
> >> > Avro
> >> > source starts up:
> >> >
> >> > 16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
> >> > source: { bindAddress: 0.0.0.0, port: 36060 }...
> >> > 16 Nov 2012 16:04:25,484 ERROR
> >> > [SinkRunner-PollingRunner-DefaultSinkProcessor]
> >> > (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to
> deliver
> >> > event. Exception follows.
> >> > java.lang.IllegalStateException: close() called when transaction is
> OPEN
> >> > -
> >> > you must either commit or rollback first
> >> >         at
> >> >
> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> >         at
> >> >
> >> >
> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
> >> >         at
> >> >
> >> >
> com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
> >> >         at
> >> > com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
> >> >         at
> >> >
> >> >
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> >> >         at
> >> > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> >> >         at java.lang.Thread.run(Thread.java:636)
> >> > 16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)
>  -
> >> > Monitoried counter group for type: SOURCE, name: source, registered
> >> > successfully.
> >> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
> >> > Component type: SOURCE, name: source started
> >> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.source.AvroSource.start:168)  - Avro source source
> >> > started.
> >> >
> >> > I then continually get errors from the Sink, presumably as its been
> >> > called
> >> > periodically to check for events in the channel. So is it possible its
> >> > the
> >> > Avro source causing the issue?
> >> >
> >> > There should have been nothing persisted in the file channel when
> >> > restarting.
> >> >
> >> > When the transaction gets messed up like this, is there a way to
> refresh
> >> > it,
> >> > preferably without losing any data?
> >> >
> >> > I am still able to send things to flume and they get processed and
> >> > inserted
> >> > by my sink, so it still seems to work OK.
> >> >
> >> > Thanks,
> >> > Andrew
> >> >
> >> >
> >> >
> >> > On 15 November 2012 12:50, Brock Noland <brock@cloudera.com> wrote:
> >> >>
> >> >> Can you log the Throwable as the first thing in the catch block to
> see
> >> >> if something and what it is, is being thrown?
> >> >>
> >> >> Transactions are thread local so if for some reason the the
> sequencing
> >> >> gets messed up on an earlier call the process, every call to
> >> >> transaction will thrown an exception including begin.
> >> >>
> >> >>
> >> >>
> >> >>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
> >> >>
> >> >> As I stated in FLUME-1089 I think that when close is called it should
> >> >> forcefully destroy the transaction like JDBC close() but I have not
> >> >> got much agreement.
> >> >>
> >> >>
> >> >> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <
> andrewjones86@gmail.com>
> >> >> wrote:
> >> >> > We are using Flume 1.2.0. We have a custom source, although it
> passes
> >> >> > through an Avro Sink and Source before getting to the sink. We
are
> >> >> > now
> >> >> > using
> >> >> > the memory channel, although had just switched from the JDBC
> channel
> >> >> > when we
> >> >> > started seeing these errors, so maybe that's something to do with
> it?
> >> >> >
> >> >> > I tried wrapping transaction.rollback(); in a try catch and logging
> >> >> > in
> >> >> > the
> >> >> > catch, but it wasn't called, so I don't think the rollback is
> >> >> > throwing
> >> >> > an
> >> >> > error.
> >> >> >
> >> >> > I think it may have something to do with switching channels, as
> right
> >> >> > after
> >> >> > Flume reloaded the config we started getting errors. I have
> restarted
> >> >> > the
> >> >> > flume node manually and we are still getting the error.
> >> >> >
> >> >> > Thanks,
> >> >> > Andrew
> >> >> >
> >> >> >
> >> >> > On 14 November 2012 20:02, Hari Shreedharan
> >> >> > <hshreedharan@cloudera.com>
> >> >> > wrote:
> >> >> >>
> >> >> >> Which version of Flume are you using? It looks like the
> transaction
> >> >> >> was
> >> >> >> never rolled back or committed. It is likely that the rollback
> >> >> >> method
> >> >> >> too
> >> >> >> threw some exception, and the rollback was not successful.
Also,
> >> >> >> what
> >> >> >> channel are you using?
> >> >> >>
> >> >> >>
> >> >> >> Thanks,
> >> >> >> Hari
> >> >> >>
> >> >> >> --
> >> >> >> Hari Shreedharan
> >> >> >>
> >> >> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
> >> >> >>
> >> >> >> Hi,
> >> >> >>
> >> >> >> I have a custom sink which has been working fine, but recently
I
> >> >> >> have
> >> >> >> started seeing this error in the logs:
> >> >> >>
> >> >> >> Unable to deliver event. Exception follows.
> >> >> >> java.lang.IllegalStateException: close() called when transaction
> is
> >> >> >> OPEN -
> >> >> >> you must either commit or rollback first
> >> >> >>         at
> >> >> >>
> >> >> >>
> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> >> >> ...
> >> >> >>
> >> >> >>
> >> >> >> After having a google and finding
> >> >> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double
> >> >> >> checked
> >> >> >> I am
> >> >> >> using the correct try, catch, finally idiom that other sinks
use,
> >> >> >> and I
> >> >> >> seem
> >> >> >> to be doing the same. I do the following:
> >> >> >>
> >> >> >> public Status process() throws EventDeliveryException {
> >> >> >> Status status = Status.READY;
> >> >> >>
> >> >> >> Channel channel = getChannel();
> >> >> >> Transaction transaction = channel.getTransaction();
> >> >> >>
> >> >> >> try {
> >> >> >> transaction.begin();
> >> >> >>
> >> >> >>                         // does a bit of processing and
> >> >> >>                         // writes out the event to MongoDB
> >> >> >>
> >> >> >>                         transaction.commit();
> >> >> >>
> >> >> >> } catch (Throwable t) {
> >> >> >> transaction.rollback();
> >> >> >>
> >> >> >> if (t instanceof Error) {
> >> >> >> throw (Error) t;
> >> >> >> } else if  (t instanceof EventDeliveryException) {
> >> >> >> throw (EventDeliveryException) t;
> >> >> >> } else if (t instanceof ChannelException) {
> >> >> >> logger.error("Brodie Log Sink " + getName() + ": Unable to
get
> event
> >> >> >> from"
> >> >> >> +
> >> >> >> " channel " + channel.getName() + ". Exception follows.",
t);
> >> >> >> status = Status.BACKOFF;
> >> >> >> } else {
> >> >> >> throw new EventDeliveryException("Failed to send events",
t);
> >> >> >> }
> >> >> >> } finally {
> >> >> >> transaction.close();
> >> >> >> }
> >> >> >>
> >> >> >> return status;
> >> >> >> }
> >> >> >>
> >> >> >> }
> >> >> >>
> >> >> >> All of this code came from looking at other sinks (Avro and
HDFS),
> >> >> >> so I
> >> >> >> am
> >> >> >> pretty sure its correct.
> >> >> >>
> >> >> >> Can anyone see anything that might be a problem, or is there
> >> >> >> anything
> >> >> >> else
> >> >> >> I can do to avoid this error?
> >> >> >>
> >> >> >> Thanks,
> >> >> >> Andrew
> >> >> >>
> >> >> >>
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Apache MRUnit - Unit testing MapReduce -
> >> >> http://incubator.apache.org/mrunit/
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Apache MRUnit - Unit testing MapReduce -
> >> http://incubator.apache.org/mrunit/
> >
> >
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce -
> http://incubator.apache.org/mrunit/
>

Mime
View raw message