incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Blur hive project now uses the bulk table update command to load new data from hive.
Date Wed, 27 May 2015 13:19:09 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master a9e6f32a8 -> ff9022bcf


Blur hive project now uses the bulk table update command to load new data from hive.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ff9022bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ff9022bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ff9022bc

Branch: refs/heads/master
Commit: ff9022bcf6b67878594f99567769ff0613c29817
Parents: a9e6f32
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed May 27 09:19:32 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed May 27 09:19:32 2015 -0400

----------------------------------------------------------------------
 blur-command/pom.xml                            |   5 -
 .../apache/blur/command/BulkTableUpdate.java    | 177 -------------------
 .../blur/command/ControllerCommandManager.java  |   4 +-
 .../clusterstatus/ZookeeperClusterStatus.java   |  40 +++++
 .../hive/BlurHiveMRLoaderOutputCommitter.java   |  29 ++-
 .../apache/blur/hive/BlurHiveOutputFormat.java  |  64 ++++---
 .../java/org/apache/blur/hive/BlurSerDe.java    |   7 +-
 .../org/apache/blur/hive/BlurSerDeTest.java     |  43 +----
 blur-mapred/pom.xml                             |   5 +
 .../lib/update/BulkTableUpdateCommand.java      | 176 ++++++++++++++++++
 .../services/org.apache.blur.command.Commands   |  16 ++
 .../blur/mapreduce/lib/update/DriverTest.java   |  69 ++++++++
 .../org/apache/blur/utils/BlurConstants.java    |   1 +
 13 files changed, 386 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-command/pom.xml
----------------------------------------------------------------------
diff --git a/blur-command/pom.xml b/blur-command/pom.xml
index 14dcf37..f01b180 100644
--- a/blur-command/pom.xml
+++ b/blur-command/pom.xml
@@ -32,11 +32,6 @@
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-mapred</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
 			<groupId>commons-logging</groupId>
 			<artifactId>commons-logging</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-command/src/main/java/org/apache/blur/command/BulkTableUpdate.java
