impala-commits mailing list archives

From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-5412: Fix scan result with partitions on same file
Date Mon, 14 Aug 2017 03:36:29 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master a99114283 -> 24a6e53d1


IMPALA-5412: Fix scan result with partitions on same file

The maps storing file descriptors and file metadata were using
the filename as a key. When multiple partitions pointed to the same
filesystem location, these map entries were occasionally
overwritten by another partition pointing to the same file.

As a solution, the map key was extended to a pair of
partition ID and file name.
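
For illustration only (not part of the patch): a minimal, self-contained C++
sketch of the collision and of the composite-key fix. The FileDesc struct,
PairHash functor and file path below are hypothetical stand-ins; the real code
uses HdfsFileDesc, the pair_hash helper added in be/src/util/container-util.h,
and the PartitionFileKey typedef in hdfs-scan-node-base.h.

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <utility>

    // Hypothetical stand-in for Impala's HdfsFileDesc.
    struct FileDesc { int64_t partition_id; };

    // Hash for a (partition id, file name) pair; a simplified, std-only
    // analogue of the pair_hash helper added by this change.
    struct PairHash {
      std::size_t operator()(const std::pair<int64_t, std::string>& p) const {
        std::size_t h1 = std::hash<int64_t>{}(p.first);
        std::size_t h2 = std::hash<std::string>{}(p.second);
        return h1 ^ (h2 + 0x9e3779b9 + (h1 << 6) + (h1 >> 2));  // hash_combine-style mix
      }
    };

    int main() {
      const std::string shared_file = "/warehouse/same_loc_test/p/data.txt";

      // Old scheme: the key is the file name only, so the second partition
      // silently overwrites the first one's entry.
      std::unordered_map<std::string, FileDesc> by_name;
      by_name[shared_file] = FileDesc{1};
      by_name[shared_file] = FileDesc{2};
      std::cout << "filename key -> " << by_name.size() << " entry\n";    // prints 1

      // New scheme: the key is (partition id, file name), so each partition
      // keeps its own descriptor even though the file path is identical.
      using Key = std::pair<int64_t, std::string>;
      std::unordered_map<Key, FileDesc, PairHash> by_partition_and_file;
      by_partition_and_file[{1, shared_file}] = FileDesc{1};
      by_partition_and_file[{2, shared_file}] = FileDesc{2};
      std::cout << "(partition, filename) key -> "
                << by_partition_and_file.size() << " entries\n";          // prints 2
      return 0;
    }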

Change-Id: Ie74b305377248045c0d87b911943e1cabb7223e9
Reviewed-on: http://gerrit.cloudera.org:8080/7625
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/24a6e53d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/24a6e53d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/24a6e53d

Branch: refs/heads/master
Commit: 24a6e53d141cfcea158607942983e62ef5172ca6
Parents: a991142
Author: Gabor Kaszab <gaborkaszab@cloudera.com>
Authored: Mon Aug 7 18:27:27 2017 +0200
Committer: Tim Armstrong <tarmstrong@cloudera.com>
Committed: Mon Aug 14 03:35:21 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc      | 11 ++++--
 be/src/exec/hdfs-parquet-scanner.cc       | 13 ++++---
 be/src/exec/hdfs-scan-node-base.cc        | 28 ++++++++------
 be/src/exec/hdfs-scan-node-base.h         | 36 ++++++++++--------
 be/src/exec/hdfs-text-scanner.cc          |  3 +-
 be/src/exec/scanner-context.cc            |  5 ++-
 be/src/util/container-util.h              | 10 +++++
 tests/metadata/test_partition_metadata.py | 51 +++++++++++++++++++++-----
 8 files changed, 110 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 25fed0b..a522a1f 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -86,7 +86,8 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
       scan_node_->runtime_profile(), "BytesSkipped", TUnit::BYTES);
 
   header_ = reinterpret_cast<FileHeader*>(
-      scan_node_->GetFileMetadata(stream_->filename()));
+      scan_node_->GetFileMetadata(
+          context->partition_descriptor()->id(), stream_->filename()));
   if (header_ == nullptr) {
     only_parsing_header_ = true;
     return Status::OK();
@@ -157,8 +158,9 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
     }
     // Header is parsed, set the metadata in the scan node and issue more ranges.
     static_cast<HdfsScanNodeBase*>(scan_node_)->SetFileMetadata(
-        stream_->filename(), header_);
-    HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
+        context_->partition_descriptor()->id(), stream_->filename(), header_);
+    HdfsFileDesc* desc = scan_node_->GetFileDesc(
+        context_->partition_descriptor()->id(), stream_->filename());
     RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc));
     return Status::OK();
   }
@@ -303,7 +305,8 @@ Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) {
 
 void BaseSequenceScanner::CloseFileRanges(const char* filename) {
   DCHECK(only_parsing_header_);
-  HdfsFileDesc* desc = scan_node_->GetFileDesc(filename);
+  HdfsFileDesc* desc = scan_node_->GetFileDesc(
+      context_->partition_descriptor()->id(), filename);
   const vector<DiskIoMgr::ScanRange*>& splits = desc->splits;
   for (int i = 0; i < splits.size(); ++i) {
     COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e97572c..ecf14f8 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -609,7 +609,8 @@ Status HdfsParquetScanner::NextRowGroup() {
   int64_t split_offset = split_range->offset();
   int64_t split_length = split_range->len();
 
-  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(
+      context_->partition_descriptor()->id(), filename());
 
   bool start_with_first_row_group = row_group_idx_ == -1;
   bool misaligned_row_group_skipped = false;
@@ -1361,7 +1362,8 @@ Status HdfsParquetScanner::ProcessFooter() {
     // In this case, the metadata is bigger than our guess meaning there are
     // not enough bytes in the footer range from IssueInitialRanges().
     // We'll just issue more ranges to the IoMgr that is the actual footer.
-    const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+    int64_t partition_id = context_->partition_descriptor()->id();
+    const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
     DCHECK(file_desc != NULL);
     // The start of the metadata is:
     // file_length - 4-byte metadata size - footer-size - metadata size
@@ -1383,7 +1385,7 @@ Status HdfsParquetScanner::ProcessFooter() {
 
     // Read the header into the metadata buffer.
     DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange(
-        metadata_range_->fs(), filename(), metadata_size, metadata_start, -1,
+        metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id,
         metadata_range_->disk_id(), metadata_range_->expected_local(),
         DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
 
@@ -1585,7 +1587,8 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
 
 Status HdfsParquetScanner::InitColumns(
     int row_group_idx, const vector<ParquetColumnReader*>& column_readers) {
-  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+  int64_t partition_id = context_->partition_descriptor()->id();
+  const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != NULL);
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
 
@@ -1665,7 +1668,7 @@ Status HdfsParquetScanner::InitColumns(
         && col_start >= split_range->offset()
         && col_end <= split_range->offset() + split_range->len();
     DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), col_len, col_start, scalar_reader->col_idx(), split_range->disk_id(),
+        filename(), col_len, col_start, partition_id, split_range->disk_id(),
         col_range_local,
         DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime));
     col_ranges.push_back(col_range);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0e5e974..ca71201 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -263,12 +263,13 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     file_path.append(split.file_name, filesystem::path::codecvt());
     const string& native_file_path = file_path.native();
 
+    auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
     HdfsFileDesc* file_desc = NULL;
-    FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path);
+    FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key);
     if (file_desc_it == file_descs_.end()) {
       // Add new file_desc to file_descs_ and per_type_files_
       file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
-      file_descs_[native_file_path] = file_desc;
+      file_descs_[file_desc_map_key] = file_desc;
       file_desc->file_length = split.file_length;
       file_desc->mtime = split.mtime;
       file_desc->file_compression = split.file_compression;
@@ -603,7 +604,7 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char*
   // beyond the end of the file).
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
-  DCHECK_LE(offset + len, GetFileDesc(file)->file_length)
+  DCHECK_LE(offset + len, GetFileDesc(partition_id, file)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len="
<< len << ")";
   disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local);
 
@@ -630,20 +631,25 @@ Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ra
   return Status::OK();
 }
 
-HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(const string& filename) {
-  DCHECK(file_descs_.find(filename) != file_descs_.end());
-  return file_descs_[filename];
+HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(int64_t partition_id, const string& filename) {
+  auto file_desc_map_key = make_pair(partition_id, filename);
+  DCHECK(file_descs_.find(file_desc_map_key) != file_descs_.end());
+  return file_descs_[file_desc_map_key];
 }
 
-void HdfsScanNodeBase::SetFileMetadata(const string& filename, void* metadata) {
+void HdfsScanNodeBase::SetFileMetadata(
+    int64_t partition_id, const string& filename, void* metadata) {
   unique_lock<mutex> l(metadata_lock_);
-  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
-  per_file_metadata_[filename] = metadata;
+  auto file_metadata_map_key = make_pair(partition_id, filename);
+  DCHECK(per_file_metadata_.find(file_metadata_map_key) == per_file_metadata_.end());
+  per_file_metadata_[file_metadata_map_key] = metadata;
 }
 
-void* HdfsScanNodeBase::GetFileMetadata(const string& filename) {
+void* HdfsScanNodeBase::GetFileMetadata(
+    int64_t partition_id, const string& filename) {
   unique_lock<mutex> l(metadata_lock_);
-  auto it = per_file_metadata_.find(filename);
+  auto file_metadata_map_key = make_pair(partition_id, filename);
+  auto it = per_file_metadata_.find(file_metadata_map_key);
   if (it == per_file_metadata_.end()) return NULL;
   return it->second;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index c79a6e8..e1c431f 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -34,6 +34,7 @@
 #include "util/avro-util.h"
 #include "util/progress-updater.h"
 #include "util/spinlock.h"
+#include "util/container-util.h"
 
 namespace impala {
 
@@ -196,10 +197,10 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Allocate a new scan range object, stored in the runtime state's object pool. For
   /// scan ranges that correspond to the original hdfs splits, the partition id must be
-  /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
-  /// past buffers), the partition_id is unused. expected_local should be true if this
-  /// scan range is not expected to require a remote read. The range must fall within
-  /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
+  /// set to the range's partition id. Partition_id is mandatory as it is used to gather
+  /// file descriptor info. expected_local should be true if this scan range is not
+  /// expected to require a remote read. The range must fall within the file bounds.
+  /// That is, the offset must be >= 0, and offset + len <= file_length.
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
@@ -233,16 +234,17 @@ class HdfsScanNodeBase : public ScanNode {
   Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
       MemPool* pool, RuntimeState* state) const;
 
-  /// Returns the file desc for 'filename'.  Returns nullptr if filename is invalid.
-  HdfsFileDesc* GetFileDesc(const std::string& filename);
+  /// Given a partition_id and filename, returns the associated file descriptor.
+  /// A DCHECK ensures a file descriptor is always returned.
+  HdfsFileDesc* GetFileDesc(int64_t partition_id, const std::string& filename);
 
-  /// Sets the scanner specific metadata for 'filename'. Scanners can use this to store
-  /// file header information. Thread safe.
-  void SetFileMetadata(const std::string& filename, void* metadata);
+  /// Sets the scanner specific metadata for 'partition_id' and 'filename'.
+  /// Scanners can use this to store file header information. Thread safe.
+  void SetFileMetadata(int64_t partition_id, const std::string& filename, void* metadata);
 
-  /// Returns the scanner specific metadata for 'filename'. Returns nullptr if there is
-  /// no metadata. Thread safe.
-  void* GetFileMetadata(const std::string& filename);
+  /// Returns the scanner specific metadata for 'partition_id' and 'filename'.
+  /// Returns nullptr if there is no metadata. Thread safe.
+  void* GetFileMetadata(int64_t partition_id, const std::string& filename);
 
   /// Called by scanners when a range is complete. Used to record progress.
   /// This *must* only be called after a scanner has completely finished its
@@ -356,8 +358,11 @@ class HdfsScanNodeBase : public ScanNode {
   /// Partitions scanned by this scan node.
   std::unordered_set<int64_t> partition_ids_;
 
-  /// File path => file descriptor (which includes the file's splits)
-  typedef std::unordered_map<std::string, HdfsFileDesc*> FileDescMap;
+  /// This is a pair of partition ID and filename
+  typedef pair<int64_t, std::string> PartitionFileKey;
+
+  /// (partition_id, file path) => file descriptor (which includes the file's splits)
+  typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
   FileDescMap file_descs_;
 
   /// File format => file descriptors.
@@ -366,9 +371,10 @@ class HdfsScanNodeBase : public ScanNode {
   FileFormatsMap per_type_files_;
 
   /// Scanner specific per file metadata (e.g. header information) and associated lock.
+  /// The map key is a (partition_id, filename) pair
   /// TODO: Remove this lock when removing the legacy scanners and scan nodes.
   boost::mutex metadata_lock_;
-  std::unordered_map<std::string, void*> per_file_metadata_;
+  std::unordered_map<PartitionFileKey, void*, pair_hash> per_file_metadata_;
 
   /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
   /// item tuples). Includes a copy of ExecNode.conjuncts_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 378b45e..5962f94 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -585,7 +585,8 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
   // For other compressed text: attempt to read and decompress the entire file, point
   // to the decompressed buffer, and then continue normal processing.
   DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
-  HdfsFileDesc* desc = scan_node_->GetFileDesc(stream_->filename());
+  HdfsFileDesc* desc = scan_node_->GetFileDesc(
+      context_->partition_descriptor()->id(), stream_->filename());
   int64_t file_size = desc->file_length;
   DCHECK_GT(file_size, 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 7a998b3..b1577a1 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -77,7 +77,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent)
 ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   std::unique_ptr<Stream> stream(new Stream(this));
   stream->scan_range_ = range;
-  stream->file_desc_ = scan_node_->GetFileDesc(stream->filename());
+  stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename());
   stream->file_len_ = stream->file_desc_->file_length;
   stream->total_bytes_returned_ = 0;
   stream->io_buffer_pos_ = NULL;
@@ -175,8 +175,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
       // TODO: We are leaving io_buffer_ = NULL, revisit.
       return Status::OK();
     }
+    int64_t partition_id = parent_->partition_descriptor()->id();
     DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
-        scan_range_->fs(), filename(), read_past_buffer_size, offset, -1,
+        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
         scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached());
     RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
         parent_->scan_node_->reader_context(), range, &io_buffer_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index 80ec984..5cd2d0c 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -64,6 +64,16 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b
 };
 
 
+struct pair_hash {
+  template <class T1, class T2>
+  std::size_t operator () (const std::pair<T1, T2> &p) const {
+    size_t seed = 0;
+    boost::hash_combine(seed, std::hash<T1>{}(p.first));
+    boost::hash_combine(seed, std::hash<T2>{}(p.second));
+    return seed;
+  }
+};
+
 /// FindOrInsert(): if the key is present, return the value; if the key is not present,
 /// create a new entry (key, default_val) and return default_val.
 /// TODO: replace with single template which takes a template param
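
A hedged usage sketch, not part of the patch: how the pair_hash functor above
can be plugged into std::unordered_map, mirroring the FileDescMap and
per_file_metadata_ declarations in hdfs-scan-node-base.h. The map name and
file paths are illustrative only.

    #include <boost/functional/hash.hpp>
    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <utility>

    // Same shape as the pair_hash added above, repeated here so the example
    // compiles on its own.
    struct pair_hash {
      template <class T1, class T2>
      std::size_t operator () (const std::pair<T1, T2> &p) const {
        std::size_t seed = 0;
        boost::hash_combine(seed, std::hash<T1>{}(p.first));
        boost::hash_combine(seed, std::hash<T2>{}(p.second));
        return seed;
      }
    };

    int main() {
      // Key is (partition id, file path), as in FileDescMap and per_file_metadata_.
      using PartitionFileKey = std::pair<int64_t, std::string>;
      std::unordered_map<PartitionFileKey, int, pair_hash> num_splits;

      num_splits[{1, "/warehouse/tbl/p/file0.parq"}] = 3;
      num_splits[{2, "/warehouse/tbl/p/file0.parq"}] = 5;  // distinct entry, no overwrite

      assert(num_splits.size() == 2);
      assert(num_splits.at({2, "/warehouse/tbl/p/file0.parq"}) == 5);
      return 0;
    }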

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/24a6e53d/tests/metadata/test_partition_metadata.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 0758fde..a6f635a 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -20,6 +20,11 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.util.filesystem_utils import WAREHOUSE
 
+# Map from the test dimension file_format string to the SQL "STORED AS"
+# argument.
+STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro',
+    'seq': 'sequencefile' }
+
 # Tests specific to partition metadata.
 # TODO: Split up the DDL tests and move some of the partition-specific tests
 # here.
@@ -33,10 +38,15 @@ class TestPartitionMetadata(ImpalaTestSuite):
     super(TestPartitionMetadata, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
 
-    # There is no reason to run these tests using all dimensions.
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'text' and\
-        v.get_value('table_format').compression_codec == 'none')
+    # Run one variation of the test with each file format that we support writing.
+    # The compression shouldn't affect the partition handling so restrict to the core
+    # compression codecs.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        (v.get_value('table_format').file_format in ('text', 'parquet') and
+         v.get_value('table_format').compression_codec == 'none') or
+        (v.get_value('table_format').file_format in ('seq', 'avro') and
+         v.get_value('table_format').compression_codec == 'snap' and
+         v.get_value('table_format').compression_type == 'block'))
 
   @SkipIfLocal.hdfs_client
   def test_multiple_partitions_same_location(self, vector, unique_database):
@@ -46,11 +56,13 @@ class TestPartitionMetadata(ImpalaTestSuite):
     TBL_NAME = "same_loc_test"
     FQ_TBL_NAME = unique_database + "." + TBL_NAME
     TBL_LOCATION = '%s/%s.db/%s' % (WAREHOUSE, unique_database, TBL_NAME)
+    file_format = vector.get_value('table_format').file_format
     # Cleanup any existing data in the table directory.
     self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True)
     # Create the table
-    self.client.execute("create table %s (i int) partitioned by(j int) location '%s'"
-        % (FQ_TBL_NAME, TBL_LOCATION))
+    self.client.execute(
+        "create table %s (i int) partitioned by(j int) stored as %s location '%s'"
+        % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION))
 
     # Point multiple partitions to the same location and use partition locations that
     # do not contain a key=value path.
@@ -62,20 +74,41 @@ class TestPartitionMetadata(ImpalaTestSuite):
     self.client.execute("alter table %s add partition (j=2) location '%s/p'"
         % (FQ_TBL_NAME, TBL_LOCATION))
 
+    # Allow unsupported avro and sequence file writer.
+    self.client.execute("set allow_unsupported_formats=true")
+
     # Insert some data. This will only update partition j=1 (IMPALA-1480).
     self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME)
-    # Refresh to update file metadata of both partitions.
+    # Refresh to update file metadata of both partitions
     self.client.execute("refresh %s" % FQ_TBL_NAME)
 
     # The data will be read twice because each partition points to the same location.
     data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
-    assert data.split('\t') == ['2', '3']
+    if file_format == 'avro':
+      # Avro writer is broken and produces nulls. Only check partition column.
+      assert data.split('\t')[1] == '3'
+    else:
+      assert data.split('\t') == ['2', '3']
 
     self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME)
     self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME)
     self.client.execute("refresh %s" % FQ_TBL_NAME)
     data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
-    assert data.split('\t') == ['6', '9']
+    if file_format == 'avro':
+      # Avro writer is broken and produces nulls. Only check partition column.
+      assert data.split('\t')[1] == '9'
+    else:
+      assert data.split('\t') == ['6', '9']
+
+    # Force all scan ranges to be on the same node. It should produce the same
+    # result as above. See IMPALA-5412.
+    self.client.execute("set num_nodes=1")
+    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
+    if file_format == 'avro':
+      # Avro writer is broken and produces nulls. Only check partition column.
+      assert data.split('\t')[1] == '9'
+    else:
+      assert data.split('\t') == ['6', '9']
 
   @SkipIfS3.hive
   @SkipIfADLS.hive

