Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1109218A7D for ; Thu, 3 Mar 2016 21:59:27 +0000 (UTC) Received: (qmail 98821 invoked by uid 500); 3 Mar 2016 21:59:26 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 98747 invoked by uid 500); 3 Mar 2016 21:59:26 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 98547 invoked by uid 99); 3 Mar 2016 21:59:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Mar 2016 21:59:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47C24E7907; Thu, 3 Mar 2016 21:59:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Thu, 03 Mar 2016 21:59:31 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java new file mode 100644 index 0000000..205b043 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class OpTimer { + private Logger log; + private Level level; + private long t1; + private long opid; + private static AtomicLong nextOpid = new AtomicLong(); + + public OpTimer(Logger log, Level level) { + this.log = log; + this.level = level; + } + + public OpTimer start(String msg) { + opid = nextOpid.getAndIncrement(); + if (log.isEnabledFor(level)) + log.log(level, "tid=" + Thread.currentThread().getId() + " oid=" + opid + " " + msg); + t1 = System.currentTimeMillis(); + return this; + } + + public void stop(String msg) { + if (log.isEnabledFor(level)) { + long t2 = System.currentTimeMillis(); + String duration = String.format("%.3f secs", (t2 - t1) / 1000.0); + msg = msg.replace("%DURATION%", duration); + log.log(level, "tid=" + Thread.currentThread().getId() + " oid=" + opid + " " + msg); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Pair.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Pair.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Pair.java new file mode 100644 index 0000000..638e314 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Pair.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +public class Pair { + A first; + B second; + + public Pair(A f, B s) { + this.first = f; + this.second = s; + } + + private int hashCode(Object o) { + if (o == null) + return 0; + return o.hashCode(); + } + + @Override + public int hashCode() { + return hashCode(first) + hashCode(second); + } + + private boolean equals(Object o1, Object o2) { + if (o1 == null || o2 == null) + return o1 == o2; + + return o1.equals(o2); + } + + @Override + public boolean equals(Object o) { + if (o instanceof Pair) { + Pair op = (Pair) o; + return equals(first, op.first) && equals(second, op.second); + } + return false; + } + + public A getFirst() { + return first; + } + + public B getSecond() { + return second; + } + + public String toString() { + return "(" + first + "," + second + ")"; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java new file mode 100644 index 0000000..28618d2 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/PeekingIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.Iterator; + +public class PeekingIterator implements Iterator { + Iterator source; + E top; + + public PeekingIterator(Iterator source) { + this.source = source; + if (source.hasNext()) + top = source.next(); + else + top = null; + } + + public E peek() { + return top; + } + + public E next() { + E lastPeeked = top; + if (source.hasNext()) + top = source.next(); + else + top = null; + return lastPeeked; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasNext() { + return top != null; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java new file mode 100644 index 0000000..5e490b7 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.net.InetSocketAddress; +import java.util.EnumMap; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; + +public class ServerServices implements Comparable { + public static enum Service { + TSERV_CLIENT, MASTER_CLIENT, GC_CLIENT; + + // not necessary: everything should be advertizing ports in zookeeper + int getDefaultPort() { + switch (this) { + case TSERV_CLIENT: + return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT); + case MASTER_CLIENT: + return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.MASTER_CLIENTPORT); + case GC_CLIENT: + return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.GC_PORT); + default: + throw new IllegalArgumentException(); + } + } + } + + public static String SERVICE_SEPARATOR = ";"; + public static String SEPARATOR_CHAR = "="; + + private EnumMap services; + private String stringForm = null; + + public ServerServices(String services) { + this.services = new EnumMap(Service.class); + + String[] addresses = services.split(SERVICE_SEPARATOR); + for (String address : addresses) { + String[] sa = address.split(SEPARATOR_CHAR, 2); + this.services.put(Service.valueOf(sa[0]), sa[1]); + } + } + + public ServerServices(String address, Service service) { + this(service.name() + SEPARATOR_CHAR + address); + } + + public String getAddressString(Service service) { + return services.get(service); + } + + public InetSocketAddress getAddress(Service service) { + String address = getAddressString(service); + String[] parts = address.split(":", 2); + if (parts.length == 2) { + if (parts[1].isEmpty()) + return new InetSocketAddress(parts[0], service.getDefaultPort()); + return new InetSocketAddress(parts[0], Integer.parseInt(parts[1])); + } + return new InetSocketAddress(address, service.getDefaultPort()); + } + + // DON'T CHANGE THIS; WE'RE USING IT FOR SERIALIZATION!!! + public String toString() { + if (stringForm == null) { + StringBuilder sb = new StringBuilder(); + String prefix = ""; + for (Service service : new Service[] {Service.MASTER_CLIENT, Service.TSERV_CLIENT, Service.GC_CLIENT}) { + if (services.containsKey(service)) { + sb.append(prefix).append(service.name()).append(SEPARATOR_CHAR).append(services.get(service)); + prefix = SERVICE_SEPARATOR; + } + } + stringForm = sb.toString(); + } + return stringForm; + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof ServerServices) + return toString().equals(((ServerServices) o).toString()); + return false; + } + + @Override + public int compareTo(ServerServices other) { + return toString().compareTo(other.toString()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java new file mode 100644 index 0000000..f164527 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * Create a simple thread pool using common parameters. + */ +public class SimpleThreadPool extends ThreadPoolExecutor { + + public SimpleThreadPool(int max, final String name) { + super(max, max, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamingThreadFactory(name)); + allowCoreThreadTimeOut(true); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Stat.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Stat.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Stat.java new file mode 100644 index 0000000..e65265c --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Stat.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +public class Stat { + + long max = Long.MIN_VALUE; + long min = Long.MAX_VALUE; + long sum = 0; + int count = 0; + double partialStdDev = 0; + + public void addStat(long stat) { + if (stat > max) + max = stat; + if (stat < min) + min = stat; + + sum += stat; + + partialStdDev += stat * stat; + + count++; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + public double getAverage() { + return ((double) sum) / count; + } + + public double getStdDev() { + return Math.sqrt(partialStdDev / count - getAverage() * getAverage()); + } + + public String toString() { + return String.format("%,d %,d %,.2f %,d", getMin(), getMax(), getAverage(), count); + } + + public void clear() { + sum = 0; + count = 0; + partialStdDev = 0; + } + + public long getSum() { + return sum; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/StopWatch.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/StopWatch.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/StopWatch.java new file mode 100644 index 0000000..7f0f3e9 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/StopWatch.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.EnumMap; + +public class StopWatch> { + EnumMap startTime; + EnumMap totalTime; + + public StopWatch(Class k) { + startTime = new EnumMap(k); + totalTime = new EnumMap(k); + } + + public synchronized void start(K timer) { + if (startTime.containsKey(timer)) { + throw new IllegalStateException(timer + " already started"); + } + startTime.put(timer, System.currentTimeMillis()); + } + + public synchronized void stopIfActive(K timer) { + if (startTime.containsKey(timer)) + stop(timer); + } + + public synchronized void stop(K timer) { + + Long st = startTime.get(timer); + + if (st == null) { + throw new IllegalStateException(timer + " not started"); + } + + Long existingTime = totalTime.get(timer); + if (existingTime == null) + existingTime = 0L; + + totalTime.put(timer, existingTime + (System.currentTimeMillis() - st)); + startTime.remove(timer); + } + + public synchronized void reset(K timer) { + totalTime.remove(timer); + } + + public synchronized long get(K timer) { + Long existingTime = totalTime.get(timer); + if (existingTime == null) + existingTime = 0L; + return existingTime; + } + + public synchronized double getSecs(K timer) { + Long existingTime = totalTime.get(timer); + if (existingTime == null) + existingTime = 0L; + return existingTime / 1000.0; + } + + public synchronized void print() { + for (K timer : totalTime.keySet()) { + System.out.printf("%20s : %,6.4f secs\n", timer.toString(), get(timer) / 1000.0); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/StringUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/StringUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/StringUtil.java new file mode 100644 index 0000000..6357093 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/StringUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.Collection; + +public final class StringUtil { + public static String join(Collection strings, String sep) { + int last = 0; + StringBuilder ret = new StringBuilder(); + for (String s : strings) { + ret.append(s); + last = ret.length(); + ret.append(sep); + } + ret.delete(last, ret.length()); + return ret.toString(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java new file mode 100644 index 0000000..c2d9400 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; + +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TSocket; + +public class TBufferedSocket extends TIOStreamTransport { + + String client; + + public TBufferedSocket(TSocket sock, int bufferSize) throws IOException { + super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize)); + client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort(); + } + + public String getClientString() { + return client; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java new file mode 100644 index 0000000..6f74451 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.spi.SelectorProvider; + +import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.net.SocketOutputStream; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransport; + +public class TTimeoutTransport { + + public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException { + Socket socket = SelectorProvider.provider().openSocketChannel().socket(); + socket.setSoLinger(false, 0); + socket.setTcpNoDelay(true); + socket.connect(addr); + InputStream input = new BufferedInputStream(new SocketInputStream(socket, timeoutMillis), 1024 * 10); + OutputStream output = new BufferedOutputStream(new SocketOutputStream(socket, timeoutMillis), 1024 * 10); + return new TIOStreamTransport(input, output); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java new file mode 100644 index 0000000..46d1570 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +public class TableDiskUsage { + + private int nextInternalId = 0; + private Map internalIds = new HashMap(); + private Map externalIds = new HashMap(); + private Map tableFiles = new HashMap(); + private Map fileSizes = new HashMap(); + + void addTable(String tableId) { + if (internalIds.containsKey(tableId)) + throw new IllegalArgumentException("Already added table " + tableId); + + int iid = nextInternalId++; + + internalIds.put(tableId, iid); + externalIds.put(iid, tableId); + } + + void linkFileAndTable(String tableId, String file) { + int internalId = internalIds.get(tableId); + + Integer[] tables = tableFiles.get(file); + if (tables == null) { + tables = new Integer[internalIds.size()]; + for (int i = 0; i < tables.length; i++) + tables[i] = 0; + tableFiles.put(file, tables); + } + + tables[internalId] = 1; + } + + void addFileSize(String file, long size) { + fileSizes.put(file, size); + } + + Map,Long> calculateUsage() { + + Map,Long> usage = new HashMap,Long>(); + + for (Entry entry : tableFiles.entrySet()) { + List key = Arrays.asList(entry.getValue()); + Long size = fileSizes.get(entry.getKey()); + + Long tablesUsage = usage.get(key); + if (tablesUsage == null) + tablesUsage = 0l; + + tablesUsage += size; + + usage.put(key, tablesUsage); + + } + + Map,Long> externalUsage = new HashMap,Long>(); + + for (Entry,Long> entry : usage.entrySet()) { + List externalKey = new ArrayList(); + List key = entry.getKey(); + for (int i = 0; i < key.size(); i++) + if (key.get(i) != 0) + externalKey.add(externalIds.get(i)); + + externalUsage.put(externalKey, entry.getValue()); + } + + return externalUsage; + } + + public static void printDiskUsage(AccumuloConfiguration acuConf, Collection tables, FileSystem fs, Connector conn) throws TableNotFoundException, + IOException { + + TableDiskUsage tdu = new TableDiskUsage(); + + HashSet tableIds = new HashSet(); + + for (String tableName : tables) { + String tableId = conn.tableOperations().tableIdMap().get(tableName); + if (tableId == null) + throw new TableNotFoundException(null, tableName, "Table " + tableName + " not found"); + + tableIds.add(tableId); + } + + for (String tableId : tableIds) + tdu.addTable(tableId); + + HashSet tablesReferenced = new HashSet(tableIds); + + for (String tableId : tableIds) { + Scanner mdScanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); + mdScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); + + for (Entry entry : mdScanner) { + String file = entry.getKey().getColumnQualifier().toString(); + if (file.startsWith("../")) { + file = file.substring(2); + tablesReferenced.add(file.split("\\/")[1]); + } else + file = "/" + tableId + file; + + tdu.linkFileAndTable(tableId, file); + } + } + + for (String tableId : tablesReferenced) { + FileStatus[] files = fs.globStatus(new Path(Constants.getTablesDir(acuConf) + "/" + tableId + "/*/*")); + + for (FileStatus fileStatus : files) { + String dir = fileStatus.getPath().getParent().getName(); + String name = fileStatus.getPath().getName(); + + tdu.addFileSize("/" + tableId + "/" + dir + "/" + name, fileStatus.getLen()); + } + + } + + HashMap reverseTableIdMap = new HashMap(); + for (Entry entry : conn.tableOperations().tableIdMap().entrySet()) + reverseTableIdMap.put(entry.getValue(), entry.getKey()); + + TreeMap,Long> usage = new TreeMap,Long>(new Comparator>() { + + @Override + public int compare(TreeSet o1, TreeSet o2) { + int len1 = o1.size(); + int len2 = o2.size(); + + int min = Math.min(len1, len2); + + Iterator iter1 = o1.iterator(); + Iterator iter2 = o2.iterator(); + + int count = 0; + + while (count < min) { + String s1 = iter1.next(); + String s2 = iter2.next(); + + int cmp = s1.compareTo(s2); + + if (cmp != 0) + return cmp; + + count++; + } + + return len1 - len2; + } + }); + + for (Entry,Long> entry : tdu.calculateUsage().entrySet()) { + TreeSet tableNames = new TreeSet(); + for (String tableId : entry.getKey()) + tableNames.add(reverseTableIdMap.get(tableId)); + + usage.put(tableNames, entry.getValue()); + } + + for (Entry,Long> entry : usage.entrySet()) + System.out.printf("%,24d %s\n", entry.getValue(), entry.getKey()); + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/TextUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/TextUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/TextUtil.java new file mode 100644 index 0000000..f78747e --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/TextUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.nio.ByteBuffer; + +import org.apache.accumulo.core.Constants; +import org.apache.hadoop.io.Text; + +public final class TextUtil { + public static byte[] getBytes(Text text) { + byte[] bytes = text.getBytes(); + if (bytes.length != text.getLength()) { + bytes = new byte[text.getLength()]; + System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); + } + return bytes; + } + + public static ByteBuffer getByteBuffer(Text text) { + if (text == null) + return null; + byte[] bytes = text.getBytes(); + return ByteBuffer.wrap(bytes, 0, text.getLength()); + } + + public static Text truncate(Text text, int maxLen) { + if (text.getLength() > maxLen) { + Text newText = new Text(); + newText.append(text.getBytes(), 0, maxLen); + String suffix = "... TRUNCATED"; + newText.append(suffix.getBytes(), 0, suffix.length()); + return newText; + } + + return text; + } + + public static Text truncate(Text row) { + return truncate(row, Constants.MAX_DATA_TO_PRINT); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java new file mode 100644 index 0000000..fd9fc6c --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.net.InetSocketAddress; + +import org.apache.accumulo.cloudtrace.instrument.Span; +import org.apache.accumulo.cloudtrace.instrument.Trace; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.ClientExec; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + + +public class ThriftUtil { + private static final Logger log = Logger.getLogger(ThriftUtil.class); + + public static class TraceProtocol extends TCompactProtocol { + + @Override + public void writeMessageBegin(TMessage message) throws TException { + Trace.start("client:" + message.name); + super.writeMessageBegin(message); + } + + @Override + public void writeMessageEnd() throws TException { + super.writeMessageEnd(); + Span currentTrace = Trace.currentTrace(); + if (currentTrace != null) + currentTrace.stop(); + } + + public TraceProtocol(TTransport transport) { + super(transport); + } + } + + public static class TraceProtocolFactory extends TCompactProtocol.Factory { + private static final long serialVersionUID = 1L; + + @Override + public TProtocol getProtocol(TTransport trans) { + return new TraceProtocol(trans); + } + } + + static private TProtocolFactory protocolFactory = new TraceProtocolFactory(); + static private TTransportFactory transportFactory = new TFramedTransport.Factory(); + + static public T createClient(TServiceClientFactory factory, TTransport transport) { + return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport)); + } + + static public T getClient(TServiceClientFactory factory, InetSocketAddress address, AccumuloConfiguration conf) + throws TTransportException { + return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, conf)); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, AccumuloConfiguration configuration) + throws TTransportException { + int port = configuration.getPort(property); + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port); + return createClient(factory, transport); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, Property timeoutProperty, + AccumuloConfiguration configuration) throws TTransportException { + return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty), configuration); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, long timeout, + AccumuloConfiguration configuration) throws TTransportException { + int port = configuration.getPort(property); + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port, timeout); + return createClient(factory, transport); + } + + static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible + if (iface != null) { + ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport()); + } + } + + static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT, conf); + } + + static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf, long timeout) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, timeout, conf); + } + + public static void execute(String address, AccumuloConfiguration conf, ClientExec exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + exec.execute(client = getTServerClient(address, conf)); + break; + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + public static T execute(String address, AccumuloConfiguration conf, ClientExecReturn exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + return exec.execute(client = getTServerClient(address, conf)); + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + public static TTransportFactory transportFactory() { + return transportFactory; + } + + public static TProtocolFactory protocolFactory() { + return protocolFactory; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/UtilWaitThread.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/UtilWaitThread.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/UtilWaitThread.java new file mode 100644 index 0000000..36a4279 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/UtilWaitThread.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.log4j.Logger; + +public class UtilWaitThread { + private static final Logger log = Logger.getLogger(UtilWaitThread.class); + + public static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/Version.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/Version.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/Version.java new file mode 100644 index 0000000..7347227 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/Version.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Version { + String package_ = null; + int major = 0; + int minor = 0; + int release = 0; + String etcetera = null; + + public Version(String everything) { + parse(everything); + } + + private void parse(String everything) { + Pattern pattern = Pattern.compile("(([^-]*)-)?(\\d+)(\\.(\\d+)(\\.(\\d+))?)?(-(.*))?"); + Matcher parser = pattern.matcher(everything); + if (!parser.matches()) + throw new IllegalArgumentException("Unable to parse: " + everything + " as a version"); + + if (parser.group(1) != null) + package_ = parser.group(2); + major = Integer.valueOf(parser.group(3)); + minor = 0; + if (parser.group(5) != null) + minor = Integer.valueOf(parser.group(5)); + if (parser.group(7) != null) + release = Integer.valueOf(parser.group(7)); + if (parser.group(9) != null) + etcetera = parser.group(9); + + } + + public String getPackage() { + return package_; + } + + public int getMajorVersion() { + return major; + } + + public int getMinorVersion() { + return minor; + } + + public int getReleaseVersion() { + return release; + } + + public String getEtcetera() { + return etcetera; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + if (package_ != null) { + result.append(package_); + result.append("-"); + } + result.append(major); + result.append("."); + result.append(minor); + result.append("."); + result.append(release); + if (etcetera != null) { + result.append("-"); + result.append(etcetera); + } + return result.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/BinaryFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/BinaryFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/BinaryFormatter.java new file mode 100644 index 0000000..5021d66 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/BinaryFormatter.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class BinaryFormatter implements Formatter { + private Iterator> si; + private boolean doTimestamps; + private static int showLength; + + @Override + public void initialize(Iterable> scanner, boolean printTimestamps) { + checkState(si, false); + si = scanner.iterator(); + doTimestamps = printTimestamps; + } + + public boolean hasNext() { + checkState(si, true); + return si.hasNext(); + } + + public String next() { + checkState(si, true); + return formatEntry(si.next(), doTimestamps); + } + + public void remove() { + checkState(si, true); + si.remove(); + } + + static void checkState(Iterator> si, boolean expectInitialized) { + if (expectInitialized && si == null) + throw new IllegalStateException("Not initialized"); + if (!expectInitialized && si != null) + throw new IllegalStateException("Already initialized"); + } + + // this should be replaced with something like Record.toString(); + public static String formatEntry(Entry entry, boolean showTimestamps) { + StringBuilder sb = new StringBuilder(); + + // append row + appendText(sb, entry.getKey().getRow()).append(" "); + + // append column family + appendText(sb, entry.getKey().getColumnFamily()).append(":"); + + // append column qualifier + appendText(sb, entry.getKey().getColumnQualifier()).append(" "); + + // append visibility expression + sb.append(new ColumnVisibility(entry.getKey().getColumnVisibility())); + + // append timestamp + if (showTimestamps) + sb.append(" ").append(entry.getKey().getTimestamp()); + + // append value + if (entry.getValue() != null && entry.getValue().getSize() > 0) { + sb.append("\t"); + appendValue(sb, entry.getValue()); + } + + return sb.toString(); + } + + public static StringBuilder appendText(StringBuilder sb, Text t) { + return appendBytes(sb, t.getBytes(), 0, t.getLength()); + } + + static StringBuilder appendValue(StringBuilder sb, Value value) { + + return appendBytes(sb, value.get(), 0, value.get().length); + } + + static StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) { + if (len > showLength) { + for (int i = 0; i < showLength; i++) { + int c = 0xff & ba[offset + i]; + if (c == '\\') + sb.append("\\\\"); + else if (c >= 32 && c <= 126) + sb.append((char) c); + else + sb.append("\\x").append(String.format("%02X", c)); + } + return sb; + } + + else { + for (int i = 0; i < len; i++) { + + int c = 0xff & ba[offset + i]; + if (c == '\\') + sb.append("\\\\"); + else if (c >= 32 && c <= 126) + sb.append((char) c); + else + sb.append("\\x").append(String.format("%02X", c)); + } + return sb; + } + } + + public Iterator> getScannerIterator() { + return si; + } + + public static void getlength(int length) { + showLength = length; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DefaultFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DefaultFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DefaultFormatter.java new file mode 100644 index 0000000..78abed9 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DefaultFormatter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class DefaultFormatter implements Formatter { + private Iterator> si; + private boolean doTimestamps; + + @Override + public void initialize(Iterable> scanner, boolean printTimestamps) { + checkState(si, false); + si = scanner.iterator(); + doTimestamps = printTimestamps; + } + + public boolean hasNext() { + checkState(si, true); + return si.hasNext(); + } + + public String next() { + checkState(si, true); + return formatEntry(si.next(), doTimestamps); + } + + public void remove() { + checkState(si, true); + si.remove(); + } + + static void checkState(Iterator> si, boolean expectInitialized) { + if (expectInitialized && si == null) + throw new IllegalStateException("Not initialized"); + if (!expectInitialized && si != null) + throw new IllegalStateException("Already initialized"); + } + + // this should be replaced with something like Record.toString(); + public static String formatEntry(Entry entry, boolean showTimestamps) { + StringBuilder sb = new StringBuilder(); + + // append row + appendText(sb, entry.getKey().getRow()).append(" "); + + // append column family + appendText(sb, entry.getKey().getColumnFamily()).append(":"); + + // append column qualifier + appendText(sb, entry.getKey().getColumnQualifier()).append(" "); + + // append visibility expression + sb.append(new ColumnVisibility(entry.getKey().getColumnVisibility())); + + // append timestamp + if (showTimestamps) + sb.append(" ").append(entry.getKey().getTimestamp()); + + // append value + if (entry.getValue() != null && entry.getValue().getSize() > 0) { + sb.append("\t"); + appendValue(sb, entry.getValue()); + } + + return sb.toString(); + } + + static StringBuilder appendText(StringBuilder sb, Text t) { + return appendBytes(sb, t.getBytes(), 0, t.getLength()); + } + + static StringBuilder appendValue(StringBuilder sb, Value value) { + return appendBytes(sb, value.get(), 0, value.get().length); + } + + static StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) { + for (int i = 0; i < len; i++) { + int c = 0xff & ba[offset + i]; + if (c == '\\') + sb.append("\\\\"); + else if (c >= 32 && c <= 126) + sb.append((char) c); + else + sb.append("\\x").append(String.format("%02X", c)); + } + return sb; + } + + public Iterator> getScannerIterator() { + return si; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DeleterFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DeleterFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DeleterFormatter.java new file mode 100644 index 0000000..b4acdaf --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/DeleterFormatter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.shell.Shell; +import org.apache.log4j.Logger; + +public class DeleterFormatter extends DefaultFormatter { + + private static final Logger log = Logger.getLogger(DeleterFormatter.class); + private BatchWriter writer; + private Shell shellState; + private boolean printTimestamps; + private boolean force; + private boolean more; + + public DeleterFormatter(BatchWriter writer, Iterable> scanner, boolean printTimestamps, Shell shellState, boolean force) { + super.initialize(scanner, printTimestamps); + this.writer = writer; + this.shellState = shellState; + this.printTimestamps = printTimestamps; + this.force = force; + this.more = true; + } + + @Override + public boolean hasNext() { + if (!getScannerIterator().hasNext() || !more) { + try { + writer.close(); + } catch (MutationsRejectedException e) { + log.error(e.toString()); + if (Shell.isDebuggingEnabled()) + for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) + log.trace(cvs.toString()); + } + return false; + } + return true; + } + + @Override + public String next() { + Entry next = getScannerIterator().next(); + Key key = next.getKey(); + Mutation m = new Mutation(key.getRow()); + String entryStr = formatEntry(next, printTimestamps); + boolean delete = force; + try { + if (!force) { + shellState.getReader().flushConsole(); + String line = shellState.getReader().readLine("Delete { " + entryStr + " } ? "); + more = line != null; + delete = line != null && (line.equalsIgnoreCase("y") || line.equalsIgnoreCase("yes")); + } + if (delete) { + m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); + try { + writer.addMutation(m); + } catch (MutationsRejectedException e) { + log.error(e.toString()); + if (Shell.isDebuggingEnabled()) + for (ConstraintViolationSummary cvs : e.getConstraintViolationSummaries()) + log.trace(cvs.toString()); + } + } + shellState.getReader().printString(String.format("[%s] %s\n", delete ? "DELETED" : "SKIPPED", entryStr)); + } catch (IOException e) { + log.error("Cannot write to console", e); + throw new RuntimeException(e); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/Formatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/Formatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/Formatter.java new file mode 100644 index 0000000..0c1fc5f --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/Formatter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; + +public interface Formatter extends Iterator { + public void initialize(Iterable> scanner, boolean printTimestamps); +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/FormatterFactory.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/FormatterFactory.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/FormatterFactory.java new file mode 100644 index 0000000..5451843 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/FormatterFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.log4j.Logger; + +public class FormatterFactory { + private static final Logger log = Logger.getLogger(FormatterFactory.class); + + public static Formatter getFormatter(Class formatterClass, Iterable> scanner, boolean printTimestamps) { + Formatter formatter = null; + try { + formatter = formatterClass.newInstance(); + } catch (Exception e) { + log.warn("Unable to instantiate formatter. Using default formatter.", e); + formatter = new DefaultFormatter(); + } + formatter.initialize(scanner, printTimestamps); + return formatter; + } + + public static Formatter getDefaultFormatter(Iterable> scanner, boolean printTimestamps) { + return getFormatter(DefaultFormatter.class, scanner, printTimestamps); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/ShardedTableDistributionFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/ShardedTableDistributionFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/ShardedTableDistributionFormatter.java new file mode 100644 index 0000000..4b8d5e3 --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/ShardedTableDistributionFormatter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.accumulo.core.util.format; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; + +/** + * Formats the rows in a METADATA table scan to show distribution of shards over servers per day. This can be used to determine the effectiveness of the + * ShardedTableLoadBalancer + * + * Use this formatter with the following scan command in the shell: + * + * scan -b tableId -c ~tab:loc + */ +public class ShardedTableDistributionFormatter extends DefaultFormatter { + + private Map> countsByDay = new HashMap>(); + + @Override + public String next() { + Iterator> si = super.getScannerIterator(); + checkState(si, true); + while (si.hasNext()) + aggregateStats(si.next()); + return getStats(); + } + + private void aggregateStats(Entry entry) { + if (entry.getKey().getColumnFamily().toString().equals("~tab") && entry.getKey().getColumnQualifier().toString().equals("loc")) { + // The row for the sharded table should look like: ;yyyyMMhh_N + String row = entry.getKey().getRow().toString(); + // Parse the day out of the row + int semicolon = row.indexOf(";"); + String day = null; + if (semicolon != -1) { + semicolon++; + day = row.substring(semicolon, semicolon + 8); + } else + day = "NULL "; + String server = entry.getValue().toString(); + if (countsByDay.get(day) == null) + countsByDay.put(day, new HashSet()); + countsByDay.get(day).add(server); + } + } + + private String getStats() { + StringBuilder buf = new StringBuilder(); + buf.append("DAY \t\tSERVERS\n"); + buf.append("------\t\t-------\n"); + for (String day : countsByDay.keySet()) + buf.append(day + "\t\t" + countsByDay.get(day).size() + "\n"); + return buf.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/util/format/StatisticsDisplayFormatter.java ---------------------------------------------------------------------- diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/util/format/StatisticsDisplayFormatter.java b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/StatisticsDisplayFormatter.java new file mode 100644 index 0000000..dd9de6c --- /dev/null +++ b/1.5/core/src/main/java/org/apache/accumulo/core/util/format/StatisticsDisplayFormatter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.format; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; + +/** + * Does not show contents from scan, only displays statistics. Beware that this work is being done client side and this was developed as a utility for + * debugging. If used on large result sets it will likely fail. + */ +public class StatisticsDisplayFormatter extends DefaultFormatter { + private Map classifications = new HashMap(); + private Map columnFamilies = new HashMap(); + private Map columnQualifiers = new HashMap(); + private long total = 0; + + @Override + public String next() { + Iterator> si = super.getScannerIterator(); + checkState(si, true); + while (si.hasNext()) + aggregateStats(si.next()); + return getStats(); + } + + private void aggregateStats(Entry entry) { + String key; + Long count; + + key = entry.getKey().getColumnVisibility().toString(); + count = classifications.get(key); + classifications.put(key, count != null ? count + 1 : 0L); + + key = entry.getKey().getColumnFamily().toString(); + count = columnFamilies.get(key); + columnFamilies.put(key, count != null ? count + 1 : 0L); + + key = entry.getKey().getColumnQualifier().toString(); + count = columnQualifiers.get(key); + columnQualifiers.put(key, count != null ? count + 1 : 0L); + + ++total; + } + + private String getStats() { + StringBuilder buf = new StringBuilder(); + buf.append("CLASSIFICATIONS:\n"); + buf.append("----------------\n"); + for (String key : classifications.keySet()) + buf.append("\t").append(key).append(": ").append(classifications.get(key)).append("\n"); + buf.append("COLUMN FAMILIES:\n"); + buf.append("----------------\n"); + for (String key : columnFamilies.keySet()) + buf.append("\t").append(key).append(": ").append(columnFamilies.get(key)).append("\n"); + buf.append("COLUMN QUALIFIERS:\n"); + buf.append("------------------\n"); + for (String key : columnQualifiers.keySet()) + buf.append("\t").append(key).append(": ").append(columnQualifiers.get(key)).append("\n"); + + buf.append(total).append(" entries matched."); + total = 0; + classifications = new HashMap(); + columnFamilies = new HashMap(); + columnQualifiers = new HashMap(); + + return buf.toString(); + } +}