accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/3] git commit: Merge branch '1.6.1-SNAPSHOT'
Date Tue, 01 Jul 2014 05:08:53 GMT
Merge branch '1.6.1-SNAPSHOT'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79bb5c1c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79bb5c1c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79bb5c1c

Branch: refs/heads/master
Commit: 79bb5c1c1317a379595e164d11d5e19ea080f4a9
Parents: f402493 b6dd312
Author: Josh Elser <elserj@apache.org>
Authored: Tue Jul 1 00:50:53 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Jul 1 00:50:53 2014 -0400

----------------------------------------------------------------------
 .../core/client/mapreduce/RangeInputSplit.java      | 16 ++++++++++++++++
 .../core/client/mapred/RangeInputSplitTest.java     | 16 ++++++++++++++++
 .../core/client/mapreduce/RangeInputSplitTest.java  | 16 ++++++++++++++++
 3 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/79bb5c1c/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
----------------------------------------------------------------------
diff --cc mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
index d2f4724,0000000..3a2f721
mode 100644,000000..100644
--- a/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
+++ b/mapreduce/src/main/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplit.java
@@@ -1,490 -1,0 +1,506 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.math.BigInteger;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator;
 +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase.TokenSource;
 +import org.apache.accumulo.core.client.mock.MockInstance;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Base64;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.Writable;
 +import org.apache.hadoop.mapreduce.InputSplit;
 +import org.apache.log4j.Level;
 +
 +/**
 + * The Class RangeInputSplit. Encapsulates an Accumulo range for use in Map Reduce jobs.
 + */
 +public class RangeInputSplit extends InputSplit implements Writable {
 +  private Range range;
 +  private String[] locations;
 +  private String tableId, tableName, instanceName, zooKeepers, principal;
 +  private TokenSource tokenSource;
 +  private String tokenFile;
 +  private AuthenticationToken token;
 +  private Boolean offline, mockInstance, isolatedScan, localIterators;
 +  private Authorizations auths;
 +  private Set<Pair<Text,Text>> fetchedColumns;
 +  private List<IteratorSetting> iterators;
 +  private Level level;
 +
 +  public RangeInputSplit() {
 +    range = new Range();
 +    locations = new String[0];
 +    tableName = "";
 +    tableId = "";
 +  }
 +
 +  public RangeInputSplit(RangeInputSplit split) throws IOException {
 +    this.setRange(split.getRange());
 +    this.setLocations(split.getLocations());
 +    this.setTableName(split.getTableName());
 +    this.setTableId(split.getTableId());
 +  }
 +
 +  protected RangeInputSplit(String table, String tableId, Range range, String[] locations)
{
 +    this.range = range;
 +    setLocations(locations);
 +    this.tableName = table;
 +    this.tableId = tableId;
 +  }
 +
 +  public Range getRange() {
 +    return range;
 +  }
 +
 +  private static byte[] extractBytes(ByteSequence seq, int numBytes) {
 +    byte[] bytes = new byte[numBytes + 1];
 +    bytes[0] = 0;
 +    for (int i = 0; i < numBytes; i++) {
 +      if (i >= seq.length())
 +        bytes[i + 1] = 0;
 +      else
 +        bytes[i + 1] = seq.byteAt(i);
 +    }
 +    return bytes;
 +  }
 +
 +  public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position)
{
 +    int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
 +    BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
 +    BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
 +    BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
 +    return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
 +  }
 +
 +  public float getProgress(Key currentKey) {
 +    if (currentKey == null)
 +      return 0f;
 +    if (range.getStartKey() != null && range.getEndKey() != null) {
 +      if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
 +        // just look at the row progress
 +        return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(),
currentKey.getRowData());
 +      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM)
!= 0) {
 +        // just look at the column family progress
 +        return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(),
currentKey.getColumnFamilyData());
 +      } else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL)
!= 0) {
 +        // just look at the column qualifier progress
 +        return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(),
currentKey.getColumnQualifierData());
 +      }
 +    }
 +    // if we can't figure it out, then claim no progress
 +    return 0f;
 +  }
 +
 +  /**
 +   * This implementation of length is only an estimate, it does not provide exact values.
Do not have your code rely on this return value.
 +   */
 +  @Override
 +  public long getLength() throws IOException {
 +    Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] {Byte.MIN_VALUE}) :
