incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/7] git commit: More features, a family per path csv parser and driver program.
Date Thu, 16 May 2013 20:45:20 GMT
More features, a family per path csv parser and driver program.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c2bc98a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c2bc98a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c2bc98a9

Branch: refs/heads/0.1.5
Commit: c2bc98a912d6e33179ecd0e2a0a0e81f8f69f9b6
Parents: 0cc4d72
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu May 16 16:08:19 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu May 16 16:08:19 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/mapreduce/lib/CsvBlurDriver.java   |   13 +-
 .../mapreduce/lib/CsvBlurDriverFamilyPerInput.java |   73 ++++++++
 .../apache/blur/mapreduce/lib/CsvBlurMapper.java   |  135 ++++++++++++++-
 3 files changed, 208 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2bc98a9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index c6dcf42..c92360e 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.thrift.TException;
 
-
 public class CsvBlurDriver {
 
   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
@@ -39,14 +38,14 @@ public class CsvBlurDriver {
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
     if (otherArgs.length != 4) {
       System.err
-          .println("Usage: csvindexer <thrift controller connection str> <tablename>
<in> <column family definitions>");
+          .println("Usage: csvindexer <thrift controller connection str> <tablename>
<column family definitions> <in>");
       System.exit(2);
     }
-
-    final String controllerConnectionStr = otherArgs[0];
-    final String tableName = otherArgs[1];
-    final String input = otherArgs[2];
-    final String columnDefs = otherArgs[3];
+    int c = 0;
+    final String controllerConnectionStr = otherArgs[c++];
+    final String tableName = otherArgs[c++];
+    final String columnDefs = otherArgs[c++];
+    final String input = otherArgs[c++];
 
     final Iface client = BlurClient.getClient(controllerConnectionStr);
     TableDescriptor tableDescriptor = client.describe(tableName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2bc98a9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
new file mode 100644
index 0000000..0493139
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
@@ -0,0 +1,73 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.thrift.TException;
+
+/**
+ * Command-line driver that indexes CSV data into a Blur table when every input
+ * path holds records of a single column family. The CSV lines therefore carry
+ * no family column; instead each input is given on the command line as a
+ * {@code family=path} mapping, and {@link CsvBlurMapper} resolves the family
+ * from the file it is processing.
+ */
+public class CsvBlurDriverFamilyPerInput {
+
+  /**
+   * Builds and runs the indexing job, then exits the JVM with status 0 when the
+   * job succeeds, 1 when it fails and 2 on a usage error.
+   * 
+   * @param args
+   *          generic Hadoop options followed by the thrift controller
+   *          connection string, the table name, the column family definitions
+   *          and one or more {@code family=input path} mappings.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
+      BlurException, TException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length < 4) {
+      printUsageAndExit();
+    }
+    int c = 0;
+    final String controllerConnectionStr = otherArgs[c++];
+    final String tableName = otherArgs[c++];
+    final String columnDefs = otherArgs[c++];
+
+    final Iface client = BlurClient.getClient(controllerConnectionStr);
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "] Multiple Inputs");
+    job.setJarByClass(CsvBlurDriverFamilyPerInput.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    CsvBlurMapper.setColumns(job, columnDefs);
+    // The family is derived from the input path, not read from a CSV column.
+    CsvBlurMapper.setFamilyNotInFile(job, true);
+
+    // Every remaining argument is a family=path mapping.
+    for (int i = c; i < otherArgs.length; i++) {
+      final String mapping = otherArgs[i];
+      final int indexOf = mapping.indexOf('=');
+      // Require a non-empty family and a non-empty path; without this a missing
+      // '=' would surface as a StringIndexOutOfBoundsException.
+      if (indexOf <= 0 || indexOf == mapping.length() - 1) {
+        System.err.println("Invalid family=path mapping [" + mapping + "].");
+        printUsageAndExit();
+      }
+      final String family = mapping.substring(0, indexOf);
+      final Path path = new Path(mapping.substring(indexOf + 1));
+      FileInputFormat.addInputPath(job, path);
+      CsvBlurMapper.addFamilyPath(job, family, path);
+    }
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+
+  /** Prints the command-line usage to stderr and exits the JVM with status 2. */
+  private static void printUsageAndExit() {
+    System.err.println("Usage: csvindexer <thrift controller connection str> <tablename>"
+        + " <column family definitions> <family=input path> ...");
+    System.exit(2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2bc98a9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
index 0eec9a7..36474fd 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -27,9 +27,11 @@ import java.util.TreeSet;
 
 import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 import com.google.common.base.Splitter;
 
@@ -39,6 +41,9 @@ import com.google.common.base.Splitter;
  */
 public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
 
+  public static final String BLUR_CSV_FAMILYISNOTINFILE = "blur.csv.familyisnotinfile";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
   public static final String BLUR_CSV_SEPARATOR = "blur.csv.separator";
   public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
   public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
@@ -46,6 +51,86 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text>
{
   private Map<String, List<String>> columnNameMap;
   private String separator = ",";
   private Splitter splitter;
+  private boolean familyNotInFile;
+  private String familyFromPath;
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Job job, String family, Path path) {
+    addFamilyPath(job.getConfiguration(), family, path);
+  }
+
+  /**
+   * Sets the property familyIsNotInFile so that the parser know that the family
+   * is not to be parsed. Is to be used in conjunction with the addFamilyPath
+   * method.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param familyIsNotInFile
+   *          boolean.
+   */
+  public static void setFamilyNotInFile(Job job, boolean familyIsNotInFile) {
+    setFamilyNotInFile(job.getConfiguration(), familyIsNotInFile);
+  }
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * The family is added to the {@link #BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES}
+   * list (at most once) and its path is stored under
+   * {@link #BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX} followed by the family
+   * name. Mapping a family again replaces its previous path.<br/>
+   * <br/>
+   * 
+   * NOTE: the mapping is only used by the mapper when the familyNotInFile
+   * property is enabled; the two may be configured in either order.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Configuration configuration, String family, Path path) {
+    // getStringCollection hands back a plain list (empty when unset), so copy it
+    // into a set; otherwise re-adding a family would list it twice.
+    Collection<String> families = new TreeSet<String>();
+    Collection<String> existing = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+    if (existing != null) {
+      families.addAll(existing);
+    }
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, families.toArray(new String[families.size()]));
+    configuration.set(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+  }
+
+  /**
+   * Stores the familyNotInFile flag in the given configuration. When the flag
+   * is true the parser does not expect a family column in the CSV data and
+   * relies on the family-to-path mappings added with addFamilyPath instead.
+   * 
+   * @param configuration
+   *          the configuration to update.
+   * @param familyIsNotInFile
+   *          true when the CSV data carries no family column.
+   */
+  public static void setFamilyNotInFile(Configuration configuration, boolean familyIsNotInFile) {
+    final String key = BLUR_CSV_FAMILYISNOTINFILE;
+    configuration.setBoolean(key, familyIsNotInFile);
+  }
+
+  /**
+   * Reports whether family-per-path parsing is enabled.
+   * 
+   * @param configuration
+   *          the configuration to inspect.
+   * @return the stored familyNotInFile flag, or false when it was never set.
+   */
+  public static boolean isFamilyNotInFile(Configuration configuration) {
+    final boolean defaultValue = false;
+    return configuration.getBoolean(BLUR_CSV_FAMILYISNOTINFILE, defaultValue);
+  }
 
   /**
    * Sets all the family and column definitions.
@@ -171,6 +256,36 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text>
{
     }
     splitter = Splitter.on(separator);
     separator = configuration.get(BLUR_CSV_SEPARATOR, separator);
+    familyNotInFile = isFamilyNotInFile(configuration);
+    if (familyNotInFile) {
+      Path fileCurrentlyProcessing = getCurrentFile(context);
+      Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+      for (String family : families) {
+        String pathStr = configuration.get(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX +
family);
+        Path path = new Path(pathStr);
+        path = path.makeQualified(path.getFileSystem(configuration));
+        if (isParent(path, fileCurrentlyProcessing)) {
+          familyFromPath = family;
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true when {@code possibleParent} is {@code child} itself or one of
+   * its ancestors. Accepting the path itself lets a family be mapped to a
+   * single input file as well as to a directory. Paths are compared with
+   * {@code Path.equals}, so both are expected to be qualified the same way.
+   * 
+   * @param possibleParent
+   *          the mapped family path.
+   * @param child
+   *          the file being processed; may be null.
+   * @return whether {@code child} lies at or below {@code possibleParent}.
+   */
+  private boolean isParent(Path possibleParent, Path child) {
+    for (Path current = child; current != null; current = current.getParent()) {
+      if (possibleParent.equals(current)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the fully qualified path of the file behind this task's input split.
+   * 
+   * @param context
+   *          the mapper context.
+   * @return the qualified path of the file being read.
+   * @throws IOException
+   *           if the split is not a FileSplit (the file, and so the family,
+   *           cannot be determined) or the file system cannot be resolved.
+   */
+  private Path getCurrentFile(Context context) throws IOException {
+    Object split = context.getInputSplit();
+    if (!(split instanceof FileSplit)) {
+      throw new IOException("Cannot determine the file being processed from input split [" + split
+          + "]; a FileSplit is required when the family is not in the file.");
+    }
+    Path path = ((FileSplit) split).getPath();
+    return path.makeQualified(path.getFileSystem(context.getConfiguration()));
   }
 
   @Override
@@ -185,23 +300,31 @@ public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text>
{
     if (list.size() < 3) {
       throw new IOException("Record [" + str + "] too short.");
     }
-
-    record.setRowId(list.get(0));
-    record.setRecordId(list.get(1));
-    String family = list.get(2);
+    int column = 0;
+    record.setRowId(list.get(column++));
+    record.setRecordId(list.get(column++));
+    String family;
+    int offset;
+    if (familyNotInFile) {
+      family = familyFromPath;
+      offset = 2;
+    } else {
+      family = list.get(column++);
+      offset = 3;
+    }
     record.setFamily(family);
 
     List<String> columnNames = columnNameMap.get(family);
     if (columnNames == null) {
       throw new IOException("Family [" + family + "] is missing in the definition.");
     }
-    if (list.size() - 3 != columnNames.size()) {
+    if (list.size() - offset != columnNames.size()) {
       throw new IOException("Record [" + str + "] too short, does not match defined record
[rowid,recordid,family"
           + getColumnNames(columnNames) + "].");
     }
 
     for (int i = 0; i < columnNames.size(); i++) {
-      record.addColumn(columnNames.get(i), list.get(i + 3));
+      record.addColumn(columnNames.get(i), list.get(i + offset));
       _fieldCounter.increment(1);
     }
     _key.set(record.getRowId());


Mime
View raw message