hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From git-site-r...@apache.org
Subject [26/40] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.
Date Fri, 23 Jun 2017 15:01:34 GMT
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e9db7c5d/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreFileManager.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreFileManager.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreFileManager.html
index b8042dc..05376b4 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreFileManager.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreFileManager.html
@@ -112,71 +112,77 @@
 <span class="sourceLineNo">104</span>  int getStorefileCount();<a name="line.104"></a>
 <span class="sourceLineNo">105</span><a name="line.105"></a>
 <span class="sourceLineNo">106</span>  /**<a name="line.106"></a>
-<span class="sourceLineNo">107</span>   * Gets the store files to scan for a Scan or Get request.<a name="line.107"></a>
-<span class="sourceLineNo">108</span>   * @param startRow Start row of the request.<a name="line.108"></a>
-<span class="sourceLineNo">109</span>   * @param stopRow Stop row of the request.<a name="line.109"></a>
-<span class="sourceLineNo">110</span>   * @return The list of files that are to be read for this request.<a name="line.110"></a>
-<span class="sourceLineNo">111</span>   */<a name="line.111"></a>
-<span class="sourceLineNo">112</span>  Collection&lt;StoreFile&gt; getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow,<a name="line.112"></a>
-<span class="sourceLineNo">113</span>      boolean includeStopRow);<a name="line.113"></a>
-<span class="sourceLineNo">114</span><a name="line.114"></a>
-<span class="sourceLineNo">115</span>  /**<a name="line.115"></a>
-<span class="sourceLineNo">116</span>   * Gets initial, full list of candidate store files to check for row-key-before.<a name="line.116"></a>
-<span class="sourceLineNo">117</span>   * @param targetKey The key that is the basis of the search.<a name="line.117"></a>
-<span class="sourceLineNo">118</span>   * @return The files that may have the key less than or equal to targetKey, in reverse<a name="line.118"></a>
-<span class="sourceLineNo">119</span>   *         order of new-ness, and preference for target key.<a name="line.119"></a>
-<span class="sourceLineNo">120</span>   */<a name="line.120"></a>
-<span class="sourceLineNo">121</span>  Iterator&lt;StoreFile&gt; getCandidateFilesForRowKeyBefore(<a name="line.121"></a>
-<span class="sourceLineNo">122</span>    KeyValue targetKey<a name="line.122"></a>
-<span class="sourceLineNo">123</span>  );<a name="line.123"></a>
-<span class="sourceLineNo">124</span><a name="line.124"></a>
-<span class="sourceLineNo">125</span>  /**<a name="line.125"></a>
-<span class="sourceLineNo">126</span>   * Updates the candidate list for finding row key before. Based on the list of candidates<a name="line.126"></a>
-<span class="sourceLineNo">127</span>   * remaining to check from getCandidateFilesForRowKeyBefore, targetKey and current candidate,<a name="line.127"></a>
-<span class="sourceLineNo">128</span>   * may trim and reorder the list to remove the files where a better candidate cannot be found.<a name="line.128"></a>
-<span class="sourceLineNo">129</span>   * @param candidateFiles The candidate files not yet checked for better candidates - return<a name="line.129"></a>
-<span class="sourceLineNo">130</span>   *                       value from {@link #getCandidateFilesForRowKeyBefore(KeyValue)},<a name="line.130"></a>
-<span class="sourceLineNo">131</span>   *                       with some files already removed.<a name="line.131"></a>
-<span class="sourceLineNo">132</span>   * @param targetKey The key to search for.<a name="line.132"></a>
-<span class="sourceLineNo">133</span>   * @param candidate The current best candidate found.<a name="line.133"></a>
-<span class="sourceLineNo">134</span>   * @return The list to replace candidateFiles.<a name="line.134"></a>
-<span class="sourceLineNo">135</span>   */<a name="line.135"></a>
-<span class="sourceLineNo">136</span>  Iterator&lt;StoreFile&gt; updateCandidateFilesForRowKeyBefore(<a name="line.136"></a>
-<span class="sourceLineNo">137</span>    Iterator&lt;StoreFile&gt; candidateFiles, KeyValue targetKey, Cell candidate<a name="line.137"></a>
-<span class="sourceLineNo">138</span>  );<a name="line.138"></a>
-<span class="sourceLineNo">139</span><a name="line.139"></a>
-<span class="sourceLineNo">140</span><a name="line.140"></a>
-<span class="sourceLineNo">141</span>  /**<a name="line.141"></a>
-<span class="sourceLineNo">142</span>   * Gets the split point for the split of this set of store files (approx. middle).<a name="line.142"></a>
-<span class="sourceLineNo">143</span>   * @return The mid-point, or null if no split is possible.<a name="line.143"></a>
-<span class="sourceLineNo">144</span>   * @throws IOException<a name="line.144"></a>
-<span class="sourceLineNo">145</span>   */<a name="line.145"></a>
-<span class="sourceLineNo">146</span>  byte[] getSplitPoint() throws IOException;<a name="line.146"></a>
-<span class="sourceLineNo">147</span><a name="line.147"></a>
-<span class="sourceLineNo">148</span>  /**<a name="line.148"></a>
-<span class="sourceLineNo">149</span>   * @return The store compaction priority.<a name="line.149"></a>
-<span class="sourceLineNo">150</span>   */<a name="line.150"></a>
-<span class="sourceLineNo">151</span>  int getStoreCompactionPriority();<a name="line.151"></a>
-<span class="sourceLineNo">152</span><a name="line.152"></a>
-<span class="sourceLineNo">153</span>  /**<a name="line.153"></a>
-<span class="sourceLineNo">154</span>   * @param maxTs Maximum expired timestamp.<a name="line.154"></a>
-<span class="sourceLineNo">155</span>   * @param filesCompacting Files that are currently compacting.<a name="line.155"></a>
-<span class="sourceLineNo">156</span>   * @return The files which don't have any necessary data according to TTL and other criteria.<a name="line.156"></a>
-<span class="sourceLineNo">157</span>   */<a name="line.157"></a>
-<span class="sourceLineNo">158</span>  Collection&lt;StoreFile&gt; getUnneededFiles(long maxTs, List&lt;StoreFile&gt; filesCompacting);<a name="line.158"></a>
-<span class="sourceLineNo">159</span><a name="line.159"></a>
-<span class="sourceLineNo">160</span>  /**<a name="line.160"></a>
-<span class="sourceLineNo">161</span>   * @return the compaction pressure used for compaction throughput tuning.<a name="line.161"></a>
-<span class="sourceLineNo">162</span>   * @see Store#getCompactionPressure()<a name="line.162"></a>
+<span class="sourceLineNo">107</span>   * Returns the number of compacted files.<a name="line.107"></a>
+<span class="sourceLineNo">108</span>   * @return The number of files.<a name="line.108"></a>
+<span class="sourceLineNo">109</span>   */<a name="line.109"></a>
+<span class="sourceLineNo">110</span>  int getCompactedFilesCount();<a name="line.110"></a>
+<span class="sourceLineNo">111</span><a name="line.111"></a>
+<span class="sourceLineNo">112</span>  /**<a name="line.112"></a>
+<span class="sourceLineNo">113</span>   * Gets the store files to scan for a Scan or Get request.<a name="line.113"></a>
+<span class="sourceLineNo">114</span>   * @param startRow Start row of the request.<a name="line.114"></a>
+<span class="sourceLineNo">115</span>   * @param stopRow Stop row of the request.<a name="line.115"></a>
+<span class="sourceLineNo">116</span>   * @return The list of files that are to be read for this request.<a name="line.116"></a>
+<span class="sourceLineNo">117</span>   */<a name="line.117"></a>
+<span class="sourceLineNo">118</span>  Collection&lt;StoreFile&gt; getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow,<a name="line.118"></a>
+<span class="sourceLineNo">119</span>      boolean includeStopRow);<a name="line.119"></a>
+<span class="sourceLineNo">120</span><a name="line.120"></a>
+<span class="sourceLineNo">121</span>  /**<a name="line.121"></a>
+<span class="sourceLineNo">122</span>   * Gets initial, full list of candidate store files to check for row-key-before.<a name="line.122"></a>
+<span class="sourceLineNo">123</span>   * @param targetKey The key that is the basis of the search.<a name="line.123"></a>
+<span class="sourceLineNo">124</span>   * @return The files that may have the key less than or equal to targetKey, in reverse<a name="line.124"></a>
+<span class="sourceLineNo">125</span>   *         order of new-ness, and preference for target key.<a name="line.125"></a>
+<span class="sourceLineNo">126</span>   */<a name="line.126"></a>
+<span class="sourceLineNo">127</span>  Iterator&lt;StoreFile&gt; getCandidateFilesForRowKeyBefore(<a name="line.127"></a>
+<span class="sourceLineNo">128</span>    KeyValue targetKey<a name="line.128"></a>
+<span class="sourceLineNo">129</span>  );<a name="line.129"></a>
+<span class="sourceLineNo">130</span><a name="line.130"></a>
+<span class="sourceLineNo">131</span>  /**<a name="line.131"></a>
+<span class="sourceLineNo">132</span>   * Updates the candidate list for finding row key before. Based on the list of candidates<a name="line.132"></a>
+<span class="sourceLineNo">133</span>   * remaining to check from getCandidateFilesForRowKeyBefore, targetKey and current candidate,<a name="line.133"></a>
+<span class="sourceLineNo">134</span>   * may trim and reorder the list to remove the files where a better candidate cannot be found.<a name="line.134"></a>
+<span class="sourceLineNo">135</span>   * @param candidateFiles The candidate files not yet checked for better candidates - return<a name="line.135"></a>
+<span class="sourceLineNo">136</span>   *                       value from {@link #getCandidateFilesForRowKeyBefore(KeyValue)},<a name="line.136"></a>
+<span class="sourceLineNo">137</span>   *                       with some files already removed.<a name="line.137"></a>
+<span class="sourceLineNo">138</span>   * @param targetKey The key to search for.<a name="line.138"></a>
+<span class="sourceLineNo">139</span>   * @param candidate The current best candidate found.<a name="line.139"></a>
+<span class="sourceLineNo">140</span>   * @return The list to replace candidateFiles.<a name="line.140"></a>
+<span class="sourceLineNo">141</span>   */<a name="line.141"></a>
+<span class="sourceLineNo">142</span>  Iterator&lt;StoreFile&gt; updateCandidateFilesForRowKeyBefore(<a name="line.142"></a>
+<span class="sourceLineNo">143</span>    Iterator&lt;StoreFile&gt; candidateFiles, KeyValue targetKey, Cell candidate<a name="line.143"></a>
+<span class="sourceLineNo">144</span>  );<a name="line.144"></a>
+<span class="sourceLineNo">145</span><a name="line.145"></a>
+<span class="sourceLineNo">146</span><a name="line.146"></a>
+<span class="sourceLineNo">147</span>  /**<a name="line.147"></a>
+<span class="sourceLineNo">148</span>   * Gets the split point for the split of this set of store files (approx. middle).<a name="line.148"></a>
+<span class="sourceLineNo">149</span>   * @return The mid-point, or null if no split is possible.<a name="line.149"></a>
+<span class="sourceLineNo">150</span>   * @throws IOException<a name="line.150"></a>
+<span class="sourceLineNo">151</span>   */<a name="line.151"></a>
+<span class="sourceLineNo">152</span>  byte[] getSplitPoint() throws IOException;<a name="line.152"></a>
+<span class="sourceLineNo">153</span><a name="line.153"></a>
+<span class="sourceLineNo">154</span>  /**<a name="line.154"></a>
+<span class="sourceLineNo">155</span>   * @return The store compaction priority.<a name="line.155"></a>
+<span class="sourceLineNo">156</span>   */<a name="line.156"></a>
+<span class="sourceLineNo">157</span>  int getStoreCompactionPriority();<a name="line.157"></a>
+<span class="sourceLineNo">158</span><a name="line.158"></a>
+<span class="sourceLineNo">159</span>  /**<a name="line.159"></a>
+<span class="sourceLineNo">160</span>   * @param maxTs Maximum expired timestamp.<a name="line.160"></a>
+<span class="sourceLineNo">161</span>   * @param filesCompacting Files that are currently compacting.<a name="line.161"></a>
+<span class="sourceLineNo">162</span>   * @return The files which don't have any necessary data according to TTL and other criteria.<a name="line.162"></a>
 <span class="sourceLineNo">163</span>   */<a name="line.163"></a>
