accumulo-user mailing list archives

From "Bell, Philip S CIV SPAWARSYSCEN-PACIFIC, 81320" <philip.b...@navy.mil>
Subject RE: Using iterators to add columns to a row
Date Thu, 18 Apr 2013 17:04:53 GMT
Just as a note, you might want to look into the WholeRowIterator, which lets you get back
entire rows as single key/value pairs that you then break down. This saves you from having
to check each entry as it arrives to see whether you're on a new row or not.
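
Something along these lines (untested sketch; the Connector "conn" and the table name
"graph" below are just placeholders):

// WholeRowIterator is org.apache.accumulo.core.iterators.user.WholeRowIterator
IteratorSetting is = new IteratorSetting(25, "wholeRow", WholeRowIterator.class);
Scanner scanner = conn.createScanner("graph", new Authorizations());
scanner.addScanIterator(is);
for (Map.Entry<Key, Value> entry : scanner)
{
    // each entry is one encoded row; decodeRow (throws IOException) rebuilds its cells
    SortedMap<Key, Value> row = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
    int edgeCount = 0;
    for (Key k : row.keySet())
    {
        if (k.getColumnFamily().equals(new Text("to")))
        {
            edgeCount++;
        }
    }
    // edgeCount now holds the number of "to" cells in this row
}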

-----Original Message-----
From: Peter Tillotson [mailto:slatemine@yahoo.co.uk] 
Sent: Thursday, April 18, 2013 9:28 AM
To: user@accumulo.apache.org
Subject: Using iterators to add columns to a row



Apologies in advance - this is some of my first Accumulo code, and I suspect there is a much
better way to do this.


Basically I'm trying to add an edge count column to each row of my table, so that I get rows
along the following lines:
 - node1 {  to:count:3, to:node2:, to:node3:, to:node3:  }

But on the client side I only need to write:
 - node1 {  to:node2:, to:node3:, to:node3:  }
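
In code, the client-side write I have in mind is roughly this (untested sketch; the
Connector "conn" and the table name "graph" are placeholders):

BatchWriter bw = conn.createBatchWriter("graph", 1000000L, 1000L, 2);
Mutation m = new Mutation(new Text("node1"));
// empty values - only the column qualifiers (the target nodes) carry information
m.put(new Text("to"), new Text("node2"), new Value(new byte[0]));
m.put(new Text("to"), new Text("node3"), new Value(new byte[0]));
bw.addMutation(m);
bw.close();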


I'd like to use the same approach to add indexes to separate column families, and combiners
to aggregate.  
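
For the combiner part I was picturing something like attaching a SummingCombiner
server-side (untested sketch; the "graph" table name and the to:count column are just
examples, and it assumes the counts are written as string-encoded longs):

// SummingCombiner is in org.apache.accumulo.core.iterators.user,
// LongCombiner in org.apache.accumulo.core.iterators
IteratorSetting is = new IteratorSetting(15, "edgeCountSum", SummingCombiner.class);
SummingCombiner.setColumns(is, Collections.singletonList(
        new IteratorSetting.Column(new Text("to"), new Text("count"))));
LongCombiner.setEncodingType(is, LongCombiner.Type.STRING);
conn.tableOperations().attachIterator("graph", is);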


Aside from the inefficiency of creating a BatchWriter for each mutation:
 - is this the correct approach? or
 - is there a simpler way to achieve this?


Many thanks in advance


Peter T
 
--- code compiles but not tested ---

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.thrift.AuthInfo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.hadoop.io.Text;

/**
 * Counts the edge entries in each row and writes the count back to the table
 * as an extra column via a BatchWriter.
 */
public class EdgeCountIterator extends SortedKeyIterator
{
    private boolean isDisabled = false;
    private Connector connector;
    private Key currentRowStart = null;
    private String tableId;
    private int count = 0;
    
    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
            IteratorEnvironment env) throws IOException
    {
        super.init(source, options, env);
        if(env.getIteratorScope() == IteratorScope.scan)
        {
            isDisabled = true;
            return;
        }
        
        String user = options.get("username");
        String password = options.get("password");
        String instanceId = options.get("instance");
        tableId = options.get("tableId");
        
        AuthInfo authInfo = new AuthInfo();
        authInfo.setUser(user);
        authInfo.setPassword(password.getBytes());
        authInfo.setInstanceId(instanceId);
        
        try
        {
            connector  = HdfsZooInstance.getInstance().getConnector(authInfo);
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void next() throws IOException
    {
        if( isDisabled )
        {
            super.next();
            return;
        }
        
        // consume the remaining source entries, counting the edges in each row
        SortedKeyValueIterator<Key, Value> source = getSource();
        while(source.hasTop())
        {
            Key key = source.getTopKey();
            Value val = source.getTopValue();
            process(key, val);
            source.next();
        }
        // flush the count for the last row seen, if any
        if(currentRowStart != null)
        {
            doMutations(currentRowStart);
        }
    }
    
    public void process( Key key, Value val )
    {
        if(currentRowStart == null)
        {
            currentRowStart = key;
        }
        else
        {
            if( !currentRowStart.getRow().equals(key.getRow()) )
            {
                doMutations(currentRowStart);
                currentRowStart = key;
                count = 0;
            }
        }
        count++;
    }

    public void doMutations( Key rowStartKey )
    {
        BatchWriter writer = null;
        try
        {
            writer = connector.createBatchWriter(tableId, 0, 0, 2);
            Mutation mutation = new Mutation( rowStartKey.getRow() );
            Text colQ = new Text("count");
            // encode the edge count as a 4-byte big-endian int
            byte[] countBytes = ByteBuffer.allocate(4).putInt(count).array();
            mutation.put(rowStartKey.getColumnFamily(),
                    colQ,
                    new Value(countBytes));
            writer.addMutation(mutation);
        }
        catch(Exception e)
        {
            throw new RuntimeException(e);
            
        }
        finally
        {
            if(writer != null)
            {
                try
                {
                    writer.close();
                }
                catch (MutationsRejectedException e)
                {
                    throw new RuntimeException(e);              
                }
            }
        }
        
    }
}



