Date: Thu, 2 Apr 2015 02:31:53 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: notifications@accumulo.apache.org
Subject: [jira] [Commented] (ACCUMULO-3602) BatchScanner optimization for AccumuloInputFormat

    [ https://issues.apache.org/jira/browse/ACCUMULO-3602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14392011#comment-14392011 ]

ASF GitHub Bot commented on ACCUMULO-3602:
------------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/25#discussion_r27628943

--- Diff: core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputSplit.java ---
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.mapreduce;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.log4j.Level;
+
+/**
+ * Abstracts over configurations common to all InputSplits. Specifically, it leaves out methods
+ * related to the number of ranges and locations per InputSplit, as those vary by implementation.
+ *
+ * @see RangeInputSplit
+ * @see BatchInputSplit
+ */
+public abstract class AccumuloInputSplit extends InputSplit implements Writable {
+  protected String[] locations;
+  protected String tableId, tableName, instanceName, zooKeepers, principal;
+  protected TokenSource tokenSource;
+  protected String tokenFile;
+  protected AuthenticationToken token;
+  protected Boolean mockInstance;
+  protected Authorizations auths;
+  protected Set<Pair<Text,Text>> fetchedColumns;
+  protected List<IteratorSetting> iterators;
+  protected Level level;
+
+  public abstract float getProgress(Key currentKey);
+
+  public AccumuloInputSplit() {
+    locations = new String[0];
+    tableName = "";
+    tableId = "";
+  }
+
+  public AccumuloInputSplit(AccumuloInputSplit split) throws IOException {
+    this.setLocations(split.getLocations());
+    this.setTableName(split.getTableName());
+    this.setTableId(split.getTableId());
+  }
+
+  protected AccumuloInputSplit(String table, String tableId, String[] locations) {
+    setLocations(locations);
+    this.tableName = table;
+    this.tableId = tableId;
+  }
+
+  private static byte[] extractBytes(ByteSequence seq, int numBytes) {
+    byte[] bytes = new byte[numBytes + 1];
+    bytes[0] = 0;
+    for (int i = 0; i < numBytes; i++) {
+      if (i >= seq.length())
+        bytes[i + 1] = 0;
+      else
+        bytes[i + 1] = seq.byteAt(i);
+    }
+    return bytes;
+  }
+
+  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
+    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
+    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
+    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
+    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
+    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
+  }
+
+  public long getRangeLength(Range range) throws IOException {
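+    // Rough estimate of the range's extent: clamp infinite endpoints to
+    // single-byte sentinels, then XOR the leading (up to seven) bytes of the
+    // start and stop rows to approximate the distance between them.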
+    Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) : range.getStartKey().getRow();
+    Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
+    int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
+    long diff = 0;
+
+    byte[] start = startRow.getBytes();
+    byte[] stop = stopRow.getBytes();
+    for (int i = 0; i < maxCommon; ++i) {
+      diff |= 0xff & (start[i] ^ stop[i]);
+      diff <<= Byte.SIZE;
+    }
+
+    if (startRow.getLength() != stopRow.getLength())
+      diff |= 0xff;
+
+    return diff + 1;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return Arrays.copyOf(locations, locations.length);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tableName = in.readUTF();
+    tableId = in.readUTF();
+    int numLocs = in.readInt();
+    locations = new String[numLocs];
+    for (int i = 0; i < numLocs; ++i)
+      locations[i] = in.readUTF();
+
+    if (in.readBoolean()) {
+      mockInstance = in.readBoolean();
+    }
+
+    if (in.readBoolean()) {
+      int numColumns = in.readInt();
+      List<String> columns = new ArrayList<String>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        columns.add(in.readUTF());
+      }
+
+      fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
+    }
+
+    if (in.readBoolean()) {
+      String strAuths = in.readUTF();
+      auths = new Authorizations(strAuths.getBytes(UTF_8));
+    }
+
+    if (in.readBoolean()) {
+      principal = in.readUTF();
+    }
+
+    if (in.readBoolean()) {
+      int ordinal = in.readInt();
+      this.tokenSource = TokenSource.values()[ordinal];
+
+      switch (this.tokenSource) {
+        case INLINE:
+          String tokenClass = in.readUTF();
+          byte[] base64TokenBytes = in.readUTF().getBytes(UTF_8);
+          byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
+
+          this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes);
+          break;
+
+        case FILE:
+          this.tokenFile = in.readUTF();
+
+          break;
+        default:
+          throw new IOException("Cannot parse unknown TokenSource ordinal");
+      }
+    }
+
+    if (in.readBoolean()) {
+      instanceName = in.readUTF();
+    }
+
+    if (in.readBoolean()) {
+      zooKeepers = in.readUTF();
+    }
+
+    if (in.readBoolean()) {
+      int numIterators = in.readInt();
+      iterators = new ArrayList<IteratorSetting>(numIterators);
+      for (int i = 0; i < numIterators; i++) {
+        iterators.add(new IteratorSetting(in));
+      }
+    }
+
+    if (in.readBoolean()) {
+      level = Level.toLevel(in.readInt());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tableName);
+    out.writeUTF(tableId);
+    out.writeInt(locations.length);
+    for (int i = 0; i < locations.length; ++i)
+      out.writeUTF(locations[i]);
+
+    out.writeBoolean(null != mockInstance);
+    if (null != mockInstance) {
+      out.writeBoolean(mockInstance);
+    }
+
+    out.writeBoolean(null != fetchedColumns);
+    if (null != fetchedColumns) {
+      String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
+      out.writeInt(cols.length);
+      for (String col : cols) {
+        out.writeUTF(col);
+      }
+    }
+
+    out.writeBoolean(null != auths);
+    if (null != auths) {
+      out.writeUTF(auths.serialize());
+    }
+
+    out.writeBoolean(null != principal);
+    if (null != principal) {
+      out.writeUTF(principal);
+    }
+
+    out.writeBoolean(null != tokenSource);
+    if (null != tokenSource) {
+      out.writeInt(tokenSource.ordinal());
+
+      if (null != token && null != tokenFile) {
+        throw new IOException("Cannot use both inline AuthenticationToken and file-based AuthenticationToken");
+      } else if (null != token) {
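+        // Inline token: record the concrete token class and the Base64-encoded
+        // serialized token so readFields() can reconstruct it.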
+        out.writeUTF(token.getClass().getCanonicalName());
+        out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
+      } else {
+        out.writeUTF(tokenFile);
+      }
+    }
+
+    out.writeBoolean(null != instanceName);
+    if (null != instanceName) {
+      out.writeUTF(instanceName);
+    }
+
+    out.writeBoolean(null != zooKeepers);
+    if (null != zooKeepers) {
+      out.writeUTF(zooKeepers);
+    }
+
+    out.writeBoolean(null != iterators);
+    if (null != iterators) {
+      out.writeInt(iterators.size());
+      for (IteratorSetting iterator : iterators) {
+        iterator.write(out);
+      }
+    }
+
+    out.writeBoolean(null != level);
+    if (null != level) {
+      out.writeInt(level.toInt());
+    }
+  }
+
+  /**
+   * Use {@link #getTableName}
+   */
+  @Deprecated
+  public String getTable() {
+    return getTableName();
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * Use {@link #setTableName}
+   */
+  @Deprecated
+  public void setTable(String table) {
+    setTableName(table);
+  }
+
+  public void setTableName(String table) {
+    this.tableName = table;
+  }
+
+  public void setTableId(String tableId) {
+    this.tableId = tableId;
+  }
+
+  public String getTableId() {
+    return tableId;
+  }
+
+  /**
+   * @see #getInstance(ClientConfiguration)
+   */
+  @Deprecated
--- End diff --

Thanks for catching this already!

> BatchScanner optimization for AccumuloInputFormat
> -------------------------------------------------
>
>                 Key: ACCUMULO-3602
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-3602
>             Project: Accumulo
>          Issue Type: Improvement
>          Components: client
>    Affects Versions: 1.6.1, 1.6.2
>            Reporter: Eugene Cheipesh
>            Assignee: Eugene Cheipesh
>              Labels: performance
>             Fix For: 1.7.0
>
>
> Currently {{AccumuloInputFormat}} produces a split for each {{Range}} specified in the configuration. Some table indexing schemes, for instance a z-order geospatial index, produce a large number of small ranges, resulting in a large number of splits. This is specifically a concern when using {{AccumuloInputFormat}} as a source for a Spark RDD, where each split is mapped to an RDD partition.
> A large number of small RDD partitions leads to poor parallelism on read and high processing overhead. A desirable alternative is to group ranges by tablet into a single split and use a {{BatchScanner}} to produce the records. Grouping by tablets is useful because it reflects Accumulo's attempt to distribute stored records, and it can be influenced by the user through table splits.
> The grouping functionality already exists in the internal {{TabletLocator}} class.
> The current proposal is to modify {{AbstractInputFormat}} such that it generates either {{RangeInputSplit}} or {{MultiRangeInputSplit}} based on a new setting in {{InputConfigurator}}. {{AccumuloInputFormat}} would then be able to inspect the type of the split and instantiate an appropriate reader.
> The functionality of {{TabletLocator}} should be exposed as a public API in 1.7, as it is useful for such optimizations.
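For readers coming to this from the issue summary, here is a minimal sketch of the grouping described above, assuming the ranges have already been binned by tablet location (the work {{TabletLocator}} does internally). The names {{TabletSplit}}, {{groupByTablet}}, and {{readSplit}} are hypothetical stand-ins, not types from this pull request:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class TabletGroupingSketch {

  // Hypothetical stand-in for the proposed MultiRangeInputSplit: one split
  // per tablet, carrying every range that falls on that tablet.
  static class TabletSplit {
    final String location;     // tablet server hosting the tablet (locality hint)
    final List<Range> ranges;  // all ranges assigned to that tablet

    TabletSplit(String location, List<Range> ranges) {
      this.location = location;
      this.ranges = ranges;
    }
  }

  // Collapse many small ranges into one split per tablet. The binning map is
  // assumed to come from TabletLocator (an internal API), keyed by location.
  static List<TabletSplit> groupByTablet(Map<String, List<Range>> binnedRanges) {
    List<TabletSplit> splits = new ArrayList<TabletSplit>();
    for (Entry<String, List<Range>> e : binnedRanges.entrySet()) {
      splits.add(new TabletSplit(e.getKey(), e.getValue()));
    }
    return splits;
  }

  // Read one grouped split through a single BatchScanner instead of one
  // scanner per Range.
  static void readSplit(Connector conn, String table, TabletSplit split) throws Exception {
    BatchScanner scanner = conn.createBatchScanner(table, new Authorizations(), 2);
    try {
      scanner.setRanges(split.ranges);
      for (Entry<Key,Value> entry : scanner) {
        // consume entry.getKey() / entry.getValue()
      }
    } finally {
      scanner.close();
    }
  }
}
{code}

Reading a grouped split through one {{BatchScanner}} replaces many single-range readers with a few query threads per tablet, which is exactly the parallelism trade-off the issue describes.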