range.getStartKey().getRow();
 +    Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] {Byte.MAX_VALUE}) : range.getEndKey().getRow();
 +    int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
 +    long diff = 0;
 +
 +    byte[] start = startRow.getBytes();
 +    byte[] stop = stopRow.getBytes();
 +    for (int i = 0; i < maxCommon; ++i) {
 +      diff |= 0xff & (start[i] ^ stop[i]);
 +      diff <<= Byte.SIZE;
 +    }
 +
 +    if (startRow.getLength() != stopRow.getLength())
 +      diff |= 0xff;
 +
 +    return diff + 1;
 +  }
 +
 +  @Override
 +  public String[] getLocations() throws IOException {
 +    return Arrays.copyOf(locations, locations.length);
 +  }
 +
 +  @Override
 +  public void readFields(DataInput in) throws IOException {
 +    range.readFields(in);
 +    tableName = in.readUTF();
 +    tableId = in.readUTF();
 +    int numLocs = in.readInt();
 +    locations = new String[numLocs];
 +    for (int i = 0; i < numLocs; ++i)
 +      locations[i] = in.readUTF();
 +
 +    if (in.readBoolean()) {
 +      isolatedScan = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      offline = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      localIterators = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      mockInstance = in.readBoolean();
 +    }
 +
 +    if (in.readBoolean()) {
 +      int numColumns = in.readInt();
 +      List<String> columns = new ArrayList<String>(numColumns);
 +      for (int i = 0; i < numColumns; i++) {
 +        columns.add(in.readUTF());
 +      }
 +
 +      fetchedColumns = InputConfigurator.deserializeFetchedColumns(columns);
 +    }
 +
 +    if (in.readBoolean()) {
 +      String strAuths = in.readUTF();
 +      auths = new Authorizations(strAuths.getBytes(StandardCharsets.UTF_8));
 +    }
 +
 +    if (in.readBoolean()) {
 +      principal = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
 +      int ordinal = in.readInt();
 +      this.tokenSource = TokenSource.values()[ordinal];
 +
 +      switch (this.tokenSource) {
 +        case INLINE:
 +          String tokenClass = in.readUTF();
 +          byte[] base64TokenBytes = in.readUTF().getBytes(StandardCharsets.UTF_8);
 +          byte[] tokenBytes = Base64.decodeBase64(base64TokenBytes);
 +
 +          this.token = AuthenticationTokenSerializer.deserialize(tokenClass, tokenBytes);
 +          break;
 +
 +        case FILE:
 +          this.tokenFile = in.readUTF();
 +
 +          break;
 +        default:
 +          throw new IOException("Cannot parse unknown TokenSource ordinal");
 +      }
 +    }
 +
 +    if (in.readBoolean()) {
 +      instanceName = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
 +      zooKeepers = in.readUTF();
 +    }
 +
 +    if (in.readBoolean()) {
++      int numIterators = in.readInt();
++      iterators = new ArrayList<IteratorSetting>(numIterators);
++      for (int i = 0; i < numIterators; i++) {
++        iterators.add(new IteratorSetting(in));
++      }
++    }
++
++    if (in.readBoolean()) {
 +      level = Level.toLevel(in.readInt());
 +    }
 +  }
 +
 +  @Override
 +  public void write(DataOutput out) throws IOException {
 +    range.write(out);
 +    out.writeUTF(tableName);
 +    out.writeUTF(tableId);
 +    out.writeInt(locations.length);
 +    for (int i = 0; i < locations.length; ++i)
 +      out.writeUTF(locations[i]);
 +
 +    out.writeBoolean(null != isolatedScan);
 +    if (null != isolatedScan) {
 +      out.writeBoolean(isolatedScan);
 +    }
 +
 +    out.writeBoolean(null != offline);
 +    if (null != offline) {
 +      out.writeBoolean(offline);
 +    }
 +
 +    out.writeBoolean(null != localIterators);
 +    if (null != localIterators) {
 +      out.writeBoolean(localIterators);
 +    }
 +
 +    out.writeBoolean(null != mockInstance);
 +    if (null != mockInstance) {
 +      out.writeBoolean(mockInstance);
 +    }
 +
 +    out.writeBoolean(null != fetchedColumns);
 +    if (null != fetchedColumns) {
 +      String[] cols = InputConfigurator.serializeColumns(fetchedColumns);
 +      out.writeInt(cols.length);
 +      for (String col : cols) {
 +        out.writeUTF(col);
 +      }
 +    }
 +
 +    out.writeBoolean(null != auths);
 +    if (null != auths) {
 +      out.writeUTF(auths.serialize());
 +    }
 +
 +    out.writeBoolean(null != principal);
 +    if (null != principal) {
 +      out.writeUTF(principal);
 +    }
 +
 +    out.writeBoolean(null != tokenSource);
 +    if (null != tokenSource) {
 +      out.writeInt(tokenSource.ordinal());
 +
 +      if (null != token && null != tokenFile) {
 +        throw new IOException("Cannot use both inline AuthenticationToken and file-based
AuthenticationToken");
 +      } else if (null != token) {
 +        out.writeUTF(token.getClass().getCanonicalName());
 +        out.writeUTF(Base64.encodeBase64String(AuthenticationTokenSerializer.serialize(token)));
 +      } else {
 +        out.writeUTF(tokenFile);
 +      }
 +    }
 +
 +    out.writeBoolean(null != instanceName);
 +    if (null != instanceName) {
 +      out.writeUTF(instanceName);
 +    }
 +
 +    out.writeBoolean(null != zooKeepers);
 +    if (null != zooKeepers) {
 +      out.writeUTF(zooKeepers);
 +    }
 +
++    out.writeBoolean(null != iterators);
++    if (null != iterators) {
++      out.writeInt(iterators.size());
++      for (IteratorSetting iterator : iterators) {
++        iterator.write(out);
++      }
++    }
++
 +    out.writeBoolean(null != level);
 +    if (null != level) {
 +      out.writeInt(level.toInt());
 +    }
 +  }
 +
 +  @Override
 +  public String toString() {
 +    StringBuilder sb = new StringBuilder(256);
 +    sb.append("Range: ").append(range);
 +    sb.append(" Locations: ").append(Arrays.asList(locations));
 +    sb.append(" Table: ").append(tableName);
 +    sb.append(" TableID: ").append(tableId);
 +    sb.append(" InstanceName: ").append(instanceName);
 +    sb.append(" zooKeepers: ").append(zooKeepers);
 +    sb.append(" principal: ").append(principal);
 +    sb.append(" tokenSource: ").append(tokenSource);
 +    sb.append(" authenticationToken: ").append(token);
 +    sb.append(" authenticationTokenFile: ").append(tokenFile);
 +    sb.append(" Authorizations: ").append(auths);
 +    sb.append(" offlineScan: ").append(offline);
 +    sb.append(" mockInstance: ").append(mockInstance);
 +    sb.append(" isolatedScan: ").append(isolatedScan);
 +    sb.append(" localIterators: ").append(localIterators);
 +    sb.append(" fetchColumns: ").append(fetchedColumns);
 +    sb.append(" iterators: ").append(iterators);
 +    sb.append(" logLevel: ").append(level);
 +    return sb.toString();
 +  }
 +
 +  public String getTableName() {
 +    return tableName;
 +  }
 +
 +  public void setTableName(String table) {
 +    this.tableName = table;
 +  }
 +
 +  public void setTableId(String tableId) {
 +    this.tableId = tableId;
 +  }
 +
 +  public String getTableId() {
 +    return tableId;
 +  }
 +
 +  public Instance getInstance() {
 +    if (null == instanceName) {
 +      return null;
 +    }
 +
 +    if (isMockInstance()) {
 +      return new MockInstance(getInstanceName());
 +    }
 +
 +    if (null == zooKeepers) {
 +      return null;
 +    }
 +
 +    return new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(getInstanceName()).withZkHosts(getZooKeepers()));
 +  }
 +
 +  public String getInstanceName() {
 +    return instanceName;
 +  }
 +
 +  public void setInstanceName(String instanceName) {
 +    this.instanceName = instanceName;
 +  }
 +
 +  public String getZooKeepers() {
 +    return zooKeepers;
 +  }
 +
 +  public void setZooKeepers(String zooKeepers) {
 +    this.zooKeepers = zooKeepers;
 +  }
 +
 +  public String getPrincipal() {
 +    return principal;
 +  }
 +
 +  public void setPrincipal(String principal) {
 +    this.principal = principal;
 +  }
 +
 +  public AuthenticationToken getToken() {
 +    return token;
 +  }
 +
 +  public void setToken(AuthenticationToken token) {
 +    this.tokenSource = TokenSource.INLINE;
 +    this.token = token;
 +  }
 +
 +  public void setToken(String tokenFile) {
 +    this.tokenSource = TokenSource.FILE;
 +    this.tokenFile = tokenFile;
 +  }
 +
 +  public Boolean isOffline() {
 +    return offline;
 +  }
 +
 +  public void setOffline(Boolean offline) {
 +    this.offline = offline;
 +  }
 +
 +  public void setLocations(String[] locations) {
 +    this.locations = Arrays.copyOf(locations, locations.length);
 +  }
 +
 +  public Boolean isMockInstance() {
 +    return mockInstance;
 +  }
 +
 +  public void setMockInstance(Boolean mockInstance) {
 +    this.mockInstance = mockInstance;
 +  }
 +
 +  public Boolean isIsolatedScan() {
 +    return isolatedScan;
 +  }
 +
 +  public void setIsolatedScan(Boolean isolatedScan) {
 +    this.isolatedScan = isolatedScan;
 +  }
 +
 +  public Authorizations getAuths() {
 +    return auths;
 +  }
 +
 +  public void setAuths(Authorizations auths) {
 +    this.auths = auths;
 +  }
 +
 +  public void setRange(Range range) {
 +    this.range = range;
 +  }
 +
 +  public Boolean usesLocalIterators() {
 +    return localIterators;
 +  }
 +
 +  public void setUsesLocalIterators(Boolean localIterators) {
 +    this.localIterators = localIterators;
 +  }
 +
 +  public Set<Pair<Text,Text>> getFetchedColumns() {
 +    return fetchedColumns;
 +  }
 +
 +  public void setFetchedColumns(Collection<Pair<Text,Text>> fetchedColumns)
{
 +    this.fetchedColumns = new HashSet<Pair<Text,Text>>();
 +    for (Pair<Text,Text> columns : fetchedColumns) {
 +      this.fetchedColumns.add(columns);
 +    }
 +  }
 +
 +  public void setFetchedColumns(Set<Pair<Text,Text>> fetchedColumns) {
 +    this.fetchedColumns = fetchedColumns;
 +  }
 +
 +  public List<IteratorSetting> getIterators() {
 +    return iterators;
 +  }
 +
 +  public void setIterators(List<IteratorSetting> iterators) {
 +    this.iterators = iterators;
 +  }
 +
 +  public Level getLogLevel() {
 +    return level;
 +  }
 +
 +  public void setLogLevel(Level level) {
 +    this.level = level;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79bb5c1c/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/RangeInputSplitTest.java
----------------------------------------------------------------------
diff --cc mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/RangeInputSplitTest.java
index 3f72164,0000000..88f5527
mode 100644,000000..100644
--- a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/RangeInputSplitTest.java
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapred/RangeInputSplitTest.java
@@@ -1,105 -1,0 +1,121 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapred;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
++import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Set;
 +
++import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.iterators.user.SummingCombiner;
++import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class RangeInputSplitTest {
 +
 +  @Test
 +  public void testSimpleWritable() throws IOException {
 +    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new
Key("b")), new String[]{"localhost"});
 +    
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
 +    
 +    RangeInputSplit newSplit = new RangeInputSplit();
 +    
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
 +    
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations()));
 +  }
 +
 +  @Test
 +  public void testAllFieldsWritable() throws IOException {
 +    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new
Key("b")), new String[]{"localhost"});
 +    
 +    Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>();
 +    
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new Text("colq1")));
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new Text("colq2")));
++
++    // Fake some iterators
++    ArrayList<IteratorSetting> iterators = new ArrayList<IteratorSetting>();
++    IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class);
++    setting.addOption("foo", "bar");
++    iterators.add(setting);
++
++    setting = new IteratorSetting(100, WholeRowIterator.class);
++    setting.addOption("bar", "foo");
++    iterators.add(setting);
 +    
 +    split.setAuths(new Authorizations("foo"));
 +    split.setOffline(true);
 +    split.setIsolatedScan(true);
 +    split.setUsesLocalIterators(true);
 +    split.setFetchedColumns(fetchedColumns);
 +    split.setToken(new PasswordToken("password"));
 +    split.setPrincipal("root");
 +    split.setInstanceName("instance");
 +    split.setMockInstance(true);
 +    split.setZooKeepers("localhost");
