Date: Fri, 21 Aug 2015 19:18:45 +0000 (UTC)
From: "Christopher Tubbs (JIRA)"
Reply-To: jira@apache.org
To: notifications@accumulo.apache.org
Subject: [jira] [Comment Edited] (ACCUMULO-3967) bulk import loses records when loading pre-split table

    [ https://issues.apache.org/jira/browse/ACCUMULO-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706333#comment-14706333 ]

Christopher Tubbs edited comment on ACCUMULO-3967 at 8/21/15 7:17 PM:
----------------------------------------------------------------------

Sorry, I couldn't figure out whether I could upload this as a file rather than inline. It's kind of kludgey, but this is the test code I whipped up that demonstrates the problem on my cluster. I'll try it on a second cluster tomorrow.

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.security.MessageDigest;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TreeSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * Need to create namespace loadtest, and tables H_Test and T_Test in that
 * namespace. H_Test has rowids of the form HASH:Timestamp, where HASH is
 * an md5 of a random number and Timestamp is just a random hour from 0-23.
 * T_Test has the opposite key, Timestamp:HASH. The RFiles for H_Test are
 * partitioned by range (HACK: I was lazy, so I just use 16 partitions in this
 * code...run with 16 reducers). The RFiles for T_Test are partitioned
 * by hash, so all 24 leading hours will appear in each.
 *
 * You'll have to create an output dir containing the failures directory.
 */
public class TestBulkLoad extends Configured implements Tool {

  private static final Log LOG = LogFactory.getLog(TestBulkLoad.class);

  private static final String NREC = "loadtest.nrec";
  private static final String HTABLE = "loadtest.H_Test";
  private static final String TTABLE = "loadtest.T_Test";

  private static final Text CF = new Text("Family");
  private static final Text CQ = new Text("Qualifier");
  private static final Value NULL_VALUE = new Value(new byte[0]);
  private static final Text TROW = new Text();

  // key = md5 hash, value = hour; emits HASH:Timestamp row ids
  public static class HMapClass extends Mapper<String,String,Key,Value> {
    @Override
    public void map(String key, String value, Context output) throws IOException, InterruptedException {
      Key outKey = new Key(new Text(key + ":" + value), CF, CQ);
      output.write(outKey, NULL_VALUE);
    }
  }

  // partition by range. always 16 reducers
  public static class HPartitioner extends Partitioner<Key,Value> {
    @Override
    public int getPartition(Key inKey, Value inValue, int inNumPartitions) {
      String rowid = inKey.getRow(TROW).toString();
      // map the leading hex digit 0-9,A-F to partitions 0-15
      char c = rowid.charAt(0);
      int cnum = (c >= '0' && c <= '9') ? ((int) c) - 0x30 : ((int) c) - 55;
      return cnum;
    }
  }

  // emits Timestamp:HASH row ids
  public static class TMapClass extends Mapper<String,String,Key,Value> {
    @Override
    public void map(String key, String value, Context output) throws IOException, InterruptedException {
      Key outKey = new Key(new Text(value + ":" + key), CF, CQ);
      output.write(outKey, NULL_VALUE);
    }
  }

  // partition by hash
  public static class TPartitioner extends Partitioner<Key,Value> {
    @Override
    public int getPartition(Key inKey, Value inValue, int inNumPartitions) {
      String rowid = inKey.getRow(TROW).toString();
      int ret = Math.abs(rowid.hashCode()) % inNumPartitions;
      if (ret < 0)
        return -ret;
      return ret;
    }
  }

  public static class ReduceClass extends Reducer<Key,Value,Key,Value> {
    public void reduce(Key key, Iterable<Value> values, Context output) throws IOException, InterruptedException {
      for (Value value : values)
        output.write(key, value);
    }
  }

  private int printUsage() {
    System.out.println("accumulo " + this.getClass().getName()
        + " <instance> <zookeepers> <user> <password> <workdir> H|T [<num records>]");
    return 0;
  }

  @Override
  public int run(String[] inArgs) throws Exception {
    if (inArgs.length > 0 && inArgs.length < 5) {
      System.out.println("ERROR: Wrong number of parameters: " + inArgs.length + " instead of 6.");
      return printUsage();
    }

    Configuration conf = getConf();
    PrintStream out = null;
    try {
      conf.set("mapreduce.job.jvm.numtasks", "-1");
      conf.set("mapreduce.map.speculative", "false");
      conf.set("mapreduce.map.output.compress", "false");
      conf.set("mapred.max.split.size", "8388608");
      conf.set("table.file.compress.type", "gz");
      conf.set(Property.TABLE_FILE_TYPE.toString(), "rf");

      String cbInstance = null;
      String zkServers = null;
      String user = null;
      byte[] pass = null;
      String workDir = null;
      String type = null;
      long nrec = 10000000;
      String tabName = null;
      TreeSet<Text> partitionKeys = new TreeSet<Text>();

      // override config file
      if (inArgs.length > 4) {
        cbInstance = inArgs[0];
        zkServers = inArgs[1];
        user = inArgs[2];
        pass = inArgs[3].getBytes();
        workDir = inArgs[4];
        type = inArgs[5];
        if (inArgs.length > 6)
          nrec = Long.parseLong(inArgs[6]);
      }
      conf.set(NREC, "" + nrec);

      Instance instance = new ZooKeeperInstance(cbInstance, zkServers);
      Connector connector = instance.getConnector(user, pass);

      Job job = null;
      if (type.equals("H")) {
        tabName = HTABLE;
        // one split per leading hex digit
        partitionKeys.add(new Text("1"));
        partitionKeys.add(new Text("2"));
        partitionKeys.add(new Text("3"));
        partitionKeys.add(new Text("4"));
        partitionKeys.add(new Text("5"));
        partitionKeys.add(new Text("6"));
        partitionKeys.add(new Text("7"));
        partitionKeys.add(new Text("8"));
        partitionKeys.add(new Text("9"));
        partitionKeys.add(new Text("A"));
        partitionKeys.add(new Text("B"));
        partitionKeys.add(new Text("C"));
        partitionKeys.add(new Text("D"));
        partitionKeys.add(new Text("E"));
        partitionKeys.add(new Text("F"));
        job = new Job(conf, "HLoadTest");
        job.setPartitionerClass(HPartitioner.class);
        job.setMapperClass(HMapClass.class);
      } else {
        tabName = TTABLE;
        // one split per hour 01-23
        partitionKeys.add(new Text("01"));
        partitionKeys.add(new Text("02"));
        partitionKeys.add(new Text("03"));
        partitionKeys.add(new Text("04"));
        partitionKeys.add(new Text("05"));
        partitionKeys.add(new Text("06"));
        partitionKeys.add(new Text("07"));
        partitionKeys.add(new Text("08"));
        partitionKeys.add(new Text("09"));
        partitionKeys.add(new Text("10"));
        partitionKeys.add(new Text("11"));
        partitionKeys.add(new Text("12"));
        partitionKeys.add(new Text("13"));
        partitionKeys.add(new Text("14"));
        partitionKeys.add(new Text("15"));
        partitionKeys.add(new Text("16"));
        partitionKeys.add(new Text("17"));
        partitionKeys.add(new Text("18"));
        partitionKeys.add(new Text("19"));
        partitionKeys.add(new Text("20"));
        partitionKeys.add(new Text("21"));
        partitionKeys.add(new Text("22"));
        partitionKeys.add(new Text("23"));
        job = new Job(conf, "TLoadTest");
        job.setPartitionerClass(TPartitioner.class);
        job.setMapperClass(TMapClass.class);
      }

      // empty the table, then add fresh splits right before the bulk import
      connector.tableOperations().deleteRows(tabName, null, null);
      connector.tableOperations().addSplits(tabName, partitionKeys);

      job.setJarByClass(this.getClass());
      job.setMapOutputKeyClass(Key.class);
      job.setMapOutputValueClass(Value.class);
      job.setReducerClass(ReduceClass.class);
      job.setInputFormatClass(FakeInputFormat.class);
      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
      AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
      AccumuloFileOutputFormat.setCompressOutput(job, true);
      AccumuloFileOutputFormat.setCompressionType(job, "gz");

      job.waitForCompletion(true);
      if (job.isSuccessful()) {
        connector.tableOperations().importDirectory(tabName, workDir + "/files", workDir + "/failures", false);
      }
      return job.isSuccessful() ? 1 : 0;
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      if (out != null)
        out.close();
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(CachedConfiguration.getInstance(), new TestBulkLoad(), args);
    System.exit(res);
  }

  // a single synthetic split carrying only the number of records to generate
  public static class TestInputSplit extends InputSplit implements Writable {
    private long nrec;

    public TestInputSplit() {
      nrec = 0;
    }

    public TestInputSplit(long inNRec) {
      nrec = inNRec;
    }

    public long getNRec() {
      return nrec;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return 1;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return new String[] {};
    }

    @Override
    public void write(DataOutput inOut) throws IOException {
      inOut.writeLong(nrec);
    }

    @Override
    public void readFields(DataInput inIn) throws IOException {
      nrec = inIn.readLong();
    }
  }

  // generates nrec random (md5 hash, hour) pairs
  public static class FakeReader extends RecordReader<String,String> {
    private static MessageDigest sMD5;
    static {
      try {
        sMD5 = MessageDigest.getInstance("MD5");
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    private static final char bytes[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};

    public static void doMD5(StringBuffer inImage, char[] outHash) {
      byte[] dig = null;
      synchronized (sMD5) {
        dig = sMD5.digest(inImage.toString().getBytes());
      }
      for (int i = 0; i < dig.length; i++) {
        int hi = (dig[i] & 0xf0) >> 4;
        int lo = (dig[i] & 0xf);
        outHash[i * 2] = bytes[hi];
        outHash[i * 2 + 1] = bytes[lo];
      }
    }

    private long nrec = 1;
    private long currec = 0;
    private Random rand = null;
    private char[] hash = new char[32];
    private StringBuffer buf = new StringBuffer();

    @Override
    public void initialize(InputSplit inSplit, TaskAttemptContext inContext) throws IOException, InterruptedException {
      nrec = ((TestInputSplit) inSplit).getNRec();
      currec = 0;
      rand = new Random(System.currentTimeMillis());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return currec++ < nrec;
    }

    @Override
    public String getCurrentKey() throws IOException, InterruptedException {
      buf.setLength(0);
      buf.append(rand.nextLong());
      doMD5(buf, hash);
      return new String(hash);
    }

    @Override
    public String getCurrentValue() throws IOException, InterruptedException {
      // random number from 0-23
      int hour = rand.nextInt(24);
      return (hour < 10) ? "0" + hour : "" + hour;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) currec / (float) nrec;
    }

    @Override
    public void close() throws IOException {}
  }

  public static class FakeInputFormat extends InputFormat<String,String> {
    @Override
    public RecordReader<String,String> createRecordReader(InputSplit inArg0, TaskAttemptContext inArg1)
        throws IOException, InterruptedException {
      return new FakeReader();
    }

    @Override
    public List<InputSplit> getSplits(JobContext inCtx) throws IOException, InterruptedException {
      long nrec = 1000000;
      String nrecStr = inCtx.getConfiguration().get(NREC);
      if (nrecStr != null)
        nrec = Long.parseLong(nrecStr);
      return Collections.singletonList((InputSplit) new TestInputSplit(nrec));
    }
  }
}
{code}
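
For completeness, the one-time setup that the header comment above assumes might look roughly like the following (a minimal sketch: the instance name, zookeepers, credentials, and the /tmp/loadtest work directory are placeholders, not details from the cluster above):

{code:java}
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetupLoadTest {
  public static void main(String[] args) throws Exception {
    // Placeholders: instance name, zookeepers, and credentials.
    Instance instance = new ZooKeeperInstance("instance", "zk1:2181,zk2:2181");
    Connector connector = instance.getConnector("user", new PasswordToken("pass"));

    // Namespace and tables referenced by TestBulkLoad above.
    if (!connector.namespaceOperations().exists("loadtest"))
      connector.namespaceOperations().create("loadtest");
    for (String table : new String[] {"loadtest.H_Test", "loadtest.T_Test"}) {
      if (!connector.tableOperations().exists(table))
        connector.tableOperations().create(table);
    }

    // importDirectory expects the failures directory to already exist.
    FileSystem fs = FileSystem.get(new Configuration());
    fs.mkdirs(new Path("/tmp/loadtest/failures"));
  }
}
{code}

TestBulkLoad's workdir argument would then point at the same parent directory (here /tmp/loadtest), since the job writes to workdir/files and reports rejects to workdir/failures.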
"0" + hour : "" + hour; } @Override public float getProgress() throws IOException, InterruptedException { return (float)currec/(float)nrec; } @Override public void close() throws IOException { } } public static class FakeInputFormat extends InputFormat { @Override public RecordReader createRecordReader(InputSplit inArg0, TaskAttemptContext inArg1) throws IOException, InterruptedException { return new FakeReader(); } @Override public List getSplits(JobContext inCtx) throws IOException, InterruptedException { long nrec = 1000000; String nrecStr = inCtx.getConfiguration().get(NREC); if (nrecStr != null) nrec = Long.parseLong(nrecStr); return Collections.singletonList((InputSplit)new TestInputSplit(nrec)); } } } {noformat} > bulk import loses records when loading pre-split table > ------------------------------------------------------ > > Key: ACCUMULO-3967 > URL: https://issues.apache.org/jira/browse/ACCUMULO-3967 > Project: Accumulo > Issue Type: Bug > Components: client, tserver > Affects Versions: 1.7.0 > Environment: generic hadoop 2.6.0, zookeeper 3.4.6 on redhat 6.7 > 7 node cluster > Reporter: Edward Seidl > Priority: Blocker > Fix For: 1.7.1, 1.8.0 > > > I just noticed that some records I'm loading via importDirectory go missing. After a lot of digging around trying to reproduce the problem, I discovered that it occurs most frequently when loading a table that I have just recently added splits to. In the tserver logs I'll see messages like > 20 16:25:36,805 [client.BulkImporter] INFO : Could not assign 1 map files to tablet 1xw;18;17 because : Not Serving Tablet . Will retry ... > > or > 20 16:25:44,826 [tserver.TabletServer] INFO : files [hdfs://xxxx:54310/accumulo/tables/1xw/b-00jnmxe/I00jnmxq.rf] not imported to 1xw;03;02: tablet 1xw;03;02 is closed > these appear after messages about unloading tablets...it seems that tablets are being redistributed at the same time as the bulk import is occuring. > Steps to reproduce > 1) I run a mapreduce job that produces random data in rfiles > 2) copy the rfiles to an import directory > 3) create table or deleterows -f > 4) addsplits > 5) importdirectory > I have also performed the above completely within the mapreduce job, with similar results. The difference with the mapreduce job is that the time between adding splits and the import directory is minutes rather than seconds. > my current test creates 1000000 records, and after the importdirectory returns a count of rows will be anywhere from ~800000 to 1000000. > With my original workflow, I found that re-importing the same set of rfiles three times would eventually get all rows loaded. -- This message was sent by Atlassian JIRA (v6.3.4#6332)