Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D9A7200BAB for ; Fri, 16 Sep 2016 13:20:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1C29E160ADB; Fri, 16 Sep 2016 11:20:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2033C160B04 for ; Fri, 16 Sep 2016 13:20:47 +0200 (CEST) Received: (qmail 41319 invoked by uid 500); 16 Sep 2016 11:20:47 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 40205 invoked by uid 99); 16 Sep 2016 11:20:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Sep 2016 11:20:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42410E056F; Fri, 16 Sep 2016 11:20:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 16 Sep 2016 11:21:27 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [44/51] [partial] ignite git commit: IGNITE-3916: Initial impl. archived-at: Fri, 16 Sep 2016 11:20:53 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java new file mode 100644 index 0000000..9902142 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.igfs.common.IgfsControlResponse; +import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest; +import org.apache.ignite.internal.igfs.common.IgfsMessage; +import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest; +import org.apache.ignite.internal.igfs.common.IgfsStatusRequest; +import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.lang.IgniteClosure; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE; +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK; + +/** + * Communication with external process (TCP or shmem). + */ +public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener { + /** Expected result is boolean. */ + private static final IgniteClosure, Boolean> BOOL_RES = createClosure(); + + /** Expected result is boolean. */ + private static final IgniteClosure, Long> LONG_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure, IgfsFile> FILE_RES = createClosure(); + + /** Expected result is {@code IgfsHandshakeResponse} */ + private static final IgniteClosure, + IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); + + /** Expected result is {@code IgfsStatus} */ + private static final IgniteClosure, IgfsStatus> STATUS_RES = + createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure, + IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure, + Collection> FILE_COL_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure, + Collection> PATH_COL_RES = createClosure(); + + /** Expected result is {@code IgfsPathSummary}. */ + private static final IgniteClosure, IgfsPathSummary> SUMMARY_RES = + createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final IgniteClosure, + Collection> BLOCK_LOCATION_COL_RES = createClosure(); + + /** Grid name. */ + private final String grid; + + /** IGFS name. */ + private final String igfs; + + /** The user this out proc is performing on behalf of. */ + private final String userName; + + /** Client log. */ + private final Log log; + + /** Client IO. */ + private final HadoopIgfsIpcIo io; + + /** Event listeners. */ + private final Map lsnrs = new ConcurrentHashMap8<>(); + + /** + * Constructor for TCP endpoint. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { + this(host, port, grid, igfs, false, log, user); + } + + /** + * Constructor for shmem endpoint. + * + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { + this(null, port, grid, igfs, true, log, user); + } + + /** + * Constructor. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param shmem Shared memory flag. + * @param log Client logger. + * @throws IOException If failed. + */ + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) + throws IOException { + assert host != null && !shmem || host == null && shmem : + "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; + + String endpoint = host != null ? host + ":" + port : "shmem:" + port; + + this.grid = grid; + this.igfs = igfs; + this.log = log; + this.userName = IgfsUtils.fixUserName(user); + + io = HadoopIgfsIpcIo.get(log, endpoint); + + io.addEventListener(this); + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { + final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + + req.gridName(grid); + req.igfsName(igfs); + req.logDirectory(logDir); + + return io.send(req).chain(HANDSHAKE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + assert io != null; + + io.removeEventListener(this); + + if (force) + io.forceClose(); + else + io.release(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(INFO); + msg.path(path); + msg.userName(userName); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(UPDATE); + msg.path(path); + msg.properties(props); + msg.userName(userName); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(SET_TIMES); + msg.path(path); + msg.accessTime(accessTime); + msg.modificationTime(modificationTime); + msg.userName(userName); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(RENAME); + msg.path(src); + msg.destinationPath(dest); + msg.userName(userName); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(DELETE); + msg.path(path); + msg.flag(recursive); + msg.userName(userName); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(AFFINITY); + msg.path(path); + msg.start(start); + msg.length(len); + msg.userName(userName); + + return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(PATH_SUMMARY); + msg.path(path); + msg.userName(userName); + + return io.send(msg).chain(SUMMARY_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(MAKE_DIRECTORIES); + msg.path(path); + msg.properties(props); + msg.userName(userName); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection listFiles(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_FILES); + msg.path(path); + msg.userName(userName); + + return io.send(msg).chain(FILE_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection listPaths(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_PATHS); + msg.path(path); + msg.userName(userName); + + return io.send(msg).chain(PATH_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(false); + msg.userName(userName); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path, + int seqReadsBeforePrefetch) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(true); + msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + msg.userName(userName); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_CREATE); + msg.path(path); + msg.flag(overwrite); + msg.colocate(colocate); + msg.properties(props); + msg.replication(replication); + msg.blockSize(blockSize); + msg.userName(userName); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_APPEND); + msg.path(path); + msg.flag(create); + msg.properties(props); + msg.userName(userName); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture readData(HadoopIgfsStreamDelegate desc, long pos, int len, + final @Nullable byte[] outBuf, final int outOff, final int outLen) { + assert len > 0; + + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(READ_BLOCK); + msg.streamId((long) desc.target()); + msg.position(pos); + msg.length(len); + + try { + return io.send(msg, outBuf, outOff, outLen); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len) + throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(WRITE_BLOCK); + msg.streamId((long) desc.target()); + msg.data(data); + msg.position(off); + msg.length(len); + + try { + io.sendPlain(msg); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(CLOSE); + msg.streamId((long)desc.target()); + + try { + io.send(msg).chain(BOOL_RES).get(); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsStreamDelegate desc, + HadoopIgfsStreamEventListener lsnr) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void onClose() { + for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onError(long streamId, String errMsg) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId); + + if (lsnr != null) + lsnr.onError(errMsg); + else + log.warn("Received write error response for not registered output stream (will ignore) " + + "[streamId= " + streamId + ']'); + } + + /** + * Creates conversion closure for given type. + * + * @param Type of expected result. + * @return Conversion closure. + */ + @SuppressWarnings("unchecked") + private static IgniteClosure, T> createClosure() { + return new IgniteClosure, T>() { + @Override public T apply(IgniteInternalFuture fut) { + try { + IgfsControlResponse res = (IgfsControlResponse)fut.get(); + + if (res.hasError()) + res.throwError(); + + return (T)res.response(); + } + catch (IgfsException | IgniteCheckedException e) { + throw new GridClosureException(e); + } + } + }; + } + + /** {@inheritDoc} */ + @Override public String user() { + return userName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java new file mode 100644 index 0000000..8f7458b --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutputStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.io.OutputStream; +import org.apache.commons.logging.Log; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.jetbrains.annotations.NotNull; + +/** + * IGFS Hadoop output stream implementation. + */ +public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener { + /** Log instance. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Log stream ID. */ + private long logStreamId; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Error message. */ + private volatile String errMsg; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** + * Creates light output stream. + * + * @param delegate Server stream delegate. + * @param log Logger to use. + * @param clientLog Client logger. + */ + public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log, + IgfsLogger clientLog, long logStreamId) { + this.delegate = delegate; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + check(); + + writeStart(); + + try { + delegate.hadoop().writeData(delegate, b, off, len); + + total += len; + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + write(new byte[] {(byte)b}); + + total++; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + delegate.hadoop().flush(delegate); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (!closed) { + if (log.isDebugEnabled()) + log.debug("Closing output stream: " + delegate); + + writeStart(); + + delegate.hadoop().closeStream(delegate); + + markClosed(false); + + writeEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + + if (log.isDebugEnabled()) + log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + + ", userTime=" + userTime / 1000 + ']'); + } + else if(connBroken) + throw new IOException( + "Failed to close stream, because connection was broken (data could have been lost)."); + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. + */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + delegate.hadoop().removeEventListener(delegate); + + this.connBroken = connBroken; + } + } + + /** + * @throws IOException If check failed. + */ + private void check() throws IOException { + String errMsg0 = errMsg; + + if (errMsg0 != null) + throw new IOException(errMsg0); + + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() throws IgniteCheckedException { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + this.errMsg = errMsg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java new file mode 100644 index 0000000..90f6bca --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProperties.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.util.Map; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; + +/** + * Hadoop file system properties. + */ +public class HadoopIgfsProperties { + /** Username. */ + private String usrName; + + /** Group name. */ + private String grpName; + + /** Permissions. */ + private FsPermission perm; + + /** + * Constructor. + * + * @param props Properties. + * @throws IgniteException In case of error. + */ + public HadoopIgfsProperties(Map props) throws IgniteException { + usrName = props.get(IgfsUtils.PROP_USER_NAME); + grpName = props.get(IgfsUtils.PROP_GROUP_NAME); + + String permStr = props.get(IgfsUtils.PROP_PERMISSION); + + if (permStr != null) { + try { + perm = new FsPermission((short)Integer.parseInt(permStr, 8)); + } + catch (NumberFormatException ignore) { + throw new IgniteException("Permissions cannot be parsed: " + permStr); + } + } + } + + /** + * Get user name. + * + * @return User name. + */ + public String userName() { + return usrName; + } + + /** + * Get group name. + * + * @return Group name. + */ + public String groupName() { + return grpName; + } + + /** + * Get permission. + * + * @return Permission. + */ + public FsPermission permission() { + return perm; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java new file mode 100644 index 0000000..5cee947 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyInputStream.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.io.InputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.ignite.internal.igfs.common.IgfsLogger; + +/** + * Secondary Hadoop file system input stream wrapper. + */ +public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable { + /** Actual input stream to the secondary file system. */ + private final FSDataInputStream is; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param is Actual input stream to the secondary file system. + * @param clientLog Client log. + */ + public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { + assert is != null; + assert clientLog != null; + + this.is = is; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b) throws IOException { + readStart(); + + int res; + + try { + res = is.read(b); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = super.read(b, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + readStart(); + + long res; + + try { + res = is.skip(n); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + readStart(); + + try { + return is.available(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + readStart(); + + try { + is.close(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + readStart(); + + try { + is.mark(readLimit); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + readStart(); + + try { + is.reset(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean markSupported() { + readStart(); + + try { + return is.markSupported(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + readStart(); + + int res; + + try { + res = is.read(); + } + finally { + readEnd(); + } + + if (res != -1) + total++; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = is.read(pos, buf, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + try { + is.readFully(pos, buf, off, len); + } + finally { + readEnd(); + } + + total += len; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { + readStart(); + + try { + is.readFully(pos, buf); + } + finally { + readEnd(); + } + + total += buf.length; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + readStart(); + + try { + is.seek(pos); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + readStart(); + + try { + return is.getPos(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { + readStart(); + + try { + return is.seekToNewSource(targetPos); + } + finally { + readEnd(); + } + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java new file mode 100644 index 0000000..eade0f0 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsProxyOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.io.OutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.ignite.internal.igfs.common.IgfsLogger; + +/** + * Secondary Hadoop file system output stream wrapper. + */ +public class HadoopIgfsProxyOutputStream extends OutputStream { + /** Actual output stream. */ + private FSDataOutputStream os; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param os Actual output stream. + * @param clientLog Client logger. + * @param logStreamId Log stream ID. + */ + public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { + assert os != null; + assert clientLog != null; + + this.os = os; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(int b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total++; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total += b.length; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b, int off, int len) throws IOException { + writeStart(); + + try { + os.write(b, off, len); + } + finally { + writeEnd(); + } + + total += len; + } + + /** {@inheritDoc} */ + @Override public synchronized void flush() throws IOException { + writeStart(); + + try { + os.flush(); + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + writeStart(); + + try { + os.close(); + } + finally { + writeEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + } + } + + /** + * Read start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java new file mode 100644 index 0000000..a0577ce --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsSecondaryFileSystemPositionedReadable.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly + * requested. + *

+ * The class is expected to be used only from synchronized context and therefore is not tread-safe. + */ +public class HadoopIgfsSecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable { + /** Secondary file system. */ + private final FileSystem fs; + + /** Path to the file to open. */ + private final Path path; + + /** Buffer size. */ + private final int bufSize; + + /** Actual input stream. */ + private FSDataInputStream in; + + /** Cached error occurred during output stream open. */ + private IOException err; + + /** Flag indicating that the stream was already opened. */ + private boolean opened; + + /** + * Constructor. + * + * @param fs Secondary file system. + * @param path Path to the file to open. + * @param bufSize Buffer size. + */ + public HadoopIgfsSecondaryFileSystemPositionedReadable(FileSystem fs, Path path, int bufSize) { + assert fs != null; + assert path != null; + + this.fs = fs; + this.path = path; + this.bufSize = bufSize; + } + + /** Get input stream. */ + private PositionedReadable in() throws IOException { + if (opened) { + if (err != null) + throw err; + } + else { + opened = true; + + try { + in = fs.open(path, bufSize); + + if (in == null) + throw new IOException("Failed to open input stream (file system returned null): " + path); + } + catch (IOException e) { + err = e; + + throw err; + } + } + + return in; + } + + /** + * Close wrapped input stream in case it was previously opened. + */ + @Override public void close() { + U.closeQuiet(in); + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + return in().read(pos, buf, off, len); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java new file mode 100644 index 0000000..37b58ab --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamDelegate.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * IGFS Hadoop stream descriptor. + */ +public class HadoopIgfsStreamDelegate { + /** RPC handler. */ + private final HadoopIgfsEx hadoop; + + /** Target. */ + private final Object target; + + /** Optional stream length. */ + private final long len; + + /** + * Constructor. + * + * @param target Target. + */ + public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) { + this(hadoop, target, -1); + } + + /** + * Constructor. + * + * @param target Target. + * @param len Optional length. + */ + public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) { + assert hadoop != null; + assert target != null; + + this.hadoop = hadoop; + this.target = target; + this.len = len; + } + + /** + * @return RPC handler. + */ + public HadoopIgfsEx hadoop() { + return hadoop; + } + + /** + * @return Stream target. + */ + @SuppressWarnings("unchecked") + public T target() { + return (T) target; + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return System.identityHashCode(target); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj != null && obj instanceof HadoopIgfsStreamDelegate && + target == ((HadoopIgfsStreamDelegate)obj).target; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopIgfsStreamDelegate.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java new file mode 100644 index 0000000..d81f765 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsStreamEventListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.IgniteCheckedException; + +/** + * IGFS input stream event listener. + */ +public interface HadoopIgfsStreamEventListener { + /** + * Callback invoked when the stream is being closed. + * + * @throws IgniteCheckedException If failed. + */ + public void onClose() throws IgniteCheckedException; + + /** + * Callback invoked when remote error occurs. + * + * @param errMsg Error message. + */ + public void onError(String errMsg); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java new file mode 100644 index 0000000..fa5cbc5 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException; +import org.apache.ignite.igfs.IgfsParentNotDirectoryException; +import org.apache.ignite.igfs.IgfsPathAlreadyExistsException; +import org.apache.ignite.igfs.IgfsPathNotFoundException; +import org.jetbrains.annotations.Nullable; + +/** + * Utility constants and methods for IGFS Hadoop file system. + */ +public class HadoopIgfsUtils { + /** Parameter name for endpoint no embed mode flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; + + /** Parameter name for endpoint no shared memory flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; + + /** Parameter name for endpoint no local TCP flag. */ + public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; + + /** + * Get string parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return String value. + */ + public static String parameter(Configuration cfg, String name, String authority, String dflt) { + return cfg.get(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Get integer parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return Integer value. + * @throws IOException In case of parse exception. + */ + public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { + String name0 = String.format(name, authority != null ? authority : ""); + + try { + return cfg.getInt(name0, dflt); + } + catch (NumberFormatException ignore) { + throw new IOException("Failed to parse parameter value to integer: " + name0); + } + } + + /** + * Get boolean parameter. + * + * @param cfg Configuration. + * @param name Parameter name. + * @param authority Authority. + * @param dflt Default value. + * @return Boolean value. + */ + public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { + return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); + } + + /** + * Cast Ignite exception to appropriate IO exception. + * + * @param e Exception to cast. + * @return Casted exception. + */ + public static IOException cast(IgniteCheckedException e) { + return cast(e, null); + } + + /** + * Cast Ignite exception to appropriate IO exception. + * + * @param e Exception to cast. + * @param path Path for exceptions. + * @return Casted exception. + */ + @SuppressWarnings("unchecked") + public static IOException cast(IgniteCheckedException e, @Nullable String path) { + assert e != null; + + // First check for any nested IOException; if exists - re-throw it. + if (e.hasCause(IOException.class)) + return e.getCause(IOException.class); + else if (e.hasCause(IgfsPathNotFoundException.class)) + return new FileNotFoundException(path); // TODO: Or PathNotFoundException? + else if (e.hasCause(IgfsParentNotDirectoryException.class)) + return new ParentNotDirectoryException(path); + else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) + return new PathIsNotEmptyDirectoryException(path); + else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) + return new PathExistsException(path); + else { + String msg = e.getMessage(); + + return msg == null ? new IOException(e) : new IOException(msg, e); + } + } + + /** + * Deletes all files from the given file system. + * + * @param fs The file system to clean up. + * @throws IOException On error. + */ + public static void clear(FileSystem fs) throws IOException { + // Delete root contents: + FileStatus[] statuses = fs.listStatus(new Path("/")); + + if (statuses != null) { + for (FileStatus stat: statuses) + fs.delete(stat.getPath(), true); + } + } + + /** + * Deletes all files from the given file system. + * + * @param fs The file system to clean up. + * @throws IOException On error. + */ + public static void clear(AbstractFileSystem fs) throws IOException { + // Delete root contents: + FileStatus[] statuses = fs.listStatus(new Path("/")); + + if (statuses != null) { + for (FileStatus stat: statuses) + fs.delete(stat.getPath(), true); + } + } + + /** + * Constructor. + */ + private HadoopIgfsUtils() { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java new file mode 100644 index 0000000..f4ee97f --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteState.STARTED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP; +import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter; + +/** + * Wrapper for IGFS server. + */ +public class HadoopIgfsWrapper implements HadoopIgfs { + /** Delegate. */ + private final AtomicReference delegateRef = new AtomicReference<>(); + + /** Authority. */ + private final String authority; + + /** Connection string. */ + private final HadoopIgfsEndpoint endpoint; + + /** Log directory. */ + private final String logDir; + + /** Configuration. */ + private final Configuration conf; + + /** Logger. */ + private final Log log; + + /** The user name this wrapper works on behalf of. */ + private final String userName; + + /** + * Constructor. + * + * @param authority Authority (connection string). + * @param logDir Log directory for server. + * @param conf Configuration. + * @param log Current logger. + */ + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { + try { + this.authority = authority; + this.endpoint = new HadoopIgfsEndpoint(authority); + this.logDir = logDir; + this.conf = conf; + this.log = log; + this.userName = user; + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to parse endpoint: " + authority, e); + } + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) { + return hndResp; + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + Delegate delegate = delegateRef.get(); + + if (delegate != null && delegateRef.compareAndSet(delegate, null)) + delegate.close(force); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.info(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(final IgfsPath path, final Map props) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.update(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) + throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.setTimes(path, accessTime, modificationTime); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.rename(src, dest); + } + }, src); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.delete(path, recursive); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection affinity(final IgfsPath path, final long start, + final long len) throws IOException { + return withReconnectHandling(new FileSystemClosure>() { + @Override public Collection apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.affinity(path, start, len); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.contentSummary(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map props) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.mkdirs(path, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection listFiles(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure>() { + @Override public Collection apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listFiles(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public Collection listPaths(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure>() { + @Override public Collection apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.listPaths(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) + throws IgniteCheckedException, IOException { + return hadoop.fsStatus(); + } + }); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.open(path, seqReadsBeforePrefetch); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, + final boolean colocate, final int replication, final long blockSize, @Nullable final Map props) + throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.create(path, overwrite, colocate, replication, blockSize, props); + } + }, path); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + @Nullable final Map props) throws IOException { + return withReconnectHandling(new FileSystemClosure() { + @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop, + IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException { + return hadoop.append(path, create, props); + } + }, path); + } + + /** + * Execute closure which is not path-specific. + * + * @param clo Closure. + * @return Result. + * @throws IOException If failed. + */ + private T withReconnectHandling(FileSystemClosure clo) throws IOException { + return withReconnectHandling(clo, null); + } + + /** + * Execute closure. + * + * @param clo Closure. + * @param path Path for exceptions. + * @return Result. + * @throws IOException If failed. + */ + private T withReconnectHandling(final FileSystemClosure clo, @Nullable IgfsPath path) + throws IOException { + Exception err = null; + + for (int i = 0; i < 2; i++) { + Delegate curDelegate = null; + + boolean close = false; + boolean force = false; + + try { + curDelegate = delegate(); + + assert curDelegate != null; + + close = curDelegate.doomed; + + return clo.apply(curDelegate.hadoop, curDelegate.hndResp); + } + catch (HadoopIgfsCommunicationException e) { + if (curDelegate != null && !curDelegate.doomed) { + // Try getting rid fo faulty delegate ASAP. + delegateRef.compareAndSet(curDelegate, null); + + close = true; + force = true; + } + + if (log.isDebugEnabled()) + log.debug("Failed to send message to a server: " + e); + + err = e; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null); + } + finally { + if (close) { + assert curDelegate != null; + + curDelegate.close(force); + } + } + } + + List list = X.getThrowableList(err); + + Throwable cause = list.get(list.size() - 1); + + throw new IOException("Failed to communicate with IGFS: " + + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err); + } + + /** + * Get delegate creating it if needed. + * + * @return Delegate. + */ + private Delegate delegate() throws HadoopIgfsCommunicationException { + // These fields will contain possible exceptions from shmem and TCP endpoints. + Exception errShmem = null; + Exception errTcp = null; + + // 1. If delegate is set, return it immediately. + Delegate curDelegate = delegateRef.get(); + + if (curDelegate != null) + return curDelegate; + + // 2. Guess that we are in the same VM. + boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false); + + if (!skipInProc) { + IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs()); + + if (igfs != null) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsInProc(igfs, log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + if (hadoop != null) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e); + } + } + } + + // 3. Try connecting using shmem. + boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false); + + if (curDelegate == null && !skipLocShmem && !U.isWindows()) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e); + + errShmem = e; + } + } + + // 4. Try local TCP connection. + boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false); + + if (curDelegate == null && !skipLocTcp) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + // 5. Try remote TCP connection. + if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) { + HadoopIgfsEx hadoop = null; + + try { + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, userName); + + curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); + } + catch (IOException | IgniteCheckedException e) { + if (e instanceof HadoopIgfsCommunicationException) + hadoop.close(true); + + if (log.isDebugEnabled()) + log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() + + ", port=" + endpoint.port() + ']', e); + + errTcp = e; + } + } + + if (curDelegate != null) { + if (!delegateRef.compareAndSet(null, curDelegate)) + curDelegate.doomed = true; + + return curDelegate; + } + else { + SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=["); + + if (errShmem != null) + errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], "); + + errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] "); + + errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " + + "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint)."); + + throw new HadoopIgfsCommunicationException(errMsg.toString()); + } + } + + /** + * File system operation closure. + */ + private static interface FileSystemClosure { + /** + * Call closure body. + * + * @param hadoop RPC handler. + * @param hndResp Handshake response. + * @return Result. + * @throws IgniteCheckedException If failed. + * @throws IOException If failed. + */ + public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException; + } + + /** + * Delegate. + */ + private static class Delegate { + /** RPC handler. */ + private final HadoopIgfsEx hadoop; + + /** Handshake request. */ + private final IgfsHandshakeResponse hndResp; + + /** Close guard. */ + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + /** Whether this delegate must be closed at the end of the next invocation. */ + private boolean doomed; + + /** + * Constructor. + * + * @param hadoop Hadoop. + * @param hndResp Handshake response. + */ + private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) { + this.hadoop = hadoop; + this.hndResp = hndResp; + } + + /** + * Close underlying RPC handler. + * + * @param force Force flag. + */ + private void close(boolean force) { + if (closeGuard.compareAndSet(false, true)) + hadoop.close(force); + } + } + + /** + * Helper method to find Igfs of the given name in the given Ignite instance. + * + * @param gridName The name of the grid to check. + * @param igfsName The name of Igfs. + * @return The file system instance, or null if not found. + */ + private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) { + if (Ignition.state(gridName) == STARTED) { + try { + for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) { + if (F.eq(fs.name(), igfsName)) + return (IgfsEx)fs; + } + } + catch (IgniteIllegalStateException ignore) { + // May happen if the grid state has changed: + } + } + + return null; + } +} \ No newline at end of file