++    split.setIterators(iterators);
 +    split.setLogLevel(Level.WARN);
 +    
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
 +    
 +    RangeInputSplit newSplit = new RangeInputSplit();
 +    
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
 +    
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations());
 +    
 +    Assert.assertEquals(split.getAuths(), newSplit.getAuths());
 +    Assert.assertEquals(split.isOffline(), newSplit.isOffline());
 +    Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline());
 +    Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators());
 +    Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns());
 +    Assert.assertEquals(split.getToken(), newSplit.getToken());
 +    Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal());
 +    Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
 +    Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
 +    Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());
++    Assert.assertEquals(split.getIterators(), newSplit.getIterators());
 +    Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel());
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/79bb5c1c/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
----------------------------------------------------------------------
diff --cc mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
index 80e8c28,0000000..295aa5d
mode 100644,000000..100644
--- a/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
+++ b/mapreduce/src/test/java/org/apache/accumulo/core/client/mapreduce/RangeInputSplitTest.java
@@@ -1,107 -1,0 +1,123 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mapreduce;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
++import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Set;
 +
++import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
++import org.apache.accumulo.core.iterators.user.SummingCombiner;
++import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class RangeInputSplitTest {
 +
 +  @Test
 +  public void testSimpleWritable() throws IOException {
 +    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new
Key("b")), new String[]{"localhost"});
 +    
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
 +    
 +    RangeInputSplit newSplit = new RangeInputSplit();
 +    
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
 +    
 +    Assert.assertEquals(split.getTableName(), newSplit.getTableName());
 +    Assert.assertEquals(split.getTableId(), newSplit.getTableId());
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertTrue(Arrays.equals(split.getLocations(), newSplit.getLocations()));
 +  }
 +
 +  @Test
 +  public void testAllFieldsWritable() throws IOException {
 +    RangeInputSplit split = new RangeInputSplit("table", "1", new Range(new Key("a"), new
Key("b")), new String[]{"localhost"});
 +    
 +    Set<Pair<Text,Text>> fetchedColumns = new HashSet<Pair<Text,Text>>();
 +    
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf1"), new Text("colq1")));
 +    fetchedColumns.add(new Pair<Text,Text>(new Text("colf2"), new Text("colq2")));
++
++    // Fake some iterators
++    ArrayList<IteratorSetting> iterators = new ArrayList<IteratorSetting>();
++    IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class);
++    setting.addOption("foo", "bar");
++    iterators.add(setting);
++
++    setting = new IteratorSetting(100, WholeRowIterator.class);
++    setting.addOption("bar", "foo");
++    iterators.add(setting);
 +    
 +    split.setAuths(new Authorizations("foo"));
 +    split.setOffline(true);
 +    split.setIsolatedScan(true);
 +    split.setUsesLocalIterators(true);
 +    split.setFetchedColumns(fetchedColumns);
 +    split.setToken(new PasswordToken("password"));
 +    split.setPrincipal("root");
 +    split.setInstanceName("instance");
 +    split.setMockInstance(true);
 +    split.setZooKeepers("localhost");
