hbase-issues mailing list archives

From "Qianxi Zhang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HBASE-11302) ReplicationSourceManager is not thread safe
Date Fri, 06 Jun 2014 11:57:02 GMT

     [ https://issues.apache.org/jira/browse/HBASE-11302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qianxi Zhang updated HBASE-11302:
---------------------------------

    Affects Version/s: 0.98.2

> ReplicationSourceManager is not thread safe
> -------------------------------------------
>
>                 Key: HBASE-11302
>                 URL: https://issues.apache.org/jira/browse/HBASE-11302
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>    Affects Versions: 0.99.0, 0.98.2
>            Reporter: Qianxi Zhang
>         Attachments: HBase-11302-0.99.diff
>
>
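> In ReplicationSourceManager, the sources list records the replication source for each peer.
> removePeer removes an entry from this list without taking any lock, while preLogRoll iterates
> over it holding only the hlogsById lock, so access to sources is not thread safe.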
> ReplicationSourceManager#296
> {code}
>   void preLogRoll(Path newLog) throws IOException {
>     synchronized (this.hlogsById) {
>       String name = newLog.getName();
>       for (ReplicationSourceInterface source : this.sources) {
>         try {
>           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
>         } catch (ReplicationException e) {
>           throw new IOException("Cannot add log to replication queue with id="
>               + source.getPeerClusterZnode() + ", filename=" + name, e);
>         }
>       }
>       for (SortedSet<String> hlogs : this.hlogsById.values()) {
>         if (this.sources.isEmpty()) {
>           // If there's no slaves, don't need to keep the old hlogs since
>           // we only consider the last one when a new slave comes in
>           hlogs.clear();
>         }
>         hlogs.add(name);
>       }
>     }
>     this.latestPath = newLog;
>   }
> {code}
> ReplicationSourceManager#392
> {code}
>   public void removePeer(String id) {
>     LOG.info("Closing the following queue " + id + ", currently have "
>         + sources.size() + " and another "
>         + oldsources.size() + " that were recovered");
>     String terminateMessage = "Replication stream was removed by a user";
>     ReplicationSourceInterface srcToRemove = null;
>     List<ReplicationSourceInterface> oldSourcesToDelete =
>         new ArrayList<ReplicationSourceInterface>();
>     // First close all the recovered sources for this peer
>     for (ReplicationSourceInterface src : oldsources) {
>       if (id.equals(src.getPeerClusterId())) {
>         oldSourcesToDelete.add(src);
>       }
>     }
>     for (ReplicationSourceInterface src : oldSourcesToDelete) {
>       src.terminate(terminateMessage);
>       closeRecoveredQueue((src));
>     }
>     LOG.info("Number of deleted recovered sources for " + id + ": "
>         + oldSourcesToDelete.size());
>     // Now look for the one on this cluster
>     for (ReplicationSourceInterface src : this.sources) {
>       if (id.equals(src.getPeerClusterId())) {
>         srcToRemove = src;
>         break;
>       }
>     }
>     if (srcToRemove == null) {
>       LOG.error("The queue we wanted to close is missing " + id);
>       return;
>     }
>     srcToRemove.terminate(terminateMessage);
>     this.sources.remove(srcToRemove);
>     deleteSource(id, true);
>   }
> {code}
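
The hazard described above can be reproduced outside HBase with a small sketch. It assumes that sources is a plain, non-concurrent list such as ArrayList (the report does not show its declaration), uses strings in place of ReplicationSourceInterface, and simulates the interleaving in a single thread so the outcome is deterministic. The class and method names are hypothetical. CopyOnWriteArrayList is shown only as one possible mitigation, not necessarily what the attached patch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the pattern in the report; this is not HBase code.
class SourcesRaceSketch {

  // Iterates the list the way preLogRoll does and, partway through, removes
  // an element the way removePeer does. Returns true if the iteration failed
  // with a ConcurrentModificationException.
  static boolean removalBreaksIteration(List<String> sources) {
    try {
      for (String source : sources) {
        // Simulates removePeer running on another thread during the loop.
        sources.remove("peer1");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> peers = Arrays.asList("peer1", "peer2", "peer3");
    // A plain ArrayList detects the structural change on the next iterator step.
    System.out.println("ArrayList fails: "
        + removalBreaksIteration(new ArrayList<>(peers)));
    // A CopyOnWriteArrayList iterator walks a snapshot, so the loop completes.
    System.out.println("CopyOnWriteArrayList fails: "
        + removalBreaksIteration(new CopyOnWriteArrayList<>(peers)));
  }
}
```

With real threads the failure is timing dependent, and an unsynchronized ArrayList may also expose stale or inconsistent state without throwing. A copy-on-write list fits a list that is read on every log roll and modified only when peers are added or removed, at the cost of copying the backing array on each write.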



--
This message was sent by Atlassian JIRA
(v6.2#6252)
