hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1124542 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/test/java/org/apache/hadoop/hbase/mapreduce/
Date Thu, 19 May 2011 05:48:38 GMT
Author: stack
Date: Thu May 19 05:48:37 2011
New Revision: 1124542

URL: http://svn.apache.org/viewvc?rev=1124542&view=rev
Log:
HBASE-3880 Make mapper function in ImportTSV plug-able

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu May 19 05:48:37 2011
@@ -219,6 +219,7 @@ Release 0.91.0 - Unreleased
    HBASE-3797  StoreFile Level Compaction Locking
    HBASE-1476  Multithreaded Compactions
    HBASE-3877  Determine Proper Defaults for Compaction ThreadPools
+   HBASE-3880  Make mapper function in ImportTSV plug-able (Bill Graham)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu May 19 05:48:37 2011
@@ -28,17 +28,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -59,12 +53,14 @@ import com.google.common.collect.Lists;
 public class ImportTsv {
   final static String NAME = "importtsv";
 
+  final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
   final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
   final static String COLUMNS_CONF_KEY = "importtsv.columns";
   final static String SEPARATOR_CONF_KEY = "importtsv.separator";
   final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
   final static String DEFAULT_SEPARATOR = "\t";
+  final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
 
   static class TsvParser {
     /**
@@ -76,7 +72,7 @@ public class ImportTsv {
     private final byte separatorByte;
 
     private int rowKeyColumnIndex;
-    
+
     public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
 
     /**
@@ -93,7 +89,7 @@ public class ImportTsv {
       // Configure columns
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
-      
+
       families = new byte[columnStrings.size()][];
       qualifiers = new byte[columnStrings.size()][];
 
@@ -113,7 +109,7 @@ public class ImportTsv {
         }
       }
     }
-    
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
@@ -123,7 +119,7 @@ public class ImportTsv {
     public byte[] getQualifier(int idx) {
       return qualifiers[idx];
     }
-    
+
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
@@ -146,16 +142,16 @@ public class ImportTsv {
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
-    
+
     class ParsedLine {
       private final ArrayList<Integer> tabOffsets;
       private byte[] lineBytes;
-      
+
       ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
         this.tabOffsets = tabOffsets;
         this.lineBytes = lineBytes;
       }
-      
+
       public int getRowKeyOffset() {
         return getColumnOffset(rowKeyColumnIndex);
       }
@@ -167,7 +163,7 @@ public class ImportTsv {
           return tabOffsets.get(idx - 1) + 1;
         else
           return 0;
-      }      
+      }
       public int getColumnLength(int idx) {
         return tabOffsets.get(idx) - getColumnOffset(idx);
       }
@@ -178,7 +174,7 @@ public class ImportTsv {
         return lineBytes;
       }
     }
-    
+
     public static class BadTsvLineException extends Exception {
       public BadTsvLineException(String err) {
         super(err);
@@ -186,103 +182,6 @@ public class ImportTsv {
       private static final long serialVersionUID = 1L;
     }
   }
-  
-  /**
-   * Write table content out to files in hdfs.
-   */
-  static class TsvImporter
-  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
-  {
-    
-    /** Timestamp for all inserted rows */
-    private long ts;
-
-    /** Should skip bad lines */
-    private boolean skipBadLines;
-    private Counter badLineCount;
-
-    private TsvParser parser;
-
-    @Override
-    protected void setup(Context context) {
-      Configuration conf = context.getConfiguration();
-
-      // If a custom separator has been used,
-      // decode it back from Base64 encoding.
-      String separator = conf.get(SEPARATOR_CONF_KEY);
-      if (separator == null) {
-        separator = DEFAULT_SEPARATOR;
-      } else {
-        separator = new String(Base64.decode(separator));
-      }
-
-      parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
-                             separator);
-      if (parser.getRowKeyColumnIndex() == -1) {
-        throw new RuntimeException("No row key column specified");
-      }
-      ts = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
-
-      skipBadLines = context.getConfiguration().getBoolean(
-        SKIP_LINES_CONF_KEY, true);
-      badLineCount = context.getCounter("ImportTsv", "Bad Lines");
-    }
-
-    /**
-     * Convert a line of TSV text into an HBase table row.
-     */
-    @Override
-    public void map(LongWritable offset, Text value,
-      Context context)
-    throws IOException {
-      byte[] lineBytes = value.getBytes();
-
-      try {
-        TsvParser.ParsedLine parsed = parser.parse(
-            lineBytes, value.getLength());
-        ImmutableBytesWritable rowKey =
-          new ImmutableBytesWritable(lineBytes,
-              parsed.getRowKeyOffset(),
-              parsed.getRowKeyLength());
-
-        Put put = new Put(rowKey.copyBytes());
-        for (int i = 0; i < parsed.getColumnCount(); i++) {
-          if (i == parser.getRowKeyColumnIndex()) continue;
-          KeyValue kv = new KeyValue(
-              lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-              parser.getFamily(i), 0, parser.getFamily(i).length,
-              parser.getQualifier(i), 0, parser.getQualifier(i).length,
-              ts,
-              KeyValue.Type.Put,
-              lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
-          put.add(kv);
-        }
-        context.write(rowKey, put);
-      } catch (BadTsvLineException badLine) {
-        if (skipBadLines) {
-          System.err.println(
-              "Bad line at offset: " + offset.get() + ":\n" +
-              badLine.getMessage());
-          badLineCount.increment(1);
-          return;
-        } else {
-          throw new IOException(badLine);
-        }
-      } catch (IllegalArgumentException e) {
-        if (skipBadLines) {
-          System.err.println(
-              "Bad line at offset: " + offset.get() + ":\n" +
-              e.getMessage());
-          badLineCount.increment(1);
-          return;
-        } else {
-          throw new IOException(e);
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 
   /**
    * Sets up the actual job.
@@ -293,7 +192,7 @@ public class ImportTsv {
    * @throws IOException When setting up the job fails.
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
-  throws IOException {
+  throws IOException, ClassNotFoundException {
 
     // Support non-XML supported characters
     // by re-encoding the passed separator as a Base64 string.
@@ -303,13 +202,18 @@ public class ImportTsv {
       Base64.encodeBytes(actualSeparator.getBytes())));
     }
 
+    // See if a non-default Mapper was set
+    String mapperClassName = conf.get(MAPPER_CONF_KEY);
+    Class mapperClass = mapperClassName != null ?
+        Class.forName(mapperClassName) : DEFAULT_MAPPER;
+
     String tableName = args[0];
     Path inputDir = new Path(args[1]);
     Job job = new Job(conf, NAME + "_" + tableName);
-    job.setJarByClass(TsvImporter.class);
+    job.setJarByClass(mapperClass);
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(TextInputFormat.class);
-    job.setMapperClass(TsvImporter.class);
+    job.setMapperClass(mapperClass);
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
@@ -326,9 +230,9 @@ public class ImportTsv {
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
       job.setNumReduceTasks(0);
     }
-    
+
     TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), 
+    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
         com.google.common.base.Function.class /* Guava used by TsvParser */);
     return job;
   }
@@ -358,7 +262,8 @@ public class ImportTsv {
       "Other options that may be specified with -D include:\n" +
       "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
       "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
-      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n";
+      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
+      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n";
 
     System.err.println(usage);
   }

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1124542&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Thu May 19 05:48:37 2011
@@ -0,0 +1,148 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to files in hdfs.
+ */
+public class TsvImporterMapper
+extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+{
+
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
+                           separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    skipBadLines = context.getConfiguration().getBoolean(
+        ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value,
+    Context context)
+  throws IOException {
+    byte[] lineBytes = value.getBytes();
+
+    try {
+      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
+          lineBytes, value.getLength());
+      ImmutableBytesWritable rowKey =
+        new ImmutableBytesWritable(lineBytes,
+            parsed.getRowKeyOffset(),
+            parsed.getRowKeyLength());
+
+      Put put = new Put(rowKey.copyBytes());
+      for (int i = 0; i < parsed.getColumnCount(); i++) {
+        if (i == parser.getRowKeyColumnIndex()) continue;
+        KeyValue kv = new KeyValue(
+            lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+            parser.getFamily(i), 0, parser.getFamily(i).length,
+            parser.getQualifier(i), 0, parser.getQualifier(i).length,
+            ts,
+            KeyValue.Type.Put,
+            lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+        put.add(kv);
+      }
+      context.write(rowKey, put);
+    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+      if (skipBadLines) {
+        System.err.println(
+            "Bad line at offset: " + offset.get() + ":\n" +
+            badLine.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(badLine);
+      }
+    } catch (IllegalArgumentException e) {
+      if (skipBadLines) {
+        System.err.println(
+            "Bad line at offset: " + offset.get() + ":\n" +
+            e.getMessage());
+        incrementBadLineCount(1);
+        return;
+      } else {
+        throw new IOException(e);
+      }
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu May 19 05:48:37 2011
@@ -107,11 +107,11 @@ public class TestImportTsv {
           parsed.getColumnLength(i)));
     }
     if (!Iterables.elementsEqual(parsedCols, expected)) {
-      fail("Expected: " + Joiner.on(",").join(expected) + "\n" + 
+      fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
           "Got:" + Joiner.on(",").join(parsedCols));
     }
   }
-  
+
   private void assertBytesEquals(byte[] a, byte[] b) {
     assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
   }
@@ -153,7 +153,7 @@ public class TestImportTsv {
     String TABLE_NAME = "TestTable";
     String FAMILY = "FAM";
     String INPUT_FILE = "InputFile.esv";
-    
+
     // Prepare the arguments required for the test.
     String[] args = new String[] {
         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
@@ -162,6 +162,29 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+  }
+
+  @Test
+  public void testMROnTableWithCustomMapper()
+  throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile2.esv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
+        TABLE_NAME,
+        INPUT_FILE
+    };
+
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+  }
+
+  private void doMROnTableTest(String inputFile, String family, String tableName,
+                               String[] args, int valueMultiplier) throws Exception {
+
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
@@ -172,15 +195,15 @@ public class TestImportTsv {
     args = opts.getRemainingArgs();
 
     try {
-      
+
       FileSystem fs = FileSystem.get(conf);
-      FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
+      FSDataOutputStream op = fs.create(new Path(inputFile), true);
       String line = "KEY\u001bVALUE1\u001bVALUE2\n";
       op.write(line.getBytes(HConstants.UTF8_ENCODING));
       op.close();
 
-      final byte[] FAM = Bytes.toBytes(FAMILY);
-      final byte[] TAB = Bytes.toBytes(TABLE_NAME);
+      final byte[] FAM = Bytes.toBytes(family);
+      final byte[] TAB = Bytes.toBytes(tableName);
       final byte[] QA = Bytes.toBytes("A");
       final byte[] QB = Bytes.toBytes("B");
 
@@ -210,9 +233,9 @@ public class TestImportTsv {
             assertEquals(toU8Str(kvs.get(1).getRow()),
                 toU8Str(Bytes.toBytes("KEY")));
             assertEquals(toU8Str(kvs.get(0).getValue()),
-                toU8Str(Bytes.toBytes("VALUE1")));
+                toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
             assertEquals(toU8Str(kvs.get(1).getValue()),
-                toU8Str(Bytes.toBytes("VALUE2")));
+                toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
             // Only one result set is expected, so let it loop.
           }
           verified = true;

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java?rev=1124542&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java Thu May 19 05:48:37 2011
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.io.IOException;
+
+/**
+ * Dummy mapper used for unit tests to verify that the mapper can be injected.
+ * This approach would be used if a custom transformation needed to be done after
+ * reading the input data before writing it to HFiles.
+ */
+public class TsvImporterCustomTestMapper extends TsvImporterMapper {
+
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row after transforming the
+   * values by multiplying them by 3.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context)
+        throws IOException {
+    byte[] family = Bytes.toBytes("FAM");
+    final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
+
+    // do some basic line parsing
+    byte[] lineBytes = value.getBytes();
+    String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b");
+
+    // create the rowKey and Put
+    ImmutableBytesWritable rowKey =
+      new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
+    Put put = new Put(rowKey.copyBytes());
+
+    //The value should look like this: VALUE1 or VALUE2. Let's multiply
+    //the integer by 3
+    for(int i = 1; i < valueTokens.length; i++) {
+      String prefix = valueTokens[i].substring(0, "VALUE".length());
+      String suffix = valueTokens[i].substring("VALUE".length());
+      String newValue = prefix + Integer.parseInt(suffix) * 3;
+
+      KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
+          qualifiers[i-1], Bytes.toBytes(newValue));
+      put.add(kv);
+    }
+
+    try {
+      context.write(rowKey, put);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+}
\ No newline at end of file



Mime
View raw message