----------------------------------------------------------------------
diff --git a/blur-command/src/main/java/org/apache/blur/command/BulkTableUpdate.java b/blur-command/src/main/java/org/apache/blur/command/BulkTableUpdate.java
deleted file mode 100644
index 2c3a58e..0000000
--- a/blur-command/src/main/java/org/apache/blur/command/BulkTableUpdate.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.command;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.blur.BlurConfiguration;
-import org.apache.blur.command.annotation.OptionalArgument;
-import org.apache.blur.command.commandtype.ClusterExecuteCommandSingleTable;
-import org.apache.blur.mapreduce.lib.update.Driver;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ToolRunner;
-
-public class BulkTableUpdate extends ClusterExecuteCommandSingleTable<Integer> {
-
-  private static final String YARN_SITE_XML = "yarn-site.xml";
-  private static final String HDFS_SITE_XML = "hdfs-site.xml";
-  private static final String BLUR_BULK_UPDATE_WORKING_PATH = "blur.bulk.update.working.path";
-  private static final String IMPORT = "import";
-  private static final String BULK_UPDATE = "bulk-update";
-
-  @OptionalArgument("The reducer multipler.")
-  private Integer reducerMultipler = 1;
-
-  @OptionalArgument("Automatically load the indexed data into the table when complete.")
-  private Boolean autoLoad = true;
-
-  @OptionalArgument("Block and wait for import to complete.")
-  private boolean waitForDataBeVisible = true;
-
-  @OptionalArgument("Additional configurations that may needed to execute the indexing job.")
-  private List<String> extraConfigs = new ArrayList<String>();
-
-  @Override
-  public String getName() {
-    return BULK_UPDATE;
-  }
-
-  @Override
-  public Integer clusterExecute(ClusterContext context) throws IOException, InterruptedException {
-    String table = getTable();
-    BlurConfiguration blurConfiguration = context.getBlurConfiguration(table);
-    String blurZkConnection = blurConfiguration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION);
-    TableContext tableContext = context.getTableContext(table);
-    TableDescriptor descriptor = tableContext.getDescriptor();
-    String tableUri = descriptor.getTableUri();
-    String mrIncWorkingPathStr = blurConfiguration.get(BLUR_BULK_UPDATE_WORKING_PATH);
-    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr, table);
-    String outputPathStr = new Path(new Path(tableUri), IMPORT).toString();
-    Configuration configuration = new Configuration();
-    configuration.addResource(HDFS_SITE_XML);
-    configuration.addResource(YARN_SITE_XML);
-    for (String s : extraConfigs) {
-      if (s != null) {
-        InputStream inputStream = IOUtils.toInputStream(s);
-        configuration.addResource(inputStream);
-        inputStream.close();
-      }
-    }
-    int run;
-    try {
-      run = ToolRunner.run(configuration, new Driver(), new String[] { table, mrIncWorkingPath.toString(),
-          outputPathStr, blurZkConnection, Integer.toString(reducerMultipler) });
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    if (run == 0 && autoLoad) {
-      Iface client = BlurClient.getClient();
-      try {
-        client.loadData(table, outputPathStr);
-        if (waitForDataBeVisible) {
-          waitForDataToBeVisible(client, table);
-        }
-      } catch (BlurException e) {
-        throw new IOException(e);
-      } catch (TException e) {
-        throw new IOException(e);
-      }
-    }
-    return run;
-  }
-
-  private void waitForDataToBeVisible(Iface client, String table) throws BlurException, TException,
-      InterruptedException {
-    while (true) {
-      TableStats tableStats = client.tableStats(table);
-      if (tableStats.getSegmentImportInProgressCount() > 0) {
-        break;
-      } else if (tableStats.getSegmentImportPendingCount() > 0) {
-        break;
-      }
-      Thread.sleep(1000);
-    }
-
-    // Once 0 is met wait for 5 more seconds, just in case there is a slow shard
-    // server.
-    for (int i = 0; i < 5; i++) {
-      INNER: while (true) {
-        TableStats tableStats = client.tableStats(table);
-        if (tableStats.getSegmentImportInProgressCount() == 0) {
-          break INNER;
-        } else if (tableStats.getSegmentImportPendingCount() == 0) {
-          break INNER;
-        }
-        Thread.sleep(1000);
-      }
-    }
-  }
-
-  public Integer getReducerMultipler() {
-    return reducerMultipler;
-  }
-
-  public void setReducerMultipler(Integer reducerMultipler) {
-    this.reducerMultipler = reducerMultipler;
-  }
-
-  public Boolean getAutoLoad() {
-    return autoLoad;
-  }
-
-  public void setAutoLoad(Boolean autoLoad) {
-    this.autoLoad = autoLoad;
-  }
-
-  public boolean isWaitForDataBeVisible() {
-    return waitForDataBeVisible;
-  }
-
-  public void setWaitForDataBeVisible(boolean waitForDataBeVisible) {
-    this.waitForDataBeVisible = waitForDataBeVisible;
-  }
-
-  public List<String> getExtraConfigs() {
-    return extraConfigs;
-  }
-
-  public void setExtraConfigs(List<String> extraConfigs) {
-    this.extraConfigs = extraConfigs;
-  }
-
-  public void addExtraConfig(Configuration configuration) throws IOException {
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    configuration.writeXml(outputStream);
-    outputStream.close();
-    extraConfigs.add(new String(outputStream.toByteArray()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index 7b8282d..6f19257 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -73,7 +73,7 @@ public class ControllerCommandManager extends BaseCommandManager {
             return executeClusterCommand(context, command);
           }
           if (command instanceof ClusterExecuteCommand) {
-            throw new RuntimeException("Not implemented");
+            return executeClusterCommand(context, command);
           }
 
           throw new IOException("Command type of [" + command.getClass() + "] not supported.");
@@ -102,7 +102,7 @@ public class ControllerCommandManager extends BaseCommandManager {
 
   private Response executeClusterCommand(ClusterContext context, Command<?> command) throws IOException,
       InterruptedException {
-    ClusterExecuteServerReadCommand<Object> clusterCommand = (ClusterExecuteServerReadCommand<Object>) command;
+    ClusterExecuteCommand<Object> clusterCommand = (ClusterExecuteCommand<Object>) command;
     Object object = clusterCommand.clusterExecute(context);
     return Response.createNewAggregateResponse(object);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index 469d92d..31b9996 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -51,6 +51,8 @@ import org.apache.blur.zookeeper.ZkUtils;
 import org.apache.blur.zookeeper.ZooKeeperLockManager;
 import org.apache.blur.zookeeper.ZookeeperPathConstants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -59,8 +61,15 @@ import org.apache.zookeeper.data.Stat;
 
 public class ZookeeperClusterStatus extends ClusterStatus {
 
+  private static final String TMP = "tmp";
+
   private static final Log LOG = LogFactory.getLog(ZookeeperClusterStatus.class);
 
+  public static final String CACHE = "cache";
+  public static final String COMPLETE = "complete";
+  public static final String INPROGRESS = "inprogress";
+  public static final String NEW = "new";
+
   private final ZooKeeper _zk;
   private final BlurConfiguration _configuration;
   private final AtomicBoolean _running = new AtomicBoolean();
@@ -551,6 +560,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
       String table = BlurUtil.nullCheck(tableDescriptor.name, "tableDescriptor.name cannot be null.");
       String cluster = BlurUtil.nullCheck(tableDescriptor.cluster, "tableDescriptor.cluster cannot be null.");
       assignTableUri(tableDescriptor);
+      assignMapReduceWorkingPath(tableDescriptor);
       String uri = BlurUtil.nullCheck(tableDescriptor.tableUri, "tableDescriptor.tableUri cannot be null.");
       int shardCount = BlurUtil.zeroCheck(tableDescriptor.shardCount,
           "tableDescriptor.shardCount cannot be less than 1");
@@ -597,6 +607,36 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     tableDescriptor.setTableUri(tableUri);
   }
 
+  private void assignMapReduceWorkingPath(TableDescriptor tableDescriptor) throws IOException {
+    Map<String, String> tableProperties = tableDescriptor.getTableProperties();
+    String mrIncWorkingPathStr = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+    if (mrIncWorkingPathStr == null) {
+      // If not set on the table, try to use cluster default
+      mrIncWorkingPathStr = _configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+      if (mrIncWorkingPathStr == null) {
+        LOG.info("Could not setup map reduce working path for table [{0}]", tableDescriptor.getName());
+        return;
+      }
+      // Add table on the cluster default and add back to the table desc.
+      mrIncWorkingPathStr = new Path(mrIncWorkingPathStr, tableDescriptor.getName()).toString();
+      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, mrIncWorkingPathStr);
+    }
+
+    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+    Path newData = new Path(mrIncWorkingPath, NEW);
+    Path tmpData = new Path(mrIncWorkingPath, TMP);
+    Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
+    Path completeData = new Path(mrIncWorkingPath, COMPLETE);
+    Path fileCache = new Path(mrIncWorkingPath, CACHE);
+
+    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(_config);
+    fileSystem.mkdirs(newData);
+    fileSystem.mkdirs(tmpData);
+    fileSystem.mkdirs(inprogressData);
+    fileSystem.mkdirs(completeData);
+    fileSystem.mkdirs(fileCache);
+  }
+
   @Override
   public void disableTable(String cluster, String table) {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
index b14063d..936c822 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
@@ -18,6 +18,11 @@ package org.apache.blur.hive;
 
 import java.io.IOException;
 
+import org.apache.blur.mapreduce.lib.BlurOutputFormat;
+import org.apache.blur.mapreduce.lib.update.BulkTableUpdateCommand;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +34,9 @@ import org.apache.hadoop.mapred.TaskAttemptContext;
 
 public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
 
+  private static final String YARN_SITE_XML = "yarn-site.xml";
+  private static final String HDFS_SITE_XML = "hdfs-site.xml";
+
   private static final Log LOG = LogFactory.getLog(BlurHiveMRLoaderOutputCommitter.class);
 
   @Override
@@ -70,7 +78,7 @@ public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
 
   private void finishBulkJob(JobContext context, boolean apply) throws IOException {
     Configuration configuration = context.getConfiguration();
-    String workingPathStr = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+    String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
     Path workingPath = new Path(workingPathStr);
     Path tmpDir = new Path(workingPath, "tmp");
     FileSystem fileSystem = tmpDir.getFileSystem(configuration);
@@ -81,7 +89,24 @@ public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
       Path newDataPath = new Path(workingPath, "new");
       Path dst = new Path(newDataPath, loadId);
       if (!fileSystem.rename(loadPath, dst)) {
-        LOG.error("Could move data from src [" + loadPath + "] to dst [" + dst + "]");
+        LOG.error("Could not move data from src [" + loadPath + "] to dst [" + dst + "]");
+        throw new IOException("Could not move data from src [" + loadPath + "] to dst [" + dst + "]");
+      }
+
+      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+      String connectionStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+      BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
+      bulkTableUpdateCommand.setAutoLoad(true);
+      bulkTableUpdateCommand.setTable(tableDescriptor.getName());
+      bulkTableUpdateCommand.setWaitForDataBeVisible(true);
+
+      Configuration config = new Configuration(false);
+      config.addResource(HDFS_SITE_XML);
+      config.addResource(YARN_SITE_XML);
+
+      bulkTableUpdateCommand.addExtraConfig(config);
+      if (bulkTableUpdateCommand.run(BlurClient.getClient(connectionStr)) != 0) {
+        throw new IOException("Unknown error occured duing load.");
       }
     } else {
       fileSystem.delete(loadPath, true);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index 2e5e2dc..6216d9c 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -17,6 +17,7 @@
 package org.apache.blur.hive;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.blur.thrift.generated.RecordMutationType;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.RowMutationType;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.ShardUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
 public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord> {
 
+  private static final String BLUR = "blur";
+  private static final String BLUR_USER_NAME = "blur.user.name";
   private static final String BLUR_BULK_MUTATE_ID = "blur.bulk.mutate.id";
 
   public static String getBulkId(Configuration conf) {
@@ -87,31 +92,50 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord>
   }
 
   private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getMrWorkingPathWriter(
-      Configuration configuration) throws IOException {
-    String workingPathStr = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+      final Configuration configuration) throws IOException {
+    String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
     Path workingPath = new Path(workingPathStr);
     Path tmpDir = new Path(workingPath, "tmp");
-    FileSystem fileSystem = tmpDir.getFileSystem(configuration);
+    final FileSystem fileSystem = tmpDir.getFileSystem(configuration);
     String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
-    Path loadPath = new Path(tmpDir, loadId);
-
-    final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new Path(loadPath, UUID.randomUUID()
-        .toString()), Text.class, BlurRecord.class);
-
-    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+    final Path loadPath = new Path(tmpDir, loadId);
+    String user = getBlurUser(configuration);
+    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+    try {
+      return proxyUser
+          .doAs(new PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>() {
+            @Override
+            public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter run() throws Exception {
+              final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new Path(loadPath, UUID
+                  .randomUUID().toString()), Text.class, BlurRecord.class);
+
+              return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+
+                @Override
+                public void write(Writable w) throws IOException {
+                  BlurRecord blurRecord = (BlurRecord) w;
+                  String rowId = blurRecord.getRowId();
+                  writer.append(new Text(rowId), blurRecord);
+                }
+
+                @Override
+                public void close(boolean abort) throws IOException {
+                  writer.close();
+                }
+              };
+            }
+          });
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
 
-      @Override
-      public void write(Writable w) throws IOException {
-        BlurRecord blurRecord = (BlurRecord) w;
-        String rowId = blurRecord.getRowId();
-        writer.append(new Text(rowId), blurRecord);
-      }
+  public static String getBlurUser(Configuration configuration) {
+    return configuration.get(BLUR_USER_NAME, BLUR);
+  }
 
-      @Override
-      public void close(boolean abort) throws IOException {
-        writer.close();
-      }
-    };
+  public static void setBlurUser(Configuration configuration, String blurUser) {
+    configuration.set(BLUR_USER_NAME, blurUser);
   }
 
   private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getBulkRecordWriter(Configuration configuration)

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
index 1f09493..f1c0e41 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurSerDe.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.io.Writable;
 
 public class BlurSerDe extends AbstractSerDe {
 
-  public static final String BLUR_MR_UPDATE_WORKING_PATH = "blur.mr.update.working.path";
   public static final String BLUR_MR_UPDATE_DISABLED = "blur.mr.update.disabled";
   public static final String BLUR_BLOCKING_APPLY = "blur.blocking.apply";
   public static final String BLUR_CONTROLLER_CONNECTION_STR = "blur.controller.connection.str";
@@ -87,10 +86,10 @@ public class BlurSerDe extends AbstractSerDe {
         TableDescriptor tableDescriptor = client.describe(table);
         Map<String, String> tableProperties = tableDescriptor.getTableProperties();
         if (tableProperties != null) {
-          String workingPath = tableProperties.get(BLUR_MR_UPDATE_WORKING_PATH);
+          String workingPath = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
           if (conf != null && workingPath != null) {
             if (!conf.getBoolean(BLUR_MR_UPDATE_DISABLED, false)) {
-              conf.set(BLUR_MR_UPDATE_WORKING_PATH, workingPath);
+              conf.set(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, workingPath);
             }
           }
         }
@@ -175,7 +174,7 @@ public class BlurSerDe extends AbstractSerDe {
   }
 
   public static boolean shouldUseMRWorkingPath(Configuration configuration) {
-    String workingPath = configuration.get(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH);
+    String workingPath = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
     if (workingPath != null) {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
index d343fcd..96609f2 100644
--- a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
+++ b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
@@ -40,7 +40,6 @@ import java.util.Properties;
 import org.apache.blur.MiniCluster;
 import org.apache.blur.mapreduce.lib.BlurColumn;
 import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.mapreduce.lib.update.Driver;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
@@ -50,7 +49,7 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.ColumnDefinition;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.GCWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -131,7 +130,7 @@ public class BlurSerDeTest {
       tableDescriptor.setName(TEST);
       tableDescriptor.setShardCount(1);
       tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/tables/test");
-      tableDescriptor.putToTableProperties(BlurSerDe.BLUR_MR_UPDATE_WORKING_PATH, _mrWorkingPath);
+      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, _mrWorkingPath);
 
       client.createTable(tableDescriptor);
 
@@ -268,7 +267,6 @@ public class BlurSerDeTest {
   public void test2() throws SQLException, ClassNotFoundException, IOException, BlurException, TException,
       InterruptedException {
     int totalRecords = runLoad(true);
-
     Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
     BlurQuery blurQuery = new BlurQuery();
     Query query = new Query();
@@ -280,32 +278,8 @@ public class BlurSerDeTest {
 
   @Test
   public void test3() throws Exception {
-
-    Path mrWorkingPath = new Path(_mrWorkingPath);
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    fileSystem.mkdirs(mrWorkingPath);
-    fileSystem.mkdirs(new Path(mrWorkingPath, "tmp"));
-    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.NEW));
-    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.COMPLETE));
-    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.CACHE));
-    fileSystem.mkdirs(new Path(mrWorkingPath, Driver.INPROGRESS));
-
     int totalRecords = runLoad(false);
-
-    Driver driver = new Driver();
-    driver.setConf(miniCluster.getMRConfiguration());
-
-    String outputPathStr = miniCluster.getFileSystemUri().toString() + "/indexoutput";
-    String blurZkConnection = miniCluster.getZkConnectionString();
-
-    assertEquals(0, driver.run(new String[] { TEST, _mrWorkingPath, outputPathStr, blurZkConnection, "1" }));
-
-    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
-
-    client.loadData(TEST, outputPathStr);
-
-    waitUntilAllImportsAreCompleted(client, TEST);
-
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
     BlurQuery blurQuery = new BlurQuery();
     Query query = new Query();
     query.setQuery("*");
@@ -314,17 +288,6 @@ public class BlurSerDeTest {
     assertEquals(totalRecords, results.getTotalResults());
   }
 
-  private void waitUntilAllImportsAreCompleted(Iface client, String tableName) throws BlurException, TException,
-      InterruptedException {
-    while (true) {
-      Thread.sleep(1000);
-      TableStats tableStats = client.tableStats(tableName);
-      if (tableStats.getSegmentImportInProgressCount() == 0 && tableStats.getSegmentImportPendingCount() == 0) {
-        return;
-      }
-    }
-  }
-
   private int runLoad(boolean disableMrUpdate) throws IOException, InterruptedException, ClassNotFoundException,
       SQLException {
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred/pom.xml b/blur-mapred/pom.xml
index f311fda..d104b71 100644
--- a/blur-mapred/pom.xml
+++ b/blur-mapred/pom.xml
@@ -56,6 +56,11 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-command</artifactId>
+			<version>${projectVersion}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-util</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/BulkTableUpdateCommand.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/BulkTableUpdateCommand.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/BulkTableUpdateCommand.java
new file mode 100644
index 0000000..98b5967
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/update/BulkTableUpdateCommand.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib.update;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.command.ClusterContext;
+import org.apache.blur.command.annotation.OptionalArgument;
+import org.apache.blur.command.commandtype.ClusterExecuteCommandSingleTable;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Cluster command ({@code bulk-update}) that runs the MapReduce bulk update {@link Driver} for a single table and,
+ * optionally, hands the indexed output to the table via {@code loadData} and waits for the import to finish.
+ */
+public class BulkTableUpdateCommand extends ClusterExecuteCommandSingleTable<Integer> {
+
+  private static final String YARN_SITE_XML = "yarn-site.xml";
+  private static final String HDFS_SITE_XML = "hdfs-site.xml";
+  private static final String IMPORT = "import";
+  private static final String BULK_UPDATE = "bulk-update";
+  /** Charset used both to serialize extra configurations and to read them back. */
+  private static final String UTF_8 = "UTF-8";
+  /** Delay between table-stats polls while waiting for the import. */
+  private static final long POLL_INTERVAL_MS = 1000L;
+  /** Number of additional consecutive idle polls required before the import is considered finished. */
+  private static final int SETTLE_POLLS = 5;
+
+  @OptionalArgument("The reducer multipler.")
+  private Integer reducerMultipler = 1;
+
+  @OptionalArgument("Automatically load the indexed data into the table when complete.")
+  private Boolean autoLoad = true;
+
+  @OptionalArgument("Block and wait for import to complete.")
+  private boolean waitForDataBeVisible = true;
+
+  @OptionalArgument("Additional configurations that may needed to execute the indexing job.")
+  private List<String> extraConfigs = new ArrayList<String>();
+
+  @Override
+  public String getName() {
+    return BULK_UPDATE;
+  }
+
+  /**
+   * Runs the bulk update MapReduce job for this command's table.
+   *
+   * <p>The job reads from the table's {@link BlurConstants#BLUR_BULK_UPDATE_WORKING_PATH} and writes to
+   * {@code <tableUri>/import/<currentTimeMillis>}. If the driver returns 0 and {@code autoLoad} is set, that output
+   * is passed to {@code loadData}; if {@code waitForDataBeVisible} is also set, this call blocks until the import
+   * appears finished.
+   *
+   * @param context cluster context used to look up the table's configuration and descriptor
+   * @return the exit code returned by the MapReduce driver
+   * @throws IllegalArgumentException if the table has no (or an empty) bulk update working path configured
+   * @throws IOException if the driver throws, or loading the data into the table fails
+   * @throws InterruptedException if interrupted while waiting for the import
+   */
+  @Override
+  public Integer clusterExecute(ClusterContext context) throws IOException, InterruptedException {
+    String table = getTable();
+    BlurConfiguration blurConfiguration = context.getBlurConfiguration(table);
+    String blurZkConnection = blurConfiguration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION);
+    TableContext tableContext = context.getTableContext(table);
+    TableDescriptor descriptor = tableContext.getDescriptor();
+    String tableUri = descriptor.getTableUri();
+    String mrIncWorkingPathStr = blurConfiguration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+    if (mrIncWorkingPathStr == null || mrIncWorkingPathStr.isEmpty()) {
+      // Same exception type Path would throw, but naming the missing table property.
+      throw new IllegalArgumentException("Table [" + table + "] does not define ["
+          + BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH + "], which is required for a bulk update.");
+    }
+    Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+    // Each run writes to a fresh, timestamped directory under the table's import directory.
+    String outputPathStr = new Path(new Path(new Path(tableUri), IMPORT), Long.toString(System.currentTimeMillis()))
+        .toString();
+    Configuration configuration = new Configuration();
+    configuration.addResource(HDFS_SITE_XML);
+    configuration.addResource(YARN_SITE_XML);
+    for (String extraConfig : extraConfigs) {
+      if (extraConfig != null) {
+        // Encode with the same charset addExtraConfig used to decode the XML.
+        InputStream inputStream = IOUtils.toInputStream(extraConfig, UTF_8);
+        configuration.addResource(inputStream);
+        inputStream.close();
+      }
+    }
+    int run;
+    try {
+      run = ToolRunner.run(configuration, new Driver(), new String[] { table, mrIncWorkingPath.toString(),
+          outputPathStr, blurZkConnection, Integer.toString(reducerMultipler) });
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    if (run == 0 && autoLoad) {
+      Iface client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
+      try {
+        client.loadData(table, outputPathStr);
+        if (waitForDataBeVisible) {
+          waitForDataToBeVisible(client, table);
+        }
+      } catch (BlurException e) {
+        throw new IOException(e);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+    return run;
+  }
+
+  /**
+   * Blocks until the table's segment imports appear to have completed.
+   *
+   * <p>First waits until table stats report at least one in-progress or pending segment import. It then waits until
+   * none are reported for {@link #SETTLE_POLLS} further consecutive polls, so a slow shard server that picks up its
+   * segment late is not missed. Any reported activity during that window restarts the count.
+   */
+  private void waitForDataToBeVisible(Iface client, String table) throws BlurException, TException,
+      InterruptedException {
+    // Phase 1: wait for the import to show up in the table stats.
+    // NOTE(review): if the import starts and finishes between loadData() returning and the first poll, activity
+    // is never observed and this loop does not return; consider bounding it with a timeout.
+    while (!hasImportActivity(client.tableStats(table))) {
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+
+    // Phase 2: once no imports are reported, keep polling for SETTLE_POLLS more intervals and return only if the
+    // table stays idle throughout. The previous loop never slept once idle, so this grace period did not happen.
+    int consecutiveIdlePolls = 0;
+    while (true) {
+      if (hasImportActivity(client.tableStats(table))) {
+        consecutiveIdlePolls = 0;
+      } else if (++consecutiveIdlePolls > SETTLE_POLLS) {
+        return;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+  }
+
+  /** Returns true when the stats report any in-progress or pending segment import. */
+  private static boolean hasImportActivity(TableStats tableStats) {
+    return tableStats.getSegmentImportInProgressCount() > 0 || tableStats.getSegmentImportPendingCount() > 0;
+  }
+
+  public Integer getReducerMultipler() {
+    return reducerMultipler;
+  }
+
+  public void setReducerMultipler(Integer reducerMultipler) {
+    this.reducerMultipler = reducerMultipler;
+  }
+
+  public Boolean getAutoLoad() {
+    return autoLoad;
+  }
+
+  public void setAutoLoad(Boolean autoLoad) {
+    this.autoLoad = autoLoad;
+  }
+
+  public boolean isWaitForDataBeVisible() {
+    return waitForDataBeVisible;
+  }
+
+  public void setWaitForDataBeVisible(boolean waitForDataBeVisible) {
+    this.waitForDataBeVisible = waitForDataBeVisible;
+  }
+
+  public List<String> getExtraConfigs() {
+    return extraConfigs;
+  }
+
+  public void setExtraConfigs(List<String> extraConfigs) {
+    this.extraConfigs = extraConfigs;
+  }
+
+  /**
+   * Serializes {@code configuration} to XML and appends it to the extra configurations applied to the indexing job.
+   *
+   * @param configuration the Hadoop configuration to ship with the command
+   * @throws IOException if the configuration cannot be written or decoded
+   */
+  public void addExtraConfig(Configuration configuration) throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    configuration.writeXml(outputStream);
+    outputStream.close();
+    // Decode with an explicit charset rather than the platform default; presumably writeXml(OutputStream) emits
+    // UTF-8 — verify for the Hadoop version in use.
+    extraConfigs.add(new String(outputStream.toByteArray(), UTF_8));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
b/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
new file mode 100644
index 0000000..f6fc606
--- /dev/null
+++ b/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+org.apache.blur.mapreduce.lib.update.BulkTableUpdateCommand
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
index 9232e1a..8c8de6e 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/update/DriverTest.java
@@ -226,6 +226,67 @@ public class DriverTest {
     }
   }
 
+  @Test
+  public void testBulkTableUpdateCommandUpdateRecordToExistingRow() throws Exception {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    String tableName = "testBulkTableUpdateCommandUpdateRecordToExistingRow";
+    Iface client = getClient();
+    Path mrIncWorkingPath = new Path(new Path(root, "working"), tableName);
+    creatTable(tableName, new Path(root, "tables"), true, mrIncWorkingPath.toString());
+    String rowId = "row1";
+    String recordId = "record1";
+    addRow(client, tableName, rowId, recordId, "value1");
+
+    generateData(mrIncWorkingPath.toString(), rowId, recordId, "value2");
+
+    // Before the bulk update runs, the row still holds the value written through the client.
+    assertSingleColumnRecord(client, tableName, rowId, recordId, "value1");
+
+    BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
+    bulkTableUpdateCommand.setAutoLoad(true);
+    bulkTableUpdateCommand.setTable(tableName);
+    bulkTableUpdateCommand.setWaitForDataBeVisible(true);
+    bulkTableUpdateCommand.addExtraConfig(conf);
+    assertEquals(0, (int) bulkTableUpdateCommand.run(client));
+
+    // The update must replace the existing record, not add another row or record.
+    TableStats tableStats = client.tableStats(tableName);
+    assertEquals(1, tableStats.getRowCount());
+    assertEquals(1, tableStats.getRecordCount());
+
+    assertSingleColumnRecord(client, tableName, rowId, recordId, "value2");
+  }
+
+  /**
+   * Fetches {@code rowId} and asserts that it holds exactly one record, {@code recordId}, whose single column
+   * {@code col0} has the value {@code expectedValue}.
+   */
+  private void assertSingleColumnRecord(Iface client, String tableName, String rowId, String recordId,
+      String expectedValue) throws BlurException, TException {
+    Selector selector = new Selector();
+    selector.setRowId(rowId);
+    FetchResult fetchRow = client.fetchRow(tableName, selector);
+    Row row = fetchRow.getRowResult().getRow();
+    assertEquals(rowId, row.getId());
+    List<Record> records = row.getRecords();
+    assertEquals(1, records.size());
+    Record record = records.get(0);
+    assertEquals(recordId, record.getRecordId());
+    List<Column> columns = record.getColumns();
+    assertEquals(1, columns.size());
+    Column column = columns.get(0);
+    assertEquals("col0", column.getName());
+    assertEquals(expectedValue, column.getValue());
+  }
+
   private void addRow(Iface client, String tableName, String rowId, String recordId, String
value)
       throws BlurException, TException {
     List<RecordMutation> recordMutations = new ArrayList<RecordMutation>();
@@ -266,6 +327,11 @@ public class DriverTest {
   }
 
   private void creatTable(String tableName, Path tables, boolean fastDisable) throws BlurException,
TException {
+    // Convenience overload: passes a null working path, so the four-argument version
+    // does not set the bulk-update working path table property.
+    creatTable(tableName, tables, fastDisable, null);
+  }
+
+  private void creatTable(String tableName, Path tables, boolean fastDisable, String workingPath)
throws BlurException,
+      TException {
     Path tablePath = new Path(tables, tableName);
     Iface client = getClient();
     TableDescriptor tableDescriptor = new TableDescriptor();
@@ -273,6 +339,9 @@ public class DriverTest {
     tableDescriptor.setName(tableName);
     tableDescriptor.setShardCount(2);
     tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
+    if (workingPath != null) {
+      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, workingPath);
+    }
     client.createTable(tableDescriptor);
 
     ColumnDefinition colDef = new ColumnDefinition();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ff9022bc/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 40db852..fdf6cad 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -79,6 +79,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_INDEX_WRITER_SORT_MEMORY = "blur.shard.index.writer.sort.memory";
   public static final String BLUR_SHARD_INDEX_WRITER_SORT_FACTOR = "blur.shard.index.writer.sort.factor";
   public static final String BLUR_TABLE_DISABLE_FAST_DIR = "blur.table.disable.fast.dir";
+  public static final String BLUR_BULK_UPDATE_WORKING_PATH = "blur.bulk.update.working.path";
 
   public static final String BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT = "blur.shard.server.thrift.thread.count";
   public static final String BLUR_SHARD_CACHE_MAX_TIMETOLIVE = "blur.shard.cache.max.timetolive";


Mime
View raw message