hbase-user mailing list archives

From Ruslan Salyakhov <rusla...@gmail.com>
Subject Re: Bulk import, HFiles, Multiple reducers and TotalOrderPartitioner
Date Thu, 25 Mar 2010 20:13:27 GMT
Jean-Daniel,
https://issues.apache.org/jira/browse/HBASE-2378

Ruslan

On Thu, Mar 25, 2010 at 8:22 PM, Jean-Daniel Cryans <jdcryans@apache.org> wrote:

> Ruslan,
>
> I see you did all the required homework but this mail is really hard
> to read. Can you create a jira
> (http://issues.apache.org/jira/browse/HBASE) and attach all the code?
> This will also make it easier to track.
>
> thx!
>
> J-D
>
> On Wed, Mar 24, 2010 at 6:02 PM, Ruslan Salyakhov <ruslansa@gmail.com>
> wrote:
> > Hi!
> >
> > I'm trying to use the bulk import that writes HFiles directly into HDFS, and I
> > have a problem with multiple reducers. If I run the MR job that prepares the
> > HFiles with more than one reducer, then the values for some keys do not appear
> > in the table after the loadtable.rb script is executed. With one reducer
> > everything works fine. Let's take a look at the details:
> >
> > Environment:
> > - Hadoop 0.20.1
> > - HBase release 0.20.3
> >
> >
> > http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
> > - the row id must be formatted as an ImmutableBytesWritable
> > - the MR job should ensure a total ordering among all keys
> >
> > http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
> > - TotalOrderPartitioner that uses the new API
> >
> > https://issues.apache.org/jira/browse/HBASE-2063
> > - patched HFileOutputFormat
> >
> > Sample data of my keys:
> > 1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
> > 1.306.CAN.ON.-1.LONDON.1.1.0.1
> > 1.306.USA.CO.751.FT COLLINS.1.1.1.0
> > 1.306.USA.CO.751.LITTLETON.1.1.1.0
> > 4.6.USA.TX.623.MUENSTER.1.1.0.0
> > 4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
> > 4.68.USA.CT.533.WILLINGTON.1.1.1.0
> > 4.68.USA.LA.642.LAFAYETTE.1.1.1.0
> > 4.9.USA.CT.501.STAMFORD.1.1.0.0
> > 4.9.USA.NJ.504.PRINCETON.1.1.0.1
> > 4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
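> >
> > For reference, here is a simplified sketch of the kind of mapper that feeds this
> > job. It is illustrative only: the real mapper builds the key via
> > AdvancedRawLogUploader.RawLogMapper.generateKey and emits several columns per
> > row, and the field count, column family and qualifier below are made up.
> >
> > import java.io.IOException;
> >
> > import org.apache.hadoop.hbase.KeyValue;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.util.Bytes;
> > import org.apache.hadoop.io.LongWritable;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.Mapper;
> >
> > public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
> >
> >     @Override
> >     protected void map(LongWritable offset, Text line, Context context)
> >             throws IOException, InterruptedException {
> >         // Tab-separated input; the first fields form the dotted row key,
> >         // e.g. "1.306.CAN.ON.-1.LONDON.1.1.0.1" (10 key fields assumed here).
> >         String[] fields = line.toString().split("\t");
> >         StringBuilder key = new StringBuilder();
> >         for (int i = 0; i < 10; i++) {
> >             if (i > 0) key.append('.');
> >             key.append(fields[i]);
> >         }
> >         byte[] row = Bytes.toBytes(key.toString());
> >         // One KeyValue per output column; family/qualifier are placeholders.
> >         KeyValue kv = new KeyValue(row, Bytes.toBytes("vals"), Bytes.toBytes("Count"),
> >                 Bytes.toBytes(fields[10]));
> >         context.write(new ImmutableBytesWritable(row), kv);
> >     }
> > }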
> >
> > I've put everything together:
> >
> > 1) A test of TotalOrderPartitioner that checks how it works with my keys.
> > Note that I've set my comparator so that this test passes:
> > conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);
> >
> > import java.io.IOException;
> > import java.util.ArrayList;
> >
> > import junit.framework.TestCase;
> >
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.FileSystem;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.util.Bytes;
> > import org.apache.hadoop.io.NullWritable;
> > import org.apache.hadoop.io.SequenceFile;
> > import org.apache.hadoop.io.WritableComparable;
> > import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
> >
> > public class TestTotalOrderPartitionerForHFileKeys extends TestCase {
> >
> >    private static final ImmutableBytesWritable[] splitKeys = new ImmutableBytesWritable[] {
> >            // -inf                                                                             // 0
> >            new ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),       // 1
> >            new ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),        // 2
> >            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW YORK.1.1.0.0")),        // 3
> >            new ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),        // 4
> >            new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),        // 5
> >            new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),     // 6
> >            new ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")), // 7
> >            new ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),          // 8
> >            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY CITY.1.1.0.0"))       // 9
> >    };
> >
> >    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
> >            new ArrayList<Check<ImmutableBytesWritable>>();
> >    static {
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
> >        testKeys.add(new Check<ImmutableBytesWritable>(
> >                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
> >    }
> >
> >    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
> >        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable> partitioner =
> >                new TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>();
> >        Configuration conf = new Configuration();
> >        Path p = TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable> writePartitionFile(
> >                "totalorderbinarysearch", conf, splitKeys);
> >        conf.setBoolean("total.order.partitioner.natural.order", false);
> >        conf.setClass("mapred.mapoutput.key.class", ImmutableBytesWritable.class, Object.class);
> >        conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);
> >
> >        try {
> >            partitioner.setConf(conf);
> >            NullWritable nw = NullWritable.get();
> >            for (Check<ImmutableBytesWritable> chk : testKeys) {
> >                log(Bytes.toString(chk.data.get()) + ", chk.part: " + chk.part + ", should be: "
> >                        + partitioner.getPartition(chk.data, nw, splitKeys.length + 1));
> >
> >                assertEquals(Bytes.toString(chk.data.get()), chk.part,
> >                        partitioner.getPartition(chk.data, nw, splitKeys.length + 1));
> >            }
> >        } finally {
> >            p.getFileSystem(conf).delete(p, true);
> >        }
> >    }
> >
> >    public void testMyKeyComparator() {
> >        MyKeyComparator comparator = new MyKeyComparator();
> >        for (int i = 0; i < splitKeys.length - 1; i++) {
> >            // splitKeys should be sorted in ascending order
> >            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
> >            assertTrue(res1 < 0);
> >
> >            int res2 = comparator.compare(splitKeys[i].get(), 0, splitKeys[i].get().length,
> >                    splitKeys[i + 1].get(), 0, splitKeys[i + 1].get().length);
> >
> >            assertTrue(res2 < 0);
> >            assertTrue(res1 == res2);
> >        }
> >    }
> >
> >    // ----------------------------------------
> >    // Copy-Paste from TestTotalOrderPartitioner
> >    // http://issues.apache.org/jira/browse/MAPREDUCE-366
> >    // ----------------------------------------
> >    static class Check<T> {
> >        T data;
> >        int part;
> >
> >        Check(T data, int part) {
> >            this.data = data;
> >            this.part = part;
> >        }
> >    }
> >
> >    private static <T extends WritableComparable<?>> Path writePartitionFile(String testname,
> >            Configuration conf, T[] splits) throws IOException {
> >        final FileSystem fs = FileSystem.getLocal(conf);
> >        final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(fs);
> >        Path p = new Path(testdir, testname + "/_partition.lst");
> >        TotalOrderPartitioner.setPartitionFile(conf, p);
> >        conf.setInt("mapred.reduce.tasks", splits.length + 1);
> >        SequenceFile.Writer w = null;
> >        try {
> >            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(), NullWritable.class,
> >                    SequenceFile.CompressionType.NONE);
> >            for (int i = 0; i < splits.length; ++i) {
> >                w.append(splits[i], NullWritable.get());
> >            }
> >        } finally {
> >            if (null != w)
> >                w.close();
> >        }
> >        return p;
> >    }
> >
> >    private static void log(String message) {
> >        System.out.println(message);
> >    }
> > }
> >
> > 2) MyKeyComparator
> > I wrote it to pass the test above.
> >
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.util.Bytes;
> > import org.apache.hadoop.io.RawComparator;
> >
> > public class MyKeyComparator implements RawComparator<ImmutableBytesWritable> {
> >
> >    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
> >        return Bytes.compareTo(o1.get(), o2.get());
> >    }
> >
> >    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
> >        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
> >    }
> > }
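> >
> > As a quick sanity check (just an illustration, not part of the job), comparing
> > two of the sample keys above shows why plain byte comparison orders
> > "1.3.SWE..." before "1.306.CAN...": after the common prefix "1.3", '.' (0x2E)
> > sorts below '0' (0x30):
> >
> > ImmutableBytesWritable a = new ImmutableBytesWritable(
> >         Bytes.toBytes("1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1"));
> > ImmutableBytesWritable b = new ImmutableBytesWritable(
> >         Bytes.toBytes("1.306.CAN.ON.-1.LONDON.1.1.0.1"));
> > System.out.println(new MyKeyComparator().compare(a, b)); // prints a negative number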
> >
> > 3) MySampler
> > This code is based on the InputSampler from
> > http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt),
> > BUT I've added the following line to getSample:
> >            reader.initialize(splits.get(i), new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
> > Without that line it doesn't work.
> >
> >
> > import java.io.IOException;
> > import java.util.ArrayList;
> > import java.util.Arrays;
> > import java.util.List;
> > import java.util.Random;
> > import java.util.StringTokenizer;
> >
> > import org.apache.commons.logging.Log;
> > import org.apache.commons.logging.LogFactory;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.conf.Configured;
> > import org.apache.hadoop.fs.FileSystem;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.util.Bytes;
> > import org.apache.hadoop.io.NullWritable;
> > import org.apache.hadoop.io.RawComparator;
> > import org.apache.hadoop.io.SequenceFile;
> > import org.apache.hadoop.io.Text;
> > import org.apache.hadoop.mapreduce.InputFormat;
> > import org.apache.hadoop.mapreduce.InputSplit;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.hadoop.mapreduce.RecordReader;
> > import org.apache.hadoop.mapreduce.TaskAttemptContext;
> > import org.apache.hadoop.mapreduce.TaskAttemptID;
> > import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
> > import org.apache.hadoop.util.ReflectionUtils;
> >
> > /**
> >  * based on org.apache.hadoop.mapreduce.lib.partition.InputSampler
> >  */
> > public class MySampler extends Configured  {
> >      private static final Log LOG = LogFactory.getLog(MySampler.class);
> >
> >      public MySampler(Configuration conf) {
> >        setConf(conf);
> >      }
> >
> >      /**
> >       * Sample from random points in the input.
> >       * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
> >       * each split.
> >       */
> >      public static class RandomSampler {
> >        private double freq;
> >        private final int numSamples;
> >        private final int maxSplitsSampled;
> >
> >        /**
> >         * Create a new RandomSampler sampling <em>all</em> splits.
> >         * This will read every split at the client, which is very expensive.
> >         * @param freq Probability with which a key will be chosen.
> >         * @param numSamples Total number of samples to obtain from all selected
> >         *                   splits.
> >         */
> >        public RandomSampler(double freq, int numSamples) {
> >          this(freq, numSamples, Integer.MAX_VALUE);
> >        }
> >
> >        /**
> >         * Create a new RandomSampler.
> >         * @param freq Probability with which a key will be chosen.
> >         * @param numSamples Total number of samples to obtain from all selected
> >         *                   splits.
> >         * @param maxSplitsSampled The maximum number of splits to examine.
> >         */
> >        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
> >          this.freq = freq;
> >          this.numSamples = numSamples;
> >          this.maxSplitsSampled = maxSplitsSampled;
> >        }
> >
> >        /**
> >         * Randomize the split order, then take the specified number of keys from
> >         * each split sampled, where each key is selected with the specified
> >         * probability and possibly replaced by a subsequently selected key when
> >         * the quota of keys from that split is satisfied.
> >         */
> >        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
> >        public ImmutableBytesWritable[] getSample(InputFormat<ImmutableBytesWritable, Text> inf, Job job)
> >            throws IOException, InterruptedException {
> >          List<InputSplit> splits = inf.getSplits(job);
> >          ArrayList<ImmutableBytesWritable> samples = new ArrayList<ImmutableBytesWritable>(numSamples);
> >          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
> >
> >          Random r = new Random();
> >          long seed = r.nextLong();
> >          r.setSeed(seed);
> >          LOG.debug("seed: " + seed);
> >          // shuffle splits
> >          for (int i = 0; i < splits.size(); ++i) {
> >            InputSplit tmp = splits.get(i);
> >            int j = r.nextInt(splits.size());
> >            splits.set(i, splits.get(j));
> >            splits.set(j, tmp);
> >          }
> >          // our target rate is in terms of the maximum number of sample splits,
> >          // but we accept the possibility of sampling additional splits to hit
> >          // the target sample keyset
> >          for (int i = 0; i < splitsToSample ||
> >                         (i < splits.size() && samples.size() < numSamples); ++i) {
> >            RecordReader<ImmutableBytesWritable, Text> reader = inf.createRecordReader(splits.get(i),
> >                new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
> >
> >            reader.initialize(splits.get(i), new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
> >
> >            while (reader.nextKeyValue()) {
> >              if (r.nextDouble() <= freq) {
> >                if (samples.size() < numSamples) {
> >                  samples.add(composeKey(reader.getCurrentValue()));
> >                } else {
> >                 // When exceeding the maximum number of samples, replace a
> >                 // random element with this one, then adjust the frequency
> >                 // to reflect the possibility of existing elements being
> >                 // pushed out
> >                 int ind = r.nextInt(numSamples);
> >                 if (ind != numSamples) {
> >                   samples.set(ind, composeKey(reader.getCurrentValue()));
> >                  }
> >                  freq *= (numSamples - 1) / (double) numSamples;
> >                }
> >              }
> >            }
> >            reader.close();
> >          }
> >          return (ImmutableBytesWritable[]) samples.toArray(new ImmutableBytesWritable[samples.size()]);
> >        }
> >
> >      }
> >
> >      private static ImmutableBytesWritable composeKey(Text value) {
> >          StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
> >          int cnt = 0;
> >          String[] vals = new String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
> >          while (itr.hasMoreTokens()) {
> >              vals[cnt] = itr.nextToken();
> >              cnt++;
> >          }
> >
> >          String newKeyStr = AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
> >                  AdvancedRawLogUploader.NUM_KEY_FILEDS, AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
> >          info(newKeyStr);
> >          ImmutableBytesWritable newKey = new ImmutableBytesWritable(Bytes.toBytes(newKeyStr));
> >          return newKey;
> >      }
> >
> >      /**
> >       * Write a partition file for the given job, using the Sampler provided.
> >       * Queries the sampler for a sample keyset, sorts by the output key
> >       * comparator, selects the keys for each rank, and writes to the destination
> >       * returned from {@link TotalOrderPartitioner#getPartitionFile}.
> >       */
> >      @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
> >      public static void writePartitionFile(Job job, RandomSampler sampler)
> >          throws IOException, ClassNotFoundException, InterruptedException {
> >        Configuration conf = job.getConfiguration();
> >        final InputFormat inf =
> >            ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
> >        int numPartitions = job.getNumReduceTasks();
> >        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
> >        LOG.info("Using " + samples.length + " samples");
> >        RawComparator<ImmutableBytesWritable> comparator =
> >          (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
> >        Arrays.sort(samples, comparator);
> >        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
> >        FileSystem fs = dst.getFileSystem(conf);
> >        if (fs.exists(dst)) {
> >          fs.delete(dst, false);
> >        }
> >        SequenceFile.Writer writer = SequenceFile.createWriter(fs,
> >          conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
> >        NullWritable nullValue = NullWritable.get();
> >        float stepSize = samples.length / (float) numPartitions;
> >        int last = -1;
> >        for(int i = 1; i < numPartitions; ++i) {
> >          int k = Math.round(stepSize * i);
> >          while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
> >            ++k;
> >          }
> >          writer.append(samples[k], nullValue);
> >          last = k;
> >        }
> >        writer.close();
> >      }
> >
> >      private static void info(String message) {
> > //          LOG.info(message);
> >          System.out.println(message);
> >      }
> >
> >
> > }
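> >
> > (To make the split-point selection concrete: with numReduceTasks = 16 and
> > 10,000 samples, stepSize is 625, so writePartitionFile emits 15 split keys
> > taken at sample ranks 625, 1250, ..., 9375, which defines 16 partitions.
> > These numbers are just an illustration, not from my actual run.)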
> >
> > 4) And finally, the definition of my MR job:
> >
> > MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000, 10);
> > in = in.makeQualified(in.getFileSystem(conf));
> > Path partitionFile = new Path(in.getParent(), "_partitions");
> > // Use the TotalOrderPartitioner based on the new API, from
> > // http://issues.apache.org/jira/browse/MAPREDUCE-366
> > TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
> > URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
> > DistributedCache.addCacheFile(partitionUri, conf);
> > DistributedCache.createSymlink(conf);
> >
> > Job job = new Job(conf, "Write HFiles at " + out.getName());
> > job.setNumReduceTasks(numReduceTasks);
> > job.setJarByClass(MyHFilesWriter.class);
> > job.setMapperClass(MyMapper.class);
> > job.setMapOutputKeyClass(ImmutableBytesWritable.class);
> > job.setMapOutputValueClass(KeyValue.class);
> > job.setReducerClass(KeyValueSortReducer.class);
> > job.setOutputFormatClass(HFileOutputFormat.class);
> >
> > // job.setSortComparatorClass(MyKeyComparator.class);
> > // If you uncomment the line above, you will get:
> >        /*
> > 10/03/24 15:00:43 INFO mapred.JobClient: Task Id : attempt_201003171417_0063_r_000000_0, Status : FAILED
> > java.io.IOException: Added a key not lexically larger than previous
> > key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag,
> > lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
> >     at org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
> >     at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
> >     at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
> >     at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
> >     at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
> >     at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
> >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> >     at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
> >     at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
> >     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> >     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> >     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> >     at org.apache.hadoop.mapred.Child.main(Child.java:170)
> > */
> >
> > FileOutputFormat.setOutputPath(job, out);
> > FileInputFormat.addInputPath(job, in);
> > job.setPartitionerClass(TotalOrderPartitioner.class);
> > MySampler.writePartitionFile(job, sampler);
> >
> > System.exit(job.waitForCompletion(true) ? 0 : 1);
> >
> > So if I do not use the MyKeyComparator class
> > (job.setSortComparatorClass(MyKeyComparator.class);) then nothing changes:
> > the job runs, but the values for some keys do not appear in the table.
> > Otherwise (with MyKeyComparator) the job fails with the error "Added a key
> > not lexically larger than previous key".
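> >
> > In case it's useful for debugging, a small standalone check like the one below
> > (class name, path handling and output are just illustrative) could read the
> > generated _partitions file back and verify that the split keys come out
> > strictly ascending under Bytes.compareTo, i.e. the same ordering HBase uses
> > for row keys:
> >
> > import java.util.Arrays;
> >
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> > import org.apache.hadoop.hbase.util.Bytes;
> > import org.apache.hadoop.io.NullWritable;
> > import org.apache.hadoop.io.SequenceFile;
> >
> > public class CheckPartitions {
> >     public static void main(String[] args) throws Exception {
> >         Configuration conf = new Configuration();
> >         // path to the _partitions file written by MySampler.writePartitionFile
> >         Path partitions = new Path(args[0]);
> >         SequenceFile.Reader reader =
> >                 new SequenceFile.Reader(partitions.getFileSystem(conf), partitions, conf);
> >         ImmutableBytesWritable key = new ImmutableBytesWritable();
> >         byte[] prev = null;
> >         int n = 0;
> >         while (reader.next(key, NullWritable.get())) {
> >             byte[] cur = Arrays.copyOfRange(key.get(), 0, key.get().length);
> >             if (prev != null && Bytes.compareTo(prev, cur) >= 0) {
> >                 System.out.println("Split key #" + n + " is not larger than the previous one: "
> >                         + Bytes.toString(cur));
> >             }
> >             prev = cur;
> >             n++;
> >         }
> >         reader.close();
> >         System.out.println(n + " split keys checked");
> >     }
> > }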
> >
> > What am I doing wrong? I want to run my MR job with more than one reducer and
> > get all of the data into the HBase table after loadtable.rb execution.
> > Thank you for reading this far; I hope you didn't get a headache :)
> >
> > Ruslan Salyakhov | ruslan@jalent.ru
> >
>
