incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: User object is now being propagated from the client throughout all remote calls and threads.
Date Mon, 30 Dec 2013 05:02:42 GMT
Updated Branches:
  refs/heads/master 8950ddd82 -> c2ac7ec31


User object is now being propagated from the client throughout all remote calls and threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c2ac7ec3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c2ac7ec3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c2ac7ec3

Branch: refs/heads/master
Commit: c2ac7ec31e23460eee9fb0bad31901a44df42b1e
Parents: 8950ddd
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 30 00:01:50 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 30 00:02:31 2013 -0500

----------------------------------------------------------------------
 .../blur/thrift/ThriftBlurControllerServer.java |   1 +
 .../blur/thrift/ThriftBlurShardServer.java      |   1 +
 .../java/org/apache/blur/utils/BlurUtil.java    |  41 ++++++
 .../main/java/org/apache/blur/shell/Main.java   |  14 +-
 .../java/org/apache/blur/thrift/BlurClient.java |  64 +--------
 .../apache/blur/thrift/BlurClientManager.java   |   4 +
 .../org/apache/blur/thrift/UserConverter.java   |  37 +++++
 .../blur/thrift/util/SimpleQueryExample.java    |   5 +-
 .../blur/concurrent/BlurThreadPoolExecutor.java | 137 +++++++++++++++++++
 .../org/apache/blur/concurrent/Executors.java   |   4 +-
 .../concurrent/ThreadBoundaryProcessor.java     |  27 ++++
 .../main/java/org/apache/blur/user/User.java    |  39 ++++++
 .../java/org/apache/blur/user/UserContext.java  |  36 +++++
 .../blur/user/UserThreadBoundaryProcessor.java  |  43 ++++++
 14 files changed, 384 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index b796f2b..b571c00 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -151,6 +151,7 @@ public class ThriftBlurControllerServer extends ThriftServer {
 
     Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, controllerServer, false);
     iface = BlurUtil.recordMethodCallsAndAverageTimes(iface, Iface.class, true);
