accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [13/59] [abbrv] ACCUMULO-658 Move master to its own module
Date Sat, 07 Sep 2013 03:28:16 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
deleted file mode 100644
index 23f16e3..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-
-public enum TabletServerState {
-  // not a valid state, reserved for internal use only
-  RESERVED((byte) (-1)),
-  
-  // the following are normally functioning states
-  NEW((byte) 0),
-  ONLINE((byte) 1),
-  UNRESPONSIVE((byte) 2),
-  DOWN((byte) 3),
-  
-  // the following are bad states and cause tservers to be ignored by the master
-  BAD_SYSTEM_PASSWORD((byte) 101),
-  BAD_VERSION((byte) 102),
-  BAD_INSTANCE((byte) 103),
-  BAD_CONFIG((byte) 104),
-  BAD_VERSION_AND_INSTANCE((byte) 105),
-  BAD_VERSION_AND_CONFIG((byte) 106),
-  BAD_VERSION_AND_INSTANCE_AND_CONFIG((byte) 107),
-  BAD_INSTANCE_AND_CONFIG((byte) 108);
-  
-  private byte id;
-  
-  private static HashMap<Byte,TabletServerState> mapping;
-  private static HashSet<TabletServerState> badStates;
-  
-  static {
-    mapping = new HashMap<Byte,TabletServerState>(TabletServerState.values().length);
-    badStates = new HashSet<TabletServerState>();
-    for (TabletServerState state : TabletServerState.values()) {
-      mapping.put(state.id, state);
-      if (state.id > 99)
-        badStates.add(state);
-    }
-  }
-  
-  private TabletServerState(byte id) {
-    this.id = id;
-  }
-  
-  public byte getId() {
-    return this.id;
-  }
-  
-  public static TabletServerState getStateById(byte id) {
-    if (mapping.containsKey(id))
-      return mapping.get(id);
-    throw new IndexOutOfBoundsException("No such state");
-  }
-  
-  public static Set<TabletServerState> getBadStates() {
-    return Collections.unmodifiableSet(badStates);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
deleted file mode 100644
index d69ca19..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-/**
- * Coarse assignment state of a tablet.
- *
- * <p>NOTE(review): ordinal values follow declaration order and may be relied on elsewhere; append
- * new constants instead of reordering.
- */
-public enum TabletState {
-  UNASSIGNED,
-  ASSIGNED,
-  HOSTED,
-  ASSIGNED_TO_DEAD_SERVER;
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
deleted file mode 100644
index ddcdeea..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.master.state;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SkippingIterator;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.StringUtil;
-import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-
-/**
- * Iterator over metadata entries that skips tablets already in the state implied by the configured
- * live servers and online tables, so only tablets that need attention are returned.
- *
- * <p>Configure it with {@link #setCurrentServers}, {@link #setOnlineTables} and {@link #setMerges}.
- */
-public class TabletStateChangeIterator extends SkippingIterator {
-  
-  private static final String SERVERS_OPTION = "servers";
-  private static final String TABLES_OPTION = "tables";
-  private static final String MERGES_OPTION = "merges";
-  
-  // Parsed options. When current or onlineTables is null no filtering happens (every entry is
-  // returned); a null merges map is treated as "no merges in progress".
-  Set<TServerInstance> current;
-  Set<String> onlineTables;
-  Map<Text,MergeInfo> merges;
-  
-  @Override
-  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
-    super.init(source, options, env);
-    current = parseServers(options.get(SERVERS_OPTION));
-    onlineTables = parseTables(options.get(TABLES_OPTION));
-    merges = parseMerges(options.get(MERGES_OPTION));
-  }
-  
-  // Splits a comma-separated list of table ids; returns null when the option is absent.
-  private Set<String> parseTables(String tables) {
-    if (tables == null)
-      return null;
-    Set<String> result = new HashSet<String>();
-    for (String table : tables.split(","))
-      result.add(table);
-    return result;
-  }
-  
-  // Parses a comma-separated list of "host:port[INSTANCE]" entries; returns null when the option
-  // is absent. The "[INSTANCE]" suffix is optional.
-  private Set<TServerInstance> parseServers(String servers) {
-    if (servers == null)
-      return null;
-    Set<TServerInstance> result = new HashSet<TServerInstance>();
-    if (servers.length() > 0) {
-      for (String part : servers.split(",")) {
-        String[] parts = part.split("\\[", 2);
-        String hostport = parts[0];
-        // without a '[' split() yields a single element, so guard the index
-        String instance = parts.length > 1 ? parts[1] : null;
-        if (instance != null && instance.endsWith("]"))
-          instance = instance.substring(0, instance.length() - 1);
-        result.add(new TServerInstance(AddressUtil.parseAddress(hostport), instance));
-      }
-    }
-    return result;
-  }
-  
-  // Decodes the Base64 blob written by setMerges into table id -> MergeInfo; returns null when the
-  // option is absent. Any decoding failure is rethrown as a RuntimeException.
-  private Map<Text,MergeInfo> parseMerges(String merges) {
-    if (merges == null)
-      return null;
-    try {
-      Map<Text,MergeInfo> result = new HashMap<Text,MergeInfo>();
-      DataInputBuffer buffer = new DataInputBuffer();
-      byte[] data = Base64.decodeBase64(merges.getBytes());
-      buffer.reset(data, data.length);
-      while (buffer.available() > 0) {
-        MergeInfo mergeInfo = new MergeInfo();
-        mergeInfo.readFields(buffer);
-        result.put(mergeInfo.extent.getTableId(), mergeInfo);
-      }
-      return result;
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-  
-  /**
-   * Advances the source past entries whose tablet is already in its expected state, stopping at the
-   * first entry the caller should see.
-   */
-  @Override
-  protected void consume() throws IOException {
-    // without server/table information nothing can be judged, so leave every entry visible
-    if (onlineTables == null || current == null)
-      return;
-    
-    while (getSource().hasTop()) {
-      Key k = getSource().getTopKey();
-      Value v = getSource().getTopValue();
-      
-      TabletLocationState tls;
-      try {
-        tls = MetaDataTableScanner.createTabletLocationState(k, v);
-        if (tls == null)
-          return;
-      } catch (BadLocationStateException e) {
-        // maybe the master can do something with a tablet with bad/inconsistent state
-        return;
-      }
-      // we always want data about merges
-      MergeInfo merge = merges == null ? null : merges.get(tls.extent.getTableId());
-      if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) {
-        return;
-      }
-      // is the table supposed to be online or offline?
-      boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
-      
-      switch (tls.getState(current)) {
-        case ASSIGNED:
-          // we always want data about assigned tablets
-          return;
-        case HOSTED:
-          // a hosted tablet only needs attention if its table should be offline
-          if (!shouldBeOnline)
-            return;
-          break;
-        case ASSIGNED_TO_DEAD_SERVER:
-          // always report tablets assigned to a dead server
-          return;
-        case UNASSIGNED:
-          // an unassigned tablet only needs attention if its table should be online
-          if (shouldBeOnline)
-            return;
-          break;
-      }
-      // table is in the expected state so don't bother returning any information about it
-      getSource().next();
-    }
-  }
-  
-  @Override
-  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
-    throw new UnsupportedOperationException();
-  }
-  
-  /** Records the live tablet servers as a comma-separated option; a null set leaves it unset. */
-  public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
-    if (goodServers != null) {
-      List<String> servers = new ArrayList<String>();
-      for (TServerInstance server : goodServers)
-        servers.add(server.toString());
-      cfg.addOption(SERVERS_OPTION, StringUtil.join(servers, ","));
-    }
-  }
-  
-  /** Records the online table ids as a comma-separated option; a null set leaves it unset. */
-  public static void setOnlineTables(IteratorSetting cfg, Set<String> onlineTables) {
-    if (onlineTables != null)
-      cfg.addOption(TABLES_OPTION, StringUtil.join(onlineTables, ","));
-  }
-  
-  /**
-   * Serializes every merge that has an extent and a state other than {@code NONE}, Base64-encodes
-   * the bytes and stores them as the merges option.
-   */
-  public static void setMerges(IteratorSetting cfg, Collection<MergeInfo> merges) {
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    try {
-      for (MergeInfo info : merges) {
-        KeyExtent extent = info.getExtent();
-        if (extent != null && !info.getState().equals(MergeState.NONE)) {
-          info.write(buffer);
-        }
-      }
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())));
-    cfg.addOption(MERGES_OPTION, encoded);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
new file mode 100644
index 0000000..c7cfc59
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.util.time.RelativeTime;
+
+/**
+ * Time source used to assign timestamps to a tablet's mutations.
+ *
+ * <p>The time is stored in the metadata table as a one-character type prefix followed by the
+ * decimal value: 'L' for logical (counter) time, 'M' for millisecond time, e.g. {@code "L42"}.
+ */
+public abstract class TabletTime {
+  public static final char LOGICAL_TIME_ID = 'L';
+  public static final char MILLIS_TIME_ID = 'M';
+  
+  /**
+   * Returns the metadata prefix character for {@code timeType}.
+   *
+   * @throws IllegalArgumentException if the time type is neither LOGICAL nor MILLIS
+   */
+  public static char getTimeID(TimeType timeType) {
+    switch (timeType) {
+      case LOGICAL:
+        return LOGICAL_TIME_ID;
+      case MILLIS:
+        return MILLIS_TIME_ID;
+    }
+    
+    throw new IllegalArgumentException("Unknown time type " + timeType);
+  }
+  
+  /** Raises the tablet time so that {@link #getTime()} is at least {@code time}. */
+  abstract void useMaxTimeFromWALog(long time);
+  
+  /** Encodes {@code time} with this time type's prefix character. */
+  abstract String getMetadataValue(long time);
+  
+  /** Encodes the current tablet time with this time type's prefix character. */
+  abstract String getMetadataValue();
+  
+  /**
+   * Stamps every mutation with a system timestamp and returns the last timestamp assigned; for an
+   * empty list nothing is stamped and the current tablet time is returned.
+   */
+  abstract long setUpdateTimes(List<Mutation> mutations);
+  
+  /** Returns the most recent tablet time without advancing it. */
+  abstract long getTime();
+  
+  /** Advances the tablet time and returns the value handed out. */
+  abstract long getAndUpdateTime();
+  
+  /** Stamps {@code mutation} with {@code lastCommitTime}; the mutation must be a {@link ServerMutation}. */
+  protected void setSystemTimes(Mutation mutation, long lastCommitTime) {
+    ServerMutation m = (ServerMutation)mutation;
+    m.setSystemTimestamp(lastCommitTime);
+  }
+  
+  /**
+   * Parses a metadata time value such as {@code "L42"} or {@code "M1000"}.
+   *
+   * @throws IllegalArgumentException if the prefix is unknown or the value is not a number
+   */
+  static TabletTime getInstance(String metadataValue) {
+    if (metadataValue.charAt(0) == LOGICAL_TIME_ID) {
+      return new LogicalTime(Long.parseLong(metadataValue.substring(1)));
+    } else if (metadataValue.charAt(0) == MILLIS_TIME_ID) {
+      return new MillisTime(Long.parseLong(metadataValue.substring(1)));
+    }
+    
+    throw new IllegalArgumentException("Time type unknown : " + metadataValue);
+    
+  }
+  
+  /**
+   * Returns whichever of two metadata time values is larger; a null argument yields the other one.
+   *
+   * <p>NOTE(review): passing two nulls throws NullPointerException.
+   *
+   * @throws IllegalArgumentException if the type prefixes differ or are unknown
+   */
+  public static String maxMetadataTime(String mv1, String mv2) {
+    if (mv1 == null) {
+      checkType(mv2);
+      return mv2;
+    }
+    
+    if (mv2 == null) {
+      checkType(mv1);
+      return mv1;
+    }
+    
+    if (mv1.charAt(0) != mv2.charAt(0)) {
+      throw new IllegalArgumentException("Time types differ " + mv1 + " " + mv2);
+    }
+    checkType(mv1);
+    
+    long t1 = Long.parseLong(mv1.substring(1));
+    long t2 = Long.parseLong(mv2.substring(1));
+    
+    if (t1 < t2) {
+      return mv2;
+    } else {
+      return mv1;
+    }
+    
+  }
+  
+  // Rejects values whose first character is not a known time type prefix.
+  private static void checkType(String mv1) {
+    if (mv1.charAt(0) != LOGICAL_TIME_ID && mv1.charAt(0) != MILLIS_TIME_ID) {
+      throw new IllegalArgumentException("Invalid time type " + mv1);
+    }
+  }
+  
+  /** Millisecond time that never moves backwards even if the wall clock does. */
+  static class MillisTime extends TabletTime {
+    
+    // Guarded by "this": every read and write of lastTime holds the instance lock, which also
+    // keeps 64-bit reads from tearing.
+    private long lastTime;
+    // Wall-clock reading from the last update that found the clock behind lastTime.
+    private long lastUpdateTime = 0;
+    
+    public MillisTime(long time) {
+      this.lastTime = time;
+    }
+    
+    @Override
+    String getMetadataValue(long time) {
+      return MILLIS_TIME_ID + "" + time;
+    }
+    
+    @Override
+    public synchronized String getMetadataValue() {
+      return getMetadataValue(lastTime);
+    }
+    
+    @Override
+    synchronized void useMaxTimeFromWALog(long time) {
+      if (time > lastTime)
+        lastTime = time;
+    }
+    
+    @Override
+    long setUpdateTimes(List<Mutation> mutations) {
+      
+      long currTime = RelativeTime.currentTimeMillis();
+      
+      synchronized (this) {
+        if (mutations.size() == 0)
+          return lastTime;
+        
+        currTime = updateTime(currTime);
+      }
+      
+      // every mutation in the batch gets the same timestamp; stamping happens outside the lock
+      for (Mutation mutation : mutations)
+        setSystemTimes(mutation, currTime);
+      
+      return currTime;
+    }
+    
+    // Caller must hold the lock on this. Returns the wall clock when it has not fallen behind
+    // lastTime; otherwise returns lastTime, bumped by one only if the wall clock advanced since the
+    // previous call that found it behind, so returned times never decrease.
+    private long updateTime(long currTime) {
+      if (currTime < lastTime) {
+        if (currTime - lastUpdateTime > 0) {
+          // not in same millisecond as last call
+          // to this method so move ahead slowly
+          lastTime++;
+        }
+        
+        lastUpdateTime = currTime;
+        
+        currTime = lastTime;
+      } else {
+        lastTime = currTime;
+      }
+      return currTime;
+    }
+    
+    @Override
+    synchronized long getTime() {
+      return lastTime;
+    }
+    
+    @Override
+    long getAndUpdateTime() {
+      long currTime = RelativeTime.currentTimeMillis();
+      
+      synchronized (this) {
+        currTime = updateTime(currTime);
+      }
+      
+      return currTime;
+    }
+    
+  }
+  
+  /** Counter-based time: each mutation receives the next value of a monotonically increasing counter. */
+  static class LogicalTime extends TabletTime {
+    // next value to hand out; getTime() reports nextTime - 1
+    AtomicLong nextTime;
+    
+    private LogicalTime(long time) {
+      this.nextTime = new AtomicLong(time + 1);
+    }
+    
+    @Override
+    void useMaxTimeFromWALog(long time) {
+      long target = time + 1;
+      // Raise nextTime to target atomically; a plain get()/set() pair could overwrite a concurrent
+      // getAndAdd/getAndIncrement with a smaller value.
+      long observed = nextTime.get();
+      while (observed < target && !nextTime.compareAndSet(observed, target)) {
+        observed = nextTime.get();
+      }
+    }
+    
+    @Override
+    public String getMetadataValue() {
+      return getMetadataValue(getTime());
+    }
+    
+    @Override
+    public String getMetadataValue(long time) {
+      return LOGICAL_TIME_ID + "" + time;
+    }
+    
+    @Override
+    long setUpdateTimes(List<Mutation> mutations) {
+      if (mutations.size() == 0)
+        return getTime();
+      
+      // reserve a contiguous range of mutations.size() values, then hand them out in order
+      long time = nextTime.getAndAdd(mutations.size());
+      for (Mutation mutation : mutations)
+        setSystemTimes(mutation, time++);
+      
+      return time - 1;
+    }
+    
+    @Override
+    long getTime() {
+      return nextTime.get() - 1;
+    }
+    
+    @Override
+    long getAndUpdateTime() {
+      return nextTime.getAndIncrement();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
new file mode 100644
index 0000000..9629948
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/tabletserver/UniqueNameAllocator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.Random;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+/**
+ * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the instance.
+ * 
+ * This is useful for filenames because it makes caching easy.
+ * 
+ */
+
+public class UniqueNameAllocator {
+  private long next = 0;
+  private long maxAllocated = 0;
+  private String nextNamePath;
+  private Random rand;
+  
+  private UniqueNameAllocator() {
+    nextNamePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZNEXT_FILE;
+    rand = new Random();
+  }
+  
+  public synchronized String getNextName() {
+    
+    while (next >= maxAllocated) {
+      final int allocate = 100 + rand.nextInt(100);
+      
+      try {
+        byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
+          public byte[] mutate(byte[] currentValue) throws Exception {
+            long l = Long.parseLong(new String(currentValue), Character.MAX_RADIX);
+            l += allocate;
+            return Long.toString(l, Character.MAX_RADIX).getBytes();
+          }
+        });
+        
+        maxAllocated = Long.parseLong(new String(max), Character.MAX_RADIX);
+        next = maxAllocated - allocate;
+        
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    return new String(FastFormat.toZeroPaddedString(next++, 7, Character.MAX_RADIX, new byte[0]));
+  }
+  
+  private static UniqueNameAllocator instance = null;
+  
+  public static synchronized UniqueNameAllocator getInstance() {
+    if (instance == null)
+      instance = new UniqueNameAllocator();
+    
+    return instance;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
new file mode 100644
index 0000000..cc91ef3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TableInfoUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+
+/**
+ * 
+ */
+public class TableInfoUtil {
+  
+  public static void add(TableInfo total, TableInfo more) {
+    if (total.minors == null)
+      total.minors = new Compacting();
+    if (total.majors == null)
+      total.majors = new Compacting();
+    if (total.scans == null)
+      total.scans = new Compacting();
+    if (more.minors != null) {
+      total.minors.running += more.minors.running;
+      total.minors.queued += more.minors.queued;
+    }
+    if (more.majors != null) {
+      total.majors.running += more.majors.running;
+      total.majors.queued += more.majors.queued;
+    }
+    if (more.scans != null) {
+      total.scans.running += more.scans.running;
+      total.scans.queued += more.scans.queued;
+    }
+    total.onlineTablets += more.onlineTablets;
+    total.recs += more.recs;
+    total.recsInMemory += more.recsInMemory;
+    total.tablets += more.tablets;
+    total.ingestRate += more.ingestRate;
+    total.ingestByteRate += more.ingestByteRate;
+    total.queryRate += more.queryRate;
+    total.queryByteRate += more.queryByteRate;
+    total.scanRate += more.scanRate;
+  }
+  
+  public static TableInfo summarizeTableStats(TabletServerStatus status) {
+    TableInfo summary = new TableInfo();
+    summary.majors = new Compacting();
+    summary.minors = new Compacting();
+    summary.scans = new Compacting();
+    for (TableInfo rates : status.tableMap.values()) {
+      TableInfoUtil.add(summary, rates);
+    }
+    return summary;
+  }
+  
+  public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
+    Map<String,Double> compactingByTable = new HashMap<String,Double>();
+    if (mmi != null && mmi.tServerInfo != null) {
+      for (TabletServerStatus status : mmi.tServerInfo) {
+        if (status != null && status.tableMap != null) {
+          for (String table : status.tableMap.keySet()) {
+            Double holdTime = compactingByTable.get(table);
+            compactingByTable.put(table, Math.max(holdTime == null ? 0. : holdTime.doubleValue(), status.holdTime));
+          }
+        }
+      }
+    }
+    return compactingByTable;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
new file mode 100644
index 0000000..8dac384
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Utilities for creating the directory that holds a tablet's files.
+ */
+public class TabletOperations {
+  
+  private static final Logger log = Logger.getLogger(TabletOperations.class);
+  
+  /**
+   * Creates a directory for a tablet of {@code tableId} on a volume chosen once from
+   * {@link ServerConstants#getTablesDirs()}, retrying every 3 seconds until it succeeds.
+   *
+   * <p>When {@code endRow} is null the fixed {@link Constants#DEFAULT_TABLET_LOCATION} directory is
+   * used and may already exist; otherwise a new name is generated via {@link UniqueNameAllocator}.
+   *
+   * @return the fully qualified directory path
+   * @throws IllegalStateException if a freshly generated directory already exists
+   */
+  public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    String volume = fs.choose(ServerConstants.getTablesDirs());
+    
+    while (true) {
+      try {
+        if (endRow == null) {
+          String lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath))
+            return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
+          log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
+        } else {
+          String lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath))
+            throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
+          if (fs.mkdirs(lowDirectoryPath))
+            return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
+        }
+      } catch (IOException e) {
+        // pass the throwable separately so log4j prints the stack trace, not just e.toString()
+        log.warn(e.getMessage(), e);
+      }
+      
+      log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + ", will retry ...");
+      UtilWaitThread.sleep(3000);
+      
+    }
+  }
+  
+  /**
+   * Same as {@link #createTabletDirectory(VolumeManager, String, Text)} using
+   * {@code VolumeManagerImpl.get()}, retrying every 3 seconds while that throws IOException.
+   *
+   * @param tableDir despite its name, forwarded as the {@code tableId} argument
+   */
+  public static String createTabletDirectory(String tableDir, Text endRow) {
+    while (true) {
+      try {
+        VolumeManager fs = VolumeManagerImpl.get();
+        return createTabletDirectory(fs, tableDir, endRow);
+      } catch (IOException e) {
+        log.warn(e.getMessage(), e);
+      }
+      UtilWaitThread.sleep(3000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/pom.xml
----------------------------------------------------------------------
diff --git a/server/master/pom.xml b/server/master/pom.xml
new file mode 100644
index 0000000..343b888
--- /dev/null
+++ b/server/master/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <!-- Module build for the Accumulo master server. It inherits from the server parent POM; no dependency declares a
+       <version>, so versions are presumably managed by dependencyManagement in the parent hierarchy (verify there). -->
+  <parent>
+    <groupId>org.apache.accumulo</groupId>
+    <artifactId>accumulo-server-parent</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>accumulo-master</artifactId>
+  <name>Master Server</name>
+  <dependencies>
+    <!-- No <scope> on the entries below, so Maven's default "compile" scope applies. -->
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-fate</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-server-base</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-start</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-trace</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+    <!-- "provided" scope: available at compile time but expected to be supplied by the runtime environment's classpath. -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Test-only dependencies: on the classpath for compiling and running tests, not for the main artifact. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/EventCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/EventCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/EventCoordinator.java
new file mode 100644
index 0000000..e2f32c4
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/EventCoordinator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Lets threads wait for "something happened" notifications.
+ * 
+ * Each call to {@link #event(String, Object...)} logs a message, increments a counter and wakes all waiters. A waiter
+ * compares the counter with the value it last saw to decide whether anything changed.
+ */
+public class EventCoordinator {
+  
+  private static final Logger log = Logger.getLogger(EventCoordinator.class);
+  // Number of events signalled so far; within this class it is only accessed while holding this object's monitor.
+  long eventCounter = 0;
+  
+  /**
+   * Waits for an event newer than {@code lastEvent}, for at most {@code millis} milliseconds.
+   * 
+   * @param millis maximum time to wait; if no new event has occurred and this is <= 0, returns immediately
+   * @param lastEvent the counter value the caller observed previously
+   * @return the current counter value; it may still equal {@code lastEvent} after a timeout, a spurious wakeup or an
+   *         interrupt
+   */
+  synchronized long waitForEvents(long millis, long lastEvent) {
+    // Did something happen since the last time we waited?
+    if (lastEvent == eventCounter) {
+      // no
+      if (millis <= 0)
+        return eventCounter;
+      try {
+        wait(millis);
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt is deliberately ignored and the thread's interrupt status is not restored.
+        log.debug("ignoring InterruptedException", e);
+      }
+    }
+    return eventCounter;
+  }
+  
+  /**
+   * Records an event: logs the formatted message at INFO, increments the counter and wakes every waiting thread.
+   * 
+   * @param msg a {@link String#format(String, Object...)} pattern
+   * @param args arguments substituted into {@code msg}
+   */
+  public synchronized void event(String msg, Object... args) {
+    log.info(String.format(msg, args));
+    eventCounter++;
+    notifyAll();
+  }
+  
+  /**
+   * @return a new listener whose starting point is the current event count
+   */
+  public Listener getListener() {
+    return new Listener();
+  }
+  
+  /**
+   * Tracks the last event count seen by one consumer, so repeated waits return early only for events that consumer has
+   * not yet seen.
+   */
+  public class Listener {
+    long lastEvent;
+    
+    Listener() {
+      // event() updates eventCounter under the coordinator's monitor; read it under the same lock so the value is not stale.
+      synchronized (EventCoordinator.this) {
+        lastEvent = eventCounter;
+      }
+    }
+    
+    /**
+     * Waits up to {@code millis} milliseconds for an event this listener has not seen, then records the latest count.
+     * 
+     * @param millis maximum time to wait; <= 0 means do not wait
+     */
+    public void waitForEvents(long millis) {
+      lastEvent = EventCoordinator.this.waitForEvents(millis, lastEvent);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/88079cc3/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java b/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
new file mode 100644
index 0000000..59ab8c8
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/LiveTServerSet.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Tracks the tablet servers that currently hold a lock in ZooKeeper.
+ * 
+ * The children of the instance's {@code Constants.ZTSERVERS} node are checked on demand, on a periodic timer, and when
+ * watch events arrive through the {@link ZooCache}. Each child with lock data is treated as a live server. Changes are
+ * reported to the {@link Listener} given at construction. Children that stay without a lock for more than ten minutes are
+ * deleted from ZooKeeper.
+ */
+public class LiveTServerSet implements Watcher {
+  
+  /** Receives notifications when the set of live tablet servers changes. */
+  public interface Listener {
+    /**
+     * @param current the server set that observed the change
+     * @param deleted servers no longer considered live, or whose node now belongs to a different instance
+     * @param added servers newly observed as live
+     */
+    void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
+  }
+  
+  private static final Logger log = Logger.getLogger(LiveTServerSet.class);
+  
+  private final Listener cback;
+  private final Instance instance;
+  private final AccumuloConfiguration conf;
+  // Created lazily by getZooCache() with this object as its watcher, so ZooCache relays events to process().
+  private ZooCache zooCache;
+  
+  /**
+   * Issues RPCs to a single tablet server's client service.
+   * 
+   * Except for {@link #getTableMap(boolean)}, each call obtains a client via ThriftUtil.getClient and returns it in a
+   * finally block.
+   */
+  public class TServerConnection {
+    private final InetSocketAddress address;
+    
+    public TServerConnection(InetSocketAddress addr) throws TException {
+      address = addr;
+    }
+    
+    /**
+     * Serializes the master lock's id relative to the instance's master lock path. It is sent with most calls, presumably
+     * so the tablet server can verify the caller holds the lock.
+     */
+    private String lockString(ZooLock mlock) {
+      return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK);
+    }
+    
+    /** Asks the tablet server to load (host) the given tablet. */
+    public void assignTablet(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Asks the tablet server to unload the given tablet; {@code save} is forwarded unchanged to the RPC. */
+    public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save);
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /**
+     * Fetches the tablet server's status over a dedicated, non-pooled transport that is closed afterwards.
+     * 
+     * @param usePooledConnection must be false; pooled connections are not supported
+     * @throws UnsupportedOperationException if {@code usePooledConnection} is true
+     */
+    public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException {
+      
+      if (usePooledConnection)
+        throw new UnsupportedOperationException();
+      
+      TTransport transport = ThriftUtil.createTransport(address, conf);
+      
+      try {
+        TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
+        return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
+      } finally {
+        if (transport != null)
+          transport.close();
+      }
+    }
+    
+    /** Sends the halt RPC to the tablet server. */
+    public void halt(ZooLock lock) throws TException, ThriftSecurityException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Sends the fastHalt RPC to the tablet server. */
+    public void fastHalt(ZooLock lock) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Requests a flush of the given table's row range; a null start or end row is sent as null (unbounded). */
+    public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Sends the chop RPC for the given tablet. */
+    public void chop(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Asks the tablet server to split the tablet at {@code splitPoint}; the lock is not sent with this RPC. */
+    public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        // Only the first getLength() bytes of a Text's backing array are valid.
+        client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(),
+            ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength()));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Requests a flush of a single tablet. */
+    public void flushTablet(ZooLock lock, KeyExtent extent) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift());
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Requests a compaction of the given table's row range; a null start or end row is sent as null. */
+    public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId,
+            startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow));
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+    /** Asks the tablet server whether the given transaction id is active. */
+    public boolean isActive(long tid) throws TException {
+      TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      try {
+        return client.isActive(Tracer.traceInfo(), tid);
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    }
+    
+  }
+  
+  /** Pairs a live server instance with the connection used to talk to it. */
+  static class TServerInfo {
+    TServerConnection connection;
+    TServerInstance instance;
+    
+    TServerInfo(TServerInstance instance, TServerConnection connection) {
+      this.connection = connection;
+      this.instance = instance;
+    }
+  }
+  
+  // The set of active tservers with locks, indexed by their name in zookeeper
+  private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
+  // as above, indexed by TServerInstance
+  private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>();
+  
+  // The set of entries in zookeeper without locks, and the first time each was noticed
+  private Map<String,Long> locklessServers = new HashMap<String,Long>();
+  
+  /**
+   * @param instance the Accumulo instance whose tablet servers are tracked
+   * @param conf configuration used when creating Thrift clients and transports
+   * @param cback notified of added and removed servers
+   */
+  public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
+    this.cback = cback;
+    this.instance = instance;
+    this.conf = conf;
+    
+  }
+  
+  /** @return the lazily created ZooCache that relays watch events to this object */
+  public synchronized ZooCache getZooCache() {
+    if (zooCache == null)
+      zooCache = new ZooCache(this);
+    return zooCache;
+  }
+  
+  /** Scans once now, then schedules repeated scans on the shared SimpleTimer (arguments 0 and 5000 ms). */
+  public synchronized void startListeningForTabletServerChanges() {
+    scanServers();
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        scanServers();
+      }
+    }, 0, 5000);
+  }
+  
+  /**
+   * Re-checks every known server plus every child of the tservers node, then notifies the listener if anything changed.
+   * Exceptions are logged and not rethrown.
+   */
+  public synchronized void scanServers() {
+    try {
+      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+      
+      final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+      
+      // Include currently tracked names so servers whose node disappeared are also re-checked.
+      HashSet<String> all = new HashSet<String>(current.keySet());
+      all.addAll(getZooCache().getChildren(path));
+      
+      // Forget lockless-tracking entries for nodes that no longer exist.
+      locklessServers.keySet().retainAll(all);
+      
+      for (String zPath : all) {
+        checkServer(updates, doomed, path, zPath);
+      }
+      
+      // log.debug("Current: " + current.keySet());
+      if (!doomed.isEmpty() || !updates.isEmpty())
+        this.cback.update(this, doomed, updates);
+    } catch (Exception ex) {
+      log.error(ex, ex);
+    }
+  }
+  
+  /** Deletes a tserver node, tolerating the case where it gained children or was already removed. */
+  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+    try {
+      ZooReaderWriter.getInstance().delete(serverNode, -1);
+    } catch (NotEmptyException ex) {
+      // race condition: tserver created the lock after our last check; we'll see it at the next check
+    } catch (NoNodeException nne) {
+      // someone else deleted it
+    }
+  }
+  
+  /**
+   * Reconciles the tracked state for one child of the tservers node.
+   * 
+   * Servers that are lost, or replaced by a different instance, go into {@code doomed}; newly seen instances go into
+   * {@code updates}. Nodes without a lock for more than ten minutes are deleted.
+   */
+  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath)
+      throws TException, InterruptedException, KeeperException {
+    
+    TServerInfo info = current.get(zPath);
+    
+    final String lockPath = path + "/" + zPath;
+    Stat stat = new Stat();
+    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+    
+    if (lockData == null) {
+      if (info != null) {
+        doomed.add(info.instance);
+        current.remove(zPath);
+        currentInstances.remove(info.instance);
+      }
+      
+      Long firstSeen = locklessServers.get(zPath);
+      if (firstSeen == null) {
+        locklessServers.put(zPath, System.currentTimeMillis());
+      } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) {
+        deleteServerNode(path + "/" + zPath);
+        locklessServers.remove(zPath);
+      }
+    } else {
+      locklessServers.remove(zPath);
+      ServerServices services = new ServerServices(new String(lockData));
+      InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+      // An instance is identified by its client address plus the ZooKeeper session owning the ephemeral lock.
+      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+      
+      if (info == null) {
+        updates.add(instance);
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.put(instance, tServerInfo);
+      } else if (!info.instance.equals(instance)) {
+        // The node now belongs to a different instance: retire the old one and index the new one under its own key.
+        doomed.add(info.instance);
+        updates.add(instance);
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.remove(info.instance);
+        currentInstances.put(instance, tServerInfo);
+      }
+    }
+  }
+  
+  /**
+   * Handles watch events relayed by the ZooCache.
+   * 
+   * A change to the tservers node triggers a full scan. A change to a direct child re-checks only that server.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    
+    // It's important that these events are propagated by ZooCache, because this ensures that when reading ZooCache it has
+    // already processed the event and cleared the relevant nodes before the code below reads from it.
+    
+    if (event.getPath() != null) {
+      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+        scanServers();
+      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+        int pos = event.getPath().lastIndexOf('/');
+        
+        // do only if ZTSERVER is parent
+        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
+          
+          String server = event.getPath().substring(pos + 1);
+          
+          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+          
+          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+          
+          try {
+            checkServer(updates, doomed, path, server);
+            if (!doomed.isEmpty() || !updates.isEmpty())
+              this.cback.update(this, doomed, updates);
+          } catch (Exception ex) {
+            log.error(ex, ex);
+          }
+        }
+      }
+    }
+  }
+  
+  /** @return the connection for a live server, or null if {@code server} is null or not currently tracked */
+  public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
+    if (server == null)
+      return null;
+    TServerInfo tServerInfo = currentInstances.get(server);
+    if (tServerInfo == null)
+      return null;
+    return tServerInfo.connection;
+  }
+  
+  /** @return a snapshot copy of the currently tracked server instances */
+  public synchronized Set<TServerInstance> getCurrentServers() {
+    return new HashSet<TServerInstance>(currentInstances.keySet());
+  }
+  
+  /** @return the number of tracked servers */
+  public synchronized int size() {
+    return current.size();
+  }
+  
+  /** @return the tracked instance whose location equals the parsed {@code tabletServer} address, or null if none */
+  public synchronized TServerInstance find(String tabletServer) {
+    InetSocketAddress addr = AddressUtil.parseAddress(tabletServer);
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.getLocation().equals(addr))
+        return entry.getValue().instance;
+    }
+    return null;
+  }
+  
+  /**
+   * Stops tracking {@code server} and recursively deletes its ZooKeeper node.
+   * 
+   * If the delete fails, the process is halted via Halt.halt. Does nothing if the server is not tracked.
+   */
+  public synchronized void remove(TServerInstance server) {
+    String zPath = null;
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.equals(server)) {
+        zPath = entry.getKey();
+        break;
+      }
+    }
+    if (zPath == null)
+      return;
+    current.remove(zPath);
+    currentInstances.remove(server);
+    
+    log.info("Removing zookeeper lock for " + server);
+    String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
+    try {
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
+    } catch (Exception e) {
+      String msg = "error removing tablet server lock";
+      log.fatal(msg, e);
+      Halt.halt(msg, -1);
+    }
+    getZooCache().clear(fullpath);
+  }
+}


Mime
View raw message