From commits-return-22477-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Mon Jan 7 22:53:51 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7C831180647 for ; Mon, 7 Jan 2019 22:53:50 +0100 (CET) Received: (qmail 8015 invoked by uid 500); 7 Jan 2019 21:53:49 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 8006 invoked by uid 99); 7 Jan 2019 21:53:49 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Jan 2019 21:53:49 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 0BC1B85BAF; Mon, 7 Jan 2019 21:53:49 +0000 (UTC) Date: Mon, 07 Jan 2019 21:53:48 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: Make replication services start when configured (#868) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154689802893.13590.16577355416141023578@gitbox.apache.org> From: mmiller@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: d59f3d68943430abc54bd0ca33774d4e996eca22 X-Git-Newrev: 91988183d0649e9fd09901e2c5905e9f7a678368 X-Git-Rev: 91988183d0649e9fd09901e2c5905e9f7a678368 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 9198818 Make replication services start when configured (#868) 9198818 is described below commit 91988183d0649e9fd09901e2c5905e9f7a678368 Author: Mike Miller AuthorDate: Mon Jan 7 16:53:44 2019 -0500 Make replication services start when configured (#868) * Replication services in tserver and master will now only start when replication.name is set --- .../java/org/apache/accumulo/master/Master.java | 103 +++++++++++++-------- .../org/apache/accumulo/tserver/TabletServer.java | 66 +++++++------ .../replication/MultiTserverReplicationIT.java | 3 + 3 files changed, 102 insertions(+), 70 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 1bffe6b..cc1d800 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; @@ -1261,22 +1263,6 @@ public class Master clientService = sa.server; log.info("Started Master client service at {}", sa.address); - // Start the replication coordinator which assigns tservers to service replication requests - MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); - ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl, - this); - // @formatter:off - ReplicationCoordinator.Processor replicationCoordinatorProcessor = - new ReplicationCoordinator.Processor<>(TraceWrap.service(haReplicationProxy)); - // @formatter:on - ServerAddress replAddress = TServerUtils.startServer(context, hostname, - Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, - "Master Replication Coordinator", "Replication Coordinator", null, - Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, - Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); - - log.info("Started replication coordinator service at " + replAddress.address); - // block until we can obtain the ZK lock for the master getMasterLock(zroot + Constants.ZMASTER_LOCK); @@ -1391,27 +1377,20 @@ public class Master sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } - // Start the daemon to scan the replication table and make units of work - replicationWorkDriver = new ReplicationDriver(this); - replicationWorkDriver.start(); - - // Start the daemon to assign work to tservers to replicate to our peers - replicationWorkAssigner = new WorkDriver(this); - replicationWorkAssigner.start(); - - // Advertise that port we used so peers don't have to be told what it is - context.getZooReaderWriter().putPersistentData( - getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, - replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); - - // Register replication metrics - MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this); - Metrics replicationMetrics = factory.createReplicationMetrics(); - try { - replicationMetrics.register(); - } catch (Exception e) { - log.error("Failed to register replication metrics", e); - } + // if the replication name is ever set, then start replication services + final AtomicReference replServer = new AtomicReference<>(); + SimpleTimer.getInstance(getConfiguration()).schedule(() -> { + try { + if (replServer.get() == null) { + if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) { + log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services."); + replServer.set(setupReplication()); + } + } + } catch (UnknownHostException | KeeperException | InterruptedException e) { + log.error("Error occurred starting replication services. ", e); + } + }, 0, 5000); // The master is fully initialized. Clients are allowed to connect now. masterInitialized.set(true); @@ -1427,9 +1406,12 @@ public class Master final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; statusThread.join(remaining(deadline)); - replicationWorkAssigner.join(remaining(deadline)); - replicationWorkDriver.join(remaining(deadline)); - replAddress.server.stop(); + if (replicationWorkAssigner != null) + replicationWorkAssigner.join(remaining(deadline)); + if (replicationWorkDriver != null) + replicationWorkDriver.join(remaining(deadline)); + TServerUtils.stopTServer(replServer.get()); + // Signal that we want it to stop, and wait for it to do so. if (authenticationTokenKeyManager != null) { authenticationTokenKeyManager.gracefulStop(); @@ -1444,6 +1426,47 @@ public class Master log.info("exiting"); } + private TServer setupReplication() + throws UnknownHostException, KeeperException, InterruptedException { + // Start the replication coordinator which assigns tservers to service replication requests + MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); + ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl, + this); + // @formatter:off + ReplicationCoordinator.Processor replicationCoordinatorProcessor = + new ReplicationCoordinator.Processor<>(TraceWrap.service(haReplicationProxy)); + // @formatter:on + ServerAddress replAddress = TServerUtils.startServer(context, hostname, + Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, + "Master Replication Coordinator", "Replication Coordinator", null, + Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + + log.info("Started replication coordinator service at " + replAddress.address); + // Start the daemon to scan the replication table and make units of work + replicationWorkDriver = new ReplicationDriver(this); + replicationWorkDriver.start(); + + // Start the daemon to assign work to tservers to replicate to our peers + replicationWorkAssigner = new WorkDriver(this); + replicationWorkAssigner.start(); + + // Advertise that port we used so peers don't have to be told what it is + context.getZooReaderWriter().putPersistentData( + getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, + replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + + // Register replication metrics + MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this); + Metrics replicationMetrics = factory.createReplicationMetrics(); + try { + replicationMetrics.register(); + } catch (Exception e) { + log.error("Failed to register replication metrics", e); + } + return replAddress.server; + } + private long remaining(long deadline) { return Math.max(1, deadline - System.currentTimeMillis()); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index db69705..a7ddcac 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -341,7 +341,7 @@ public class TabletServer implements Runnable { private ZooLock tabletServerLock; private TServer server; - private TServer replServer; + private volatile TServer replServer; private DistributedWorkQueue bulkFailedCopyQ; @@ -2664,7 +2664,7 @@ public class TabletServer implements Runnable { return address; } - private HostAndPort startReplicationService() throws UnknownHostException { + private void startReplicationService() throws UnknownHostException { final ReplicationServicerHandler handler = new ReplicationServicerHandler(this); ReplicationServicer.Iface rpcProxy = TraceWrap.service(handler); ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, @@ -2695,8 +2695,6 @@ public class TabletServer implements Runnable { log.error("Could not advertise replication service port", e); throw new RuntimeException(e); } - - return sp.address; } public ZooLock getLock() { @@ -2841,35 +2839,16 @@ public class TabletServer implements Runnable { log.error("Error setting watches for recoveries"); throw new RuntimeException(ex); } - - // Start the thrift service listening for incoming replication requests - try { - startReplicationService(); - } catch (UnknownHostException e) { - throw new RuntimeException("Failed to start replication service", e); - } - - // Start the pool to handle outgoing replications - final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool( - getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task"); - replWorker.setExecutor(replicationThreadPool); - replWorker.run(); - - // Check the configuration value for the size of the pool and, if changed, resize the pool, - // every 5 seconds); final AccumuloConfiguration aconf = getConfiguration(); - Runnable replicationWorkThreadPoolResizer = new Runnable() { - @Override - public void run() { - int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS); - if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) { - log.info("Resizing thread pool for sending replication work from {} to {}", - replicationThreadPool.getMaximumPoolSize(), maxPoolSize); - replicationThreadPool.setMaximumPoolSize(maxPoolSize); + // if the replication name is ever set, then start replication services + SimpleTimer.getInstance(aconf).schedule(() -> { + if (this.replServer == null) { + if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) { + log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services."); + setupReplication(aconf); } } - }; - SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); + }, 0, 5000); final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000; SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this), @@ -2954,6 +2933,7 @@ public class TabletServer implements Runnable { } log.debug("Stopping Replication Server"); TServerUtils.stopTServer(this.replServer); + log.debug("Stopping Thrift Servers"); TServerUtils.stopTServer(server); @@ -2975,6 +2955,32 @@ public class TabletServer implements Runnable { } } + private void setupReplication(AccumuloConfiguration aconf) { + // Start the thrift service listening for incoming replication requests + try { + startReplicationService(); + } catch (UnknownHostException e) { + throw new RuntimeException("Failed to start replication service", e); + } + + // Start the pool to handle outgoing replications + final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool( + getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task"); + replWorker.setExecutor(replicationThreadPool); + replWorker.run(); + + // Check the configuration value for the size of the pool and, if changed, resize the pool + Runnable replicationWorkThreadPoolResizer = () -> { + int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS); + if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) { + log.info("Resizing thread pool for sending replication work from {} to {}", + replicationThreadPool.getMaximumPoolSize(), maxPoolSize); + replicationThreadPool.setMaximumPoolSize(maxPoolSize); + } + }; + SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); + } + private static Pair verifyRootTablet(ServerContext context, TServerInstance instance) throws AccumuloException { ZooTabletStateStore store = new ZooTabletStateStore(context); diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java index 9a2e6be..30291ab 100644 --- a/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.HostAndPort; @@ -47,6 +48,8 @@ public class MultiTserverReplicationIT extends ConfigurableMacBase { @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + // set the name to kick off the replication services + cfg.setProperty(Property.REPLICATION_NAME.getKey(), "test"); cfg.setNumTservers(2); }