impala-reviews mailing list archives

From "Bharath Vissapragada (Code Review)" <ger...@cloudera.org>
Subject [Impala-ASF-CR] IMPALA-5429: Multi threaded block metadata loading
Date Mon, 16 Oct 2017 18:32:55 GMT
Bharath Vissapragada has posted comments on this change. ( http://gerrit.cloudera.org:8080/8235 )

Change subject: IMPALA-5429: Multi threaded block metadata loading
......................................................................


Patch Set 5:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/8235/4//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/8235/4//COMMIT_MSG@59
PS4, Line 59: 
> I see. So are these literally as simple as, to pick the first one, a single
That is correct.


http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc
File be/src/catalog/catalog.cc:

http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc@35
PS5, Line 35: DEFINE_int32(num_metadata_loading_threads, 16,
            :     "(Advanced) The number of metadata loading threads (degree of parallelism) to use "
            :     "when loading catalog metadata.");
> I'm confused by the commit message which talks about not loading from hms u
Right, this flag controls the number of parallel *table* loads. Each table load involves:

- Loading HMS metadata (including schema details, partition information etc.)
- Loading/synthesizing block metadata

So this flag is about inter-table parallelism, whereas the commit message talks about intra-table
parallel loading. Does that make sense? (I clarified the commit message a little to reflect
this, in case that helps.)
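
To picture the two levels, here is a minimal sketch. It is not the code from the patch: the
class, the method names and the pool sizes are hypothetical, and "loading" a partition is
simulated by building a string. An outer pool runs several table loads at once (inter-table),
and each table load uses its own pool for its partitions (intra-table).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; none of these names come from the patch.
class TwoLevelLoadingSketch {
  // Waits for a task and converts checked failures into an unchecked one.
  static <T> T await(Future<T> future) {
    try {
      return future.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  // Intra-table parallelism: a per-table pool loads that table's partitions.
  static List<String> loadTable(String table, List<String> partitions, int partitionThreads) {
    ExecutorService partitionPool = Executors.newFixedThreadPool(partitionThreads);
    try {
      List<Future<String>> pending = new ArrayList<>();
      for (String partition : partitions) {
        Callable<String> task = () -> table + "/" + partition;  // Stands in for a real load.
        pending.add(partitionPool.submit(task));
      }
      List<String> loaded = new ArrayList<>();
      for (Future<String> f : pending) loaded.add(await(f));
      return loaded;
    } finally {
      partitionPool.shutdown();
    }
  }

  // Inter-table parallelism: an outer pool runs several table loads concurrently.
  static Map<String, List<String>> loadTables(
      Map<String, List<String>> tables, int tableThreads, int partitionThreads) {
    ExecutorService tablePool = Executors.newFixedThreadPool(tableThreads);
    try {
      Map<String, Future<List<String>>> pending = new LinkedHashMap<>();
      for (Map.Entry<String, List<String>> e : tables.entrySet()) {
        Callable<List<String>> task =
            () -> loadTable(e.getKey(), e.getValue(), partitionThreads);
        pending.put(e.getKey(), tablePool.submit(task));
      }
      Map<String, List<String>> result = new LinkedHashMap<>();
      for (Map.Entry<String, Future<List<String>>> e : pending.entrySet()) {
        result.put(e.getKey(), await(e.getValue()));
      }
      return result;
    } finally {
      tablePool.shutdown();
    }
  }
}
```

In this sketch the worst-case thread count is roughly tableThreads * partitionThreads, which is
why the two knobs are worth reasoning about separately.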


http://gerrit.cloudera.org:8080/#/c/8235/5/be/src/catalog/catalog.cc@42
PS5, Line 42: DEFINE_int32(max_s3_parts_parallel_load, 10,
> I would be more aggressive with this parameter and put it at 20.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
File fe/src/main/java/org/apache/impala/catalog/HdfsTable.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@215
PS5, Line 215:   // Block metadata loading stats for a single HDFS path.
> nit: File/block (since we're also loading/refreshing files). Also, you may 
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@217
PS5, Line 217: loadedFiles_
> You may want to add a comment here. What is loaded vs refreshed? Is the one
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@218
PS5, Line 218: _
> I believe the convention is that we don't use '_' for public members.
Done. Was not aware of this.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@217
PS5, Line 217: public int loadedFiles_ = 0;
             :     public int refreshedFiles_ = 0;
             :     public int ignoredFiles_ = 0;
> add comments for these-- see the question regarding refreshedFiles below, f
Yep, the current variables are confusing for sure, redid them and added comments.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@231
PS5, Line 231: Runnable
> I don't know how this is used later on, but alternatively you can make Path
I settled on Runnable because it looked cleaner to me: this class exposes the debugString()
method, and only those callers who need the return value have to call it. I'm fine either
way. I switched to Callable.
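
For readers following along, the trade-off looks roughly like this. This is a sketch with
hypothetical class names, not the loader class from the patch: with Runnable the result lives
in a field and is read back through an accessor, while with Callable it comes back through the
Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names throughout; this is not the class from the patch.
class RunnableVsCallableSketch {
  // Runnable style: run() returns nothing, so the result is kept in a field and
  // read through an accessor by the callers that care about it.
  static class RunnableLoader implements Runnable {
    private final String path;
    private volatile String summary = "not loaded";
    RunnableLoader(String path) { this.path = path; }
    @Override public void run() { summary = "loaded " + path; }
    String debugString() { return summary; }
  }

  // Callable style: call() hands the result straight back through the Future.
  static class CallableLoader implements Callable<String> {
    private final String path;
    CallableLoader(String path) { this.path = path; }
    @Override public String call() { return "loaded " + path; }
  }

  static String viaRunnable(String path) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      RunnableLoader loader = new RunnableLoader(path);
      pool.submit(loader).get();  // Wait for completion; this Future carries no value.
      return loader.debugString();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  static String viaCallable(String path) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Future<String> result = pool.submit(new CallableLoader(path));
      return result.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```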


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@247
PS5, Line 247: Blocks on the loadBlockMetadata() call. 
> Not following this comment. run() either calls refreshBlockMetadata() or lo
The comment was stale. Updated it.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@333
PS5, Line 333: loadBlockMetadata
> I know I am guilty for some of these names but maybe rename this to "resetA
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@363
PS5, Line 363: numUnknownDiskIds
> Are you overriding or incrementing the value of numUnknownDiskIds in the cr
Incrementing and will use the final value in L369. Tracking the create call which eventually
calls HdfsPartition.createDiskIds(), it only increments and doesn't overwrite.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Am I misreading this or does each partition get set to the same list of new
That is right. All the HdfsPartitions in 'partitions' map to the same partDir. So we update
the fileDescs for each of them.
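
As a rough sketch of that mapping (hypothetical types, file names as plain strings, not the
HdfsTable code): partitions are grouped by directory, the directory is listed once, and the
result is applied to every partition in the group. Giving each partition its own copy of the
list is a choice made in this sketch only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; Partition here is not HdfsPartition.
class SharedPartitionDirSketch {
  static class Partition {
    final String name;
    final String location;
    List<String> fileNames = new ArrayList<>();
    Partition(String name, String location) {
      this.name = name;
      this.location = location;
    }
  }

  // Groups partitions by directory so that each directory is listed only once.
  static Map<String, List<Partition>> groupByLocation(List<Partition> partitions) {
    Map<String, List<Partition>> byDir = new LinkedHashMap<>();
    for (Partition p : partitions) {
      byDir.computeIfAbsent(p.location, k -> new ArrayList<>()).add(p);
    }
    return byDir;
  }

  // Applies one directory listing to every partition that maps to that directory.
  static void setFilesForDir(List<Partition> partitionsInDir, List<String> listedFiles) {
    for (Partition p : partitionsInDir) {
      // Defensive copy (an assumption of this sketch) so partitions do not alias one list.
      p.fileNames = new ArrayList<>(listedFiles);
    }
  }
}
```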


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: newFileDescs
> Hm, that doesn't seem particularly safe (i.e. using the same list for every
Like we discussed, there is a subtle bug here (even without the patch). We can see the inconsistency
with the following steps

- create table test(a int) partitioned by (b int);
- insert into test partition(b=1) values (1);
- alter table test add partition (b=2) location 'hdfs://localhost:20500/test-warehouse/test/b=1/'
-- Point b=2 to the same location as b=1

[localhost:21000] > show partitions test;
Query: show partitions test
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+
| b     | #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location                                       |
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+
| 1     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| 2     | -1    | 1      | 2B   | NOT CACHED   | NOT CACHED        | TEXT   | false             | hdfs://localhost:20500/test-warehouse/test/b=1 |
| Total | -1    | 2      | 4B   | 0B           |                   |        |                   |                                                |
+-------+-------+--------+------+--------------+-------------------+--------+-------------------+------------------------------------------------+


insert into test partition(b=1) values (2);
show files in test;
Query: show files in test
+----------------------------------------------------------------------------------------------------+------+-----------+
| Path                                                                                               | Size | Partition |
+----------------------------------------------------------------------------------------------------+------+-----------+
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |
+----------------------------------------------------------------------------------------------------+------+-----------+

invalidate metadata test;
show files in test;
Query: show files in test
+----------------------------------------------------------------------------------------------------+------+-----------+
| Path                                                                                               | Size | Partition |
+----------------------------------------------------------------------------------------------------+------+-----------+
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=1       |
| hdfs://localhost:20500/test-warehouse/test/b=1/2e44cd49e8c3d30d-572fc97800000000_627280230_data.0. | 2B   | b=2       |
| hdfs://localhost:20500/test-warehouse/test/b=1/e44245ad5c0ef020-a08716d00000000_1244237483_data.0. | 2B   | b=2       |
+----------------------------------------------------------------------------------------------------+------+-----------+

Bottom line: 'show files' shows different files before and after invalidate.

Ideally we can fix it in this patch, but as we discussed I think we can do it as a separate
patch after discussing with the community.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@368
PS5, Line 368: for (HdfsPartition partition: partitions) partition.setFileDescriptors(
> Just clarified this so I'll post my misunderstanding here. The comment stat
Yea, s/correspond/map.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@380
PS5, Line 380: changed
             :    * mtime
> It's not just the changed mtime that we're looking for in order to determin
Reworded this a little with a reference to hasFileChanged(). LMK if it looks ok.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@383
PS5, Line 383: The initial table load still uses the listFiles()
             :    * on the data directory that fetches both the FileStatus as well as BlockLocations in
             :    * a single call.
> remove
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@398
PS5, Line 398: get(0)
> Comment why we take the file descriptors of one partition and that it doesn
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@399
PS5, Line 399: public String apply(FileDescriptor desc) {
             :               return desc.getFileName();
             :             }
> Add @Override and make it a single line (if it fits)
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@426
PS5, Line 426: new Reference<Long>(Long.valueOf(0)
> why not use numUnknownDiskIds here?
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@431
PS5, Line 431: ++loadStats.refreshedFiles_;
> does refreshedFiles mean "file blocks reloaded" or "file checked for reload
Yea, the whole notion of refreshedFiles is confusing, and I guess it is not needed. We can
just have loaded/ignored files. In a typical load from scratch/invalidate, loadedFiles >>
ignoredFiles, and during a refresh loadedFiles << ignoredFiles. ignoredFiles now accounts
for the files that were checked for reload but weren't reloaded since they haven't changed.
IMO that is clearer.
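
A minimal sketch of that accounting, assuming hypothetical FileInfo and LoadStats types (the
real code works on FileDescriptor and FileStatus). The change check has the same shape as the
length/mtime comparison quoted under Line 448 of this review.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration; these types are stand-ins, not the classes from the patch.
class LoadStatsSketch {
  static class FileInfo {
    final String name;
    final long length;
    final long mtime;
    FileInfo(String name, long length, long mtime) {
      this.name = name;
      this.length = length;
      this.mtime = mtime;
    }
  }

  static class LoadStats {
    int loadedFiles = 0;   // Files whose metadata was (re)loaded.
    int ignoredFiles = 0;  // Files that were checked but skipped because nothing changed.
  }

  // A file needs a reload if it is new, or if its length or mtime differs from the cache.
  static boolean hasFileChanged(FileInfo cached, FileInfo current) {
    return cached == null || cached.length != current.length || cached.mtime != current.mtime;
  }

  // Walks a directory listing, reloading changed files and counting both outcomes.
  static LoadStats refresh(Map<String, FileInfo> cache, List<FileInfo> listing) {
    LoadStats stats = new LoadStats();
    for (FileInfo current : listing) {
      if (hasFileChanged(cache.get(current.name), current)) {
        cache.put(current.name, current);  // Stands in for reloading block metadata.
        ++stats.loadedFiles;
      } else {
        ++stats.ignoredFiles;
      }
    }
    return stats;
  }
}
```

With an empty cache every file counts as loaded; on a later refresh only the files that
actually changed do, and the rest land in ignoredFiles.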


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@433
PS5, Line 433: for (HdfsPartition partition: partitions) partition.setFileDescriptors(n
> same question as in the load method.
Answered above.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@448
PS5, Line 448: (fd == null) || (fd.getFileLength() != status.getLen()) ||
             :       (fd.getModificationTime() != status.getModificationTime());
> I think we were also checking if the partition was cached. Isn't this check
I couldn't think of why we need to force a reload of block metadata for cached partitions,
hence I removed it. Let me know if it was included for a specific reason.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@455
PS5, Line 455: refreshFileMetadata
> Maybe call it refreshPartitionStorageMetadata? Overall, it may make sense t
PartitionStorageMetadata sounds better, I changed it at multiple places. Let me know if it
looks better.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@694
PS5, Line 694: Exception
> Why this change?
Stale change, forgot to undo. Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@773
PS5, Line 773: HDFS and S3
> just to clarify, HdfsTable covers both hdfs table metadata as well as metad
Correct. We use the same hdfs client library to access both of them.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@783
PS5, Line 783: getFileSystem(CONF)
> I noticed that this is called in many places in this class-- is it bc a giv
Correct. This is one of the overheads noticed in the perf runs, and unfortunately we can't
work around it.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@787
PS5, Line 787:     int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
> What is the expected behavior for tables with mixed FSs?
I just implemented this as a first cut. I don't think it makes sense to loop through all partitions
in this call; instead we need to maintain counters of HDFS and non-HDFS partitions and keep
them updated in adds/drops. Once we have those numbers we can do some fancy things. Maybe
we can do it as a follow-up patch? I can raise a new JIRA for it and link it to this one.
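
The counter idea could look something like the sketch below. Everything in it is hypothetical
(class name, method names, and the assumption that the caller already knows whether a
partition lives on HDFS); it only shows counts being kept current on adds/drops so that no
loop over all partitions is needed at load time.

```java
// Hypothetical illustration of per-table filesystem counters; not code from the patch.
class PartitionFsCountersSketch {
  private int hdfsPartitions = 0;
  private int nonHdfsPartitions = 0;

  // Called whenever a partition is added to the table.
  synchronized void onPartitionAdded(boolean isHdfs) {
    if (isHdfs) {
      ++hdfsPartitions;
    } else {
      ++nonHdfsPartitions;
    }
  }

  // Called whenever a partition is dropped from the table.
  synchronized void onPartitionDropped(boolean isHdfs) {
    if (isHdfs) {
      --hdfsPartitions;
    } else {
      --nonHdfsPartitions;
    }
  }

  synchronized int numHdfsPartitions() { return hdfsPartitions; }

  synchronized int numNonHdfsPartitions() { return nonHdfsPartitions; }

  // True when the table has partitions on more than one kind of filesystem.
  synchronized boolean hasMixedFileSystems() {
    return hdfsPartitions > 0 && nonHdfsPartitions > 0;
  }
}
```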


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@801
PS5, Line 801: getLoadingThreadPoolSize
> can different partitions have different number of files? if so, work across
Yes, each partition can have its own no. of files, so the work definitely varies across threads.
It is roughly proportional to the no. of blocks for all the files in that partition directory.
Not sure if that answers your question.


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@801
PS5, Line 801: getLoadingThreadPoolSize
> Parallelization of metadata loading is done on per partition granularity.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@805
PS5, Line 805: ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
> What is the benefit of having a one thread pool per loaded table instead of
I'm summarizing what we discussed so that others following this can comment.

I don't think the overhead is too much, especially given that we just start 5/10 threads. While
I agree that having a global thread pool (across all the tables) can give us more control
over the resource consumption/queue bounds, I think it is more complicated and involves writing
a proper scheduler.
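
For comparison, the global-pool alternative might start out like the sketch below. This is
not what the patch does, and the names and sizes are made up. It shows one shared
ThreadPoolExecutor with a bounded queue; when the queue is full, CallerRunsPolicy makes the
submitting thread run the task itself, which is a crude form of back-pressure. Fairness
between tables, which is where a proper scheduler would come in, is not addressed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the alternative design; not code from the patch.
class GlobalLoadingPoolSketch {
  // One pool shared by all table loads, with a bounded work queue.
  static ThreadPoolExecutor newSharedPool(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(queueCapacity),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  // Submits one task per partition to the shared pool and waits for all of them.
  static List<String> loadTable(ThreadPoolExecutor pool, String table, List<String> partitions) {
    List<Future<String>> pending = new ArrayList<>();
    for (String partition : partitions) {
      Callable<String> task = () -> table + "/" + partition;  // Stands in for a real load.
      pending.add(pool.submit(task));
    }
    List<String> loaded = new ArrayList<>();
    try {
      for (Future<String> f : pending) loaded.add(f.get());
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return loaded;
  }
}
```

The owner of the shared pool is responsible for calling shutdown() on it when the process
stops loading metadata.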


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@821
PS5, Line 821: debugString
> This will get information from loadingStats but how do you know that loadin
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@1237
PS5, Line 1237: HashMap
> nit: Map
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java@1303
PS5, Line 1303: Returns the HdfsPartition objects associated with the specified list of partition
              :    * names.
> Update comment.
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
File fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java@626
PS5, Line 626: private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
             :       throws CatalogException {
             :     Preconditions.checkNotNull(tbl);
             :     Preconditions.checkNotNull(partitions);
             :     if (!(tbl instanceof HdfsTable)) {
             :       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
             :     }
             :     HdfsTable hdfsTable = (HdfsTable) tbl;
             :     List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(partitions);
             :     for (HdfsPartition hdfsPartition: hdfsPartitions) {
             :       catalog_.addPartition(hdfsPartition);
             :     }
             :     return hdfsTable;
             :   }
> Lot's of duplication with the previous one. How expensive would it be to si
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java
File fe/src/main/java/org/apache/impala/util/ListMap.java:

http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java@59
PS5, Line 59: synchronized (list_) {
> Why synchronized on list_? This confuses the locking scheme. If you want yo
Done


http://gerrit.cloudera.org:8080/#/c/8235/5/fe/src/main/java/org/apache/impala/util/ListMap.java@78
PS5, Line 78: list_ = Collections.synchronizedList(list);
> Hm, I think this new implementation is way too complicated. I'd start with 
Not totally sure I follow this. Do you mean this list_ need not be synchronized?



-- 
To view, visit http://gerrit.cloudera.org:8080/8235
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I07eaa7151dfc4d56da8db8c2654bd65d8f808481
Gerrit-Change-Number: 8235
Gerrit-PatchSet: 5
Gerrit-Owner: Bharath Vissapragada <bharathv@cloudera.com>
Gerrit-Reviewer: Bharath Vissapragada <bharathv@cloudera.com>
Gerrit-Reviewer: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Gerrit-Reviewer: Jim Apple <jbapple-impala@apache.org>
Gerrit-Reviewer: Mostafa Mokhtar <mmokhtar@cloudera.com>
Gerrit-Reviewer: Vuk Ercegovac <vercegovac@cloudera.com>
Gerrit-Comment-Date: Mon, 16 Oct 2017 18:32:55 +0000
Gerrit-HasComments: Yes
