hbase-issues mailing list archives

From "Ashu Pachauri (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HBASE-18027) Replication should respect RPC size limits when batching edits
Date Fri, 26 May 2017 07:51:05 GMT

    [ https://issues.apache.org/jira/browse/HBASE-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025944#comment-16025944 ]

Ashu Pachauri commented on HBASE-18027:
---------------------------------------

bq. Instead we try to avoid creating an overlarge RPC by setting the replication queue capacity
limit to the lesser of replication.source.size.capacity or 95% of the RPC size limit.

I think this does not solve the underlying problem: replication can still get stuck because
a request is too large. I was also reading through the rest of the discussion here, and I
don't think handling the RPC size limit inside ReplicationSource makes much sense, because
the endpoint partitions the batch before making the RPC. It makes more sense for the endpoint
to enforce the RPC size limit. Changing the batching strategy in the endpoint actually gives
us two benefits:
1. The purpose of this jira, i.e. enforcing the RPC size limit.
2. Better replication performance. Replication perf is currently plagued by the fact that,
within a batch, there is never more than a single thread shipping edits for a given region.
So, if a region is receiving heavy traffic on the source cluster, replication performs very
poorly.

As for the implementation, I don't think you need to maintain a lot of state in the endpoint
to enforce the RPC limit. All that needs to be done is to split a batch into multiple batches
whenever it would exceed the RPC limit. I think something like the following would work:

{code}
private List<List<Entry>> createBatches(List<Entry> entries) {
  // Cap each batch at 95% of the configured RPC request size limit
  int maxBatchSize =
      (int) (0.95 * conf.getInt(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE));
  int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
  int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
  // Maintains the current batch for a given partition index
  Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
  List<List<Entry>> entryLists = new ArrayList<>();
  int[] sizes = new int[n];

  for (int i = 0; i < n; i++) {
    entryMap.put(i, new ArrayList<Entry>(entries.size() / n + 1));
  }

  for (Entry e : entries) {
    int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
    int entrySize = estimatedSize(e);
    // If adding this entry would push the current batch over the limit, move the batch to
    // the final list and start a new one. An empty batch is never flushed, so a single
    // entry larger than the limit still goes out in a batch of its own.
    if (!entryMap.get(index).isEmpty() && sizes[index] + entrySize > maxBatchSize) {
      entryLists.add(entryMap.get(index));
      entryMap.put(index, new ArrayList<Entry>());
      sizes[index] = 0;
    }
    entryMap.get(index).add(e);
    sizes[index] += entrySize;
  }

  // Add the remaining batches, skipping partitions that received no entries
  for (List<Entry> batch : entryMap.values()) {
    if (!batch.isEmpty()) {
      entryLists.add(batch);
    }
  }
  return entryLists;
}
{code}
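
To try the batching logic outside HBase, the same idea can be sketched with plain Java types.
This is only an illustration under stated assumptions: Edit is a hypothetical stand-in for a
WAL entry (a region name plus an estimated size), String.hashCode replaces the hash of the
encoded region name, and the partition count and size limit are passed in instead of being
read from configuration or the sink manager.

```java
import java.util.ArrayList;
import java.util.List;

class SizeBoundedBatcher {
    /** Hypothetical stand-in for a WAL entry: region name plus estimated size in bytes. */
    static final class Edit {
        final String region;
        final long sizeBytes;

        Edit(String region, long sizeBytes) {
            this.region = region;
            this.sizeBytes = sizeBytes;
        }
    }

    /**
     * Hashes edits into `partitions` buckets by region, and closes a bucket's current
     * batch whenever adding the next edit would push it past maxBatchBytes.
     */
    static List<List<Edit>> createBatches(List<Edit> edits, int partitions, long maxBatchBytes) {
        List<List<Edit>> current = new ArrayList<>(partitions);
        for (int i = 0; i < partitions; i++) {
            current.add(new ArrayList<>());
        }
        long[] sizes = new long[partitions];
        List<List<Edit>> result = new ArrayList<>();

        for (Edit e : edits) {
            // floorMod keeps the index non-negative even for negative hash codes
            int index = Math.floorMod(e.region.hashCode(), partitions);
            // Never flush an empty batch: an oversized edit is shipped on its own
            if (!current.get(index).isEmpty() && sizes[index] + e.sizeBytes > maxBatchBytes) {
                result.add(current.get(index));
                current.set(index, new ArrayList<>());
                sizes[index] = 0;
            }
            current.get(index).add(e);
            sizes[index] += e.sizeBytes;
        }

        // Append the batches still open, skipping partitions that received nothing
        for (List<Edit> batch : current) {
            if (!batch.isEmpty()) {
                result.add(batch);
            }
        }
        return result;
    }
}
```

With one partition and a 100-byte limit, three 40-byte edits for the same region come back as
two batches (two edits, then one). That is also how a hot region ends up spread over more than
one batch, which separate shipper threads could then send. The shipping itself is not part of
this sketch.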


> Replication should respect RPC size limits when batching edits
> --------------------------------------------------------------
>
>                 Key: HBASE-18027
>                 URL: https://issues.apache.org/jira/browse/HBASE-18027
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 2.0.0, 1.4.0, 1.3.1
>            Reporter: Andrew Purtell
>            Assignee: Andrew Purtell
>             Fix For: 2.0.0, 1.4.0, 1.3.2
>
>         Attachments: HBASE-18027-branch-1.patch, HBASE-18027.patch, HBASE-18027.patch,
>                      HBASE-18027.patch, HBASE-18027.patch, HBASE-18027.patch
>
>
> In HBaseInterClusterReplicationEndpoint#replicate we try to replicate in batches. We
> create N lists. N is the minimum of configured replicator threads, number of 100-waledit batches,
> or number of current sinks. Every pending entry in the replication context is then placed
> in order by hash of encoded region name into one of these N lists. Each of the N lists is
> then sent all at once in one replication RPC. We do not test if the sum of data in each N
> list will exceed RPC size limits. This code presumes each individual edit is reasonably small.
> Not checking for aggregate size while assembling the lists into RPCs is an oversight and can
> lead to replication failure when that assumption is violated.
> We can fix this by generating as many replication RPC calls as we need to drain a list,
> keeping each RPC under limit, instead of assuming the whole list will fit in one.
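
The choice of N described above can be written out as a small helper. This is a sketch based
only on the issue description and on the snippet in the comment; the class, method, and
parameter names are hypothetical and not part of the HBase API.

```java
class PartitionCount {
    /**
     * N = min(replicator threads, number of 100-entry batches, current sinks).
     * The sink count is clamped to at least 1 so that N is never zero.
     */
    static int partitionCount(int maxThreads, int pendingEntries, int numSinks) {
        int hundredEntryBatches = pendingEntries / 100 + 1;
        int sinks = Math.max(numSinks, 1);
        return Math.min(Math.min(maxThreads, hundredEntryBatches), sinks);
    }
}
```

For example, 10 threads, 250 pending entries, and 4 sinks give N = 3, since 250 entries
form three 100-entry batches and that is the smallest of the three values.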



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
