impala-reviews mailing list archives

From "Bharath Vissapragada (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-5429: Multi threaded block metadata loading
Date Mon, 16 Oct 2017 18:32:55 GMT
Bharath Vissapragada has posted comments on this change. (

Change subject: IMPALA-5429: Multi threaded block metadata loading

Patch Set 5:

Commit Message:
PS4, Line 59: 
> I see. So are these literally as simple as, to pick the first one, a single
That is correct.
File be/src/catalog/
PS5, Line 35: DEFINE_int32(num_metadata_loading_threads, 16,
            :     "(Advanced) The number of metadata loading threads (degree of parallelism) to use "
            :     "when loading catalog metadata.");
> I'm confused by the commit message which talks about not loading from hms u
Right, this flag controls the number of parallel *table* loads. Each table load involves:

- Loading HMS metadata (including schema details, partition information etc.)
- Loading/synthesizing block metadata

So this flag is about inter-table parallelism, whereas the commit message talks about intra-table
parallel loading. Does that make sense? (I clarified the commit message a little
to reflect this, in case that helps.)
PS5, Line 42: DEFINE_int32(max_s3_parts_parallel_load, 10,
> I would be more aggressive with this parameter and put it at 20.
File fe/src/main/java/org/apache/impala/catalog/
PS5, Line 215:   // Block metadata loading stats for a single HDFS path.
> nit: File/block (since we're also loading/refreshing files). Also, you may 
PS5, Line 217: loadedFiles_
> You may want to add a comment here. What is loaded vs refreshed? Is the one
PS5, Line 218: _
> I believe the convention is that we don't use '_' for public members.
Done. Was not aware of this.
PS5, Line 217: public int loadedFiles_ = 0;
             :     public int refreshedFiles_ = 0;
             :     public int ignoredFiles_ = 0;
> add comments for these-- see the question regarding refreshedFiles below, f
Yep, the current variables are definitely confusing; I redid them and added comments.
PS5, Line 231: Runnable
> I don't know how this is used later on, but alternatively you can make Path
I settled on Runnable because it looked cleaner to me: this class exposes a debugString()
method, so only the callers that need the result access it. I'm fine either
way, so I switched to Callable.
PS5, Line 247: Blocks on the loadBlockMetadata() call. 
> Not following this comment. run() either calls refreshBlockMetadata() or lo
The comment was stale. Updated it.
PS5, Line 333: loadBlockMetadata
> I know I am guilty for some of these names but maybe rename this to "resetA
PS5, Line 363: numUnknownDiskIds
> Are you overriding or incrementing the value of numUnknownDiskIds in the cr
Incrementing, and the final value is used in L369. Tracing the create call, which eventually
calls HdfsPartition.createDiskIds(), shows that it only increments and never overwrites.
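A minimal sketch of the accumulate-then-read pattern described in this reply, assuming a simple mutable holder in place of the patch's Reference<Long>. LongRef, DiskIdDemo, createPartition and loadAll are hypothetical names, not Impala APIs.

```java
// Hypothetical mutable holder standing in for the patch's Reference<Long>.
final class LongRef {
  private long value;

  LongRef(long initial) { value = initial; }

  long get() { return value; }

  void add(long delta) { value += delta; }
}

final class DiskIdDemo {
  // Hypothetical stand-in for the per-partition create call (which eventually
  // reaches createDiskIds()): it only increments the counter, never overwrites it.
  static void createPartition(int unknownDiskIds, LongRef numUnknownDiskIds) {
    numUnknownDiskIds.add(unknownDiskIds);
  }

  // Creates all partitions first, then reads the accumulated total once.
  static long loadAll(int[] unknownDiskIdsPerPartition) {
    LongRef numUnknownDiskIds = new LongRef(0);
    for (int n : unknownDiskIdsPerPartition) {
      createPartition(n, numUnknownDiskIds);
    }
    return numUnknownDiskIds.get();
  }
}
```

This holder is not thread-safe. If several loading threads shared one counter, an AtomicLong (or one counter per task, summed afterwards) would be needed.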
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Am I misreading this or does each partition get set to the same list of new
That is right. All the HdfsPartitions in 'partitions' map to the same partDir, so we update
the fileDescs for each of them.
PS5, Line 368: newFileDescs
> Hm, that doesn't seem particularly safe (i.e. using the same list for every
As we discussed, there is a subtle bug here (even without the patch). The following steps
reproduce the inconsistency:

- create table test(a int) partitioned by (b int);
- insert into test partition(b=1) values (1);
- alter table test add partition (b=2) location 'hdfs://localhost:20500/test-warehouse/test/b=1/';
-- Point b=2 to the same location as b=1

[localhost:21000] > show partitions test;
Query: show partitions test
| b     | #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                       |
| 1     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| 2     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| Total | -1    | 2      | 4B   | 0B           |                   |        |                   |                                                |

insert into test partition(b=1) values (2);
show files in test;
Query: show files in test
| Path                                                                                               | Size | Partition |
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |

invalidate metadata test;
show files in test;
Query: show files in test
| Path                                                                                               | Size | Partition |
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=2       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |

Bottom line: 'show files' lists different files for partition b=2 before and after the invalidate (one file before, two after).

Ideally we would fix it in this patch, but, as we discussed, I think we can do it in a separate
patch after discussing it with the community.
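The reviewer's safety concern about handing every partition the same newFileDescs list can be illustrated with a small sketch. FakePartition and AliasingDemo are hypothetical stand-ins (file descriptors reduced to strings); the sketch shows only the aliasing risk, not the cause of the 'show files' discrepancy above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for HdfsPartition: it only holds a list of file names.
final class FakePartition {
  private List<String> fileDescs = new ArrayList<>();

  void setFileDescriptors(List<String> descs) { fileDescs = descs; }

  List<String> getFileDescriptors() { return fileDescs; }
}

final class AliasingDemo {
  // Shares one mutable list across partitions: a mutation made through one
  // partition is visible through every other partition.
  static int sharedMutable() {
    List<String> newFileDescs = new ArrayList<>(Arrays.asList("f1"));
    FakePartition p1 = new FakePartition(), p2 = new FakePartition();
    for (FakePartition p : Arrays.asList(p1, p2)) p.setFileDescriptors(newFileDescs);
    p1.getFileDescriptors().add("f2");      // intended for p1 only
    return p2.getFileDescriptors().size();  // p2 sees the change as well
  }

  // Shares one unmodifiable snapshot instead: sharing is safe because no
  // partition can mutate it in place.
  static boolean sharedImmutableRejectsWrites() {
    List<String> snapshot =
        Collections.unmodifiableList(new ArrayList<>(Arrays.asList("f1")));
    FakePartition p1 = new FakePartition(), p2 = new FakePartition();
    for (FakePartition p : Arrays.asList(p1, p2)) p.setFileDescriptors(snapshot);
    try {
      p1.getFileDescriptors().add("f2");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

Sharing one list is therefore only as safe as the guarantee that nobody mutates it afterwards; an unmodifiable snapshot makes that guarantee explicit.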
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Just clarified this so I'll post my misunderstanding here. The comment stat
Yea, s/correspond/map.
PS5, Line 380: changed
             :    * mtime
> It's not just the changed mtime that we're looking for in order to determin
Reworded this a little with a reference to hasFileChanged(). LMK if it looks ok.
PS5, Line 383: The initial table load still uses the listFiles()
             :    * on the data directory that fetches both the FileStatus as well as BlockLocations
             :    * a single call.
> remove
PS5, Line 398: get(0)
> Comment why we take the file descriptors of one partition and that it doesn
PS5, Line 399: public String apply(FileDescriptor desc) {
             :               return desc.getFileName();
             :             }
> Add @Override and make it a single line (if it fits)
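For context on the get(0) question: since all partitions in the group map to the same directory, indexing the first partition's descriptors by file name could be sketched as follows. FileDesc and FdIndex are hypothetical, and the sketch uses java.util.function.Function rather than whichever Function type the patch imports; the apply() override is written as the suggested one-liner.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for FileDescriptor.
final class FileDesc {
  private final String fileName;

  FileDesc(String fileName) { this.fileName = fileName; }

  String getFileName() { return fileName; }
}

final class FdIndex {
  // Key extractor in the suggested style: @Override and a single line.
  static final Function<FileDesc, String> GET_NAME = new Function<FileDesc, String>() {
    @Override public String apply(FileDesc desc) { return desc.getFileName(); }
  };

  // All partitions in the group share one directory, so the first partition's
  // descriptors are representative of the whole group.
  static Map<String, FileDesc> indexByName(List<List<FileDesc>> descsPerPartition) {
    return descsPerPartition.get(0).stream()
        .collect(Collectors.toMap(GET_NAME, Function.identity()));
  }
}
```

On Java 8 and later, the anonymous class could simply be the method reference FileDesc::getFileName.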
PS5, Line 426: new Reference<Long>(Long.valueOf(0)
> why not use numUnknownDiskIds here?
PS5, Line 431: ++loadStats.refreshedFiles_;
> does refreshedFiles mean "file blocks reloaded" or "file checked for reload
Yea, the whole refreshedFiles thing is confusing, and I don't think it is needed. We can
just have loaded/ignored files. In a typical load from scratch/invalidate, loadedFiles >>
ignoredFiles, and during a refresh, loadedFiles << ignoredFiles. ignoredFiles now accounts
for the files that were checked for reload but weren't reloaded because they haven't changed.
IMO that is clearer.
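A sketch of the loaded/ignored counters as the reply describes them. The class name and the record() method are hypothetical; the public fields drop the trailing underscore, following the convention for public members mentioned in this review.

```java
// Hypothetical per-path stats holder with the two counters described in the reply.
final class FileMetadataLoadStats {
  // Files whose metadata, including block locations, was (re)loaded.
  public int loadedFiles = 0;
  // Files checked for reload but skipped because they had not changed.
  public int ignoredFiles = 0;

  // Records the outcome of checking one file.
  void record(boolean reloaded) {
    if (reloaded) {
      ++loadedFiles;
    } else {
      ++ignoredFiles;
    }
  }

  String debugString() {
    return "loaded files: " + loadedFiles + ", ignored files: " + ignoredFiles;
  }
}
```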
PS5, Line 433: for (HdfsPartition partition: partitions) partition.setFileDescriptors(n
> same question as in the load method.
Answered above.
PS5, Line 448: (fd == null) || (fd.getFileLength() != status.getLen()) ||
             :       (fd.getModificationTime() != status.getModificationTime());
> I think we were also checking if the partition was cached. Isn't this check
I couldn't think of why we need to force-reload block metadata for cached partitions, so I removed
it. Let me know if it was included for a specific reason.
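Without the cached-partition condition, the change check quoted at PS5 Line 448 reduces to a small predicate. CachedFd and FileStat are hypothetical stand-ins for FileDescriptor and Hadoop's FileStatus, keeping only the fields the check reads.

```java
// Hypothetical stand-in for FileDescriptor: only the fields the check reads.
final class CachedFd {
  final long fileLength;
  final long modificationTime;

  CachedFd(long fileLength, long modificationTime) {
    this.fileLength = fileLength;
    this.modificationTime = modificationTime;
  }
}

// Hypothetical stand-in for Hadoop's FileStatus.
final class FileStat {
  final long len;
  final long modificationTime;

  FileStat(long len, long modificationTime) {
    this.len = len;
    this.modificationTime = modificationTime;
  }
}

final class ChangeCheck {
  // True if the file is new (no cached descriptor) or its length or mtime differs.
  static boolean hasFileChanged(CachedFd fd, FileStat status) {
    return fd == null
        || fd.fileLength != status.len
        || fd.modificationTime != status.modificationTime;
  }
}
```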
PS5, Line 455: refreshFileMetadata
> Maybe call it refreshPartitionStorageMetadata? Overall, it may make sense t
PartitionStorageMetadata sounds better; I changed it in multiple places. Let me know if it
looks better.
PS5, Line 694: Exception
> Why this change?
Stale change; I forgot to undo it. Done.
PS5, Line 773: HDFS and S3
> just to clarify, HdfsTable covers both hdfs table metadata as well as metad
Correct. We use the same hdfs client library to access both of them.
PS5, Line 783: getFileSystem(CONF)
> I noticed that this is called in many places in this class-- is it bc a giv
Correct. This is one of the overheads we noticed in the perf runs, and unfortunately we can't
work around it.
PS5, Line 787:     int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
> What is the expected behavior for tables with mixed FSs?
I just implemented this as a first cut. I don't think it makes sense to loop through all partitions
in this call; instead, we need to maintain counters of HDFS and non-HDFS partitions and keep
them updated on adds/drops. Once we have those numbers, we can do some fancier things. Maybe
we can do it as a follow-up patch? I can raise a new JIRA for it and link it to this one.
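One way the proposed counters could look, as a sketch only. PartitionFsCounts and its methods are hypothetical, and the pool-size policy (use the non-HDFS size whenever any non-HDFS partition exists, capped by the partition count) is an illustrative assumption, not something settled in this review.

```java
// Hypothetical per-table counters, updated as partitions are added and dropped.
final class PartitionFsCounts {
  private int hdfsPartitions = 0;
  private int nonHdfsPartitions = 0;

  // supportsStorageIds mirrors the HDFS vs. non-HDFS split used for pool sizing.
  void onAdd(boolean supportsStorageIds) {
    if (supportsStorageIds) ++hdfsPartitions; else ++nonHdfsPartitions;
  }

  void onDrop(boolean supportsStorageIds) {
    if (supportsStorageIds) {
      if (hdfsPartitions == 0) throw new IllegalStateException("no HDFS partition to drop");
      --hdfsPartitions;
    } else {
      if (nonHdfsPartitions == 0) throw new IllegalStateException("no non-HDFS partition to drop");
      --nonHdfsPartitions;
    }
  }

  // Illustrative policy (assumption): any non-HDFS partition selects the larger
  // non-HDFS pool; never start more threads than there are partitions.
  int loadingThreadPoolSize(int hdfsPoolSize, int nonHdfsPoolSize) {
    int total = hdfsPartitions + nonHdfsPartitions;
    int size = nonHdfsPartitions > 0 ? nonHdfsPoolSize : hdfsPoolSize;
    return Math.max(1, Math.min(size, total));
  }
}
```

Keeping the counts current on add/drop avoids the per-call loop over all partitions that the reply wants to eliminate.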
PS5, Line 801: getLoadingThreadPoolSize
> can different partitions have different number of files? if so, work across
Yes, each partition can have its own number of files, so the work definitely varies across threads.
It is roughly proportional to the number of blocks across all the files in that partition directory.
Not sure if that answers your question.
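To make the uneven-work point concrete, here is a rough model. It assumes per-partition cost is proportional to block count and that each idle thread picks up the next queued partition, as a fixed thread pool does. MakespanModel is hypothetical and ignores I/O variance.

```java
import java.util.PriorityQueue;

final class MakespanModel {
  // Simulates a fixed pool draining a FIFO queue: each partition goes to the
  // thread that becomes free first. Returns the time the last thread finishes.
  static long makespan(long[] blocksPerPartition, int threads) {
    if (threads < 1) throw new IllegalArgumentException("need at least one thread");
    PriorityQueue<Long> finishTimes = new PriorityQueue<>();
    for (int i = 0; i < threads; i++) finishTimes.add(0L);
    long makespan = 0;
    for (long cost : blocksPerPartition) {
      long start = finishTimes.poll();  // earliest-free thread
      long end = start + cost;
      finishTimes.add(end);
      makespan = Math.max(makespan, end);
    }
    return makespan;
  }
}
```

For example, with block counts {100, 10, 10, 10, 10, 10} and 2 threads the model gives 100, while the same partitions in the order {10, 10, 10, 10, 10, 100} give 120. Submitting larger partitions first (the classic longest-processing-time heuristic) can shorten the tail, though whether that pays off here is untested.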
PS5, Line 801: getLoadingThreadPoolSize
> Parallelization of metadata loading is done on per partition granularity.
PS5, Line 805: ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
> What is the benefit of having a one thread pool per loaded table instead of
I'm summarizing what we discussed so that others following this can comment.

I don't think the overhead is too much, especially given that we just start 5/10 threads. While
I agree that a global thread pool (across all tables) could give us more control
over resource consumption/queue bounds, I think it is more complicated and involves writing
a proper scheduler.
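A sketch of the per-table pool discussed here, assuming each partition directory's load is wrapped in a Callable. PerTablePoolLoader is hypothetical, and each task just returns a file count; real tasks would load block metadata.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerTablePoolLoader {
  // Loads all partition directories of one table on a pool owned by this call.
  static int loadTable(List<Callable<Integer>> perDirLoads, int threadPoolSize)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize);
    try {
      // invokeAll blocks until every task has completed, normally or not.
      List<Future<Integer>> results = pool.invokeAll(perDirLoads);
      int totalFiles = 0;
      for (Future<Integer> f : results) {
        totalFiles += f.get();  // rethrows a task failure as ExecutionException
      }
      return totalFiles;
    } finally {
      pool.shutdown();  // the pool's threads do not outlive this table load
    }
  }
}
```

Because actions in a task happen-before a successful Future.get() on its result, stats read after get() returns reflect the finished load. With per-table pools, the worst case is roughly num_metadata_loading_threads tables loading at once, each with its own pool: at the defaults shown in this review, 16 × 10 = 160 partition-loading threads if every table used a 10-thread pool.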
PS5, Line 821: debugString
> This will get information from loadingStats but how do you know that loadin
PS5, Line 1237: HashMap
> nit: Map
PS5, Line 1303: Returns the HdfsPartition objects associated with the specified list of partition
              :    * names.
> Update comment.
File fe/src/main/java/org/apache/impala/service/
PS5, Line 626: private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
             :       throws CatalogException {
             :     Preconditions.checkNotNull(tbl);
             :     Preconditions.checkNotNull(partitions);
             :     if (!(tbl instanceof HdfsTable)) {
             :       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
             :     }
             :     HdfsTable hdfsTable = (HdfsTable) tbl;
             :     List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(partitions);
             :     for (HdfsPartition hdfsPartition: hdfsPartitions) {
             :       catalog_.addPartition(hdfsPartition);
             :     }
             :     return hdfsTable;
             :   }
> Lot's of duplication with the previous one. How expensive would it be to si
File fe/src/main/java/org/apache/impala/util/
PS5, Line 59: synchronized (list_) {
> Why synchronized on list_? This confuses the locking scheme. If you want yo
PS5, Line 78: list_ = Collections.synchronizedList(list);
> Hm, I think this new implementation is way too complicated. I'd start with 
Not totally sure I follow this. Do you mean this list_ need not be synchronized?
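One possible reading of the "start simpler" suggestion, sketched under that assumption: a plain ArrayList guarded by a single private lock, instead of a Collections.synchronizedList that is additionally locked externally. (The Collections.synchronizedList Javadoc does require callers to synchronize on the list while iterating, which is why such code ends up with synchronized (list_) blocks.) GuardedList is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification: one plain list, one private lock.
final class GuardedList<T> {
  private final Object lock = new Object();
  private final List<T> list = new ArrayList<>();

  void add(T item) {
    synchronized (lock) { list.add(item); }
  }

  int size() {
    synchronized (lock) { return list.size(); }
  }

  // Returns a copy so callers can iterate without holding the lock.
  List<T> snapshot() {
    synchronized (lock) { return new ArrayList<>(list); }
  }
}
```

With a single lock that never escapes the class, there is only one locking rule to reason about.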


Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I07eaa7151dfc4d56da8db8c2654bd65d8f808481
Gerrit-Change-Number: 8235
Gerrit-PatchSet: 5
Gerrit-Owner: Bharath Vissapragada <>
Gerrit-Reviewer: Bharath Vissapragada <>
Gerrit-Reviewer: Dimitris Tsirogiannis <>
Gerrit-Reviewer: Jim Apple <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Vuk Ercegovac <>
Gerrit-Comment-Date: Mon, 16 Oct 2017 18:32:55 +0000
Gerrit-HasComments: Yes
