incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [09/20] git commit: Fixed BLUR-184 and BLUR-185.
Date Mon, 05 Aug 2013 18:56:46 GMT
Fixed BLUR-184 and BLUR-185.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/7970c906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/7970c906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/7970c906

Branch: refs/heads/0.2.0-newtypesystem
Commit: 7970c90688f79f35f8f69f40758382987507f154
Parents: 14fba2e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Aug 2 10:17:37 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Aug 2 10:17:37 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/CsvBlurDriver.java       |  11 +-
 .../blur/mapreduce/lib/CsvBlurMapper.java       | 129 +++++++++++++++----
 .../blur/mapreduce/lib/CsvBlurMapperTest.java   |  19 ++-
 3 files changed, 127 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7970c906/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index 6ae329b..4c0fee3 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -36,25 +36,27 @@ public class CsvBlurDriver {
       BlurException, TException {
     Configuration configuration = new Configuration();
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 4) {
+    if (otherArgs.length != 5) {
       System.err
-          .println("Usage: csvindexer <thrift controller connection str> <tablename> <column family definitions> <in>");
+          .println("Usage: csvindexer <thrift controller connection str> <tablename> <column family definitions> <auto generate record id> <in>");
       System.exit(2);
     }
     int c = 0;
     final String controllerConnectionStr = otherArgs[c++];
     final String tableName = otherArgs[c++];
     final String columnDefs = otherArgs[c++];
+    final Boolean autoGenerateRecordIds = Boolean.parseBoolean(otherArgs[c++]);
     final String input = otherArgs[c++];
 
     final Iface client = BlurClient.getClient(controllerConnectionStr);
     TableDescriptor tableDescriptor = client.describe(tableName);
-
+    
     Job job = new Job(configuration, "Blur indexer [" + tableName + "] [" + input + "]");
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, autoGenerateRecordIds);
     job.setJarByClass(CsvBlurDriver.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
-
+    
     FileInputFormat.addInputPath(job, new Path(input));
     CsvBlurMapper.setColumns(job, columnDefs);
     BlurOutputFormat.setupJob(job, tableDescriptor);
@@ -62,4 +64,5 @@ public class CsvBlurDriver {
     boolean waitForCompletion = job.waitForCompletion(true);
     System.exit(waitForCompletion ? 0 : 1);
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7970c906/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
index e5e73fb..4839ce1 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -17,6 +17,9 @@ package org.apache.blur.mapreduce.lib;
  * limitations under the License.
  */
 import java.io.IOException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,8 +31,8 @@ import java.util.TreeSet;
 import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -40,8 +43,9 @@ import com.google.common.base.Splitter;
  * This will parse a standard csv file into a {@link BlurMutate} object. Use the
  * static addColumns, and setSeparator methods to configure the class.
  */
-public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
+public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
 
+  public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
   public static final String BLUR_CSV_FAMILYISNOTINFILE = "blur.csv.familyisnotinfile";
   public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
   public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
@@ -49,11 +53,13 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
   public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
 
-  private Map<String, List<String>> columnNameMap;
-  private String separator = ",";
-  private Splitter splitter;
-  private boolean familyNotInFile;
-  private String familyFromPath;
+  private Map<String, List<String>> _columnNameMap;
+  private String _separator = ",";
+  private Splitter _splitter;
+  private boolean _familyNotInFile;
+  private String _familyFromPath;
+  private boolean _autoGenerateRecordIdAsHashOfData;
+  private MessageDigest _digest;
 
   /**
    * Add a mapping for a family to a path. This is to be used when an entire
@@ -134,6 +140,45 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   }
 
   /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
+    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
+      boolean autoGenerateRecordIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * Gets whether or not to generate a recordid for the record based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
+  }
+
+  /**
    * Sets all the family and column definitions.
    * 
    * @param job
@@ -249,16 +294,24 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
     Configuration configuration = context.getConfiguration();
+    _autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
+    if (_autoGenerateRecordIdAsHashOfData) {
+      try {
+        _digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new IOException(e);
+      }
+    }
     Collection<String> familyNames = configuration.getStringCollection(BLUR_CSV_FAMILIES);
-    columnNameMap = new HashMap<String, List<String>>();
+    _columnNameMap = new HashMap<String, List<String>>();
     for (String family : familyNames) {
       String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
-      columnNameMap.put(family, Arrays.asList(columnsNames));
+      _columnNameMap.put(family, Arrays.asList(columnsNames));
     }
-    splitter = Splitter.on(separator);
-    separator = configuration.get(BLUR_CSV_SEPARATOR, separator);
-    familyNotInFile = isFamilyNotInFile(configuration);
-    if (familyNotInFile) {
+    _splitter = Splitter.on(_separator);
+    _separator = configuration.get(BLUR_CSV_SEPARATOR, _separator);
+    _familyNotInFile = isFamilyNotInFile(configuration);
+    if (_familyNotInFile) {
       Path fileCurrentlyProcessing = getCurrentFile(context);
       Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
       for (String family : families) {
@@ -266,7 +319,7 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
         Path path = new Path(pathStr);
         path = path.makeQualified(path.getFileSystem(configuration));
         if (isParent(path, fileCurrentlyProcessing)) {
-          familyFromPath = family;
+          _familyFromPath = family;
           break;
         }
       }
@@ -294,12 +347,12 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   }
 
   @Override
-  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+  protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
     BlurRecord record = _mutate.getRecord();
     record.clearColumns();
     String str = value.toString();
 
-    Iterable<String> split = splitter.split(str);
+    Iterable<String> split = _splitter.split(str);
     List<String> list = toList(split);
 
     if (list.size() < 3) {
@@ -307,25 +360,49 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
     }
     int column = 0;
     record.setRowId(list.get(column++));
-    record.setRecordId(list.get(column++));
+    int offset = 2;
+    if (!_autoGenerateRecordIdAsHashOfData) {
+      record.setRecordId(list.get(column++));
+    } else {
+      offset--;
+      _digest.reset();
+      byte[] bs = value.getBytes();
+      int length = value.getLength();
+      _digest.update(bs, 0, length);
+      record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+    }
     String family;
-    int offset;
-    if (familyNotInFile) {
-      family = familyFromPath;
-      offset = 2;
+    if (_familyNotInFile) {
+      family = _familyFromPath;
     } else {
       family = list.get(column++);
-      offset = 3;
+      offset++;
     }
     record.setFamily(family);
 
-    List<String> columnNames = columnNameMap.get(family);
+    List<String> columnNames = _columnNameMap.get(family);
     if (columnNames == null) {
       throw new IOException("Family [" + family + "] is missing in the definition.");
     }
     if (list.size() - offset != columnNames.size()) {
-      throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,recordid,family"
-          + getColumnNames(columnNames) + "].");
+      if (_familyNotInFile) {
+        if (_autoGenerateRecordIdAsHashOfData) {
+          throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,"
+              + getColumnNames(columnNames) + "].");
+        } else {
+          throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,recordid,"
+              + getColumnNames(columnNames) + "].");
+        }
+      } else {
+        if (_autoGenerateRecordIdAsHashOfData) {
+          throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,family"
+              + getColumnNames(columnNames) + "].");
+        } else {
+          throw new IOException("Record [" + str + "] too short, does not match defined record [rowid,recordid,family"
+              + getColumnNames(columnNames) + "].");
+        }
+      }
+
     }
 
     for (int i = 0; i < columnNames.size(); i++) {
@@ -340,7 +417,7 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
   }
 
   public void setFamilyFromPath(String familyFromPath) {
-    this.familyFromPath = familyFromPath;
+    this._familyFromPath = familyFromPath;
   }
 
   private String getColumnNames(List<String> columnNames) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7970c906/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
index 2e28f6c..b653a07 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -18,19 +18,19 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.lib.CsvBlurMapper;
 import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
 import org.junit.Before;
 import org.junit.Test;
 
 public class CsvBlurMapperTest {
 
-  private MapDriver<LongWritable, Text, Text, BlurMutate> _mapDriver;
+  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
   private CsvBlurMapper _mapper;
 
   @Before
@@ -62,5 +62,20 @@ public class CsvBlurMapperTest {
         .addColumn("col1", "value1").addColumn("col2", "value2"));
     _mapDriver.runTest();
   }
+  
+  @Test
+  public void testMapperAutoGenerateRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setFamilyNotInFile(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
+    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
 
 }
\ No newline at end of file


Mime
View raw message