Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 73863104A3 for ; Fri, 6 Sep 2013 01:48:37 +0000 (UTC) Received: (qmail 19327 invoked by uid 500); 6 Sep 2013 01:48:34 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 19236 invoked by uid 500); 6 Sep 2013 01:48:34 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 17765 invoked by uid 99); 6 Sep 2013 01:48:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 06 Sep 2013 01:48:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 95016901D8D; Fri, 6 Sep 2013 01:48:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 06 Sep 2013 01:49:21 -0000 Message-Id: <65871330f1d146bf9e848a5697119352@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [53/53] [abbrv] git commit: ACCUMULO-658 Move tests and resources to correct modules ACCUMULO-658 Move tests and resources to correct modules Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/162bd40d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/162bd40d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/162bd40d Branch: refs/heads/ACCUMULO-210 Commit: 162bd40d081a07265ec4ec2f57f1b3c763499899 Parents: cc9f575 Author: Christopher Tubbs Authored: Tue Aug 6 11:41:15 2013 -0400 Committer: Christopher Tubbs Committed: Thu Sep 5 21:47:33 2013 -0400 ---------------------------------------------------------------------- minicluster/pom.xml | 8 +- server/base/pom.xml | 8 + .../apache/accumulo/master/LiveTServerSet.java | 398 ++++++++++++ .../apache/accumulo/server/util/Initialize.java | 522 ++++++++++++++++ .../constraints/MetadataConstraints.java | 315 ++++++++++ .../iterators/MetadataBulkLoadFilter.java | 91 +++ .../server/client/BulkImporterTest.java | 152 +++++ .../constraints/MetadataConstraintsTest.java | 240 ++++++++ .../server/data/ServerMutationTest.java | 79 +++ .../iterators/MetadataBulkLoadFilterTest.java | 144 +++++ .../server/security/SystemCredentialsTest.java | 65 ++ .../security/handler/ZKAuthenticatorTest.java | 87 +++ .../apache/accumulo/server/util/CloneTest.java | 375 ++++++++++++ .../accumulo/server/util/DefaultMapTest.java | 47 ++ .../server/util/TabletIteratorTest.java | 107 ++++ .../server/util/time/BaseRelativeTimeTest.java | 89 +++ .../base/src/test/resources/accumulo-site.xml | 32 + server/base/src/test/resources/log4j.properties | 21 + .../apache/accumulo/master/LiveTServerSet.java | 398 ------------ server/server/pom.xml | 136 ----- .../accumulo/server/metanalysis/FilterMeta.java | 92 --- .../accumulo/server/metanalysis/FindTablet.java | 66 -- .../accumulo/server/metanalysis/IndexMeta.java | 176 ------ .../server/metanalysis/LogFileInputFormat.java | 116 ---- .../server/metanalysis/LogFileOutputFormat.java | 66 -- .../server/metanalysis/PrintEvents.java | 99 --- .../server/metanalysis/package-info.java | 34 -- .../server/util/FindOfflineTablets.java | 132 ---- .../apache/accumulo/server/util/Initialize.java | 522 ---------------- .../src/main/resources/randomwalk/Basic.xml | 37 -- .../src/main/resources/randomwalk/Simple.xml | 43 -- .../src/main/resources/randomwalk/module.xsd | 69 --- .../server/client/BulkImporterTest.java | 152 ----- .../constraints/MetadataConstraintsTest.java | 240 -------- .../server/data/ServerMutationTest.java | 79 --- .../iterators/MetadataBulkLoadFilterTest.java | 144 ----- .../server/security/SystemCredentialsTest.java | 65 -- .../security/handler/ZKAuthenticatorTest.java | 87 --- .../tabletserver/CheckTabletMetadataTest.java | 122 ---- .../server/tabletserver/InMemoryMapTest.java | 492 --------------- .../tabletserver/log/MultiReaderTest.java | 146 ----- .../tabletserver/log/SortedLogRecoveryTest.java | 602 ------------------- .../apache/accumulo/server/util/CloneTest.java | 375 ------------ .../accumulo/server/util/DefaultMapTest.java | 47 -- .../server/util/TabletIteratorTest.java | 107 ---- .../server/util/time/BaseRelativeTimeTest.java | 89 --- .../accumulo/tserver/logger/LogFileTest.java | 124 ---- .../server/src/test/resources/accumulo-site.xml | 32 - .../server/src/test/resources/log4j.properties | 21 - .../constraints/MetadataConstraints.java | 315 ---------- .../iterators/MetadataBulkLoadFilter.java | 91 --- .../tabletserver/CheckTabletMetadataTest.java | 122 ++++ .../server/tabletserver/InMemoryMapTest.java | 492 +++++++++++++++ .../tabletserver/log/MultiReaderTest.java | 146 +++++ .../tabletserver/log/SortedLogRecoveryTest.java | 602 +++++++++++++++++++ .../accumulo/tserver/logger/LogFileTest.java | 124 ++++ server/utils/pom.xml | 6 +- .../accumulo/server/metanalysis/FilterMeta.java | 92 +++ .../accumulo/server/metanalysis/FindTablet.java | 66 ++ .../accumulo/server/metanalysis/IndexMeta.java | 176 ++++++ .../server/metanalysis/LogFileInputFormat.java | 116 ++++ .../server/metanalysis/LogFileOutputFormat.java | 66 ++ .../server/metanalysis/PrintEvents.java | 99 +++ .../server/metanalysis/package-info.java | 34 ++ .../server/util/FindOfflineTablets.java | 132 ++++ test/pom.xml | 6 +- test/src/main/resources/randomwalk/Basic.xml | 37 ++ test/src/main/resources/randomwalk/Simple.xml | 43 ++ test/src/main/resources/randomwalk/module.xsd | 69 +++ 69 files changed, 5200 insertions(+), 5324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/minicluster/pom.xml ---------------------------------------------------------------------- diff --git a/minicluster/pom.xml b/minicluster/pom.xml index c35e7a5..f984a42 100644 --- a/minicluster/pom.xml +++ b/minicluster/pom.xml @@ -35,15 +35,11 @@ org.apache.accumulo - accumulo-core + accumulo-master org.apache.accumulo - accumulo-server - - - org.apache.accumulo - accumulo-start + accumulo-tserver org.apache.hadoop http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/pom.xml ---------------------------------------------------------------------- diff --git a/server/base/pom.xml b/server/base/pom.xml index 5fc94cc..409e884 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -124,4 +124,12 @@ test + + + + true + src/test/resources + + + http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java new file mode 100644 index 0000000..59ab8c8 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/master/LiveTServerSet.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.master.state.TServerInstance; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +public class LiveTServerSet implements Watcher { + + public interface Listener { + void update(LiveTServerSet current, Set deleted, Set added); + } + + private static final Logger log = Logger.getLogger(LiveTServerSet.class); + + private final Listener cback; + private final Instance instance; + private final AccumuloConfiguration conf; + private ZooCache zooCache; + + public class TServerConnection { + private final InetSocketAddress address; + + public TServerConnection(InetSocketAddress addr) throws TException { + address = addr; + } + + private String lockString(ZooLock mlock) { + return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK); + } + + public void assignTablet(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save); + } finally { + ThriftUtil.returnClient(client); + } + } + + public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException { + + if (usePooledConnection == true) + throw new UnsupportedOperationException(); + + TTransport transport = ThriftUtil.createTransport(address, conf); + + try { + TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); + return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance)); + } finally { + if (transport != null) + transport.close(); + } + } + + public void halt(ZooLock lock) throws TException, ThriftSecurityException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void fastHalt(ZooLock lock) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void chop(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(), + ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength())); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void flushTablet(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public boolean isActive(long tid) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + return client.isActive(Tracer.traceInfo(), tid); + } finally { + ThriftUtil.returnClient(client); + } + } + + } + + static class TServerInfo { + TServerConnection connection; + TServerInstance instance; + + TServerInfo(TServerInstance instance, TServerConnection connection) { + this.connection = connection; + this.instance = instance; + } + }; + + // The set of active tservers with locks, indexed by their name in zookeeper + private Map current = new HashMap(); + // as above, indexed by TServerInstance + private Map currentInstances = new HashMap(); + + // The set of entries in zookeeper without locks, and the first time each was noticed + private Map locklessServers = new HashMap(); + + public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { + this.cback = cback; + this.instance = instance; + this.conf = conf; + + } + + public synchronized ZooCache getZooCache() { + if (zooCache == null) + zooCache = new ZooCache(this); + return zooCache; + } + + public synchronized void startListeningForTabletServerChanges() { + scanServers(); + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + scanServers(); + } + }, 0, 5000); + } + + public synchronized void scanServers() { + try { + final Set updates = new HashSet(); + final Set doomed = new HashSet(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + HashSet all = new HashSet(current.keySet()); + all.addAll(getZooCache().getChildren(path)); + + locklessServers.keySet().retainAll(all); + + for (String zPath : all) { + checkServer(updates, doomed, path, zPath); + } + + // log.debug("Current: " + current.keySet()); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + + private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException { + try { + ZooReaderWriter.getInstance().delete(serverNode, -1); + } catch (NotEmptyException ex) { + // race condition: tserver created the lock after our last check; we'll see it at the next check + } catch (NoNodeException nne) { + // someone else deleted it + } + } + + private synchronized void checkServer(final Set updates, final Set doomed, final String path, final String zPath) + throws TException, InterruptedException, KeeperException { + + TServerInfo info = current.get(zPath); + + final String lockPath = path + "/" + zPath; + Stat stat = new Stat(); + byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); + + if (lockData == null) { + if (info != null) { + doomed.add(info.instance); + current.remove(zPath); + currentInstances.remove(info.instance); + } + + Long firstSeen = locklessServers.get(zPath); + if (firstSeen == null) { + locklessServers.put(zPath, System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) { + deleteServerNode(path + "/" + zPath); + locklessServers.remove(zPath); + } + } else { + locklessServers.remove(zPath); + ServerServices services = new ServerServices(new String(lockData)); + InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT); + TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); + + if (info == null) { + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(instance, tServerInfo); + } else if (!info.instance.equals(instance)) { + doomed.add(info.instance); + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(info.instance, tServerInfo); + } + } + } + + @Override + public void process(WatchedEvent event) { + + // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared + // relevant nodes before code below reads from zoocache + + if (event.getPath() != null) { + if (event.getPath().endsWith(Constants.ZTSERVERS)) { + scanServers(); + } else if (event.getPath().contains(Constants.ZTSERVERS)) { + int pos = event.getPath().lastIndexOf('/'); + + // do only if ZTSERVER is parent + if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) { + + String server = event.getPath().substring(pos + 1); + + final Set updates = new HashSet(); + final Set doomed = new HashSet(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + try { + checkServer(updates, doomed, path, server); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + } + } + + public synchronized TServerConnection getConnection(TServerInstance server) throws TException { + if (server == null) + return null; + TServerInfo tServerInfo = currentInstances.get(server); + if (tServerInfo == null) + return null; + return tServerInfo.connection; + } + + public synchronized Set getCurrentServers() { + return new HashSet(currentInstances.keySet()); + } + + public synchronized int size() { + return current.size(); + } + + public synchronized TServerInstance find(String tabletServer) { + InetSocketAddress addr = AddressUtil.parseAddress(tabletServer); + for (Entry entry : current.entrySet()) { + if (entry.getValue().instance.getLocation().equals(addr)) + return entry.getValue().instance; + } + return null; + } + + public synchronized void remove(TServerInstance server) { + String zPath = null; + for (Entry entry : current.entrySet()) { + if (entry.getValue().instance.equals(server)) { + zPath = entry.getKey(); + break; + } + } + if (zPath == null) + return; + current.remove(zPath); + currentInstances.remove(server); + + log.info("Removing zookeeper lock for " + server); + String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath; + try { + ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP); + } catch (Exception e) { + String msg = "error removing tablet server lock"; + log.fatal(msg, e); + Halt.halt(msg, -1); + } + getZooCache().clear(fullpath); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java new file mode 100644 index 0000000..0eb6d36 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Initialize.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map.Entry; +import java.util.UUID; + +import jline.console.ConsoleReader; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.iterators.user.VersioningIterator; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.accumulo.server.tabletserver.TabletTime; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.tserver.constraints.MetadataConstraints; +import org.apache.accumulo.tserver.iterators.MetadataBulkLoadFilter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; + +import com.beust.jcommander.Parameter; + +/** + * This class is used to setup the directory structure and the root tablet to get an instance started + * + */ +public class Initialize { + private static final Logger log = Logger.getLogger(Initialize.class); + private static final String DEFAULT_ROOT_USER = "root"; + public static final String TABLE_TABLETS_TABLET_DIR = "/table_info"; + + private static ConsoleReader reader = null; + + private static ConsoleReader getConsoleReader() throws IOException { + if (reader == null) + reader = new ConsoleReader(); + return reader; + } + + private static HashMap initialMetadataConf = new HashMap(); + static { + initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K"); + initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5"); + initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true"); + initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1"); + initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M"); + initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName()); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1"); + initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName()); + initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet", + String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME)); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME, + LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME)); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server"); + initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), ""); + initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true"); + initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); + } + + public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException { + if (!ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI).equals("")) + log.info("Hadoop Filesystem is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI)); + else + log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf)); + + log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs())); + log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST)); + log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running"); + if (!zookeeperAvailable()) { + log.fatal("Zookeeper needs to be up and running in order to init. Exiting ..."); + return false; + } + if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) { + ConsoleReader c = getConsoleReader(); + c.beep(); + c.println(); + c.println(); + c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it."); + c.println(); + c.println(); + c.println("You can change the instance secret in accumulo by using:"); + c.println(" bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword."); + c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly"); + } + + try { + if (isInitialized(fs)) { + log.fatal("It appears this location was previously initialized, exiting ... "); + return false; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + // prompt user for instance name and root password early, in case they + // abort, we don't leave an inconsistent HDFS/ZooKeeper structure + String instanceNamePath; + try { + instanceNamePath = getInstanceNamePath(opts); + } catch (Exception e) { + log.fatal("Failed to talk to zookeeper", e); + return false; + } + opts.rootpass = getRootPassword(opts); + return initialize(opts, instanceNamePath, fs); + } + + public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) { + + UUID uuid = UUID.randomUUID(); + try { + initZooKeeper(opts, uuid.toString(), instanceNamePath); + } catch (Exception e) { + log.fatal("Failed to initialize zookeeper", e); + return false; + } + + try { + initFileSystem(opts, fs, uuid); + } catch (Exception e) { + log.fatal("Failed to initialize filesystem", e); + return false; + } + + try { + initSecurity(opts, uuid.toString()); + } catch (Exception e) { + log.fatal("Failed to initialize security", e); + return false; + } + return true; + } + + private static boolean zookeeperAvailable() { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + try { + return zoo.exists("/"); + } catch (KeeperException e) { + return false; + } catch (InterruptedException e) { + return false; + } + } + + private static Path[] paths(String[] paths) { + Path[] result = new Path[paths.length]; + for (int i = 0; i < paths.length; i++) { + result[i] = new Path(paths[i]); + } + return result; + } + + private static T[] concat(T[] a, T[] b) { + List result = new ArrayList(a.length + b.length); + for (int i = 0; i < a.length; i++) { + result.add(a[i]); + } + for (int i = 0; i < b.length; i++) { + result.add(b[i]); + } + return result.toArray(a); + } + + private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid) throws IOException { + FileStatus fstat; + + // the actual disk locations of the root table and tablets + final Path rootTablet = new Path(ServerConstants.getRootTabletDir()); + + // the actual disk locations of the metadata table and tablets + final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs()); + final Path[] tableMetadataTabletDirs = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR)); + final Path[] defaultMetadataTabletDirs = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION)); + + fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION)); + + // create an instance id + fs.mkdirs(ServerConstants.getInstanceIdLocation()); + fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString())); + + // initialize initial metadata config in zookeeper + initMetadataConfig(); + + // create metadata table + for (Path mtd : metadataTableDirs) { + try { + fstat = fs.getFileStatus(mtd); + if (!fstat.isDir()) { + log.fatal("location " + mtd.toString() + " exists but is not a directory"); + return; + } + } catch (FileNotFoundException fnfe) { + if (!fs.mkdirs(mtd)) { + log.fatal("unable to create directory " + mtd.toString()); + return; + } + } + } + + // create root table and tablet + try { + fstat = fs.getFileStatus(rootTablet); + if (!fstat.isDir()) { + log.fatal("location " + rootTablet.toString() + " exists but is not a directory"); + return; + } + } catch (FileNotFoundException fnfe) { + if (!fs.mkdirs(rootTablet)) { + log.fatal("unable to create directory " + rootTablet.toString()); + return; + } + } + + // populate the root tablet with info about the default tablet + // the root tablet contains the key extent and locations of all the + // metadata tablets + String initRootTabFile = rootTablet + "/00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration()); + FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile)); + FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration()); + mfw.startDefaultLocalityGroup(); + + Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataSchema.TabletsSection.getRange().getEndKey().getRow())); + + // table tablet's directory + Key tableDirKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(), + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0); + mfw.append(tableDirKey, new Value(TABLE_TABLETS_TABLET_DIR.getBytes())); + + // table tablet time + Key tableTimeKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(), + TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0); + mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes())); + + // table tablet's prevrow + Key tablePrevRowKey = new Key(tableExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(), + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0); + mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(null)); + + // ----------] default tablet info + Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null)); + + // default's directory + Key defaultDirKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(), + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0); + mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes())); + + // default's time + Key defaultTimeKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(), + TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0); + mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes())); + + // default's prevrow + Key defaultPrevRowKey = new Key(defaultExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(), + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0); + mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataSchema.TabletsSection.getRange().getEndKey().getRow())); + + mfw.close(); + + // create table and default tablets directories + for (Path dir : concat(defaultMetadataTabletDirs, tableMetadataTabletDirs)) { + try { + fstat = fs.getFileStatus(dir); + if (!fstat.isDir()) { + log.fatal("location " + dir.toString() + " exists but is not a directory"); + return; + } + } catch (FileNotFoundException fnfe) { + try { + fstat = fs.getFileStatus(dir); + if (!fstat.isDir()) { + log.fatal("location " + dir.toString() + " exists but is not a directory"); + return; + } + } catch (FileNotFoundException fnfe2) { + // create table info dir + if (!fs.mkdirs(dir)) { + log.fatal("unable to create directory " + dir.toString()); + return; + } + } + + // create default dir + if (!fs.mkdirs(dir)) { + log.fatal("unable to create directory " + dir.toString()); + return; + } + } + } + } + + private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath) throws KeeperException, InterruptedException { + // setup basic data in zookeeper + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE); + ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE); + + // setup instance name + if (opts.clearInstanceName) + zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP); + zoo.putPersistentData(instanceNamePath, uuid.getBytes(), NodeExistsPolicy.FAIL); + + // setup the instance + String zkInstanceRoot = Constants.ZROOT + "/" + uuid; + zoo.putPersistentData(zkInstanceRoot, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL); + TableManager.prepareNewTableState(uuid, RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); + TableManager.prepareNewTableState(uuid, MetadataTable.ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(), NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.FAIL); + } + + private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException { + // setup the instance name + String instanceName, instanceNamePath = null; + boolean exists = true; + do { + if (opts.cliInstanceName == null) { + instanceName = getConsoleReader().readLine("Instance name : "); + } else { + instanceName = opts.cliInstanceName; + } + if (instanceName == null) + System.exit(0); + instanceName = instanceName.trim(); + if (instanceName.length() == 0) + continue; + instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + if (opts.clearInstanceName) { + exists = false; + break; + } else if (exists = ZooReaderWriter.getInstance().exists(instanceNamePath)) { + String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : "); + if (decision == null) + System.exit(0); + if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') { + opts.clearInstanceName = true; + exists = false; + } + } + } while (exists); + return instanceNamePath; + } + + private static byte[] getRootPassword(Opts opts) throws IOException { + if (opts.cliPassword != null) { + return opts.cliPassword.getBytes(); + } + String rootpass; + String confirmpass; + do { + rootpass = getConsoleReader() + .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*'); + if (rootpass == null) + System.exit(0); + confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*'); + if (confirmpass == null) + System.exit(0); + if (!rootpass.equals(confirmpass)) + log.error("Passwords do not match"); + } while (!rootpass.equals(confirmpass)); + return rootpass.getBytes(); + } + + private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException { + AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER, + opts.rootpass); + } + + protected static void initMetadataConfig() throws IOException { + try { + Configuration conf = CachedConfiguration.getInstance(); + int max = conf.getInt("dfs.replication.max", 512); + // Hadoop 0.23 switched the min value configuration name + int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1)); + if (max < 5) + setMetadataReplication(max, "max"); + if (min > 5) + setMetadataReplication(min, "min"); + for (Entry entry : initialMetadataConf.entrySet()) { + if (!TablePropUtil.setTableProperty(RootTable.ID, entry.getKey(), entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); + if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); + } + } catch (Exception e) { + log.fatal("error talking to zookeeper", e); + throw new IOException(e); + } + } + + private static void setMetadataReplication(int replication, String reason) throws IOException { + String rep = getConsoleReader().readLine( + "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your " + + MetadataTable.NAME + " replication to? (" + replication + ") "); + if (rep == null || rep.length() == 0) + rep = Integer.toString(replication); + else + // Lets make sure it's a number + Integer.parseInt(rep); + initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep); + } + + public static boolean isInitialized(VolumeManager fs) throws IOException { + return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation())); + } + + static class Opts extends Help { + @Parameter(names = "--reset-security", description = "just update the security information") + boolean resetSecurity = false; + @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting") + boolean clearInstanceName = false; + @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt") + String cliInstanceName; + @Parameter(names = "--password", description = "set the password on the command line") + String cliPassword; + + byte[] rootpass = null; + } + + public static void main(String[] args) { + Opts opts = new Opts(); + opts.parseArgs(Initialize.class.getName(), args); + + try { + SecurityUtil.serverLogin(); + Configuration conf = CachedConfiguration.getInstance(); + + @SuppressWarnings("deprecation") + VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration()); + + if (opts.resetSecurity) { + if (isInitialized(fs)) { + opts.rootpass = getRootPassword(opts); + initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID()); + } else { + log.fatal("Attempted to reset security on accumulo before it was initialized"); + } + } else if (!doInit(opts, conf, fs)) + System.exit(-1); + } catch (Exception e) { + log.fatal(e, e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java new file mode 100644 index 0000000..f190cee --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/tserver/constraints/MetadataConstraints.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.constraints; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.constraints.Constraint; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class MetadataConstraints implements Constraint { + + private ZooCache zooCache = null; + private String zooRoot = null; + + private static final Logger log = Logger.getLogger(MetadataConstraints.class); + + private static boolean[] validTableNameChars = new boolean[256]; + + { + for (int i = 0; i < 256; i++) { + validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!'; + } + } + + private static final HashSet validColumnQuals = new HashSet(Arrays.asList(new ColumnFQ[] { + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN, + TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN})); + + private static final HashSet validColumnFams = new HashSet(Arrays.asList(new Text[] {TabletsSection.BulkFileColumnFamily.NAME, + LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME, + TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME, + ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME})); + + private static boolean isValidColumn(ColumnUpdate cu) { + + if (validColumnFams.contains(new Text(cu.getColumnFamily()))) + return true; + + if (validColumnQuals.contains(new ColumnFQ(cu))) + return true; + + return false; + } + + static private ArrayList addViolation(ArrayList lst, int violation) { + if (lst == null) + lst = new ArrayList(); + lst.add((short) violation); + return lst; + } + + static private ArrayList addIfNotPresent(ArrayList lst, int intViolation) { + if (lst == null) + return addViolation(lst, intViolation); + short violation = (short) intViolation; + if (!lst.contains(violation)) + return addViolation(lst, intViolation); + return lst; + } + + @Override + public List check(Environment env, Mutation mutation) { + + ArrayList violations = null; + + Collection colUpdates = mutation.getUpdates(); + + // check the row, it should contains at least one ; or end with < + boolean containsSemiC = false; + + byte[] row = mutation.getRow(); + + // always allow rows that fall within reserved areas + if (row.length > 0 && row[0] == '~') + return null; + if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~') + return null; + + for (byte b : row) { + if (b == ';') { + containsSemiC = true; + } + + if (b == ';' || b == '<') + break; + + if (!validTableNameChars[0xff & b]) { + violations = addIfNotPresent(violations, 4); + } + } + + if (!containsSemiC) { + // see if last row char is < + if (row.length == 0 || row[row.length - 1] != '<') { + violations = addIfNotPresent(violations, 4); + } + } else { + if (row.length == 0) { + violations = addIfNotPresent(violations, 4); + } + } + + if (row.length > 0 && row[0] == '!') { + if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) { + violations = addIfNotPresent(violations, 4); + } + } + + // ensure row is not less than Constants.METADATA_TABLE_ID + if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) { + violations = addViolation(violations, 5); + } + + boolean checkedBulk = false; + + for (ColumnUpdate columnUpdate : colUpdates) { + Text columnFamily = new Text(columnUpdate.getColumnFamily()); + + if (columnUpdate.isDeleted()) { + if (!isValidColumn(columnUpdate)) { + violations = addViolation(violations, 2); + } + continue; + } + + if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) { + violations = addViolation(violations, 6); + } + + if (columnFamily.equals(DataFileColumnFamily.NAME)) { + try { + DataFileValue dfv = new DataFileValue(columnUpdate.getValue()); + + if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) { + violations = addViolation(violations, 1); + } + } catch (NumberFormatException nfe) { + violations = addViolation(violations, 1); + } catch (ArrayIndexOutOfBoundsException aiooe) { + violations = addViolation(violations, 1); + } + } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) { + + } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) { + if (!columnUpdate.isDeleted() && !checkedBulk) { + // splits, which also write the time reference, are allowed to write this reference even when + // the transaction is not running because the other half of the tablet is holding a reference + // to the file. + boolean isSplitMutation = false; + // When a tablet is assigned, it re-writes the metadata. It should probably only update the location information, + // but it writes everything. We allow it to re-write the bulk information if it is setting the location. + // See ACCUMULO-1230. + boolean isLocationMutation = false; + + HashSet dataFiles = new HashSet(); + HashSet loadedFiles = new HashSet(); + + String tidString = new String(columnUpdate.getValue()); + int otherTidCount = 0; + + for (ColumnUpdate update : mutation.getUpdates()) { + if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) { + isSplitMutation = true; + } else if (new Text(update.getColumnFamily()).equals(TabletsSection.CurrentLocationColumnFamily.NAME)) { + isLocationMutation = true; + } else if (new Text(update.getColumnFamily()).equals(DataFileColumnFamily.NAME)) { + dataFiles.add(new Text(update.getColumnQualifier())); + } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) { + loadedFiles.add(new Text(update.getColumnQualifier())); + + if (!new String(update.getValue()).equals(tidString)) { + otherTidCount++; + } + } + } + + if (!isSplitMutation && !isLocationMutation) { + long tid = Long.parseLong(tidString); + + try { + if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) { + violations = addViolation(violations, 8); + } + } catch (Exception ex) { + violations = addViolation(violations, 8); + } + } + + checkedBulk = true; + } + } else { + if (!isValidColumn(columnUpdate)) { + violations = addViolation(violations, 2); + } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0 + && (violations == null || !violations.contains((short) 4))) { + KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null); + + Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue())); + + boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0; + + if (!prevEndRowLessThanEndRow) { + violations = addViolation(violations, 3); + } + } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.ServerColumnFamily.LOCK_COLUMN)) { + if (zooCache == null) { + zooCache = new ZooCache(); + } + + if (zooRoot == null) { + zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance()); + } + + boolean lockHeld = false; + String lockId = new String(columnUpdate.getValue()); + + try { + lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId)); + } catch (Exception e) { + log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage()); + } + + if (!lockHeld) { + violations = addViolation(violations, 7); + } + } + + } + } + + if (violations != null) { + log.debug("violating metadata mutation : " + new String(mutation.getRow())); + for (ColumnUpdate update : mutation.getUpdates()) { + log.debug(" update: " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " value " + + (update.isDeleted() ? "[delete]" : new String(update.getValue()))); + } + } + + return violations; + } + + protected Arbitrator getArbitrator() { + return new ZooArbitrator(); + } + + @Override + public String getViolationDescription(short violationCode) { + switch (violationCode) { + case 1: + return "data file size must be a non-negative integer"; + case 2: + return "Invalid column name given."; + case 3: + return "Prev end row is greater than or equal to end row."; + case 4: + return "Invalid metadata row format"; + case 5: + return "Row can not be less than " + MetadataTable.ID; + case 6: + return "Empty values are not allowed for any " + MetadataTable.NAME + " column"; + case 7: + return "Lock not held in zookeeper by writer"; + case 8: + return "Bulk load transaction no longer running"; + } + return null; + } + + @Override + protected void finalize() { + if (zooCache != null) + zooCache.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java new file mode 100644 index 0000000..8c4c4e2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/tserver/iterators/MetadataBulkLoadFilter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.iterators; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; +import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; +import org.apache.log4j.Logger; + +/** + * A special iterator for the metadata table that removes inactive bulk load flags + * + */ +public class MetadataBulkLoadFilter extends Filter { + private static Logger log = Logger.getLogger(MetadataBulkLoadFilter.class); + + enum Status { + ACTIVE, INACTIVE + } + + Map bulkTxStatusCache; + Arbitrator arbitrator; + + @Override + public boolean accept(Key k, Value v) { + if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) { + long txid = Long.valueOf(v.toString()); + + Status status = bulkTxStatusCache.get(txid); + if (status == null) { + try { + if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) { + status = Status.INACTIVE; + } else { + status = Status.ACTIVE; + } + } catch (Exception e) { + status = Status.ACTIVE; + log.error(e, e); + } + + bulkTxStatusCache.put(txid, status); + } + + return status == Status.ACTIVE; + } + + return true; + } + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + + if (env.getIteratorScope() == IteratorScope.scan) { + throw new IOException("This iterator not intended for use at scan time"); + } + + bulkTxStatusCache = new HashMap(); + arbitrator = getArbitrator(); + } + + protected Arbitrator getArbitrator() { + return new ZooArbitrator(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java new file mode 100644 index 0000000..fb4a3dc --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.client; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.TabletLocator; +import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +public class BulkImporterTest { + + static final SortedSet fakeMetaData = new TreeSet(); + static final Text tableId = new Text("1"); + static { + fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null)); + for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) { + fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow())); + } + fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow())); + } + + class MockTabletLocator extends TabletLocator { + int invalidated = 0; + + @Override + public TabletLocation locateTablet(Credentials credentials, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + return new TabletLocation(fakeMetaData.tailSet(new KeyExtent(tableId, row, null)).first(), "localhost"); + } + + @Override + public void binMutations(Credentials credentials, List mutations, Map> binnedMutations, List failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throw new NotImplementedException(); + } + + @Override + public List binRanges(Credentials credentials, List ranges, Map>> binnedRanges) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { + throw new NotImplementedException(); + } + + @Override + public void invalidateCache(KeyExtent failedExtent) { + invalidated++; + } + + @Override + public void invalidateCache(Collection keySet) { + throw new NotImplementedException(); + } + + @Override + public void invalidateCache() { + throw new NotImplementedException(); + } + + @Override + public void invalidateCache(String server) { + throw new NotImplementedException(); + } + } + + @Test + public void testFindOverlappingTablets() throws Exception { + Credentials credentials = null; + MockTabletLocator locator = new MockTabletLocator(); + FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance()); + AccumuloConfiguration acuConf = AccumuloConfiguration.getDefaultConfiguration(); + String file = "target/testFile.rf"; + fs.delete(new Path(file), true); + FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), acuConf); + writer.startDefaultLocalityGroup(); + Value empty = new Value(new byte[] {}); + writer.append(new Key("a", "cf", "cq"), empty); + writer.append(new Key("a", "cf", "cq1"), empty); + writer.append(new Key("a", "cf", "cq2"), empty); + writer.append(new Key("a", "cf", "cq3"), empty); + writer.append(new Key("a", "cf", "cq4"), empty); + writer.append(new Key("a", "cf", "cq5"), empty); + writer.append(new Key("d", "cf", "cq"), empty); + writer.append(new Key("d", "cf", "cq1"), empty); + writer.append(new Key("d", "cf", "cq2"), empty); + writer.append(new Key("d", "cf", "cq3"), empty); + writer.append(new Key("d", "cf", "cq4"), empty); + writer.append(new Key("d", "cf", "cq5"), empty); + writer.append(new Key("dd", "cf", "cq1"), empty); + writer.append(new Key("ichabod", "cf", "cq"), empty); + writer.append(new Key("icky", "cf", "cq1"), empty); + writer.append(new Key("iffy", "cf", "cq2"), empty); + writer.append(new Key("internal", "cf", "cq3"), empty); + writer.append(new Key("is", "cf", "cq4"), empty); + writer.append(new Key("iterator", "cf", "cq5"), empty); + writer.append(new Key("xyzzy", "cf", "cq"), empty); + writer.close(); + List overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), credentials); + Assert.assertEquals(5, overlaps.size()); + Collections.sort(overlaps); + Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent); + + List overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text( + "b")), credentials); + Assert.assertEquals(3, overlaps2.size()); + Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent); + Assert.assertEquals(locator.invalidated, 1); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java new file mode 100644 index 0000000..fbae24c --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.constraints; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.List; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; +import org.apache.accumulo.tserver.constraints.MetadataConstraints; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Test; + +public class MetadataConstraintsTest { + + static class TestMetadataConstraints extends MetadataConstraints { + @Override + protected Arbitrator getArbitrator() { + return new Arbitrator() { + + @Override + public boolean transactionAlive(String type, long tid) throws Exception { + if (tid == 9) + throw new RuntimeException("txid 9 reserved for future use"); + return tid == 5 || tid == 7; + } + + @Override + public boolean transactionComplete(String type, long tid) throws Exception { + return tid != 5 && tid != 7; + } + }; + } + } + + @Test + public void testCheck() { + Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR); + Mutation m = new Mutation(new Text("0;foo")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1foo".getBytes())); + + MetadataConstraints mc = new MetadataConstraints(); + + List violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 3), violations.get(0)); + + m = new Mutation(new Text("0:foo")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes())); + + violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 4), violations.get(0)); + + m = new Mutation(new Text("0;foo")); + m.put(new Text("bad_column_name"), new Text(""), new Value("e".getBytes())); + + violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 2), violations.get(0)); + + m = new Mutation(new Text("!!<")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("1poo".getBytes())); + + violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(2, violations.size()); + assertEquals(Short.valueOf((short) 4), violations.get(0)); + assertEquals(Short.valueOf((short) 5), violations.get(1)); + + m = new Mutation(new Text("0;foo")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("".getBytes())); + + violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 6), violations.get(0)); + + m = new Mutation(new Text("0;foo")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes())); + + violations = mc.check(null, m); + + assertEquals(null, violations); + + m = new Mutation(new Text("!0<")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes())); + + violations = mc.check(null, m); + + assertEquals(null, violations); + + m = new Mutation(new Text("!1<")); + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m, new Value("bar".getBytes())); + + violations = mc.check(null, m); + + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 4), violations.get(0)); + + } + + @Test + public void testBulkFileCheck() { + MetadataConstraints mc = new TestMetadataConstraints(); + Mutation m; + List violations; + + // inactive txid + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // txid that throws exception + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("9".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // active txid w/ file + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // active txid w/o file + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // two active txids w/ files + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("7".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // two files w/ one active txid + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile2"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // two loaded w/ one active txid and one file + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + m.put(DataFileColumnFamily.NAME, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile2"), new Value("5".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // active txid, mutation that looks like split + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // inactive txid, mutation that looks like split + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes())); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // active txid, mutation that looks like a load + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("5".getBytes())); + m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // inactive txid, mutation that looks like a load + m = new Mutation(new Text("0;foo")); + m.put(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile"), new Value("12345".getBytes())); + m.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // deleting a load flag + m = new Mutation(new Text("0;foo")); + m.putDelete(TabletsSection.BulkFileColumnFamily.NAME, new Text("/someFile")); + violations = mc.check(null, m); + assertNull(violations); + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/162bd40d/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java new file mode 100644 index 0000000..0df27f1 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/data/ServerMutationTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.data; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Test; + +public class ServerMutationTest { + + @Test + public void test() throws Exception { + ServerMutation m = new ServerMutation(new Text("r1")); + m.put(new Text("cf1"), new Text("cq1"), new Value("v1".getBytes())); + m.put(new Text("cf2"), new Text("cq2"), 56, new Value("v2".getBytes())); + m.setSystemTimestamp(42); + + List updates = m.getUpdates(); + + assertEquals(2, updates.size()); + + assertEquals("r1", new String(m.getRow())); + ColumnUpdate cu = updates.get(0); + + assertEquals("cf1", new String(cu.getColumnFamily())); + assertEquals("cq1", new String(cu.getColumnQualifier())); + assertEquals("", new String(cu.getColumnVisibility())); + assertFalse(cu.hasTimestamp()); + assertEquals(42l, cu.getTimestamp()); + + ServerMutation m2 = new ServerMutation(); + ReflectionUtils.copy(CachedConfiguration.getInstance(), m, m2); + + updates = m2.getUpdates(); + + assertEquals(2, updates.size()); + assertEquals("r1", new String(m2.getRow())); + + cu = updates.get(0); + assertEquals("cf1", new String(cu.getColumnFamily())); + assertEquals("cq1", new String(cu.getColumnQualifier())); + assertFalse(cu.hasTimestamp()); + assertEquals(42l, cu.getTimestamp()); + + cu = updates.get(1); + + assertEquals("r1", new String(m2.getRow())); + assertEquals("cf2", new String(cu.getColumnFamily())); + assertEquals("cq2", new String(cu.getColumnQualifier())); + assertTrue(cu.hasTimestamp()); + assertEquals(56, cu.getTimestamp()); + + + } + +}