Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7257C1181E for ; Wed, 25 Jun 2014 20:55:25 +0000 (UTC) Received: (qmail 98020 invoked by uid 500); 25 Jun 2014 20:55:25 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 97916 invoked by uid 500); 25 Jun 2014 20:55:25 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 97821 invoked by uid 99); 25 Jun 2014 20:55:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jun 2014 20:55:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1247598B85D; Wed, 25 Jun 2014 20:55:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Wed, 25 Jun 2014 20:55:26 -0000 Message-Id: <84624d5e7ed04c55aa517b0fdb622cf3@git.apache.org> In-Reply-To: <2e2e4e61eeaa4960bda07362087d3a20@git.apache.org> References: <2e2e4e61eeaa4960bda07362087d3a20@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] git commit: Merge branch '1.6.1-SNAPSHOT' Merge branch '1.6.1-SNAPSHOT' Merging ACCUMULO-2950, and applying changes to replication services. Conflicts: server/base/pom.xml server/master/src/main/java/org/apache/accumulo/master/Master.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4f139138 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4f139138 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4f139138 Branch: refs/heads/master Commit: 4f139138ac648ca63598667cbc0f22e9fff4a428 Parents: 35f618e 8bc8d4e Author: Christopher Tubbs Authored: Wed Jun 25 16:43:09 2014 -0400 Committer: Christopher Tubbs Committed: Wed Jun 25 16:43:09 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/proxy/Proxy.java | 4 +- server/base/pom.xml | 1 + .../apache/accumulo/server/util/RpcWrapper.java | 62 ++++++++++++++++++ .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../java/org/apache/accumulo/master/Master.java | 6 +- .../apache/accumulo/tserver/TabletServer.java | 7 +- .../thrift/RpcClientInvocationHandler.java | 54 ++++++++++++++++ .../thrift/RpcServerInvocationHandler.java | 53 +++++++++++++++ .../trace/instrument/thrift/TraceWrap.java | 68 ++++---------------- 9 files changed, 195 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/base/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java index 113511a,c367ae0..85012d4 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@@ -984,9 -973,7 +984,9 @@@ public class Master implements LiveTSer throw new IOException(e); } + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot); + - Processor processor = new Processor(TraceWrap.service(new MasterClientServiceHandler(this))); + Processor processor = new Processor(RpcWrapper.service(new MasterClientServiceHandler(this))); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; @@@ -997,33 -984,6 +997,33 @@@ while (!clientService.isServing()) { UtilWaitThread.sleep(100); } + + // Start the daemon to scan the replication table and make units of work + replicationWorkDriver = new ReplicationDriver(this); + replicationWorkDriver.start(); + + // Start the daemon to assign work to tservers to replicate to our peers + try { + replicationWorkAssigner = new WorkDriver(this, getConnector()); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Caught exception trying to initialize replication WorkDriver", e); + throw new RuntimeException(e); + } + replicationWorkAssigner.start(); + + // Start the replication coordinator which assigns tservers to service replication requests + ReplicationCoordinator.Processor replicationCoordinatorProcessor = new ReplicationCoordinator.Processor( - TraceWrap.service(new MasterReplicationCoordinator(this))); ++ RpcWrapper.service(new MasterReplicationCoordinator(this))); + ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, + replicationCoordinatorProcessor, "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + + log.info("Started replication coordinator service at " + replAddress.address); + + // Advertise that port we used so peers don't have to be told what it is + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, + replAddress.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + while (clientService.isServing()) { UtilWaitThread.sleep(500); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f139138/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 9e8af0a,57415bd..9f354cd --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -181,9 -193,18 +182,8 @@@ import org.apache.accumulo.start.classl import org.apache.accumulo.start.classloader.vfs.ContextManager; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; - import org.apache.accumulo.trace.instrument.thrift.TraceWrap; import org.apache.accumulo.trace.thrift.TInfo; -import org.apache.accumulo.tserver.Compactor.CompactionInfo; import org.apache.accumulo.tserver.RowLocks.RowLock; -import org.apache.accumulo.tserver.Tablet.CommitSession; -import org.apache.accumulo.tserver.Tablet.KVEntry; -import org.apache.accumulo.tserver.Tablet.LookupResult; -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.tserver.Tablet.ScanBatch; -import org.apache.accumulo.tserver.Tablet.Scanner; -import org.apache.accumulo.tserver.Tablet.SplitInfo; -import org.apache.accumulo.tserver.Tablet.TConstraintViolationException; -import org.apache.accumulo.tserver.Tablet.TabletClosedException; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; import org.apache.accumulo.tserver.compaction.MajorCompactionReason; @@@ -2254,30 -3092,7 +2254,30 @@@ public class TabletServer implements Ru return address; } - ZooLock getLock() { + private HostAndPort startReplicationService() throws UnknownHostException { - ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance())); ++ ReplicationServicer.Iface repl = RpcWrapper.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance())); + ReplicationServicer.Processor processor = new ReplicationServicer.Processor(repl); + AccumuloConfiguration conf = getSystemConfiguration(); + Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(conf, clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, + "ReplicationServicerHandler", "Replication Servicer", null, Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + this.replServer = sp.server; + log.info("Started replication service on " + sp.address); + + try { + // The replication service is unique to the thrift service for a tserver, not just a host. + // Advertise the host and port for replication service given the host and port for the tserver. + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(), + sp.address.toString().getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + log.error("Could not advertise replication service port", e); + throw new RuntimeException(e); + } + + return sp.address; + } + + public ZooLock getLock() { return tabletServerLock; } @@@ -2338,6 -3153,6 +2338,7 @@@ } // main loop listens for client requests ++ @Override public void run() { SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());