incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Fixing some issues with user permissions and ugi usage.
Date Thu, 18 Jun 2015 13:01:31 GMT
Fixing some issues with user permissions and ugi usage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ef53f55f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ef53f55f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ef53f55f

Branch: refs/heads/master
Commit: ef53f55f70bc471a7847da34f4ce8a4ed94a8195
Parents: e360515
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jun 18 09:01:20 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jun 18 09:01:20 2015 -0400

----------------------------------------------------------------------
 .../clusterstatus/ZookeeperClusterStatus.java   | 37 ++++++---
 .../hive/BlurHiveMRLoaderOutputCommitter.java   | 82 ++++++++++--------
 .../apache/blur/hive/BlurHiveOutputFormat.java  | 87 +++++++++++++-------
 .../org/apache/blur/utils/BlurConstants.java    |  1 +
 .../src/main/resources/blur-default.properties  |  3 +
 5 files changed, 135 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ef53f55f/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
index 7ae5761..388a490 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/clusterstatus/ZookeeperClusterStatus.java
@@ -53,6 +53,7 @@ import org.apache.blur.zookeeper.ZookeeperPathConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -609,10 +610,7 @@ public class ZookeeperClusterStatus extends ClusterStatus {
 
   private void assignMapReduceWorkingPath(TableDescriptor tableDescriptor) throws IOException
{
     Map<String, String> tableProperties = tableDescriptor.getTableProperties();
-    String mrIncWorkingPathStr = null;
-    if (tableProperties != null) {
-      mrIncWorkingPathStr = tableProperties.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
-    }
+    String mrIncWorkingPathStr = getProperty(tableProperties, BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
     if (mrIncWorkingPathStr == null) {
       // If not set on the table, try to use cluster default
       mrIncWorkingPathStr = _configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
@@ -626,18 +624,37 @@ public class ZookeeperClusterStatus extends ClusterStatus {
     }
 
     Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
+
+    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(_config);
+
     Path newData = new Path(mrIncWorkingPath, NEW);
     Path tmpData = new Path(mrIncWorkingPath, TMP);
     Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
     Path completeData = new Path(mrIncWorkingPath, COMPLETE);
     Path fileCache = new Path(mrIncWorkingPath, CACHE);
 
-    FileSystem fileSystem = mrIncWorkingPath.getFileSystem(_config);
-    fileSystem.mkdirs(newData);
-    fileSystem.mkdirs(tmpData);
-    fileSystem.mkdirs(inprogressData);
-    fileSystem.mkdirs(completeData);
-    fileSystem.mkdirs(fileCache);
+    String permission = getProperty(tableProperties, BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH_PERMISSION);
+    FsPermission fsPermission;
+    if (permission == null || permission.isEmpty()) {
+      fsPermission = FsPermission.getDirDefault();
+    } else {
+      fsPermission = new FsPermission(permission);
+    }
+
+    fileSystem.mkdirs(mrIncWorkingPath, fsPermission);
+    fileSystem.mkdirs(newData, fsPermission);
+    fileSystem.mkdirs(tmpData, fsPermission);
+    fileSystem.mkdirs(inprogressData, fsPermission);
+    fileSystem.mkdirs(completeData, fsPermission);
+    fileSystem.mkdirs(fileCache, fsPermission);
+  }
+
+  private String getProperty(Map<String, String> tableProperties, String name) {
+    String value = null;
+    if (tableProperties != null) {
+      value = tableProperties.get(name);
+    }
+    return value;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ef53f55f/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
index 94a6415..e0f3b8d 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveMRLoaderOutputCommitter.java
@@ -17,6 +17,7 @@
 package org.apache.blur.hive;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.blur.mapreduce.lib.BlurOutputFormat;
 import org.apache.blur.mapreduce.lib.update.BulkTableUpdateCommand;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.security.UserGroupInformation;
 
 public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
 
@@ -77,41 +79,53 @@ public class BlurHiveMRLoaderOutputCommitter extends OutputCommitter {
     finishBulkJob(context, true);
   }
 
-  private void finishBulkJob(JobContext context, boolean apply) throws IOException {
-    Configuration configuration = context.getConfiguration();
-    String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
-    Path workingPath = new Path(workingPathStr);
-    Path tmpDir = new Path(workingPath, "tmp");
-    FileSystem fileSystem = tmpDir.getFileSystem(configuration);
-    String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
-    Path loadPath = new Path(tmpDir, loadId);
-
-    if (apply) {
-      Path newDataPath = new Path(workingPath, "new");
-      Path dst = new Path(newDataPath, loadId);
-      if (!fileSystem.rename(loadPath, dst)) {
-        LOG.error("Could not move data from src [" + loadPath + "] to dst [" + dst + "]");
-        throw new IOException("Could not move data from src [" + loadPath + "] to dst ["
+ dst + "]");
+  private void finishBulkJob(JobContext context, final boolean apply) throws IOException
{
+    final Configuration configuration = context.getConfiguration();
+    PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>()
{
+      @Override
+      public Void run() throws Exception {
+        String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+        Path workingPath = new Path(workingPathStr);
+        Path tmpDir = new Path(workingPath, "tmp");
+        FileSystem fileSystem = tmpDir.getFileSystem(configuration);
+        String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
+        Path loadPath = new Path(tmpDir, loadId);
+
+        if (apply) {
+          Path newDataPath = new Path(workingPath, "new");
+          Path dst = new Path(newDataPath, loadId);
+          if (!fileSystem.rename(loadPath, dst)) {
+            LOG.error("Could not move data from src [" + loadPath + "] to dst [" + dst +
"]");
+            throw new IOException("Could not move data from src [" + loadPath + "] to dst
[" + dst + "]");
+          }
+
+          TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+          String connectionStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
+          BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
+          bulkTableUpdateCommand.setAutoLoad(true);
+          bulkTableUpdateCommand.setTable(tableDescriptor.getName());
+          bulkTableUpdateCommand.setWaitForDataBeVisible(true);
+
+          Configuration config = new Configuration(false);
+          config.addResource(HDFS_SITE_XML);
+          config.addResource(YARN_SITE_XML);
+          config.addResource(MAPRED_SITE_XML);
+
+          bulkTableUpdateCommand.addExtraConfig(config);
+          if (bulkTableUpdateCommand.run(BlurClient.getClient(connectionStr)) != 0) {
+            throw new IOException("Unknown error occured duing load.");
+          }
+        } else {
+          fileSystem.delete(loadPath, true);
+        }
+        return null;
       }
-
-      TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-      String connectionStr = configuration.get(BlurSerDe.BLUR_CONTROLLER_CONNECTION_STR);
-      BulkTableUpdateCommand bulkTableUpdateCommand = new BulkTableUpdateCommand();
-      bulkTableUpdateCommand.setAutoLoad(true);
-      bulkTableUpdateCommand.setTable(tableDescriptor.getName());
-      bulkTableUpdateCommand.setWaitForDataBeVisible(true);
-
-      Configuration config = new Configuration(false);
-      config.addResource(HDFS_SITE_XML);
-      config.addResource(YARN_SITE_XML);
-      config.addResource(MAPRED_SITE_XML);
-
-      bulkTableUpdateCommand.addExtraConfig(config);
-      if (bulkTableUpdateCommand.run(BlurClient.getClient(connectionStr)) != 0) {
-        throw new IOException("Unknown error occured duing load.");
-      }
-    } else {
-      fileSystem.delete(loadPath, true);
+    };
+    UserGroupInformation userGroupInformation = BlurHiveOutputFormat.getUGI(configuration);
+    try {
+      userGroupInformation.doAs(action);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ef53f55f/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
index 6216d9c..2c75a01 100644
--- a/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
+++ b/blur-hive/src/main/java/org/apache/blur/hive/BlurHiveOutputFormat.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.util.Progressable;
 
 public class BlurHiveOutputFormat implements HiveOutputFormat<Text, BlurRecord> {
 
+  private static final String BLUR_USER_PROXY = "blur.user.proxy";
   private static final String BLUR = "blur";
   private static final String BLUR_USER_NAME = "blur.user.name";
   private static final String BLUR_BULK_MUTATE_ID = "blur.bulk.mutate.id";
@@ -93,43 +94,67 @@ public class BlurHiveOutputFormat implements HiveOutputFormat<Text,
BlurRecord>
 
   private org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getMrWorkingPathWriter(
       final Configuration configuration) throws IOException {
-    String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
-    Path workingPath = new Path(workingPathStr);
-    Path tmpDir = new Path(workingPath, "tmp");
-    final FileSystem fileSystem = tmpDir.getFileSystem(configuration);
-    String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
-    final Path loadPath = new Path(tmpDir, loadId);
-    String user = getBlurUser(configuration);
-    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
+    PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>
privilegedExceptionAction = new PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>()
{
+      @Override
+      public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter run() throws Exception
{
+        String workingPathStr = configuration.get(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH);
+        Path workingPath = new Path(workingPathStr);
+        Path tmpDir = new Path(workingPath, "tmp");
+        FileSystem fileSystem = tmpDir.getFileSystem(configuration);
+        String loadId = configuration.get(BlurSerDe.BLUR_MR_LOAD_ID);
+        Path loadPath = new Path(tmpDir, loadId);
+        final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new Path(loadPath,
UUID.randomUUID()
+            .toString()), Text.class, BlurRecord.class);
+
+        return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+
+          @Override
+          public void write(Writable w) throws IOException {
+            BlurRecord blurRecord = (BlurRecord) w;
+            String rowId = blurRecord.getRowId();
+            writer.append(new Text(rowId), blurRecord);
+          }
+
+          @Override
+          public void close(boolean abort) throws IOException {
+            writer.close();
+          }
+        };
+      }
+    };
+
+    UserGroupInformation userGroupInformation = getUGI(configuration);
     try {
-      return proxyUser
-          .doAs(new PrivilegedExceptionAction<org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter>()
{
-            @Override
-            public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter run() throws
Exception {
-              final Writer writer = new SequenceFile.Writer(fileSystem, configuration, new
Path(loadPath, UUID
-                  .randomUUID().toString()), Text.class, BlurRecord.class);
-
-              return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
-
-                @Override
-                public void write(Writable w) throws IOException {
-                  BlurRecord blurRecord = (BlurRecord) w;
-                  String rowId = blurRecord.getRowId();
-                  writer.append(new Text(rowId), blurRecord);
-                }
-
-                @Override
-                public void close(boolean abort) throws IOException {
-                  writer.close();
-                }
-              };
-            }
-          });
+      return userGroupInformation.doAs(privilegedExceptionAction);
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
   }
 
+  public static UserGroupInformation getUGI(final Configuration configuration) throws IOException
{
+    String user = getBlurUser(configuration);
+    UserGroupInformation userGroupInformation;
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    if (user.equals(currentUser.getUserName())) {
+      userGroupInformation = currentUser;
+    } else {
+      if (BlurHiveOutputFormat.isBlurUserAsProxy(configuration)) {
+        userGroupInformation = UserGroupInformation.createProxyUser(user, currentUser);
+      } else {
+        userGroupInformation = UserGroupInformation.createRemoteUser(user);
+      }
+    }
+    return userGroupInformation;
+  }
+
+  public static boolean isBlurUserAsProxy(Configuration configuration) {
+    return configuration.getBoolean(BLUR_USER_PROXY, false);
+  }
+
+  public static void setBlurUserAsProxy(Configuration configuration, boolean blurUserProxy)
{
+    configuration.setBoolean(BLUR_USER_PROXY, blurUserProxy);
+  }
+
   public static String getBlurUser(Configuration configuration) {
     return configuration.get(BLUR_USER_NAME, BLUR);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ef53f55f/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 6d36711..24488e1 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -80,6 +80,7 @@ public class BlurConstants {
   public static final String BLUR_SHARD_INDEX_WRITER_SORT_FACTOR = "blur.shard.index.writer.sort.factor";
   public static final String BLUR_TABLE_DISABLE_FAST_DIR = "blur.table.disable.fast.dir";
   public static final String BLUR_BULK_UPDATE_WORKING_PATH = "blur.bulk.update.working.path";
+  public static final String BLUR_BULK_UPDATE_WORKING_PATH_PERMISSION = "blur.bulk.update.working.path.permission";
 
   public static final String BLUR_SHARD_SERVER_THRIFT_THREAD_COUNT = "blur.shard.server.thrift.thread.count";
   public static final String BLUR_SHARD_CACHE_MAX_TIMETOLIVE = "blur.shard.cache.max.timetolive";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ef53f55f/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 3a372fe..e2988c4 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -80,6 +80,9 @@ blur.tmp.path=
 # The map reduce working path for map reduce incremental updates.
 blur.bulk.update.working.path=
 
+# The desired permission on the blur.bulk.update.working.path and its subdirectories.  If blank, the default directory permissions will be applied.
+blur.bulk.update.working.path.permission=
+
 # The hostname for the shard, if blank the hostname is automatically detected
 blur.shard.hostname=
 


Mime
View raw message