-<span class="sourceLineNo">164</span>  double getCompactionPressure();<a name="line.164"></a>
+<span class="sourceLineNo">164</span>  Collection&lt;StoreFile&gt; getUnneededFiles(long maxTs, List&lt;StoreFile&gt; filesCompacting);<a name="line.164"></a>
 <span class="sourceLineNo">165</span><a name="line.165"></a>
 <span class="sourceLineNo">166</span>  /**<a name="line.166"></a>
-<span class="sourceLineNo">167</span>   * @return the comparator used to sort storefiles. Usually, the<a name="line.167"></a>
-<span class="sourceLineNo">168</span>   *         {@link StoreFile#getMaxSequenceId()} is the first priority.<a name="line.168"></a>
+<span class="sourceLineNo">167</span>   * @return the compaction pressure used for compaction throughput tuning.<a name="line.167"></a>
+<span class="sourceLineNo">168</span>   * @see Store#getCompactionPressure()<a name="line.168"></a>
 <span class="sourceLineNo">169</span>   */<a name="line.169"></a>
-<span class="sourceLineNo">170</span>  Comparator&lt;StoreFile&gt; getStoreFileComparator();<a name="line.170"></a>
-<span class="sourceLineNo">171</span>}<a name="line.171"></a>
+<span class="sourceLineNo">170</span>  double getCompactionPressure();<a name="line.170"></a>
+<span class="sourceLineNo">171</span><a name="line.171"></a>
+<span class="sourceLineNo">172</span>  /**<a name="line.172"></a>
+<span class="sourceLineNo">173</span>   * @return the comparator used to sort storefiles. Usually, the<a name="line.173"></a>
+<span class="sourceLineNo">174</span>   *         {@link StoreFile#getMaxSequenceId()} is the first priority.<a name="line.174"></a>
+<span class="sourceLineNo">175</span>   */<a name="line.175"></a>
+<span class="sourceLineNo">176</span>  Comparator&lt;StoreFile&gt; getStoreFileComparator();<a name="line.176"></a>
+<span class="sourceLineNo">177</span>}<a name="line.177"></a>
 
 
 

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e9db7c5d/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html
index b2b7ff9..b32645e 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html
@@ -974,185 +974,179 @@
 <span class="sourceLineNo">966</span>    return heap.reseek(kv);<a name="line.966"></a>
 <span class="sourceLineNo">967</span>  }<a name="line.967"></a>
 <span class="sourceLineNo">968</span><a name="line.968"></a>
