hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mi...@apache.org
Subject [28/51] [partial] hbase-site git commit: Published site at f6945c4631e7697976fd8c2272f8152905c6f875.
Date Thu, 10 Mar 2016 17:23:11 GMT
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5eb82203/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html
index 57b645b..8c624dc 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html
@@ -5305,2629 +5305,2632 @@
 <span class="sourceLineNo">5297</span>    Preconditions.checkNotNull(familyPaths);<a name="line.5297"></a>
 <span class="sourceLineNo">5298</span>    // we need writeLock for multi-family bulk load<a name="line.5298"></a>
 <span class="sourceLineNo">5299</span>    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));<a name="line.5299"></a>
-<span class="sourceLineNo">5300</span>    try {<a name="line.5300"></a>
-<span class="sourceLineNo">5301</span>      this.writeRequestsCount.increment();<a name="line.5301"></a>
-<span class="sourceLineNo">5302</span><a name="line.5302"></a>
-<span class="sourceLineNo">5303</span>      // There possibly was a split that happened between when the split keys<a name="line.5303"></a>
-<span class="sourceLineNo">5304</span>      // were gathered and before the HRegion's write lock was taken.  We need<a name="line.5304"></a>
-<span class="sourceLineNo">5305</span>      // to validate the HFile region before attempting to bulk load all of them<a name="line.5305"></a>
-<span class="sourceLineNo">5306</span>      List&lt;IOException&gt; ioes = new ArrayList&lt;IOException&gt;();<a name="line.5306"></a>
-<span class="sourceLineNo">5307</span>      List&lt;Pair&lt;byte[], String&gt;&gt; failures = new ArrayList&lt;Pair&lt;byte[], String&gt;&gt;();<a name="line.5307"></a>
-<span class="sourceLineNo">5308</span>      for (Pair&lt;byte[], String&gt; p : familyPaths) {<a name="line.5308"></a>
-<span class="sourceLineNo">5309</span>        byte[] familyName = p.getFirst();<a name="line.5309"></a>
-<span class="sourceLineNo">5310</span>        String path = p.getSecond();<a name="line.5310"></a>
-<span class="sourceLineNo">5311</span><a name="line.5311"></a>
-<span class="sourceLineNo">5312</span>        Store store = getStore(familyName);<a name="line.5312"></a>
-<span class="sourceLineNo">5313</span>        if (store == null) {<a name="line.5313"></a>
-<span class="sourceLineNo">5314</span>          IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.5314"></a>
-<span class="sourceLineNo">5315</span>              "No such column family " + Bytes.toStringBinary(familyName));<a name="line.5315"></a>
-<span class="sourceLineNo">5316</span>          ioes.add(ioe);<a name="line.5316"></a>
-<span class="sourceLineNo">5317</span>        } else {<a name="line.5317"></a>
-<span class="sourceLineNo">5318</span>          try {<a name="line.5318"></a>
-<span class="sourceLineNo">5319</span>            store.assertBulkLoadHFileOk(new Path(path));<a name="line.5319"></a>
-<span class="sourceLineNo">5320</span>          } catch (WrongRegionException wre) {<a name="line.5320"></a>
-<span class="sourceLineNo">5321</span>            // recoverable (file doesn't fit in region)<a name="line.5321"></a>
-<span class="sourceLineNo">5322</span>            failures.add(p);<a name="line.5322"></a>
-<span class="sourceLineNo">5323</span>          } catch (IOException ioe) {<a name="line.5323"></a>
-<span class="sourceLineNo">5324</span>            // unrecoverable (hdfs problem)<a name="line.5324"></a>
-<span class="sourceLineNo">5325</span>            ioes.add(ioe);<a name="line.5325"></a>
-<span class="sourceLineNo">5326</span>          }<a name="line.5326"></a>
-<span class="sourceLineNo">5327</span>        }<a name="line.5327"></a>
-<span class="sourceLineNo">5328</span>      }<a name="line.5328"></a>
-<span class="sourceLineNo">5329</span><a name="line.5329"></a>
-<span class="sourceLineNo">5330</span>      // validation failed because of some sort of IO problem.<a name="line.5330"></a>
-<span class="sourceLineNo">5331</span>      if (ioes.size() != 0) {<a name="line.5331"></a>
-<span class="sourceLineNo">5332</span>        IOException e = MultipleIOException.createIOException(ioes);<a name="line.5332"></a>
-<span class="sourceLineNo">5333</span>        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);<a name="line.5333"></a>
-<span class="sourceLineNo">5334</span>        throw e;<a name="line.5334"></a>
-<span class="sourceLineNo">5335</span>      }<a name="line.5335"></a>
-<span class="sourceLineNo">5336</span><a name="line.5336"></a>
-<span class="sourceLineNo">5337</span>      // validation failed, bail out before doing anything permanent.<a name="line.5337"></a>
-<span class="sourceLineNo">5338</span>      if (failures.size() != 0) {<a name="line.5338"></a>
-<span class="sourceLineNo">5339</span>        StringBuilder list = new StringBuilder();<a name="line.5339"></a>
-<span class="sourceLineNo">5340</span>        for (Pair&lt;byte[], String&gt; p : failures) {<a name="line.5340"></a>
-<span class="sourceLineNo">5341</span>          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")<a name="line.5341"></a>
-<span class="sourceLineNo">5342</span>              .append(p.getSecond());<a name="line.5342"></a>
-<span class="sourceLineNo">5343</span>        }<a name="line.5343"></a>
-<span class="sourceLineNo">5344</span>        // problem when validating<a name="line.5344"></a>
-<span class="sourceLineNo">5345</span>        LOG.warn("There was a recoverable bulk load failure likely due to a" +<a name="line.5345"></a>
-<span class="sourceLineNo">5346</span>            " split.  These (family, HFile) pairs were not loaded: " + list);<a name="line.5346"></a>
-<span class="sourceLineNo">5347</span>        return false;<a name="line.5347"></a>
-<span class="sourceLineNo">5348</span>      }<a name="line.5348"></a>
-<span class="sourceLineNo">5349</span><a name="line.5349"></a>
-<span class="sourceLineNo">5350</span>      // We need to assign a sequential ID that's in between two memstores in order to preserve<a name="line.5350"></a>
-<span class="sourceLineNo">5351</span>      // the guarantee that all the edits lower than the highest sequential ID from all the<a name="line.5351"></a>
-<span class="sourceLineNo">5352</span>      // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is<a name="line.5352"></a>
-<span class="sourceLineNo">5353</span>      // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is<a name="line.5353"></a>
-<span class="sourceLineNo">5354</span>      // a sequence id that we can be sure is beyond the last hfile written).<a name="line.5354"></a>
-<span class="sourceLineNo">5355</span>      if (assignSeqId) {<a name="line.5355"></a>
-<span class="sourceLineNo">5356</span>        FlushResult fs = flushcache(true, false);<a name="line.5356"></a>
-<span class="sourceLineNo">5357</span>        if (fs.isFlushSucceeded()) {<a name="line.5357"></a>
-<span class="sourceLineNo">5358</span>          seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5358"></a>
-<span class="sourceLineNo">5359</span>        } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {<a name="line.5359"></a>
-<span class="sourceLineNo">5360</span>          seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5360"></a>
-<span class="sourceLineNo">5361</span>        } else {<a name="line.5361"></a>
-<span class="sourceLineNo">5362</span>          throw new IOException("Could not bulk load with an assigned sequential ID because the "+<a name="line.5362"></a>
-<span class="sourceLineNo">5363</span>            "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);<a name="line.5363"></a>
-<span class="sourceLineNo">5364</span>        }<a name="line.5364"></a>
-<span class="sourceLineNo">5365</span>      }<a name="line.5365"></a>
-<span class="sourceLineNo">5366</span><a name="line.5366"></a>
-<span class="sourceLineNo">5367</span>      for (Pair&lt;byte[], String&gt; p : familyPaths) {<a name="line.5367"></a>
-<span class="sourceLineNo">5368</span>        byte[] familyName = p.getFirst();<a name="line.5368"></a>
-<span class="sourceLineNo">5369</span>        String path = p.getSecond();<a name="line.5369"></a>
-<span class="sourceLineNo">5370</span>        Store store = getStore(familyName);<a name="line.5370"></a>
-<span class="sourceLineNo">5371</span>        try {<a name="line.5371"></a>
-<span class="sourceLineNo">5372</span>          String finalPath = path;<a name="line.5372"></a>
-<span class="sourceLineNo">5373</span>          if (bulkLoadListener != null) {<a name="line.5373"></a>
-<span class="sourceLineNo">5374</span>            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);<a name="line.5374"></a>
-<span class="sourceLineNo">5375</span>          }<a name="line.5375"></a>
-<span class="sourceLineNo">5376</span>          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);<a name="line.5376"></a>
-<span class="sourceLineNo">5377</span><a name="line.5377"></a>
-<span class="sourceLineNo">5378</span>          if(storeFiles.containsKey(familyName)) {<a name="line.5378"></a>
-<span class="sourceLineNo">5379</span>            storeFiles.get(familyName).add(commitedStoreFile);<a name="line.5379"></a>
-<span class="sourceLineNo">5380</span>          } else {<a name="line.5380"></a>
-<span class="sourceLineNo">5381</span>            List&lt;Path&gt; storeFileNames = new ArrayList&lt;Path&gt;();<a name="line.5381"></a>
-<span class="sourceLineNo">5382</span>            storeFileNames.add(commitedStoreFile);<a name="line.5382"></a>
-<span class="sourceLineNo">5383</span>            storeFiles.put(familyName, storeFileNames);<a name="line.5383"></a>
-<span class="sourceLineNo">5384</span>          }<a name="line.5384"></a>
-<span class="sourceLineNo">5385</span>          if (bulkLoadListener != null) {<a name="line.5385"></a>
-<span class="sourceLineNo">5386</span>            bulkLoadListener.doneBulkLoad(familyName, path);<a name="line.5386"></a>
-<span class="sourceLineNo">5387</span>          }<a name="line.5387"></a>
-<span class="sourceLineNo">5388</span>        } catch (IOException ioe) {<a name="line.5388"></a>
-<span class="sourceLineNo">5389</span>          // A failure here can cause an atomicity violation that we currently<a name="line.5389"></a>
-<span class="sourceLineNo">5390</span>          // cannot recover from since it is likely a failed HDFS operation.<a name="line.5390"></a>
-<span class="sourceLineNo">5391</span><a name="line.5391"></a>
-<span class="sourceLineNo">5392</span>          // TODO Need a better story for reverting partial failures due to HDFS.<a name="line.5392"></a>
-<span class="sourceLineNo">5393</span>          LOG.error("There was a partial failure due to IO when attempting to" +<a name="line.5393"></a>
-<span class="sourceLineNo">5394</span>              " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);<a name="line.5394"></a>
-<span class="sourceLineNo">5395</span>          if (bulkLoadListener != null) {<a name="line.5395"></a>
-<span class="sourceLineNo">5396</span>            try {<a name="line.5396"></a>
-<span class="sourceLineNo">5397</span>              bulkLoadListener.failedBulkLoad(familyName, path);<a name="line.5397"></a>
-<span class="sourceLineNo">5398</span>            } catch (Exception ex) {<a name="line.5398"></a>
-<span class="sourceLineNo">5399</span>              LOG.error("Error while calling failedBulkLoad for family " +<a name="line.5399"></a>
-<span class="sourceLineNo">5400</span>                  Bytes.toString(familyName) + " with path " + path, ex);<a name="line.5400"></a>
-<span class="sourceLineNo">5401</span>            }<a name="line.5401"></a>
-<span class="sourceLineNo">5402</span>          }<a name="line.5402"></a>
-<span class="sourceLineNo">5403</span>          throw ioe;<a name="line.5403"></a>
-<span class="sourceLineNo">5404</span>        }<a name="line.5404"></a>
-<span class="sourceLineNo">5405</span>      }<a name="line.5405"></a>
-<span class="sourceLineNo">5406</span><a name="line.5406"></a>
-<span class="sourceLineNo">5407</span>      return true;<a name="line.5407"></a>
-<span class="sourceLineNo">5408</span>    } finally {<a name="line.5408"></a>
-<span class="sourceLineNo">5409</span>      if (wal != null &amp;&amp; !storeFiles.isEmpty()) {<a name="line.5409"></a>
-<span class="sourceLineNo">5410</span>        // @rite a bulk load event when not all hfiles are loaded<a name="line.5410"></a>
-<span class="sourceLineNo">5411</span>        try {<a name="line.5411"></a>
-<span class="sourceLineNo">5412</span>          WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(<a name="line.5412"></a>
-<span class="sourceLineNo">5413</span>              this.getRegionInfo().getTable(),<a name="line.5413"></a>
-<span class="sourceLineNo">5414</span>              ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);<a name="line.5414"></a>
-<span class="sourceLineNo">5415</span>          WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),<a name="line.5415"></a>
-<span class="sourceLineNo">5416</span>              loadDescriptor, mvcc);<a name="line.5416"></a>
-<span class="sourceLineNo">5417</span>        } catch (IOException ioe) {<a name="line.5417"></a>
-<span class="sourceLineNo">5418</span>          if (this.rsServices != null) {<a name="line.5418"></a>
-<span class="sourceLineNo">5419</span>            // Have to abort region server because some hfiles has been loaded but we can't write<a name="line.5419"></a>
-<span class="sourceLineNo">5420</span>            // the event into WAL<a name="line.5420"></a>
-<span class="sourceLineNo">5421</span>            this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);<a name="line.5421"></a>
-<span class="sourceLineNo">5422</span>          }<a name="line.5422"></a>
-<span class="sourceLineNo">5423</span>        }<a name="line.5423"></a>
-<span class="sourceLineNo">5424</span>      }<a name="line.5424"></a>
-<span class="sourceLineNo">5425</span><a name="line.5425"></a>
-<span class="sourceLineNo">5426</span>      closeBulkRegionOperation();<a name="line.5426"></a>
-<span class="sourceLineNo">5427</span>    }<a name="line.5427"></a>
-<span class="sourceLineNo">5428</span>  }<a name="line.5428"></a>
-<span class="sourceLineNo">5429</span><a name="line.5429"></a>
-<span class="sourceLineNo">5430</span>  @Override<a name="line.5430"></a>
-<span class="sourceLineNo">5431</span>  public boolean equals(Object o) {<a name="line.5431"></a>
-<span class="sourceLineNo">5432</span>    return o instanceof HRegion &amp;&amp; Bytes.equals(getRegionInfo().getRegionName(),<a name="line.5432"></a>
-<span class="sourceLineNo">5433</span>                                                ((HRegion) o).getRegionInfo().getRegionName());<a name="line.5433"></a>
-<span class="sourceLineNo">5434</span>  }<a name="line.5434"></a>
-<span class="sourceLineNo">5435</span><a name="line.5435"></a>
-<span class="sourceLineNo">5436</span>  @Override<a name="line.5436"></a>
-<span class="sourceLineNo">5437</span>  public int hashCode() {<a name="line.5437"></a>
-<span class="sourceLineNo">5438</span>    return Bytes.hashCode(getRegionInfo().getRegionName());<a name="line.5438"></a>
-<span class="sourceLineNo">5439</span>  }<a name="line.5439"></a>
-<span class="sourceLineNo">5440</span><a name="line.5440"></a>
-<span class="sourceLineNo">5441</span>  @Override<a name="line.5441"></a>
-<span class="sourceLineNo">5442</span>  public String toString() {<a name="line.5442"></a>
-<span class="sourceLineNo">5443</span>    return getRegionInfo().getRegionNameAsString();<a name="line.5443"></a>
-<span class="sourceLineNo">5444</span>  }<a name="line.5444"></a>
-<span class="sourceLineNo">5445</span><a name="line.5445"></a>
-<span class="sourceLineNo">5446</span>  /**<a name="line.5446"></a>
-<span class="sourceLineNo">5447</span>   * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5447"></a>
-<span class="sourceLineNo">5448</span>   */<a name="line.5448"></a>
-<span class="sourceLineNo">5449</span>  class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5449"></a>
-<span class="sourceLineNo">5450</span>    // Package local for testability<a name="line.5450"></a>
-<span class="sourceLineNo">5451</span>    KeyValueHeap storeHeap = null;<a name="line.5451"></a>
-<span class="sourceLineNo">5452</span>    /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5452"></a>
-<span class="sourceLineNo">5453</span>     * on demand, if on-demand column family loading is enabled.*/<a name="line.5453"></a>
-<span class="sourceLineNo">5454</span>    KeyValueHeap joinedHeap = null;<a name="line.5454"></a>
-<span class="sourceLineNo">5455</span>    /**<a name="line.5455"></a>
-<span class="sourceLineNo">5456</span>     * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5456"></a>
-<span class="sourceLineNo">5457</span>     * contain the row for which we are populating the values.*/<a name="line.5457"></a>
-<span class="sourceLineNo">5458</span>    protected Cell joinedContinuationRow = null;<a name="line.5458"></a>
-<span class="sourceLineNo">5459</span>    private boolean filterClosed = false;<a name="line.5459"></a>
-<span class="sourceLineNo">5460</span><a name="line.5460"></a>
-<span class="sourceLineNo">5461</span>    protected final int isScan;<a name="line.5461"></a>
-<span class="sourceLineNo">5462</span>    protected final byte[] stopRow;<a name="line.5462"></a>
-<span class="sourceLineNo">5463</span>    protected final HRegion region;<a name="line.5463"></a>
-<span class="sourceLineNo">5464</span>    protected final CellComparator comparator;<a name="line.5464"></a>
-<span class="sourceLineNo">5465</span>    protected boolean copyCellsFromSharedMem = false;<a name="line.5465"></a>
-<span class="sourceLineNo">5466</span><a name="line.5466"></a>
-<span class="sourceLineNo">5467</span>    private final long readPt;<a name="line.5467"></a>
-<span class="sourceLineNo">5468</span>    private final long maxResultSize;<a name="line.5468"></a>
-<span class="sourceLineNo">5469</span>    private final ScannerContext defaultScannerContext;<a name="line.5469"></a>
-<span class="sourceLineNo">5470</span>    private final FilterWrapper filter;<a name="line.5470"></a>
-<span class="sourceLineNo">5471</span><a name="line.5471"></a>
-<span class="sourceLineNo">5472</span>    @Override<a name="line.5472"></a>
-<span class="sourceLineNo">5473</span>    public HRegionInfo getRegionInfo() {<a name="line.5473"></a>
-<span class="sourceLineNo">5474</span>      return region.getRegionInfo();<a name="line.5474"></a>
-<span class="sourceLineNo">5475</span>    }<a name="line.5475"></a>
-<span class="sourceLineNo">5476</span><a name="line.5476"></a>
-<span class="sourceLineNo">5477</span>    public void setCopyCellsFromSharedMem(boolean copyCells) {<a name="line.5477"></a>
-<span class="sourceLineNo">5478</span>      this.copyCellsFromSharedMem = copyCells;<a name="line.5478"></a>
-<span class="sourceLineNo">5479</span>    }<a name="line.5479"></a>
-<span class="sourceLineNo">5480</span><a name="line.5480"></a>
-<span class="sourceLineNo">5481</span>    RegionScannerImpl(Scan scan, List&lt;KeyValueScanner&gt; additionalScanners, HRegion region,<a name="line.5481"></a>
-<span class="sourceLineNo">5482</span>        boolean copyCellsFromSharedMem)<a name="line.5482"></a>
-<span class="sourceLineNo">5483</span>        throws IOException {<a name="line.5483"></a>
-<span class="sourceLineNo">5484</span>      this.region = region;<a name="line.5484"></a>
-<span class="sourceLineNo">5485</span>      this.maxResultSize = scan.getMaxResultSize();<a name="line.5485"></a>
-<span class="sourceLineNo">5486</span>      if (scan.hasFilter()) {<a name="line.5486"></a>
-<span class="sourceLineNo">5487</span>        this.filter = new FilterWrapper(scan.getFilter());<a name="line.5487"></a>
-<span class="sourceLineNo">5488</span>      } else {<a name="line.5488"></a>
-<span class="sourceLineNo">5489</span>        this.filter = null;<a name="line.5489"></a>
-<span class="sourceLineNo">5490</span>      }<a name="line.5490"></a>
-<span class="sourceLineNo">5491</span>      this.comparator = region.getCellCompartor();<a name="line.5491"></a>
-<span class="sourceLineNo">5492</span>      /**<a name="line.5492"></a>
-<span class="sourceLineNo">5493</span>       * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5493"></a>
-<span class="sourceLineNo">5494</span>       * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5494"></a>
-<span class="sourceLineNo">5495</span>       * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5495"></a>
-<span class="sourceLineNo">5496</span>       */<a name="line.5496"></a>
-<span class="sourceLineNo">5497</span>      defaultScannerContext = ScannerContext.newBuilder()<a name="line.5497"></a>
-<span class="sourceLineNo">5498</span>          .setBatchLimit(scan.getBatch()).build();<a name="line.5498"></a>
-<span class="sourceLineNo">5499</span><a name="line.5499"></a>
-<span class="sourceLineNo">5500</span>      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) &amp;&amp; !scan.isGetScan()) {<a name="line.5500"></a>
-<span class="sourceLineNo">5501</span>        this.stopRow = null;<a name="line.5501"></a>
-<span class="sourceLineNo">5502</span>      } else {<a name="line.5502"></a>
-<span class="sourceLineNo">5503</span>        this.stopRow = scan.getStopRow();<a name="line.5503"></a>
-<span class="sourceLineNo">5504</span>      }<a name="line.5504"></a>
-<span class="sourceLineNo">5505</span>      // If we are doing a get, we want to be [startRow,endRow]. Normally<a name="line.5505"></a>
-<span class="sourceLineNo">5506</span>      // it is [startRow,endRow) and if startRow=endRow we get nothing.<a name="line.5506"></a>
-<span class="sourceLineNo">5507</span>      this.isScan = scan.isGetScan() ? 1 : 0;<a name="line.5507"></a>
-<span class="sourceLineNo">5508</span><a name="line.5508"></a>
-<span class="sourceLineNo">5509</span>      // synchronize on scannerReadPoints so that nobody calculates<a name="line.5509"></a>
-<span class="sourceLineNo">5510</span>      // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5510"></a>
-<span class="sourceLineNo">5511</span>      IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5511"></a>
-<span class="sourceLineNo">5512</span>      synchronized(scannerReadPoints) {<a name="line.5512"></a>
-<span class="sourceLineNo">5513</span>        this.readPt = getReadPoint(isolationLevel);<a name="line.5513"></a>
-<span class="sourceLineNo">5514</span>        scannerReadPoints.put(this, this.readPt);<a name="line.5514"></a>
-<span class="sourceLineNo">5515</span>      }<a name="line.5515"></a>
-<span class="sourceLineNo">5516</span><a name="line.5516"></a>
-<span class="sourceLineNo">5517</span>      // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5517"></a>
-<span class="sourceLineNo">5518</span>      // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5518"></a>
-<span class="sourceLineNo">5519</span>      List&lt;KeyValueScanner&gt; scanners = new ArrayList&lt;KeyValueScanner&gt;(scan.getFamilyMap().size());<a name="line.5519"></a>
-<span class="sourceLineNo">5520</span>      List&lt;KeyValueScanner&gt; joinedScanners<a name="line.5520"></a>
-<span class="sourceLineNo">5521</span>        = new ArrayList&lt;KeyValueScanner&gt;(scan.getFamilyMap().size());<a name="line.5521"></a>
-<span class="sourceLineNo">5522</span>      if (additionalScanners != null) {<a name="line.5522"></a>
-<span class="sourceLineNo">5523</span>        scanners.addAll(additionalScanners);<a name="line.5523"></a>
-<span class="sourceLineNo">5524</span>      }<a name="line.5524"></a>
-<span class="sourceLineNo">5525</span><a name="line.5525"></a>
-<span class="sourceLineNo">5526</span>      for (Map.Entry&lt;byte[], NavigableSet&lt;byte[]&gt;&gt; entry : scan.getFamilyMap().entrySet()) {<a name="line.5526"></a>
-<span class="sourceLineNo">5527</span>        Store store = stores.get(entry.getKey());<a name="line.5527"></a>
-<span class="sourceLineNo">5528</span>        KeyValueScanner scanner;<a name="line.5528"></a>
-<span class="sourceLineNo">5529</span>        try {<a name="line.5529"></a>
-<span class="sourceLineNo">5530</span>          scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5530"></a>
-<span class="sourceLineNo">5531</span>        } catch (FileNotFoundException e) {<a name="line.5531"></a>
-<span class="sourceLineNo">5532</span>          throw handleFileNotFound(e);<a name="line.5532"></a>
-<span class="sourceLineNo">5533</span>        }<a name="line.5533"></a>
-<span class="sourceLineNo">5534</span>        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5534"></a>
-<span class="sourceLineNo">5535</span>          || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5535"></a>
-<span class="sourceLineNo">5536</span>          scanners.add(scanner);<a name="line.5536"></a>
-<span class="sourceLineNo">5537</span>        } else {<a name="line.5537"></a>
-<span class="sourceLineNo">5538</span>          joinedScanners.add(scanner);<a name="line.5538"></a>
-<span class="sourceLineNo">5539</span>        }<a name="line.5539"></a>
-<span class="sourceLineNo">5540</span>      }<a name="line.5540"></a>
-<span class="sourceLineNo">5541</span>      this.copyCellsFromSharedMem = copyCellsFromSharedMem;<a name="line.5541"></a>
-<span class="sourceLineNo">5542</span>      initializeKVHeap(scanners, joinedScanners, region);<a name="line.5542"></a>
-<span class="sourceLineNo">5543</span>    }<a name="line.5543"></a>
-<span class="sourceLineNo">5544</span><a name="line.5544"></a>
-<span class="sourceLineNo">5545</span>    protected void initializeKVHeap(List&lt;KeyValueScanner&gt; scanners,<a name="line.5545"></a>
-<span class="sourceLineNo">5546</span>        List&lt;KeyValueScanner&gt; joinedScanners, HRegion region)<a name="line.5546"></a>
-<span class="sourceLineNo">5547</span>        throws IOException {<a name="line.5547"></a>
-<span class="sourceLineNo">5548</span>      this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5548"></a>
-<span class="sourceLineNo">5549</span>      if (!joinedScanners.isEmpty()) {<a name="line.5549"></a>
-<span class="sourceLineNo">5550</span>        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5550"></a>
-<span class="sourceLineNo">5551</span>      }<a name="line.5551"></a>
-<span class="sourceLineNo">5552</span>    }<a name="line.5552"></a>
-<span class="sourceLineNo">5553</span><a name="line.5553"></a>
-<span class="sourceLineNo">5554</span>    @Override<a name="line.5554"></a>
-<span class="sourceLineNo">5555</span>    public long getMaxResultSize() {<a name="line.5555"></a>
-<span class="sourceLineNo">5556</span>      return maxResultSize;<a name="line.5556"></a>
-<span class="sourceLineNo">5557</span>    }<a name="line.5557"></a>
-<span class="sourceLineNo">5558</span><a name="line.5558"></a>
-<span class="sourceLineNo">5559</span>    @Override<a name="line.5559"></a>
-<span class="sourceLineNo">5560</span>    public long getMvccReadPoint() {<a name="line.5560"></a>
-<span class="sourceLineNo">5561</span>      return this.readPt;<a name="line.5561"></a>
-<span class="sourceLineNo">5562</span>    }<a name="line.5562"></a>
-<span class="sourceLineNo">5563</span><a name="line.5563"></a>
-<span class="sourceLineNo">5564</span>    @Override<a name="line.5564"></a>
-<span class="sourceLineNo">5565</span>    public int getBatch() {<a name="line.5565"></a>
-<span class="sourceLineNo">5566</span>      return this.defaultScannerContext.getBatchLimit();<a name="line.5566"></a>
-<span class="sourceLineNo">5567</span>    }<a name="line.5567"></a>
-<span class="sourceLineNo">5568</span><a name="line.5568"></a>
-<span class="sourceLineNo">5569</span>    /**<a name="line.5569"></a>
-<span class="sourceLineNo">5570</span>     * Reset both the filter and the old filter.<a name="line.5570"></a>
-<span class="sourceLineNo">5571</span>     *<a name="line.5571"></a>
-<span class="sourceLineNo">5572</span>     * @throws IOException in case a filter raises an I/O exception.<a name="line.5572"></a>
-<span class="sourceLineNo">5573</span>     */<a name="line.5573"></a>
-<span class="sourceLineNo">5574</span>    protected void resetFilters() throws IOException {<a name="line.5574"></a>
-<span class="sourceLineNo">5575</span>      if (filter != null) {<a name="line.5575"></a>
-<span class="sourceLineNo">5576</span>        filter.reset();<a name="line.5576"></a>
-<span class="sourceLineNo">5577</span>      }<a name="line.5577"></a>
-<span class="sourceLineNo">5578</span>    }<a name="line.5578"></a>
-<span class="sourceLineNo">5579</span><a name="line.5579"></a>
-<span class="sourceLineNo">5580</span>    @Override<a name="line.5580"></a>
-<span class="sourceLineNo">5581</span>    public boolean next(List&lt;Cell&gt; outResults)<a name="line.5581"></a>
-<span class="sourceLineNo">5582</span>        throws IOException {<a name="line.5582"></a>
-<span class="sourceLineNo">5583</span>      // apply the batching limit by default<a name="line.5583"></a>
-<span class="sourceLineNo">5584</span>      return next(outResults, defaultScannerContext);<a name="line.5584"></a>
-<span class="sourceLineNo">5585</span>    }<a name="line.5585"></a>
-<span class="sourceLineNo">5586</span><a name="line.5586"></a>
-<span class="sourceLineNo">5587</span>    @Override<a name="line.5587"></a>
-<span class="sourceLineNo">5588</span>    public synchronized boolean next(List&lt;Cell&gt; outResults, ScannerContext scannerContext)<a name="line.5588"></a>
-<span class="sourceLineNo">5589</span>    throws IOException {<a name="line.5589"></a>
-<span class="sourceLineNo">5590</span>      if (this.filterClosed) {<a name="line.5590"></a>
-<span class="sourceLineNo">5591</span>        throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.5591"></a>
-<span class="sourceLineNo">5592</span>            "after we renewed it. Could be caused by a very slow scanner " +<a name="line.5592"></a>
-<span class="sourceLineNo">5593</span>            "or a lengthy garbage collection");<a name="line.5593"></a>
-<span class="sourceLineNo">5594</span>      }<a name="line.5594"></a>
-<span class="sourceLineNo">5595</span>      startRegionOperation(Operation.SCAN);<a name="line.5595"></a>
-<span class="sourceLineNo">5596</span>      readRequestsCount.increment();<a name="line.5596"></a>
-<span class="sourceLineNo">5597</span>      try {<a name="line.5597"></a>
-<span class="sourceLineNo">5598</span>        return nextRaw(outResults, scannerContext);<a name="line.5598"></a>
-<span class="sourceLineNo">5599</span>      } finally {<a name="line.5599"></a>
-<span class="sourceLineNo">5600</span>        closeRegionOperation(Operation.SCAN);<a name="line.5600"></a>
-<span class="sourceLineNo">5601</span>      }<a name="line.5601"></a>
-<span class="sourceLineNo">5602</span>    }<a name="line.5602"></a>
-<span class="sourceLineNo">5603</span><a name="line.5603"></a>
-<span class="sourceLineNo">5604</span>    @Override<a name="line.5604"></a>
-<span class="sourceLineNo">5605</span>    public boolean nextRaw(List&lt;Cell&gt; outResults) throws IOException {<a name="line.5605"></a>
-<span class="sourceLineNo">5606</span>      // Use the RegionScanner's context by default<a name="line.5606"></a>
-<span class="sourceLineNo">5607</span>      return nextRaw(outResults, defaultScannerContext);<a name="line.5607"></a>
-<span class="sourceLineNo">5608</span>    }<a name="line.5608"></a>
-<span class="sourceLineNo">5609</span><a name="line.5609"></a>
-<span class="sourceLineNo">5610</span>    @Override<a name="line.5610"></a>
-<span class="sourceLineNo">5611</span>    public boolean nextRaw(List&lt;Cell&gt; outResults, ScannerContext scannerContext)<a name="line.5611"></a>
-<span class="sourceLineNo">5612</span>        throws IOException {<a name="line.5612"></a>
-<span class="sourceLineNo">5613</span>      if (storeHeap == null) {<a name="line.5613"></a>
-<span class="sourceLineNo">5614</span>        // scanner is closed<a name="line.5614"></a>
-<span class="sourceLineNo">5615</span>        throw new UnknownScannerException("Scanner was closed");<a name="line.5615"></a>
-<span class="sourceLineNo">5616</span>      }<a name="line.5616"></a>
-<span class="sourceLineNo">5617</span>      boolean moreValues = false;<a name="line.5617"></a>
-<span class="sourceLineNo">5618</span>      try {<a name="line.5618"></a>
-<span class="sourceLineNo">5619</span>        if (outResults.isEmpty()) {<a name="line.5619"></a>
-<span class="sourceLineNo">5620</span>          // Usually outResults is empty. This is true when next is called<a name="line.5620"></a>
-<span class="sourceLineNo">5621</span>          // to handle scan or get operation.<a name="line.5621"></a>
-<span class="sourceLineNo">5622</span>          moreValues = nextInternal(outResults, scannerContext);<a name="line.5622"></a>
-<span class="sourceLineNo">5623</span>        } else {<a name="line.5623"></a>
-<span class="sourceLineNo">5624</span>          List&lt;Cell&gt; tmpList = new ArrayList&lt;Cell&gt;();<a name="line.5624"></a>
-<span class="sourceLineNo">5625</span>          moreValues = nextInternal(tmpList, scannerContext);<a name="line.5625"></a>
-<span class="sourceLineNo">5626</span>          outResults.addAll(tmpList);<a name="line.5626"></a>
-<span class="sourceLineNo">5627</span>        }<a name="line.5627"></a>
-<span class="sourceLineNo">5628</span><a name="line.5628"></a>
-<span class="sourceLineNo">5629</span>        // If the size limit was reached it means a partial Result is being<a name="line.5629"></a>
-<span class="sourceLineNo">5630</span>        // returned. Returning a<a name="line.5630"></a>
-<span class="sourceLineNo">5631</span>        // partial Result means that we should not reset the filters; filters<a name="line.5631"></a>
-<span class="sourceLineNo">5632</span>        // should only be reset in<a name="line.5632"></a>
-<span class="sourceLineNo">5633</span>        // between rows<a name="line.5633"></a>
-<span class="sourceLineNo">5634</span>        if (!scannerContext.partialResultFormed()) resetFilters();<a name="line.5634"></a>
-<span class="sourceLineNo">5635</span><a name="line.5635"></a>
-<span class="sourceLineNo">5636</span>        if (isFilterDoneInternal()) {<a name="line.5636"></a>
-<span class="sourceLineNo">5637</span>          moreValues = false;<a name="line.5637"></a>
-<span class="sourceLineNo">5638</span>        }<a name="line.5638"></a>
-<span class="sourceLineNo">5639</span><a name="line.5639"></a>
-<span class="sourceLineNo">5640</span>        // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise<a name="line.5640"></a>
-<span class="sourceLineNo">5641</span>        // it is a call coming from the RsRpcServices.scan().<a name="line.5641"></a>
-<span class="sourceLineNo">5642</span>        if (copyCellsFromSharedMem &amp;&amp; !outResults.isEmpty()) {<a name="line.5642"></a>
-<span class="sourceLineNo">5643</span>          // Do the copy of the results here.<a name="line.5643"></a>
-<span class="sourceLineNo">5644</span>          ListIterator&lt;Cell&gt; listItr = outResults.listIterator();<a name="line.5644"></a>
-<span class="sourceLineNo">5645</span>          Cell cell = null;<a name="line.5645"></a>
-<span class="sourceLineNo">5646</span>          while (listItr.hasNext()) {<a name="line.5646"></a>
-<span class="sourceLineNo">5647</span>            cell = listItr.next();<a name="line.5647"></a>
-<span class="sourceLineNo">5648</span>            if (cell instanceof ShareableMemory) {<a name="line.5648"></a>
-<span class="sourceLineNo">5649</span>              listItr.set(((ShareableMemory) cell).cloneToCell());<a name="line.5649"></a>
-<span class="sourceLineNo">5650</span>            }<a name="line.5650"></a>
-<span class="sourceLineNo">5651</span>          }<a name="line.5651"></a>
-<span class="sourceLineNo">5652</span>        }<a name="line.5652"></a>
-<span class="sourceLineNo">5653</span>      } finally {<a name="line.5653"></a>
-<span class="sourceLineNo">5654</span>        if (copyCellsFromSharedMem) {<a name="line.5654"></a>
-<span class="sourceLineNo">5655</span>          // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return<a name="line.5655"></a>
-<span class="sourceLineNo">5656</span>          // the blocks then and there (for wrapped CPs)<a name="line.5656"></a>
-<span class="sourceLineNo">5657</span>          this.shipped();<a name="line.5657"></a>
-<span class="sourceLineNo">5658</span>        }<a name="line.5658"></a>
-<span class="sourceLineNo">5659</span>      }<a name="line.5659"></a>
-<span class="sourceLineNo">5660</span>      return moreValues;<a name="line.5660"></a>
-<span class="sourceLineNo">5661</span>    }<a name="line.5661"></a>
-<span class="sourceLineNo">5662</span><a name="line.5662"></a>
-<span class="sourceLineNo">5663</span>    /**<a name="line.5663"></a>
-<span class="sourceLineNo">5664</span>     * @return true if more cells exist after this batch, false if scanner is done<a name="line.5664"></a>
-<span class="sourceLineNo">5665</span>     */<a name="line.5665"></a>
-<span class="sourceLineNo">5666</span>    private boolean populateFromJoinedHeap(List&lt;Cell&gt; results, ScannerContext scannerContext)<a name="line.5666"></a>
-<span class="sourceLineNo">5667</span>            throws IOException {<a name="line.5667"></a>
-<span class="sourceLineNo">5668</span>      assert joinedContinuationRow != null;<a name="line.5668"></a>
-<span class="sourceLineNo">5669</span>      boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.5669"></a>
-<span class="sourceLineNo">5670</span>          joinedContinuationRow);<a name="line.5670"></a>
-<span class="sourceLineNo">5671</span><a name="line.5671"></a>
-<span class="sourceLineNo">5672</span>      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5672"></a>
-<span class="sourceLineNo">5673</span>        // We are done with this row, reset the continuation.<a name="line.5673"></a>
-<span class="sourceLineNo">5674</span>        joinedContinuationRow = null;<a name="line.5674"></a>
-<span class="sourceLineNo">5675</span>      }<a name="line.5675"></a>
-<span class="sourceLineNo">5676</span>      // As the data is obtained from two independent heaps, we need to<a name="line.5676"></a>
-<span class="sourceLineNo">5677</span>      // ensure that result list is sorted, because Result relies on that.<a name="line.5677"></a>
-<span class="sourceLineNo">5678</span>      sort(results, comparator);<a name="line.5678"></a>
-<span class="sourceLineNo">5679</span>      return moreValues;<a name="line.5679"></a>
-<span class="sourceLineNo">5680</span>    }<a name="line.5680"></a>
-<span class="sourceLineNo">5681</span><a name="line.5681"></a>
-<span class="sourceLineNo">5682</span>    /**<a name="line.5682"></a>
-<span class="sourceLineNo">5683</span>     * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.5683"></a>
-<span class="sourceLineNo">5684</span>     * reached, or remainingResultSize (if not -1) is reaced<a name="line.5684"></a>
-<span class="sourceLineNo">5685</span>     * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.5685"></a>
-<span class="sourceLineNo">5686</span>     * @param scannerContext<a name="line.5686"></a>
-<span class="sourceLineNo">5687</span>     * @param currentRowCell<a name="line.5687"></a>
-<span class="sourceLineNo">5688</span>     * @return state of last call to {@link KeyValueHeap#next()}<a name="line.5688"></a>
-<span class="sourceLineNo">5689</span>     */<a name="line.5689"></a>
-<span class="sourceLineNo">5690</span>    private boolean populateResult(List&lt;Cell&gt; results, KeyValueHeap heap,<a name="line.5690"></a>
-<span class="sourceLineNo">5691</span>        ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.5691"></a>
-<span class="sourceLineNo">5692</span>      Cell nextKv;<a name="line.5692"></a>
-<span class="sourceLineNo">5693</span>      boolean moreCellsInRow = false;<a name="line.5693"></a>
-<span class="sourceLineNo">5694</span>      boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.5694"></a>
-<span class="sourceLineNo">5695</span>      // Scanning between column families and thus the scope is between cells<a name="line.5695"></a>
-<span class="sourceLineNo">5696</span>      LimitScope limitScope = LimitScope.BETWEEN_CELLS;<a name="line.5696"></a>
-<span class="sourceLineNo">5697</span>      try {<a name="line.5697"></a>
-<span class="sourceLineNo">5698</span>        do {<a name="line.5698"></a>
-<span class="sourceLineNo">5699</span>          // We want to maintain any progress that is made towards the limits while scanning across<a name="line.5699"></a>
-<span class="sourceLineNo">5700</span>          // different column families. To do this, we toggle the keep progress flag on during calls<a name="line.5700"></a>
-<span class="sourceLineNo">5701</span>          // to the StoreScanner to ensure that any progress made thus far is not wiped away.<a name="line.5701"></a>
-<span class="sourceLineNo">5702</span>          scannerContext.setKeepProgress(true);<a name="line.5702"></a>
-<span class="sourceLineNo">5703</span>          heap.next(results, scannerContext);<a name="line.5703"></a>
-<span class="sourceLineNo">5704</span>          scannerContext.setKeepProgress(tmpKeepProgress);<a name="line.5704"></a>
-<span class="sourceLineNo">5705</span><a name="line.5705"></a>
-<span class="sourceLineNo">5706</span>          nextKv = heap.peek();<a name="line.5706"></a>
-<span class="sourceLineNo">5707</span>          moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);<a name="line.5707"></a>
-<span class="sourceLineNo">5708</span>          if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);<a name="line.5708"></a>
-<span class="sourceLineNo">5709</span>          if (scannerContext.checkBatchLimit(limitScope)) {<a name="line.5709"></a>
-<span class="sourceLineNo">5710</span>            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();<a name="line.5710"></a>
-<span class="sourceLineNo">5711</span>          } else if (scannerContext.checkSizeLimit(limitScope)) {<a name="line.5711"></a>
-<span class="sourceLineNo">5712</span>            ScannerContext.NextState state =<a name="line.5712"></a>
-<span class="sourceLineNo">5713</span>              moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;<a name="line.5713"></a>
-<span class="sourceLineNo">5714</span>            return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5714"></a>
-<span class="sourceLineNo">5715</span>          } else if (scannerContext.checkTimeLimit(limitScope)) {<a name="line.5715"></a>
-<span class="sourceLineNo">5716</span>            ScannerContext.NextState state =<a name="line.5716"></a>
-<span class="sourceLineNo">5717</span>              moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;<a name="line.5717"></a>
-<span class="sourceLineNo">5718</span>            return scannerContext.setScannerState(state).hasMoreValues();<a name="line.5718"></a>
-<span class="sourceLineNo">5719</span>          }<a name="line.5719"></a>
-<span class="sourceLineNo">5720</span>        } while (moreCellsInRow);<a name="line.5720"></a>
-<span class="sourceLineNo">5721</span>      } catch (FileNotFoundException e) {<a name="line.5721"></a>
-<span class="sourceLineNo">5722</span>        throw handleFileNotFound(e);<a name="line.5722"></a>
-<span class="sourceLineNo">5723</span>      }<a name="line.5723"></a>
-<span class="sourceLineNo">5724</span>      return nextKv != null;<a name="line.5724"></a>
-<span class="sourceLineNo">5725</span>    }<a name="line.5725"></a>
-<span class="sourceLineNo">5726</span><a name="line.5726"></a>
-<span class="sourceLineNo">5727</span>    /**<a name="line.5727"></a>
-<span class="sourceLineNo">5728</span>     * Based on the nextKv in the heap, and the current row, decide whether or not there are more<a name="line.5728"></a>
-<span class="sourceLineNo">5729</span>     * cells to be read in the heap. If the row of the nextKv in the heap matches the current row<a name="line.5729"></a>
-<span class="sourceLineNo">5730</span>     * then there are more cells to be read in the row.<a name="line.5730"></a>
-<span class="sourceLineNo">5731</span>     * @param nextKv<a name="line.5731"></a>
-<span class="sourceLineNo">5732</span>     * @param currentRowCell<a name="line.5732"></a>
-<span class="sourceLineNo">5733</span>     * @return true When there are more cells in the row to be read<a name="line.5733"></a>
-<span class="sourceLineNo">5734</span>     */<a name="line.5734"></a>
-<span class="sourceLineNo">5735</span>    private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {<a name="line.5735"></a>
-<span class="sourceLineNo">5736</span>      return nextKv != null &amp;&amp; CellUtil.matchingRow(nextKv, currentRowCell);<a name="line.5736"></a>
-<span class="sourceLineNo">5737</span>    }<a name="line.5737"></a>
-<span class="sourceLineNo">5738</span><a name="line.5738"></a>
-<span class="sourceLineNo">5739</span>    /*<a name="line.5739"></a>
-<span class="sourceLineNo">5740</span>     * @return True if a filter rules the scanner is over, done.<a name="line.5740"></a>
-<span class="sourceLineNo">5741</span>     */<a name="line.5741"></a>
-<span class="sourceLineNo">5742</span>    @Override<a name="line.5742"></a>
-<span class="sourceLineNo">5743</span>    public synchronized boolean isFilterDone() throws IOException {<a name="line.5743"></a>
-<span class="sourceLineNo">5744</span>      return isFilterDoneInternal();<a name="line.5744"></a>
-<span class="sourceLineNo">5745</span>    }<a name="line.5745"></a>
-<span class="sourceLineNo">5746</span><a name="line.5746"></a>
-<span class="sourceLineNo">5747</span>    private boolean isFilterDoneInternal() throws IOException {<a name="line.5747"></a>
-<span class="sourceLineNo">5748</span>      return this.filter != null &amp;&amp; this.filter.filterAllRemaining();<a name="line.5748"></a>
-<span class="sourceLineNo">5749</span>    }<a name="line.5749"></a>
-<span class="sourceLineNo">5750</span><a name="line.5750"></a>
-<span class="sourceLineNo">5751</span>    private boolean nextInternal(List&lt;Cell&gt; results, ScannerContext scannerContext)<a name="line.5751"></a>
-<span class="sourceLineNo">5752</span>        throws IOException {<a name="line.5752"></a>
-<span class="sourceLineNo">5753</span>      if (!results.isEmpty()) {<a name="line.5753"></a>
-<span class="sourceLineNo">5754</span>        throw new IllegalArgumentException("First parameter should be an empty list");<a name="line.5754"></a>
-<span class="sourceLineNo">5755</span>      }<a name="line.5755"></a>
-<span class="sourceLineNo">5756</span>      if (scannerContext == null) {<a name="line.5756"></a>
-<span class="sourceLineNo">5757</span>        throw new IllegalArgumentException("Scanner context cannot be null");<a name="line.5757"></a>
+<span class="sourceLineNo">5300</span>    boolean isSuccessful = false;<a name="line.5300"></a>
+<span class="sourceLineNo">5301</span>    try {<a name="line.5301"></a>
+<span class="sourceLineNo">5302</span>      this.writeRequestsCount.increment();<a name="line.5302"></a>
+<span class="sourceLineNo">5303</span><a name="line.5303"></a>
+<span class="sourceLineNo">5304</span>      // There possibly was a split that happened between when the split keys<a name="line.5304"></a>
+<span class="sourceLineNo">5305</span>      // were gathered and before the HRegion's write lock was taken.  We need<a name="line.5305"></a>
+<span class="sourceLineNo">5306</span>      // to validate the HFile region before attempting to bulk load all of them<a name="line.5306"></a>
+<span class="sourceLineNo">5307</span>      List&lt;IOException&gt; ioes = new ArrayList&lt;IOException&gt;();<a name="line.5307"></a>
+<span class="sourceLineNo">5308</span>      List&lt;Pair&lt;byte[], String&gt;&gt; failures = new ArrayList&lt;Pair&lt;byte[], String&gt;&gt;();<a name="line.5308"></a>
+<span class="sourceLineNo">5309</span>      for (Pair&lt;byte[], String&gt; p : familyPaths) {<a name="line.5309"></a>
+<span class="sourceLineNo">5310</span>        byte[] familyName = p.getFirst();<a name="line.5310"></a>
+<span class="sourceLineNo">5311</span>        String path = p.getSecond();<a name="line.5311"></a>
+<span class="sourceLineNo">5312</span><a name="line.5312"></a>
+<span class="sourceLineNo">5313</span>        Store store = getStore(familyName);<a name="line.5313"></a>
+<span class="sourceLineNo">5314</span>        if (store == null) {<a name="line.5314"></a>
+<span class="sourceLineNo">5315</span>          IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.5315"></a>
+<span class="sourceLineNo">5316</span>              "No such column family " + Bytes.toStringBinary(familyName));<a name="line.5316"></a>
+<span class="sourceLineNo">5317</span>          ioes.add(ioe);<a name="line.5317"></a>
+<span class="sourceLineNo">5318</span>        } else {<a name="line.5318"></a>
+<span class="sourceLineNo">5319</span>          try {<a name="line.5319"></a>
+<span class="sourceLineNo">5320</span>            store.assertBulkLoadHFileOk(new Path(path));<a name="line.5320"></a>
+<span class="sourceLineNo">5321</span>          } catch (WrongRegionException wre) {<a name="line.5321"></a>
+<span class="sourceLineNo">5322</span>            // recoverable (file doesn't fit in region)<a name="line.5322"></a>
+<span class="sourceLineNo">5323</span>            failures.add(p);<a name="line.5323"></a>
+<span class="sourceLineNo">5324</span>          } catch (IOException ioe) {<a name="line.5324"></a>
+<span class="sourceLineNo">5325</span>            // unrecoverable (hdfs problem)<a name="line.5325"></a>
+<span class="sourceLineNo">5326</span>            ioes.add(ioe);<a name="line.5326"></a>
+<span class="sourceLineNo">5327</span>          }<a name="line.5327"></a>
+<span class="sourceLineNo">5328</span>        }<a name="line.5328"></a>
+<span class="sourceLineNo">5329</span>      }<a name="line.5329"></a>
+<span class="sourceLineNo">5330</span><a name="line.5330"></a>
+<span class="sourceLineNo">5331</span>      // validation failed because of some sort of IO problem.<a name="line.5331"></a>
+<span class="sourceLineNo">5332</span>      if (ioes.size() != 0) {<a name="line.5332"></a>
+<span class="sourceLineNo">5333</span>        IOException e = MultipleIOException.createIOException(ioes);<a name="line.5333"></a>
+<span class="sourceLineNo">5334</span>        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);<a name="line.5334"></a>
+<span class="sourceLineNo">5335</span>        throw e;<a name="line.5335"></a>
+<span class="sourceLineNo">5336</span>      }<a name="line.5336"></a>
+<span class="sourceLineNo">5337</span><a name="line.5337"></a>
+<span class="sourceLineNo">5338</span>      // validation failed, bail out before doing anything permanent.<a name="line.5338"></a>
+<span class="sourceLineNo">5339</span>      if (failures.size() != 0) {<a name="line.5339"></a>
+<span class="sourceLineNo">5340</span>        StringBuilder list = new StringBuilder();<a name="line.5340"></a>
+<span class="sourceLineNo">5341</span>        for (Pair&lt;byte[], String&gt; p : failures) {<a name="line.5341"></a>
+<span class="sourceLineNo">5342</span>          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")<a name="line.5342"></a>
+<span class="sourceLineNo">5343</span>              .append(p.getSecond());<a name="line.5343"></a>
+<span class="sourceLineNo">5344</span>        }<a name="line.5344"></a>
+<span class="sourceLineNo">5345</span>        // problem when validating<a name="line.5345"></a>
+<span class="sourceLineNo">5346</span>        LOG.warn("There was a recoverable bulk load failure likely due to a" +<a name="line.5346"></a>
+<span class="sourceLineNo">5347</span>            " split.  These (family, HFile) pairs were not loaded: " + list);<a name="line.5347"></a>
+<span class="sourceLineNo">5348</span>        return isSuccessful;<a name="line.5348"></a>
+<span class="sourceLineNo">5349</span>      }<a name="line.5349"></a>
+<span class="sourceLineNo">5350</span><a name="line.5350"></a>
+<span class="sourceLineNo">5351</span>      // We need to assign a sequential ID that's in between two memstores in order to preserve<a name="line.5351"></a>
+<span class="sourceLineNo">5352</span>      // the guarantee that all the edits lower than the highest sequential ID from all the<a name="line.5352"></a>
+<span class="sourceLineNo">5353</span>      // HFiles are flushed on disk. See HBASE-10958.  The sequence id returned when we flush is<a name="line.5353"></a>
+<span class="sourceLineNo">5354</span>      // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is<a name="line.5354"></a>
+<span class="sourceLineNo">5355</span>      // a sequence id that we can be sure is beyond the last hfile written).<a name="line.5355"></a>
+<span class="sourceLineNo">5356</span>      if (assignSeqId) {<a name="line.5356"></a>
+<span class="sourceLineNo">5357</span>        FlushResult fs = flushcache(true, false);<a name="line.5357"></a>
+<span class="sourceLineNo">5358</span>        if (fs.isFlushSucceeded()) {<a name="line.5358"></a>
+<span class="sourceLineNo">5359</span>          seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5359"></a>
+<span class="sourceLineNo">5360</span>        } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {<a name="line.5360"></a>
+<span class="sourceLineNo">5361</span>          seqId = ((FlushResultImpl)fs).flushSequenceId;<a name="line.5361"></a>
+<span class="sourceLineNo">5362</span>        } else {<a name="line.5362"></a>
+<span class="sourceLineNo">5363</span>          throw new IOException("Could not bulk load with an assigned sequential ID because the "+<a name="line.5363"></a>
+<span class="sourceLineNo">5364</span>            "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);<a name="line.5364"></a>
+<span class="sourceLineNo">5365</span>        }<a name="line.5365"></a>
+<span class="sourceLineNo">5366</span>      }<a name="line.5366"></a>
+<span class="sourceLineNo">5367</span><a name="line.5367"></a>
+<span class="sourceLineNo">5368</span>      for (Pair&lt;byte[], String&gt; p : familyPaths) {<a name="line.5368"></a>
+<span class="sourceLineNo">5369</span>        byte[] familyName = p.getFirst();<a name="line.5369"></a>
+<span class="sourceLineNo">5370</span>        String path = p.getSecond();<a name="line.5370"></a>
+<span class="sourceLineNo">5371</span>        Store store = getStore(familyName);<a name="line.5371"></a>
+<span class="sourceLineNo">5372</span>        try {<a name="line.5372"></a>
+<span class="sourceLineNo">5373</span>          String finalPath = path;<a name="line.5373"></a>
+<span class="sourceLineNo">5374</span>          if (bulkLoadListener != null) {<a name="line.5374"></a>
+<span class="sourceLineNo">5375</span>            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);<a name="line.5375"></a>
+<span class="sourceLineNo">5376</span>          }<a name="line.5376"></a>
+<span class="sourceLineNo">5377</span>          Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);<a name="line.5377"></a>
+<span class="sourceLineNo">5378</span><a name="line.5378"></a>
+<span class="sourceLineNo">5379</span>          if(storeFiles.containsKey(familyName)) {<a name="line.5379"></a>
+<span class="sourceLineNo">5380</span>            storeFiles.get(familyName).add(commitedStoreFile);<a name="line.5380"></a>
+<span class="sourceLineNo">5381</span>          } else {<a name="line.5381"></a>
+<span class="sourceLineNo">5382</span>            List&lt;Path&gt; storeFileNames = new ArrayList&lt;Path&gt;();<a name="line.5382"></a>
+<span class="sourceLineNo">5383</span>            storeFileNames.add(commitedStoreFile);<a name="line.5383"></a>
+<span class="sourceLineNo">5384</span>            storeFiles.put(familyName, storeFileNames);<a name="line.5384"></a>
+<span class="sourceLineNo">5385</span>          }<a name="line.5385"></a>
+<span class="sourceLineNo">5386</span>          if (bulkLoadListener != null) {<a name="line.5386"></a>
+<span class="sourceLineNo">5387</span>            bulkLoadListener.doneBulkLoad(familyName, path);<a name="line.5387"></a>
+<span class="sourceLineNo">5388</span>          }<a name="line.5388"></a>
+<span class="sourceLineNo">5389</span>        } catch (IOException ioe) {<a name="line.5389"></a>
+<span class="sourceLineNo">5390</span>          // A failure here can cause an atomicity violation that we currently<a name="line.5390"></a>
+<span class="sourceLineNo">5391</span>          // cannot recover from since it is likely a failed HDFS operation.<a name="line.5391"></a>
+<span class="sourceLineNo">5392</span><a name="line.5392"></a>
+<span class="sourceLineNo">5393</span>          // TODO Need a better story for reverting partial failures due to HDFS.<a name="line.5393"></a>
+<span class="sourceLineNo">5394</span>          LOG.error("There was a partial failure due to IO when attempting to" +<a name="line.5394"></a>
+<span class="sourceLineNo">5395</span>              " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);<a name="line.5395"></a>
+<span class="sourceLineNo">5396</span>          if (bulkLoadListener != null) {<a name="line.5396"></a>
+<span class="sourceLineNo">5397</span>            try {<a name="line.5397"></a>
+<span class="sourceLineNo">5398</span>              bulkLoadListener.failedBulkLoad(familyName, path);<a name="line.5398"></a>
+<span class="sourceLineNo">5399</span>            } catch (Exception ex) {<a name="line.5399"></a>
+<span class="sourceLineNo">5400</span>              LOG.error("Error while calling failedBulkLoad for family " +<a name="line.5400"></a>
+<span class="sourceLineNo">5401</span>                  Bytes.toString(familyName) + " with path " + path, ex);<a name="line.5401"></a>
+<span class="sourceLineNo">5402</span>            }<a name="line.5402"></a>
+<span class="sourceLineNo">5403</span>          }<a name="line.5403"></a>
+<span class="sourceLineNo">5404</span>          throw ioe;<a name="line.5404"></a>
+<span class="sourceLineNo">5405</span>        }<a name="line.5405"></a>
+<span class="sourceLineNo">5406</span>      }<a name="line.5406"></a>
+<span class="sourceLineNo">5407</span><a name="line.5407"></a>
+<span class="sourceLineNo">5408</span>      isSuccessful = true;<a name="line.5408"></a>
+<span class="sourceLineNo">5409</span>    } finally {<a name="line.5409"></a>
+<span class="sourceLineNo">5410</span>      if (wal != null &amp;&amp; !storeFiles.isEmpty()) {<a name="line.5410"></a>
+<span class="sourceLineNo">5411</span>        // Write a bulk load event for hfiles that are loaded<a name="line.5411"></a>
+<span class="sourceLineNo">5412</span>        try {<a name="line.5412"></a>
+<span class="sourceLineNo">5413</span>          WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(<a name="line.5413"></a>
+<span class="sourceLineNo">5414</span>              this.getRegionInfo().getTable(),<a name="line.5414"></a>
+<span class="sourceLineNo">5415</span>              ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);<a name="line.5415"></a>
+<span class="sourceLineNo">5416</span>          WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),<a name="line.5416"></a>
+<span class="sourceLineNo">5417</span>              loadDescriptor, mvcc);<a name="line.5417"></a>
+<span class="sourceLineNo">5418</span>        } catch (IOException ioe) {<a name="line.5418"></a>
+<span class="sourceLineNo">5419</span>          if (this.rsServices != null) {<a name="line.5419"></a>
+<span class="sourceLineNo">5420</span>            // Have to abort region server because some hfiles has been loaded but we can't write<a name="line.5420"></a>
+<span class="sourceLineNo">5421</span>            // the event into WAL<a name="line.5421"></a>
+<span class="sourceLineNo">5422</span>            isSuccessful = false;<a name="line.5422"></a>
+<span class="sourceLineNo">5423</span>            this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);<a name="line.5423"></a>
+<span class="sourceLineNo">5424</span>          }<a name="line.5424"></a>
+<span class="sourceLineNo">5425</span>        }<a name="line.5425"></a>
+<span class="sourceLineNo">5426</span>      }<a name="line.5426"></a>
+<span class="sourceLineNo">5427</span><a name="line.5427"></a>
+<span class="sourceLineNo">5428</span>      closeBulkRegionOperation();<a name="line.5428"></a>
+<span class="sourceLineNo">5429</span>    }<a name="line.5429"></a>
+<span class="sourceLineNo">5430</span>    return isSuccessful;<a name="line.5430"></a>
+<span class="sourceLineNo">5431</span>  }<a name="line.5431"></a>
+<span class="sourceLineNo">5432</span><a name="line.5432"></a>
+<span class="sourceLineNo">5433</span>  @Override<a name="line.5433"></a>
+<span class="sourceLineNo">5434</span>  public boolean equals(Object o) {<a name="line.5434"></a>
+<span class="sourceLineNo">5435</span>    return o instanceof HRegion &amp;&amp; Bytes.equals(getRegionInfo().getRegionName(),<a name="line.5435"></a>
+<span class="sourceLineNo">5436</span>                                                ((HRegion) o).getRegionInfo().getRegionName());<a name="line.5436"></a>
+<span class="sourceLineNo">5437</span>  }<a name="line.5437"></a>
+<span class="sourceLineNo">5438</span><a name="line.5438"></a>
+<span class="sourceLineNo">5439</span>  @Override<a name="line.5439"></a>
+<span class="sourceLineNo">5440</span>  public int hashCode() {<a name="line.5440"></a>
+<span class="sourceLineNo">5441</span>    return Bytes.hashCode(getRegionInfo().getRegionName());<a name="line.5441"></a>
+<span class="sourceLineNo">5442</span>  }<a name="line.5442"></a>
+<span class="sourceLineNo">5443</span><a name="line.5443"></a>
+<span class="sourceLineNo">5444</span>  @Override<a name="line.5444"></a>
+<span class="sourceLineNo">5445</span>  public String toString() {<a name="line.5445"></a>
+<span class="sourceLineNo">5446</span>    return getRegionInfo().getRegionNameAsString();<a name="line.5446"></a>
+<span class="sourceLineNo">5447</span>  }<a name="line.5447"></a>
+<span class="sourceLineNo">5448</span><a name="line.5448"></a>
+<span class="sourceLineNo">5449</span>  /**<a name="line.5449"></a>
+<span class="sourceLineNo">5450</span>   * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).<a name="line.5450"></a>
+<span class="sourceLineNo">5451</span>   */<a name="line.5451"></a>
+<span class="sourceLineNo">5452</span>  class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {<a name="line.5452"></a>
+<span class="sourceLineNo">5453</span>    // Package local for testability<a name="line.5453"></a>
+<span class="sourceLineNo">5454</span>    KeyValueHeap storeHeap = null;<a name="line.5454"></a>
+<span class="sourceLineNo">5455</span>    /** Heap of key-values that are not essential for the provided filters and are thus read<a name="line.5455"></a>
+<span class="sourceLineNo">5456</span>     * on demand, if on-demand column family loading is enabled.*/<a name="line.5456"></a>
+<span class="sourceLineNo">5457</span>    KeyValueHeap joinedHeap = null;<a name="line.5457"></a>
+<span class="sourceLineNo">5458</span>    /**<a name="line.5458"></a>
+<span class="sourceLineNo">5459</span>     * If the joined heap data gathering is interrupted due to scan limits, this will<a name="line.5459"></a>
+<span class="sourceLineNo">5460</span>     * contain the row for which we are populating the values.*/<a name="line.5460"></a>
+<span class="sourceLineNo">5461</span>    protected Cell joinedContinuationRow = null;<a name="line.5461"></a>
+<span class="sourceLineNo">5462</span>    private boolean filterClosed = false;<a name="line.5462"></a>
+<span class="sourceLineNo">5463</span><a name="line.5463"></a>
+<span class="sourceLineNo">5464</span>    protected final int isScan;<a name="line.5464"></a>
+<span class="sourceLineNo">5465</span>    protected final byte[] stopRow;<a name="line.5465"></a>
+<span class="sourceLineNo">5466</span>    protected final HRegion region;<a name="line.5466"></a>
+<span class="sourceLineNo">5467</span>    protected final CellComparator comparator;<a name="line.5467"></a>
+<span class="sourceLineNo">5468</span>    protected boolean copyCellsFromSharedMem = false;<a name="line.5468"></a>
+<span class="sourceLineNo">5469</span><a name="line.5469"></a>
+<span class="sourceLineNo">5470</span>    private final long readPt;<a name="line.5470"></a>
+<span class="sourceLineNo">5471</span>    private final long maxResultSize;<a name="line.5471"></a>
+<span class="sourceLineNo">5472</span>    private final ScannerContext defaultScannerContext;<a name="line.5472"></a>
+<span class="sourceLineNo">5473</span>    private final FilterWrapper filter;<a name="line.5473"></a>
+<span class="sourceLineNo">5474</span><a name="line.5474"></a>
+<span class="sourceLineNo">5475</span>    @Override<a name="line.5475"></a>
+<span class="sourceLineNo">5476</span>    public HRegionInfo getRegionInfo() {<a name="line.5476"></a>
+<span class="sourceLineNo">5477</span>      return region.getRegionInfo();<a name="line.5477"></a>
+<span class="sourceLineNo">5478</span>    }<a name="line.5478"></a>
+<span class="sourceLineNo">5479</span><a name="line.5479"></a>
+<span class="sourceLineNo">5480</span>    public void setCopyCellsFromSharedMem(boolean copyCells) {<a name="line.5480"></a>
+<span class="sourceLineNo">5481</span>      this.copyCellsFromSharedMem = copyCells;<a name="line.5481"></a>
+<span class="sourceLineNo">5482</span>    }<a name="line.5482"></a>
+<span class="sourceLineNo">5483</span><a name="line.5483"></a>
+<span class="sourceLineNo">5484</span>    RegionScannerImpl(Scan scan, List&lt;KeyValueScanner&gt; additionalScanners, HRegion region,<a name="line.5484"></a>
+<span class="sourceLineNo">5485</span>        boolean copyCellsFromSharedMem)<a name="line.5485"></a>
+<span class="sourceLineNo">5486</span>        throws IOException {<a name="line.5486"></a>
+<span class="sourceLineNo">5487</span>      this.region = region;<a name="line.5487"></a>
+<span class="sourceLineNo">5488</span>      this.maxResultSize = scan.getMaxResultSize();<a name="line.5488"></a>
+<span class="sourceLineNo">5489</span>      if (scan.hasFilter()) {<a name="line.5489"></a>
+<span class="sourceLineNo">5490</span>        this.filter = new FilterWrapper(scan.getFilter());<a name="line.5490"></a>
+<span class="sourceLineNo">5491</span>      } else {<a name="line.5491"></a>
+<span class="sourceLineNo">5492</span>        this.filter = null;<a name="line.5492"></a>
+<span class="sourceLineNo">5493</span>      }<a name="line.5493"></a>
+<span class="sourceLineNo">5494</span>      this.comparator = region.getCellCompartor();<a name="line.5494"></a>
+<span class="sourceLineNo">5495</span>      /**<a name="line.5495"></a>
+<span class="sourceLineNo">5496</span>       * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default<a name="line.5496"></a>
+<span class="sourceLineNo">5497</span>       * scanner context that can be used to enforce the batch limit in the event that a<a name="line.5497"></a>
+<span class="sourceLineNo">5498</span>       * ScannerContext is not specified during an invocation of next/nextRaw<a name="line.5498"></a>
+<span class="sourceLineNo">5499</span>       */<a name="line.5499"></a>
+<span class="sourceLineNo">5500</span>      defaultScannerContext = ScannerContext.newBuilder()<a name="line.5500"></a>
+<span class="sourceLineNo">5501</span>          .setBatchLimit(scan.getBatch()).build();<a name="line.5501"></a>
+<span class="sourceLineNo">5502</span><a name="line.5502"></a>
+<span class="sourceLineNo">5503</span>      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) &amp;&amp; !scan.isGetScan()) {<a name="line.5503"></a>
+<span class="sourceLineNo">5504</span>        this.stopRow = null;<a name="line.5504"></a>
+<span class="sourceLineNo">5505</span>      } else {<a name="line.5505"></a>
+<span class="sourceLineNo">5506</span>        this.stopRow = scan.getStopRow();<a name="line.5506"></a>
+<span class="sourceLineNo">5507</span>      }<a name="line.5507"></a>
+<span class="sourceLineNo">5508</span>      // If we are doing a get, we want to be [startRow,endRow]. Normally<a name="line.5508"></a>
+<span class="sourceLineNo">5509</span>      // it is [startRow,endRow) and if startRow=endRow we get nothing.<a name="line.5509"></a>
+<span class="sourceLineNo">5510</span>      this.isScan = scan.isGetScan() ? 1 : 0;<a name="line.5510"></a>
+<span class="sourceLineNo">5511</span><a name="line.5511"></a>
+<span class="sourceLineNo">5512</span>      // synchronize on scannerReadPoints so that nobody calculates<a name="line.5512"></a>
+<span class="sourceLineNo">5513</span>      // getSmallestReadPoint, before scannerReadPoints is updated.<a name="line.5513"></a>
+<span class="sourceLineNo">5514</span>      IsolationLevel isolationLevel = scan.getIsolationLevel();<a name="line.5514"></a>
+<span class="sourceLineNo">5515</span>      synchronized(scannerReadPoints) {<a name="line.5515"></a>
+<span class="sourceLineNo">5516</span>        this.readPt = getReadPoint(isolationLevel);<a name="line.5516"></a>
+<span class="sourceLineNo">5517</span>        scannerReadPoints.put(this, this.readPt);<a name="line.5517"></a>
+<span class="sourceLineNo">5518</span>      }<a name="line.5518"></a>
+<span class="sourceLineNo">5519</span><a name="line.5519"></a>
+<span class="sourceLineNo">5520</span>      // Here we separate all scanners into two lists - scanner that provide data required<a name="line.5520"></a>
+<span class="sourceLineNo">5521</span>      // by the filter to operate (scanners list) and all others (joinedScanners list).<a name="line.5521"></a>
+<span class="sourceLineNo">5522</span>      List&lt;KeyValueScanner&gt; scanners = new ArrayList&lt;KeyValueScanner&gt;(scan.getFamilyMap().size());<a name="line.5522"></a>
+<span class="sourceLineNo">5523</span>      List&lt;KeyValueScanner&gt; joinedScanners<a name="line.5523"></a>
+<span class="sourceLineNo">5524</span>        = new ArrayList&lt;KeyValueScanner&gt;(scan.getFamilyMap().size());<a name="line.5524"></a>
+<span class="sourceLineNo">5525</span>      if (additionalScanners != null) {<a name="line.5525"></a>
+<span class="sourceLineNo">5526</span>        scanners.addAll(additionalScanners);<a name="line.5526"></a>
+<span class="sourceLineNo">5527</span>      }<a name="line.5527"></a>
+<span class="sourceLineNo">5528</span><a name="line.5528"></a>
+<span class="sourceLineNo">5529</span>      for (Map.Entry&lt;byte[], NavigableSet&lt;byte[]&gt;&gt; entry : scan.getFamilyMap().entrySet()) {<a name="line.5529"></a>
+<span class="sourceLineNo">5530</span>        Store store = stores.get(entry.getKey());<a name="line.5530"></a>
+<span class="sourceLineNo">5531</span>        KeyValueScanner scanner;<a name="line.5531"></a>
+<span class="sourceLineNo">5532</span>        try {<a name="line.5532"></a>
+<span class="sourceLineNo">5533</span>          scanner = store.getScanner(scan, entry.getValue(), this.readPt);<a name="line.5533"></a>
+<span class="sourceLineNo">5534</span>        } catch (FileNotFoundException e) {<a name="line.5534"></a>
+<span class="sourceLineNo">5535</span>          throw handleFileNotFound(e);<a name="line.5535"></a>
+<span class="sourceLineNo">5536</span>        }<a name="line.5536"></a>
+<span class="sourceLineNo">5537</span>        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()<a name="line.5537"></a>
+<span class="sourceLineNo">5538</span>          || this.filter.isFamilyEssential(entry.getKey())) {<a name="line.5538"></a>
+<span class="sourceLineNo">5539</span>          scanners.add(scanner);<a name="line.5539"></a>
+<span class="sourceLineNo">5540</span>        } else {<a name="line.5540"></a>
+<span class="sourceLineNo">5541</span>          joinedScanners.add(scanner);<a name="line.5541"></a>
+<span class="sourceLineNo">5542</span>        }<a name="line.5542"></a>
+<span class="sourceLineNo">5543</span>      }<a name="line.5543"></a>
+<span class="sourceLineNo">5544</span>      this.copyCellsFromSharedMem = copyCellsFromSharedMem;<a name="line.5544"></a>
+<span class="sourceLineNo">5545</span>      initializeKVHeap(scanners, joinedScanners, region);<a name="line.5545"></a>
+<span class="sourceLineNo">5546</span>    }<a name="line.5546"></a>
+<span class="sourceLineNo">5547</span><a name="line.5547"></a>
+<span class="sourceLineNo">5548</span>    protected void initializeKVHeap(List&lt;KeyValueScanner&gt; scanners,<a name="line.5548"></a>
+<span class="sourceLineNo">5549</span>        List&lt;KeyValueScanner&gt; joinedScanners, HRegion region)<a name="line.5549"></a>
+<span class="sourceLineNo">5550</span>        throws IOException {<a name="line.5550"></a>
+<span class="sourceLineNo">5551</span>      this.storeHeap = new KeyValueHeap(scanners, comparator);<a name="line.5551"></a>
+<span class="sourceLineNo">5552</span>      if (!joinedScanners.isEmpty()) {<a name="line.5552"></a>
+<span class="sourceLineNo">5553</span>        this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);<a name="line.5553"></a>
+<span class="sourceLineNo">5554</span>      }<a name="line.5554"></a>
+<span class="sourceLineNo">5555</span>    }<a name="line.5555"></a>
+<span class="sourceLineNo">5556</span><a name="line.5556"></a>
+<span class="sourceLineNo">5557</span>    @Override<a name="line.5557"></a>
+<span class="sourceLineNo">5558</span>    public long getMaxResultSize() {<a name="line.5558"></a>
+<span class="sourceLineNo">5559</span>      return maxResultSize;<a name="line.5559"></a>
+<span class="sourceLineNo">5560</span>    }<a name="line.5560"></a>
+<span class="sourceLineNo">5561</span><a name="line.5561"></a>
+<span class="sourceLineNo">5562</span>    @Override<a name="line.5562"></a>
+<span class="sourceLineNo">5563</span>    public long getMvccReadPoint() {<a name="line.5563"></a>
+<span class="sourceLineNo">5564</span>      return this.readPt;<a name="line.5564"></a>
+<span class="sourceLineNo">5565</span>    }<a name="line.5565"></a>
+<span class="sourceLineNo">5566</span><a name="line.5566"></a>
+<span class="sourceLineNo">5567</span>    @Override<a name="line.5567"></a>
+<span class="sourceLineNo">5568</span>    public int getBatch() {<a name="line.5568"></a>
+<span class="sourceLineNo">5569</span>      return this.defaultScannerContext.getBatchLimit();<a name="line.5569"></a>
+<span class="sourceLineNo">5570</span>    }<a name="line.5570"></a>
+<span class="sourceLineNo">5571</span><a name="line.5571"></a>
+<span class="sourceLineNo">5572</span>    /**<a name="line.5572"></a>
+<span class="sourceLineNo">5573</span>     * Reset both the filter and the old filter.<a name="line.5573"></a>
+<span class="sourceLineNo">5574</span>     *<a name="line.5574"></a>
+<span class="sourceLineNo">5575</span>     * @throws IOException in case a filter raises an I/O exception.<a name="line.5575"></a>
+<span class="sourceLineNo">5576</span>     */<a name="line.5576"></a>
+<span class="sourceLineNo">5577</span>    protected void resetFilters() throws IOException {<a name="line.5577"></a>
+<span class="sourceLineNo">5578</span>      if (filter != null) {<a name="line.5578"></a>
+<span class="sourceLineNo">5579</span>        filter.reset();<a name="line.5579"></a>
+<span class="sourceLineNo">5580</span>      }<a name="line.5580"></a>
+<span class="sourceLineNo">5581</span>    }<a name="line.5581"></a>
+<span class="sourceLineNo">5582</span><a name="line.5582"></a>
+<span class="sourceLineNo">5583</span>    @Override<a name="line.5583"></a>
+<span class="sourceLineNo">5584</span>    public boolean next(List&lt;Cell&gt; outResults)<a name="line.5584"></a>
+<span class="sourceLineNo">5585</span>        throws IOException {<a name="line.5585"></a>
+<span class="sourceLineNo">5586</span>      // apply the batching limit by default<a name="line.5586"></a>
+<span class="sourceLineNo">5587</span>      return next(outResults, defaultScannerContext);<a name="line.5587"></a>
+<span class="sourceLineNo">5588</span>    }<a name="line.5588"></a>
+<span class="sourceLineNo">5589</span><a name="line.5589"></a>
+<span class="sourceLineNo">5590</span>    @Override<a name="line.5590"></a>
+<span class="sourceLineNo">5591</span>    public synchronized boolean next(List&lt;Cell&gt; outResults, ScannerContext scannerContext)<a name="line.5591"></a>
+<span class="sourceLineNo">5592</span>    throws IOException {<a name="line.5592"></a>
+<span class="sourceLineNo">5593</span>      if (this.filterClosed) {<a name="line.5593"></a>
+<span class="sourceLineNo">5594</span>        throw new UnknownScannerException("Scanner was closed (timed out?) " +<a name="line.5594"></a>
+<span class="sourceLineNo">5595</span>            "after we renewed it. Could be caused by a very slow scanner " +<a name="line.5595"></a>
+<span class="sourceLineNo">5596</span>            "or a lengthy garbage collection");<a name="line.5596"></a>
+<span class="sourceLineNo">5597</span>      }<a name="line.5597"></a>
+<span class="sourceLineNo">5598</span>      startRegionOperation(Operation.SCAN);<a name="line.5598"></a>
+<span class="sourceLineNo">5599</span>      readRequestsCount.increment();<a name="line.5599"></a>
+<span class="sourceLineNo">5600</span>      try {<a name="line.5600"></a>
+<span class="sourceLineNo">5601</span>        return nextRaw(outResults, scannerContext);<a name="line.5601"></a>
+<span class="sourceLineNo">5602</span>      } finally {<a name="line.5602"></a>
+<span class="sourceLineNo">5603</span>        closeRegionOperation(Operation.SCAN);<a name="line.5603"></a>
+<span class="sourceLineNo">5604</span>      }<a name="line.5604"></a>
+<span class="sourceLineNo">5605</span>    }<a name="line.5605"></a>
+<span class="sourceLineNo">5606</span><a name="line.5606"></a>
+<span class="sourceLineNo">5607</span>    @Override<a name="line.5607"></a>
+<span class="sourceLineNo">5608</span>    public boolean nextRaw(List&lt;Cell&gt; outResults) throws IOException {<a name="line.5608"></a>
+<span class="sourceLineNo">5609</span>      // Use the RegionScanner's context by default<a name="line.5609"></a>
+<span class="sourceLineNo">5610</span>      return nextRaw(outResults, defaultScannerContext);<a name="line.5610"></a>
+<span class="sourceLineNo">5611</span>    }<a name="line.5611"></a>
+<span class="sourceLineNo">5612</span><a name="line.5612"></a>
+<span class="sourceLineNo">5613</span>    @Override<a name="line.5613"></a>
+<span class="sourceLineNo">5614</span>    public boolean nextRaw(List&lt;Cell&gt; outResults, ScannerContext scannerContext)<a name="line.5614"></a>
+<span class="sourceLineNo">5615</span>        throws IOException {<a name="line.5615"></a>
+<span class="sourceLineNo">5616</span>      if (storeHeap == null) {<a name="line.5616"></a>
+<span class="sourceLineNo">5617</span>        // scanner is closed<a name="line.5617"></a>
+<span class="sourceLineNo">5618</span>        throw new UnknownScannerException("Scanner was closed");<a name="line.5618"></a>
+<span class="sourceLineNo">5619</span>      }<a name="line.5619"></a>
+<span class="sourceLineNo">5620</span>      boolean moreValues = false;<a name="line.5620"></a>
+<span class="sourceLineNo">5621</span>      try {<a name="line.5621"></a>
+<span class="sourceLineNo">5622</span>        if (outResults.isEmpty()) {<a name="line.5622"></a>
+<span class="sourceLineNo">5623</span>          // Usually outResults is empty. This is true when next is called<a name="line.5623"></a>
+<span class="sourceLineNo">5624</span>          // to handle scan or get operation.<a name="line.5624"></a>
+<span class="sourceLineNo">5625</span>          moreValues = nextInternal(outResults, scannerContext);<a name="line.5625"></a>
+<span class="sourceLineNo">5626</span>        } else {<a name="line.5626"></a>
+<span class="sourceLineNo">5627</span>          List&lt;Cell&gt; tmpList = new ArrayList&lt;Cell&gt;();<a name="line.5627"></a>
+<span class="sourceLineNo">5628</span>          moreValues = nextInternal(tmpList, scannerContext);<a name="line.5628"></a>
+<span class="sourceLineNo">5629</span>          outResults.addAll(tmpList);<a name="line.5629"></a>
+<span class="sourceLineNo">5630</span>        }<a name="line.5630"></a>
+<span class="sourceLineNo">5631</span><a name="line.5631"></a>
+<span class="sourceLineNo">5632</span>        // If the size limit was reached it means a partial Result is being<a name="line.5632"></a>
+<span class="sourceLineNo">5633</span>        // returned. Returning a<a name="line.5633"></a>
+<span class="sourceLineNo">5634</span>        // partial Result means that we should not reset the filters; filters<a name="line.5634"></a>
+<span class="sourceLineNo">5635</span>        // should only be reset in<a name="line.5635"></a>
+<span class="sourceLineNo">5636</span>        // between rows<a name="line.5636"></a>
+<span class="sourceLineNo">5637</span>        if (!scannerContext.partialResultFormed()) resetFilters();<a name="line.5637"></a>
+<span class="sourceLineNo">5638</span><a name="line.5638"></a>
+<span class="sourceLineNo">5639</span>        if (isFilterDoneInternal()) {<a name="line.5639"></a>
+<span class="sourceLineNo">5640</span>          moreValues = false;<a name="line.5640"></a>
+<span class="sourceLineNo">5641</span>        }<a name="line.5641"></a>
+<span class="sourceLineNo">5642</span><a name="line.5642"></a>
+<span class="sourceLineNo">5643</span>        // If copyCellsFromSharedMem = true, then we need to copy the cells. Otherwise<a name="line.5643"></a>
+<span class="sourceLineNo">5644</span>        // it is a call coming from the RsRpcServices.scan().<a name="line.5644"></a>
+<span class="sourceLineNo">5645</span>        if (copyCellsFromSharedMem &amp;&amp; !outResults.isEmpty()) {<a name="line.5645"></a>
+<span class="sourceLineNo">5646</span>          // Do the copy of the results here.<a name="line.5646"></a>
+<span class="sourceLineNo">5647</span>          ListIterator&lt;Cell&gt; listItr = outResults.listIterator();<a name="line.5647"></a>
+<span class="sourceLineNo">5648</span>          Cell cell = null;<a name="line.5648"></a>
+<span class="sourceLineNo">5649</span>          while (listItr.hasNext()) {<a name="line.5649"></a>
+<span class="sourceLineNo">5650</span>            cell = listItr.next();<a name="line.5650"></a>
+<span class="sourceLineNo">5651</span>            if (cell instanceof ShareableMemory) {<a name="line.5651"></a>
+<span class="sourceLineNo">5652</span>              listItr.set(((ShareableMemory) cell).cloneToCell());<a name="line.5652"></a>
+<span class="sourceLineNo">5653</span>            }<a name="line.5653"></a>
+<span class="sourceLineNo">5654</span>          }<a name="line.5654"></a>
+<span class="sourceLineNo">5655</span>        }<a name="line.5655"></a>
+<span class="sourceLineNo">5656</span>      } finally {<a name="line.5656"></a>
+<span class="sourceLineNo">5657</span>        if (copyCellsFromSharedMem) {<a name="line.5657"></a>
+<span class="sourceLineNo">5658</span>          // In case of copyCellsFromSharedMem==true (where the CPs wrap a scanner) we return<a name="line.5658"></a>
+<span class="sourceLineNo">5659</span>          // the blocks then and there (for wrapped CPs)<a name="line.5659"></a>
+<span class="sourceLineNo">5660</span>          this.shipped();<a name="line.5660"></a>
+<span class="sourceLineNo">5661</span>        }<a name="line.5661"></a>
+<span class="sourceLineNo">5662</span>      }<a name="line.5662"></a>
+<span class="sourceLineNo">5663</span>      return moreValues;<a name="line.5663"></a>
+<span class="sourceLineNo">5664</span>    }<a name="line.5664"></a>
+<span class="sourceLineNo">5665</span><a name="line.5665"></a>
+<span class="sourceLineNo">5666</span>    /**<a name="line.5666"></a>
+<span class="sourceLineNo">5667</span>     * @return true if more cells exist after this batch, false if scanner is done<a name="line.5667"></a>
+<span class="sourceLineNo">5668</span>     */<a name="line.5668"></a>
+<span class="sourceLineNo">5669</span>    private boolean populateFromJoinedHeap(List&lt;Cell&gt; results, ScannerContext scannerContext)<a name="line.5669"></a>
+<span class="sourceLineNo">5670</span>            throws IOException {<a name="line.5670"></a>
+<span class="sourceLineNo">5671</span>      assert joinedContinuationRow != null;<a name="line.5671"></a>
+<span class="sourceLineNo">5672</span>      boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,<a name="line.5672"></a>
+<span class="sourceLineNo">5673</span>          joinedContinuationRow);<a name="line.5673"></a>
+<span class="sourceLineNo">5674</span><a name="line.5674"></a>
+<span class="sourceLineNo">5675</span>      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {<a name="line.5675"></a>
+<span class="sourceLineNo">5676</span>        // We are done with this row, reset the continuation.<a name="line.5676"></a>
+<span class="sourceLineNo">5677</span>        joinedContinuationRow = null;<a name="line.5677"></a>
+<span class="sourceLineNo">5678</span>      }<a name="line.5678"></a>
+<span class="sourceLineNo">5679</span>      // As the data is obtained from two independent heaps, we need to<a name="line.5679"></a>
+<span class="sourceLineNo">5680</span>      // ensure that result list is sorted, because Result relies on that.<a name="line.5680"></a>
+<span class="sourceLineNo">5681</span>      sort(results, comparator);<a name="line.5681"></a>
+<span class="sourceLineNo">5682</span>      return moreValues;<a name="line.5682"></a>
+<span class="sourceLineNo">5683</span>    }<a name="line.5683"></a>
+<span class="sourceLineNo">5684</span><a name="line.5684"></a>
+<span class="sourceLineNo">5685</span>    /**<a name="line.5685"></a>
+<span class="sourceLineNo">5686</span>     * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is<a name="line.5686"></a>
+<span class="sourceLineNo">5687</span>     * reached, or remainingResultSize (if not -1) is reaced<a name="line.5687"></a>
+<span class="sourceLineNo">5688</span>     * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.<a name="line.5688"></a>
+<span class="sourceLineNo">5689</span>     * @param scannerContext<a name="line.5689"></a>
+<span class="sourceLineNo">5690</span>     * @param currentRowCell<a name="line.5690"></a>
+<span class="sourceLineNo">5691</span>     * @return state of last call to {@link KeyValueHeap#next()}<a name="line.5691"></a>
+<span class="sourceLineNo">5692</span>     */<a name="line.5692"></a>
+<span class="sourceLineNo">5693</span>    private boolean populateResult(List&lt;Cell&gt; results, KeyValueHeap heap,<a name="line.5693"></a>
+<span class="sourceLineNo">5694</span>        ScannerContext scannerContext, Cell currentRowCell) throws IOException {<a name="line.5694"></a>
+<span class="sourceLineNo">5695</span>      Cell nextKv;<a name="line.5695"></a>
+<span class="sourceLineNo">5696</span>      boolean moreCellsInRow = false;<a name="line.5696"></a>
+<span class="sourceLineNo">5697</span>      boolean tmpKeepProgress = scannerContext.getKeepProgress();<a name="line.5697"></a>
+<span class="sourceLineNo">5698</span>      // Scanning between column families and thus the scope is between cells<a name="line.5698"></a>
+<span class="sourceLineNo">5699</span>      LimitScope limitScope = LimitScope.BETWEEN

<TRUNCATED>

Mime
View raw message