accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [3/3] git commit: Merge branch '1.6.1-SNAPSHOT'
Date Wed, 25 Jun 2014 20:55:26 GMT
Merge branch '1.6.1-SNAPSHOT'

Merging ACCUMULO-2950, and applying changes to replication services.

Conflicts:
	server/base/pom.xml
	server/master/src/main/java/org/apache/accumulo/master/Master.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4f139138
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4f139138
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4f139138

Branch: refs/heads/master
Commit: 4f139138ac648ca63598667cbc0f22e9fff4a428
Parents: 35f618e 8bc8d4e
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Wed Jun 25 16:43:09 2014 -0400
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Wed Jun 25 16:43:09 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/proxy/Proxy.java   |  4 +-
 server/base/pom.xml                             |  1 +
 .../apache/accumulo/server/util/RpcWrapper.java | 62 ++++++++++++++++++
 .../accumulo/gc/SimpleGarbageCollector.java     |  4 +-
 .../java/org/apache/accumulo/master/Master.java |  6 +-
 .../apache/accumulo/tserver/TabletServer.java   |  7 +-
 .../thrift/RpcClientInvocationHandler.java      | 54 ++++++++++++++++
 .../thrift/RpcServerInvocationHandler.java      | 53 +++++++++++++++
 .../trace/instrument/thrift/TraceWrap.java      | 68 ++++----------------
 9 files changed, 195 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/base/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 113511a,c367ae0..85012d4
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -984,9 -973,7 +984,9 @@@ public class Master implements LiveTSer
        throw new IOException(e);
      }
  
 +    ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
 +
-     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
+     Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new MasterClientServiceHandler(this)));
      ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
          "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
      clientService = sa.server;
@@@ -997,33 -984,6 +997,33 @@@
      while (!clientService.isServing()) {
        UtilWaitThread.sleep(100);
      }
 +
 +    // Start the daemon to scan the replication table and make units of work
 +    replicationWorkDriver = new ReplicationDriver(this);
 +    replicationWorkDriver.start();
 +
 +    // Start the daemon to assign work to tservers to replicate to our peers
 +    try {
 +      replicationWorkAssigner = new WorkDriver(this, getConnector());
 +    } catch (AccumuloException | AccumuloSecurityException e) {
 +      log.error("Caught exception trying to initialize replication WorkDriver", e);
 +      throw new RuntimeException(e);
 +    }
 +    replicationWorkAssigner.start();
 +
 +    // Start the replication coordinator which assigns tservers to service replication requests
 +    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
-         TraceWrap.service(new MasterReplicationCoordinator(this)));
++        RpcWrapper.service(new MasterReplicationCoordinator(this)));
 +    ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT,
 +        replicationCoordinatorProcessor, "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
 +        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 +
 +    log.info("Started replication coordinator service at " + replAddress.address);
 +
 +    // Advertise that port we used so peers don't have to be told what it is
 +    ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
 +        replAddress.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
 +
      while (clientService.isServing()) {
        UtilWaitThread.sleep(500);
      }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9e8af0a,57415bd..9f354cd
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -181,9 -193,18 +182,8 @@@ import org.apache.accumulo.start.classl
  import org.apache.accumulo.start.classloader.vfs.ContextManager;
  import org.apache.accumulo.trace.instrument.Span;
  import org.apache.accumulo.trace.instrument.Trace;
- import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
  import org.apache.accumulo.trace.thrift.TInfo;
 -import org.apache.accumulo.tserver.Compactor.CompactionInfo;
  import org.apache.accumulo.tserver.RowLocks.RowLock;
 -import org.apache.accumulo.tserver.Tablet.CommitSession;
 -import org.apache.accumulo.tserver.Tablet.KVEntry;
 -import org.apache.accumulo.tserver.Tablet.LookupResult;
 -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
 -import org.apache.accumulo.tserver.Tablet.ScanBatch;
 -import org.apache.accumulo.tserver.Tablet.Scanner;
 -import org.apache.accumulo.tserver.Tablet.SplitInfo;
 -import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
 -import org.apache.accumulo.tserver.Tablet.TabletClosedException;
  import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
  import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
  import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@@ -2254,30 -3092,7 +2254,30 @@@ public class TabletServer implements Ru
      return address;
    }
  
 -  ZooLock getLock() {
 +  private HostAndPort startReplicationService() throws UnknownHostException {
-     ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
++    ReplicationServicer.Iface repl = RpcWrapper.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance()));
 +    ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
 +    AccumuloConfiguration conf = getSystemConfiguration();
 +    Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
 +    ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
 +        "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
 +    this.replServer = sp.server;
 +    log.info("Started replication service on " + sp.address);
 +
 +    try {
 +      // The replication service is unique to the thrift service for a tserver, not just a host.
 +      // Advertise the host and port for replication service given the host and port for the tserver.
 +      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(),
 +          sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
 +    } catch (Exception e) {
 +      log.error("Could not advertise replication service port", e);
 +      throw new RuntimeException(e);
 +    }
 +
 +    return sp.address;
 +  }
 +
 +  public ZooLock getLock() {
      return tabletServerLock;
    }
  
@@@ -2338,6 -3153,6 +2338,7 @@@
    }
  
    // main loop listens for client requests
++  @Override
    public void run() {
      SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
  


Mime
View raw message