-<span class="sourceLineNo">969</span>  private void trySwitchToStreamRead() {<a name="line.969"></a>
-<span class="sourceLineNo">970</span>    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||<a name="line.970"></a>
-<span class="sourceLineNo">971</span>        bytesRead &lt; preadMaxBytes) {<a name="line.971"></a>
-<span class="sourceLineNo">972</span>      return;<a name="line.972"></a>
-<span class="sourceLineNo">973</span>    }<a name="line.973"></a>
-<span class="sourceLineNo">974</span>    if (LOG.isDebugEnabled()) {<a name="line.974"></a>
-<span class="sourceLineNo">975</span>      LOG.debug("Switch to stream read because we have already read " + bytesRead +<a name="line.975"></a>
-<span class="sourceLineNo">976</span>          " bytes from this scanner");<a name="line.976"></a>
-<span class="sourceLineNo">977</span>    }<a name="line.977"></a>
-<span class="sourceLineNo">978</span>    scanUsePread = false;<a name="line.978"></a>
-<span class="sourceLineNo">979</span>    Cell lastTop = heap.peek();<a name="line.979"></a>
-<span class="sourceLineNo">980</span>    Map&lt;String, StoreFile&gt; name2File = new HashMap&lt;&gt;(store.getStorefilesCount());<a name="line.980"></a>
-<span class="sourceLineNo">981</span>    for (StoreFile file : store.getStorefiles()) {<a name="line.981"></a>
-<span class="sourceLineNo">982</span>      name2File.put(file.getFileInfo().getActiveFileName(), file);<a name="line.982"></a>
-<span class="sourceLineNo">983</span>    }<a name="line.983"></a>
-<span class="sourceLineNo">984</span>    List&lt;StoreFile&gt; filesToReopen = new ArrayList&lt;&gt;();<a name="line.984"></a>
-<span class="sourceLineNo">985</span>    List&lt;KeyValueScanner&gt; memstoreScanners = new ArrayList&lt;&gt;();<a name="line.985"></a>
-<span class="sourceLineNo">986</span>    List&lt;KeyValueScanner&gt; scannersToClose = new ArrayList&lt;&gt;();<a name="line.986"></a>
-<span class="sourceLineNo">987</span>    for (KeyValueScanner kvs : currentScanners) {<a name="line.987"></a>
-<span class="sourceLineNo">988</span>      if (!kvs.isFileScanner()) {<a name="line.988"></a>
-<span class="sourceLineNo">989</span>        memstoreScanners.add(kvs);<a name="line.989"></a>
-<span class="sourceLineNo">990</span>      } else {<a name="line.990"></a>
-<span class="sourceLineNo">991</span>        scannersToClose.add(kvs);<a name="line.991"></a>
-<span class="sourceLineNo">992</span>        if (kvs.peek() == null) {<a name="line.992"></a>
-<span class="sourceLineNo">993</span>          continue;<a name="line.993"></a>
-<span class="sourceLineNo">994</span>        }<a name="line.994"></a>
-<span class="sourceLineNo">995</span>        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));<a name="line.995"></a>
-<span class="sourceLineNo">996</span>      }<a name="line.996"></a>
-<span class="sourceLineNo">997</span>    }<a name="line.997"></a>
-<span class="sourceLineNo">998</span>    if (filesToReopen.isEmpty()) {<a name="line.998"></a>
-<span class="sourceLineNo">999</span>      return;<a name="line.999"></a>
-<span class="sourceLineNo">1000</span>    }<a name="line.1000"></a>
-<span class="sourceLineNo">1001</span>    List&lt;KeyValueScanner&gt; fileScanners = null;<a name="line.1001"></a>
-<span class="sourceLineNo">1002</span>    List&lt;KeyValueScanner&gt; newCurrentScanners;<a name="line.1002"></a>
-<span class="sourceLineNo">1003</span>    KeyValueHeap newHeap;<a name="line.1003"></a>
-<span class="sourceLineNo">1004</span>    try {<a name="line.1004"></a>
-<span class="sourceLineNo">1005</span>      fileScanners =<a name="line.1005"></a>
-<span class="sourceLineNo">1006</span>          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),<a name="line.1006"></a>
-<span class="sourceLineNo">1007</span>            scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);<a name="line.1007"></a>
-<span class="sourceLineNo">1008</span>      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);<a name="line.1008"></a>
-<span class="sourceLineNo">1009</span>      newCurrentScanners = new ArrayList&lt;&gt;(fileScanners.size() + memstoreScanners.size());<a name="line.1009"></a>
-<span class="sourceLineNo">1010</span>      newCurrentScanners.addAll(fileScanners);<a name="line.1010"></a>
-<span class="sourceLineNo">1011</span>      newCurrentScanners.addAll(memstoreScanners);<a name="line.1011"></a>
-<span class="sourceLineNo">1012</span>      newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());<a name="line.1012"></a>
-<span class="sourceLineNo">1013</span>    } catch (Exception e) {<a name="line.1013"></a>
-<span class="sourceLineNo">1014</span>      LOG.warn("failed to switch to stream read", e);<a name="line.1014"></a>
-<span class="sourceLineNo">1015</span>      if (fileScanners != null) {<a name="line.1015"></a>
-<span class="sourceLineNo">1016</span>        fileScanners.forEach(KeyValueScanner::close);<a name="line.1016"></a>
-<span class="sourceLineNo">1017</span>      }<a name="line.1017"></a>
-<span class="sourceLineNo">1018</span>      return;<a name="line.1018"></a>
-<span class="sourceLineNo">1019</span>    }<a name="line.1019"></a>
-<span class="sourceLineNo">1020</span>    currentScanners.clear();<a name="line.1020"></a>
-<span class="sourceLineNo">1021</span>    addCurrentScanners(newCurrentScanners);<a name="line.1021"></a>
-<span class="sourceLineNo">1022</span>    this.heap = newHeap;<a name="line.1022"></a>
-<span class="sourceLineNo">1023</span>    resetQueryMatcher(lastTop);<a name="line.1023"></a>
-<span class="sourceLineNo">1024</span>    scannersToClose.forEach(KeyValueScanner::close);<a name="line.1024"></a>
-<span class="sourceLineNo">1025</span>  }<a name="line.1025"></a>
-<span class="sourceLineNo">1026</span><a name="line.1026"></a>
-<span class="sourceLineNo">1027</span>  protected final boolean checkFlushed() {<a name="line.1027"></a>
-<span class="sourceLineNo">1028</span>    // check the var without any lock. Suppose even if we see the old<a name="line.1028"></a>
-<span class="sourceLineNo">1029</span>    // value here still it is ok to continue because we will not be resetting<a name="line.1029"></a>
-<span class="sourceLineNo">1030</span>    // the heap but will continue with the referenced memstore's snapshot. For compactions<a name="line.1030"></a>
-<span class="sourceLineNo">1031</span>    // any way we don't need the updateReaders at all to happen as we still continue with<a name="line.1031"></a>
-<span class="sourceLineNo">1032</span>    // the older files<a name="line.1032"></a>
-<span class="sourceLineNo">1033</span>    if (flushed) {<a name="line.1033"></a>
-<span class="sourceLineNo">1034</span>      // If there is a flush and the current scan is notified on the flush ensure that the<a name="line.1034"></a>
-<span class="sourceLineNo">1035</span>      // scan's heap gets reset and we do a seek on the newly flushed file.<a name="line.1035"></a>
-<span class="sourceLineNo">1036</span>      if (this.closing) {<a name="line.1036"></a>
-<span class="sourceLineNo">1037</span>        return false;<a name="line.1037"></a>
-<span class="sourceLineNo">1038</span>      }<a name="line.1038"></a>
-<span class="sourceLineNo">1039</span>      // reset the flag<a name="line.1039"></a>
-<span class="sourceLineNo">1040</span>      flushed = false;<a name="line.1040"></a>
-<span class="sourceLineNo">1041</span>      return true;<a name="line.1041"></a>
-<span class="sourceLineNo">1042</span>    }<a name="line.1042"></a>
-<span class="sourceLineNo">1043</span>    return false;<a name="line.1043"></a>
-<span class="sourceLineNo">1044</span>  }<a name="line.1044"></a>
-<span class="sourceLineNo">1045</span><a name="line.1045"></a>
-<span class="sourceLineNo">1046</span>  /**<a name="line.1046"></a>
-<span class="sourceLineNo">1047</span>   * @see KeyValueScanner#getScannerOrder()<a name="line.1047"></a>
-<span class="sourceLineNo">1048</span>   */<a name="line.1048"></a>
-<span class="sourceLineNo">1049</span>  @Override<a name="line.1049"></a>
-<span class="sourceLineNo">1050</span>  public long getScannerOrder() {<a name="line.1050"></a>
-<span class="sourceLineNo">1051</span>    return 0;<a name="line.1051"></a>
-<span class="sourceLineNo">1052</span>  }<a name="line.1052"></a>
-<span class="sourceLineNo">1053</span><a name="line.1053"></a>
-<span class="sourceLineNo">1054</span>  /**<a name="line.1054"></a>
-<span class="sourceLineNo">1055</span>   * Seek storefiles in parallel to optimize IO latency as much as possible<a name="line.1055"></a>
-<span class="sourceLineNo">1056</span>   * @param scanners the list {@link KeyValueScanner}s to be read from<a name="line.1056"></a>
-<span class="sourceLineNo">1057</span>   * @param kv the KeyValue on which the operation is being requested<a name="line.1057"></a>
-<span class="sourceLineNo">1058</span>   * @throws IOException<a name="line.1058"></a>
-<span class="sourceLineNo">1059</span>   */<a name="line.1059"></a>
-<span class="sourceLineNo">1060</span>  private void parallelSeek(final List&lt;? extends KeyValueScanner&gt;<a name="line.1060"></a>
-<span class="sourceLineNo">1061</span>      scanners, final Cell kv) throws IOException {<a name="line.1061"></a>
-<span class="sourceLineNo">1062</span>    if (scanners.isEmpty()) return;<a name="line.1062"></a>
-<span class="sourceLineNo">1063</span>    int storeFileScannerCount = scanners.size();<a name="line.1063"></a>
-<span class="sourceLineNo">1064</span>    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);<a name="line.1064"></a>
-<span class="sourceLineNo">1065</span>    List&lt;ParallelSeekHandler&gt; handlers = new ArrayList&lt;&gt;(storeFileScannerCount);<a name="line.1065"></a>
-<span class="sourceLineNo">1066</span>    for (KeyValueScanner scanner : scanners) {<a name="line.1066"></a>
-<span class="sourceLineNo">1067</span>      if (scanner instanceof StoreFileScanner) {<a name="line.1067"></a>
-<span class="sourceLineNo">1068</span>        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,<a name="line.1068"></a>
-<span class="sourceLineNo">1069</span>          this.readPt, latch);<a name="line.1069"></a>
-<span class="sourceLineNo">1070</span>        executor.submit(seekHandler);<a name="line.1070"></a>
-<span class="sourceLineNo">1071</span>        handlers.add(seekHandler);<a name="line.1071"></a>
-<span class="sourceLineNo">1072</span>      } else {<a name="line.1072"></a>
-<span class="sourceLineNo">1073</span>        scanner.seek(kv);<a name="line.1073"></a>
-<span class="sourceLineNo">1074</span>        latch.countDown();<a name="line.1074"></a>
-<span class="sourceLineNo">1075</span>      }<a name="line.1075"></a>
+<span class="sourceLineNo">969</span>  @VisibleForTesting<a name="line.969"></a>
+<span class="sourceLineNo">970</span>  void trySwitchToStreamRead() {<a name="line.970"></a>
+<span class="sourceLineNo">971</span>    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||<a name="line.971"></a>
+<span class="sourceLineNo">972</span>        bytesRead &lt; preadMaxBytes) {<a name="line.972"></a>
+<span class="sourceLineNo">973</span>      return;<a name="line.973"></a>
+<span class="sourceLineNo">974</span>    }<a name="line.974"></a>
+<span class="sourceLineNo">975</span>    if (LOG.isDebugEnabled()) {<a name="line.975"></a>
+<span class="sourceLineNo">976</span>      LOG.debug("Switch to stream read because we have already read " + bytesRead +<a name="line.976"></a>
+<span class="sourceLineNo">977</span>          " bytes from this scanner");<a name="line.977"></a>
+<span class="sourceLineNo">978</span>    }<a name="line.978"></a>
+<span class="sourceLineNo">979</span>    scanUsePread = false;<a name="line.979"></a>
+<span class="sourceLineNo">980</span>    Cell lastTop = heap.peek();<a name="line.980"></a>
+<span class="sourceLineNo">981</span>    List&lt;KeyValueScanner&gt; memstoreScanners = new ArrayList&lt;&gt;();<a name="line.981"></a>
+<span class="sourceLineNo">982</span>    List&lt;KeyValueScanner&gt; scannersToClose = new ArrayList&lt;&gt;();<a name="line.982"></a>
+<span class="sourceLineNo">983</span>    for (KeyValueScanner kvs : currentScanners) {<a name="line.983"></a>
+<span class="sourceLineNo">984</span>      if (!kvs.isFileScanner()) {<a name="line.984"></a>
+<span class="sourceLineNo">985</span>        // collect memstorescanners here<a name="line.985"></a>
+<span class="sourceLineNo">986</span>        memstoreScanners.add(kvs);<a name="line.986"></a>
+<span class="sourceLineNo">987</span>      } else {<a name="line.987"></a>
+<span class="sourceLineNo">988</span>        scannersToClose.add(kvs);<a name="line.988"></a>
+<span class="sourceLineNo">989</span>      }<a name="line.989"></a>
+<span class="sourceLineNo">990</span>    }<a name="line.990"></a>
+<span class="sourceLineNo">991</span>    List&lt;KeyValueScanner&gt; fileScanners = null;<a name="line.991"></a>
+<span class="sourceLineNo">992</span>    List&lt;KeyValueScanner&gt; newCurrentScanners;<a name="line.992"></a>
+<span class="sourceLineNo">993</span>    KeyValueHeap newHeap;<a name="line.993"></a>
+<span class="sourceLineNo">994</span>    try {<a name="line.994"></a>
+<span class="sourceLineNo">995</span>      // recreate the scanners on the current file scanners<a name="line.995"></a>
+<span class="sourceLineNo">996</span>      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,<a name="line.996"></a>
+<span class="sourceLineNo">997</span>        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),<a name="line.997"></a>
+<span class="sourceLineNo">998</span>        scan.includeStopRow(), readPt, false);<a name="line.998"></a>
+<span class="sourceLineNo">999</span>      if (fileScanners == null) {<a name="line.999"></a>
+<span class="sourceLineNo">1000</span>        return;<a name="line.1000"></a>
+<span class="sourceLineNo">1001</span>      }<a name="line.1001"></a>
+<span class="sourceLineNo">1002</span>      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);<a name="line.1002"></a>
+<span class="sourceLineNo">1003</span>      newCurrentScanners = new ArrayList&lt;&gt;(fileScanners.size() + memstoreScanners.size());<a name="line.1003"></a>
+<span class="sourceLineNo">1004</span>      newCurrentScanners.addAll(fileScanners);<a name="line.1004"></a>
+<span class="sourceLineNo">1005</span>      newCurrentScanners.addAll(memstoreScanners);<a name="line.1005"></a>
+<span class="sourceLineNo">1006</span>      newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());<a name="line.1006"></a>
+<span class="sourceLineNo">1007</span>    } catch (Exception e) {<a name="line.1007"></a>
+<span class="sourceLineNo">1008</span>      LOG.warn("failed to switch to stream read", e);<a name="line.1008"></a>
+<span class="sourceLineNo">1009</span>      if (fileScanners != null) {<a name="line.1009"></a>
+<span class="sourceLineNo">1010</span>        fileScanners.forEach(KeyValueScanner::close);<a name="line.1010"></a>
+<span class="sourceLineNo">1011</span>      }<a name="line.1011"></a>
+<span class="sourceLineNo">1012</span>      return;<a name="line.1012"></a>
+<span class="sourceLineNo">1013</span>    }<a name="line.1013"></a>
+<span class="sourceLineNo">1014</span>    currentScanners.clear();<a name="line.1014"></a>
+<span class="sourceLineNo">1015</span>    addCurrentScanners(newCurrentScanners);<a name="line.1015"></a>
+<span class="sourceLineNo">1016</span>    this.heap = newHeap;<a name="line.1016"></a>
+<span class="sourceLineNo">1017</span>    resetQueryMatcher(lastTop);<a name="line.1017"></a>
+<span class="sourceLineNo">1018</span>    scannersToClose.forEach(KeyValueScanner::close);<a name="line.1018"></a>
+<span class="sourceLineNo">1019</span>  }<a name="line.1019"></a>
+<span class="sourceLineNo">1020</span><a name="line.1020"></a>
+<span class="sourceLineNo">1021</span>  protected final boolean checkFlushed() {<a name="line.1021"></a>
+<span class="sourceLineNo">1022</span>    // check the var without any lock. Suppose even if we see the old<a name="line.1022"></a>
+<span class="sourceLineNo">1023</span>    // value here still it is ok to continue because we will not be resetting<a name="line.1023"></a>
+<span class="sourceLineNo">1024</span>    // the heap but will continue with the referenced memstore's snapshot. For compactions<a name="line.1024"></a>
+<span class="sourceLineNo">1025</span>    // any way we don't need the updateReaders at all to happen as we still continue with<a name="line.1025"></a>
+<span class="sourceLineNo">1026</span>    // the older files<a name="line.1026"></a>
+<span class="sourceLineNo">1027</span>    if (flushed) {<a name="line.1027"></a>
+<span class="sourceLineNo">1028</span>      // If there is a flush and the current scan is notified on the flush ensure that the<a name="line.1028"></a>
+<span class="sourceLineNo">1029</span>      // scan's heap gets reset and we do a seek on the newly flushed file.<a name="line.1029"></a>
+<span class="sourceLineNo">1030</span>      if (this.closing) {<a name="line.1030"></a>
+<span class="sourceLineNo">1031</span>        return false;<a name="line.1031"></a>
+<span class="sourceLineNo">1032</span>      }<a name="line.1032"></a>
+<span class="sourceLineNo">1033</span>      // reset the flag<a name="line.1033"></a>
+<span class="sourceLineNo">1034</span>      flushed = false;<a name="line.1034"></a>
+<span class="sourceLineNo">1035</span>      return true;<a name="line.1035"></a>
+<span class="sourceLineNo">1036</span>    }<a name="line.1036"></a>
+<span class="sourceLineNo">1037</span>    return false;<a name="line.1037"></a>
+<span class="sourceLineNo">1038</span>  }<a name="line.1038"></a>
+<span class="sourceLineNo">1039</span><a name="line.1039"></a>
+<span class="sourceLineNo">1040</span>  /**<a name="line.1040"></a>
+<span class="sourceLineNo">1041</span>   * @see KeyValueScanner#getScannerOrder()<a name="line.1041"></a>
+<span class="sourceLineNo">1042</span>   */<a name="line.1042"></a>
+<span class="sourceLineNo">1043</span>  @Override<a name="line.1043"></a>
+<span class="sourceLineNo">1044</span>  public long getScannerOrder() {<a name="line.1044"></a>
+<span class="sourceLineNo">1045</span>    return 0;<a name="line.1045"></a>
+<span class="sourceLineNo">1046</span>  }<a name="line.1046"></a>
+<span class="sourceLineNo">1047</span><a name="line.1047"></a>
+<span class="sourceLineNo">1048</span>  /**<a name="line.1048"></a>
+<span class="sourceLineNo">1049</span>   * Seek storefiles in parallel to optimize IO latency as much as possible<a name="line.1049"></a>
+<span class="sourceLineNo">1050</span>   * @param scanners the list {@link KeyValueScanner}s to be read from<a name="line.1050"></a>
+<span class="sourceLineNo">1051</span>   * @param kv the KeyValue on which the operation is being requested<a name="line.1051"></a>
+<span class="sourceLineNo">1052</span>   * @throws IOException<a name="line.1052"></a>
+<span class="sourceLineNo">1053</span>   */<a name="line.1053"></a>
+<span class="sourceLineNo">1054</span>  private void parallelSeek(final List&lt;? extends KeyValueScanner&gt;<a name="line.1054"></a>
+<span class="sourceLineNo">1055</span>      scanners, final Cell kv) throws IOException {<a name="line.1055"></a>
+<span class="sourceLineNo">1056</span>    if (scanners.isEmpty()) return;<a name="line.1056"></a>
+<span class="sourceLineNo">1057</span>    int storeFileScannerCount = scanners.size();<a name="line.1057"></a>
+<span class="sourceLineNo">1058</span>    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);<a name="line.1058"></a>
+<span class="sourceLineNo">1059</span>    List&lt;ParallelSeekHandler&gt; handlers = new ArrayList&lt;&gt;(storeFileScannerCount);<a name="line.1059"></a>
+<span class="sourceLineNo">1060</span>    for (KeyValueScanner scanner : scanners) {<a name="line.1060"></a>
+<span class="sourceLineNo">1061</span>      if (scanner instanceof StoreFileScanner) {<a name="line.1061"></a>
+<span class="sourceLineNo">1062</span>        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,<a name="line.1062"></a>
+<span class="sourceLineNo">1063</span>          this.readPt, latch);<a name="line.1063"></a>
+<span class="sourceLineNo">1064</span>        executor.submit(seekHandler);<a name="line.1064"></a>
+<span class="sourceLineNo">1065</span>        handlers.add(seekHandler);<a name="line.1065"></a>
+<span class="sourceLineNo">1066</span>      } else {<a name="line.1066"></a>
+<span class="sourceLineNo">1067</span>        scanner.seek(kv);<a name="line.1067"></a>
+<span class="sourceLineNo">1068</span>        latch.countDown();<a name="line.1068"></a>
+<span class="sourceLineNo">1069</span>      }<a name="line.1069"></a>
+<span class="sourceLineNo">1070</span>    }<a name="line.1070"></a>
+<span class="sourceLineNo">1071</span><a name="line.1071"></a>
+<span class="sourceLineNo">1072</span>    try {<a name="line.1072"></a>
+<span class="sourceLineNo">1073</span>      latch.await();<a name="line.1073"></a>
+<span class="sourceLineNo">1074</span>    } catch (InterruptedException ie) {<a name="line.1074"></a>
+<span class="sourceLineNo">1075</span>      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);<a name="line.1075"></a>
 <span class="sourceLineNo">1076</span>    }<a name="line.1076"></a>
 <span class="sourceLineNo">1077</span><a name="line.1077"></a>
