flume-user mailing list archives

From "Thanh Hong Dai" <hdth...@tma.com.vn>
Subject RE: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()
Date Tue, 05 Jul 2016 05:01:49 GMT
I forgot to include the version information. I'm currently using Flume 1.5.2
from HDP 2.4.2.

 

Looking at the changelog of Flume 1.6.0, the latest version, there seem to
be some improvements to Hive support.

This makes me wonder: does Flume 1.5.2 support Hive streaming to ACID
tables?

 

Best regards,

Thanh Hong.

 

From: Thanh Hong Dai [mailto:hdthanh@tma.com.vn] 
Sent: Tuesday, 5 July, 2016 11:47 AM
To: user@flume.apache.org
Subject: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()

 

Does anyone know the cause of this exception when using the Hive Sink, and
how to fix it?

 

The Hive Sink managed to write data into the Hive table for a few minutes
(which I can confirm by querying the table), but then the exception below
shows up in the log file (/var/log/flume/flume-<streamname>.log) on all the
nodes.

 

05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804] } has been closed()
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)
        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)
        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more
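
For reference, here is my understanding of the transaction-batch lifecycle that the Hive Sink drives underneath, written as a minimal standalone sketch against the hive-hcatalog-streaming API (this is not Flume's actual code; the endpoint, field names, and sample record are simply taken from my setup):

import java.util.Arrays;
import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HiveStreamingSketch {
    public static void main(String[] args) throws Exception {
        // Endpoint mirroring my sink config: metastore URI, database, table, partition.
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://hive.metastore:9083", "default", "acid",
                Arrays.asList("0804"));
        StreamingConnection conn = endPoint.newConnection(true);
        DelimitedInputWriter writer = new DelimitedInputWriter(
                new String[] {"timestamp", "id", "data"}, "\t", endPoint);

        // One batch of transactions; I assume this maps to hive.txnsPerBatchAsk = 1000.
        TransactionBatch batch = conn.fetchTransactionBatch(1000, writer);
        while (batch.remainingTransactions() > 0) {
            batch.beginNextTransaction();
            batch.write("2016-07-05 04:24:22\t42\tpayload".getBytes());
            batch.heartbeat(); // keeps the txns/locks alive; cf. heartBeatInterval = 10
            batch.commit();
        }
        batch.close();
        // Any write() against the batch after close() (or after it was aborted)
        // is what raises "IllegalStateException: TransactionBatch ... has been closed()".
        conn.close();
    }
}

If that understanding is right, the sink seems to still be writing to a batch that has already been closed or aborted, but I don't know what would close it mid-stream.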

 

My flume.conf file:

 

# acidstream - streaming data from Kafka into a Hive transactional table
acidstream.sources = kafka-source
acidstream.sinks = hive-sink
acidstream.channels = gutter

acidstream.sources.kafka-source.channels = gutter
acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181
acidstream.sources.kafka-source.topic = lan
acidstream.sources.kafka-source.groupId = acid
acidstream.sources.kafka-source.batchSize = 10000
acidstream.sources.kafka-source.batchDurationMillis = 60000
acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200

acidstream.sources.kafka-source.interceptors = i1
acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor
acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})
acidstream.sources.kafka-source.interceptors.i1.serializers = s1
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp
acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

acidstream.sinks.hive-sink.channel = gutter
acidstream.sinks.hive-sink.type = hive
acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083
acidstream.sinks.hive-sink.hive.database = default
acidstream.sinks.hive-sink.hive.table = acid
acidstream.sinks.hive-sink.hive.partition = %m%d
acidstream.sinks.hive-sink.heartBeatInterval = 10
acidstream.sinks.hive-sink.useLocalTimeStamp = false
acidstream.sinks.hive-sink.round = false
acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000
acidstream.sinks.hive-sink.batchSize = 10000
acidstream.sinks.hive-sink.callTimeout = 30000
acidstream.sinks.hive-sink.serializer = DELIMITED
acidstream.sinks.hive-sink.serializer.delimiter = "\t"
acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'
acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data

acidstream.channels.gutter.type = memory
acidstream.channels.gutter.capacity = 100000
acidstream.channels.gutter.transactionCapacity = 50000
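
For clarity, the i1 interceptor above is meant to pull the leading timestamp out of each Kafka record into the event's "timestamp" header, which the sink's %m%d partition escape then uses. A rough standalone equivalent of what I expect it to do (the sample record is made up) is:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimestampHeaderSketch {
    public static void main(String[] args) throws Exception {
        // Same regex and date pattern as in the i1 interceptor config above.
        Pattern p = Pattern.compile("^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})");
        String record = "2016-07-05 04:24:22\t42\tpayload"; // hypothetical Kafka record

        Matcher m = p.matcher(record);
        if (m.find()) {
            // RegexExtractorInterceptorMillisSerializer parses the captured group
            // into epoch millis and stores it under the header name "timestamp".
            long millis = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .parse(m.group(1)).getTime();
            // The sink renders %m%d from that header to pick the Hive partition.
            System.out.printf("timestamp header = %d -> partition %s%n",
                    millis, new SimpleDateFormat("MMdd").format(new Date(millis)));
        }
    }
}

With my sample record this would select partition 0705.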

 

My flume-env file has this line added:

 

export JAVA_OPTS="-Xms100m -Xmx3g"

 

My table on Hive has the following properties:

 

PARTITIONED BY (md string)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

 

Hive has the Tez engine set as the default execution engine.

 

Could this error be caused by a low number of threads? (The NameNode has 100
server threads available.)

 

Best regards,

Thanh Hong.

 

