accumulo-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Peter Tillotson <slatem...@yahoo.co.uk>
Subject Re: Using iterators to add columns to a row
Date Fri, 19 Apr 2013 08:09:16 GMT
Many thanks - this is exactly the kind of solution I was looking for, I think I prefer the
buffered row approach as I can add my columns at arbitrary points. 

I looked at the whole row iterator, but I'm not keen on the serialization overhead.

tnx

Peter


________________________________
 From: Billie Rinaldi <billie.rinaldi@gmail.com>
To: "user@accumulo.apache.org" <user@accumulo.apache.org>; Peter Tillotson <slatemine@yahoo.co.uk>

Sent: Thursday, 18 April 2013, 18:43
Subject: Re: Using iterators to add columns to a row
 


Writing mutations is not necessary in this case.  The iterator has the ability to change
how the current row is seen, so you don't have to create a mutation to change the row -- you
just have to create an extra key with the information you want.

One possibility is to write an iterator that passes through existing key/value pairs, counting
them, until it gets to the end of the row, at which point it creates a new key/value pair
and passes that along before continuing.  You'd have to make sure the count column name was
chosen to sort at the end of the row.  (If that is not possible you could iterate over the
entire row first, but that's more work for Accumulo.)  Below is an incomplete sketch of an
end-of-row counting approach.

  public Key getTopKey() {
    if count key is ready
      return count key
    else
      return source's top key
  }
  public void next() throws IOException {
    if count key is ready
      reset count key to null
    else {
      call next() on source iterator
      if this is the start of a new row {
        prepare count key for previous row
        reset count

      }

      increment count

    }

  }


Billie




On Thu, Apr 18, 2013 at 9:28 AM, Peter Tillotson <slatemine@yahoo.co.uk> wrote:


>
>Apologies in advance - this is some of my first Accumulo code, but I suspect there is
a much better way to do this. 
>
>
>
>Basically I'm trying to add an edge count column to each row of my table, so I get rows
along the following lines
> - node1 {  to:count:3, to:node2:, to:node3:, to:node3:  }
>
>
>But on the client side I only need to write
> - node1 {  to:node2:, to:node3:, to:node3:  }
>
>
>I'd like to use the same approach to add indexes to separate column families, and combiners
to aggregate.  
>
>
>Aside from the inefficiency of a BatchWriter for each mutation 
> - is this the correct approach? or
> - is there a simpler way to achieve this?
>
>
>Many thanks in advance
>
>
>Peter T
> 
>--- code compiles but not tested ---
>
>
>import java.io.IOException;
>import java.nio.ByteBuffer;
>import java.util.Map;
>
>import org.apache.accumulo.core.client.BatchWriter;
>import org.apache.accumulo.core.client.Connector;
>import org.apache.accumulo.core.client.MutationsRejectedException;
>import org.apache.accumulo.core.data.Key;
>import org.apache.accumulo.core.data.Mutation;
>import org.apache.accumulo.core.data.Value;
>import org.apache.accumulo.core.iterators.IteratorEnvironment;
>import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>import org.apache.accumulo.core.iterators.SortedKeyIterator;
>import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>import org.apache.accumulo.core.security.thrift.AuthInfo;
>import org.apache.accumulo.server.client.HdfsZooInstance;
>import org.apache.hadoop.io.Text;
>
>public class EdgeCountIterator extends SortedKeyIterator
>{
>    private boolean isDisabled = false;
>    private Connector connector;
>    private Key currentRowStart = null;
>    private String tableId;
>    private int count = 0;
>    
>    @Override
>    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException
>    {
>        super.init(source, options, env);
>        if(env.getIteratorScope() == IteratorScope.scan)
>        {
>            isDisabled = true;
>            return;
>        }
>        
>        String user = options.get("username");
>        String password = options.get("password");
>        String instanceId = options.get("instance");
>        tableId = options.get("tableId");
>        
>        AuthInfo authInfo = new AuthInfo();
>        authInfo.setUser(user);
>        authInfo.setPassword(password.getBytes());
>        authInfo.setInstanceId(instanceId);
>        
>        authInfo.setInstanceId(instanceId);
>        
>        
>        try
>        {
>            connector  = HdfsZooInstance.getInstance().getConnector(authInfo);
>        }
>        catch (Exception e)
>        {
>            throw new RuntimeException(e);
>        }
>    }
>
>    @Override
>    public void next() throws IOException
>    {
>        if( isDisabled )
>        {
>            super.next();
>            return;
>        }
>        
>        SortedKeyValueIterator<Key, Value> source = getSource();
>        while(source.hasTop())
>        {
>            Key key = source.getTopKey();
>            Value val = source.getTopValue();
>            
>            source.next();
>        }
>        doMutations(currentRowStart);
>        super.next();
>    }
>    
>    public void process( Key key, Value val )
>    {
>        if(currentRowStart == null)
>        {
>            currentRowStart = key;
>        }
>        else
>        {
>            if( !currentRowStart.getRow().equals(key.getRow()) )
>            {
>                doMutations(currentRowStart);
>                currentRowStart = key;
>                count = 0;
>            }
>        }
>        count++;
>    }
>
>    public void doMutations( Key rowStartKey )
>    {
>        BatchWriter writer = null;
>        try
>        {
>            writer = connector.createBatchWriter(tableId, 0, 0, 2);
>            Mutation mutation = new Mutation( rowStartKey.getRow() );
>            Text colQ = new Text("count");
>            ByteBuffer b = ByteBuffer.allocate(4);
>            b.putInt(count).array();
>            mutation.put(rowStartKey.getColumnFamily(),
>                    colQ,
>                    new Value(b));
>            writer.addMutation(mutation);
>        }
>        catch(Exception e)
>        {
>            throw new RuntimeException(e);
>            
>        }
>        finally
>        {
>            if(writer != null)
>            {
>                try
>                {
>                    writer.close();
>                }
>                catch (MutationsRejectedException e)
>                {
>                    throw new RuntimeException(e);              
>                }
>            }
>        }
>        
>    }
>}
>
>
>
Mime
View raw message