-<span class="sourceLineNo">1078</span>    try {<a name="line.1078"></a>
-<span class="sourceLineNo">1079</span>      latch.await();<a name="line.1079"></a>
-<span class="sourceLineNo">1080</span>    } catch (InterruptedException ie) {<a name="line.1080"></a>
-<span class="sourceLineNo">1081</span>      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);<a name="line.1081"></a>
+<span class="sourceLineNo">1078</span>    for (ParallelSeekHandler handler : handlers) {<a name="line.1078"></a>
+<span class="sourceLineNo">1079</span>      if (handler.getErr() != null) {<a name="line.1079"></a>
+<span class="sourceLineNo">1080</span>        throw new IOException(handler.getErr());<a name="line.1080"></a>
+<span class="sourceLineNo">1081</span>      }<a name="line.1081"></a>
 <span class="sourceLineNo">1082</span>    }<a name="line.1082"></a>
-<span class="sourceLineNo">1083</span><a name="line.1083"></a>
-<span class="sourceLineNo">1084</span>    for (ParallelSeekHandler handler : handlers) {<a name="line.1084"></a>
-<span class="sourceLineNo">1085</span>      if (handler.getErr() != null) {<a name="line.1085"></a>
-<span class="sourceLineNo">1086</span>        throw new IOException(handler.getErr());<a name="line.1086"></a>
-<span class="sourceLineNo">1087</span>      }<a name="line.1087"></a>
-<span class="sourceLineNo">1088</span>    }<a name="line.1088"></a>
-<span class="sourceLineNo">1089</span>  }<a name="line.1089"></a>
-<span class="sourceLineNo">1090</span><a name="line.1090"></a>
-<span class="sourceLineNo">1091</span>  /**<a name="line.1091"></a>
-<span class="sourceLineNo">1092</span>   * Used in testing.<a name="line.1092"></a>
-<span class="sourceLineNo">1093</span>   * @return all scanners in no particular order<a name="line.1093"></a>
-<span class="sourceLineNo">1094</span>   */<a name="line.1094"></a>
-<span class="sourceLineNo">1095</span>  @VisibleForTesting<a name="line.1095"></a>
-<span class="sourceLineNo">1096</span>  List&lt;KeyValueScanner&gt; getAllScannersForTesting() {<a name="line.1096"></a>
-<span class="sourceLineNo">1097</span>    List&lt;KeyValueScanner&gt; allScanners = new ArrayList&lt;&gt;();<a name="line.1097"></a>
-<span class="sourceLineNo">1098</span>    KeyValueScanner current = heap.getCurrentForTesting();<a name="line.1098"></a>
-<span class="sourceLineNo">1099</span>    if (current != null)<a name="line.1099"></a>
-<span class="sourceLineNo">1100</span>      allScanners.add(current);<a name="line.1100"></a>
-<span class="sourceLineNo">1101</span>    for (KeyValueScanner scanner : heap.getHeap())<a name="line.1101"></a>
-<span class="sourceLineNo">1102</span>      allScanners.add(scanner);<a name="line.1102"></a>
-<span class="sourceLineNo">1103</span>    return allScanners;<a name="line.1103"></a>
-<span class="sourceLineNo">1104</span>  }<a name="line.1104"></a>
-<span class="sourceLineNo">1105</span><a name="line.1105"></a>
-<span class="sourceLineNo">1106</span>  static void enableLazySeekGlobally(boolean enable) {<a name="line.1106"></a>
-<span class="sourceLineNo">1107</span>    lazySeekEnabledGlobally = enable;<a name="line.1107"></a>
-<span class="sourceLineNo">1108</span>  }<a name="line.1108"></a>
-<span class="sourceLineNo">1109</span><a name="line.1109"></a>
-<span class="sourceLineNo">1110</span>  /**<a name="line.1110"></a>
-<span class="sourceLineNo">1111</span>   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).<a name="line.1111"></a>
-<span class="sourceLineNo">1112</span>   */<a name="line.1112"></a>
-<span class="sourceLineNo">1113</span>  public long getEstimatedNumberOfKvsScanned() {<a name="line.1113"></a>
-<span class="sourceLineNo">1114</span>    return this.kvsScanned;<a name="line.1114"></a>
-<span class="sourceLineNo">1115</span>  }<a name="line.1115"></a>
-<span class="sourceLineNo">1116</span><a name="line.1116"></a>
-<span class="sourceLineNo">1117</span>  @Override<a name="line.1117"></a>
-<span class="sourceLineNo">1118</span>  public Cell getNextIndexedKey() {<a name="line.1118"></a>
-<span class="sourceLineNo">1119</span>    return this.heap.getNextIndexedKey();<a name="line.1119"></a>
-<span class="sourceLineNo">1120</span>  }<a name="line.1120"></a>
-<span class="sourceLineNo">1121</span><a name="line.1121"></a>
-<span class="sourceLineNo">1122</span>  @Override<a name="line.1122"></a>
-<span class="sourceLineNo">1123</span>  public void shipped() throws IOException {<a name="line.1123"></a>
-<span class="sourceLineNo">1124</span>    if (prevCell != null) {<a name="line.1124"></a>
-<span class="sourceLineNo">1125</span>      // Do the copy here so that in case the prevCell ref is pointing to the previous<a name="line.1125"></a>
-<span class="sourceLineNo">1126</span>      // blocks we can safely release those blocks.<a name="line.1126"></a>
-<span class="sourceLineNo">1127</span>      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks<a name="line.1127"></a>
-<span class="sourceLineNo">1128</span>      // fetched from HDFS. Copying this would ensure that we let go the references to these<a name="line.1128"></a>
-<span class="sourceLineNo">1129</span>      // blocks so that they can be GCed safely(in case of bucket cache)<a name="line.1129"></a>
-<span class="sourceLineNo">1130</span>      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);<a name="line.1130"></a>
-<span class="sourceLineNo">1131</span>    }<a name="line.1131"></a>
-<span class="sourceLineNo">1132</span>    matcher.beforeShipped();<a name="line.1132"></a>
-<span class="sourceLineNo">1133</span>    // There wont be further fetch of Cells from these scanners. Just close.<a name="line.1133"></a>
-<span class="sourceLineNo">1134</span>    clearAndClose(scannersForDelayedClose);<a name="line.1134"></a>
-<span class="sourceLineNo">1135</span>    if (this.heap != null) {<a name="line.1135"></a>
-<span class="sourceLineNo">1136</span>      this.heap.shipped();<a name="line.1136"></a>
-<span class="sourceLineNo">1137</span>      // When switching from pread to stream, we will open a new scanner for each store file, but<a name="line.1137"></a>
-<span class="sourceLineNo">1138</span>      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client<a name="line.1138"></a>
-<span class="sourceLineNo">1139</span>      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others<a name="line.1139"></a>
-<span class="sourceLineNo">1140</span>      // before we serialize and send it back to client. The HFileBlocks will be released in shipped<a name="line.1140"></a>
-<span class="sourceLineNo">1141</span>      // method, so we here will also open new scanners and close old scanners in shipped method.<a name="line.1141"></a>
-<span class="sourceLineNo">1142</span>      // See HBASE-18055 for more details.<a name="line.1142"></a>
-<span class="sourceLineNo">1143</span>      trySwitchToStreamRead();<a name="line.1143"></a>
-<span class="sourceLineNo">1144</span>    }<a name="line.1144"></a>
-<span class="sourceLineNo">1145</span>  }<a name="line.1145"></a>
-<span class="sourceLineNo">1146</span>}<a name="line.1146"></a>
-<span class="sourceLineNo">1147</span><a name="line.1147"></a>
+<span class="sourceLineNo">1083</span>  }<a name="line.1083"></a>
+<span class="sourceLineNo">1084</span><a name="line.1084"></a>
+<span class="sourceLineNo">1085</span>  /**<a name="line.1085"></a>
+<span class="sourceLineNo">1086</span>   * Used in testing.<a name="line.1086"></a>
+<span class="sourceLineNo">1087</span>   * @return all scanners in no particular order<a name="line.1087"></a>
+<span class="sourceLineNo">1088</span>   */<a name="line.1088"></a>
+<span class="sourceLineNo">1089</span>  @VisibleForTesting<a name="line.1089"></a>
+<span class="sourceLineNo">1090</span>  List&lt;KeyValueScanner&gt; getAllScannersForTesting() {<a name="line.1090"></a>
+<span class="sourceLineNo">1091</span>    List&lt;KeyValueScanner&gt; allScanners = new ArrayList&lt;&gt;();<a name="line.1091"></a>
+<span class="sourceLineNo">1092</span>    KeyValueScanner current = heap.getCurrentForTesting();<a name="line.1092"></a>
+<span class="sourceLineNo">1093</span>    if (current != null)<a name="line.1093"></a>
+<span class="sourceLineNo">1094</span>      allScanners.add(current);<a name="line.1094"></a>
+<span class="sourceLineNo">1095</span>    for (KeyValueScanner scanner : heap.getHeap())<a name="line.1095"></a>
+<span class="sourceLineNo">1096</span>      allScanners.add(scanner);<a name="line.1096"></a>
+<span class="sourceLineNo">1097</span>    return allScanners;<a name="line.1097"></a>
+<span class="sourceLineNo">1098</span>  }<a name="line.1098"></a>
+<span class="sourceLineNo">1099</span><a name="line.1099"></a>
+<span class="sourceLineNo">1100</span>  static void enableLazySeekGlobally(boolean enable) {<a name="line.1100"></a>
+<span class="sourceLineNo">1101</span>    lazySeekEnabledGlobally = enable;<a name="line.1101"></a>
+<span class="sourceLineNo">1102</span>  }<a name="line.1102"></a>
+<span class="sourceLineNo">1103</span><a name="line.1103"></a>
+<span class="sourceLineNo">1104</span>  /**<a name="line.1104"></a>
+<span class="sourceLineNo">1105</span>   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).<a name="line.1105"></a>
+<span class="sourceLineNo">1106</span>   */<a name="line.1106"></a>
+<span class="sourceLineNo">1107</span>  public long getEstimatedNumberOfKvsScanned() {<a name="line.1107"></a>
+<span class="sourceLineNo">1108</span>    return this.kvsScanned;<a name="line.1108"></a>
+<span class="sourceLineNo">1109</span>  }<a name="line.1109"></a>
+<span class="sourceLineNo">1110</span><a name="line.1110"></a>
+<span class="sourceLineNo">1111</span>  @Override<a name="line.1111"></a>
+<span class="sourceLineNo">1112</span>  public Cell getNextIndexedKey() {<a name="line.1112"></a>
+<span class="sourceLineNo">1113</span>    return this.heap.getNextIndexedKey();<a name="line.1113"></a>
+<span class="sourceLineNo">1114</span>  }<a name="line.1114"></a>
+<span class="sourceLineNo">1115</span><a name="line.1115"></a>
+<span class="sourceLineNo">1116</span>  @Override<a name="line.1116"></a>
+<span class="sourceLineNo">1117</span>  public void shipped() throws IOException {<a name="line.1117"></a>
+<span class="sourceLineNo">1118</span>    if (prevCell != null) {<a name="line.1118"></a>
+<span class="sourceLineNo">1119</span>      // Do the copy here so that in case the prevCell ref is pointing to the previous<a name="line.1119"></a>
+<span class="sourceLineNo">1120</span>      // blocks we can safely release those blocks.<a name="line.1120"></a>
+<span class="sourceLineNo">1121</span>      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks<a name="line.1121"></a>
+<span class="sourceLineNo">1122</span>      // fetched from HDFS. Copying this would ensure that we let go the references to these<a name="line.1122"></a>
+<span class="sourceLineNo">1123</span>      // blocks so that they can be GCed safely(in case of bucket cache)<a name="line.1123"></a>
+<span class="sourceLineNo">1124</span>      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);<a name="line.1124"></a>
+<span class="sourceLineNo">1125</span>    }<a name="line.1125"></a>
+<span class="sourceLineNo">1126</span>    matcher.beforeShipped();<a name="line.1126"></a>
+<span class="sourceLineNo">1127</span>    // There wont be further fetch of Cells from these scanners. Just close.<a name="line.1127"></a>
+<span class="sourceLineNo">1128</span>    clearAndClose(scannersForDelayedClose);<a name="line.1128"></a>
+<span class="sourceLineNo">1129</span>    if (this.heap != null) {<a name="line.1129"></a>
+<span class="sourceLineNo">1130</span>      this.heap.shipped();<a name="line.1130"></a>
+<span class="sourceLineNo">1131</span>      // When switching from pread to stream, we will open a new scanner for each store file, but<a name="line.1131"></a>
+<span class="sourceLineNo">1132</span>      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client<a name="line.1132"></a>
+<span class="sourceLineNo">1133</span>      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others<a name="line.1133"></a>
+<span class="sourceLineNo">1134</span>      // before we serialize and send it back to client. The HFileBlocks will be released in shipped<a name="line.1134"></a>
+<span class="sourceLineNo">1135</span>      // method, so we here will also open new scanners and close old scanners in shipped method.<a name="line.1135"></a>
+<span class="sourceLineNo">1136</span>      // See HBASE-18055 for more details.<a name="line.1136"></a>
+<span class="sourceLineNo">1137</span>      trySwitchToStreamRead();<a name="line.1137"></a>
+<span class="sourceLineNo">1138</span>    }<a name="line.1138"></a>
+<span class="sourceLineNo">1139</span>  }<a name="line.1139"></a>
+<span class="sourceLineNo">1140</span>}<a name="line.1140"></a>
+<span class="sourceLineNo">1141</span><a name="line.1141"></a>
 
 
 

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/e9db7c5d/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.html
index b2b7ff9..b32645e 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.html
@@ -974,185 +974,179 @@
 <span class="sourceLineNo">966</span>    return heap.reseek(kv);<a name="line.966"></a>
 <span class="sourceLineNo">967</span>  }<a name="line.967"></a>
 <span class="sourceLineNo">968</span><a name="line.968"></a>