++    split.setIterators(iterators);
 +    split.setLogLevel(Level.WARN);
 +    
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    DataOutputStream dos = new DataOutputStream(baos);
 +    split.write(dos);
 +    
 +    RangeInputSplit newSplit = new RangeInputSplit();
 +    
 +    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 +    DataInputStream dis = new DataInputStream(bais);
 +    newSplit.readFields(dis);
 +    
 +    Assert.assertEquals(split.getRange(), newSplit.getRange());
 +    Assert.assertArrayEquals(split.getLocations(), newSplit.getLocations());
 +    
 +    Assert.assertEquals(split.getAuths(), newSplit.getAuths());
 +    Assert.assertEquals(split.isOffline(), newSplit.isOffline());
 +    Assert.assertEquals(split.isIsolatedScan(), newSplit.isOffline());
 +    Assert.assertEquals(split.usesLocalIterators(), newSplit.usesLocalIterators());
 +    Assert.assertEquals(split.getFetchedColumns(), newSplit.getFetchedColumns());
 +    Assert.assertEquals(split.getToken(), newSplit.getToken());
 +    Assert.assertEquals(split.getPrincipal(), newSplit.getPrincipal());
 +    Assert.assertEquals(split.getInstanceName(), newSplit.getInstanceName());
 +    Assert.assertEquals(split.isMockInstance(), newSplit.isMockInstance());
 +    Assert.assertEquals(split.getZooKeepers(), newSplit.getZooKeepers());
++    Assert.assertEquals(split.getIterators(), newSplit.getIterators());
 +    Assert.assertEquals(split.getLogLevel(), newSplit.getLogLevel());
 +  }
 +  
 +}


Mime
View raw message