flume-user mailing list archives

From Joe Lawson <jlaw...@opensourceconnections.com>
Subject Re: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()
Date Tue, 05 Jul 2016 19:41:25 GMT
You may want to look here:
https://community.hortonworks.com/content/kbentry/4321/hive-acid-current-state.html

Flume 1.5.2 doesn't include Hive support AFAIK, so whatever they built for
Hortonworks is their own build. Note that the sink docs say, "This sink
is provided as a preview feature and not recommended for use in
production." (
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/ds_flume/FlumeUserGuide.html)
It appears they are using Hive version 1.2.1; I'm not sure which Hive
version the sink lines up with.

Looking here: http://hortonworks.com/blog/adding-acid-to-apache-hive/

It appears that Hive gained support for ACID inserts in 0.14.0 (
https://issues.apache.org/jira/browse/HIVE-5317), but there is a bug (
https://issues.apache.org/jira/browse/HIVE-12307?jql=project%20%3D%20HIVE%20AND%20text%20~%20%22TransactionBatch%20closed%22
) about transaction batches closing that is fixed in Hive 1.3.0.
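
If upgrading isn't an option, it may help to make each transaction batch shorter-lived so the sink cycles to a fresh batch before the metastore times out its transactions. A sketch against the config quoted below (these are existing Hive sink parameters, but the values here are illustrative guesses, not tested settings):

```properties
# Ask for fewer transactions per batch so batches are replaced more often
# (the quoted config uses 1000).
acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 100
# Keep the heartbeat comfortably under the metastore's hive.txn.timeout
# (300s by default) so open transactions are not expired mid-batch.
acidstream.sinks.hive-sink.heartBeatInterval = 60
```

Either way, the sink should ideally recover by opening a new batch instead of reusing a closed one, which is the behavior HIVE-12307 is about.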

On Tue, Jul 5, 2016 at 1:01 AM, Thanh Hong Dai <hdthanh@tma.com.vn> wrote:

> I forgot to include the version information. I’m currently using Flume
> 1.5.2 from HDP 2.4.2.
>
>
>
> Looking at the changelog of Flume 1.6.0, the latest version, there seems
> to be some improvements for Hive support.
>
> This makes me wonder: does Flume 1.5.2 support Hive streaming to an ACID
> table?
>
>
>
> Best regards,
>
> Thanh Hong.
>
>
>
> *From:* Thanh Hong Dai [mailto:hdthanh@tma.com.vn]
> *Sent:* Tuesday, 5 July, 2016 11:47 AM
> *To:* user@flume.apache.org
> *Subject:* Unable to deliver event.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
> TransactionBatch has been closed()
>
>
>
> Does anyone know the cause of this exception when using the Hive Sink, and
> how to fix it?
>
>
>
> The Hive Sink managed to write data into the Hive table for a few minutes
> (which I can confirm by querying the table), but then it shows the
> exception below in the log file (/var/log/flume/flume-<streamname>.log) on
> all the nodes.
>
>
>
> 05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
>         at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
>         at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         ... 1 more
>
> 05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
> (the same EventDeliveryException and stack trace as above, repeated on the next poll)
>
>
>
> My flume.conf file:
>
>
>
> # acidstream - streaming data from Kafka into Hive transactional table
> acidstream.sources = kafka-source
> acidstream.sinks = hive-sink
> acidstream.channels = gutter
>
> acidstream.sources.kafka-source.channels = gutter
> acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
> acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181
> acidstream.sources.kafka-source.topic = lan
> acidstream.sources.kafka-source.groupId = acid
> acidstream.sources.kafka-source.batchSize = 10000
> acidstream.sources.kafka-source.batchDurationMillis = 60000
> acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200
>
> acidstream.sources.kafka-source.interceptors = i1
> acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor
> acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})
> acidstream.sources.kafka-source.interceptors.i1.serializers = s1
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp
> acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
>
> acidstream.sinks.hive-sink.channel = gutter
> acidstream.sinks.hive-sink.type = hive
> acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083
> acidstream.sinks.hive-sink.hive.database = default
> acidstream.sinks.hive-sink.hive.table = acid
> acidstream.sinks.hive-sink.hive.partition = %m%d
> acidstream.sinks.hive-sink.heartBeatInterval = 10
> acidstream.sinks.hive-sink.useLocalTimeStamp = false
> acidstream.sinks.hive-sink.round = false
> acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000
> acidstream.sinks.hive-sink.batchSize = 10000
> acidstream.sinks.hive-sink.callTimeout = 30000
> acidstream.sinks.hive-sink.serializer = DELIMITED
> acidstream.sinks.hive-sink.serializer.delimiter = "\t"
> acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'
> acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data
>
> acidstream.channels.gutter.type = memory
> acidstream.channels.gutter.capacity = 100000
> acidstream.channels.gutter.transactionCapacity = 50000
>
>
>
> My flume-env file has this line added:
>
>
>
> export JAVA_OPTS="-Xms100m -Xmx3g"
>
>
>
> My table on Hive has the following properties:
>
>
>
> PARTITIONED BY (md string)
> CLUSTERED BY (id) INTO 10 BUCKETS
> STORED AS ORC
> TBLPROPERTIES ('transactional' = 'true');
>
>
>
> Hive has Tez engine set as the default execution engine.
>
>
>
> Could this error be caused by a low number of threads? (The NameNode has
> 100 server threads available.)
>
>
>
> Best regards,
>
> Thanh Hong.
>
>
>
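
One more note: the interceptor regex in the quoted config was line-wrapped by the mail archive. Assuming it is meant to capture a leading yyyy-MM-dd HH:mm:ss timestamp, here is a quick sanity check of the reconstructed pattern (Python stands in for Java's regex engine here, since \d and \s behave the same for this pattern; the sample log line is made up):

```python
import re

# Pattern reconstructed from the regex_extractor interceptor config.
pattern = re.compile(r"^(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})")

# Hypothetical tab-delimited log line matching serializer.fieldnames
# (timestamp, id, data).
line = "2016-07-05 04:24:22\t42\tsome payload"
match = pattern.match(line)
print(match.group(1))  # -> 2016-07-05 04:24:22
```

If a line arrives without that leading timestamp, the interceptor adds no timestamp header, and with useLocalTimeStamp = false the %m%d partition escape in the sink has nothing to work from, so it is worth confirming every record matches the format.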