-<span class="sourceLineNo">969</span>  private void trySwitchToStreamRead() {<a name="line.969"></a>
-<span class="sourceLineNo">970</span>    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||<a name="line.970"></a>
-<span class="sourceLineNo">971</span>        bytesRead &lt; preadMaxBytes) {<a name="line.971"></a>
-<span class="sourceLineNo">972</span>      return;<a name="line.972"></a>
-<span class="sourceLineNo">973</span>    }<a name="line.973"></a>
-<span class="sourceLineNo">974</span>    if (LOG.isDebugEnabled()) {<a name="line.974"></a>
-<span class="sourceLineNo">975</span>      LOG.debug("Switch to stream read because we have already read " + bytesRead +<a name="line.975"></a>
-<span class="sourceLineNo">976</span>          " bytes from this scanner");<a name="line.976"></a>
-<span class="sourceLineNo">977</span>    }<a name="line.977"></a>
-<span class="sourceLineNo">978</span>    scanUsePread = false;<a name="line.978"></a>
-<span class="sourceLineNo">979</span>    Cell lastTop = heap.peek();<a name="line.979"></a>
-<span class="sourceLineNo">980</span>    Map&lt;String, StoreFile&gt; name2File = new HashMap&lt;&gt;(store.getStorefilesCount());<a name="line.980"></a>
-<span class="sourceLineNo">981</span>    for (StoreFile file : store.getStorefiles()) {<a name="line.981"></a>
-<span class="sourceLineNo">982</span>      name2File.put(file.getFileInfo().getActiveFileName(), file);<a name="line.982"></a>
-<span class="sourceLineNo">983</span>    }<a name="line.983"></a>
-<span class="sourceLineNo">984</span>    List&lt;StoreFile&gt; filesToReopen = new ArrayList&lt;&gt;();<a name="line.984"></a>
-<span class="sourceLineNo">985</span>    List&lt;KeyValueScanner&gt; memstoreScanners = new ArrayList&lt;&gt;();<a name="line.985"></a>
-<span class="sourceLineNo">986</span>    List&lt;KeyValueScanner&gt; scannersToClose = new ArrayList&lt;&gt;();<a name="line.986"></a>
-<span class="sourceLineNo">987</span>    for (KeyValueScanner kvs : currentScanners) {<a name="line.987"></a>
-<span class="sourceLineNo">988</span>      if (!kvs.isFileScanner()) {<a name="line.988"></a>
-<span class="sourceLineNo">989</span>        memstoreScanners.add(kvs);<a name="line.989"></a>
-<span class="sourceLineNo">990</span>      } else {<a name="line.990"></a>
-<span class="sourceLineNo">991</span>        scannersToClose.add(kvs);<a name="line.991"></a>
-<span class="sourceLineNo">992</span>        if (kvs.peek() == null) {<a name="line.992"></a>
-<span class="sourceLineNo">993</span>          continue;<a name="line.993"></a>
-<span class="sourceLineNo">994</span>        }<a name="line.994"></a>
-<span class="sourceLineNo">995</span>        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));<a name="line.995"></a>
-<span class="sourceLineNo">996</span>      }<a name="line.996"></a>
-<span class="sourceLineNo">997</span>    }<a name="line.997"></a>
-<span class="sourceLineNo">998</span>    if (filesToReopen.isEmpty()) {<a name="line.998"></a>
-<span class="sourceLineNo">999</span>      return;<a name="line.999"></a>
-<span class="sourceLineNo">1000</span>    }<a name="line.1000"></a>
-<span class="sourceLineNo">1001</span>    List&lt;KeyValueScanner&gt; fileScanners = null;<a name="line.1001"></a>
-<span class="sourceLineNo">1002</span>    List&lt;KeyValueScanner&gt; newCurrentScanners;<a name="line.1002"></a>
-<span class="sourceLineNo">1003</span>    KeyValueHeap newHeap;<a name="line.1003"></a>
-<span class="sourceLineNo">1004</span>    try {<a name="line.1004"></a>
-<span class="sourceLineNo">1005</span>      fileScanners =<a name="line.1005"></a>
-<span class="sourceLineNo">1006</span>          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),<a name="line.1006"></a>
-<span class="sourceLineNo">1007</span>            scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);<a name="line.1007"></a>
-<span class="sourceLineNo">1008</span>      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);<a name="line.1008"></a>
-<span class="sourceLineNo">1009</span>      newCurrentScanners = new ArrayList&lt;&gt;(fileScanners.size() + memstoreScanners.size());<a name="line.1009"></a>
-<span class="sourceLineNo">1010</span>      newCurrentScanners.addAll(fileScanners);<a name="line.1010"></a>
-<span class="sourceLineNo">1011</span>      newCurrentScanners.addAll(memstoreScanners);<a name="line.1011"></a>
-<span class="sourceLineNo">1012</span>      newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());<a name="line.1012"></a>
-<span class="sourceLineNo">1013</span>    } catch (Exception e) {<a name="line.1013"></a>
-<span class="sourceLineNo">1014</span>      LOG.warn("failed to switch to stream read", e);<a name="line.1014"></a>
-<span class="sourceLineNo">1015</span>      if (fileScanners != null) {<a name="line.1015"></a>
-<span class="sourceLineNo">1016</span>        fileScanners.forEach(KeyValueScanner::close);<a name="line.1016"></a>
-<span class="sourceLineNo">1017</span>      }<a name="line.1017"></a>
-<span class="sourceLineNo">1018</span>      return;<a name="line.1018"></a>
-<span class="sourceLineNo">1019</span>    }<a name="line.1019"></a>
-<span class="sourceLineNo">1020</span>    currentScanners.clear();<a name="line.1020"></a>
-<span class="sourceLineNo">1021</span>    addCurrentScanners(newCurrentScanners);<a name="line.1021"></a>
-<span class="sourceLineNo">1022</span>    this.heap = newHeap;<a name="line.1022"></a>
-<span class="sourceLineNo">1023</span>    resetQueryMatcher(lastTop);<a name="line.1023"></a>
-<span class="sourceLineNo">1024</span>    scannersToClose.forEach(KeyValueScanner::close);<a name="line.1024"></a>
-<span class="sourceLineNo">1025</span>  }<a name="line.1025"></a>
-<span class="sourceLineNo">1026</span><a name="line.1026"></a>
-<span class="sourceLineNo">1027</span>  protected final boolean checkFlushed() {<a name="line.1027"></a>
-<span class="sourceLineNo">1028</span>    // check the var without any lock. Suppose even if we see the old<a name="line.1028"></a>
-<span class="sourceLineNo">1029</span>    // value here still it is ok to continue because we will not be resetting<a name="line.1029"></a>
-<span class="sourceLineNo">1030</span>    // the heap but will continue with the referenced memstore's snapshot. For compactions<a name="line.1030"></a>
-<span class="sourceLineNo">1031</span>    // any way we don't need the updateReaders at all to happen as we still continue with<a name="line.1031"></a>
-<span class="sourceLineNo">1032</span>    // the older files<a name="line.1032"></a>
-<span class="sourceLineNo">1033</span>    if (flushed) {<a name="line.1033"></a>
-<span class="sourceLineNo">1034</span>      // If there is a flush and the current scan is notified on the flush ensure that the<a name="line.1034"></a>
-<span class="sourceLineNo">1035</span>      // scan's heap gets reset and we do a seek on the newly flushed file.<a name="line.1035"></a>
-<span class="sourceLineNo">1036</span>      if (this.closing) {<a name="line.1036"></a>
-<span class="sourceLineNo">1037</span>        return false;<a name="line.1037"></a>
-<span class="sourceLineNo">1038</span>      }<a name="line.1038"></a>
-<span class="sourceLineNo">1039</span>      // reset the flag<a name="line.1039"></a>
-<span class="sourceLineNo">1040</span>      flushed = false;<a name="line.1040"></a>
-<span class="sourceLineNo">1041</span>      return true;<a name="line.1041"></a>
-<span class="sourceLineNo">1042</span>    }<a name="line.1042"></a>
-<span class="sourceLineNo">1043</span>    return false;<a name="line.1043"></a>
-<span class="sourceLineNo">1044</span>  }<a name="line.1044"></a>
-<span class="sourceLineNo">1045</span><a name="line.1045"></a>
-<span class="sourceLineNo">1046</span>  /**<a name="line.1046"></a>
-<span class="sourceLineNo">1047</span>   * @see KeyValueScanner#getScannerOrder()<a name="line.1047"></a>
-<span class="sourceLineNo">1048</span>   */<a name="line.1048"></a>
-<span class="sourceLineNo">1049</span>  @Override<a name="line.1049"></a>
-<span class="sourceLineNo">1050</span>  public long getScannerOrder() {<a name="line.1050"></a>
-<span class="sourceLineNo">1051</span>    return 0;<a name="line.1051"></a>
-<span class="sourceLineNo">1052</span>  }<a name="line.1052"></a>
-<span class="sourceLineNo">1053</span><a name="line.1053"></a>
-<span class="sourceLineNo">1054</span>  /**<a name="line.1054"></a>
-<span class="sourceLineNo">1055</span>   * Seek storefiles in parallel to optimize IO latency as much as possible<a name="line.1055"></a>
-<span class="sourceLineNo">1056</span>   * @param scanners the list {@link KeyValueScanner}s to be read from<a name="line.1056"></a>
-<span class="sourceLineNo">1057</span>   * @param kv the KeyValue on which the operation is being requested<a name="line.1057"></a>
-<span class="sourceLineNo">1058</span>   * @throws IOException<a name="line.1058"></a>
-<span class="sourceLineNo">1059</span>   */<a name="line.1059"></a>
-<span class="sourceLineNo">1060</span>  private void parallelSeek(final List&lt;? extends KeyValueScanner&gt;<a name="line.1060"></a>
-<span class="sourceLineNo">1061</span>      scanners, final Cell kv) throws IOException {<a name="line.1061"></a>
-<span class="sourceLineNo">1062</span>    if (scanners.isEmpty()) return;<a name="line.1062"></a>
-<span class="sourceLineNo">1063</span>    int storeFileScannerCount = scanners.size();<a name="line.1063"></a>
-<span class="sourceLineNo">1064</span>    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);<a name="line.1064"></a>
-<span class="sourceLineNo">1065</span>    List&lt;ParallelSeekHandler&gt; handlers = new ArrayList&lt;&gt;(storeFileScannerCount);<a name="line.1065"></a>
-<span class="sourceLineNo">1066</span>    for (KeyValueScanner scanner : scanners) {<a name="line.1066"></a>
-<span class="sourceLineNo">1067</span>      if (scanner instanceof StoreFileScanner) {<a name="line.1067"></a>
-<span class="sourceLineNo">1068</span>        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,<a name="line.1068"></a>
-<span class="sourceLineNo">1069</span>          this.readPt, latch);<a name="line.1069"></a>
-<span class="sourceLineNo">1070</span>        executor.submit(seekHandler);<a name="line.1070"></a>
-<span class="sourceLineNo">1071</span>        handlers.add(seekHandler);<a name="line.1071"></a>
-<span class="sourceLineNo">1072</span>      } else {<a name="line.1072"></a>
-<span class="sourceLineNo">1073</span>        scanner.seek(kv);<a name="line.1073"></a>
-<span class="sourceLineNo">1074</span>        latch.countDown();<a name="line.1074"></a>
-<span class="sourceLineNo">1075</span>      }<a name="line.1075"></a>
+<span class="sourceLineNo">969</span>  @VisibleForTesting<a name="line.969"></a>
+<span class="sourceLineNo">970</span>  void trySwitchToStreamRead() {<a name="line.970"></a>
+<span class="sourceLineNo">971</span>    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||<a name="line.971"></a>
+<span class="sourceLineNo">972</span>        bytesRead &lt; preadMaxBytes) {<a name="line.972"></a>
+<span class="sourceLineNo">973</span>      return;<a name="line.973"></a>
+<span class="sourceLineNo">974</span>    }<a name="line.974"></a>
+<span class="sourceLineNo">975</span>    if (LOG.isDebugEnabled()) {<a name="line.975"></a>
+<span class="sourceLineNo">976</span>      LOG.debug("Switch to stream read because we have already read " + bytesRead +<a name="line.976"></a>
+<span class="sourceLineNo">977</span>          " bytes from this scanner");<a name="line.977"></a>
+<span class="sourceLineNo">978</span>    }<a name="line.978"></a>
+<span class="sourceLineNo">979</span>    scanUsePread = false;<a name="line.979"></a>
+<span class="sourceLineNo">980</span>    Cell lastTop = heap.peek();<a name="line.980"></a>
+<span class="sourceLineNo">981</span>    List&lt;KeyValueScanner&gt; memstoreScanners = new ArrayList&lt;&gt;();<a name="line.981"></a>
+<span class="sourceLineNo">982</span>    List&lt;KeyValueScanner&gt; scannersToClose = new ArrayList&lt;&gt;();<a name="line.982"></a>
+<span class="sourceLineNo">983</span>    for (KeyValueScanner kvs : currentScanners) {<a name="line.983"></a>
+<span class="sourceLineNo">984</span>      if (!kvs.isFileScanner()) {<a name="line.984"></a>
+<span class="sourceLineNo">985</span>        // collect memstorescanners here<a name="line.985"></a>
+<span class="sourceLineNo">986</span>        memstoreScanners.add(kvs);<a name="line.986"></a>
+<span class="sourceLineNo">987</span>      } else {<a name="line.987"></a>
+<span class="sourceLineNo">988</span>        scannersToClose.add(kvs);<a name="line.988"></a>
+<span class="sourceLineNo">989</span>      }<a name="line.989"></a>
+<span class="sourceLineNo">990</span>    }<a name="line.990"></a>
+<span class="sourceLineNo">991</span>    List&lt;KeyValueScanner&gt; fileScanners = null;<a name="line.991"></a>
+<span class="sourceLineNo">992</span>    List&lt;KeyValueScanner&gt; newCurrentScanners;<a name="line.992"></a>
+<span class="sourceLineNo">993</span>    KeyValueHeap newHeap;<a name="line.993"></a>
+<span class="sourceLineNo">994</span>    try {<a name="line.994"></a>
+<span class="sourceLineNo">995</span>      // recreate the scanners on the current file scanners<a name="line.995"></a>
+<span class="sourceLineNo">996</span>      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,<a name="line.996"></a>
+<span class="sourceLineNo">997</span>        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),<a name="line.997"></a>
+<span class="sourceLineNo">998</span>        scan.includeStopRow(), readPt, false);<a name="line.998"></a>
+<span class="sourceLineNo">999</span>      if (fileScanners == null) {<a name="line.999"></a>
+<span class="sourceLineNo">1000</span>        return;<a name="line.1000"></a>
+<span class="sourceLineNo">1001</span>      }<a name="line.1001"></a>
+<span class="sourceLineNo">1002</span>      seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);<a name="line.1002"></a>
+<span class="sourceLineNo">1003</span>      newCurrentScanners = new ArrayList&lt;&gt;(fileScanners.size() + memstoreScanners.size());<a name="line.1003"></a>
+<span class="sourceLineNo">1004</span>      newCurrentScanners.addAll(fileScanners);<a name="line.1004"></a>
+<span class="sourceLineNo">1005</span>      newCurrentScanners.addAll(memstoreScanners);<a name="line.1005"></a>
+<span class="sourceLineNo">1006</span>      newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());<a name="line.1006"></a>
+<span class="sourceLineNo">1007</span>    } catch (Exception e) {<a name="line.1007"></a>
+<span class="sourceLineNo">1008</span>      LOG.warn("failed to switch to stream read", e);<a name="line.1008"></a>
+<span class="sourceLineNo">1009</span>      if (fileScanners != null) {<a name="line.1009"></a>
+<span class="sourceLineNo">1010</span>        fileScanners.forEach(KeyValueScanner::close);<a name="line.1010"></a>
+<span class="sourceLineNo">1011</span>      }<a name="line.1011"></a>
+<span class="sourceLineNo">1012</span>      return;<a name="line.1012"></a>
+<span class="sourceLineNo">1013</span>    }<a name="line.1013"></a>
+<span class="sourceLineNo">1014</span>    currentScanners.clear();<a name="line.1014"></a>
+<span class="sourceLineNo">1015</span>    addCurrentScanners(newCurrentScanners);<a name="line.1015"></a>
+<span class="sourceLineNo">1016</span>    this.heap = newHeap;<a name="line.1016"></a>
+<span class="sourceLineNo">1017</span>    resetQueryMatcher(lastTop);<a name="line.1017"></a>
+<span class="sourceLineNo">1018</span>    scannersToClose.forEach(KeyValueScanner::close);<a name="line.1018"></a>
+<span class="sourceLineNo">1019</span>  }<a name="line.1019"></a>
+<span class="sourceLineNo">1020</span><a name="line.1020"></a>
+<span class="sourceLineNo">1021</span>  protected final boolean checkFlushed() {<a name="line.1021"></a>
+<span class="sourceLineNo">1022</span>    // check the var without any lock. Suppose even if we see the old<a name="line.1022"></a>
+<span class="sourceLineNo">1023</span>    // value here still it is ok to continue because we will not be resetting<a name="line.1023"></a>
+<span class="sourceLineNo">1024</span>    // the heap but will continue with the referenced memstore's snapshot. For compactions<a name="line.1024"></a>
+<span class="sourceLineNo">1025</span>    // any way we don't need the updateReaders at all to happen as we still continue with<a name="line.1025"></a>
+<span class="sourceLineNo">1026</span>    // the older files<a name="line.1026"></a>
+<span class="sourceLineNo">1027</span>    if (flushed) {<a name="line.1027"></a>
+<span class="sourceLineNo">1028</span>      // If there is a flush and the current scan is notified on the flush ensure that the<a name="line.1028"></a>
+<span class="sourceLineNo">1029</span>      // scan's heap gets reset and we do a seek on the newly flushed file.<a name="line.1029"></a>
+<span class="sourceLineNo">1030</span>      if (this.closing) {<a name="line.1030"></a>
+<span class="sourceLineNo">1031</span>        return false;<a name="line.1031"></a>
+<span class="sourceLineNo">1032</span>      }<a name="line.1032"></a>
+<span class="sourceLineNo">1033</span>      // reset the flag<a name="line.1033"></a>
+<span class="sourceLineNo">1034</span>      flushed = false;<a name="line.1034"></a>
+<span class="sourceLineNo">1035</span>      return true;<a name="line.1035"></a>
+<span class="sourceLineNo">1036</span>    }<a name="line.1036"></a>
+<span class="sourceLineNo">1037</span>    return false;<a name="line.1037"></a>
+<span class="sourceLineNo">1038</span>  }<a name="line.1038"></a>
+<span class="sourceLineNo">1039</span><a name="line.1039"></a>
+<span class="sourceLineNo">1040</span>  /**<a name="line.1040"></a>
+<span class="sourceLineNo">1041</span>   * @see KeyValueScanner#getScannerOrder()<a name="line.1041"></a>
+<span class="sourceLineNo">1042</span>   */<a name="line.1042"></a>
+<span class="sourceLineNo">1043</span>  @Override<a name="line.1043"></a>
+<span class="sourceLineNo">1044</span>  public long getScannerOrder() {<a name="line.1044"></a>
+<span class="sourceLineNo">1045</span>    return 0;<a name="line.1045"></a>
+<span class="sourceLineNo">1046</span>  }<a name="line.1046"></a>
+<span class="sourceLineNo">1047</span><a name="line.1047"></a>
+<span class="sourceLineNo">1048</span>  /**<a name="line.1048"></a>
+<span class="sourceLineNo">1049</span>   * Seek storefiles in parallel to optimize IO latency as much as possible<a name="line.1049"></a>
+<span class="sourceLineNo">1050</span>   * @param scanners the list {@link KeyValueScanner}s to be read from<a name="line.1050"></a>
+<span class="sourceLineNo">1051</span>   * @param kv the KeyValue on which the operation is being requested<a name="line.1051"></a>
+<span class="sourceLineNo">1052</span>   * @throws IOException<a name="line.1052"></a>
+<span class="sourceLineNo">1053</span>   */<a name="line.1053"></a>
+<span class="sourceLineNo">1054</span>  private void parallelSeek(final List&lt;? extends KeyValueScanner&gt;<a name="line.1054"></a>
+<span class="sourceLineNo">1055</span>      scanners, final Cell kv) throws IOException {<a name="line.1055"></a>
+<span class="sourceLineNo">1056</span>    if (scanners.isEmpty()) return;<a name="line.1056"></a>
+<span class="sourceLineNo">1057</span>    int storeFileScannerCount = scanners.size();<a name="line.1057"></a>
+<span class="sourceLineNo">1058</span>    CountDownLatch latch = new CountDownLatch(storeFileScannerCount);<a name="line.1058"></a>
+<span class="sourceLineNo">1059</span>    List&lt;ParallelSeekHandler&gt; handlers = new ArrayList&lt;&gt;(storeFileScannerCount);<a name="line.1059"></a>
+<span class="sourceLineNo">1060</span>    for (KeyValueScanner scanner : scanners) {<a name="line.1060"></a>
+<span class="sourceLineNo">1061</span>      if (scanner instanceof StoreFileScanner) {<a name="line.1061"></a>
+<span class="sourceLineNo">1062</span>        ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,<a name="line.1062"></a>
+<span class="sourceLineNo">1063</span>          this.readPt, latch);<a name="line.1063"></a>
+<span class="sourceLineNo">1064</span>        executor.submit(seekHandler);<a name="line.1064"></a>
+<span class="sourceLineNo">1065</span>        handlers.add(seekHandler);<a name="line.1065"></a>
+<span class="sourceLineNo">1066</span>      } else {<a name="line.1066"></a>
+<span class="sourceLineNo">1067</span>        scanner.seek(kv);<a name="line.1067"></a>
+<span class="sourceLineNo">1068</span>        latch.countDown();<a name="line.1068"></a>
+<span class="sourceLineNo">1069</span>      }<a name="line.1069"></a>
+<span class="sourceLineNo">1070</span>    }<a name="line.1070"></a>
+<span class="sourceLineNo">1071</span><a name="line.1071"></a>
+<span class="sourceLineNo">1072</span>    try {<a name="line.1072"></a>
+<span class="sourceLineNo">1073</span>      latch.await();<a name="line.1073"></a>
+<span class="sourceLineNo">1074</span>    } catch (InterruptedException ie) {<a name="line.1074"></a>
+<span class="sourceLineNo">1075</span>      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);<a name="line.1075"></a>
 <span class="sourceLineNo">1076</span>    }<a name="line.1076"></a>
 <span class="sourceLineNo">1077</span><a name="line.1077"></a>
