From "Michal Klempa (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HIVE-15658) hive.ql.session.SessionState start() is not atomic, SessionState thread local variable can get into inconsistent state
Date Fri, 20 Jan 2017 12:18:26 GMT

    [ https://issues.apache.org/jira/browse/HIVE-15658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831627#comment-15831627 ]

Michal Klempa commented on HIVE-15658:
--------------------------------------

Another exception type that can cause this error is an InterruptedException thrown while the session directories are being created in HDFS. See the stack trace:
{code}

2017-01-19 08:36:19,917 WARN org.apache.hadoop.ipc.Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)
        at java.util.concurrent.FutureTask.get(FutureTask.java:187)
        at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1055)
        at org.apache.hadoop.ipc.Client.call(Client.java:1450)
        at org.apache.hadoop.ipc.Client.call(Client.java:1408)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
        at com.sun.proxy.$Proxy24.getFileInfo(Unknown Source) 
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:762)
        at sun.reflect.GeneratedMethodAccessor68.invoke(Unknown Source) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
        at com.sun.proxy.$Proxy25.getFileInfo(Unknown Source) 
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2102)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1215)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1412)
        at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:616)
        at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
        at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{code}

> hive.ql.session.SessionState start() is not atomic, SessionState thread local variable can get into inconsistent state
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-15658
>                 URL: https://issues.apache.org/jira/browse/HIVE-15658
>             Project: Hive
>          Issue Type: Bug
>          Components: API, HCatalog
>    Affects Versions: 1.1.0, 1.2.1, 2.0.0, 2.0.1
>         Environment: CDH5.8.0, Flume 1.6.0, Hive 1.1.0
>            Reporter: Michal Klempa
>         Attachments: HIVE-15658_branch-1.2_1.patch, HIVE-15658_branch-2.1_1.patch
>
>
> Method start() in hive.ql.session.SessionState is supposed to set up the preconditions a session needs, such as the HDFS scratch directories.
> This setup is not atomic with the assignment of the thread-local variable that can later be obtained by calling SessionState.get().
> Therefore, even if the start() method itself fails, SessionState.get() does not return null, and further re-use of the thread that previously invoked start() may yield a SessionState object in an inconsistent state.
> I have observed this with the Flume Hive Sink, which uses the Hive Streaming interface. When the directory /tmp/hive is not writable by the session user, the start() method fails (throwing a RuntimeException). If the thread is re-used (as it is in Flume), subsequent executions work with a wrongly initialized SessionState object (the HDFS dirs do not exist). In Flume, this happens when it tries to create a partition if it does not exist (though the code doing this lives in Hive Streaming).
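> A minimal sketch of what a re-used client thread observes (the scaffolding here is hypothetical; SessionState.start()/get() are the real API):
> {code}
> HiveConf conf = new HiveConf();
> try {
>   SessionState.start(new SessionState(conf)); // sets the thread-local first, then fails on /tmp/hive
> } catch (RuntimeException e) {
>   // start() failed before the HDFS session dirs existed
> }
> // later, on the SAME thread:
> SessionState ss = SessionState.get(); // non-null, but half-initialized
> // anything trusting ss (e.g. getHDFSSessionPath()) now fails with a confusing NPE
> {code}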
> Steps to reproduce:
> 0. create a test spooldir and allow Flume to write to it; in my case /home/ubuntu/flume_test, mode 775, owner ubuntu:flume
> 1. create Flume config (see attachment)
> 2. create Hive table
> {code}
> create table default.flume_test (column1 string, column2 string) partitioned by (dt string) clustered by (column1) INTO 2 BUCKETS STORED AS ORC;
> {code}
> 3. start flume agent:
> {code}
> bin/flume-ng agent -n a1 -c conf -f conf/flume-config.txt
> {code}
> 4. make the root scratch dir non-writable:
> {code}
> hdfs dfs -chmod 600 /tmp/hive
> {code}
> 5. put a file into the spooldir:
> {code}
> echo value1,value2 > file1
> {code}
> Expected behavior:
> The exception about scratch dir permissions should be thrown repeatedly.
> Example (note that the line numbers differ from upstream, as Cloudera builds from its own clones at https://github.com/cloudera/flume-ng/ and https://github.com/cloudera/hive):
> {code}
> 2017-01-18 12:39:38,926 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:380)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
>         ... 6 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
>         at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:540)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         ... 1 more
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
>         at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> Actual behavior:
> The exception about scratch dir permissions is thrown only once; after that, meaningless exceptions from code that should be unreachable are re-thrown again and again, obscuring the source of the problem from the user.
> Exceptions thrown repeatedly:
> {code}
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition values=[20170118]. Unable to get path for end point: [20170118]
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Detailed description of what is going on:
> Flume, as the Hive Streaming client, does the streaming in the HiveSink class; the main part happens at
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L253
> where one "batch" is drained (a batch in the Flume sense: a batch of incoming messages from the channel).
> The main for loop of the batch drain is at:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L282
> Flume creates a Hive endpoint for every line it tries to insert into Hive (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L290), which is not very efficient, but .equals() in HiveEndPoint is properly implemented, so everything works.
> Then it creates the helper HiveWriter (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L295), which is cached, one per HiveEndPoint; if no HiveWriter exists for an endpoint yet, one is created at:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L343
> Inspecting the constructor of HiveWriter brings us to the creation of a new connection to Hive via the Streaming API:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
> The connection is created in a separate thread:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L376
> as a Future (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L425) submitted to the thread pool callTimeoutPool (the pool is created in HiveSink, https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L493, and has a constant size of 1, so Flume apparently uses one thread per Hive sink to talk to Hive).
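> Why the same thread keeps being re-used, as a hedged sketch (assuming the pool is built with a fixed size of 1, as the HiveSink source linked above suggests):
> {code}
> // illustration only, not the verbatim Flume code
> ExecutorService callTimeoutPool = Executors.newFixedThreadPool(1);
> // every Hive call from this sink is submitted to this pool as a Future, so
> // every call runs on the one pooled thread - and inherits whatever
> // thread-local state a previous, failed SessionState.start() left behind
> {code}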
> When newConnection is created (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379) with autoCreatePartitions=true, HiveEndPoint, the entry point to Hive Streaming, is called:
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L105
> As I was testing without authentication, this boils down to https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L192
> and finally to https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L215
> The constructor of the private inner class ConnectionImpl then tries to create the partition if it does not exist, at https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L318
> The trouble starts in the method createPartitionIfNotExists, at
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L455
> where SessionState.get() returns null (we have not started a session yet), so a new one is created.
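> Paraphrased, the gate in createPartitionIfNotExists looks roughly like this (simplified sketch, not the verbatim source):
> {code}
> if (SessionState.get() == null) {
>   // first attempt: no session yet, so start one - this is where the
>   // thread-local is set even if start() subsequently throws
>   SessionState.start(new SessionState(conf));
> }
> // on a later attempt on the same thread, get() is non-null, so the broken
> // SessionState left over from the failed first attempt is silently re-used
> {code}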
> In SessionState.start(), the first thing done is registering the object itself as the thread-local variable:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> Only after that does it try to create the directories (scratchdir and subdirs):
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L548
> but if this fails, the RuntimeException (from https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L619 and https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L677) is not caught in the catch blocks, nor is there any finally block.
> So SessionState.start() has failed without completing initialization (the HDFS dirs were not created, and SessionState.hdfsSessionPath was left null), and yet execution continues.
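> The shape of start() as described above, paraphrased (not the verbatim source; names follow the linked code):
> {code}
> public static SessionState start(SessionState startSs) {
>   setCurrentSessionState(startSs);      // 1. thread-local is set first
>   startSs.createSessionDirs(userName);  // 2. may throw RuntimeException
>   // no catch/finally ever resets the thread-local, so a failure in step 2
>   // leaves the half-initialized startSs attached to the thread
>   return startSs;
> }
> {code}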
> With the RuntimeException thrown from start(), the caller (HiveEndPoint) propagates the exception back to HiveWriter: https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379
> The exception is caught at https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L442 but the handling amounts to logging and carrying on: https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L456
> This is the moment the exception is logged:
> {code}
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-------
>         at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> What happens next? Flume re-runs the delivery, calling HiveSink.process, which again boils down to newConnection. But Flume uses the exact SAME thread it used before.
> This time the if clause at https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L454
> returns true, because SessionState.get() returns the incorrectly initialized SessionState from the previous attempt.
> Execution then proceeds through https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L466 and down to
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L738
> which fails on the null value of hdfsSessionPath in SessionState.
> But this RuntimeException (a NullPointerException) is not caught by https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L470 and so it is logged:
> {code}
> 2017-01-18 12:39:44,194 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: NullPointerException Non-local session path expected to be non-null
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Sometimes Flume manages to get through https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86 (the newConnection is created in a separate thread) and rushes on to https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L89 producing yet another meaningless exception:
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition values=[20170118]. Unable to get path for end point: [20170118]
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Proposed solution:
> If the Hive Streaming API may be used from the same thread again (which it probably may), then the thread-local set at
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> has to be unset when any exception is thrown in the blocks that follow:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L539
> i.e. set the thread-local back to null before rethrowing the exceptions here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L568
> and here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L602
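> In code, the proposed change amounts to something like this (an illustrative sketch; the attached patches contain the actual diff):
> {code}
> public static SessionState start(SessionState startSs) {
>   setCurrentSessionState(startSs);
>   try {
>     startSs.createSessionDirs(userName);   // may throw RuntimeException
>   } catch (RuntimeException e) {
>     // undo the thread-local registration so a re-used thread starts clean
>     detachSession();
>     throw e;
>   }
>   return startSs;
> }
> {code}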
> The source links above point to the latest version, although I did my testing on Hive 1.1.0. From the code, the bug appears to be present in recent versions as well.
> If the Hive Streaming API is not meant to be called from re-used threads, then not only Flume but probably also the NiFi client (https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java#L237) has to be fixed (NiFi copy-pasted the Flume codebase; is there any other copy of this HiveWriter out there?).



