accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [07/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:32 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java
new file mode 100644
index 0000000..bbddadb
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cloudtrace.thrift.RemoteSpan;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.accumulo.core.util.format.Formatter;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+
+/**
+ * A formatter than can be used in the shell to display trace information.
+ * 
+ */
+public class TraceFormatter implements Formatter {
+  public static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss.SSS";
+  // ugh... SimpleDataFormat is not thread safe
+  private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return new SimpleDateFormat(DATE_FORMAT);
+    }
+  };
+  
+  public static String formatDate(final Date date) {
+    return formatter.get().format(date);
+  }
+  
+  private final static Text SPAN_CF = new Text("span");
+  
+  private Iterator<Entry<Key,Value>> scanner;
+  private boolean printTimeStamps;
+  
+  public static RemoteSpan getRemoteSpan(Entry<Key,Value> entry) {
+    TMemoryInputTransport transport = new TMemoryInputTransport(entry.getValue().get());
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    RemoteSpan span = new RemoteSpan();
+    try {
+      span.read(protocol);
+    } catch (TException ex) {
+      throw new RuntimeException(ex);
+    }
+    return span;
+  }
+  
+  @Override
+  public boolean hasNext() {
+    return scanner.hasNext();
+  }
+  
+  @Override
+  public String next() {
+    Entry<Key,Value> next = scanner.next();
+    if (next.getKey().getColumnFamily().equals(SPAN_CF)) {
+      StringBuilder result = new StringBuilder();
+      SimpleDateFormat dateFormatter = new SimpleDateFormat(DATE_FORMAT);
+      RemoteSpan span = getRemoteSpan(next);
+      result.append("----------------------\n");
+      result.append(String.format(" %12s:%s\n", "name", span.description));
+      result.append(String.format(" %12s:%s\n", "trace", Long.toHexString(span.traceId)));
+      result.append(String.format(" %12s:%s\n", "loc", span.svc + "@" + span.sender));
+      result.append(String.format(" %12s:%s\n", "span", Long.toHexString(span.spanId)));
+      result.append(String.format(" %12s:%s\n", "parent", Long.toHexString(span.parentId)));
+      result.append(String.format(" %12s:%s\n", "start", dateFormatter.format(span.start)));
+      result.append(String.format(" %12s:%s\n", "ms", span.stop - span.start));
+      for (Entry<String,String> entry : span.data.entrySet()) {
+        result.append(String.format(" %12s:%s\n", entry.getKey(), entry.getValue()));
+      }
+      
+      if (printTimeStamps) {
+        result.append(String.format(" %-12s:%d\n", "timestamp", next.getKey().getTimestamp()));
+      }
+      return result.toString();
+    }
+    return DefaultFormatter.formatEntry(next, printTimeStamps);
+  }
+  
+  @Override
+  public void remove() {
+    throw new NotImplementedException();
+  }
+  
+  @Override
+  public void initialize(Iterable<Entry<Key,Value>> scanner, boolean printTimestamps) {
+    this.scanner = scanner.iterator();
+    this.printTimeStamps = printTimestamps;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java b/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
new file mode 100644
index 0000000..4b60b97
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.trace;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.accumulo.cloudtrace.instrument.receivers.SendSpansViaThrift;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * Find a Span collector via zookeeper and push spans there via Thrift RPC.
+ *
+ * The children of a zookeeper node are read as the list of available destinations; the list is refreshed whenever the watch on that node fires.
+ */
+public class ZooTraceClient extends SendSpansViaThrift implements Watcher {
+
+  private static final Logger log = Logger.getLogger(ZooTraceClient.class);
+
+  final ZooReader zoo;
+  final String path;
+  final Random random = new Random();
+  final List<String> hosts = new ArrayList<String>();
+
+  /**
+   * Reads the initial list of destinations below {@code path} and registers this object as the watcher of that node.
+   *
+   * @param zoo
+   *          reader used to query zookeeper
+   * @param path
+   *          node whose children describe the available destinations
+   * @param host
+   *          passed through to the superclass constructor
+   * @param service
+   *          passed through to the superclass constructor
+   * @param millis
+   *          passed through to the superclass constructor
+   */
+  public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException {
+    super(host, service, millis);
+    this.path = path;
+    this.zoo = zoo;
+    updateHosts(path, zoo.getChildren(path, this));
+  }
+
+  /**
+   * Picks one of the currently known destinations at random.
+   *
+   * @return a randomly chosen destination, or null when none is known
+   */
+  @Override
+  synchronized protected String getSpanKey(Map<String,String> data) {
+    if (hosts.size() > 0) {
+      return hosts.get(random.nextInt(hosts.size()));
+    }
+    return null;
+  }
+
+  /**
+   * Refreshes the destination list when the watched node changes.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    try {
+      // ZooKeeper watches fire only once: pass this watcher again, otherwise no later change would ever be seen
+      updateHosts(path, zoo.getChildren(path, this));
+    } catch (Exception ex) {
+      log.error("unable to get destination hosts in zookeeper", ex);
+    }
+  }
+
+  /**
+   * Replaces the known destinations with the data stored in the given children of {@code path}. If any child cannot be read the error is logged and the
+   * previous list is kept.
+   */
+  synchronized private void updateHosts(String path, List<String> children) {
+    log.debug("Scanning trace hosts in zookeeper: " + path);
+    try {
+      // collect into a temporary list so a failure half way through does not leave a partial result
+      List<String> found = new ArrayList<String>();
+      for (String child : children) {
+        byte[] data = zoo.getData(path + "/" + child, null);
+        // NOTE(review): decodes with the platform default charset - confirm what the writer of these nodes uses
+        found.add(new String(data));
+      }
+      this.hosts.clear();
+      this.hosts.addAll(found);
+      log.debug("Trace hosts: " + this.hosts);
+    } catch (Exception ex) {
+      log.error("unable to get destination hosts in zookeeper", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
new file mode 100644
index 0000000..08ae106
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.transport.TSocket;
+
+/**
+ * Helpers for converting between textual "host:port" style addresses and socket addresses.
+ */
+public class AddressUtil {
+  /**
+   * Parses an address of the form {@code host}, {@code host:port} or {@code host+port}. When the address contains a '+', the first '+' separates host and
+   * port; otherwise the first ':' does.
+   *
+   * @param address
+   *          the textual address
+   * @param defaultPort
+   *          port used when the address has no port or an empty one
+   * @return the socket address for the host and port
+   * @throws NumberFormatException
+   *           if a non-empty port is not a valid integer
+   */
+  static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
+    final String separator = address.contains("+") ? "\\+" : ":";
+    final String[] hostAndPort = address.split(separator, 2);
+    if (hostAndPort.length != 2) {
+      // no separator present: the whole string is the host
+      return new InetSocketAddress(address, defaultPort);
+    }
+    final int port = hostAndPort[1].isEmpty() ? defaultPort : Integer.parseInt(hostAndPort[1]);
+    return new InetSocketAddress(hostAndPort[0], port);
+  }
+
+  /**
+   * Same as {@link #parseAddress(String, int)} for an address held in a {@link Text}.
+   */
+  static public InetSocketAddress parseAddress(Text address, int defaultPort) {
+    final String asString = address.toString();
+    return parseAddress(asString, defaultPort);
+  }
+
+  /**
+   * Creates a thrift socket for the host and port obtained by parsing the given address.
+   */
+  static public TSocket createTSocket(String address, int defaultPort) {
+    final InetSocketAddress parsed = parseAddress(address, defaultPort);
+    final String host = parsed.getHostName();
+    return new TSocket(host, parsed.getPort());
+  }
+
+  /**
+   * Renders a socket address as {@code ip:port}, using the host address of the resolved address.
+   */
+  static public String toString(InetSocketAddress addr) {
+    final String ip = addr.getAddress().getHostAddress();
+    return ip + ":" + addr.getPort();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
new file mode 100644
index 0000000..0c8ba07
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+/**
+ * This class provides methods to check arguments of a variable number for null values, or anything else that might be required on a routine basis. These
+ * methods should be used for early failures as close to the end user as possible, so things do not fail later on the server side, when they are harder to
+ * debug.
+ * 
+ * Methods are created for a specific number of arguments, due to the poor performance of array allocation for varargs methods.
+ */
+public class ArgumentChecker {
+  private static final String NULL_ARG_MSG = "argument was null";
+  
+  public static final void notNull(final Object arg1) {
+    if (arg1 == null)
+      throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null));
+  }
+  
+  public static final void notNull(final Object arg1, final Object arg2) {
+    if (arg1 == null || arg2 == null)
+      throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null));
+  }
+  
+  public static final void notNull(final Object arg1, final Object arg2, final Object arg3) {
+    if (arg1 == null || arg2 == null || arg3 == null)
+      throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null) + " arg3? " + (arg3 == null));
+  }
+  
+  public static final void notNull(final Object arg1, final Object arg2, final Object arg3, final Object arg4) {
+    if (arg1 == null || arg2 == null || arg3 == null || arg4 == null)
+      throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null) + " arg3? " + (arg3 == null)
+          + " arg4? " + (arg4 == null));
+  }
+  
+  public static final void notNull(final Object[] args) {
+    if (args == null)
+      throw new IllegalArgumentException(NULL_ARG_MSG + ":arg array is null");
+    
+    for (int i = 0; i < args.length; i++)
+      if (args[i] == null)
+        throw new IllegalArgumentException(NULL_ARG_MSG + ":arg" + i + " is null");
+  }
+  
+  public static final void strictlyPositive(final int i) {
+    if (i <= 0)
+      throw new IllegalArgumentException("integer should be > 0, was " + i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java
new file mode 100644
index 0000000..c2f79ba
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.regex.PatternSyntaxException;
+
+/**
+ * A {@link PatternSyntaxException} raised for a bad argument; it carries a description, the offending argument and the index of the problem within it.
+ */
+public final class BadArgumentException extends PatternSyntaxException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param desc
+   *          description of the error
+   * @param badarg
+   *          the argument that was rejected
+   * @param index
+   *          position handed to the superclass as the error index
+   */
+  public BadArgumentException(final String desc, final String badarg, final int index) {
+    super(desc, badarg, index);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java
new file mode 100644
index 0000000..0dbab6d
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+/**
+ * A class for storing data as a binary tree. This class does not implement Collection as several methods such as add are not appropriate.
+ *
+ * A tree is represented as a node with a parent, a left child, and a right child. If the parent is null, this node represents the root of the tree. A node has
+ * contents of type T.
+ */
+public class BinaryTree<T> {
+  private BinaryTree<T> parent;
+  private BinaryTree<T> left;
+  private BinaryTree<T> right;
+
+  T contents;
+
+  /** @return the parent node, or null when none has been set */
+  public BinaryTree<T> getParent() {
+    return this.parent;
+  }
+
+  public void setParent(BinaryTree<T> parent) {
+    this.parent = parent;
+  }
+
+  /** @return the left child, or null when none has been set */
+  public BinaryTree<T> getLeft() {
+    return this.left;
+  }
+
+  /**
+   * Attaches the given node as the left child and makes this node its parent.
+   */
+  public void setLeft(BinaryTree<T> left) {
+    // the child is linked back to this node before it is stored
+    left.setParent(this);
+    this.left = left;
+  }
+
+  /** @return the right child, or null when none has been set */
+  public BinaryTree<T> getRight() {
+    return this.right;
+  }
+
+  /**
+   * Attaches the given node as the right child and makes this node its parent.
+   */
+  public void setRight(BinaryTree<T> right) {
+    // the child is linked back to this node before it is stored
+    right.setParent(this);
+    this.right = right;
+  }
+
+  public T getContents() {
+    return this.contents;
+  }
+
+  public void setContents(T contents) {
+    this.contents = contents;
+  }
+
+  /**
+   * @return true only when this node has no parent, no children and no contents
+   */
+  public boolean isEmpty() {
+    final boolean unlinked = parent == null && left == null && right == null;
+    return unlinked && contents == null;
+  }
+
+  /**
+   * Renders the tree in order as {@code [left contents right]} without separators, omitting absent children.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("[");
+    if (left != null) {
+      text.append(left.toString());
+    }
+    text.append(contents);
+    if (right != null) {
+      text.append(right.toString());
+    }
+    return text.append("]").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java
new file mode 100644
index 0000000..97459ff
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+/**
+ * Holder for the {@link AssignmentStats} type.
+ */
+public class BulkImportHelper {
+  /**
+   * Placeholder statistics object: it carries no data and its string form is a fixed notice.
+   */
+  public static class AssignmentStats {
+    private static final String NOTICE = "Bulk Import no longer provides statistics";
+
+    public AssignmentStats() {}
+
+    /** @return a fixed notice that statistics are no longer provided */
+    @Override
+    public String toString() {
+      return NOTICE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java
new file mode 100644
index 0000000..f64b36c
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.data.ByteSequence;
+
+/**
+ * A {@link CharSequence} view over a region of a byte array, where each byte is exposed as one char. The backing array is referenced, not copied.
+ */
+public class ByteArrayBackedCharSequence implements CharSequence {
+
+  private byte[] data;
+  private int offset;
+  private int len;
+
+  /** Creates a sequence with no backing array and length zero. */
+  public ByteArrayBackedCharSequence() {
+    this(null, 0, 0);
+  }
+
+  /** Creates a sequence covering the whole array. */
+  public ByteArrayBackedCharSequence(byte[] data) {
+    this(data, 0, data.length);
+  }
+
+  /**
+   * @param data
+   *          backing array
+   * @param offset
+   *          index in the array of the first char of the sequence
+   * @param len
+   *          number of chars in the sequence
+   */
+  public ByteArrayBackedCharSequence(byte[] data, int offset, int len) {
+    this.data = data;
+    this.offset = offset;
+    this.len = len;
+  }
+
+  /** Points this sequence at a different region, possibly of a different array. */
+  public void set(byte[] data, int offset, int len) {
+    this.data = data;
+    this.offset = offset;
+    this.len = len;
+  }
+
+  /** Points this sequence at the region described by the given byte sequence. */
+  public void set(ByteSequence bs) {
+    final byte[] backing = bs.getBackingArray();
+    final int start = bs.offset();
+    final int count = bs.length();
+    set(backing, start, count);
+  }
+
+  @Override
+  public int length() {
+    return len;
+  }
+
+  /** @return the byte at the given position of the region, as an unsigned value in the range 0-255 */
+  @Override
+  public char charAt(int index) {
+    final byte raw = data[offset + index];
+    return (char) (raw & 0xff);
+  }
+
+  /** @return a new sequence over the same backing array, covering the chars from start (inclusive) to end (exclusive) */
+  @Override
+  public CharSequence subSequence(int start, int end) {
+    final int subLength = end - start;
+    return new ByteArrayBackedCharSequence(data, offset + start, subLength);
+  }
+
+  // NOTE(review): this decodes with the platform default charset while charAt maps one byte to one char, so the two can disagree for bytes above
+  // 0x7f - confirm which behaviour callers rely on before changing either
+  @Override
+  public String toString() {
+    return new String(data, offset, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java
new file mode 100644
index 0000000..97b17ee
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.accumulo.core.util;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Orders byte arrays lexicographically, treating every byte as an unsigned value; when one array is a prefix of the other the shorter one sorts first.
+ */
+public class ByteArrayComparator implements Comparator<byte[]>, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @return the difference of the first pair of differing unsigned bytes, or the difference of the lengths when no such pair exists
+   */
+  @Override
+  public int compare(byte[] o1, byte[] o2) {
+    final int shared = o1.length < o2.length ? o1.length : o2.length;
+
+    int pos = 0;
+    while (pos < shared) {
+      // mask to compare as unsigned values (0-255), so the subtraction cannot overflow
+      final int diff = (o1[pos] & 0xff) - (o2[pos] & 0xff);
+      if (diff != 0) {
+        return diff;
+      }
+      pos++;
+    }
+
+    return o1.length - o2.length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java
new file mode 100644
index 0000000..68f0ae5
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+/**
+ * A sorted set of byte arrays, ordered by {@link ByteArrayComparator}.
+ */
+public class ByteArraySet extends TreeSet<byte[]> {
+
+  private static final long serialVersionUID = 1L;
+
+  // fixed charset, so the bytes stored for a string do not depend on the platform default of the JVM
+  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+  /** Creates an empty set. */
+  public ByteArraySet() {
+    super(new ByteArrayComparator());
+  }
+
+  /**
+   * Creates a set holding the given arrays.
+   *
+   * @param c
+   *          arrays to add
+   */
+  public ByteArraySet(Collection<? extends byte[]> c) {
+    this();
+    addAll(c);
+  }
+
+  /**
+   * Creates a set from the UTF-8 encoding of each string.
+   *
+   * @param c
+   *          strings to encode and add
+   * @return a new set holding the encoded strings
+   */
+  public static ByteArraySet fromStrings(Collection<String> c) {
+    List<byte[]> lst = new ArrayList<byte[]>(c.size());
+    for (String s : c)
+      lst.add(s.getBytes(UTF8));
+    return new ByteArraySet(lst);
+  }
+
+  /**
+   * Creates a set from the UTF-8 encoding of each string.
+   *
+   * @param c
+   *          strings to encode and add
+   * @return a new set holding the encoded strings
+   */
+  public static ByteArraySet fromStrings(String... c) {
+    return ByteArraySet.fromStrings(Arrays.asList(c));
+  }
+
+  /**
+   * @return a new list holding the elements of this set in sorted order
+   */
+  public List<byte[]> toList() {
+    return new ArrayList<byte[]>(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
new file mode 100644
index 0000000..6fe4423
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Conversions between {@link ByteBuffer} and byte arrays, {@link Text} and strings. Every conversion covers the bytes between the buffer's position and
+ * its limit and leaves the position and limit of the given buffer unchanged.
+ */
+public class ByteBufferUtil {
+  /**
+   * Copies the remaining bytes of a buffer into a new array.
+   *
+   * @param buffer
+   *          the buffer to read, may be null
+   * @return a copy of the bytes between position and limit, or null if the buffer is null
+   */
+  public static byte[] toBytes(ByteBuffer buffer) {
+    if (buffer == null)
+      return null;
+    byte[] data = new byte[buffer.remaining()];
+    // Read through a duplicate: it shares the content but has its own position, so the caller's buffer is not advanced. Unlike indexing into
+    // buffer.array(), this also honours arrayOffset() and works for buffers that have no accessible backing array.
+    buffer.duplicate().get(data);
+    return data;
+  }
+
+  /**
+   * Wraps each array in a buffer; the arrays are not copied.
+   *
+   * @param bytesList
+   *          arrays to wrap, may be null
+   * @return one buffer per array in iteration order, or null if the collection is null
+   */
+  public static List<ByteBuffer> toByteBuffers(Collection<byte[]> bytesList) {
+    if (bytesList == null)
+      return null;
+    ArrayList<ByteBuffer> result = new ArrayList<ByteBuffer>(bytesList.size());
+    for (byte[] bytes : bytesList) {
+      result.add(ByteBuffer.wrap(bytes));
+    }
+    return result;
+  }
+
+  /**
+   * Applies {@link #toBytes(ByteBuffer)} to each buffer.
+   *
+   * @param bytesList
+   *          buffers to copy, may be null
+   * @return one array per buffer in iteration order, or null if the collection is null
+   */
+  public static List<byte[]> toBytesList(Collection<ByteBuffer> bytesList) {
+    if (bytesList == null)
+      return null;
+    ArrayList<byte[]> result = new ArrayList<byte[]>(bytesList.size());
+    for (ByteBuffer bytes : bytesList) {
+      result.add(toBytes(bytes));
+    }
+    return result;
+  }
+
+  /**
+   * Copies the remaining bytes of a buffer into a new {@link Text}.
+   *
+   * @param bytes
+   *          the buffer to read, may be null
+   * @return a Text holding the bytes between position and limit, or null if the buffer is null
+   */
+  public static Text toText(ByteBuffer bytes) {
+    if (bytes == null)
+      return null;
+    byte[] data = toBytes(bytes);
+    Text result = new Text();
+    result.set(data, 0, data.length);
+    return result;
+  }
+
+  /**
+   * Decodes the remaining bytes of a buffer with the platform default charset.
+   *
+   * @param bytes
+   *          the buffer to read, may be null
+   * @return the decoded string, or null if the buffer is null
+   */
+  public static String toString(ByteBuffer bytes) {
+    if (bytes == null)
+      return null;
+    return new String(toBytes(bytes));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java
new file mode 100644
index 0000000..daf165c
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Holds a single, class-wide Hadoop {@link Configuration} that is created on first request and can be swapped out by callers.
+ */
+public class CachedConfiguration {
+  // the shared instance; stays null until it is first requested or explicitly set
+  private static Configuration configuration = null;
+  
+  /**
+   * Returns the shared configuration, creating one with {@code new Configuration()} if nothing is cached yet.
+   * 
+   * @return the cached configuration, never null
+   */
+  public synchronized static Configuration getInstance() {
+    if (configuration != null)
+      return configuration;
+    configuration = new Configuration();
+    return configuration;
+  }
+  
+  /**
+   * Replaces the shared configuration.
+   * 
+   * @param update
+   *          the configuration to cache from now on; if null, the next {@link #getInstance()} call creates a fresh one
+   * @return the configuration that was cached before this call, possibly null
+   */
+  public synchronized static Configuration setInstance(Configuration update) {
+    final Configuration previous = configuration;
+    configuration = update;
+    return previous;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java
new file mode 100644
index 0000000..2bafcb1
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A column family / column qualifier pair, with convenience methods for applying that pair to scanners and mutations.
+ * <p>
+ * Instances compare by family first and then by qualifier. The {@link Text} objects handed to the constructor are stored as-is, not copied.
+ */
+public class ColumnFQ implements Comparable<ColumnFQ> {
+  private Text colf;
+  private Text colq;
+  
+  /**
+   * @param colf
+   *          the column family, must not be null
+   * @param colq
+   *          the column qualifier, must not be null
+   * @throws IllegalArgumentException
+   *           if either argument is null
+   */
+  public ColumnFQ(Text colf, Text colq) {
+    if (colf == null)
+      throw new IllegalArgumentException();
+    if (colq == null)
+      throw new IllegalArgumentException();
+    
+    this.colf = colf;
+    this.colq = colq;
+  }
+  
+  /**
+   * Builds the pair from the column family and qualifier of a key.
+   */
+  public ColumnFQ(Key k) {
+    this(k.getColumnFamily(), k.getColumnQualifier());
+  }
+  
+  /**
+   * Builds the pair from the column family and qualifier of a column update.
+   */
+  public ColumnFQ(ColumnUpdate cu) {
+    this(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier()));
+  }
+  
+  public Text getColumnQualifier() {
+    return this.colq;
+  }
+  
+  public Text getColumnFamily() {
+    return this.colf;
+  }
+  
+  /**
+   * @return a new {@link Column} with this family and qualifier and a null third constructor argument
+   */
+  public Column toColumn() {
+    final byte[] family = TextUtil.getBytes(colf);
+    final byte[] qualifier = TextUtil.getBytes(colq);
+    return new Column(family, qualifier, null);
+  }
+  
+  /**
+   * Asks the scanner to fetch this column.
+   */
+  public void fetch(ScannerBase sb) {
+    sb.fetchColumn(this.colf, this.colq);
+  }
+  
+  /**
+   * Adds a put of the given value for this column to the mutation.
+   */
+  public void put(Mutation m, Value v) {
+    m.put(this.colf, this.colq, v);
+  }
+  
+  /**
+   * Adds a delete of this column to the mutation.
+   */
+  public void putDelete(Mutation m) {
+    m.putDelete(this.colf, this.colq);
+  }
+  
+  /**
+   * @deprecated since 1.5, use {@link #fetch(ScannerBase)} instead
+   */
+  public static void fetch(ScannerBase sb, ColumnFQ cfq) {
+    final Text family = cfq.colf;
+    final Text qualifier = cfq.colq;
+    sb.fetchColumn(family, qualifier);
+  }
+  
+  /**
+   * @deprecated since 1.5, use {@link #put(Mutation, Value)} instead
+   */
+  public static void put(Mutation m, ColumnFQ cfq, Value v) {
+    final Text family = cfq.colf;
+    final Text qualifier = cfq.colq;
+    m.put(family, qualifier, v);
+  }
+  
+  /**
+   * @deprecated since 1.5, use {@link #putDelete(Mutation)} instead
+   */
+  public static void putDelete(Mutation m, ColumnFQ cfq) {
+    final Text family = cfq.colf;
+    final Text qualifier = cfq.colq;
+    m.putDelete(family, qualifier);
+  }
+  
+  /**
+   * Two instances are equal when both their families and their qualifiers are equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof ColumnFQ) {
+      if (o == this)
+        return true;
+      final ColumnFQ other = (ColumnFQ) o;
+      if (!other.colf.equals(colf))
+        return false;
+      return other.colq.equals(colq);
+    }
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    final int familyHash = colf.hashCode();
+    final int qualifierHash = colq.hashCode();
+    return familyHash + qualifierHash;
+  }
+  
+  /**
+   * @return true if the key's column family and column qualifier both compare equal to this pair
+   */
+  public boolean hasColumns(Key key) {
+    if (key.compareColumnFamily(colf) != 0)
+      return false;
+    return key.compareColumnQualifier(colq) == 0;
+  }
+  
+  /**
+   * @return true if the given family and qualifier equal the ones held by this instance
+   */
+  public boolean equals(Text colf, Text colq) {
+    if (!this.colf.equals(colf))
+      return false;
+    return this.colq.equals(colq);
+  }
+  
+  /**
+   * Orders by column family; ties are broken by column qualifier.
+   */
+  @Override
+  public int compareTo(ColumnFQ o) {
+    final int byFamily = colf.compareTo(o.colf);
+    if (byFamily != 0)
+      return byFamily;
+    return colq.compareTo(o.colq);
+  }
+  
+  /**
+   * @return the family and qualifier joined by a colon
+   */
+  @Override
+  public String toString() {
+    return colf + ":" + colq;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
new file mode 100644
index 0000000..5a1c2ef
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory that lets applications cope with differences in the MapReduce context-object API between hadoop-0.20 and later versions. This code is based on
+ * org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0.
+ * <p>
+ * All context classes and constructors are resolved reflectively, once, in the static initializer; the factory methods then only invoke the cached
+ * constructors. Reflection failures are rethrown as {@link IllegalArgumentException} with the original exception as cause.
+ */
+public class ContextFactory {
+  
+  // Constructors resolved in the static initializer for whichever API flavor was detected.
+  // MAP_CONSTRUCTOR and MAP_CONTEXT_IMPL_CONSTRUCTOR are null and TASK_TYPE_CLASS is null when useV21 is false.
+  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> TASK_ID_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+  private static final Class<?> TASK_TYPE_CLASS;
+  // true when org.apache.hadoop.mapreduce.task.JobContextImpl could be loaded
+  private static final boolean useV21;
+  
+  static {
+    // Step 1: detect the API flavor by probing for a class that only the newer layout provides.
+    boolean v21 = true;
+    final String PACKAGE = "org.apache.hadoop.mapreduce";
+    try {
+      Class.forName(PACKAGE + ".task.JobContextImpl");
+    } catch (ClassNotFoundException cnfe) {
+      v21 = false;
+    }
+    useV21 = v21;
+    // Step 2: load the concrete classes for the detected flavor.
+    Class<?> jobContextCls;
+    Class<?> taskContextCls;
+    Class<?> mapCls;
+    Class<?> mapContextCls;
+    Class<?> innerMapContextCls;
+    try {
+      if (v21) {
+        jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
+        taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
+        TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
+        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+        innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
+      } else {
+        jobContextCls = Class.forName(PACKAGE + ".JobContext");
+        taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
+        TASK_TYPE_CLASS = null;
+        mapContextCls = Class.forName(PACKAGE + ".MapContext");
+        mapCls = Class.forName(PACKAGE + ".Mapper");
+        innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    }
+    // Step 3: look up the constructors the factory methods invoke and make them accessible.
+    try {
+      JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
+      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
+      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      if (useV21) {
+        // newer flavor: the third TaskAttemptID constructor parameter is the TaskType class loaded above
+        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class);
+        TASK_ID_CONSTRUCTOR.setAccessible(true);
+        MAP_CONSTRUCTOR = mapCls.getConstructor();
+        // the inner Context class takes its outer WrappedMapper instance plus the MapContext it wraps
+        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+            OutputCommitter.class, StatusReporter.class, InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+      } else {
+        // older flavor: the third TaskAttemptID constructor parameter is a boolean
+        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class);
+        TASK_ID_CONSTRUCTOR.setAccessible(true);
+        MAP_CONSTRUCTOR = null;
+        // the inner Context class takes its outer Mapper instance followed by all context collaborators directly
+        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
+            OutputCommitter.class, StatusReporter.class, InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+      }
+      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+    } catch (SecurityException e) {
+      throw new IllegalArgumentException("Can't run constructor ", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Can't find constructor ", e);
+    }
+  }
+  
+  /**
+   * Creates a job context backed by a freshly constructed {@link Configuration}.
+   * 
+   * @return the new job context
+   */
+  public static JobContext createJobContext() {
+    return createJobContext(new Configuration());
+  }
+  
+  /**
+   * Creates a job context for the given configuration, using the job id {@code new JobID("local", 0)}.
+   * 
+   * @param conf
+   *          the configuration handed to the context constructor
+   * @return the new job context
+   * @throws IllegalArgumentException
+   *           if the reflective constructor call fails
+   */
+  public static JobContext createJobContext(Configuration conf) {
+    try {
+      return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0));
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    }
+  }
+  
+  /**
+   * Creates a task attempt context from the configuration of the given job context.
+   * 
+   * @param job
+   *          the job context whose configuration is used
+   * @return the new task attempt context
+   */
+  public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
+    return createTaskAttemptContext(job.getConfiguration());
+  }
+  
+  /**
+   * Creates a task attempt context for the given configuration with a task attempt id built from "local" and zeros.
+   * 
+   * @param conf
+   *          the configuration handed to the context constructor
+   * @return the new task attempt context
+   * @throws IllegalArgumentException
+   *           if a reflective constructor call fails
+   */
+  public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
+    try {
+      if (useV21)
+        // the task type argument is simply the first constant declared by the TaskType enum
+        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf,
+            TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0));
+      else
+        // NOTE(review): the boolean 'true' is presumably the "is map task" flag of the older TaskAttemptID constructor - confirm against Hadoop 0.20
+        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0));
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    }
+  }
+  
+  /**
+   * Creates a mapper context without an output committer or status reporter (both are passed as null).
+   * 
+   * @see #createMapContext(Mapper, TaskAttemptContext, RecordReader, RecordWriter, OutputCommitter, StatusReporter, InputSplit)
+   */
+  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
+      RecordWriter<K2,V2> writer, InputSplit split) {
+    return createMapContext(m, tac, reader, writer, null, null, split);
+  }
+  
+  /**
+   * Creates a mapper context from the given collaborators, taking the configuration and task attempt id from {@code tac}.
+   * 
+   * @param m
+   *          the mapper used as the outer instance of the context in the older API flavor; not used when the newer flavor was detected
+   * @param tac
+   *          supplies the configuration and task attempt id
+   * @param reader
+   *          the record reader passed to the context
+   * @param writer
+   *          the record writer passed to the context
+   * @param committer
+   *          the output committer passed to the context, may be null
+   * @param reporter
+   *          the status reporter passed to the context, may be null
+   * @param split
+   *          the input split passed to the context
+   * @return the new mapper context
+   * @throws IllegalArgumentException
+   *           if a reflective constructor call fails
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context createMapContext(Mapper<K1,V1,K2,V2> m, TaskAttemptContext tac, RecordReader<K1,V1> reader,
+      RecordWriter<K2,V2> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
+    try {
+      if (useV21) {
+        // newer flavor: build the MapContextImpl first, then wrap it in the Context of a freshly created WrappedMapper (the supplied mapper m is not involved)
+        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
+        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper<K1,V1,K2,V2>) MAP_CONSTRUCTOR.newInstance(), basis);
+      } else {
+        // older flavor: Mapper.Context is constructed directly with m as its outer instance
+        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
+            split);
+      }
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create object", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Daemon.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Daemon.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Daemon.java
new file mode 100644
index 0000000..7ce46eb
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Daemon.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+/**
+ * A {@link Thread} that is always marked as a daemon thread at construction time.
+ * <p>
+ * Each constructor forwards its arguments to the {@link Thread} constructor with the same parameter list.
+ */
+public class Daemon extends Thread {
+  
+  // Instance initializer: runs for every constructor below, right after the Thread super-constructor returns.
+  {
+    setDaemon(true);
+  }
+  
+  public Daemon() {
+    super();
+  }
+  
+  public Daemon(Runnable target) {
+    super(target);
+  }
+  
+  public Daemon(String name) {
+    super(name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target) {
+    super(group, target);
+  }
+  
+  public Daemon(ThreadGroup group, String name) {
+    super(group, name);
+  }
+  
+  public Daemon(Runnable target, String name) {
+    super(target, name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target, String name) {
+    super(group, target, name);
+  }
+  
+  public Daemon(ThreadGroup group, Runnable target, String name, long stackSize) {
+    super(group, target, name, stackSize);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Duration.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Duration.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Duration.java
new file mode 100644
index 0000000..91ae089
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Duration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+/**
+ * Formats a length of time given in milliseconds as a short string showing its two most significant units, e.g. "1m 5s".
+ * <p>
+ * A year is counted as 365 days.
+ */
+public class Duration {
+  
+  /**
+   * Formats with "&amp;nbsp;" between the two units and "&amp;mdash;" for a zero duration.
+   */
+  public static String format(long time) {
+    return format(time, "&nbsp;");
+  }
+  
+  /**
+   * Formats with the given separator between the two units and "&amp;mdash;" for a zero duration.
+   */
+  public static String format(long time, String space) {
+    return format(time, space, "&mdash;");
+  }
+  
+  /**
+   * Formats a duration.
+   * 
+   * @param time
+   *          the duration in milliseconds
+   * @param space
+   *          the text placed between the two units; it becomes part of the format string
+   * @param zero
+   *          the text returned when time is exactly 0
+   * @return zero for 0; otherwise the largest non-zero unit followed by the next smaller one (milliseconds alone below one second)
+   */
+  public static String format(long time, String space, String zero) {
+    if (time == 0)
+      return zero;
+    
+    // peel off one unit at a time; as soon as nothing larger is left, print what has been collected
+    final long ms = time % 1000;
+    final long totalSeconds = time / 1000;
+    if (totalSeconds == 0)
+      return String.format("%dms", ms);
+    
+    final long sec = totalSeconds % 60;
+    final long totalMinutes = totalSeconds / 60;
+    if (totalMinutes == 0)
+      return String.format("%ds" + space + "%dms", sec, ms);
+    
+    final long min = totalMinutes % 60;
+    final long totalHours = totalMinutes / 60;
+    if (totalHours == 0)
+      return String.format("%dm" + space + "%ds", min, sec);
+    
+    final long hr = totalHours % 24;
+    final long totalDays = totalHours / 24;
+    if (totalDays == 0)
+      return String.format("%dh" + space + "%dm", hr, min);
+    
+    final long day = totalDays % 365;
+    final long yr = totalDays / 365;
+    if (yr == 0)
+      return String.format("%dd" + space + "%dh", day, hr);
+    
+    return String.format("%dy" + space + "%dd", yr, day);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Encoding.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Encoding.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Encoding.java
new file mode 100644
index 0000000..1c8cb5d
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Encoding.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Converts between raw bytes and a Base64 variant whose output avoids the characters '/', '+' and '='.
+ */
+public class Encoding {
+  
+  /**
+   * Base64-encodes the bytes of the given text, replaces '/' with '_' and '+' with '-', and strips the trailing '=' padding.
+   * 
+   * @param data
+   *          the text whose bytes are encoded
+   * @return the encoded name
+   */
+  public static String encodeAsBase64FileName(Text data) {
+    final String base64 = new String(Base64.encodeBase64(TextUtil.getBytes(data)));
+    final String substituted = base64.replace('/', '_').replace('+', '-');
+    
+    // walk back over the padding to find where the payload ends
+    int end = substituted.length();
+    while (end > 0 && substituted.charAt(end - 1) == '=')
+      end--;
+    
+    return substituted.substring(0, end);
+  }
+  
+  /**
+   * Reverses {@link #encodeAsBase64FileName(Text)}: re-appends '=' until the length is a multiple of four, restores '/' and '+', and Base64-decodes.
+   * 
+   * @param node
+   *          the encoded name
+   * @return the decoded bytes
+   */
+  public static byte[] decodeBase64FileName(String node) {
+    final StringBuilder padded = new StringBuilder(node);
+    while (padded.length() % 4 != 0)
+      padded.append("=");
+    
+    final String standard = padded.toString().replace('_', '/').replace('-', '+');
+    
+    return Base64.decodeBase64(standard.getBytes());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
new file mode 100644
index 0000000..b9959f8
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Helpers for reading locality group definitions out of a table configuration and for converting column family names to and from their escaped textual
+ * form.
+ * <p>
+ * In the escaped form a backslash is written as two backslashes, and a comma or any byte outside the range 32..126 is written as a backslash, an 'x' and two
+ * hex digits. All other bytes are written as the character with that code.
+ */
+public class LocalityGroupUtil {
+  
+  // private static final Logger log = Logger.getLogger(ColumnFamilySet.class);
+  
+  public static Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+  
+  /**
+   * Collects the column families of the given columns.
+   * 
+   * @param columns
+   *          the columns to inspect
+   * @return a new set holding one {@link ArrayByteSequence} per distinct column family
+   */
+  public static Set<ByteSequence> families(Collection<Column> columns) {
+    Set<ByteSequence> result = new HashSet<ByteSequence>(columns.size());
+    for (Column col : columns) {
+      result.add(new ArrayByteSequence(col.getColumnFamily()));
+    }
+    return result;
+  }
+  
+  /**
+   * Signals a locality group configuration that cannot be interpreted.
+   */
+  @SuppressWarnings("serial")
+  static public class LocalityGroupConfigurationError extends AccumuloException {
+    LocalityGroupConfigurationError(String why) {
+      super(why);
+    }
+  }
+  
+  /**
+   * Reads the locality groups defined in a configuration.
+   * <p>
+   * Group names come from the comma separated value of {@link Property#TABLE_LOCALITY_GROUPS}; empty names are skipped. The column families of a group come
+   * from the property whose key is {@link Property#TABLE_LOCALITY_GROUP_PREFIX} followed by the group name. Properties for groups that are not listed, and
+   * properties with further dot-separated parts after the group name, are ignored. A listed group without such a property maps to an empty set.
+   * 
+   * @param acuconf
+   *          the configuration to read
+   * @return a new map from group name to the column families of that group
+   * @throws LocalityGroupConfigurationError
+   *           if a column family is assigned to more than one group, or a column family value cannot be decoded
+   */
+  public static Map<String,Set<ByteSequence>> getLocalityGroups(AccumuloConfiguration acuconf) throws LocalityGroupConfigurationError {
+    Map<String,Set<ByteSequence>> result = new HashMap<String,Set<ByteSequence>>();
+    String[] groups = acuconf.get(Property.TABLE_LOCALITY_GROUPS).split(",");
+    for (String group : groups) {
+      if (group.length() > 0)
+        result.put(group, new HashSet<ByteSequence>());
+    }
+    // every column family seen so far, used to detect families claimed by two groups
+    HashSet<ByteSequence> all = new HashSet<ByteSequence>();
+    // the prefix does not change between entries, so look it up once
+    String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey();
+    for (Entry<String,String> entry : acuconf) {
+      String property = entry.getKey();
+      String value = entry.getValue();
+      if (property.startsWith(prefix)) {
+        // this property configures a locality group, find out which one:
+        String group = property.substring(prefix.length());
+        String[] parts = group.split("\\.");
+        group = parts[0];
+        if (result.containsKey(group)) {
+          if (parts.length == 1) {
+            Set<ByteSequence> colFamsSet = decodeColumnFamilies(value);
+            if (!Collections.disjoint(all, colFamsSet)) {
+              // narrow the set down to the offending families so the message names only those
+              colFamsSet.retainAll(all);
+              throw new LocalityGroupConfigurationError("Column families " + colFamsSet + " in group " + group + " is already used by another locality group");
+            }
+            
+            all.addAll(colFamsSet);
+            result.put(group, colFamsSet);
+          }
+        }
+      }
+    }
+    // result.put("", all);
+    return result;
+  }
+  
+  /**
+   * Decodes a comma separated list of escaped column families.
+   * 
+   * @param colFams
+   *          the comma separated, escaped column families
+   * @return a new set holding the decoded families
+   * @throws LocalityGroupConfigurationError
+   *           if one of the families contains an invalid escape sequence
+   */
+  public static Set<ByteSequence> decodeColumnFamilies(String colFams) throws LocalityGroupConfigurationError {
+    HashSet<ByteSequence> colFamsSet = new HashSet<ByteSequence>();
+    
+    for (String family : colFams.split(",")) {
+      ByteSequence cfbs = decodeColumnFamily(family);
+      colFamsSet.add(cfbs);
+    }
+    
+    return colFamsSet;
+  }
+  
+  /**
+   * Decodes a single escaped column family.
+   * 
+   * @param colFam
+   *          the escaped column family
+   * @return the decoded bytes
+   * @throws LocalityGroupConfigurationError
+   *           if a backslash is not followed by another backslash or by 'x' and two hex digits
+   */
+  public static ByteSequence decodeColumnFamily(String colFam) throws LocalityGroupConfigurationError {
+    // decoding never yields more bytes than there are input characters
+    byte output[] = new byte[colFam.length()];
+    int pos = 0;
+    
+    for (int i = 0; i < colFam.length(); i++) {
+      char c = colFam.charAt(i);
+      
+      if (c == '\\') {
+        // next char must be 'x' or a second backslash
+        i++;
+        
+        if (i >= colFam.length()) {
+          // the backslashes are doubled in the literals so that they actually show up in the message
+          throw new LocalityGroupConfigurationError("Expected 'x' or '\\' after '\\'  in " + colFam);
+        }
+        
+        char nc = colFam.charAt(i);
+        
+        switch (nc) {
+          case '\\':
+            output[pos++] = '\\';
+            break;
+          case 'x':
+            // the two chars after the 'x' must be hex digits; report a truncated or malformed escape as a configuration error
+            if (i + 2 >= colFam.length()) {
+              throw new LocalityGroupConfigurationError("Expected two hex digits after '\\x' in " + colFam);
+            }
+            try {
+              output[pos++] = (byte) (0xff & Integer.parseInt(colFam.substring(i + 1, i + 3), 16));
+            } catch (NumberFormatException e) {
+              throw new LocalityGroupConfigurationError("Expected two hex digits after '\\x' in " + colFam);
+            }
+            // skip the two digits just consumed
+            i += 2;
+            break;
+          default:
+            throw new LocalityGroupConfigurationError("Expected 'x' or '\\' after '\\'  in " + colFam);
+        }
+      } else {
+        output[pos++] = (byte) (0xff & c);
+      }
+      
+    }
+    
+    return new ArrayByteSequence(output, 0, pos);
+    
+  }
+  
+  /**
+   * Escapes each column family and joins the results with commas.
+   * 
+   * @param colFams
+   *          the column families to encode
+   * @return the comma separated, escaped column families
+   */
+  public static String encodeColumnFamilies(Set<Text> colFams) {
+    HashSet<String> ecfs = new HashSet<String>();
+    
+    // one builder is reused for all families; encodeColumnFamily resets it on entry
+    StringBuilder sb = new StringBuilder();
+    
+    for (Text text : colFams) {
+      String ecf = encodeColumnFamily(sb, text.getBytes(), 0, text.getLength());
+      ecfs.add(ecf);
+    }
+    
+    return StringUtil.join(ecfs, ",");
+  }
+  
+  /**
+   * Escapes a single column family.
+   * 
+   * @param bs
+   *          the column family bytes; only the range described by its offset and length is encoded
+   * @return the escaped column family
+   */
+  public static String encodeColumnFamily(ByteSequence bs) {
+    return encodeColumnFamily(new StringBuilder(), bs.getBackingArray(), bs.offset(), bs.length());
+  }
+  
+  /**
+   * Escapes len bytes of ba starting at offset, using sb as scratch space (its previous content is discarded).
+   */
+  private static String encodeColumnFamily(StringBuilder sb, byte[] ba, int offset, int len) {
+    sb.setLength(0);
+    
+    for (int i = 0; i < len; i++) {
+      // index relative to offset so that sequences starting inside a larger backing array are encoded correctly
+      int c = 0xff & ba[offset + i];
+      if (c == '\\')
+        sb.append("\\\\");
+      else if (c >= 32 && c <= 126 && c != ',')
+        sb.append((char) c);
+      else
+        sb.append("\\x").append(String.format("%02X", c));
+    }
+    
+    String ecf = sb.toString();
+    return ecf;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java
new file mode 100644
index 0000000..87fa319
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Wraps a {@link Runnable} so that anything it throws is reported through a log4j {@link Logger} instead of propagating out of {@link #run()}.
+ * <p>
+ * If the logging call itself throws, both the logging failure and the original throwable are written to {@code System.err}.
+ */
+public class LoggingRunnable implements Runnable {
+  private Runnable runnable;
+  private Logger log;
+  
+  /**
+   * @param log
+   *          the logger that receives the error report
+   * @param r
+   *          the task to run
+   */
+  public LoggingRunnable(Logger log, Runnable r) {
+    this.log = log;
+    this.runnable = r;
+  }
+  
+  /**
+   * Runs the wrapped task; any {@link Throwable} it raises is logged and then dropped.
+   */
+  public void run() {
+    try {
+      this.runnable.run();
+    } catch (Throwable failure) {
+      try {
+        this.log.error("Thread \"" + Thread.currentThread().getName() + "\" died " + failure.getMessage(), failure);
+      } catch (Throwable loggingFailure) {
+        // maybe the logging system is broken, or the exception itself misbehaves (e.g. its getMessage() throws)
+        final String why = "ERROR " + new Date() + " Failed to log message about thread death " + loggingFailure.getMessage();
+        System.err.println(why);
+        loggingFailure.printStackTrace();
+        
+        // still try to get the original problem out
+        final String what = "ERROR " + new Date() + " Exception that failed to log : " + failure.getMessage();
+        System.err.println(what);
+        failure.printStackTrace();
+      }
+    }
+  }
+  
+  /**
+   * Manual demonstration: runs a task that throws, with a null logger, so the System.err fallback path is taken.
+   */
+  public static void main(String[] args) {
+    Runnable failing = new Runnable() {
+      @Override
+      public void run() {
+        int empty[] = new int[0];
+        
+        empty[0]++;
+      }
+    };
+    
+    new LoggingRunnable(null, failing).run();
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java
new file mode 100644
index 0000000..33c25eb
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class Merge {
+  
+  /**
+   * Checked exception used by {@link Merge} to wrap any exception raised while examining or merging tablets.
+   */
+  public static class MergeException extends Exception {
+    private static final long serialVersionUID = 1L;
+    
+    MergeException(Exception cause) {
+      super(cause);
+    }
+  }
+  
+  private static final Logger log = Logger.getLogger(Merge.class);
+  
+  /**
+   * Renders a progress message with {@link String#format(String, Object...)} and logs it at INFO level. Protected so subclasses can redirect the output.
+   * 
+   * @param format
+   *          format string
+   * @param args
+   *          format arguments
+   */
+  protected void message(String format, Object... args) {
+    String rendered = String.format(format, args);
+    log.info(rendered);
+  }
+  
+  /**
+   * Parses the command line, connects to the instance and merges small tablets of the requested table.
+   * <p>
+   * Options: {@code -k} ZooKeeper list (default "localhost"), {@code -i} instance name (default "instance"), {@code -t} table (required), {@code -s} goal
+   * size, {@code -u} user (default "root"), {@code -p} password (default "secret"), {@code -f} force, {@code -b} start tablet, {@code -e} end tablet.
+   * <p>
+   * If no table is given, or the table does not exist, a message is printed to standard error and the method returns without merging. If no goal size of at
+   * least one byte is given, the table's {@link Property#TABLE_SPLIT_THRESHOLD} is used.
+   * 
+   * @param args
+   *          command line arguments
+   * @throws MergeException
+   *           wrapping any exception raised while connecting, reading the table configuration or merging
+   * @throws ParseException
+   *           if the arguments cannot be parsed
+   */
+  public void start(String[] args) throws MergeException, ParseException {
+    String keepers = "localhost";
+    String instance = "instance";
+    String table = null;
+    long goalSize = -1;
+    String user = "root";
+    byte[] password = "secret".getBytes();
+    boolean force = false;
+    Text begin = null;
+    Text end = null;
+    
+    Options options = new Options();
+    options.addOption("k", "keepers", true, "ZooKeeper list");
+    options.addOption("i", "instance", true, "instance name");
+    options.addOption("t", "table", true, "table to merge");
+    options.addOption("s", "size", true, "merge goal size");
+    options.addOption("u", "user", true, "user");
+    options.addOption("p", "password", true, "password");
+    options.addOption("f", "force", false, "merge small tablets even if merging them to larger tablets might cause a split");
+    options.addOption("b", "begin", true, "start tablet");
+    options.addOption("e", "end", true, "end tablet");
+    CommandLine commandLine = new BasicParser().parse(options, args);
+    if (commandLine.hasOption("k")) {
+      keepers = commandLine.getOptionValue("k");
+    }
+    if (commandLine.hasOption("i")) {
+      instance = commandLine.getOptionValue("i");
+    }
+    if (commandLine.hasOption("t")) {
+      table = commandLine.getOptionValue("t");
+    }
+    if (commandLine.hasOption("s")) {
+      goalSize = AccumuloConfiguration.getMemoryInBytes(commandLine.getOptionValue("s"));
+    }
+    if (commandLine.hasOption("u")) {
+      // the -u value is the user name; it must not overwrite the table name
+      user = commandLine.getOptionValue("u");
+    }
+    if (commandLine.hasOption("p")) {
+      password = commandLine.getOptionValue("p").getBytes();
+    }
+    if (commandLine.hasOption("f")) {
+      force = true;
+    }
+    if (commandLine.hasOption("b")) {
+      begin = new Text(commandLine.getOptionValue("b"));
+    }
+    if (commandLine.hasOption("e")) {
+      end = new Text(commandLine.getOptionValue("e"));
+    }
+    if (table == null) {
+      System.err.println("Specify the table to merge");
+      return;
+    }
+    Instance zki = new ZooKeeperInstance(instance, keepers);
+    try {
+      Connector conn = zki.getConnector(user, password);
+      
+      if (!conn.tableOperations().exists(table)) {
+        System.err.println("table " + table + " does not exist");
+        return;
+      }
+      if (goalSize < 1) {
+        // no usable goal size on the command line: fall back to the table's own split threshold
+        AccumuloConfiguration tableConfig = new ConfigurationCopy(conn.tableOperations().getProperties(table));
+        goalSize = tableConfig.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+      }
+      
+      message("Merging tablets in table %s to %d bytes", table, goalSize);
+      mergomatic(conn, table, begin, end, goalSize, force);
+    } catch (Exception ex) {
+      throw new MergeException(ex);
+    }
+  }
+  
+  /**
+   * Command line entry point: hands the arguments to {@link #start(String[])} on a fresh {@code Merge}.
+   */
+  public static void main(String[] args) throws MergeException, ParseException {
+    new Merge().start(args);
+  }
+  
+  /**
+   * Pairs a tablet extent with the size computed for that tablet.
+   */
+  public static class Size {
+    KeyExtent extent;
+    long size;
+    
+    public Size(KeyExtent extent, long size) {
+      this.size = size;
+      this.extent = extent;
+    }
+  }
+  
+  /**
+   * Walks the tablets of {@code table} between {@code start} and {@code end}, accumulating them until their combined size exceeds {@code goalSize}, and hands
+   * each accumulated batch to {@link #mergeMany}. Whatever is still pending after the last tablet is offered to {@code mergeMany} once more with
+   * {@code last == true}.
+   * 
+   * @throws MergeException
+   *           wrapping any exception raised along the way, including the {@link IllegalArgumentException} thrown for the metadata table
+   */
+  public void mergomatic(Connector conn, String table, Text start, Text end, long goalSize, boolean force) throws MergeException {
+    try {
+      if (table.equals(Constants.METADATA_TABLE_NAME)) {
+        throw new IllegalArgumentException("cannot merge tablets on the metadata table");
+      }
+      List<Size> pending = new ArrayList<Size>();
+      long pendingBytes = 0;
+      // keep collecting tablets until the batch outgrows the goal, then let mergeMany decide what to merge
+      for (Iterator<Size> tablets = getSizeIterator(conn, table, start, end); tablets.hasNext();) {
+        Size tablet = tablets.next();
+        pendingBytes += tablet.size;
+        pending.add(tablet);
+        if (pendingBytes > goalSize) {
+          pendingBytes = mergeMany(conn, table, pending, goalSize, force, false);
+        }
+      }
+      if (pending.size() > 1) {
+        mergeMany(conn, table, pending, goalSize, force, true);
+      }
+    } catch (Exception ex) {
+      throw new MergeException(ex);
+    }
+  }
+  
+  /**
+   * Decides which of the pending tablets at the front of {@code sizes} to merge, merges them, and removes every entry it has dealt with from the list.
+   * 
+   * @param sizes
+   *          pending tablets in scan order; modified in place
+   * @param goalSize
+   *          size that a merged tablet should not exceed
+   * @param force
+   *          when a single small tablet is followed by one that would push the pair over the goal, merge the two anyway instead of dropping the small one
+   * @param last
+   *          true when no more tablets will follow, allowing everything left to be merged
+   * @return the sum of the sizes still in {@code sizes} on return, or 0 if the list was emptied by the initial skip
+   * @throws MergeException
+   *           if a merge fails
+   */
+  protected long mergeMany(Connector conn, String table, List<Size> sizes, long goalSize, boolean force, boolean last) throws MergeException {
+    // skip the big tablets, which will be the typical case: drop leading entries that already reach the goal size
+    while (!sizes.isEmpty()) {
+      if (sizes.get(0).size < goalSize)
+        break;
+      sizes.remove(0);
+    }
+    if (sizes.isEmpty()) {
+      return 0;
+    }
+    
+    // collect any small ones: numToMerge becomes the index of the first tablet that would push the running total over the goal,
+    // and stays 0 when every remaining tablet fits
+    long mergeSize = 0;
+    int numToMerge = 0;
+    for (int i = 0; i < sizes.size(); i++) {
+      if (mergeSize + sizes.get(i).size > goalSize) {
+        numToMerge = i;
+        break;
+      }
+      mergeSize += sizes.get(i).size;
+    }
+    
+    if (numToMerge > 1) {
+      mergeSome(conn, table, sizes, numToMerge);
+    } else {
+      if (numToMerge == 1 && sizes.size() > 1) {
+        // here we have the case of a merge candidate that is surrounded by candidates that would split
+        if (force) {
+          mergeSome(conn, table, sizes, 2);
+        } else {
+          // give up on the lone small tablet so the next call starts from its successor
+          sizes.remove(0);
+        }
+      }
+    }
+    if (numToMerge == 0 && sizes.size() > 1 && last) {
+      // That's the last tablet, and we have a bunch to merge
+      mergeSome(conn, table, sizes, sizes.size());
+    }
+    // report what is still pending so the caller can continue accumulating from there
+    long result = 0;
+    for (Size s : sizes) {
+      result += s.size;
+    }
+    return result;
+  }
+  
+  /**
+   * Merges the first {@code numToMerge} tablets of {@code sizes} via {@link #merge} and then removes those entries from the front of the list.
+   */
+  protected void mergeSome(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
+    merge(conn, table, sizes, numToMerge);
+    int remaining = numToMerge;
+    while (remaining > 0) {
+      sizes.remove(0);
+      remaining--;
+    }
+  }
+  
+  /**
+   * Asks the table operations to merge the range that runs from the previous end row of the first tablet in {@code sizes} to the end row of tablet
+   * {@code numToMerge - 1}, after logging the range. The list itself is not modified.
+   * 
+   * @throws MergeException
+   *           wrapping any exception raised while looking up the range or merging
+   */
+  protected void merge(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
+    try {
+      Text start = sizes.get(0).extent.getPrevEndRow();
+      Text end = sizes.get(numToMerge - 1).extent.getEndRow();
+      // a null boundary is shown as an infinity marker in the progress message
+      Object from = (start != null) ? start : "-inf";
+      Object to = (end != null) ? end : "+inf";
+      message("Merging %d tablets from (%s to %s]", numToMerge, from, to);
+      conn.tableOperations().merge(table, start, end);
+    } catch (Exception ex) {
+      throw new MergeException(ex);
+    }
+  }
+  
+  /**
+   * Returns an iterator that yields one {@link Size} per tablet of {@code tablename}, computed by scanning the metadata table.
+   * <p>
+   * The first tablet is fetched eagerly when the iterator is created. {@code next()} returns null once the scan is exhausted rather than throwing, so callers
+   * are expected to check {@code hasNext()} first. {@code remove()} is not supported.
+   * 
+   * @param start
+   *          passed as the previous end row of the extent used to build the scan range
+   * @param end
+   *          passed as the end row of the extent used to build the scan range
+   * @throws MergeException
+   *           wrapping any exception raised while resolving the table id or creating the scanner
+   */
+  protected Iterator<Size> getSizeIterator(Connector conn, String tablename, Text start, Text end) throws MergeException {
+    // open up the !METADATA table, walk through the tablets.
+    String tableId;
+    Scanner scanner;
+    try {
+      tableId = Tables.getTableId(conn.getInstance(), tablename);
+      scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    } catch (Exception e) {
+      throw new MergeException(e);
+    }
+    // only the data file entries and the prev-row entry are needed to size each tablet
+    scanner.setRange(new KeyExtent(new Text(tableId), end, start).toMetadataRange());
+    scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
+    Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
+    final Iterator<Entry<Key,Value>> iterator = scanner.iterator();
+    
+    Iterator<Size> result = new Iterator<Size>() {
+      // look-ahead element; null means the scan is exhausted
+      Size next = fetch();
+      
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+      
+      // Sums data file sizes until the prev-row entry is seen, then emits the tablet.
+      // presumably the data file entries of a row sort before its prev-row entry - TODO confirm
+      private Size fetch() {
+        long tabletSize = 0;
+        while (iterator.hasNext()) {
+          Entry<Key,Value> entry = iterator.next();
+          Key key = entry.getKey();
+          if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
+            // the first comma-separated field of the value is taken as the file size
+            // NOTE(review): values with any other field count (e.g. a third, time field) are not counted - confirm this is intended
+            String[] sizeEntries = new String(entry.getValue().get()).split(",");
+            if (sizeEntries.length == 2) {
+              tabletSize += Long.parseLong(sizeEntries[0]);
+            }
+          } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+            KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+            return new Size(extent, tabletSize);
+          }
+        }
+        return null;
+      }
+      
+      @Override
+      public Size next() {
+        Size result = next;
+        next = fetch();
+        return result;
+      }
+      
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+    return result;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java
new file mode 100644
index 0000000..e440ee3
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.hadoop.io.Text;
+
+public class MetadataTable {
+  /**
+   * Size, entry count and optional time of a data file, together with its comma-separated text encoding ({@code size,numEntries} or
+   * {@code size,numEntries,time}). A negative time means "not set". Equality and hash code consider only size and entry count.
+   */
+  public static class DataFileValue {
+    private long size;
+    private long numEntries;
+    private long time = -1;
+    
+    public DataFileValue(long size, long numEntries, long time) {
+      this.size = size;
+      this.numEntries = numEntries;
+      this.time = time;
+    }
+    
+    /**
+     * Creates a value without a time.
+     */
+    public DataFileValue(long size, long numEntries) {
+      this(size, numEntries, -1);
+    }
+    
+    /**
+     * Decodes the form produced by {@link #encode()}; the time is read only when exactly three fields are present.
+     */
+    public DataFileValue(byte[] encodedDFV) {
+      String[] fields = new String(encodedDFV).split(",");
+      
+      this.size = Long.parseLong(fields[0]);
+      this.numEntries = Long.parseLong(fields[1]);
+      this.time = (fields.length == 3) ? Long.parseLong(fields[2]) : -1;
+    }
+    
+    public long getSize() {
+      return size;
+    }
+    
+    public long getNumEntries() {
+      return numEntries;
+    }
+    
+    public boolean isTimeSet() {
+      return time >= 0;
+    }
+    
+    public long getTime() {
+      return time;
+    }
+    
+    /**
+     * @return the comma-separated encoding; the time field is appended only when it is set
+     */
+    public byte[] encode() {
+      String encoded = size + "," + numEntries;
+      if (time >= 0) {
+        encoded = encoded + "," + time;
+      }
+      return encoded.getBytes();
+    }
+    
+    public boolean equals(Object o) {
+      if (!(o instanceof DataFileValue)) {
+        return false;
+      }
+      
+      DataFileValue other = (DataFileValue) o;
+      return size == other.size && numEntries == other.numEntries;
+    }
+    
+    public int hashCode() {
+      long sum = size + numEntries;
+      return Long.valueOf(sum).hashCode();
+    }
+    
+    public String toString() {
+      return size + " " + numEntries;
+    }
+    
+    /**
+     * @throws IllegalArgumentException
+     *           if {@code time} is negative
+     */
+    public void setTime(long time) {
+      if (time < 0) {
+        throw new IllegalArgumentException();
+      }
+      this.time = time;
+    }
+  }
+  
+  public static SortedMap<KeyExtent,Text> getMetadataLocationEntries(SortedMap<Key,Value> entries) {
+    Key key;
+    Value val;
+    Text location = null;
+    Value prevRow = null;
+    KeyExtent ke;
+    
+    SortedMap<KeyExtent,Text> results = new TreeMap<KeyExtent,Text>();
+    
+    Text lastRowFromKey = new Text();
+    
+    // text obj below is meant to be reused in loop for efficiency
+    Text colf = new Text();
+    Text colq = new Text();
+    
+    for (Entry<Key,Value> entry : entries.entrySet()) {
+      key = entry.getKey();
+      val = entry.getValue();
+      
+      if (key.compareRow(lastRowFromKey) != 0) {
+        prevRow = null;
+        location = null;
+        key.getRow(lastRowFromKey);
+      }
+      
+      colf = key.getColumnFamily(colf);
+      colq = key.getColumnQualifier(colq);
+      
+      // interpret the row id as a key extent
+      if (colf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) || colf.equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY))
+        location = new Text(val.toString());
+      else if (Constants.METADATA_PREV_ROW_COLUMN.equals(colf, colq))
+        prevRow = new Value(val);
+      
+      if (location != null && prevRow != null) {
+        ke = new KeyExtent(key.getRow(), prevRow);
+        results.put(ke, location);
+        
+        location = null;
+        prevRow = null;
+      }
+    }
+    return results;
+  }
+  
+  /**
+   * Reads the metadata entries for {@code ke} and its previous tablet via {@link #getTabletAndPrevTabletKeyValues} and groups them by row.
+   */
+  public static SortedMap<Text,SortedMap<ColumnFQ,Value>> getTabletEntries(Instance instance, KeyExtent ke, List<ColumnFQ> columns, AuthInfo credentials) {
+    SortedMap<Key,Value> tabletKeyValues = new TreeMap<Key,Value>();
+    getTabletAndPrevTabletKeyValues(instance, tabletKeyValues, ke, columns, credentials);
+    return getTabletEntries(tabletKeyValues, columns);
+  }
+  
+  /**
+   * Groups metadata key/value pairs by row, keeping for each row a sorted map from column to value.
+   * 
+   * @param tabletKeyValues
+   *          entries to group
+   * @param columns
+   *          columns to keep, or null to keep every entry
+   * @return row to (column to value) map
+   */
+  public static SortedMap<Text,SortedMap<ColumnFQ,Value>> getTabletEntries(SortedMap<Key,Value> tabletKeyValues, List<ColumnFQ> columns) {
+    TreeMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries = new TreeMap<Text,SortedMap<ColumnFQ,Value>>();
+    
+    // null means "no filter"
+    HashSet<ColumnFQ> wanted = (columns == null) ? null : new HashSet<ColumnFQ>(columns);
+    
+    for (Entry<Key,Value> entry : tabletKeyValues.entrySet()) {
+      Key key = entry.getKey();
+      
+      if (wanted != null && !wanted.contains(new ColumnFQ(key))) {
+        continue;
+      }
+      
+      Text row = key.getRow();
+      
+      SortedMap<ColumnFQ,Value> colVals = tabletEntries.get(row);
+      if (colVals == null) {
+        colVals = new TreeMap<ColumnFQ,Value>();
+        tabletEntries.put(row, colVals);
+      }
+      
+      colVals.put(new ColumnFQ(key), entry.getValue());
+    }
+    
+    return tabletEntries;
+  }
+  
+  /**
+   * Fills {@code tkv} with the metadata entries found from the metadata row derived from the previous end row of {@code ke} (or from an empty row when there
+   * is none) through the metadata row of {@code ke} itself.
+   * <p>
+   * {@code tkv} is cleared before scanning, and cleared again if the scan does not complete, so it never holds a partial result.
+   * 
+   * @param columns
+   *          columns to fetch, or null to leave the scanner's column selection untouched
+   */
+  public static void getTabletAndPrevTabletKeyValues(Instance instance, SortedMap<Key,Value> tkv, KeyExtent ke, List<ColumnFQ> columns, AuthInfo credentials) {
+    Text endRow = ke.getMetadataEntry();
+    Text lowerBound = (ke.getPrevEndRow() == null) ? new Text() : ke.getPrevEndRow();
+    Text startRow = new Text(KeyExtent.getMetadataEntry(ke.getTableId(), lowerBound));
+    
+    Scanner scanner = new ScannerImpl(instance, credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
+    
+    if (columns != null) {
+      for (ColumnFQ column : columns) {
+        column.fetch(scanner);
+      }
+    }
+    
+    Key rangeStart = new Key(startRow);
+    Key rangeEnd = new Key(endRow).followingKey(PartialKey.ROW);
+    scanner.setRange(new Range(rangeStart, true, rangeEnd, false));
+    
+    tkv.clear();
+    boolean completed = false;
+    try {
+      for (Entry<Key,Value> entry : scanner) {
+        tkv.put(entry.getKey(), entry.getValue());
+      }
+      completed = true;
+    } finally {
+      // do not leave a partial result behind when the scan fails part way through
+      if (!completed) {
+        tkv.clear();
+      }
+    }
+  }
+  
+  /**
+   * Scans the metadata table for the given table, adding every tablet extent found to {@code tablets} and, for tablets that also have a current location
+   * entry, the location to {@code locations}. The collected tablets are then checked with {@link #validateEntries}.
+   * 
+   * @param table
+   *          table id when {@code isTid} is true, otherwise a table name that is looked up in the name-to-id map
+   * @param isTid
+   *          whether {@code table} is already a table id
+   * @param locations
+   *          output: extent to location string
+   * @param tablets
+   *          output: all extents found
+   * @throws AccumuloException
+   *           if the collected tablets fail validation
+   */
+  public static void getEntries(Instance instance, AuthInfo credentials, String table, boolean isTid, Map<KeyExtent,String> locations,
+      SortedSet<KeyExtent> tablets) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    // NOTE(review): the map lookup presumably yields null for an unknown table name, which is not checked here - confirm
+    String tableId = isTid ? table : Tables.getNameToIdMap(instance).get(table);
+    
+    Scanner scanner = instance.getConnector(credentials.user, credentials.password).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    
+    Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
+    scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+    
+    // position at first entry in metadata table for given table
+    KeyExtent ke = new KeyExtent(new Text(tableId), new Text(), null);
+    Key startKey = new Key(ke.getMetadataEntry());
+    // the extent with null end row supplies the upper bound of the scan
+    ke = new KeyExtent(new Text(tableId), null, null);
+    Key endKey = new Key(ke.getMetadataEntry()).followingKey(PartialKey.ROW);
+    scanner.setRange(new Range(startKey, endKey));
+    
+    // reused for every entry
+    Text colf = new Text();
+    Text colq = new Text();
+    
+    KeyExtent currentKeyExtent = null;
+    String location = null;
+    Text row = null;
+    // acquire this tables METADATA table entries
+    boolean haveExtent = false;
+    boolean haveLocation = false;
+    for (Entry<Key,Value> entry : scanner) {
+      // forget the per-row state whenever the scan moves on to a new row
+      if (row != null) {
+        if (!row.equals(entry.getKey().getRow())) {
+          currentKeyExtent = null;
+          haveExtent = false;
+          haveLocation = false;
+          row = entry.getKey().getRow();
+        }
+      } else
+        row = entry.getKey().getRow();
+      
+      colf = entry.getKey().getColumnFamily(colf);
+      colq = entry.getKey().getColumnQualifier(colq);
+      
+      // stop scanning metadata table when another table is reached
+      if (!(new KeyExtent(entry.getKey().getRow(), (Text) null)).getTableId().toString().equals(tableId))
+        break;
+      
+      if (Constants.METADATA_PREV_ROW_COLUMN.equals(colf, colq)) {
+        currentKeyExtent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
+        tablets.add(currentKeyExtent);
+        haveExtent = true;
+      } else if (colf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
+        location = entry.getValue().toString();
+        haveLocation = true;
+      }
+      
+      // a location is recorded only once both the extent and the location of the same row are known
+      if (haveExtent && haveLocation) {
+        locations.put(currentKeyExtent, location);
+        haveExtent = false;
+        haveLocation = false;
+        currentKeyExtent = null;
+      }
+    }
+    
+    validateEntries(tableId, tablets);
+  }
+  
+  /**
+   * Sanity check of the metadata entries collected for one table: the set must not be empty, the first tablet must have a null previous end row, the last
+   * tablet must have a null end row, and every other tablet's previous end row must equal the end row of the tablet before it (no holes).
+   * 
+   * @param tableId
+   *          id of the table, used in error messages
+   * @param tablets
+   *          the table's tablets in sorted order
+   * @throws AccumuloException
+   *           describing the first problem found
+   */
+  public static void validateEntries(String tableId, SortedSet<KeyExtent> tablets) throws AccumuloException {
+    if (tablets.size() == 0)
+      throw new AccumuloException("No entries found in metadata table for table " + tableId);
+    
+    if (tablets.first().getPrevEndRow() != null)
+      throw new AccumuloException("Problem with metadata table, first entry for table " + tableId + "- " + tablets.first() + " - has non null prev end row");
+    
+    // report the offending last tablet, not the first one
+    if (tablets.last().getEndRow() != null)
+      throw new AccumuloException("Problem with metadata table, last entry for table " + tableId + "- " + tablets.last() + " - has non null end row");
+    
+    // walk consecutive pairs looking for gaps
+    Iterator<KeyExtent> tabIter = tablets.iterator();
+    Text lastEndRow = tabIter.next().getEndRow();
+    while (tabIter.hasNext()) {
+      KeyExtent tabke = tabIter.next();
+      
+      if (tabke.getPrevEndRow() == null)
+        throw new AccumuloException("Problem with metadata table, it has null prev end row in middle of table " + tabke);
+      
+      if (!tabke.getPrevEndRow().equals(lastEndRow))
+        throw new AccumuloException("Problem with metadata table, it has a hole " + tabke.getPrevEndRow() + " != " + lastEndRow);
+      
+      lastEndRow = tabke.getEndRow();
+    }
+  }
+  
+  /**
+   * Tells whether {@code children} exactly tile the range of {@code ke}: the first child starts where {@code ke} starts, the last child ends where {@code ke}
+   * ends, and each child begins at the end row of the one before it. An empty set is never contiguous; a single child must equal {@code ke}.
+   */
+  public static boolean isContiguousRange(KeyExtent ke, SortedSet<KeyExtent> children) {
+    if (children.size() == 0) {
+      return false;
+    }
+    
+    if (children.size() == 1) {
+      return children.first().equals(ke);
+    }
+    
+    // the outer boundaries of the children must coincide with those of ke
+    Text firstPrevEnd = children.first().getPrevEndRow();
+    Text lastEnd = children.last().getEndRow();
+    if (!sameRow(firstPrevEnd, ke.getPrevEndRow()) || !sameRow(lastEnd, ke.getEndRow())) {
+      return false;
+    }
+    
+    Iterator<KeyExtent> iter = children.iterator();
+    Text previousEnd = iter.next().getEndRow();
+    
+    while (iter.hasNext()) {
+      KeyExtent child = iter.next();
+      Text childPrevEnd = child.getPrevEndRow();
+      
+      // a boundary in the middle must not be null, and it must line up with the previous child
+      if (childPrevEnd == null || previousEnd == null || childPrevEnd.compareTo(previousEnd) != 0) {
+        return false;
+      }
+      
+      previousEnd = child.getEndRow();
+    }
+    
+    return true;
+  }
+  
+  // Null-tolerant row comparison: true when both are the same reference (including both null) or both are non-null and compare equal.
+  private static boolean sameRow(Text childRow, Text parentRow) {
+    return childRow == parentRow || (childRow != null && parentRow != null && parentRow.compareTo(childRow) == 0);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
new file mode 100644
index 0000000..7c67a1a
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.cloudtrace.instrument.TraceRunnable;
+import org.apache.log4j.Logger;
+
+public class NamingThreadFactory implements ThreadFactory {
+  private static final Logger log = Logger.getLogger(NamingThreadFactory.class);
+  
+  private AtomicInteger threadNum = new AtomicInteger(1);
+  private String name;
+  
+  public NamingThreadFactory(String name) {
+    this.name = name;
+  }
+  
+  public Thread newThread(Runnable r) {
+    return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement());
+  }
+  
+}


Mime
View raw message