Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E40F17EE2 for ; Mon, 1 Jun 2015 22:19:30 +0000 (UTC) Received: (qmail 71107 invoked by uid 500); 1 Jun 2015 22:19:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 71075 invoked by uid 500); 1 Jun 2015 22:19:30 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 71066 invoked by uid 99); 1 Jun 2015 22:19:30 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Jun 2015 22:19:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 8FD141A424A for ; Mon, 1 Jun 2015 22:19:29 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id up2lm9gRgW-G for ; Mon, 1 Jun 2015 22:19:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id D2B6047C2C for ; Mon, 1 Jun 2015 22:19:15 +0000 (UTC) Received: (qmail 70191 invoked by uid 99); 1 Jun 2015 22:19:14 -0000 
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Jun 2015 22:19:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D769DE083A; Mon, 1 Jun 2015 22:19:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 01 Jun 2015 22:19:39 -0000 Message-Id: <908255e369ac4a3e88fd4d49d3f15de0@git.apache.org> In-Reply-To: <2331b7ed2ccf46ad99699422a6f7ad50@git.apache.org> References: <2331b7ed2ccf46ad99699422a6f7ad50@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/50] [abbrv] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part. [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6 Branch: refs/heads/ignite-389 Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b Parents: 3538819 Author: iveselovskiy Authored: Fri May 29 15:40:26 2015 +0300 Committer: iveselovskiy Committed: Fri May 29 15:40:26 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++ .../hadoop/fs/HadoopLazyConcurrentMap.java | 204 +++++++++++++++++++ 2 files changed, 323 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java new file mode 100644 index 0000000..5a65bdb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.igfs; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; + +/** + * Provides ability to execute IGFS code in a context of a specific user. + */ +public abstract class IgfsUserContext { + /** Thread local to hold the current user context. */ + private static final ThreadLocal userStackThreadLocal = new ThreadLocal<>(); + + /** + * Executes given callable in the given user context. + * The main contract of this method is that {@link #currentUser()} method invoked + * inside closure always returns 'user' this callable executed with. + * @param user the user name to invoke closure on behalf of. + * @param clo the closure to execute + * @param The type of closure result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static T doAs(String user, final IgniteOutClosure clo) { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clo.apply(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clo.apply(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts + * callable that throws checked Exception. 
+ * The exception is never wrapped in any way. + * If your Callable throws specific checked exceptions, the recommended usage pattern is: + *
+     *  public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+     *      try {
+     *          return IgfsUserContext.doAs(user, new Callable<Foo>() {
+     *              @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+     *                  return makeSomeFoo(); // do the job
+     *              }
+     *          });
+     *      }
+     *      catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+     *          throw e;
+     *      }
+     *      catch (Exception e) {
+     *          throw new AssertionError("Must never get here.");
+     *      }
+     *  }
+     * 
+ * @param user the user name to invoke closure on behalf of. + * @param clbl the Callable to execute + * @param The type of callable result. + * @return the result of closure execution. + * @throws IllegalArgumentException if user name is null or empty String or if the closure is null. + */ + public static T doAs(String user, final Callable clbl) throws Exception { + if (F.isEmpty(user)) + throw new IllegalArgumentException("Failed to use null or empty user name."); + + final String ctxUser = userStackThreadLocal.get(); + + if (F.eq(ctxUser, user)) + return clbl.call(); // correct context is already there + + userStackThreadLocal.set(user); + + try { + return clbl.call(); + } + finally { + userStackThreadLocal.set(ctxUser); + } + } + + /** + * Gets the current context user. + * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will + * return null. Otherwise it will return the user name set in the most lower + * {@link #doAs(String, IgniteOutClosure)} call on the call stack. + * @return The current user, may be null. + */ + @Nullable public static String currentUser() { + return userStackThreadLocal.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java new file mode 100644 index 0000000..71b38c4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; +import org.jsr166.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +/** + * Maps values by keys. + * Values are created lazily using {@link ValueFactory}. + * + * Despite of the name, does not depend on any Hadoop classes. + */ +public class HadoopLazyConcurrentMap { + /** The map storing the actual values. */ + private final ConcurrentMap map = new ConcurrentHashMap8<>(); + + /** The factory passed in by the client. Will be used for lazy value creation. */ + private final ValueFactory factory; + + /** Lock used to close the objects. */ + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + + /** Flag indicating that this map is closed and cleared. */ + private boolean closed; + + /** + * Constructor. + * @param factory the factory to create new values lazily. + */ + public HadoopLazyConcurrentMap(ValueFactory factory) { + this.factory = factory; + } + + /** + * Gets cached or creates a new value of V. + * Never returns null. + * @param k the key to associate the value with. + * @return the cached or newly created value, never null. 
+ * @throws IgniteException on error + */ + public V getOrCreate(K k) { + ValueWrapper w = map.get(k); + + if (w == null) { + closeLock.readLock().lock(); + + try { + if (closed) + throw new IllegalStateException("Failed to create value for key [" + k + + "]: the map is already closed."); + + final ValueWrapper wNew = new ValueWrapper(k); + + w = map.putIfAbsent(k, wNew); + + if (w == null) { + wNew.init(); + + w = wNew; + } + } + finally { + closeLock.readLock().unlock(); + } + } + + try { + V v = w.getValue(); + + assert v != null; + + return v; + } + catch (IgniteCheckedException ie) { + throw new IgniteException(ie); + } + } + + /** + * Clears the map and closes all the values. + */ + public void close() throws IgniteCheckedException { + closeLock.writeLock().lock(); + + try { + closed = true; + + Exception err = null; + + Set keySet = map.keySet(); + + for (K key : keySet) { + V v = null; + + try { + v = map.get(key).getValue(); + } + catch (IgniteCheckedException ignore) { + // No-op. + } + + if (v != null) { + try { + v.close(); + } + catch (Exception err0) { + if (err == null) + err = err0; + } + } + } + + map.clear(); + + if (err != null) + throw new IgniteCheckedException(err); + } + finally { + closeLock.writeLock().unlock(); + } + } + + /** + * Helper class that drives the lazy value creation. + */ + private class ValueWrapper { + /** Future. */ + private final GridFutureAdapter fut = new GridFutureAdapter<>(); + + /** the key */ + private final K key; + + /** + * Creates new wrapper. + */ + private ValueWrapper(K key) { + this.key = key; + } + + /** + * Initializes the value using the factory. + */ + private void init() { + try { + final V v0 = factory.createValue(key); + + if (v0 == null) + throw new IgniteException("Failed to create non-null value. [key=" + key + ']'); + + fut.onDone(v0); + } + catch (Throwable e) { + fut.onDone(e); + } + } + + /** + * Gets the available value or blocks until the value is initialized. 
+ * @return the value, never null. + * @throws IgniteCheckedException on error. + */ + V getValue() throws IgniteCheckedException { + return fut.get(); + } + } + + /** + * Interface representing the factory that creates map values. + * @param the type of the key. + * @param the type of the value. + */ + public interface ValueFactory { + /** + * Creates the new value. Should never return null. + * + * @param key the key to create value for + * @return the value. + * @throws IgniteException on failure. + */ + public V createValue(K key); + } +} \ No newline at end of file