Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D2D0918A7A for ; Thu, 3 Mar 2016 21:59:26 +0000 (UTC) Received: (qmail 98716 invoked by uid 500); 3 Mar 2016 21:59:26 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 98657 invoked by uid 500); 3 Mar 2016 21:59:26 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 98540 invoked by uid 99); 3 Mar 2016 21:59:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Mar 2016 21:59:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DCE9E78F6; Thu, 3 Mar 2016 21:59:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Thu, 03 Mar 2016 21:59:32 -0000 Message-Id: <3fdc04eb46f74697a3408a392cda2955@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java new file mode 100644 index 0000000..bbddadb --- /dev/null +++ 
b/1.5/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.trace; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.cloudtrace.thrift.RemoteSpan; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.format.DefaultFormatter; +import org.apache.accumulo.core.util.format.Formatter; +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.transport.TMemoryInputTransport; + + +/** + * A formatter than can be used in the shell to display trace information. + * + */ +public class TraceFormatter implements Formatter { + public static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss.SSS"; + // ugh... 
SimpleDataFormat is not thread safe + private static final ThreadLocal formatter = new ThreadLocal() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat(DATE_FORMAT); + } + }; + + public static String formatDate(final Date date) { + return formatter.get().format(date); + } + + private final static Text SPAN_CF = new Text("span"); + + private Iterator> scanner; + private boolean printTimeStamps; + + public static RemoteSpan getRemoteSpan(Entry entry) { + TMemoryInputTransport transport = new TMemoryInputTransport(entry.getValue().get()); + TCompactProtocol protocol = new TCompactProtocol(transport); + RemoteSpan span = new RemoteSpan(); + try { + span.read(protocol); + } catch (TException ex) { + throw new RuntimeException(ex); + } + return span; + } + + @Override + public boolean hasNext() { + return scanner.hasNext(); + } + + @Override + public String next() { + Entry next = scanner.next(); + if (next.getKey().getColumnFamily().equals(SPAN_CF)) { + StringBuilder result = new StringBuilder(); + SimpleDateFormat dateFormatter = new SimpleDateFormat(DATE_FORMAT); + RemoteSpan span = getRemoteSpan(next); + result.append("----------------------\n"); + result.append(String.format(" %12s:%s\n", "name", span.description)); + result.append(String.format(" %12s:%s\n", "trace", Long.toHexString(span.traceId))); + result.append(String.format(" %12s:%s\n", "loc", span.svc + "@" + span.sender)); + result.append(String.format(" %12s:%s\n", "span", Long.toHexString(span.spanId))); + result.append(String.format(" %12s:%s\n", "parent", Long.toHexString(span.parentId))); + result.append(String.format(" %12s:%s\n", "start", dateFormatter.format(span.start))); + result.append(String.format(" %12s:%s\n", "ms", span.stop - span.start)); + for (Entry entry : span.data.entrySet()) { + result.append(String.format(" %12s:%s\n", entry.getKey(), entry.getValue())); + } + + if (printTimeStamps) { + result.append(String.format(" %-12s:%d\n", "timestamp", 
next.getKey().getTimestamp())); + } + return result.toString(); + } + return DefaultFormatter.formatEntry(next, printTimeStamps); + } + + @Override + public void remove() { + throw new NotImplementedException(); + } + + @Override + public void initialize(Iterable> scanner, boolean printTimestamps) { + this.scanner = scanner.iterator(); + this.printTimeStamps = printTimestamps; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java b/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java new file mode 100644 index 0000000..4b60b97 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.accumulo.cloudtrace.instrument.receivers.SendSpansViaThrift; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + + +/** + * Find a Span collector via zookeeper and push spans there via Thrift RPC + * + */ +public class ZooTraceClient extends SendSpansViaThrift implements Watcher { + + private static final Logger log = Logger.getLogger(ZooTraceClient.class); + + final ZooReader zoo; + final String path; + final Random random = new Random(); + final List hosts = new ArrayList(); + + public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { + super(host, service, millis); + this.path = path; + this.zoo = zoo; + updateHosts(path, zoo.getChildren(path, this)); + } + + @Override + synchronized protected String getSpanKey(Map data) { + if (hosts.size() > 0) { + return hosts.get(random.nextInt(hosts.size())); + } + return null; + } + + @Override + public void process(WatchedEvent event) { + try { + updateHosts(path, zoo.getChildren(path, null)); + } catch (Exception ex) { + log.error("unable to get destination hosts in zookeeper", ex); + } + } + + synchronized private void updateHosts(String path, List children) { + log.debug("Scanning trace hosts in zookeeper: " + path); + try { + List hosts = new ArrayList(); + for (String child : children) { + byte[] data = zoo.getData(path + "/" + child, null); + hosts.add(new String(data)); + } + this.hosts.clear(); + this.hosts.addAll(hosts); + log.debug("Trace hosts: " + this.hosts); + } catch (Exception ex) { + log.error("unable to get destination hosts in zookeeper", ex); + } + } 
+} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java new file mode 100644 index 0000000..08ae106 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.util; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.io.Text; +import org.apache.thrift.transport.TSocket; + +public class AddressUtil { + static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException { + String[] parts = address.split(":", 2); + if (address.contains("+")) + parts = address.split("\\+", 2); + if (parts.length == 2) { + if (parts[1].isEmpty()) + return new InetSocketAddress(parts[0], defaultPort); + return new InetSocketAddress(parts[0], Integer.parseInt(parts[1])); + } + return new InetSocketAddress(address, defaultPort); + } + + static public InetSocketAddress parseAddress(Text address, int defaultPort) { + return parseAddress(address.toString(), defaultPort); + } + + static public TSocket createTSocket(String address, int defaultPort) { + InetSocketAddress addr = parseAddress(address, defaultPort); + return new TSocket(addr.getHostName(), addr.getPort()); + } + + static public String toString(InetSocketAddress addr) { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java new file mode 100644 index 0000000..0c8ba07 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ArgumentChecker.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +/** + * This class provides methods to check arguments of a variable number for null values, or anything else that might be required on a routine basis. These + * methods should be used for early failures as close to the end user as possible, so things do not fail later on the server side, when they are harder to + * debug. + * + * Methods are created for a specific number of arguments, due to the poor performance of array allocation for varargs methods. + */ +public class ArgumentChecker { + private static final String NULL_ARG_MSG = "argument was null"; + + public static final void notNull(final Object arg1) { + if (arg1 == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null)); + } + + public static final void notNull(final Object arg1, final Object arg2) { + if (arg1 == null || arg2 == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null)); + } + + public static final void notNull(final Object arg1, final Object arg2, final Object arg3) { + if (arg1 == null || arg2 == null || arg3 == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null) + " arg3? 
" + (arg3 == null)); + } + + public static final void notNull(final Object arg1, final Object arg2, final Object arg3, final Object arg4) { + if (arg1 == null || arg2 == null || arg3 == null || arg4 == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":Is null- arg1? " + (arg1 == null) + " arg2? " + (arg2 == null) + " arg3? " + (arg3 == null) + + " arg4? " + (arg4 == null)); + } + + public static final void notNull(final Object[] args) { + if (args == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":arg array is null"); + + for (int i = 0; i < args.length; i++) + if (args[i] == null) + throw new IllegalArgumentException(NULL_ARG_MSG + ":arg" + i + " is null"); + } + + public static final void strictlyPositive(final int i) { + if (i <= 0) + throw new IllegalArgumentException("integer should be > 0, was " + i); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java new file mode 100644 index 0000000..c2f79ba --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/BadArgumentException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.regex.PatternSyntaxException; + +public final class BadArgumentException extends PatternSyntaxException { + private static final long serialVersionUID = 1L; + + public BadArgumentException(String desc, String badarg, int index) { + super(desc, badarg, index); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java new file mode 100644 index 0000000..0dbab6d --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/BinaryTree.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +/** + * A class for storing data as a binary tree. This class does not implement Collection as several methods such as add are not appropriate. + * + * A tree is represented as a node with a parent, a left child, and a right child. If the parent is null, this node represents the root of the tree. A node has + * contents of type T. + * + */ + +public class BinaryTree { + private BinaryTree parent; + private BinaryTree left; + private BinaryTree right; + + T contents; + + public BinaryTree getLeft() { + return left; + } + + public void setLeft(BinaryTree left) { + left.setParent(this); + this.left = left; + } + + public BinaryTree getParent() { + return parent; + } + + public void setParent(BinaryTree parent) { + this.parent = parent; + } + + public BinaryTree getRight() { + return right; + } + + public void setRight(BinaryTree right) { + right.setParent(this); + this.right = right; + } + + public T getContents() { + return contents; + } + + public void setContents(T contents) { + this.contents = contents; + } + + public boolean isEmpty() { + if (parent == null && left == null && right == null && contents == null) + return true; + return false; + } + + @Override + public String toString() { + String out = "["; + if (left != null) + out += left.toString(); + out += contents; + if (right != null) + out += right.toString(); + out += "]"; + return out; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java new file mode 100644 index 0000000..97459ff --- /dev/null +++ 
b/1.5/core/src/main/java/org/apache/accumulo/core/util/BulkImportHelper.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +public class BulkImportHelper { + public static class AssignmentStats { + public AssignmentStats() {} + + public String toString() { + return "Bulk Import no longer provides statistics"; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java new file mode 100644 index 0000000..f64b36c --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayBackedCharSequence.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.accumulo.core.data.ByteSequence; + +public class ByteArrayBackedCharSequence implements CharSequence { + + private byte[] data; + private int offset; + private int len; + + public ByteArrayBackedCharSequence(byte[] data, int offset, int len) { + this.data = data; + this.offset = offset; + this.len = len; + } + + public ByteArrayBackedCharSequence(byte[] data) { + this(data, 0, data.length); + } + + public ByteArrayBackedCharSequence() { + this(null, 0, 0); + } + + public void set(byte[] data, int offset, int len) { + this.data = data; + this.offset = offset; + this.len = len; + } + + @Override + public char charAt(int index) { + return (char) (0xff & data[offset + index]); + } + + @Override + public int length() { + return len; + } + + @Override + public CharSequence subSequence(int start, int end) { + return new ByteArrayBackedCharSequence(data, offset + start, end - start); + } + + public String toString() { + return new String(data, offset, len); + } + + public void set(ByteSequence bs) { + set(bs.getBackingArray(), bs.offset(), bs.length()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java ---------------------------------------------------------------------- diff --git 
a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java new file mode 100644 index 0000000..97b17ee --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArrayComparator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * + */ +package org.apache.accumulo.core.util; + +import java.io.Serializable; +import java.util.Comparator; + +public class ByteArrayComparator implements Comparator, Serializable { + private static final long serialVersionUID = 1L; + + @Override + public int compare(byte[] o1, byte[] o2) { + + int minLen = Math.min(o1.length, o2.length); + + for (int i = 0; i < minLen; i++) { + int a = (o1[i] & 0xff); + int b = (o2[i] & 0xff); + + if (a != b) { + return a - b; + } + } + + return o1.length - o2.length; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java new file mode 100644 index 0000000..68f0ae5 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteArraySet.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.util; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.TreeSet; + +public class ByteArraySet extends TreeSet { + + private static final long serialVersionUID = 1L; + + public ByteArraySet() { + super(new ByteArrayComparator()); + } + + public ByteArraySet(Collection c) { + this(); + addAll(c); + } + + public static ByteArraySet fromStrings(Collection c) { + List lst = new ArrayList(); + for (String s : c) + lst.add(s.getBytes()); + return new ByteArraySet(lst); + } + + public static ByteArraySet fromStrings(String... c) { + return ByteArraySet.fromStrings(Arrays.asList(c)); + } + + public List toList() { + return new ArrayList(this); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java new file mode 100644 index 0000000..6fe4423 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.io.Text; + +public class ByteBufferUtil { + public static byte[] toBytes(ByteBuffer buffer) { + if (buffer == null) + return null; + return Arrays.copyOfRange(buffer.array(), buffer.position(), buffer.remaining()); + } + + public static List toByteBuffers(Collection bytesList) { + if (bytesList == null) + return null; + ArrayList result = new ArrayList(); + for (byte[] bytes : bytesList) { + result.add(ByteBuffer.wrap(bytes)); + } + return result; + } + + public static List toBytesList(Collection bytesList) { + if (bytesList == null) + return null; + ArrayList result = new ArrayList(); + for (ByteBuffer bytes : bytesList) { + result.add(toBytes(bytes)); + } + return result; + } + + public static Text toText(ByteBuffer bytes) { + if (bytes == null) + return null; + Text result = new Text(); + result.set(bytes.array(), bytes.position(), bytes.remaining()); + return result; + } + + public static String toString(ByteBuffer bytes) { + return new String(bytes.array(), bytes.position(), bytes.remaining()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java new file mode 100644 index 0000000..daf165c --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/CachedConfiguration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.hadoop.conf.Configuration; + +public class CachedConfiguration { + private static Configuration configuration = null; + + public synchronized static Configuration getInstance() { + if (configuration == null) + setInstance(new Configuration()); + return configuration; + } + + public synchronized static Configuration setInstance(Configuration update) { + Configuration result = configuration; + configuration = update; + return result; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java new file mode 100644 index 0000000..2bafcb1 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ColumnFQ.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.accumulo.core.util; + +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +public class ColumnFQ implements Comparable { + private Text colf; + private Text colq; + + public ColumnFQ(Text colf, Text colq) { + if (colf == null || colq == null) { + throw new IllegalArgumentException(); + } + + this.colf = colf; + this.colq = colq; + } + + public ColumnFQ(Key k) { + this(k.getColumnFamily(), k.getColumnQualifier()); + } + + public ColumnFQ(ColumnUpdate cu) { + this(new Text(cu.getColumnFamily()), new Text(cu.getColumnQualifier())); + } + + public Text getColumnQualifier() { + return colq; + } + + public Text getColumnFamily() { + return colf; + } + + public Column toColumn() { + return new Column(TextUtil.getBytes(colf), TextUtil.getBytes(colq), null); + } + + public void fetch(ScannerBase sb) { + sb.fetchColumn(colf, colq); + } + + public void put(Mutation m, Value v) { + m.put(colf, colq, v); + } + + public void putDelete(Mutation m) { + m.putDelete(colf, colq); + } + + /** + * @deprecated since 1.5, use {@link #fetch(ScannerBase)} instead + */ + public static void fetch(ScannerBase 
sb, ColumnFQ cfq) { + sb.fetchColumn(cfq.colf, cfq.colq); + } + + /** + * @deprecated since 1.5, use {@link #put(Mutation, Value)} instead + */ + public static void put(Mutation m, ColumnFQ cfq, Value v) { + m.put(cfq.colf, cfq.colq, v); + } + + /** + * @deprecated since 1.5, use {@link #putDelete(Mutation)} instead + */ + public static void putDelete(Mutation m, ColumnFQ cfq) { + m.putDelete(cfq.colf, cfq.colq); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ColumnFQ)) + return false; + if (this == o) + return true; + ColumnFQ ocfq = (ColumnFQ) o; + return ocfq.colf.equals(colf) && ocfq.colq.equals(colq); + } + + @Override + public int hashCode() { + return colf.hashCode() + colq.hashCode(); + } + + public boolean hasColumns(Key key) { + return key.compareColumnFamily(colf) == 0 && key.compareColumnQualifier(colq) == 0; + } + + public boolean equals(Text colf, Text colq) { + return this.colf.equals(colf) && this.colq.equals(colq); + } + + @Override + public int compareTo(ColumnFQ o) { + int cmp = colf.compareTo(o.colf); + + if (cmp == 0) + cmp = colq.compareTo(o.colq); + + return cmp; + } + + @Override + public String toString() { + return colf + ":" + colq; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java new file mode 100644 index 0000000..5a1c2ef --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ContextFactory.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * A factory that lets applications cope with the differences in the MapReduce context object API between hadoop-0.20 and later versions. This code is based
 * on org.apache.hadoop.mapreduce.ContextFactory in hadoop-mapred-0.22.0.
 *
 * The class decides once, in its static initializer, which API flavour is on the classpath and looks up the matching constructors reflectively; the factory
 * methods then only invoke those cached constructors.
 */
public class ContextFactory {

  // Reflectively resolved constructors; which ones are non-null depends on useV21 (see the static initializer).
  private static final Constructor JOB_CONTEXT_CONSTRUCTOR;
  private static final Constructor TASK_CONTEXT_CONSTRUCTOR;
  private static final Constructor TASK_ID_CONSTRUCTOR;
  private static final Constructor MAP_CONSTRUCTOR;
  private static final Constructor MAP_CONTEXT_CONSTRUCTOR;
  private static final Constructor MAP_CONTEXT_IMPL_CONSTRUCTOR;
  // The TaskType class when useV21 is true, otherwise null.
  private static final Class TASK_TYPE_CLASS;
  // True when org.apache.hadoop.mapreduce.task.JobContextImpl could be loaded.
  private static final boolean useV21;

  static {
    // Probe for a class that only exists in the newer API layout.
    boolean v21 = true;
    final String PACKAGE = "org.apache.hadoop.mapreduce";
    try {
      Class.forName(PACKAGE + ".task.JobContextImpl");
    } catch (ClassNotFoundException cnfe) {
      v21 = false;
    }
    useV21 = v21;
    Class jobContextCls;
    Class taskContextCls;
    Class mapCls;
    Class mapContextCls;
    Class innerMapContextCls;
    // Pick the concrete classes to instantiate for the detected flavour.
    try {
      if (v21) {
        jobContextCls = Class.forName(PACKAGE + ".task.JobContextImpl");
        taskContextCls = Class.forName(PACKAGE + ".task.TaskAttemptContextImpl");
        TASK_TYPE_CLASS = Class.forName(PACKAGE + ".TaskType");
        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
        innerMapContextCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper$Context");
      } else {
        jobContextCls = Class.forName(PACKAGE + ".JobContext");
        taskContextCls = Class.forName(PACKAGE + ".TaskAttemptContext");
        TASK_TYPE_CLASS = null;
        mapContextCls = Class.forName(PACKAGE + ".MapContext");
        mapCls = Class.forName(PACKAGE + ".Mapper");
        innerMapContextCls = Class.forName(PACKAGE + ".Mapper$Context");
      }
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Can't find class", e);
    }
    // Resolve the constructors; the TaskAttemptID and map-context signatures differ between the two flavours.
    try {
      JOB_CONTEXT_CONSTRUCTOR = jobContextCls.getConstructor(Configuration.class, JobID.class);
      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
      TASK_CONTEXT_CONSTRUCTOR = taskContextCls.getConstructor(Configuration.class, TaskAttemptID.class);
      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
      if (useV21) {
        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, TASK_TYPE_CLASS, int.class, int.class);
        TASK_ID_CONSTRUCTOR.setAccessible(true);
        MAP_CONSTRUCTOR = mapCls.getConstructor();
        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, MapContext.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR = mapContextCls.getDeclaredConstructor(Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
            OutputCommitter.class, StatusReporter.class, InputSplit.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
      } else {
        TASK_ID_CONSTRUCTOR = TaskAttemptID.class.getConstructor(String.class, int.class, boolean.class, int.class, int.class);
        TASK_ID_CONSTRUCTOR.setAccessible(true);
        MAP_CONSTRUCTOR = null;
        MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, Configuration.class, TaskAttemptID.class, RecordReader.class, RecordWriter.class,
            OutputCommitter.class, StatusReporter.class, InputSplit.class);
        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
      }
      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
    } catch (SecurityException e) {
      throw new IllegalArgumentException("Can't run constructor ", e);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Can't find constructor ", e);
    }
  }

  /**
   * Creates a job context backed by a new default {@link Configuration}.
   */
  public static JobContext createJobContext() {
    return createJobContext(new Configuration());
  }

  /**
   * Creates a job context for the given configuration, using a {@code JobID("local", 0)}.
   *
   * @throws IllegalArgumentException
   *           if the reflective construction fails
   */
  public static JobContext createJobContext(Configuration conf) {
    try {
      return (JobContext) JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, new JobID("local", 0));
    } catch (InstantiationException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (InvocationTargetException e) {
      throw new IllegalArgumentException("Can't create object", e);
    }
  }

  /**
   * Creates a task attempt context from the configuration of the given job context.
   */
  public static TaskAttemptContext createTaskAttemptContext(JobContext job) {
    return createTaskAttemptContext(job.getConfiguration());
  }

  /**
   * Creates a task attempt context for the given configuration with a reflectively built "local" task attempt id.
   *
   * @throws IllegalArgumentException
   *           if the reflective construction fails
   */
  public static TaskAttemptContext createTaskAttemptContext(Configuration conf) {
    try {
      if (useV21)
        // third argument is the first constant of the TaskType enum
        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf,
            TASK_ID_CONSTRUCTOR.newInstance("local", 0, TASK_TYPE_CLASS.getEnumConstants()[0], 0, 0));
      else
        // third argument is the boolean taken by the older TaskAttemptID constructor
        return (TaskAttemptContext) TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, TASK_ID_CONSTRUCTOR.newInstance("local", 0, true, 0, 0));
    } catch (InstantiationException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (InvocationTargetException e) {
      throw new IllegalArgumentException("Can't create object", e);
    }
  }

  /**
   * Creates a mapper context without an output committer or status reporter (both are passed as null).
   */
  public static Mapper.Context createMapContext(Mapper m, TaskAttemptContext tac, RecordReader reader,
      RecordWriter writer, InputSplit split) {
    return createMapContext(m, tac, reader, writer, null, null, split);
  }

  /**
   * Creates a mapper context from the task attempt's configuration and id plus the given reader, writer, committer, reporter and split.
   *
   * When useV21 is true the context is built around a newly constructed WrappedMapper and the argument {@code m} is not used; otherwise {@code m} is passed
   * as the first constructor argument of Mapper.Context.
   *
   * @throws IllegalArgumentException
   *           if the reflective construction fails
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static Mapper.Context createMapContext(Mapper m, TaskAttemptContext tac, RecordReader reader,
      RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
    try {
      if (useV21) {
        Object basis = MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter, split);
        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance((Mapper) MAP_CONSTRUCTOR.newInstance(), basis);
      } else {
        return (Mapper.Context) MAP_CONTEXT_CONSTRUCTOR.newInstance(m, tac.getConfiguration(), tac.getTaskAttemptID(), reader, writer, committer, reporter,
            split);
      }
    } catch (InstantiationException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException("Can't create object", e);
    } catch (InvocationTargetException e) {
      throw new IllegalArgumentException("Can't create object", e);
    }
  }
}
/**
 * A {@link Thread} that is always marked as a daemon thread at construction time.
 *
 * Each constructor mirrors the {@link Thread} constructor with the same parameter list.
 */
public class Daemon extends Thread {

  // Instance initializers run right after the super constructor returns, so every constructor below gets the daemon flag.
  {
    setDaemon(true);
  }

  public Daemon() {
    super();
  }

  public Daemon(Runnable target) {
    super(target);
  }

  public Daemon(String name) {
    super(name);
  }

  public Daemon(ThreadGroup group, Runnable target) {
    super(group, target);
  }

  public Daemon(ThreadGroup group, String name) {
    super(group, name);
  }

  public Daemon(Runnable target, String name) {
    super(target, name);
  }

  public Daemon(ThreadGroup group, Runnable target, String name) {
    super(group, target, name);
  }

  public Daemon(ThreadGroup group, Runnable target, String name, long stackSize) {
    super(group, target, name, stackSize);
  }

}
/**
 * Formats a length of time given in milliseconds as its two largest non-empty units, e.g. {@code 1s 500ms} or {@code 1d 1h}.
 *
 * A year is counted as 365 days.
 */
public class Duration {

  // How many of each unit make up the next larger one: ms per s, s per m, m per h, h per d, d per y.
  private static final long[] UNITS_PER_NEXT = {1000, 60, 60, 24, 365};
  // Suffix of each unit, smallest first; one entry more than UNITS_PER_NEXT because years have no larger unit.
  private static final String[] SUFFIX = {"ms", "s", "m", "h", "d", "y"};

  /**
   * Formats with the default separator and the default text for zero.
   */
  public static String format(long time) {
    return format(time, " ");
  }

  /**
   * Formats with the given separator and the default text for zero.
   */
  public static String format(long time, String space) {
    return format(time, space, "—");
  }

  /**
   * @param time
   *          the duration in milliseconds
   * @param space
   *          the text placed between the two units; it becomes part of the format string
   * @param zero
   *          the text returned when {@code time} is exactly 0
   * @return the formatted duration
   */
  public static String format(long time, String space, String zero) {
    if (time == 0)
      return zero;

    long rest = time;
    long smaller = 0;
    for (int unit = 0; unit < UNITS_PER_NEXT.length; unit++) {
      final long amount = rest % UNITS_PER_NEXT[unit];
      rest /= UNITS_PER_NEXT[unit];
      if (rest == 0) {
        // nothing left for a larger unit: this unit leads, the previous one (if any) follows
        if (unit == 0)
          return String.format("%dms", amount);
        return String.format("%d" + SUFFIX[unit] + space + "%d" + SUFFIX[unit - 1], amount, smaller);
      }
      smaller = amount;
    }
    // whatever remains after dividing out the days is the number of years
    return String.format("%dy" + space + "%dd", rest, smaller);
  }

}
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; + +public class Encoding { + + public static String encodeAsBase64FileName(Text data) { + String encodedRow = new String(Base64.encodeBase64(TextUtil.getBytes(data))); + encodedRow = encodedRow.replace('/', '_').replace('+', '-'); + + int index = encodedRow.length() - 1; + while (index >= 0 && encodedRow.charAt(index) == '=') + index--; + + encodedRow = encodedRow.substring(0, index + 1); + return encodedRow; + } + + public static byte[] decodeBase64FileName(String node) { + while (node.length() % 4 != 0) + node += "="; + + node = node.replace('_', '/').replace('-', '+'); + + return Base64.decodeBase64(node.getBytes()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java new file mode 100644 index 0000000..b9959f8 --- /dev/null +++ 
b/1.5/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.accumulo.core.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.hadoop.io.Text; + +public class LocalityGroupUtil { + + // private static final Logger log = Logger.getLogger(ColumnFamilySet.class); + + public static Set EMPTY_CF_SET = Collections.emptySet(); + + public static Set families(Collection columns) { + Set result = new HashSet(columns.size()); + for (Column col : columns) { + result.add(new ArrayByteSequence(col.getColumnFamily())); + } + return result; + } + + @SuppressWarnings("serial") + static public class LocalityGroupConfigurationError extends 
AccumuloException { + LocalityGroupConfigurationError(String why) { + super(why); + } + } + + public static Map> getLocalityGroups(AccumuloConfiguration acuconf) throws LocalityGroupConfigurationError { + Map> result = new HashMap>(); + String[] groups = acuconf.get(Property.TABLE_LOCALITY_GROUPS).split(","); + for (String group : groups) { + if (group.length() > 0) + result.put(group, new HashSet()); + } + HashSet all = new HashSet(); + for (Entry entry : acuconf) { + String property = entry.getKey(); + String value = entry.getValue(); + String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey(); + if (property.startsWith(prefix)) { + // this property configures a locality group, find out which one: + String group = property.substring(prefix.length()); + String[] parts = group.split("\\."); + group = parts[0]; + if (result.containsKey(group)) { + if (parts.length == 1) { + Set colFamsSet = decodeColumnFamilies(value); + if (!Collections.disjoint(all, colFamsSet)) { + colFamsSet.retainAll(all); + throw new LocalityGroupConfigurationError("Column families " + colFamsSet + " in group " + group + " is already used by another locality group"); + } + + all.addAll(colFamsSet); + result.put(group, colFamsSet); + } + } + } + } + // result.put("", all); + return result; + } + + public static Set decodeColumnFamilies(String colFams) throws LocalityGroupConfigurationError { + HashSet colFamsSet = new HashSet(); + + for (String family : colFams.split(",")) { + ByteSequence cfbs = decodeColumnFamily(family); + colFamsSet.add(cfbs); + } + + return colFamsSet; + } + + public static ByteSequence decodeColumnFamily(String colFam) throws LocalityGroupConfigurationError { + byte output[] = new byte[colFam.length()]; + int pos = 0; + + for (int i = 0; i < colFam.length(); i++) { + char c = colFam.charAt(i); + + if (c == '\\') { + // next char must be 'x' or '\' + i++; + + if (i >= colFam.length()) { + throw new LocalityGroupConfigurationError("Expected 'x' or '\' after '\' in " + 
colFam); + } + + char nc = colFam.charAt(i); + + switch (nc) { + case '\\': + output[pos++] = '\\'; + break; + case 'x': + // next two chars must be [0-9][0-9] + i++; + output[pos++] = (byte) (0xff & Integer.parseInt(colFam.substring(i, i + 2), 16)); + i++; + break; + default: + throw new LocalityGroupConfigurationError("Expected 'x' or '\' after '\' in " + colFam); + } + } else { + output[pos++] = (byte) (0xff & c); + } + + } + + return new ArrayByteSequence(output, 0, pos); + + } + + public static String encodeColumnFamilies(Set colFams) { + HashSet ecfs = new HashSet(); + + StringBuilder sb = new StringBuilder(); + + for (Text text : colFams) { + String ecf = encodeColumnFamily(sb, text.getBytes(), 0, text.getLength()); + ecfs.add(ecf); + } + + return StringUtil.join(ecfs, ","); + } + + public static String encodeColumnFamily(ByteSequence bs) { + return encodeColumnFamily(new StringBuilder(), bs.getBackingArray(), bs.offset(), bs.length()); + } + + private static String encodeColumnFamily(StringBuilder sb, byte[] ba, int offset, int len) { + sb.setLength(0); + + for (int i = 0; i < len; i++) { + int c = 0xff & ba[i]; + if (c == '\\') + sb.append("\\\\"); + else if (c >= 32 && c <= 126 && c != ',') + sb.append((char) c); + else + sb.append("\\x").append(String.format("%02X", c)); + } + + String ecf = sb.toString(); + return ecf; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java new file mode 100644 index 0000000..87fa319 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/LoggingRunnable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.Date; + +import org.apache.log4j.Logger; + +public class LoggingRunnable implements Runnable { + private Runnable runnable; + private Logger log; + + public LoggingRunnable(Logger log, Runnable r) { + this.runnable = r; + this.log = log; + } + + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + try { + log.error("Thread \"" + Thread.currentThread().getName() + "\" died " + t.getMessage(), t); + } catch (Throwable t2) { + // maybe the logging system is screwed up OR there is a bug in the exception, like t.getMessage() throws a NPE + System.err.println("ERROR " + new Date() + " Failed to log message about thread death " + t2.getMessage()); + t2.printStackTrace(); + + // try to print original exception + System.err.println("ERROR " + new Date() + " Exception that failed to log : " + t.getMessage()); + t.printStackTrace(); + } + } + } + + public static void main(String[] args) { + Runnable r = new Runnable() { + @Override + public void run() { + int x[] = new int[0]; + + x[0]++; + } + }; + + LoggingRunnable lr = new LoggingRunnable(null, r); + lr.run(); + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java 
---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java new file mode 100644 index 0000000..33c25eb --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Merge.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class Merge { + + public static class MergeException extends Exception { + private static final long serialVersionUID = 1L; + + MergeException(Exception ex) { + super(ex); + } + }; + + private static final Logger log = Logger.getLogger(Merge.class); + + protected void message(String format, Object... 
args) { + log.info(String.format(format, args)); + } + + public void start(String[] args) throws MergeException, ParseException { + String keepers = "localhost"; + String instance = "instance"; + String table = null; + long goalSize = -1; + String user = "root"; + byte[] password = "secret".getBytes(); + boolean force = false; + Text begin = null; + Text end = null; + + Options options = new Options(); + options.addOption("k", "keepers", true, "ZooKeeper list"); + options.addOption("i", "instance", true, "instance name"); + options.addOption("t", "table", true, "table to merge"); + options.addOption("s", "size", true, "merge goal size"); + options.addOption("u", "user", true, "user"); + options.addOption("p", "password", true, "password"); + options.addOption("f", "force", false, "merge small tablets even if merging them to larger tablets might cause a split"); + options.addOption("b", "begin", true, "start tablet"); + options.addOption("e", "end", true, "end tablet"); + CommandLine commandLine = new BasicParser().parse(options, args); + if (commandLine.hasOption("k")) { + keepers = commandLine.getOptionValue("k"); + } + if (commandLine.hasOption("i")) { + instance = commandLine.getOptionValue("i"); + } + if (commandLine.hasOption("t")) { + table = commandLine.getOptionValue("t"); + } + if (commandLine.hasOption("s")) { + goalSize = AccumuloConfiguration.getMemoryInBytes(commandLine.getOptionValue("s")); + } + if (commandLine.hasOption("u")) { + table = commandLine.getOptionValue("u"); + } + if (commandLine.hasOption("p")) { + password = commandLine.getOptionValue("p").getBytes(); + } + if (commandLine.hasOption("f")) { + force = true; + } + if (commandLine.hasOption("b")) { + begin = new Text(commandLine.getOptionValue("b")); + } + if (commandLine.hasOption("e")) { + end = new Text(commandLine.getOptionValue("e")); + } + if (table == null) { + System.err.println("Specify the table to merge"); + return; + } + Instance zki = new ZooKeeperInstance(instance, keepers); 
+ try { + Connector conn = zki.getConnector(user, password); + + if (!conn.tableOperations().exists(table)) { + System.err.println("table " + table + " does not exist"); + return; + } + if (goalSize < 1) { + AccumuloConfiguration tableConfig = new ConfigurationCopy(conn.tableOperations().getProperties(table)); + goalSize = tableConfig.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD); + } + + message("Merging tablets in table %s to %d bytes", table, goalSize); + mergomatic(conn, table, begin, end, goalSize, force); + } catch (Exception ex) { + throw new MergeException(ex); + } + } + + public static void main(String[] args) throws MergeException, ParseException { + Merge merge = new Merge(); + merge.start(args); + } + + public static class Size { + public Size(KeyExtent extent, long size) { + this.extent = extent; + this.size = size; + } + + KeyExtent extent; + long size; + } + + public void mergomatic(Connector conn, String table, Text start, Text end, long goalSize, boolean force) throws MergeException { + try { + if (table.equals(Constants.METADATA_TABLE_NAME)) { + throw new IllegalArgumentException("cannot merge tablets on the metadata table"); + } + List sizes = new ArrayList(); + long totalSize = 0; + // Merge any until you get larger than the goal size, and then merge one less tablet + Iterator sizeIterator = getSizeIterator(conn, table, start, end); + while (sizeIterator.hasNext()) { + Size next = sizeIterator.next(); + totalSize += next.size; + sizes.add(next); + if (totalSize > goalSize) { + totalSize = mergeMany(conn, table, sizes, goalSize, force, false); + } + } + if (sizes.size() > 1) + mergeMany(conn, table, sizes, goalSize, force, true); + } catch (Exception ex) { + throw new MergeException(ex); + } + } + + protected long mergeMany(Connector conn, String table, List sizes, long goalSize, boolean force, boolean last) throws MergeException { + // skip the big tablets, which will be the typical case + while (!sizes.isEmpty()) { + if (sizes.get(0).size < 
goalSize) + break; + sizes.remove(0); + } + if (sizes.isEmpty()) { + return 0; + } + + // collect any small ones + long mergeSize = 0; + int numToMerge = 0; + for (int i = 0; i < sizes.size(); i++) { + if (mergeSize + sizes.get(i).size > goalSize) { + numToMerge = i; + break; + } + mergeSize += sizes.get(i).size; + } + + if (numToMerge > 1) { + mergeSome(conn, table, sizes, numToMerge); + } else { + if (numToMerge == 1 && sizes.size() > 1) { + // here we have the case of a merge candidate that is surrounded by candidates that would split + if (force) { + mergeSome(conn, table, sizes, 2); + } else { + sizes.remove(0); + } + } + } + if (numToMerge == 0 && sizes.size() > 1 && last) { + // That's the last tablet, and we have a bunch to merge + mergeSome(conn, table, sizes, sizes.size()); + } + long result = 0; + for (Size s : sizes) { + result += s.size; + } + return result; + } + + protected void mergeSome(Connector conn, String table, List sizes, int numToMerge) throws MergeException { + merge(conn, table, sizes, numToMerge); + for (int i = 0; i < numToMerge; i++) { + sizes.remove(0); + } + } + + protected void merge(Connector conn, String table, List sizes, int numToMerge) throws MergeException { + try { + Text start = sizes.get(0).extent.getPrevEndRow(); + Text end = sizes.get(numToMerge - 1).extent.getEndRow(); + message("Merging %d tablets from (%s to %s]", numToMerge, start == null ? "-inf" : start, end == null ? "+inf" : end); + conn.tableOperations().merge(table, start, end); + } catch (Exception ex) { + throw new MergeException(ex); + } + } + + protected Iterator getSizeIterator(Connector conn, String tablename, Text start, Text end) throws MergeException { + // open up the !METADATA table, walk through the tablets. 
+ String tableId; + Scanner scanner; + try { + tableId = Tables.getTableId(conn.getInstance(), tablename); + scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + } catch (Exception e) { + throw new MergeException(e); + } + scanner.setRange(new KeyExtent(new Text(tableId), end, start).toMetadataRange()); + scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); + Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); + final Iterator> iterator = scanner.iterator(); + + Iterator result = new Iterator() { + Size next = fetch(); + + @Override + public boolean hasNext() { + return next != null; + } + + private Size fetch() { + long tabletSize = 0; + while (iterator.hasNext()) { + Entry entry = iterator.next(); + Key key = entry.getKey(); + if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { + String[] sizeEntries = new String(entry.getValue().get()).split(","); + if (sizeEntries.length == 2) { + tabletSize += Long.parseLong(sizeEntries[0]); + } + } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) { + KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue()); + return new Size(extent, tabletSize); + } + } + return null; + } + + @Override + public Size next() { + Size result = next; + next = fetch(); + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + return result; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java new file mode 100644 index 0000000..e440ee3 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/MetadataTable.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.thrift.AuthInfo; +import org.apache.hadoop.io.Text; + +public class MetadataTable { + public static class DataFileValue { + private long size; + private long numEntries; + private long time = -1; + + public DataFileValue(long size, long numEntries, 
long time) { + this.size = size; + this.numEntries = numEntries; + this.time = time; + } + + public DataFileValue(long size, long numEntries) { + this.size = size; + this.numEntries = numEntries; + this.time = -1; + } + + public DataFileValue(byte[] encodedDFV) { + String[] ba = new String(encodedDFV).split(","); + + size = Long.parseLong(ba[0]); + numEntries = Long.parseLong(ba[1]); + + if (ba.length == 3) + time = Long.parseLong(ba[2]); + else + time = -1; + } + + public long getSize() { + return size; + } + + public long getNumEntries() { + return numEntries; + } + + public boolean isTimeSet() { + return time >= 0; + } + + public long getTime() { + return time; + } + + public byte[] encode() { + if (time >= 0) + return ("" + size + "," + numEntries + "," + time).getBytes(); + return ("" + size + "," + numEntries).getBytes(); + } + + public boolean equals(Object o) { + if (o instanceof DataFileValue) { + DataFileValue odfv = (DataFileValue) o; + + return size == odfv.size && numEntries == odfv.numEntries; + } + + return false; + } + + public int hashCode() { + return Long.valueOf(size + numEntries).hashCode(); + } + + public String toString() { + return size + " " + numEntries; + } + + public void setTime(long time) { + if (time < 0) + throw new IllegalArgumentException(); + this.time = time; + } + } + + public static SortedMap getMetadataLocationEntries(SortedMap entries) { + Key key; + Value val; + Text location = null; + Value prevRow = null; + KeyExtent ke; + + SortedMap results = new TreeMap(); + + Text lastRowFromKey = new Text(); + + // text obj below is meant to be reused in loop for efficiency + Text colf = new Text(); + Text colq = new Text(); + + for (Entry entry : entries.entrySet()) { + key = entry.getKey(); + val = entry.getValue(); + + if (key.compareRow(lastRowFromKey) != 0) { + prevRow = null; + location = null; + key.getRow(lastRowFromKey); + } + + colf = key.getColumnFamily(colf); + colq = key.getColumnQualifier(colq); + + // interpret the row 
id as a key extent + if (colf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) || colf.equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) + location = new Text(val.toString()); + else if (Constants.METADATA_PREV_ROW_COLUMN.equals(colf, colq)) + prevRow = new Value(val); + + if (location != null && prevRow != null) { + ke = new KeyExtent(key.getRow(), prevRow); + results.put(ke, location); + + location = null; + prevRow = null; + } + } + return results; + } + + public static SortedMap> getTabletEntries(Instance instance, KeyExtent ke, List columns, AuthInfo credentials) { + TreeMap tkv = new TreeMap(); + getTabletAndPrevTabletKeyValues(instance, tkv, ke, columns, credentials); + return getTabletEntries(tkv, columns); + } + + public static SortedMap> getTabletEntries(SortedMap tabletKeyValues, List columns) { + TreeMap> tabletEntries = new TreeMap>(); + + HashSet colSet = null; + if (columns != null) { + colSet = new HashSet(columns); + } + + for (Entry entry : tabletKeyValues.entrySet()) { + + if (columns != null && !colSet.contains(new ColumnFQ(entry.getKey()))) { + continue; + } + + Text row = entry.getKey().getRow(); + + SortedMap colVals = tabletEntries.get(row); + if (colVals == null) { + colVals = new TreeMap(); + tabletEntries.put(row, colVals); + } + + colVals.put(new ColumnFQ(entry.getKey()), entry.getValue()); + } + + return tabletEntries; + } + + public static void getTabletAndPrevTabletKeyValues(Instance instance, SortedMap tkv, KeyExtent ke, List columns, AuthInfo credentials) { + Text startRow; + Text endRow = ke.getMetadataEntry(); + + if (ke.getPrevEndRow() == null) { + startRow = new Text(KeyExtent.getMetadataEntry(ke.getTableId(), new Text())); + } else { + startRow = new Text(KeyExtent.getMetadataEntry(ke.getTableId(), ke.getPrevEndRow())); + } + + Scanner scanner = new ScannerImpl(instance, credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS); + + if (columns != null) { + for (ColumnFQ column : columns) + 
column.fetch(scanner); + } + + scanner.setRange(new Range(new Key(startRow), true, new Key(endRow).followingKey(PartialKey.ROW), false)); + + tkv.clear(); + boolean successful = false; + try { + for (Entry entry : scanner) { + tkv.put(entry.getKey(), entry.getValue()); + } + successful = true; + } finally { + if (!successful) { + tkv.clear(); + } + } + } + + public static void getEntries(Instance instance, AuthInfo credentials, String table, boolean isTid, Map locations, + SortedSet tablets) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableId = isTid ? table : Tables.getNameToIdMap(instance).get(table); + + Scanner scanner = instance.getConnector(credentials.user, credentials.password).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + + Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY); + + // position at first entry in metadata table for given table + KeyExtent ke = new KeyExtent(new Text(tableId), new Text(), null); + Key startKey = new Key(ke.getMetadataEntry()); + ke = new KeyExtent(new Text(tableId), null, null); + Key endKey = new Key(ke.getMetadataEntry()).followingKey(PartialKey.ROW); + scanner.setRange(new Range(startKey, endKey)); + + Text colf = new Text(); + Text colq = new Text(); + + KeyExtent currentKeyExtent = null; + String location = null; + Text row = null; + // acquire this tables METADATA table entries + boolean haveExtent = false; + boolean haveLocation = false; + for (Entry entry : scanner) { + if (row != null) { + if (!row.equals(entry.getKey().getRow())) { + currentKeyExtent = null; + haveExtent = false; + haveLocation = false; + row = entry.getKey().getRow(); + } + } else + row = entry.getKey().getRow(); + + colf = entry.getKey().getColumnFamily(colf); + colq = entry.getKey().getColumnQualifier(colq); + + // stop scanning metadata table when another table is reached + if (!(new 
KeyExtent(entry.getKey().getRow(), (Text) null)).getTableId().toString().equals(tableId)) + break; + + if (Constants.METADATA_PREV_ROW_COLUMN.equals(colf, colq)) { + currentKeyExtent = new KeyExtent(entry.getKey().getRow(), entry.getValue()); + tablets.add(currentKeyExtent); + haveExtent = true; + } else if (colf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) { + location = entry.getValue().toString(); + haveLocation = true; + } + + if (haveExtent && haveLocation) { + locations.put(currentKeyExtent, location); + haveExtent = false; + haveLocation = false; + currentKeyExtent = null; + } + } + + validateEntries(tableId, tablets); + } + + public static void validateEntries(String tableId, SortedSet tablets) throws AccumuloException { + // sanity check of metadata table entries + // make sure tablets has no holes, and that it starts and ends w/ null + if (tablets.size() == 0) + throw new AccumuloException("No entries found in metadata table for table " + tableId); + + if (tablets.first().getPrevEndRow() != null) + throw new AccumuloException("Problem with metadata table, first entry for table " + tableId + "- " + tablets.first() + " - has non null prev end row"); + + if (tablets.last().getEndRow() != null) + throw new AccumuloException("Problem with metadata table, last entry for table " + tableId + "- " + tablets.first() + " - has non null end row"); + + Iterator tabIter = tablets.iterator(); + Text lastEndRow = tabIter.next().getEndRow(); + while (tabIter.hasNext()) { + KeyExtent tabke = tabIter.next(); + + if (tabke.getPrevEndRow() == null) + throw new AccumuloException("Problem with metadata table, it has null prev end row in middle of table " + tabke); + + if (!tabke.getPrevEndRow().equals(lastEndRow)) + throw new AccumuloException("Problem with metadata table, it has a hole " + tabke.getPrevEndRow() + " != " + lastEndRow); + + lastEndRow = tabke.getEndRow(); + } + + // end METADATA table sanity check + } + + public static boolean 
isContiguousRange(KeyExtent ke, SortedSet children) { + if (children.size() == 0) + return false; + + if (children.size() == 1) + return children.first().equals(ke); + + Text per = children.first().getPrevEndRow(); + Text er = children.last().getEndRow(); + + boolean perEqual = (per == ke.getPrevEndRow() || per != null && ke.getPrevEndRow() != null && ke.getPrevEndRow().compareTo(per) == 0); + + boolean erEqual = (er == ke.getEndRow() || er != null && ke.getEndRow() != null && ke.getEndRow().compareTo(er) == 0); + + if (!perEqual || !erEqual) + return false; + + Iterator iter = children.iterator(); + + Text lastEndRow = iter.next().getEndRow(); + + while (iter.hasNext()) { + KeyExtent cke = iter.next(); + + per = cke.getPrevEndRow(); + + // something in the middle should not be null + + if (per == null || lastEndRow == null) + return false; + + if (per.compareTo(lastEndRow) != 0) + return false; + + lastEndRow = cke.getEndRow(); + } + + return true; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java new file mode 100644 index 0000000..7c67a1a --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.cloudtrace.instrument.TraceRunnable; +import org.apache.log4j.Logger; + +public class NamingThreadFactory implements ThreadFactory { + private static final Logger log = Logger.getLogger(NamingThreadFactory.class); + + private AtomicInteger threadNum = new AtomicInteger(1); + private String name; + + public NamingThreadFactory(String name) { + this.name = name; + } + + public Thread newThread(Runnable r) { + return new Daemon(new LoggingRunnable(log, new TraceRunnable(r)), name + " " + threadNum.getAndIncrement()); + } + +}