accumulo-notifications mailing list archives

From "Edward Seidl (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ACCUMULO-3967) bulk import loses records when loading pre-split table
Date Fri, 21 Aug 2015 07:21:45 GMT

    [ https://issues.apache.org/jira/browse/ACCUMULO-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706333#comment-14706333 ]

Edward Seidl commented on ACCUMULO-3967:
----------------------------------------

Sorry, I couldn't figure out whether I could upload this as a file rather than inline.  It's kind
of kludgy, but this is the test code I whipped up that demonstrates the problem on my cluster.
I'll try it on a second cluster tomorrow.

{noformat}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * You need to create the namespace loadtest, and the tables H_Test and T_Test
 * in that namespace.  H_Test has rowids of the form HASH:Timestamp, where HASH
 * is an md5 of a random number and Timestamp is just a random hour from 0-23.
 * T_Test has the opposite key, Timestamp:HASH.  The RFiles for H_Test are
 * partitioned by range (HACK: I was lazy, so I just use 16 partitions in this
 * code...run with 16 reducers).  The RFiles for T_Test are partitioned
 * by hash, so all 24 leading hours will appear in each.
 *
 * You'll also have to create an output dir containing the failures directory.
 * A rough one-time setup sketch follows this listing.
 */

public class TestBulkLoad extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(TestBulkLoad.class);
    
    private static final String NREC = "loadtest.nrec";
    private static final String HTABLE = "loadtest.H_Test";
    private static final String TTABLE = "loadtest.T_Test";
    private static final Text CF = new Text("Family");
    private static final Text CQ = new Text("Qualifier");
    private static final Value NULL_VALUE = new Value(new byte[0]);

    private static final Text TROW = new Text();
    
    public static class HMapClass extends Mapper<String, String, Key, Value>
    {
        @Override
        public void map(String key, String value, Context output)
            throws IOException, InterruptedException
        {
            Key outKey = new Key(new Text(key + ":" + value), CF, CQ);
            output.write(outKey, NULL_VALUE);
        }
    }
    
    // partition by range.  always 16 reducers
    public static class HPartitioner extends Partitioner<Key, Value> {
        @Override
        public int getPartition(Key inKey, Value inValue,
                                int inNumPartitions)
        {
            String rowid = inKey.getRow(TROW).toString();
            char c = rowid.charAt(0);
            // map '0'-'9' to 0-9 and 'A'-'F' to 10-15
            int cnum = (c >= '0' && c <= '9') ? ((int) c) - 0x30 : ((int) c) - 55;
            
            return cnum;
        }
    }

    public static class TMapClass extends Mapper<String, String, Key, Value>
    {
        @Override
        public void map(String key, String value, Context output)
            throws IOException, InterruptedException
        {
            Key outKey = new Key(new Text(value + ":" + key), CF, CQ);
            output.write(outKey, NULL_VALUE);
        }
    }
    
    // partition by hash
    public static class TPartitioner extends Partitioner<Key, Value> {
        @Override
        public int getPartition(Key inKey, Value inValue,
                                int inNumPartitions)
        {
            String rowid = inKey.getRow(TROW).toString();
            int ret = Math.abs(rowid.hashCode())%inNumPartitions;
            if (ret < 0)
                return -ret;
            return ret;
        }
    }
    
    public static class ReduceClass extends Reducer<Key, Value, Key, Value>
    {
        public void reduce(Key key, Iterable<Value> values, Context output)
            throws IOException, InterruptedException
        {
            for (Value value : values) 
                output.write(key, value);
        }
    }
    
    private int printUsage() {
        System.out.println("accumulo "+this.getClass().getName()
                           +" <instanceName> <zooKeepers> <username> <password>
<workDir> H|T [<num_records>]");
        return 0;
    }
    
    @Override
    public int run(String[] inArgs) throws Exception {
        if (inArgs.length < 6) {
            System.out.println("ERROR: Wrong number of parameters: " +
                inArgs.length + " instead of at least 6.");
            return printUsage();
        }
        
        Configuration conf = getConf();
        PrintStream out = null;
        try {
            conf.set("mapreduce.job.jvm.numtasks", "-1");
            conf.set("mapreduce.map.speculative", "false");
            conf.set("mapreduce.map.output.compress", "false");
            conf.set("mapred.max.split.size", "8388608");
            conf.set("table.file.compress.type", "gz");
            conf.set(Property.TABLE_FILE_TYPE.toString(), "rf");
            
            String cbInstance = null;
            String zkServers = null;
            String user = null;
            byte[] pass = null;
            String workDir = null;
            String type = null;
            long nrec = 10000000;
            
            String tabName = null;
            TreeSet<Text> partitionKeys = new TreeSet<Text>();
            
            // override config file
            if (inArgs.length > 4) {
                cbInstance = inArgs[0];
                zkServers = inArgs[1];
                user = inArgs[2];
                pass = inArgs[3].getBytes();
                workDir = inArgs[4];
                type = inArgs[5];
                
                if (inArgs.length > 6) 
                    nrec = Long.parseLong(inArgs[6]);
            }
            
            conf.set(NREC, "" + nrec);
            
            Instance instance = new ZooKeeperInstance(cbInstance, zkServers);
            Connector connector = instance.getConnector(user, pass);
            Job job = null;
            
            if (type.equals("H")) {
                tabName = HTABLE;
                
                partitionKeys.add(new Text("1"));
                partitionKeys.add(new Text("2"));
                partitionKeys.add(new Text("3"));
                partitionKeys.add(new Text("4"));
                partitionKeys.add(new Text("5"));
                partitionKeys.add(new Text("6"));
                partitionKeys.add(new Text("7"));
                partitionKeys.add(new Text("8"));
                partitionKeys.add(new Text("9"));
                partitionKeys.add(new Text("A"));
                partitionKeys.add(new Text("B"));
                partitionKeys.add(new Text("C"));
                partitionKeys.add(new Text("D"));
                partitionKeys.add(new Text("E"));
                partitionKeys.add(new Text("F"));
                
                job = new Job(conf, "HLoadTest");

                job.setPartitionerClass(HPartitioner.class);
                job.setMapperClass(HMapClass.class);
            }
            else {
                tabName = TTABLE;
                
                partitionKeys.add(new Text("01"));
                partitionKeys.add(new Text("02"));
                partitionKeys.add(new Text("03"));
                partitionKeys.add(new Text("04"));
                partitionKeys.add(new Text("05"));
                partitionKeys.add(new Text("06"));
                partitionKeys.add(new Text("07"));
                partitionKeys.add(new Text("08"));
                partitionKeys.add(new Text("09"));
                partitionKeys.add(new Text("10"));
                partitionKeys.add(new Text("11"));
                partitionKeys.add(new Text("12"));
                partitionKeys.add(new Text("13"));
                partitionKeys.add(new Text("14"));
                partitionKeys.add(new Text("15"));
                partitionKeys.add(new Text("16"));
                partitionKeys.add(new Text("17"));
                partitionKeys.add(new Text("18"));
                partitionKeys.add(new Text("19"));
                partitionKeys.add(new Text("20"));
                partitionKeys.add(new Text("21"));
                partitionKeys.add(new Text("22"));
                partitionKeys.add(new Text("23"));
            
                job = new Job(conf, "TLoadTest");

                job.setPartitionerClass(TPartitioner.class);
                job.setMapperClass(TMapClass.class);
            }
            
            connector.tableOperations().deleteRows(tabName, null, null);
            connector.tableOperations().addSplits(tabName, partitionKeys);
            
            job.setJarByClass(this.getClass());
            job.setMapOutputKeyClass(Key.class);
            job.setMapOutputValueClass(Value.class);
            job.setReducerClass(ReduceClass.class);
            
            job.setInputFormatClass(FakeInputFormat.class);
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);

            AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
            AccumuloFileOutputFormat.setCompressOutput(job, true);
            AccumuloFileOutputFormat.setCompressionType(job, "gz");
   
            job.waitForCompletion(true);
            if (job.isSuccessful()) {
                connector.tableOperations().importDirectory(tabName,
                                                            workDir+"/files",
                                                            workDir+"/failures",
                                                            false);
            }
            
            // exit 0 on success, per Tool/ToolRunner convention
            return job.isSuccessful() ? 0 : 1;
            
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (out != null)
                out.close();
        }
    }
 
    public static void main(String[] args)
        throws Exception
    {
        int res = ToolRunner.run(CachedConfiguration.getInstance(), new TestBulkLoad(), args);
        System.exit(res);
    }
    
    public static class TestInputSplit extends InputSplit implements Writable {
        
        private long nrec;
        
        public TestInputSplit() { nrec = 0; }
        
        public TestInputSplit(long inNRec) {
            nrec = inNRec;
        }

        public long getNRec() { return nrec; }
        
        @Override
        public long getLength() throws IOException, InterruptedException {
            return 1;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException
        {
            return new String[] {};
        }
        
        @Override
        public void write(DataOutput inOut) throws IOException {
            inOut.writeLong(nrec);
        }

        @Override
        public void readFields(DataInput inIn) throws IOException {
            nrec = inIn.readLong();
        }
    }

    public static class FakeReader extends RecordReader<String, String> {
        private static MessageDigest sMD5;
        
        static {
            try {
                sMD5 = MessageDigest.getInstance("MD5");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        
        private static final char bytes[] = {
            '0', '1', '2', '3', '4', '5', '6', '7',
            '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
        };
        
        public static void doMD5(StringBuffer inImage, char[] outHash) {
            byte[] dig = null;

            synchronized(sMD5) {
                dig = sMD5.digest(inImage.toString().getBytes());
            }
            
            for (int i=0; i < dig.length; i++) {
                int hi = (dig[i]&0xf0) >> 4;
                int lo = (dig[i]&0xf);
                outHash[i*2] = bytes[hi];
                outHash[i*2+1] = bytes[lo];
            }
        }
        
        private long nrec = 1;
        private long currec = 0;
        private Random rand = null;
        private char[] hash = new char[32];
        private StringBuffer buf = new StringBuffer();
        
        @Override
        public void initialize(InputSplit inSplit, TaskAttemptContext inContext)
            throws IOException, InterruptedException
        {
            nrec = ((TestInputSplit)inSplit).getNRec();
            currec = 0;
            rand = new Random(System.currentTimeMillis());
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return currec++ < nrec;
        }

        @Override
        public String getCurrentKey() throws IOException, InterruptedException {
            buf.setLength(0);
            buf.append(rand.nextLong());
            
            doMD5(buf, hash);
            return new String(hash);
        }

        @Override
        public String getCurrentValue() throws IOException,
            InterruptedException
        {
            // random number from 0-23
            int hour = rand.nextInt(24);
            return (hour < 10) ? "0" + hour : "" + hour;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (float)currec/(float)nrec;
        }

        @Override
        public void close() throws IOException {
        }
        
    }
    
    public static class FakeInputFormat extends InputFormat<String, String> {
        
        @Override
        public RecordReader<String, String> createRecordReader(InputSplit inArg0,
                                                               TaskAttemptContext inArg1)
            throws IOException, InterruptedException
        {
            return new FakeReader();
        }

        @Override
        public List<InputSplit> getSplits(JobContext inCtx)
            throws IOException, InterruptedException
        {
            long nrec = 1000000;
            String nrecStr = inCtx.getConfiguration().get(NREC);
            if (nrecStr != null)
                nrec = Long.parseLong(nrecStr);
            
            return Collections.singletonList((InputSplit)new TestInputSplit(nrec));
        }
    }
}
{noformat}
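
For completeness, the one-time setup mentioned in the comment at the top of the listing looks
roughly like the following on my cluster.  This is just a sketch, not part of the attached test:
the class name TestBulkLoadSetup is only illustrative, and it takes the same
instanceName/zooKeepers/username/password/workDir arguments as the job above.  The same thing can
of course be done from the Accumulo shell instead.

{noformat}
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// One-time setup sketch: create the loadtest namespace, the two test tables,
// and the failures directory that importDirectory expects under workDir.
public class TestBulkLoadSetup {
    public static void main(String[] args) throws Exception {
        String instanceName = args[0];
        String zooKeepers = args[1];
        String user = args[2];
        byte[] pass = args[3].getBytes();
        String workDir = args[4];

        Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
        Connector connector = instance.getConnector(user, pass);

        if (!connector.namespaceOperations().exists("loadtest"))
            connector.namespaceOperations().create("loadtest");
        for (String table : new String[] {"loadtest.H_Test", "loadtest.T_Test"}) {
            if (!connector.tableOperations().exists(table))
                connector.tableOperations().create(table);
        }

        // importDirectory wants an empty failures dir next to the RFiles
        FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
        fs.mkdirs(new Path(workDir + "/failures"));
    }
}
{noformat}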

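The job above only builds and imports the RFiles, so to see the loss I count what actually landed
in the table after importDirectory returns.  Each row carries a single column, so counting entries
with a plain Scanner is enough.  Again just a sketch (CountRows is an illustrative name; same
connection arguments as above, plus the table to scan):

{noformat}
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

// Count entries in a table after the bulk import; with one column per row
// this equals the row count, so anything under the expected record count
// means rows went missing.
public class CountRows {
    public static void main(String[] args) throws Exception {
        String instanceName = args[0];
        String zooKeepers = args[1];
        String user = args[2];
        byte[] pass = args[3].getBytes();
        String table = args[4];   // e.g. loadtest.H_Test or loadtest.T_Test

        Instance instance = new ZooKeeperInstance(instanceName, zooKeepers);
        Connector connector = instance.getConnector(user, pass);

        Scanner scanner = connector.createScanner(table, Authorizations.EMPTY);
        long count = 0;
        for (Entry<Key,Value> entry : scanner)
            count++;
        System.out.println(table + ": " + count + " rows");
    }
}
{noformat}
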
> bulk import loses records when loading pre-split table
> ------------------------------------------------------
>
>                 Key: ACCUMULO-3967
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-3967
>             Project: Accumulo
>          Issue Type: Bug
>          Components: client, tserver
>    Affects Versions: 1.7.0
>         Environment: generic hadoop 2.6.0, zookeeper 3.4.6 on redhat 6.7
> 7 node cluster
>            Reporter: Edward Seidl
>            Priority: Blocker
>             Fix For: 1.7.1
>
>
> I just noticed that some records I'm loading via importDirectory go missing.  After a lot of digging around trying to reproduce the problem, I discovered that it occurs most frequently when loading a table that I have just recently added splits to.  In the tserver logs I'll see messages like
> 20 16:25:36,805 [client.BulkImporter] INFO : Could not assign 1 map files to tablet 1xw;18;17 because : Not Serving Tablet .  Will retry ...
>  
> or
> 20 16:25:44,826 [tserver.TabletServer] INFO : files [hdfs://xxxx:54310/accumulo/tables/1xw/b-00jnmxe/I00jnmxq.rf] not imported to 1xw;03;02: tablet 1xw;03;02 is closed
> These appear after messages about unloading tablets...it seems that tablets are being redistributed at the same time as the bulk import is occurring.
> Steps to reproduce
> 1) I run a mapreduce job that produces random data in rfiles
> 2) copy the rfiles to an import directory
> 3) create table or deleterows -f
> 4) addsplits
> 5) importdirectory
> I have also performed the above completely within the mapreduce job, with similar results.  The difference with the mapreduce job is that the time between adding splits and importing the directory is minutes rather than seconds.
> My current test creates 1000000 records, and after the importdirectory returns, the row count comes back anywhere from ~800000 to 1000000.
> With my original workflow, I found that re-importing the same set of rfiles three times would eventually get all rows loaded.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
