Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9A44311CB1 for ; Tue, 17 Jun 2014 23:42:44 +0000 (UTC) Received: (qmail 41002 invoked by uid 500); 17 Jun 2014 23:42:44 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 40961 invoked by uid 500); 17 Jun 2014 23:42:44 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 40947 invoked by uid 99); 17 Jun 2014 23:42:44 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jun 2014 23:42:44 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F343F981C2D; Tue, 17 Jun 2014 23:42:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Message-Id: <4b4345e9691045a28b1bf83227f0b9cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: ACCUMULO-2918 Pay the penalty of embedding ZK hierarchy manipulation to avoid creating an upgrade path. Date: Tue, 17 Jun 2014 23:42:43 +0000 (UTC) Repository: accumulo Updated Branches: refs/heads/master d2dc998c7 -> 672801c15 ACCUMULO-2918 Pay the penalty of embedding ZK hierarchy manipulation to avoid creating an upgrade path. In a fresh 1.7.0 instance, this isn't an issue because {{accumulo init}} will create the necessary parent nodes in ZooKeeper. 
However, for upgrades from 1.6 or 1.7 installs that were created before the replication code was merged, the tserver will error out because the parent ZK nodes do not exist (and ZK will not automatically create them). While it's not ideal to embed this logic in the server processes, it's a better alternative than creating an upgrade path, because no upgrade path exists yet. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/672801c1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/672801c1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/672801c1 Branch: refs/heads/master Commit: 672801c154362a1aa870ae52439e5f9b81f42c04 Parents: d2dc998 Author: Josh Elser Authored: Tue Jun 17 16:25:09 2014 -0700 Committer: Josh Elser Committed: Tue Jun 17 16:25:09 2014 -0700 ---------------------------------------------------------------------- .../replication/ZooKeeperInitialization.java | 46 +++++++++++ .../ZooKeeperInitializationTest.java | 70 ++++++++++++++++ .../java/org/apache/accumulo/master/Master.java | 7 +- .../monitor/servlets/ReplicationServlet.java | 87 +++++++++++--------- .../apache/accumulo/tserver/TabletServer.java | 10 +++ 5 files changed, 178 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java new file mode 100644 index 0000000..9e0a5a6 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ZooKeeperInitialization.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.replication; + +import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.zookeeper.KeeperException; + +/** + * We don't want to introduce an upgrade path to 1.7 only for some new nodes within ZooKeeper + *

+ * We can take the penalty of embedding this logic into the server processes, but alleviate + * users/developers from having to worry about the zookeeper state. + */ +public class ZooKeeperInitialization { + /** + * Ensure that the full path to ZooKeeper nodes that will be used exist + * @param zooReaderWriter + * @param zRoot + * @throws KeeperException + * @throws InterruptedException + */ + public static void ensureZooKeeperInitialized(final ZooReaderWriter zooReaderWriter, final String zRoot) throws KeeperException, InterruptedException { + if (!zooReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)) { + zooReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_TSERVERS); + } + + if (!zooReaderWriter.exists(zRoot + ReplicationConstants.ZOO_WORK_QUEUE, null)) { + zooReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_WORK_QUEUE); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java new file mode 100644 index 0000000..25e7bc8 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/replication/ZooKeeperInitializationTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.replication; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.junit.Test; + +/** + * + */ +public class ZooKeeperInitializationTest { + + @Test + public void parentNodesAreCreatedWhenMissing() throws Exception { + ZooReaderWriter zReaderWriter = createMock(ZooReaderWriter.class); + String zRoot = "/accumulo"; + + expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)).andReturn(false).once(); + zReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_TSERVERS); + expectLastCall().once(); + + expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_WORK_QUEUE, null)).andReturn(false).once(); + zReaderWriter.mkdirs(zRoot + ReplicationConstants.ZOO_WORK_QUEUE); + expectLastCall().once(); + + replay(zReaderWriter); + + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zRoot); + + verify(zReaderWriter); + } + + @Test + public void parentNodesAreNotRecreatedWhenAlreadyExist() throws Exception { + ZooReaderWriter zReaderWriter = createMock(ZooReaderWriter.class); + String zRoot = "/accumulo"; + + expect(zReaderWriter.exists(zRoot + ReplicationConstants.ZOO_TSERVERS, null)).andReturn(true).once(); + + expect(zReaderWriter.exists(zRoot + 
ReplicationConstants.ZOO_WORK_QUEUE, null)).andReturn(true).once(); + + replay(zReaderWriter); + + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zRoot); + + verify(zReaderWriter); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index a722008..820db5d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -106,6 +106,7 @@ import org.apache.accumulo.server.master.state.TabletMigration; import org.apache.accumulo.server.master.state.TabletState; import org.apache.accumulo.server.master.state.ZooStore; import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.replication.ZooKeeperInitialization; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.SystemCredentials; @@ -935,7 +936,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt tserverSet.startListeningForTabletServerChanges(); - ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() { + ZooReaderWriter zReaderWriter = ZooReaderWriter.getInstance(); + + zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() { @Override public void process(WatchedEvent event) { nextEvent.event("Noticed recovery changes", event.getType()); @@ -981,6 +984,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new IOException(e); } + ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot); + Processor processor = new 
Processor(TraceWrap.service(new MasterClientServiceHandler(this))); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java index 2fcd67d..563fb6d 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java @@ -57,6 +57,7 @@ import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,56 +211,60 @@ public class ReplicationServlet extends BasicServlet { DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst)); - for (String queueKey : workQueue.getWorkQueued()) { - Entry queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey); - String filename = queueKeyPair.getKey(); - ReplicationTarget target = queueKeyPair.getValue(); - - byte[] data = zooCache.get(workQueuePath + "/" + queueKey); - - // We could try to grep over the table, but without knowing the full file path, we - // can't find the status quickly - String status = "Unknown"; - String path = null; - if (null != 
data) { - path = new String(data); - Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY); - s.setRange(Range.exact(path)); - s.fetchColumn(WorkSection.NAME, target.toText()); + try { + for (String queueKey : workQueue.getWorkQueued()) { + Entry queueKeyPair = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(queueKey); + String filename = queueKeyPair.getKey(); + ReplicationTarget target = queueKeyPair.getValue(); - // Fetch the work entry for this item - Entry kv = null; - try { - kv = Iterables.getOnlyElement(s); - } catch (NoSuchElementException e) { - log.trace("Could not find status of {} replicating to {}", filename, target); - status = "Unknown"; - } finally { - s.close(); - } + byte[] data = zooCache.get(workQueuePath + "/" + queueKey); - // If we found the work entry for it, try to compute some progress - if (null != kv) { + // We could try to grep over the table, but without knowing the full file path, we + // can't find the status quickly + String status = "Unknown"; + String path = null; + if (null != data) { + path = new String(data); + Scanner s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY); + s.setRange(Range.exact(path)); + s.fetchColumn(WorkSection.NAME, target.toText()); + + // Fetch the work entry for this item + Entry kv = null; try { - Status stat = Status.parseFrom(kv.getValue().get()); - if (StatusUtil.isFullyReplicated(stat)) { - status = "Finished"; - } else { - if (stat.getInfiniteEnd()) { - status = stat.getBegin() + "/∞ records"; + kv = Iterables.getOnlyElement(s); + } catch (NoSuchElementException e) { + log.trace("Could not find status of {} replicating to {}", filename, target); + status = "Unknown"; + } finally { + s.close(); + } + + // If we found the work entry for it, try to compute some progress + if (null != kv) { + try { + Status stat = Status.parseFrom(kv.getValue().get()); + if (StatusUtil.isFullyReplicated(stat)) { + status = "Finished"; } else { - status = 
stat.getBegin() + "/" + stat.getEnd() + " records"; + if (stat.getInfiniteEnd()) { + status = stat.getBegin() + "/∞ records"; + } else { + status = stat.getBegin() + "/" + stat.getEnd() + " records"; + } } + } catch (InvalidProtocolBufferException e) { + log.warn("Could not deserialize protobuf for {}", kv.getKey(), e); + status = "Unknown"; } - } catch (InvalidProtocolBufferException e) { - log.warn("Could not deserialize protobuf for {}", kv.getKey(), e); - status = "Unknown"; } } + + // Add a row in the table + replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status); } - - // Add a row in the table - replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status); + } catch (KeeperException | InterruptedException e) { + log.warn("Could not calculate replication in progress", e); } replicationInProgress.generate(req, sb); http://git-wip-us.apache.org/repos/asf/accumulo/blob/672801c1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index f67c51f..9e8af0a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -160,6 +160,7 @@ import org.apache.accumulo.server.master.state.TabletStateStore; import org.apache.accumulo.server.master.state.ZooTabletStateStore; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.replication.ZooKeeperInitialization; import 
org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.SystemCredentials; @@ -2340,6 +2341,15 @@ public class TabletServer implements Runnable { public void run() { SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); + // To make things easier on users/devs, and to avoid creating an upgrade path to 1.7 + // We can just make the zookeeper paths before we try to use. + try { + ZooKeeperInitialization.ensureZooKeeperInitialized(ZooReaderWriter.getInstance(), ZooUtil.getRoot(getInstance())); + } catch (KeeperException | InterruptedException e) { + log.error("Could not ensure that ZooKeeper is properly initialized", e); + throw new RuntimeException(e); + } + try { clientAddress = startTabletClientService(); } catch (UnknownHostException e1) {