-<span class="sourceLineNo">1078</span>    try {<a name="line.1078"></a>
-<span class="sourceLineNo">1079</span>      latch.await();<a name="line.1079"></a>
-<span class="sourceLineNo">1080</span>    } catch (InterruptedException ie) {<a name="line.1080"></a>
-<span class="sourceLineNo">1081</span>      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);<a name="line.1081"></a>
+<span class="sourceLineNo">1078</span>    for (ParallelSeekHandler handler : handlers) {<a name="line.1078"></a>
+<span class="sourceLineNo">1079</span>      if (handler.getErr() != null) {<a name="line.1079"></a>
+<span class="sourceLineNo">1080</span>        throw new IOException(handler.getErr());<a name="line.1080"></a>
+<span class="sourceLineNo">1081</span>      }<a name="line.1081"></a>
 <span class="sourceLineNo">1082</span>    }<a name="line.1082"></a>
-<span class="sourceLineNo">1083</span><a name="line.1083"></a>
-<span class="sourceLineNo">1084</span>    for (ParallelSeekHandler handler : handlers) {<a name="line.1084"></a>
-<span class="sourceLineNo">1085</span>      if (handler.getErr() != null) {<a name="line.1085"></a>
-<span class="sourceLineNo">1086</span>        throw new IOException(handler.getErr());<a name="line.1086"></a>
-<span class="sourceLineNo">1087</span>      }<a name="line.1087"></a>
-<span class="sourceLineNo">1088</span>    }<a name="line.1088"></a>
-<span class="sourceLineNo">1089</span>  }<a name="line.1089"></a>
-<span class="sourceLineNo">1090</span><a name="line.1090"></a>
-<span class="sourceLineNo">1091</span>  /**<a name="line.1091"></a>
-<span class="sourceLineNo">1092</span>   * Used in testing.<a name="line.1092"></a>
-<span class="sourceLineNo">1093</span>   * @return all scanners in no particular order<a name="line.1093"></a>
-<span class="sourceLineNo">1094</span>   */<a name="line.1094"></a>
-<span class="sourceLineNo">1095</span>  @VisibleForTesting<a name="line.1095"></a>
-<span class="sourceLineNo">1096</span>  List&lt;KeyValueScanner&gt; getAllScannersForTesting() {<a name="line.1096"></a>
-<span class="sourceLineNo">1097</span>    List&lt;KeyValueScanner&gt; allScanners = new ArrayList&lt;&gt;();<a name="line.1097"></a>
-<span class="sourceLineNo">1098</span>    KeyValueScanner current = heap.getCurrentForTesting();<a name="line.1098"></a>
-<span class="sourceLineNo">1099</span>    if (current != null)<a name="line.1099"></a>
-<span class="sourceLineNo">1100</span>      allScanners.add(current);<a name="line.1100"></a>
-<span class="sourceLineNo">1101</span>    for (KeyValueScanner scanner : heap.getHeap())<a name="line.1101"></a>
-<span class="sourceLineNo">1102</span>      allScanners.add(scanner);<a name="line.1102"></a>
-<span class="sourceLineNo">1103</span>    return allScanners;<a name="line.1103"></a>
-<span class="sourceLineNo">1104</span>  }<a name="line.1104"></a>
-<span class="sourceLineNo">1105</span><a name="line.1105"></a>
-<span class="sourceLineNo">1106</span>  static void enableLazySeekGlobally(boolean enable) {<a name="line.1106"></a>
-<span class="sourceLineNo">1107</span>    lazySeekEnabledGlobally = enable;<a name="line.1107"></a>
-<span class="sourceLineNo">1108</span>  }<a name="line.1108"></a>
-<span class="sourceLineNo">1109</span><a name="line.1109"></a>
-<span class="sourceLineNo">1110</span>  /**<a name="line.1110"></a>
-<span class="sourceLineNo">1111</span>   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).<a name="line.1111"></a>
-<span class="sourceLineNo">1112</span>   */<a name="line.1112"></a>
-<span class="sourceLineNo">1113</span>  public long getEstimatedNumberOfKvsScanned() {<a name="line.1113"></a>
-<span class="sourceLineNo">1114</span>    return this.kvsScanned;<a name="line.1114"></a>
-<span class="sourceLineNo">1115</span>  }<a name="line.1115"></a>
-<span class="sourceLineNo">1116</span><a name="line.1116"></a>
-<span class="sourceLineNo">1117</span>  @Override<a name="line.1117"></a>
-<span class="sourceLineNo">1118</span>  public Cell getNextIndexedKey() {<a name="line.1118"></a>
-<span class="sourceLineNo">1119</span>    return this.heap.getNextIndexedKey();<a name="line.1119"></a>
-<span class="sourceLineNo">1120</span>  }<a name="line.1120"></a>
-<span class="sourceLineNo">1121</span><a name="line.1121"></a>
-<span class="sourceLineNo">1122</span>  @Override<a name="line.1122"></a>
-<span class="sourceLineNo">1123</span>  public void shipped() throws IOException {<a name="line.1123"></a>
-<span class="sourceLineNo">1124</span>    if (prevCell != null) {<a name="line.1124"></a>
-<span class="sourceLineNo">1125</span>      // Do the copy here so that in case the prevCell ref is pointing to the previous<a name="line.1125"></a>
-<span class="sourceLineNo">1126</span>      // blocks we can safely release those blocks.<a name="line.1126"></a>
-<span class="sourceLineNo">1127</span>      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks<a name="line.1127"></a>
-<span class="sourceLineNo">1128</span>      // fetched from HDFS. Copying this would ensure that we let go the references to these<a name="line.1128"></a>
-<span class="sourceLineNo">1129</span>      // blocks so that they can be GCed safely(in case of bucket cache)<a name="line.1129"></a>
-<span class="sourceLineNo">1130</span>      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);<a name="line.1130"></a>
-<span class="sourceLineNo">1131</span>    }<a name="line.1131"></a>
-<span class="sourceLineNo">1132</span>    matcher.beforeShipped();<a name="line.1132"></a>
-<span class="sourceLineNo">1133</span>    // There wont be further fetch of Cells from these scanners. Just close.<a name="line.1133"></a>
-<span class="sourceLineNo">1134</span>    clearAndClose(scannersForDelayedClose);<a name="line.1134"></a>
-<span class="sourceLineNo">1135</span>    if (this.heap != null) {<a name="line.1135"></a>
-<span class="sourceLineNo">1136</span>      this.heap.shipped();<a name="line.1136"></a>
-<span class="sourceLineNo">1137</span>      // When switching from pread to stream, we will open a new scanner for each store file, but<a name="line.1137"></a>
-<span class="sourceLineNo">1138</span>      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client<a name="line.1138"></a>
-<span class="sourceLineNo">1139</span>      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others<a name="line.1139"></a>
-<span class="sourceLineNo">1140</span>      // before we serialize and send it back to client. The HFileBlocks will be released in shipped<a name="line.1140"></a>
-<span class="sourceLineNo">1141</span>      // method, so we here will also open new scanners and close old scanners in shipped method.<a name="line.1141"></a>
-<span class="sourceLineNo">1142</span>      // See HBASE-18055 for more details.<a name="line.1142"></a>
-<span class="sourceLineNo">1143</span>      trySwitchToStreamRead();<a name="line.1143"></a>
-<span class="sourceLineNo">1144</span>    }<a name="line.1144"></a>
-<span class="sourceLineNo">1145</span>  }<a name="line.1145"></a>
-<span class="sourceLineNo">1146</span>}<a name="line.1146"></a>
-<span class="sourceLineNo">1147</span><a name="line.1147"></a>
+<span class="sourceLineNo">1083</span>  }<a name="line.1083"></a>
+<span class="sourceLineNo">1084</span><a name="line.1084"></a>
+<span class="sourceLineNo">1085</span>  /**<a name="line.1085"></a>
+<span class="sourceLineNo">1086</span>   * Used in testing.<a name="line.1086"></a>
+<span class="sourceLineNo">1087</span>   * @return all scanners in no particular order<a name="line.1087"></a>
+<span class="sourceLineNo">1088</span>   */<a name="line.1088"></a>
+<span class="sourceLineNo">1089</span>  @VisibleForTesting<a name="line.1089"></a>
+<span class="sourceLineNo">1090</span>  List&lt;KeyValueScanner&gt; getAllScannersForTesting() {<a name="line.1090"></a>
+<span class="sourceLineNo">1091</span>    List&lt;KeyValueScanner&gt; allScanners = new ArrayList&lt;&gt;();<a name="line.1091"></a>
+<span class="sourceLineNo">1092</span>    KeyValueScanner current = heap.getCurrentForTesting();<a name="line.1092"></a>
+<span class="sourceLineNo">1093</span>    if (current != null)<a name="line.1093"></a>
+<span class="sourceLineNo">1094</span>      allScanners.add(current);<a name="line.1094"></a>
+<span class="sourceLineNo">1095</span>    for (KeyValueScanner scanner : heap.getHeap())<a name="line.1095"></a>
+<span class="sourceLineNo">1096</span>      allScanners.add(scanner);<a name="line.1096"></a>
+<span class="sourceLineNo">1097</span>    return allScanners;<a name="line.1097"></a>
+<span class="sourceLineNo">1098</span>  }<a name="line.1098"></a>
+<span class="sourceLineNo">1099</span><a name="line.1099"></a>
+<span class="sourceLineNo">1100</span>  static void enableLazySeekGlobally(boolean enable) {<a name="line.1100"></a>
+<span class="sourceLineNo">1101</span>    lazySeekEnabledGlobally = enable;<a name="line.1101"></a>
+<span class="sourceLineNo">1102</span>  }<a name="line.1102"></a>
+<span class="sourceLineNo">1103</span><a name="line.1103"></a>
+<span class="sourceLineNo">1104</span>  /**<a name="line.1104"></a>
+<span class="sourceLineNo">1105</span>   * @return The estimated number of KVs seen by this scanner (includes some skipped KVs).<a name="line.1105"></a>
+<span class="sourceLineNo">1106</span>   */<a name="line.1106"></a>
+<span class="sourceLineNo">1107</span>  public long getEstimatedNumberOfKvsScanned() {<a name="line.1107"></a>
+<span class="sourceLineNo">1108</span>    return this.kvsScanned;<a name="line.1108"></a>
+<span class="sourceLineNo">1109</span>  }<a name="line.1109"></a>
+<span class="sourceLineNo">1110</span><a name="line.1110"></a>
+<span class="sourceLineNo">1111</span>  @Override<a name="line.1111"></a>
+<span class="sourceLineNo">1112</span>  public Cell getNextIndexedKey() {<a name="line.1112"></a>
+<span class="sourceLineNo">1113</span>    return this.heap.getNextIndexedKey();<a name="line.1113"></a>
+<span class="sourceLineNo">1114</span>  }<a name="line.1114"></a>
+<span class="sourceLineNo">1115</span><a name="line.1115"></a>
+<span class="sourceLineNo">1116</span>  @Override<a name="line.1116"></a>
+<span class="sourceLineNo">1117</span>  public void shipped() throws IOException {<a name="line.1117"></a>
+<span class="sourceLineNo">1118</span>    if (prevCell != null) {<a name="line.1118"></a>
+<span class="sourceLineNo">1119</span>      // Do the copy here so that in case the prevCell ref is pointing to the previous<a name="line.1119"></a>
+<span class="sourceLineNo">1120</span>      // blocks we can safely release those blocks.<a name="line.1120"></a>
+<span class="sourceLineNo">1121</span>      // This applies to blocks that are got from Bucket cache, L1 cache and the blocks<a name="line.1121"></a>
+<span class="sourceLineNo">1122</span>      // fetched from HDFS. Copying this would ensure that we let go the references to these<a name="line.1122"></a>
+<span class="sourceLineNo">1123</span>      // blocks so that they can be GCed safely(in case of bucket cache)<a name="line.1123"></a>
+<span class="sourceLineNo">1124</span>      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);<a name="line.1124"></a>
+<span class="sourceLineNo">1125</span>    }<a name="line.1125"></a>
+<span class="sourceLineNo">1126</span>    matcher.beforeShipped();<a name="line.1126"></a>
+<span class="sourceLineNo">1127</span>    // There wont be further fetch of Cells from these scanners. Just close.<a name="line.1127"></a>
+<span class="sourceLineNo">1128</span>    clearAndClose(scannersForDelayedClose);<a name="line.1128"></a>
+<span class="sourceLineNo">1129</span>    if (this.heap != null) {<a name="line.1129"></a>
+<span class="sourceLineNo">1130</span>      this.heap.shipped();<a name="line.1130"></a>
+<span class="sourceLineNo">1131</span>      // When switching from pread to stream, we will open a new scanner for each store file, but<a name="line.1131"></a>
+<span class="sourceLineNo">1132</span>      // the old scanner may still track the HFileBlocks we have scanned but not sent back to client<a name="line.1132"></a>
+<span class="sourceLineNo">1133</span>      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others<a name="line.1133"></a>
+<span class="sourceLineNo">1134</span>      // before we serialize and send it back to client. The HFileBlocks will be released in shipped<a name="line.1134"></a>
+<span class="sourceLineNo">1135</span>      // method, so we here will also open new scanners and close old scanners in shipped method.<a name="line.1135"></a>
+<span class="sourceLineNo">1136</span>      // See HBASE-18055 for more details.<a name="line.1136"></a>
+<span class="sourceLineNo">1137</span>      trySwitchToStreamRead();<a name="line.1137"></a>
+<span class="sourceLineNo">1138</span>    }<a name="line.1138"></a>
+<span class="sourceLineNo">1139</span>  }<a name="line.1139"></a>
+<span class="sourceLineNo">1140</span>}<a name="line.1140"></a>
+<span class="sourceLineNo">1141</span><a name="line.1141"></a>
 
 
 


Mime
View raw message