incubator-blur-commits mailing list archives

From: amccu...@apache.org
Subject: [9/9] git commit: Updating Hive implementation to use new bulk mutate API.
Date: Thu, 18 Dec 2014 23:21:58 GMT
Updating Hive implementation to use new bulk mutate API.
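
For reference, the new write path no longer goes through GenericBlurRecordWriter and the MapReduce output committer; it drives Blur's bulk mutate Thrift calls directly. Below is a minimal sketch of that lifecycle as wired up by the diffs that follow (the controller address, table name, and row/column values are placeholders; in the real code they come from the Hive job configuration):

import java.util.Arrays;
import java.util.UUID;

import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.Column;
import org.apache.blur.thrift.generated.Record;
import org.apache.blur.thrift.generated.RecordMutation;
import org.apache.blur.thrift.generated.RecordMutationType;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.RowMutationType;

public class BulkMutateSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder controller address and table name.
    Iface client = BlurClient.getClient("controller1:40010");
    String table = "test_hdfs";

    // 1. Start a bulk session (BlurHiveStorageHandler.configureJobConf).
    String bulkId = UUID.randomUUID().toString();
    client.bulkMutateStart(table, bulkId);

    // 2. Add row mutations to the session (the Hive RecordWriter does this per record).
    RowMutation rowMutation = new RowMutation();
    rowMutation.setTable(table);
    rowMutation.setRowId("row-1");
    rowMutation.setRowMutationType(RowMutationType.UPDATE_ROW);
    Record record = new Record("record-1", "fam0",
        Arrays.asList(new Column("col0", "value0")));
    rowMutation.addToRecordMutations(
        new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
    client.bulkMutateAdd(table, bulkId, rowMutation);

    // 3. Finish the session: apply=true on job commit, apply=false on abort
    //    (BlurHiveOutputCommitter.commitJob/abortJob; the trailing boolean
    //    mirrors finishBulkJob in the committer below).
    client.bulkMutateFinish(table, bulkId, true, false);
  }
}
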


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f81e0106
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f81e0106
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f81e0106

Branch: refs/heads/master
Commit: f81e0106b0e47f23eeb19ce713546b41ac9d55c7
Parents: 89945db
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 18 18:22:10 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 18 18:22:10 2014 -0500

----------------------------------------------------------------------
 .../blur/hive/BlurHiveOutputCommitter.java      |  41 ++++---
 .../apache/blur/hive/BlurHiveOutputFormat.java  | 109 ++++++++++++++++---
 .../blur/hive/BlurHiveStorageHandler.java       |  23 +++-
 .../java/org/apache/blur/hive/BlurSerDe.java    |  15 ++-
 .../java/org/apache/blur/hive/CreateData.java   |   5 +-
 contrib/blur-hive/src/test/java/test.hive       |  48 ++++----
 6 files changed, 179 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
index c46dacd..979cabd 100644
--- a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
+++ b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputCommitter.java
@@ -18,54 +18,69 @@ package org.apache.blur.hive;
 
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.lib.BlurOutputCommitter;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 
 public class BlurHiveOutputCommitter extends OutputCommitter {
 
-  private BlurOutputCommitter _committer = new BlurOutputCommitter();
-
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    _committer.setupJob(jobContext);
   }
 
   @Override
   public void setupTask(TaskAttemptContext taskContext) throws IOException {
-    _committer.setupTask(taskContext);
   }
 
   @Override
   public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-    return _committer.needsTaskCommit(taskContext);
+    return false;
   }
 
   @Override
   public void commitTask(TaskAttemptContext taskContext) throws IOException {
-    _committer.commitTask(taskContext);
   }
 
   @Override
   public void abortTask(TaskAttemptContext taskContext) throws IOException {
-    _committer.abortTask(taskContext);
+
   }
 
   @Override
-  public void abortJob(JobContext jobContext, int status) throws IOException {
-    _committer.abortJob(jobContext, null);
+  public void abortJob(JobContext context, int status) throws IOException {
+    finishBulkJob(context, false);
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   public void cleanupJob(JobContext context) throws IOException {
-    _committer.cleanupJob(context);
+
   }
 
   @Override
   public void commitJob(JobContext context) throws IOException {
-    _committer.commitJob(context);
+    finishBulkJob(context, true);
+  }
+
+  private void finishBulkJob(JobContext context, boolean apply) throws IOException {
+    Configuration configuration = context.getConfiguration();
+    String connectionStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+    Iface client = BlurClient.getClient(connectionStr);
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    String bulkId = BlurHiveOutputFormat.getBulkId(configuration);
+    try {
+      client.bulkMutateFinish(tableDescriptor.getName(), bulkId, apply, false);
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index 836d16e..8561aa5 100644
--- a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -17,13 +17,29 @@
 package org.apache.blur.hive;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
-import org.apache.blur.mapreduce.lib.BlurMutate;
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.mapreduce.lib.BlurColumn;
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
 import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.mapreduce.lib.CheckOutputSpecs;
-import org.apache.blur.mapreduce.lib.GenericBlurRecordWriter;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -31,18 +47,23 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 
 public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord> {
 
+  private static final String BLUR_BULK_MUTATE_ID = "blur.bulk.mutate.id";
+
+  public static String getBulkId(Configuration conf) {
+    return conf.get(BLUR_BULK_MUTATE_ID);
+  }
+
+  public static void setBulkId(Configuration conf, String bulkId) {
+    conf.set(BLUR_BULK_MUTATE_ID, bulkId);
+  }
+
   @Override
  public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
-    try {
-      CheckOutputSpecs.checkOutputSpecs(jobConf, jobConf.getNumReduceTasks());
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
+
   }
 
   @Override
@@ -55,23 +76,79 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
      Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
-    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
-    int id = taskAttemptID.getTaskID().getId();
-    final GenericBlurRecordWriter writer = new GenericBlurRecordWriter(jc, id, taskAttemptID.toString() + ".tmp");
+
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(jc);
+    String conStr = jc.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+    final Iface controllerClient = BlurClient.getClient(conStr);
+    final String table = tableDescriptor.getName();
+    final int numberOfShardsInTable = tableDescriptor.getShardCount();
+    final String bulkId = getBulkId(jc);
     return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
 
+      private BlurPartitioner _blurPartitioner = new BlurPartitioner();
+
       @Override
       public void write(Writable w) throws IOException {
         BlurRecord blurRecord = (BlurRecord) w;
-        BlurMutate blurMutate = new BlurMutate(MUTATE_TYPE.REPLACE, blurRecord);
-        writer.write(new Text(blurRecord.getRowId()), blurMutate);
+        String rowId = blurRecord.getRowId();
+        RowMutation rowMutation = new RowMutation();
+        rowMutation.setTable(table);
+        rowMutation.setRowId(rowId);
+        rowMutation.setRowMutationType(RowMutationType.UPDATE_ROW);
+        rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD,
+            toRecord(blurRecord)));
+
+        try {
+          Iface client = getClient(rowId);
+          client.bulkMutateAdd(table, bulkId, rowMutation);
+        } catch (BlurException e) {
+          throw new IOException(e);
+        } catch (TException e) {
+          throw new IOException(e);
+        }
+      }
+
+      private Iface getClient(String rowId) throws BlurException, TException {
+        int shard = _blurPartitioner.getShard(rowId, numberOfShardsInTable);
+        String shardId = BlurUtil.getShardName(shard);
+        return getClientFromShardId(table, shardId);
+      }
+
+      private Map<String, String> _shardToServerLayout;
+      private Map<String, Iface> _shardClients = new HashMap<String, Iface>();
+
+      private Iface getClientFromShardId(String table, String shardId) throws BlurException, TException {
+        if (_shardToServerLayout == null) {
+          _shardToServerLayout = controllerClient.shardServerLayout(table);
+        }
+        return getClientFromConnectionStr(_shardToServerLayout.get(shardId));
+      }
+
+      private Iface getClientFromConnectionStr(String connectionStr) {
+        Iface iface = _shardClients.get(connectionStr);
+        if (iface == null) {
+          _shardClients.put(connectionStr, iface = BlurClient.getClient(connectionStr));
+        }
+        return iface;
       }
 
       @Override
       public void close(boolean abort) throws IOException {
-        writer.close();
+
       }
     };
   }
 
+  protected Record toRecord(BlurRecord blurRecord) {
+    return new Record(blurRecord.getRecordId(), blurRecord.getFamily(), toColumns(blurRecord.getColumns()));
+  }
+
+  private List<Column> toColumns(List<BlurColumn> columns) {
+    List<Column> result = new ArrayList<Column>();
+    for (BlurColumn blurColumn : columns) {
+      result.add(new Column(blurColumn.getName(), blurColumn.getValue()));
+    }
+    return result;
+  }
+
 }
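
A note on the writer above: it no longer writes index data through the MapReduce path, but sends each RowMutation to the shard server that owns the row. A standalone sketch of that routing step, with a placeholder controller address and table name (the layout/client caching from the diff is omitted, and the actual writer reads the TableDescriptor from the JobConf via BlurOutputFormat.getTableDescriptor):

import org.apache.blur.manager.BlurPartitioner;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.utils.BlurUtil;

public class ShardRoutingSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder controller address and table name.
    Iface controller = BlurClient.getClient("controller1:40010");
    String table = "test_hdfs";

    // Same steps as the RecordWriter returned by getHiveRecordWriter:
    // map the row id to a shard index, turn that into a shard name, and
    // look up the shard server currently hosting it.
    int shardCount = controller.describe(table).getShardCount();
    int shard = new BlurPartitioner().getShard("row-1", shardCount);
    String shardId = BlurUtil.getShardName(shard);
    String shardServer = controller.shardServerLayout(table).get(shardId);

    // Mutations for this row would then go through a client connected
    // directly to that shard server.
    Iface shardClient = BlurClient.getClient(shardServer);
    System.out.println("row-1 -> " + shardId + " @ " + shardServer);
  }
}
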

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
index 348438a..f9ff7a7 100644
--- a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
+++ b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveStorageHandler.java
@@ -18,8 +18,13 @@ package org.apache.blur.hive;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -48,20 +53,26 @@ public class BlurHiveStorageHandler extends DefaultStorageHandler {
 
   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
-    // Will set setup Table Descriptor and Output Committer.
-    jobConf.setPartitionerClass(BlurHiveParitioner.class);
-    jobConf.setOutputCommitter(BlurHiveOutputCommitter.class);
-    TableDescriptor tableDescriptor;
     try {
-      tableDescriptor = BlurOutputFormat.getTableDescriptor(jobConf);
+      String bulkId = UUID.randomUUID().toString();
+      String connectionStr = jobConf.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+      Iface client = BlurClient.getClient(connectionStr);
+      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(jobConf);
+      client.bulkMutateStart(tableDescriptor.getName(), bulkId);
+      BlurHiveOutputFormat.setBulkId(jobConf, bulkId);
+      jobConf.setOutputCommitter(BlurHiveOutputCommitter.class);
     } catch (IOException e) {
       throw new RuntimeException(e);
+    } catch (BlurException e) {
+      throw new RuntimeException(e);
+    } catch (TException e) {
+      throw new RuntimeException(e);
     }
-    jobConf.setNumReduceTasks(tableDescriptor.getShardCount());
   }
 
   @Override
  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
index ba1b5aa..b35feec 100644
--- a/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
+++ b/contrib/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.Writable;
 
 public class BlurSerDe extends AbstractSerDe {
 
+  public static final String BLUR_CONTROLLER_CONNECTION_STR = "BLUR_CONTROLLER_CONNECTION_STR";
   private static final String FAMILY = "blur.family";
   private static final String TABLE = "blur.table";
   private String _family;
@@ -66,7 +67,6 @@ public class BlurSerDe extends AbstractSerDe {
     }
 
     Iface client = BlurClient.getClient(configuration);
-    // TableDescriptor tableDescriptor;
     Schema schema;
     try {
       List<String> tableList = client.tableList();
@@ -76,6 +76,7 @@ public class BlurSerDe extends AbstractSerDe {
       if (conf != null) {
         TableDescriptor tableDescriptor = client.describe(table);
         BlurOutputFormat.setTableDescriptor(conf, tableDescriptor);
+        conf.set(BLUR_CONTROLLER_CONNECTION_STR, getControllerConnectionStr(client));
       }
       schema = client.schema(table);
     } catch (BlurException e) {
@@ -107,6 +108,18 @@ public class BlurSerDe extends AbstractSerDe {
     _serializer = new BlurSerializer();
   }
 
+  private String getControllerConnectionStr(Iface client) throws BlurException, TException {
+    List<String> controllerServerList = client.controllerServerList();
+    StringBuilder builder = new StringBuilder();
+    for (String c : controllerServerList) {
+      if (builder.length() != 0) {
+        builder.append(',');
+      }
+      builder.append(c);
+    }
+    return builder.toString();
+  }
+
   @Override
   public Writable serialize(Object o, ObjectInspector oi) throws SerDeException {
     return _serializer.serialize(o, oi, _columnNames, _columnTypes, _schema, _family);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/test/java/org/apache/blur/hive/CreateData.java
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/test/java/org/apache/blur/hive/CreateData.java b/contrib/blur-hive/src/test/java/org/apache/blur/hive/CreateData.java
index d51841e..63bf568 100644
--- a/contrib/blur-hive/src/test/java/org/apache/blur/hive/CreateData.java
+++ b/contrib/blur-hive/src/test/java/org/apache/blur/hive/CreateData.java
@@ -34,11 +34,12 @@ public class CreateData {
     FileSystem fileSystem = path.getFileSystem(configuration);
     FSDataOutputStream outputStream = fileSystem.create(path);
     PrintWriter print = new PrintWriter(outputStream);
-    for (int i = 0; i < 10; i++) {
+    int rows = 100000;
+    for (int i = 0; i < rows; i++) {
       String s = Integer.toString(i);
       print.print(s);
       print.print(SEP);
-      print.print(s);
+      print.print(s + "-" + System.currentTimeMillis());
       for (int c = 0; c < 10; c++) {
         print.print(SEP);
         print.print(s);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f81e0106/contrib/blur-hive/src/test/java/test.hive
----------------------------------------------------------------------
diff --git a/contrib/blur-hive/src/test/java/test.hive b/contrib/blur-hive/src/test/java/test.hive
index f448e1a..1b92dae 100644
--- a/contrib/blur-hive/src/test/java/test.hive
+++ b/contrib/blur-hive/src/test/java/test.hive
@@ -52,38 +52,38 @@ add jar /Users/amccurry/Development/incubator-blur/distribution/target/apache-bl
 add jar /Users/amccurry/Development/incubator-blur/distribution/target/apache-blur-0.2.4-incubating-SNAPSHOT-hadoop1-bin/lib/spatial4j-0.3.jar;
 add jar /Users/amccurry/Development/incubator-blur/distribution/target/apache-blur-0.2.4-incubating-SNAPSHOT-hadoop1-bin/lib/zookeeper-3.4.5.jar;
 
--- create database test;
+create database if not exists test;
 use test;
 
---  CREATE TABLE test
---  ROW FORMAT SERDE 'org.apache.blur.hive.BlurSerDe'
---  WITH SERDEPROPERTIES (
---    'blur.zookeeper.connection'='localhost',
---    'blur.table'='test_hdfs',
---    'blur.family'='fam0'
---  )
--- STORED BY 'org.apache.blur.hive.BlurHiveStorageHandler';
+CREATE TABLE if not exists test  
+ROW FORMAT SERDE 'org.apache.blur.hive.BlurSerDe'
+WITH SERDEPROPERTIES (
+  'blur.zookeeper.connection'='localhost',
+  'blur.table'='test_hdfs',
+  'blur.family'='fam0'
+)
+STORED BY 'org.apache.blur.hive.BlurHiveStorageHandler';
 
 desc test;
 
--- create table input_data (
--- rowid string,
--- recordid string,
--- col0 string,
--- col1 string,
--- col2 string,
--- col3 string,
--- col4 string,
--- col5 string,
--- col6 string,
--- col7 string,
--- col8 string,
--- col9 string
--- );
+create table if not exists input_data (
+ rowid string,
+ recordid string,
+ col0 string,
+ col1 string,
+ col2 string,
+ col3 string,
+ col4 string,
+ col5 string,
+ col6 string,
+ col7 string,
+ col8 string,
+ col9 string
+);
 
 select * from input_data;
 
-insert overwrite table test select * from input_data distribute by rowid;
+insert overwrite table test select * from input_data;
 
 
 

