From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Mon, 19 Jun 2017 15:00:22 -0000
Subject: [10/30] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7f23ee04/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.WorkerState.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.WorkerState.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.WorkerState.html
deleted file mode 100644
index 39b1759..0000000
--- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.WorkerState.html
+++ /dev/null
@@ -1,431 +0,0 @@
-001/*
-002 *
-003 * Licensed to the Apache Software Foundation (ASF) under one
-004 * or more contributor license agreements.  See the NOTICE file
-005 * distributed with this work for additional information
-006 * regarding copyright ownership.  The ASF licenses this file
-007 * to you under the Apache License, Version 2.0 (the
-008 * "License"); you may not use this file except in compliance
-009 * with the License.  You may obtain a copy of the License at
-010 *
-011 *     http://www.apache.org/licenses/LICENSE-2.0
-012 *
-013 * Unless required by applicable law or agreed to in writing, software
-014 * distributed under the License is distributed on an "AS IS" BASIS,
-015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-016 * See the License for the specific language governing permissions and
-017 * limitations under the License.
-018 */
-019package org.apache.hadoop.hbase.replication.regionserver;
-020
-021import java.io.IOException;
-022import java.util.List;
-023import java.util.Map;
-024import java.util.concurrent.PriorityBlockingQueue;
-025import java.util.concurrent.TimeUnit;
-026
-027import org.apache.commons.logging.Log;
-028import org.apache.commons.logging.LogFactory;
-029import org.apache.hadoop.conf.Configuration;
-030import org.apache.hadoop.fs.Path;
-031import org.apache.hadoop.hbase.Cell;
-032import org.apache.hadoop.hbase.CellUtil;
-033import org.apache.hadoop.hbase.MetaTableAccessor;
-034import org.apache.hadoop.hbase.classification.InterfaceAudience;
-035import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-036import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-038import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-040import org.apache.hadoop.hbase.util.Bytes;
-041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-042import org.apache.hadoop.hbase.util.Threads;
-043import org.apache.hadoop.hbase.wal.WAL.Entry;
-044
-045import com.google.common.cache.CacheBuilder;
-046import com.google.common.cache.CacheLoader;
-047import com.google.common.cache.LoadingCache;
-048
-049/**
-050 * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
-051 * ReplicationSourceWALReaderThread.
-052 */
-053@InterfaceAudience.Private
-054public class ReplicationSourceShipperThread extends Thread {
-055  private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
-056
-057  // Hold the state of a replication worker thread
-058  public enum WorkerState {
-059    RUNNING,
-060    STOPPED,
-061    FINISHED,  // The worker is done processing a recovered queue
-062  }
-063
-064  protected final Configuration conf;
-065  protected final String walGroupId;
-066  protected final PriorityBlockingQueue<Path> queue;
-067  protected final ReplicationSourceInterface source;
-068
-069  // Last position in the log that we sent to ZooKeeper
-070  protected long lastLoggedPosition = -1;
-071  // Path of the current log
-072  protected volatile Path currentPath;
-073  // Current state of the worker thread
-074  private WorkerState state;
-075  protected ReplicationSourceWALReaderThread entryReader;
-076
-077  // How long should we sleep for each retry
-078  protected final long sleepForRetries;
-079  // Maximum number of retries before taking bold actions
-080  protected final int maxRetriesMultiplier;
-081
-082  // Use guava cache to set ttl for each key
-083  private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
-084      .expireAfterAccess(1, TimeUnit.DAYS).build(
-085      new CacheLoader<String, Boolean>() {
-086        @Override
-087        public Boolean load(String key) throws Exception {
-088          return false;
-089        }
-090      }
-091  );
-092
-093  public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
-094      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
-095    this.conf = conf;
-096    this.walGroupId = walGroupId;
-097    this.queue = queue;
-098    this.source = source;
-099    this.sleepForRetries =
-100        this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
-101    this.maxRetriesMultiplier =
-102        this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
-103  }
-104
-105  @Override
-106  public void run() {
-107    setWorkerState(WorkerState.RUNNING);
-108    // Loop until we close down
-109    while (isActive()) {
-110      int sleepMultiplier = 1;
-111      // Sleep until replication is enabled again
-112      if (!source.isPeerEnabled()) {
-113        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-114          sleepMultiplier++;
-115        }
-116        continue;
-117      }
-118
-119      while (entryReader == null) {
-120        if (sleepForRetries("Replication WAL entry reader thread not initialized",
-121          sleepMultiplier)) {
-122          sleepMultiplier++;
-123        }
-124      }
-125
-126      try {
-127        WALEntryBatch entryBatch = entryReader.take();
-128        for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
-129          waitingUntilCanPush(entry);
-130        }
-131        shipEdits(entryBatch);
-132      } catch (InterruptedException e) {
-133        LOG.trace("Interrupted while waiting for next replication entry batch", e);
-134        Thread.currentThread().interrupt();
-135      }
-136    }
-137    // If the worker exits run loop without finishing its task, mark it as stopped.
-138    if (state != WorkerState.FINISHED) {
-139      setWorkerState(WorkerState.STOPPED);
-140    }
-141  }
-142
-143  /**
-144   * Do the shipping logic
-145   */
-146  protected void shipEdits(WALEntryBatch entryBatch) {
-147    List<Entry> entries = entryBatch.getWalEntries();
-148    long lastReadPosition = entryBatch.getLastWalPosition();
-149    currentPath = entryBatch.getLastWalPath();
-150    int sleepMultiplier = 0;
-151    if (entries.isEmpty()) {
-152      if (lastLoggedPosition != lastReadPosition) {
-153        // Save positions to meta table before zk.
-154        updateSerialRepPositions(entryBatch.getLastSeqIds());
-155        updateLogPosition(lastReadPosition);
-156        // if there was nothing to ship and it's not an error
-157        // set "ageOfLastShippedOp" to <now> to indicate that we're current
-158        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-159          walGroupId);
-160      }
-161      return;
-162    }
-163    int currentSize = (int) entryBatch.getHeapSize();
-164    while (isActive()) {
-165      try {
-166        try {
-167          source.tryThrottle(currentSize);
-168        } catch (InterruptedException e) {
-169          LOG.debug("Interrupted while sleeping for throttling control");
-170          Thread.currentThread().interrupt();
-171          // the current thread may have been interrupted to request termination;
-172          // go back to while() so isActive() can confirm this
-173          continue;
-174        }
-175
-176        // create replicateContext here, so the entries can be GC'd upon return from this call
-177        // stack
-178        ReplicationEndpoint.ReplicateContext replicateContext =
-179            new ReplicationEndpoint.ReplicateContext();
-180        replicateContext.setEntries(entries).setSize(currentSize);
-181        replicateContext.setWalGroupId(walGroupId);
-182
-183        long startTimeNs = System.nanoTime();
-184        // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
-185        boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
-186        long endTimeNs = System.nanoTime();
-187
-188        if (!replicated) {
-189          continue;
-190        } else {
-191          sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
-192        }
-193
-194        if (this.lastLoggedPosition != lastReadPosition) {
-195          //Clean up hfile references
-196          int size = entries.size();
-197          for (int i = 0; i < size; i++) {
-198            cleanUpHFileRefs(entries.get(i).getEdit());
-199          }
-200
-201          // Save positions to meta table before zk.
-202          updateSerialRepPositions(entryBatch.getLastSeqIds());
-203          //Log and clean up WAL logs
-204          updateLogPosition(lastReadPosition);
-205        }
-206
-207        source.postShipEdits(entries, currentSize);
-208        // FIXME check relationship between wal group and overall
-209        source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
-210          entryBatch.getNbHFiles());
-211        source.getSourceMetrics().setAgeOfLastShippedOp(
-212          entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
-213        if (LOG.isTraceEnabled()) {
-214          LOG.trace("Replicated " + entries.size() + " entries (" + entryBatch.getNbOperations()
-215              + " operations) in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
-216        }
-217        break;
-218      } catch (Exception ex) {
-219        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception: "
-220            + org.apache.hadoop.util.StringUtils.stringifyException(ex));
-221        if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
-222          sleepMultiplier++;
-223        }
-224      }
-225    }
-226  }
-227
-228  private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
-229    String key = entry.getKey();
-230    long seq = entry.getValue();
-231    boolean deleteKey = false;
-232    if (seq <= 0) {
-233      // This is a REGION_CLOSE marker; we cannot continue skipping after this entry.
-234      deleteKey = true;
-235      seq = -seq;
-236    }
-237
-238    if (!canSkipWaitingSet.getUnchecked(key)) {
-239      try {
-240        source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
-241      } catch (IOException e) {
-242        LOG.error("waitUntilCanBePushed failed", e);
-243        throw new RuntimeException("waitUntilCanBePushed failed", e);
-244      } catch (InterruptedException e) {
-245        LOG.warn("waitUntilCanBePushed interrupted", e);
-246        Thread.currentThread().interrupt();
-247      }
-248      canSkipWaitingSet.put(key, true);
-249    }
-250    if (deleteKey) {
-251      canSkipWaitingSet.invalidate(key);
-252    }
-253  }
-254
-255  private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-256    String peerId = source.getPeerId();
-257    if (peerId.contains("-")) {
-258      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-259      // A peerId will not have "-" in its name, see HBASE-11394
-260      peerId = peerId.split("-")[0];
-261    }
-262    List<Cell> cells = edit.getCells();
-263    int totalCells = cells.size();
-264    for (int i = 0; i < totalCells; i++) {
-265      Cell cell = cells.get(i);
-266      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-267        BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-268        List<StoreDescriptor> stores = bld.getStoresList();
-269        int totalStores = stores.size();
-270        for (int j = 0; j < totalStores; j++) {
-271          List<String> storeFileList = stores.get(j).getStoreFileList();
-272          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
-273          source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
-274        }
-275      }
-276    }
-277  }
-278
-279  protected void updateLogPosition(long lastReadPosition) {
-280    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
-281      lastReadPosition, false, false);
-282    lastLoggedPosition = lastReadPosition;
-283  }
-284
-285  private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
-286    try {
-287      MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
-288        source.getPeerId(), lastPositionsForSerialScope);
-289    } catch (IOException e) {
-290      LOG.error("updateReplicationPositions failed", e);
-291      throw new RuntimeException("updateReplicationPositions failed", e);
-292    }
-293  }
-294
-295  public void startup(UncaughtExceptionHandler handler) {
-296    String name = Thread.currentThread().getName();
-297    Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
-298        + source.getPeerClusterZnode(), handler);
-299  }
-300
-301  public PriorityBlockingQueue<Path> getLogQueue() {
-302    return this.queue;
-303  }
-304
-305  public Path getCurrentPath() {
-306    return this.entryReader.getCurrentPath();
-307  }
-308
-309  public long getCurrentPosition() {
-310    return this.lastLoggedPosition;
-311  }
-312
-313  public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
-314    this.entryReader = entryReader;
-315  }
-316
-317  public long getStartPosition() {
-318    return 0;
-319  }
-320
-321  protected boolean isActive() {
-322    return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
-323  }
-324
-325  public void setWorkerState(WorkerState state) {
-326    this.state = state;
-327  }
-328
-329  public WorkerState getWorkerState() {
-330    return state;
-331  }
-332
-333  public void stopWorker() {
-334    setWorkerState(WorkerState.STOPPED);
-335  }
-336
-337  public boolean isFinished() {
-338    return state == WorkerState.FINISHED;
-339  }
-340
-341  /**
-342   * Do the sleeping logic
-343   * @param msg Why we sleep
-344   * @param sleepMultiplier by how many times the default sleeping time is augmented
-345   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
-346   */
-347  public boolean sleepForRetries(String msg, int sleepMultiplier) {
-348    try {
-349      if (LOG.isTraceEnabled()) {
-350        LOG.trace(msg + ", sleeping " + sleepForRetries + " ms, multiplier " + sleepMultiplier);
-351      }
-352      Thread.sleep(this.sleepForRetries * sleepMultiplier);
-353    } catch (InterruptedException e) {
-354      LOG.debug("Interrupted while sleeping between retries");
-355      Thread.currentThread().interrupt();
-356    }
-357    return sleepMultiplier < maxRetriesMultiplier;
-358  }
-359}
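
For readers skimming this deleted page, here is a minimal wiring sketch built only from the constructor and methods visible in the listing above. It is illustrative, not HBase's actual bootstrap path: the Configuration, queue, source, reader and handler are assumed to be constructed elsewhere, and the helper class name is hypothetical.

package org.apache.hadoop.hbase.replication.regionserver;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

final class ShipperWiringSketch {  // hypothetical helper, not part of HBase
  static ReplicationSourceShipperThread startShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source,
      ReplicationSourceWALReaderThread reader, UncaughtExceptionHandler handler) {
    ReplicationSourceShipperThread shipper =
        new ReplicationSourceShipperThread(conf, walGroupId, queue, source);
    // run() busy-sleeps via sleepForRetries() while entryReader is null,
    // so set the WAL reader before starting the thread.
    shipper.setWALReader(reader);
    // startup() names the daemon thread "<parent>.replicationSource.<walGroupId>,<peerClusterZnode>"
    // and installs the uncaught-exception handler.
    shipper.startup(handler);
    return shipper;
  }
}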
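
The canSkipWaitingSet field in the listing uses a Guava LoadingCache as a set with a TTL: absent keys load as false, a key is flipped to true once waitUntilCanBePushed() has completed for it, and entries silently expire after a day without access. A self-contained sketch of that pattern, with illustrative key names (the Guava calls match those used above):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public final class TtlSetSketch {
  public static void main(String[] args) {
    LoadingCache<String, Boolean> canSkip = CacheBuilder.newBuilder()
        .expireAfterAccess(1, TimeUnit.DAYS)   // entries fall out after a day idle
        .build(new CacheLoader<String, Boolean>() {
          @Override
          public Boolean load(String key) {
            return Boolean.FALSE;              // unknown keys default to "must wait"
          }
        });
    System.out.println(canSkip.getUnchecked("encoded-region-a")); // false, lazily loaded
    canSkip.put("encoded-region-a", true);     // mark: waiting already done
    System.out.println(canSkip.getUnchecked("encoded-region-a")); // true until expiry
    canSkip.invalidate("encoded-region-a");    // explicit removal, as after a REGION_CLOSE marker
  }
}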
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7f23ee04/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.html b/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.html
deleted file mode 100644
index 39b1759..0000000
--- a/devapidocs/src-html/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.html
+++ /dev/null
@@ -1,431 +0,0 @@
(The 431 deleted lines are byte-for-byte identical to the ReplicationSourceShipperThread.WorkerState.html page above: both pages render the same Java source file, as the shared blob index 39b1759 indicates.)
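
A closing note on the retry behaviour: sleepForRetries() sleeps sleepForRetries * sleepMultiplier milliseconds and returns whether the multiplier may still grow, so callers back off linearly (1 s, 2 s, ...) until the multiplier reaches maxRetriesMultiplier, after which every retry sleeps the cap (300 s, i.e. 5 minutes, with the defaults). A standalone illustration of that arithmetic, assuming the default configuration values from the constructor:

public final class BackoffIllustration {
  public static void main(String[] args) {
    long sleepForRetries = 1000L;     // replication.source.sleepforretries default
    int maxRetriesMultiplier = 300;   // replication.source.maxretriesmultiplier default
    int sleepMultiplier = 1;
    long rampUpMillis = 0;
    // Mirrors the caller pattern: increment only while sleepForRetries() returns true.
    while (sleepMultiplier < maxRetriesMultiplier) {
      rampUpMillis += sleepForRetries * sleepMultiplier;
      sleepMultiplier++;
    }
    // 299 sleeps ramping from 1 s to 299 s; thereafter each retry sleeps a steady 300 s.
    System.out.println("ramp-up before reaching the cap: " + rampUpMillis / 1000 + " s");
  }
}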