Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4524710580 for ; Tue, 4 Feb 2014 17:55:36 +0000 (UTC) Received: (qmail 26400 invoked by uid 500); 4 Feb 2014 17:54:52 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 25567 invoked by uid 500); 4 Feb 2014 17:54:36 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 25447 invoked by uid 99); 4 Feb 2014 17:54:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Feb 2014 17:54:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7522C82416F; Tue, 4 Feb 2014 17:54:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Tue, 04 Feb 2014 17:55:14 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [45/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index a9cdb15,0000000..63bd894 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@@ -1,399 -1,0 +1,399 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master; + +import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +import com.google.common.net.HostAndPort; + +public class LiveTServerSet implements Watcher { + + public interface Listener { + void update(LiveTServerSet current, Set deleted, Set added); + } + + private static final Logger log = Logger.getLogger(LiveTServerSet.class); + + private final Listener cback; + private final Instance instance; + private final AccumuloConfiguration conf; + private ZooCache zooCache; + + public class TServerConnection { + private final HostAndPort address; + + public TServerConnection(HostAndPort addr) throws TException { + address = addr; + } + + private String lockString(ZooLock mlock) { + return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK); + } + + public void assignTablet(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save); + } finally { + ThriftUtil.returnClient(client); + } + } + + public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException { + + if (usePooledConnection == true) + throw new UnsupportedOperationException(); + + TTransport transport = ThriftUtil.createTransport(address, conf); + + try { + TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); + return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance)); + } finally { + if (transport != null) + transport.close(); + } + } + + public void halt(ZooLock lock) throws TException, ThriftSecurityException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void fastHalt(ZooLock lock) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void chop(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(), + ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength())); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void flushTablet(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public boolean isActive(long tid) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + return client.isActive(Tracer.traceInfo(), tid); + } finally { + ThriftUtil.returnClient(client); + } + } + + } + + static class TServerInfo { + TServerConnection connection; + TServerInstance instance; + + TServerInfo(TServerInstance instance, TServerConnection connection) { + this.connection = connection; + this.instance = instance; + } + }; + + // The set of active tservers with locks, indexed by their name in zookeeper + private Map current = new HashMap(); + // as above, indexed by TServerInstance + private Map currentInstances = new HashMap(); + + // The set of entries in zookeeper without locks, and the first time each was noticed + private Map locklessServers = new HashMap(); + + public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { + this.cback = cback; + this.instance = instance; + this.conf = conf; + + } + + public synchronized ZooCache getZooCache() { + if (zooCache == null) + zooCache = new ZooCache(this); + return zooCache; + } + + public synchronized void startListeningForTabletServerChanges() { + scanServers(); + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + scanServers(); + } + }, 0, 5000); + } + + public synchronized void scanServers() { + try { + final Set updates = new HashSet(); + final Set doomed = new HashSet(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + HashSet all = new HashSet(current.keySet()); + all.addAll(getZooCache().getChildren(path)); + + locklessServers.keySet().retainAll(all); + + for (String zPath : all) { + checkServer(updates, doomed, path, zPath); + } + + // log.debug("Current: " + current.keySet()); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + + private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException { + try { + ZooReaderWriter.getInstance().delete(serverNode, -1); + } catch (NotEmptyException ex) { + // race condition: tserver created the lock after our last check; we'll see it at the next check + } catch (NoNodeException nne) { + // someone else deleted it + } + } + + private synchronized void checkServer(final Set updates, final Set doomed, final String path, final String zPath) + throws TException, InterruptedException, KeeperException { + + TServerInfo info = current.get(zPath); + + final String lockPath = path + "/" + zPath; + Stat stat = new Stat(); + byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); + + if (lockData == null) { + if (info != null) { + doomed.add(info.instance); + current.remove(zPath); + currentInstances.remove(info.instance); + } + + Long firstSeen = locklessServers.get(zPath); + if (firstSeen == null) { + locklessServers.put(zPath, System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) { + deleteServerNode(path + "/" + zPath); + locklessServers.remove(zPath); + } + } else { + locklessServers.remove(zPath); - ServerServices services = new ServerServices(new String(lockData)); ++ ServerServices services = new ServerServices(new String(lockData, Constants.UTF8)); + HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT); + TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); + + if (info == null) { + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(instance, tServerInfo); + } else if (!info.instance.equals(instance)) { + doomed.add(info.instance); + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(info.instance, tServerInfo); + } + } + } + + @Override + public void process(WatchedEvent event) { + + // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared + // relevant nodes before code below reads from zoocache + + if (event.getPath() != null) { + if (event.getPath().endsWith(Constants.ZTSERVERS)) { + scanServers(); + } else if (event.getPath().contains(Constants.ZTSERVERS)) { + int pos = event.getPath().lastIndexOf('/'); + + // do only if ZTSERVER is parent + if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) { + + String server = event.getPath().substring(pos + 1); + + final Set updates = new HashSet(); + final Set doomed = new HashSet(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + try { + checkServer(updates, doomed, path, server); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + } + } + + public synchronized TServerConnection getConnection(TServerInstance server) { + if (server == null) + return null; + TServerInfo tServerInfo = currentInstances.get(server); + if (tServerInfo == null) + return null; + return tServerInfo.connection; + } + + public synchronized Set getCurrentServers() { + return new HashSet(currentInstances.keySet()); + } + + public synchronized int size() { + return current.size(); + } + + public synchronized TServerInstance find(String tabletServer) { + HostAndPort addr = AddressUtil.parseAddress(tabletServer, false); + for (Entry entry : current.entrySet()) { + if (entry.getValue().instance.getLocation().equals(addr)) + return entry.getValue().instance; + } + return null; + } + + public synchronized void remove(TServerInstance server) { + String zPath = null; + for (Entry entry : current.entrySet()) { + if (entry.getValue().instance.equals(server)) { + zPath = entry.getKey(); + break; + } + } + if (zPath == null) + return; + current.remove(zPath); + currentInstances.remove(server); + + log.info("Removing zookeeper lock for " + server); + String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath; + try { + ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP); + } catch (Exception e) { + String msg = "error removing tablet server lock"; + log.fatal(msg, e); + Halt.halt(msg, -1); + } + getZooCache().clear(fullpath); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java index e1ffd2f,0000000..2f657c4 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/DeadServerList.java @@@ -1,89 -1,0 +1,90 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.util.ArrayList; +import java.util.List; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.master.thrift.DeadServer; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Logger; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.KeeperException.NoNodeException; + +public class DeadServerList { + private static final Logger log = Logger.getLogger(DeadServerList.class); + private final String path; + + public DeadServerList(String path) { + this.path = path; + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + try { + zoo.mkdirs(path); + } catch (Exception ex) { + log.error("Unable to make parent directories of " + path, ex); + } + } + + public List getList() { + List result = new ArrayList(); + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + try { + List children = zoo.getChildren(path); + if (children != null) { + for (String child : children) { + Stat stat = new Stat(); + byte[] data; + try { + data = zoo.getData(path + "/" + child, stat); + } catch (NoNodeException nne) { + // Another thread or process can delete child while this loop is running. + // We ignore this error since it's harmless if we miss the deleted server + // in the dead server list. + continue; + } - DeadServer server = new DeadServer(child, stat.getMtime(), new String(data)); ++ DeadServer server = new DeadServer(child, stat.getMtime(), new String(data, Constants.UTF8)); + result.add(server); + } + } + } catch (Exception ex) { + log.error(ex, ex); + } + return result; + } + + public void delete(String server) { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + try { + zoo.recursiveDelete(path + "/" + server, NodeMissingPolicy.SKIP); + } catch (Exception ex) { + log.error(ex, ex); + } + } + + public void post(String server, String cause) { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + try { - zoo.putPersistentData(path + "/" + server, cause.getBytes(), NodeExistsPolicy.SKIP); ++ zoo.putPersistentData(path + "/" + server, cause.getBytes(Constants.UTF8), NodeExistsPolicy.SKIP); + } catch (Exception ex) { + log.error(ex, ex); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index 90a9d19,0000000..f0ac7bc mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@@ -1,191 -1,0 +1,191 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.SortedMap; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class MetaDataTableScanner implements Iterator { + private static final Logger log = Logger.getLogger(MetaDataTableScanner.class); + + BatchScanner mdScanner = null; + Iterator> iter = null; + + public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state) { + this(instance, credentials, range, state, MetadataTable.NAME); + } + + MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state, String tableName) { + // scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables + try { + Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken()); + mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8); + configureScanner(mdScanner, state); + mdScanner.setRanges(Collections.singletonList(range)); + iter = mdScanner.iterator(); + } catch (Exception ex) { + if (mdScanner != null) + mdScanner.close(); + iter = null; + mdScanner = null; + throw new RuntimeException(ex); + } + } + + static public void configureScanner(ScannerBase scanner, CurrentState state) { + TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); + scanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME); + scanner.fetchColumnFamily(LogColumnFamily.NAME); + scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); + scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); + IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class); + if (state != null) { + TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers()); + TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables()); + TabletStateChangeIterator.setMerges(tabletChange, state.merges()); + } + scanner.addScanIterator(tabletChange); + } + + public MetaDataTableScanner(Instance instance, Credentials credentials, Range range) { + this(instance, credentials, range, MetadataTable.NAME); + } + + public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, String tableName) { + this(instance, credentials, range, null, tableName); + } + + public void close() { + if (iter != null) { + mdScanner.close(); + iter = null; + } + } + + @Override - public void finalize() { ++ protected void finalize() { + close(); + } + + @Override + public boolean hasNext() { + if (iter == null) + return false; + boolean result = iter.hasNext(); + if (!result) { + close(); + } + return result; + } + + @Override + public TabletLocationState next() { + return fetch(); + } + + public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException { + final SortedMap decodedRow = WholeRowIterator.decodeRow(k, v); + KeyExtent extent = null; + TServerInstance future = null; + TServerInstance current = null; + TServerInstance last = null; + long lastTimestamp = 0; + List> walogs = new ArrayList>(); + boolean chopped = false; + + for (Entry entry : decodedRow.entrySet()) { + Key key = entry.getKey(); + Text row = key.getRow(); + Text cf = key.getColumnFamily(); + Text cq = key.getColumnQualifier(); + + if (cf.compareTo(TabletsSection.FutureLocationColumnFamily.NAME) == 0) { + TServerInstance location = new TServerInstance(entry.getValue(), cq); + if (future != null) { + throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location, entry.getKey().getRow()); + } + future = location; + } else if (cf.compareTo(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { + TServerInstance location = new TServerInstance(entry.getValue(), cq); + if (current != null) { + throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location, entry.getKey().getRow()); + } + current = location; + } else if (cf.compareTo(LogColumnFamily.NAME) == 0) { + String[] split = entry.getValue().toString().split("\\|")[0].split(";"); + walogs.add(Arrays.asList(split)); + } else if (cf.compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) { + if (lastTimestamp < entry.getKey().getTimestamp()) + last = new TServerInstance(entry.getValue(), cq); + } else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) { + chopped = true; + } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) { + extent = new KeyExtent(row, entry.getValue()); + } + } + if (extent == null) { + log.warn("No prev-row for key extent: " + decodedRow); + return null; + } + return new TabletLocationState(extent, future, current, last, walogs, chopped); + } + + private TabletLocationState fetch() { + try { + Entry e = iter.next(); + return createTabletLocationState(e.getKey(), e.getValue()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (BadLocationStateException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void remove() { + throw new RuntimeException("Unimplemented"); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java index e7dce67,0000000..400d0ac mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java @@@ -1,142 -1,0 +1,143 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.hadoop.io.Text; + +import com.google.common.net.HostAndPort; + +/** + * A tablet is assigned to a tablet server at the given address as long as it is alive and well. When the tablet server is restarted, the instance information + * it advertises will change. Therefore tablet assignments can be considered out-of-date if the tablet server instance information has been changed. + * + */ +public class TServerInstance implements Comparable, Serializable { + + private static final long serialVersionUID = 1L; + + // HostAndPort is not Serializable + private transient HostAndPort location; + private String session; + private String cachedStringRepresentation; + + public TServerInstance(HostAndPort address, String session) { + this.location = address; + this.session = session; + this.cachedStringRepresentation = hostPort() + "[" + session + "]"; + } + + public TServerInstance(HostAndPort address, long session) { + this(address, Long.toHexString(session)); + } + + public TServerInstance(String address, long session) { + this(AddressUtil.parseAddress(address, false), Long.toHexString(session)); + } + + public TServerInstance(Value address, Text session) { - this(AddressUtil.parseAddress(new String(address.get()), false), session.toString()); ++ this(AddressUtil.parseAddress(new String(address.get(), Constants.UTF8), false), session.toString()); + } + + public void putLocation(Mutation m) { + m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); + } + + public void putFutureLocation(Mutation m) { + m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); + } + + public void putLastLocation(Mutation m) { + m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); + } + + public void clearLastLocation(Mutation m) { + m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier()); + } + + @Override + public int compareTo(TServerInstance other) { + if (this == other) + return 0; + return this.toString().compareTo(other.toString()); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof TServerInstance) { + return compareTo((TServerInstance) obj) == 0; + } + return false; + } + + @Override + public String toString() { + return cachedStringRepresentation; + } + + public int port() { + return getLocation().getPort(); + } + + public String host() { + return getLocation().getHostText(); + } + + public String hostPort() { + return getLocation().toString(); + } + + public Text asColumnQualifier() { + return new Text(this.getSession()); + } + + public Value asMutationValue() { - return new Value(getLocation().toString().getBytes()); ++ return new Value(getLocation().toString().getBytes(Constants.UTF8)); + } + + public HostAndPort getLocation() { + return location; + } + + public String getSession() { + return session; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + out.writeObject(location.toString()); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + location = HostAndPort.fromString(in.readObject().toString()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java index d1cdd9d,0000000..5749523 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java @@@ -1,188 -1,0 +1,189 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.StringUtil; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; + +public class TabletStateChangeIterator extends SkippingIterator { + + private static final String SERVERS_OPTION = "servers"; + private static final String TABLES_OPTION = "tables"; + private static final String MERGES_OPTION = "merges"; + // private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class); + + Set current; + Set onlineTables; + Map merges; + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + current = parseServers(options.get(SERVERS_OPTION)); + onlineTables = parseTables(options.get(TABLES_OPTION)); + merges = parseMerges(options.get(MERGES_OPTION)); + } + + private Set parseTables(String tables) { + if (tables == null) + return null; + Set result = new HashSet(); + for (String table : tables.split(",")) + result.add(table); + return result; + } + + private Set parseServers(String servers) { + if (servers == null) + return null; + // parse "host:port[INSTANCE]" + Set result = new HashSet(); + if (servers.length() > 0) { + for (String part : servers.split(",")) { + String parts[] = part.split("\\[", 2); + String hostport = parts[0]; + String instance = parts[1]; + if (instance != null && instance.endsWith("]")) + instance = instance.substring(0, instance.length() - 1); + result.add(new TServerInstance(AddressUtil.parseAddress(hostport, false), instance)); + } + } + return result; + } + + private Map parseMerges(String merges) { + if (merges == null) + return null; + try { + Map result = new HashMap(); + DataInputBuffer buffer = new DataInputBuffer(); - byte[] data = Base64.decodeBase64(merges.getBytes()); ++ byte[] data = Base64.decodeBase64(merges.getBytes(Constants.UTF8)); + buffer.reset(data, data.length); + while (buffer.available() > 0) { + MergeInfo mergeInfo = new MergeInfo(); + mergeInfo.readFields(buffer); + result.put(mergeInfo.extent.getTableId(), mergeInfo); + } + return result; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void consume() throws IOException { + while (getSource().hasTop()) { + Key k = getSource().getTopKey(); + Value v = getSource().getTopValue(); + + if (onlineTables == null || current == null) + return; + + TabletLocationState tls; + try { + tls = MetaDataTableScanner.createTabletLocationState(k, v); + if (tls == null) + return; + } catch (BadLocationStateException e) { + // maybe the master can do something with a tablet with bad/inconsistent state + return; + } + // we always want data about merges + MergeInfo merge = merges.get(tls.extent.getTableId()); + if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) { + return; + } + // is the table supposed to be online or offline? + boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString()); + + switch (tls.getState(current)) { + case ASSIGNED: + // we always want data about assigned tablets + return; + case HOSTED: + if (!shouldBeOnline) + return; + case ASSIGNED_TO_DEAD_SERVER: + return; + case UNASSIGNED: + if (shouldBeOnline) + return; + } + // table is in the expected state so don't bother returning any information about it + getSource().next(); + } + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + public static void setCurrentServers(IteratorSetting cfg, Set goodServers) { + if (goodServers != null) { + List servers = new ArrayList(); + for (TServerInstance server : goodServers) + servers.add(server.toString()); + cfg.addOption(SERVERS_OPTION, StringUtil.join(servers, ",")); + } + } + + public static void setOnlineTables(IteratorSetting cfg, Set onlineTables) { + if (onlineTables != null) + cfg.addOption(TABLES_OPTION, StringUtil.join(onlineTables, ",")); + } + + public static void setMerges(IteratorSetting cfg, Collection merges) { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + for (MergeInfo info : merges) { + KeyExtent extent = info.getExtent(); + if (extent != null && !info.getState().equals(MergeState.NONE)) { + info.write(buffer); + } + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } - String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength()))); ++ String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())), Constants.UTF8); + cfg.addOption(MERGES_OPTION, encoded); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java index bce6681,0000000..b0ed03f mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java @@@ -1,96 -1,0 +1,97 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.io.IOException; +import java.util.List; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Logger; + +public class ZooStore implements DistributedStore { + + private static final Logger log = Logger.getLogger(ZooStore.class); + + String basePath; + + ZooCache cache = new ZooCache(); + + public ZooStore(String basePath) throws IOException { + if (basePath.endsWith("/")) + basePath = basePath.substring(0, basePath.length() - 1); + this.basePath = basePath; + } + + public ZooStore() throws IOException { + this(ZooUtil.getRoot(HdfsZooInstance.getInstance().getInstanceID())); + } + + @Override + public byte[] get(String path) throws DistributedStoreException { + try { + return cache.get(relative(path)); + } catch (Exception ex) { + throw new DistributedStoreException(ex); + } + } + + private String relative(String path) { + return basePath + path; + } + + @Override + public List getChildren(String path) throws DistributedStoreException { + try { + return cache.getChildren(relative(path)); + } catch (Exception ex) { + throw new DistributedStoreException(ex); + } + } + + @Override + public void put(String path, byte[] bs) throws DistributedStoreException { + try { + path = relative(path); + ZooReaderWriter.getInstance().putPersistentData(path, bs, NodeExistsPolicy.OVERWRITE); + cache.clear(); - log.debug("Wrote " + new String(bs) + " to " + path); ++ log.debug("Wrote " + new String(bs, Constants.UTF8) + " to " + path); + } catch (Exception ex) { + throw new DistributedStoreException(ex); + } + } + + @Override + public void remove(String path) throws DistributedStoreException { + try { + log.debug("Removing " + path); + path = relative(path); + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + if (zoo.exists(path)) + zoo.recursiveDelete(path, NodeMissingPolicy.SKIP); + cache.clear(); + } catch (Exception ex) { + throw new DistributedStoreException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index c792917,0000000..bbd1257 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@@ -1,174 -1,0 +1,175 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master.state; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.commons.lang.NotImplementedException; +import org.apache.log4j.Logger; + +import com.google.common.net.HostAndPort; + +public class ZooTabletStateStore extends TabletStateStore { + + private static final Logger log = Logger.getLogger(ZooTabletStateStore.class); + final private DistributedStore store; + + public ZooTabletStateStore(DistributedStore store) { + this.store = store; + } + + public ZooTabletStateStore() throws DistributedStoreException { + try { + store = new ZooStore(); + } catch (IOException ex) { + throw new DistributedStoreException(ex); + } + } + + @Override + public Iterator iterator() { + return new Iterator() { + boolean finished = false; + + @Override + public boolean hasNext() { + return !finished; + } + + @Override + public TabletLocationState next() { + finished = true; + try { + byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION); + byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION); + byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION); + + TServerInstance currentSession = null; + TServerInstance futureSession = null; + TServerInstance lastSession = null; + + if (future != null) + futureSession = parse(future); + + if (last != null) + lastSession = parse(last); + + if (current != null) { + currentSession = parse(current); + futureSession = null; + } + List> logs = new ArrayList>(); + for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) { + byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry); + if (logInfo != null) { + LogEntry logEntry = new LogEntry(); + logEntry.fromBytes(logInfo); + logs.add(logEntry.logSet); + log.debug("root tablet logSet " + logEntry.logSet); + } + } + TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false); + log.debug("Returning root tablet state: " + result); + return result; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void remove() { + throw new NotImplementedException(); + } + }; + } + + protected TServerInstance parse(byte[] current) { - String str = new String(current); ++ String str = new String(current, Constants.UTF8); + String[] parts = str.split("[|]", 2); + HostAndPort address = HostAndPort.fromString(parts[0]); + if (parts.length > 1 && parts[1] != null && parts[1].length() > 0) { + return new TServerInstance(address, parts[1]); + } else { + // a 1.2 location specification: DO NOT WANT + return null; + } + } + + @Override + public void setFutureLocations(Collection assignments) throws DistributedStoreException { + if (assignments.size() != 1) + throw new IllegalArgumentException("There is only one root tablet"); + Assignment assignment = assignments.iterator().next(); + if (assignment.tablet.compareTo(RootTable.EXTENT) != 0) + throw new IllegalArgumentException("You can only store the root tablet location"); + String value = assignment.server.getLocation() + "|" + assignment.server.getSession(); + Iterator currentIter = iterator(); + TabletLocationState current = currentIter.next(); + if (current.current != null) { + throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current); + } - store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes()); ++ store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes(Constants.UTF8)); + } + + @Override + public void setLocations(Collection assignments) throws DistributedStoreException { + if (assignments.size() != 1) + throw new IllegalArgumentException("There is only one root tablet"); + Assignment assignment = assignments.iterator().next(); + if (assignment.tablet.compareTo(RootTable.EXTENT) != 0) + throw new IllegalArgumentException("You can only store the root tablet location"); + String value = assignment.server.getLocation() + "|" + assignment.server.getSession(); + Iterator currentIter = iterator(); + TabletLocationState current = currentIter.next(); + if (current.current != null) { + throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current); + } + if (!current.future.equals(assignment.server)) { + throw new DistributedStoreException("Root tablet is already assigned to " + current.future); + } - store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes()); - store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes()); ++ store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes(Constants.UTF8)); ++ store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes(Constants.UTF8)); + // Make the following unnecessary by making the entire update atomic + store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION); + log.debug("Put down root tablet location"); + } + + @Override + public void unassign(Collection tablets) throws DistributedStoreException { + if (tablets.size() != 1) + throw new IllegalArgumentException("There is only one root tablet"); + TabletLocationState tls = tablets.iterator().next(); + if (tls.extent.compareTo(RootTable.EXTENT) != 0) + throw new IllegalArgumentException("You can only store the root tablet location"); + store.remove(RootTable.ZROOT_TABLET_LOCATION); + store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION); + log.debug("unassign root tablet location"); + } + + @Override + public String name() { + return "Root Table"; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java index 9735371,0000000..cdcdfba mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java @@@ -1,272 -1,0 +1,276 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metrics; + +import java.io.File; - import java.io.FileWriter; ++import java.io.FileOutputStream; +import java.io.IOException; ++import java.io.OutputStreamWriter; ++import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.StandardMBean; + ++import org.apache.accumulo.core.Constants; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.time.DateUtils; + +public abstract class AbstractMetricsImpl { + + public class Metric { + + private long count = 0; + private long avg = 0; + private long min = 0; + private long max = 0; + + public long getCount() { + return count; + } + + public long getAvg() { + return avg; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public void incCount() { + count++; + } + + public void addAvg(long a) { + if (a < 0) + return; + avg = (long) ((avg * .8) + (a * .2)); + } + + public void addMin(long a) { + if (a < 0) + return; + min = Math.min(min, a); + } + + public void addMax(long a) { + if (a < 0) + return; + max = Math.max(max, a); + } + + @Override + public String toString() { + return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString(); + } + + } + + static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class); + + private static ConcurrentHashMap registry = new ConcurrentHashMap(); + + private boolean currentlyLogging = false; + + private File logDir = null; + + private String metricsPrefix = null; + + private Date today = new Date(); + + private File logFile = null; + - private FileWriter logWriter = null; ++ private Writer logWriter = null; + + private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); + + private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz"); + + private MetricsConfiguration config = null; + + public AbstractMetricsImpl() { + this.metricsPrefix = getMetricsPrefix(); + config = new MetricsConfiguration(metricsPrefix); + } + + /** + * Registers a StandardMBean with the MBean Server + * + * @throws Exception + */ + public void register(StandardMBean mbean) throws Exception { + // Register this object with the MBeanServer + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + if (null == getObjectName()) + throw new IllegalArgumentException("MBean object name must be set."); + mbs.registerMBean(mbean, getObjectName()); + + setupLogging(); + } + + /** + * Registers this MBean with the MBean Server + * + * @throws Exception + */ + public void register() throws Exception { + // Register this object with the MBeanServer + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + if (null == getObjectName()) + throw new IllegalArgumentException("MBean object name must be set."); + mbs.registerMBean(this, getObjectName()); + setupLogging(); + } + + public void createMetric(String name) { + registry.put(name, new Metric()); + } + + public Metric getMetric(String name) { + return registry.get(name); + } + + public long getMetricCount(String name) { + return registry.get(name).getCount(); + } + + public long getMetricAvg(String name) { + return registry.get(name).getAvg(); + } + + public long getMetricMin(String name) { + return registry.get(name).getMin(); + } + + public long getMetricMax(String name) { + return registry.get(name).getMax(); + } + + private void setupLogging() throws IOException { + if (null == config.getMetricsConfiguration()) + return; + // If we are already logging, then return + if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) { + // Check to see if directory exists, else make it + String mDir = config.getMetricsConfiguration().getString("logging.dir"); + if (null != mDir) { + File dir = new File(mDir); + if (!dir.isDirectory()) - dir.mkdir(); ++ if (!dir.mkdir()) ++ log.warn("Could not create log directory: " + dir); + logDir = dir; + // Create new log file + startNewLog(); + } + currentlyLogging = true; + } + } + + private void startNewLog() throws IOException { + if (null != logWriter) { + logWriter.flush(); + logWriter.close(); + } + logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log"); + if (!logFile.exists()) { + if (!logFile.createNewFile()) { + log.error("Unable to create new log file"); + currentlyLogging = false; + return; + } + } - logWriter = new FileWriter(logFile, true); ++ logWriter = new OutputStreamWriter(new FileOutputStream(logFile, true), Constants.UTF8); + } + + private void writeToLog(String name) throws IOException { + if (null == logWriter) + return; + // Increment the date if we have to + Date now = new Date(); + if (!DateUtils.isSameDay(today, now)) { + today = now; + startNewLog(); + } + logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n"); + } + + public void add(String name, long time) { + if (isEnabled()) { + registry.get(name).incCount(); + registry.get(name).addAvg(time); + registry.get(name).addMin(time); + registry.get(name).addMax(time); + // If we are not currently logging and should be, then initialize + if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) { + try { + setupLogging(); + } catch (IOException ioe) { + log.error("Error setting up log", ioe); + } + } else if (currentlyLogging && !config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) { + // if we are currently logging and shouldn't be, then close logs + try { + logWriter.flush(); + logWriter.close(); + logWriter = null; + logFile = null; + } catch (Exception e) { + log.error("Error stopping metrics logging", e); + } + currentlyLogging = false; + } + if (currentlyLogging) { + try { + writeToLog(name); + } catch (IOException ioe) { + log.error("Error writing to metrics log", ioe); + } + } + } + } + + public boolean isEnabled() { + return config.isEnabled(); + } + + protected abstract ObjectName getObjectName(); + + protected abstract String getMetricsPrefix(); + + @Override + protected void finalize() { + if (null != logWriter) { + try { + logWriter.close(); + } catch (Exception e) { + // do nothing + } finally { + logWriter = null; + } + } + logFile = null; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java index fd83e97,0000000..4df585d mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java +++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java @@@ -1,158 -1,0 +1,158 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.monitor; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.net.SocketNode; +import org.apache.log4j.spi.LoggingEvent; + +/** + * Hijack log4j and capture log events for display. + * + */ +public class LogService extends org.apache.log4j.AppenderSkeleton { + + private static final Logger log = Logger.getLogger(LogService.class); + + /** + * Read logging events forward to us over the net. + * + */ + static class SocketServer implements Runnable { + private ServerSocket server = null; + + public SocketServer(int port) { + try { + server = new ServerSocket(port); + } catch (IOException io) { + throw new RuntimeException(io); + } + } + + public int getLocalPort() { + return server.getLocalPort(); + } + + @Override + public void run() { + try { + while (true) { + log.debug("Waiting for log message senders"); + Socket socket = server.accept(); + log.debug("Got a new connection"); + Thread t = new Daemon(new SocketNode(socket, LogManager.getLoggerRepository())); + t.start(); + } + } catch (IOException io) { + log.error(io, io); + } + } + } + + public static void startLogListener(AccumuloConfiguration conf, String instanceId) { + try { + SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT)); + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT, - Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE); ++ Integer.toString(server.getLocalPort()).getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE); + new Daemon(server).start(); + } catch (Throwable t) { + log.info("Unable to listen to cluster-wide ports", t); + } + } + + static private LogService instance = null; + + synchronized public static LogService getInstance() { + if (instance == null) + return new LogService(); + return instance; + } + + private static final int MAX_LOGS = 50; + + private LinkedHashMap events = new LinkedHashMap(MAX_LOGS + 1, (float) .75, true) { + + private static final long serialVersionUID = 1L; + + @Override + @SuppressWarnings("rawtypes") + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_LOGS; + } + }; + + public LogService() { + synchronized (LogService.class) { + instance = this; + } + } + + @Override + synchronized protected void append(LoggingEvent ev) { + Object application = ev.getMDC("application"); + if (application == null || application.toString().isEmpty()) + return; + + DedupedLogEvent dev = new DedupedLogEvent(ev); + + // if event is present, increase the count + if (events.containsKey(dev.toString())) { + DedupedLogEvent oldDev = events.remove(dev.toString()); + dev.setCount(oldDev.getCount() + 1); + } + events.put(dev.toString(), dev); + } + + @Override + public void close() { + events = null; + } + + @Override + public synchronized void doAppend(LoggingEvent event) { + super.doAppend(event); + } + + @Override + public boolean requiresLayout() { + return false; + } + + synchronized public List getEvents() { + return new ArrayList(events.values()); + } + + synchronized public void clear() { + events.clear(); + } +}