+    iface = BlurUtil.runWithUser(iface, true);
     iface = BlurUtil.runTrace(iface, true);
     int threadCount = configuration.getInt(BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT, 32);
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 7293cd4..f9fef0b 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -236,6 +236,7 @@ public class ThriftBlurShardServer extends ThriftServer {
 
     Iface iface = BlurUtil.wrapFilteredBlurServer(configuration, shardServer, true);
     iface = BlurUtil.recordMethodCallsAndAverageTimes(iface, Iface.class, false);
+    iface = BlurUtil.runWithUser(iface, false);
     iface = BlurUtil.runTrace(iface, false);
     if (httpServer != null) {
       WebAppContext context = httpServer.getContext();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 3e4004f..ee954a8 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -77,6 +77,7 @@ import org.apache.blur.thirdparty.thrift_0_9_0.TBase;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TMemoryBuffer;
+import org.apache.blur.thrift.UserConverter;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
@@ -94,6 +95,7 @@ import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.util.ResetableTMemoryBuffer;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -1086,6 +1088,45 @@ public class BlurUtil {
     return iface;
   }
 
+  public static Iface runWithUser(final Iface iface, final boolean controller) {
+    InvocationHandler handler = new InvocationHandler() {
+      @Override
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        if (method.getName().equals("setUser")) {
+          try {
+            return method.invoke(iface, args);
+          } catch (InvocationTargetException e) {
+            throw e.getTargetException();
+          }
+        }
+        BlurServerContext context = getServerContext(controller);
+        if (context == null) {
+          try {
+            return method.invoke(iface, args);
+          } catch (InvocationTargetException e) {
+            throw e.getTargetException();
+          }
+        }
+        UserContext.setUser(UserConverter.toUserFromThrift(context.getUser()));
+        try {
+          return method.invoke(iface, args);
+        } catch (InvocationTargetException e) {
+          throw e.getTargetException();
+        } finally {
+          UserContext.reset();
+        }
+      }
+
+      private BlurServerContext getServerContext(boolean controller) {
+        if (controller) {
+          return ControllerServerContext.getControllerServerContext();
+        }
+        return ShardServerContext.getShardServerContext();
+      }
+    };
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class
}, handler);
+  }
+
   public static Iface runTrace(final Iface iface, final boolean controller) {
     InvocationHandler handler = new InvocationHandler() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-shell/src/main/java/org/apache/blur/shell/Main.java
----------------------------------------------------------------------
diff --git a/blur-shell/src/main/java/org/apache/blur/shell/Main.java b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
index ec0ae8a..100e62f 100644
--- a/blur-shell/src/main/java/org/apache/blur/shell/Main.java
+++ b/blur-shell/src/main/java/org/apache/blur/shell/Main.java
@@ -49,8 +49,9 @@ import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Selector;
-import org.apache.blur.thrift.generated.User;
 import org.apache.blur.trace.Trace;
+import org.apache.blur.user.User;
+import org.apache.blur.user.UserContext;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
@@ -71,8 +72,6 @@ public class Main {
   static Map<String, Command> commands;
   static String cluster;
 
-  static User user;
-
   static String getCluster(Iface client) throws BlurException, TException, CommandException
{
     return getCluster(client,
         "There is more than one shard cluster, use \"cluster\" command to set the cluster
that should be in use.");
@@ -185,7 +184,7 @@ public class Main {
     @Override
     public void doit(PrintWriter out, Blur.Iface client, String[] args) throws CommandException,
TException,
         BlurException {
-      out.println("User [" + user + "]");
+      out.println("User [" + UserContext.getUser() + "]");
     }
 
     @Override
@@ -210,7 +209,7 @@ public class Main {
     public void doit(PrintWriter out, Blur.Iface client, String[] args) throws CommandException,
TException,
         BlurException {
       if (args.length == 1) {
-        user = null;
+        UserContext.reset();
         out.println("User reset.");
         return;
       }
@@ -220,8 +219,8 @@ public class Main {
         String[] parts = args[i].split("\\=");
         attributes.put(parts[0], parts[1]);
       }
-      user = new User(username, attributes);
-      out.println("User set [" + user + "]");
+      UserContext.setUser(new User(username, attributes));
+      out.println("User set [" + UserContext.getUser() + "]");
     }
 
     @Override
@@ -568,7 +567,6 @@ public class Main {
             } else {
               long start = System.nanoTime();
               try {
-                client.setUser(user);
                 String traceId = null;
                 if (trace) {
                   traceId = UUID.randomUUID().toString();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
index 086a1ad..7d5e1d2 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClient.java
@@ -23,46 +23,32 @@ import java.lang.reflect.Proxy;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.commands.BlurCommand;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.User;
 
 public class BlurClient {
 
-  private static final Log LOG = LogFactory.getLog(BlurClient.class);
-  private static final boolean DEFAULT_SET_USER = false;
-
   static class BlurClientInvocationHandler implements InvocationHandler {
 
-    private static final String SET_USER = "setUser";
     private final List<Connection> _connections;
     private final int _maxRetries;
     private final long _backOffTime;
     private final long _maxBackOffTime;
-    private final boolean _setUser;
-    private User user;
-
-    public BlurClientInvocationHandler(List<Connection> connections) {
-      this(connections, DEFAULT_SET_USER);
-    }
 
     public BlurClientInvocationHandler(List<Connection> connections, int maxRetries,
long backOffTime,
-        long maxBackOffTime, boolean setUser) {
+        long maxBackOffTime) {
       _connections = connections;
       _maxRetries = maxRetries;
       _backOffTime = backOffTime;
       _maxBackOffTime = maxBackOffTime;
-      _setUser = setUser;
     }
 
-    public BlurClientInvocationHandler(List<Connection> connections, boolean setUser)
{
+    public BlurClientInvocationHandler(List<Connection> connections) {
       this(connections, BlurClientManager.MAX_RETRIES, BlurClientManager.BACK_OFF_TIME,
-          BlurClientManager.MAX_BACK_OFF_TIME, setUser);
+          BlurClientManager.MAX_BACK_OFF_TIME);
     }
 
     @Override
@@ -71,16 +57,7 @@ public class BlurClient {
         @Override
         public Object call(Client client) throws BlurException, TException {
           try {
-            if (_setUser && method.getName().equals(SET_USER)) {
-              user = (User) args[0];
-              LOG.info("Setting the user [{0}] for this client.", user);
-              return null;
-            } else {
-              if (_setUser) {
-                client.setUser(user);
-              }
-              return method.invoke(client, args);
-            }
+            return method.invoke(client, args);
           } catch (IllegalArgumentException e) {
             throw new RuntimeException(e);
           } catch (IllegalAccessException e) {
@@ -129,17 +106,6 @@ public class BlurClient {
     return getClient(connections, maxRetries, backOffTime, maxBackOffTime);
   }
 
-  public static Iface getClient(String connectionStr, boolean setUser) {
-    List<Connection> connections = BlurClientManager.getConnections(connectionStr);
-    return getClient(connections, setUser);
-  }
-
-  public static Iface getClient(String connectionStr, int maxRetries, long backOffTime, long
maxBackOffTime,
-      boolean setUser) {
-    List<Connection> connections = BlurClientManager.getConnections(connectionStr);
-    return getClient(connections, maxRetries, backOffTime, maxBackOffTime, setUser);
-  }
-
   public static Iface getClient(Connection connection) {
     return getClient(Arrays.asList(connection));
   }
@@ -149,33 +115,13 @@ public class BlurClient {
         new BlurClientInvocationHandler(connections));
   }
 
-  public static Iface getClient(Connection connection, boolean setUser) {
-    return getClient(Arrays.asList(connection), setUser);
-  }
-
-  public static Iface getClient(List<Connection> connections, boolean setUser) {
-    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class
},
-        new BlurClientInvocationHandler(connections, setUser));
-  }
-
   public static Iface getClient(Connection connection, int maxRetries, long backOffTime,
long maxBackOffTime) {
     return getClient(Arrays.asList(connection), maxRetries, backOffTime, maxBackOffTime);
   }
 
   public static Iface getClient(List<Connection> connections, int maxRetries, long
backOffTime, long maxBackOffTime) {
     return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class
},
-        new BlurClientInvocationHandler(connections, maxRetries, backOffTime, maxBackOffTime,
DEFAULT_SET_USER));
-  }
-
-  public static Iface getClient(Connection connection, int maxRetries, long backOffTime,
long maxBackOffTime,
-      boolean setUser) {
-    return getClient(Arrays.asList(connection), maxRetries, backOffTime, maxBackOffTime,
setUser);
-  }
-
-  public static Iface getClient(List<Connection> connections, int maxRetries, long
backOffTime, long maxBackOffTime,
-      boolean setUser) {
-    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class
},
-        new BlurClientInvocationHandler(connections, maxRetries, backOffTime, maxBackOffTime,
setUser));
+        new BlurClientInvocationHandler(connections, maxRetries, backOffTime, maxBackOffTime));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 851361c..bf5b170 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -45,9 +45,11 @@ import org.apache.blur.thrift.generated.Blur;
 import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.ErrorType;
+import org.apache.blur.thrift.generated.User;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Trace.TraceId;
 import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
 
 public class BlurClientManager {
 
@@ -185,6 +187,8 @@ public class BlurClientManager {
             client.get().startTrace(traceId.getRootId(), traceId.getRequestId());
             trace = Trace.trace("thrift client", Trace.param("connection", getConnectionStr(client.get())));
           }
+          User user = UserConverter.toThriftUser(UserContext.getUser());
+          client.get().setUser(user);
           T result = command.call((CLIENT) client.get(), connection);
           allBad = false;
           if (command.isDetachClient()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-thrift/src/main/java/org/apache/blur/thrift/UserConverter.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/UserConverter.java b/blur-thrift/src/main/java/org/apache/blur/thrift/UserConverter.java
new file mode 100644
index 0000000..72d57e0
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/UserConverter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.thrift;
+
+import org.apache.blur.thrift.generated.User;
+
+public class UserConverter {
+
+  public static User toThriftUser(org.apache.blur.user.User user) {
+    if (user == null) {
+      return null;
+    }
+    return new User(user.getUsername(), user.getAttributes());
+  }
+
+  public static org.apache.blur.user.User toUserFromThrift(User user) {
+    if (user == null) {
+      return null;
+    }
+    return new org.apache.blur.user.User(user.getUsername(), user.getAttributes());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-thrift/src/main/java/org/apache/blur/thrift/util/SimpleQueryExample.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/SimpleQueryExample.java
b/blur-thrift/src/main/java/org/apache/blur/thrift/util/SimpleQueryExample.java
index e905ad0..3aae102 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/util/SimpleQueryExample.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/SimpleQueryExample.java
@@ -29,6 +29,8 @@ import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.Query;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.trace.Trace;
+import org.apache.blur.user.User;
+import org.apache.blur.user.UserContext;
 
 public class SimpleQueryExample {
 
@@ -41,6 +43,7 @@ public class SimpleQueryExample {
 
     String uuid = UUID.randomUUID().toString();
     Trace.setupTrace(uuid);
+    UserContext.setUser(new User("me", null));
     final BlurQuery blurQuery = new BlurQuery();
     Query query = new Query();
     blurQuery.setQuery(query);
@@ -52,6 +55,6 @@ public class SimpleQueryExample {
     for (BlurResult result : results.getResults()) {
       System.out.println(result);
     }
-    Trace.tearDownTrace();
+    // Trace.tearDownTrace();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/concurrent/BlurThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/BlurThreadPoolExecutor.java
b/blur-util/src/main/java/org/apache/blur/concurrent/BlurThreadPoolExecutor.java
new file mode 100644
index 0000000..a0b6f59
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/BlurThreadPoolExecutor.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.concurrent;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+public class BlurThreadPoolExecutor extends ThreadPoolExecutor {
+
+  private static final Log LOG = LogFactory.getLog(BlurThreadPoolExecutor.class);
+
+  private final List<ThreadBoundaryProcessor> _processorCollection = new ArrayList<ThreadBoundaryProcessor>();
+
+  public BlurThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
+      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+  }
+
+  public void add(ThreadBoundaryProcessor processor) {
+    _processorCollection.add(processor);
+  }
+
+  public void remove(ThreadBoundaryProcessor processor) {
+    _processorCollection.remove(processor);
+  }
+
+  public List<ThreadBoundaryProcessor> getPrePostCollection() {
+    return _processorCollection;
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    super.execute(wrapRunnable(command));
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    return super.submit(wrapRunnable(task));
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    return super.submit(wrapRunnable(task), result);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    return super.submit(wrapCallable(task));
+  }
+
+  private Runnable wrapRunnable(final Runnable runnable) {
+    final Object[] vars = preprocessCallingThread();
+    return new Runnable() {
+      @Override
+      public void run() {
+        Closeable[] closeables = preprocessNewThread(vars);
+        try {
+          runnable.run();
+        } finally {
+          cleanup(closeables);
+        }
+      }
+    };
+  }
+
+  protected void cleanup(Closeable[] closeables) {
+    for (Closeable closeable : closeables) {
+      cleanup(closeable);
+    }
+  }
+
+  private void cleanup(Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      LOG.error("Error during quiet close of [" + closeable + "] [" + e.getMessage() + "]");
+    }
+  }
+
+  private Closeable[] preprocessNewThread(Object[] vars) {
+    int size = _processorCollection.size();
+    Closeable[] closeables = new Closeable[size];
+    for (int i = 0; i < size; i++) {
+      closeables[i] = _processorCollection.get(i).preprocessNewThread(vars[i]);
+    }
+    return closeables;
+  }
+
+  private Object[] preprocessCallingThread() {
+    int size = _processorCollection.size();
+    Object[] vars = new Object[size];
+    for (int i = 0; i < size; i++) {
+      vars[i] = _processorCollection.get(i).preprocessCallingThread();
+    }
+    return vars;
+  }
+
+  private <T> Callable<T> wrapCallable(final Callable<T> callable) {
+    final Object[] vars = preprocessCallingThread();
+    return new Callable<T>() {
+      @Override
+      public T call() throws Exception {
+        Closeable[] closeables = preprocessNewThread(vars);
+        try {
+          return callable.call();
+        } finally {
+          cleanup(closeables);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
index e9fce56..62185ec 100644
--- a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.trace.Trace;
+import org.apache.blur.user.UserThreadBoundaryProcessor;
 
 public class Executors {
 
@@ -32,8 +33,9 @@ public class Executors {
   }
 
   public static ExecutorService newThreadPool(String prefix, int threadCount, boolean watch)
{
-    ThreadPoolExecutor executorService = new ThreadPoolExecutor(threadCount, threadCount,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BlurThreadFactory(prefix));
+    BlurThreadPoolExecutor executorService = new BlurThreadPoolExecutor(threadCount, threadCount,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BlurThreadFactory(prefix));
     executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+    executorService.add(new UserThreadBoundaryProcessor());
     if (watch) {
       return Trace.getExecutorService(ThreadWatcher.instance().watch(executorService));
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/concurrent/ThreadBoundaryProcessor.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/ThreadBoundaryProcessor.java
b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadBoundaryProcessor.java
new file mode 100644
index 0000000..1abcc04
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadBoundaryProcessor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.concurrent;
+
+import java.io.Closeable;
+
+public interface ThreadBoundaryProcessor {
+
+  Object preprocessCallingThread();
+
+  Closeable preprocessNewThread(Object object);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/user/User.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/user/User.java b/blur-util/src/main/java/org/apache/blur/user/User.java
new file mode 100644
index 0000000..0af61e0
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/user/User.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.user;
+
+import java.util.Map;
+
+public class User {
+
+  private final String _username;
+  private final Map<String, String> _attributes;
+
+  public User(String username, Map<String, String> attributes) {
+    _username = username;
+    _attributes = attributes;
+  }
+
+  public String getUsername() {
+    return _username;
+  }
+
+  public Map<String, String> getAttributes() {
+    return _attributes;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/user/UserContext.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/user/UserContext.java b/blur-util/src/main/java/org/apache/blur/user/UserContext.java
new file mode 100644
index 0000000..0c3dd5f
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/user/UserContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.user;
+
+
+public class UserContext {
+  
+  private static ThreadLocal<User> _user = new ThreadLocal<User>();
+  
+  public static void setUser(User user) {
+    _user.set(user);
+  }
+  
+  public static User getUser() {
+    return _user.get();
+  }
+  
+  public static void reset() {
+    setUser(null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c2ac7ec3/blur-util/src/main/java/org/apache/blur/user/UserThreadBoundaryProcessor.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/user/UserThreadBoundaryProcessor.java
b/blur-util/src/main/java/org/apache/blur/user/UserThreadBoundaryProcessor.java
new file mode 100644
index 0000000..3a7a5a5
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/user/UserThreadBoundaryProcessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.user;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.blur.concurrent.ThreadBoundaryProcessor;
+
+public class UserThreadBoundaryProcessor implements ThreadBoundaryProcessor {
+
+  @Override
+  public Object preprocessCallingThread() {
+    User user = UserContext.getUser();
+    return user;
+  }
+
+  @Override
+  public Closeable preprocessNewThread(Object object) {
+    UserContext.setUser((User) object);
+    return new Closeable() {
+      @Override
+      public void close() throws IOException {
+        UserContext.reset();
+      }
+    };
+  }
+
+}


Mime
View raw message