Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 31CD7175A9 for ; Tue, 3 Mar 2015 13:08:54 +0000 (UTC) Received: (qmail 43222 invoked by uid 500); 3 Mar 2015 13:08:54 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 43133 invoked by uid 500); 3 Mar 2015 13:08:54 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 43117 invoked by uid 99); 3 Mar 2015 13:08:54 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 13:08:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,FUZZY_CPILL,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Mar 2015 13:08:38 +0000 Received: (qmail 41401 invoked by uid 99); 3 Mar 2015 13:08:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 13:08:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E0789E1080; Tue, 3 Mar 2015 13:08:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 03 Mar 2015 13:08:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (4). X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java deleted file mode 100644 index d07f34d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java +++ /dev/null @@ -1,599 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * IO layer implementation based on blocking IPC streams. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class IgfsHadoopIpcIo implements IgfsHadoopIo { - /** Logger. */ - private Log log; - - /** Request futures map. */ - private ConcurrentMap reqMap = - new ConcurrentHashMap8<>(); - - /** Request ID counter. */ - private AtomicLong reqIdCnt = new AtomicLong(); - - /** Endpoint. */ - private IpcEndpoint endpoint; - - /** Endpoint output stream. */ - private IgfsDataOutputStream out; - - /** Protocol. */ - private final IgfsMarshaller marsh; - - /** Client reader thread. */ - private Thread reader; - - /** Lock for graceful shutdown. */ - private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Server endpoint address. */ - private final String endpointAddr; - - /** Number of open file system sessions. */ - private final AtomicInteger activeCnt = new AtomicInteger(1); - - /** Event listeners. */ - private final Collection lsnrs = - new GridConcurrentHashSet<>(); - - /** Cached connections. */ - private static final ConcurrentMap ipcCache = - new ConcurrentHashMap8<>(); - - /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */ - private static final GridStripedLock initLock = new GridStripedLock(32); - - /** - * @param endpointAddr Endpoint. - * @param marsh Protocol. - * @param log Logger to use. - */ - public IgfsHadoopIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) { - assert endpointAddr != null; - assert marsh != null; - - this.endpointAddr = endpointAddr; - this.marsh = marsh; - this.log = log; - } - - /** - * Returns a started and valid instance of this class - * for a given endpoint. - * - * @param log Logger to use for new instance. - * @param endpoint Endpoint string. - * @return New or existing cached instance, which is started and operational. - * @throws IOException If new instance was created but failed to start. - */ - public static IgfsHadoopIpcIo get(Log log, String endpoint) throws IOException { - while (true) { - IgfsHadoopIpcIo clientIo = ipcCache.get(endpoint); - - if (clientIo != null) { - if (clientIo.acquire()) - return clientIo; - else - // If concurrent close. - ipcCache.remove(endpoint, clientIo); - } - else { - Lock lock = initLock.getLock(endpoint); - - lock.lock(); - - try { - clientIo = ipcCache.get(endpoint); - - if (clientIo != null) { // Perform double check. - if (clientIo.acquire()) - return clientIo; - else - // If concurrent close. - ipcCache.remove(endpoint, clientIo); - } - - // Otherwise try creating a new one. - clientIo = new IgfsHadoopIpcIo(endpoint, new IgfsMarshaller(), log); - - try { - clientIo.start(); - } - catch (IgniteCheckedException e) { - throw new IOException(e.getMessage(), e); - } - - IgfsHadoopIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo); - - // Put in exclusive lock. - assert old == null; - - return clientIo; - } - finally { - lock.unlock(); - } - } - } - } - - /** - * Increases usage count for this instance. - * - * @return {@code true} if usage count is greater than zero. - */ - private boolean acquire() { - while (true) { - int cnt = activeCnt.get(); - - if (cnt == 0) { - if (log.isDebugEnabled()) - log.debug("IPC IO not acquired (count was 0): " + this); - - return false; - } - - // Need to make sure that no-one decremented count in between. - if (activeCnt.compareAndSet(cnt, cnt + 1)) { - if (log.isDebugEnabled()) - log.debug("IPC IO acquired: " + this); - - return true; - } - } - } - - /** - * Releases this instance, decrementing usage count. - *

- * If usage count becomes zero, the instance is stopped - * and removed from cache. - */ - public void release() { - while (true) { - int cnt = activeCnt.get(); - - if (cnt == 0) { - if (log.isDebugEnabled()) - log.debug("IPC IO not released (count was 0): " + this); - - return; - } - - if (activeCnt.compareAndSet(cnt, cnt - 1)) { - if (cnt == 1) { - ipcCache.remove(endpointAddr, this); - - if (log.isDebugEnabled()) - log.debug("IPC IO stopping as unused: " + this); - - stop(); - } - else if (log.isDebugEnabled()) - log.debug("IPC IO released: " + this); - - return; - } - } - } - - /** - * Closes this IO instance, removing it from cache. - */ - public void forceClose() { - if (ipcCache.remove(endpointAddr, this)) - stop(); - } - - /** - * Starts the IO. - * - * @throws IgniteCheckedException If failed to connect the endpoint. - */ - private void start() throws IgniteCheckedException { - boolean success = false; - - try { - endpoint = IpcEndpointFactory.connectEndpoint( - endpointAddr, new GridLoggerProxy(new IgfsHadoopJclLogger(log), null, null, "")); - - out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); - - reader = new ReaderThread(); - - // Required for Hadoop 2.x - reader.setDaemon(true); - - reader.start(); - - success = true; - } - catch (IgniteCheckedException e) { - IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); - - if (resEx != null) - throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); - - throw e; - } - finally { - if (!success) - stop(); - } - } - - /** - * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed. - * Close listeners will be invoked as if connection is closed by server. - */ - private void stop() { - close0(null); - - if (reader != null) { - try { - U.interrupt(reader); - U.join(reader); - - reader = null; - } - catch (IgniteInterruptedCheckedException ignored) { - Thread.currentThread().interrupt(); - - log.warn("Got interrupted while waiting for reader thread to shut down (will return)."); - } - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(IgfsHadoopIpcIoListener lsnr) { - if (!busyLock.readLock().tryLock()) { - lsnr.onClose(); - - return; - } - - boolean invokeNow = false; - - try { - invokeNow = stopping; - - if (!invokeNow) - lsnrs.add(lsnr); - } - finally { - busyLock.readLock().unlock(); - - if (invokeNow) - lsnr.onClose(); - } - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(IgfsHadoopIpcIoListener lsnr) { - lsnrs.remove(lsnr); - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture send(IgfsMessage msg) throws IgniteCheckedException { - return send(msg, null, 0, 0); - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, - int outLen) throws IgniteCheckedException { - assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; - - if (!busyLock.readLock().tryLock()) - throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently " + - "closed)."); - - try { - if (stopping) - throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently " + - "closed)."); - - long reqId = reqIdCnt.getAndIncrement(); - - IgfsHadoopFuture fut = new IgfsHadoopFuture<>(); - - fut.outputBuffer(outBuf); - fut.outputOffset(outOff); - fut.outputLength(outLen); - fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK); - - IgfsHadoopFuture oldFut = reqMap.putIfAbsent(reqId, fut); - - assert oldFut == null; - - if (log.isDebugEnabled()) - log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']'); - - byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command()); - - IgniteCheckedException err = null; - - try { - synchronized (this) { - marsh.marshall(msg, hdr, out); - - out.flush(); // Blocking operation + sometimes system call. - } - } - catch (IgniteCheckedException e) { - err = e; - } - catch (IOException e) { - err = new IgfsHadoopCommunicationException(e); - } - - if (err != null) { - reqMap.remove(reqId, fut); - - fut.onDone(err); - } - - return fut; - } - finally { - busyLock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException { - if (!busyLock.readLock().tryLock()) - throw new IgfsHadoopCommunicationException("Failed to send message (client is being " + - "concurrently closed)."); - - try { - if (stopping) - throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently closed)."); - - assert msg.command() == IgfsIpcCommand.WRITE_BLOCK; - - IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; - - byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK); - - U.longToBytes(req.streamId(), hdr, 12); - U.intToBytes(req.length(), hdr, 20); - - synchronized (this) { - out.write(hdr); - out.write(req.data(), (int)req.position(), req.length()); - - out.flush(); - } - } - catch (IOException e) { - throw new IgfsHadoopCommunicationException(e); - } - finally { - busyLock.readLock().unlock(); - } - } - - /** - * Closes client but does not wait. - * - * @param err Error. - */ - private void close0(@Nullable Throwable err) { - busyLock.writeLock().lock(); - - try { - if (stopping) - return; - - stopping = true; - } - finally { - busyLock.writeLock().unlock(); - } - - if (err == null) - err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " + - "is received)."); - - // Clean up resources. - U.closeQuiet(out); - - if (endpoint != null) - endpoint.close(); - - // Unwind futures. We can safely iterate here because no more futures will be added. - Iterator it = reqMap.values().iterator(); - - while (it.hasNext()) { - IgfsHadoopFuture fut = it.next(); - - fut.onDone(err); - - it.remove(); - } - - for (IgfsHadoopIpcIoListener lsnr : lsnrs) - lsnr.onClose(); - } - - /** - * Do not extend {@code GridThread} to minimize class dependencies. - */ - private class ReaderThread extends Thread { - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run() { - // Error to fail pending futures. - Throwable err = null; - - try { - InputStream in = endpoint.inputStream(); - - IgfsDataInputStream dis = new IgfsDataInputStream(in); - - byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; - byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE]; - - while (!Thread.currentThread().isInterrupted()) { - dis.readFully(hdr); - - long reqId = U.bytesToLong(hdr, 0); - - // We don't wait for write responses, therefore reqId is -1. - if (reqId == -1) { - // We received a response which normally should not be sent. It must contain an error. - dis.readFully(msgHdr); - - assert msgHdr[4] != 0; - - String errMsg = dis.readUTF(); - - // Error code. - dis.readInt(); - - long streamId = dis.readLong(); - - for (IgfsHadoopIpcIoListener lsnr : lsnrs) - lsnr.onError(streamId, errMsg); - } - else { - IgfsHadoopFuture fut = reqMap.remove(reqId); - - if (fut == null) { - String msg = "Failed to read response from server: response closure is unavailable for " + - "requestId (will close connection):" + reqId; - - log.warn(msg); - - err = new IgniteCheckedException(msg); - - break; - } - else { - try { - IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8)); - - if (log.isDebugEnabled()) - log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']'); - - Object res = null; - - if (fut.read()) { - dis.readFully(msgHdr); - - boolean hasErr = msgHdr[4] != 0; - - if (hasErr) { - String errMsg = dis.readUTF(); - - // Error code. - Integer errCode = dis.readInt(); - - IgfsControlResponse.throwError(errCode, errMsg); - } - - int blockLen = U.bytesToInt(msgHdr, 5); - - int readLen = Math.min(blockLen, fut.outputLength()); - - if (readLen > 0) { - assert fut.outputBuffer() != null; - - dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen); - } - - if (readLen != blockLen) { - byte[] buf = new byte[blockLen - readLen]; - - dis.readFully(buf); - - res = buf; - } - } - else - res = marsh.unmarshall(cmd, hdr, dis); - - fut.onDone(res); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to apply response closure (will fail request future): " + - e.getMessage()); - - fut.onDone(e); - - err = e; - } - } - } - } - } - catch (EOFException ignored) { - err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer)."); - } - catch (IOException e) { - if (!stopping) - log.error("Failed to read data (connection will be closed)", e); - - err = new IgfsHadoopCommunicationException(e); - } - catch (IgniteCheckedException e) { - if (!stopping) - log.error("Failed to obtain endpoint input stream (connection will be closed)", e); - - err = e; - } - finally { - close0(err); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt + - ", stopping=" + stopping + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java deleted file mode 100644 index ffc58ba..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -/** - * Listens to the events of {@link IgfsHadoopIpcIo}. - */ -public interface IgfsHadoopIpcIoListener { - /** - * Callback invoked when the IO is being closed. - */ - public void onClose(); - - /** - * Callback invoked when remote error occurs. - * - * @param streamId Stream ID. - * @param errMsg Error message. - */ - public void onError(long streamId, String errMsg); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java deleted file mode 100644 index e43d77a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -/** - * JCL logger wrapper for Hadoop. - */ -public class IgfsHadoopJclLogger implements IgniteLogger { - /** JCL implementation proxy. */ - private Log impl; - - /** - * Constructor. - * - * @param impl JCL implementation to use. - */ - IgfsHadoopJclLogger(Log impl) { - assert impl != null; - - this.impl = impl; - } - - /** {@inheritDoc} */ - @Override public IgniteLogger getLogger(Object ctgr) { - return new IgfsHadoopJclLogger(LogFactory.getLog( - ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr))); - } - - /** {@inheritDoc} */ - @Override public void trace(String msg) { - impl.trace(msg); - } - - /** {@inheritDoc} */ - @Override public void debug(String msg) { - impl.debug(msg); - } - - /** {@inheritDoc} */ - @Override public void info(String msg) { - impl.info(msg); - } - - /** {@inheritDoc} */ - @Override public void warning(String msg) { - impl.warn(msg); - } - - /** {@inheritDoc} */ - @Override public void warning(String msg, @Nullable Throwable e) { - impl.warn(msg, e); - } - - /** {@inheritDoc} */ - @Override public void error(String msg) { - impl.error(msg); - } - - /** {@inheritDoc} */ - @Override public boolean isQuiet() { - return !isInfoEnabled() && !isDebugEnabled(); - } - - /** {@inheritDoc} */ - @Override public void error(String msg, @Nullable Throwable e) { - impl.error(msg, e); - } - - /** {@inheritDoc} */ - @Override public boolean isTraceEnabled() { - return impl.isTraceEnabled(); - } - - /** {@inheritDoc} */ - @Override public boolean isDebugEnabled() { - return impl.isDebugEnabled(); - } - - /** {@inheritDoc} */ - @Override public boolean isInfoEnabled() { - return impl.isInfoEnabled(); - } - - /** {@inheritDoc} */ - @Nullable @Override public String fileName() { - return null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "IgfsHadoopJclLogger [impl=" + impl + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java deleted file mode 100644 index 31183a8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java +++ /dev/null @@ -1,466 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.lang.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*; - -/** - * Communication with external process (TCP or shmem). - */ -public class IgfsHadoopOutProc implements IgfsHadoopEx, IgfsHadoopIpcIoListener { - /** Expected result is boolean. */ - private static final GridPlainClosure, Boolean> BOOL_RES = createClosure(); - - /** Expected result is boolean. */ - private static final GridPlainClosure, Long> LONG_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure, IgfsFile> FILE_RES = createClosure(); - - /** Expected result is {@code IgfsHandshakeResponse} */ - private static final GridPlainClosure, - IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); - - /** Expected result is {@code IgfsStatus} */ - private static final GridPlainClosure, IgfsStatus> STATUS_RES = - createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure, - IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure, - Collection> FILE_COL_RES = createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure, - Collection> PATH_COL_RES = createClosure(); - - /** Expected result is {@code IgfsPathSummary}. */ - private static final GridPlainClosure, IgfsPathSummary> SUMMARY_RES = - createClosure(); - - /** Expected result is {@code IgfsFile}. */ - private static final GridPlainClosure, - Collection> BLOCK_LOCATION_COL_RES = createClosure(); - - /** Grid name. */ - private final String grid; - - /** IGFS name. */ - private final String igfs; - - /** Client log. */ - private final Log log; - - /** Client IO. */ - private final IgfsHadoopIpcIo io; - - /** Event listeners. */ - private final Map lsnrs = new ConcurrentHashMap8<>(); - - /** - * Constructor for TCP endpoint. - * - * @param host Host. - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param log Client logger. - * @throws IOException If failed. - */ - public IgfsHadoopOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { - this(host, port, grid, igfs, false, log); - } - - /** - * Constructor for shmem endpoint. - * - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param log Client logger. - * @throws IOException If failed. - */ - public IgfsHadoopOutProc(int port, String grid, String igfs, Log log) throws IOException { - this(null, port, grid, igfs, true, log); - } - - /** - * Constructor. - * - * @param host Host. - * @param port Port. - * @param grid Grid name. - * @param igfs IGFS name. - * @param shmem Shared memory flag. - * @param log Client logger. - * @throws IOException If failed. - */ - private IgfsHadoopOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) - throws IOException { - assert host != null && !shmem || host == null && shmem : - "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; - - String endpoint = host != null ? host + ":" + port : "shmem:" + port; - - this.grid = grid; - this.igfs = igfs; - this.log = log; - - io = IgfsHadoopIpcIo.get(log, endpoint); - - io.addEventListener(this); - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { - final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); - - req.gridName(grid); - req.igfsName(igfs); - req.logDirectory(logDir); - - return io.send(req).chain(HANDSHAKE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - assert io != null; - - io.removeEventListener(this); - - if (force) - io.forceClose(); - else - io.release(); - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(INFO); - msg.path(path); - - return io.send(msg).chain(FILE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(UPDATE); - msg.path(path); - msg.properties(props); - - return io.send(msg).chain(FILE_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(SET_TIMES); - msg.path(path); - msg.accessTime(accessTime); - msg.modificationTime(modificationTime); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(RENAME); - msg.path(src); - msg.destinationPath(dest); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(DELETE); - msg.path(path); - msg.flag(recursive); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(AFFINITY); - msg.path(path); - msg.start(start); - msg.length(len); - - return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(PATH_SUMMARY); - msg.path(path); - - return io.send(msg).chain(SUMMARY_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(MAKE_DIRECTORIES); - msg.path(path); - msg.properties(props); - - return io.send(msg).chain(BOOL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection listFiles(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(LIST_FILES); - msg.path(path); - - return io.send(msg).chain(FILE_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public Collection listPaths(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(LIST_PATHS); - msg.path(path); - - return io.send(msg).chain(PATH_COL_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_READ); - msg.path(path); - msg.flag(false); - - IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); - - return new IgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path, - int seqReadsBeforePrefetch) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_READ); - msg.path(path); - msg.flag(true); - msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); - - IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); - - return new IgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_CREATE); - msg.path(path); - msg.flag(overwrite); - msg.colocate(colocate); - msg.properties(props); - msg.replication(replication); - msg.blockSize(blockSize); - - Long streamId = io.send(msg).chain(LONG_RES).get(); - - return new IgfsHadoopStreamDelegate(this, streamId); - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map props) throws IgniteCheckedException { - final IgfsPathControlRequest msg = new IgfsPathControlRequest(); - - msg.command(OPEN_APPEND); - msg.path(path); - msg.flag(create); - msg.properties(props); - - Long streamId = io.send(msg).chain(LONG_RES).get(); - - return new IgfsHadoopStreamDelegate(this, streamId); - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture readData(IgfsHadoopStreamDelegate desc, long pos, int len, - final @Nullable byte[] outBuf, final int outOff, final int outLen) { - assert len > 0; - - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(READ_BLOCK); - msg.streamId((long) desc.target()); - msg.position(pos); - msg.length(len); - - try { - return io.send(msg, outBuf, outOff, outLen); - } - catch (IgniteCheckedException e) { - return new GridPlainFutureAdapter<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(IgfsHadoopStreamDelegate desc, byte[] data, int off, int len) - throws IOException { - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(WRITE_BLOCK); - msg.streamId((long) desc.target()); - msg.data(data); - msg.position(off); - msg.length(len); - - try { - io.sendPlain(msg); - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - } - - /** {@inheritDoc} */ - @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { - final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); - - msg.command(CLOSE); - msg.streamId((long)desc.target()); - - try { - io.send(msg).chain(BOOL_RES).get(); - } - catch (IgniteCheckedException e) { - throw IgfsHadoopUtils.cast(e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(IgfsHadoopStreamDelegate desc, - IgfsHadoopStreamEventListener lsnr) { - long streamId = desc.target(); - - IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [streamId=" + streamId + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(IgfsHadoopStreamDelegate desc) { - long streamId = desc.target(); - - IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(streamId); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [streamId=" + streamId + ']'); - } - - /** {@inheritDoc} */ - @Override public void onClose() { - for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); - } - } - } - - /** {@inheritDoc} */ - @Override public void onError(long streamId, String errMsg) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(streamId); - - if (lsnr != null) - lsnr.onError(errMsg); - else - log.warn("Received write error response for not registered output stream (will ignore) " + - "[streamId= " + streamId + ']'); - } - - /** - * Creates conversion closure for given type. - * - * @param Type of expected result. - * @return Conversion closure. - */ - @SuppressWarnings("unchecked") - private static GridPlainClosure, T> createClosure() { - return new GridPlainClosure, T>() { - @Override public T apply(GridPlainFuture fut) throws IgniteCheckedException { - IgfsControlResponse res = (IgfsControlResponse)fut.get(); - - if (res.hasError()) - res.throwError(); - - return (T)res.response(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java deleted file mode 100644 index ae5f980..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * IGFS Hadoop output stream implementation. - */ -public class IgfsHadoopOutputStream extends OutputStream implements IgfsHadoopStreamEventListener { - /** Log instance. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Log stream ID. */ - private long logStreamId; - - /** Server stream delegate. */ - private IgfsHadoopStreamDelegate delegate; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Error message. */ - private volatile String errMsg; - - /** Read time. */ - private long writeTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of written bytes. */ - private long total; - - /** - * Creates light output stream. - * - * @param delegate Server stream delegate. - * @param log Logger to use. - * @param clientLog Client logger. - */ - public IgfsHadoopOutputStream(IgfsHadoopStreamDelegate delegate, Log log, - IgfsLogger clientLog, long logStreamId) { - this.delegate = delegate; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void writeStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void writeEnd() { - long now = System.nanoTime(); - - writeTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { - check(); - - writeStart(); - - try { - delegate.hadoop().writeData(delegate, b, off, len); - - total += len; - } - finally { - writeEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - write(new byte[] {(byte)b}); - - total++; - } - - /** {@inheritDoc} */ - @Override public void flush() throws IOException { - delegate.hadoop().flush(delegate); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (!closed) { - if (log.isDebugEnabled()) - log.debug("Closing output stream: " + delegate); - - writeStart(); - - delegate.hadoop().closeStream(delegate); - - markClosed(false); - - writeEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseOut(logStreamId, userTime, writeTime, total); - - if (log.isDebugEnabled()) - log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + - ", userTime=" + userTime / 1000 + ']'); - } - else if(connBroken) - throw new IOException( - "Failed to close stream, because connection was broken (data could have been lost)."); - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - delegate.hadoop().removeEventListener(delegate); - - this.connBroken = connBroken; - } - } - - /** - * @throws IOException If check failed. - */ - private void check() throws IOException { - String errMsg0 = errMsg; - - if (errMsg0 != null) - throw new IOException(errMsg0); - - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** {@inheritDoc} */ - @Override public void onClose() throws IgniteCheckedException { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - this.errMsg = errMsg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java deleted file mode 100644 index 330537d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.ignite.internal.igfs.common.*; - -import java.io.*; - -/** - * Secondary Hadoop file system input stream wrapper. - */ -public class IgfsHadoopProxyInputStream extends InputStream implements Seekable, PositionedReadable { - /** Actual input stream to the secondary file system. */ - private final FSDataInputStream is; - - /** Client logger. */ - private final IgfsLogger clientLog; - - /** Log stream ID. */ - private final long logStreamId; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** Closed flag. */ - private boolean closed; - - /** - * Constructor. - * - * @param is Actual input stream to the secondary file system. - * @param clientLog Client log. - */ - public IgfsHadoopProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { - assert is != null; - assert clientLog != null; - - this.is = is; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b) throws IOException { - readStart(); - - int res; - - try { - res = is.read(b); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(byte[] b, int off, int len) throws IOException { - readStart(); - - int res; - - try { - res = super.read(b, off, len); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - readStart(); - - long res; - - try { - res = is.skip(n); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, res); - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - readStart(); - - try { - return is.available(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - closed = true; - - readStart(); - - try { - is.close(); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - readStart(); - - try { - is.mark(readLimit); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - readStart(); - - try { - is.reset(); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - } - - /** {@inheritDoc} */ - @Override public synchronized boolean markSupported() { - readStart(); - - try { - return is.markSupported(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - readStart(); - - int res; - - try { - res = is.read(); - } - finally { - readEnd(); - } - - if (res != -1) - total++; - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { - readStart(); - - int res; - - try { - res = is.read(pos, buf, off, len); - } - finally { - readEnd(); - } - - if (res != -1) - total += res; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, res); - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { - readStart(); - - try { - is.readFully(pos, buf, off, len); - } - finally { - readEnd(); - } - - total += len; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, len); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { - readStart(); - - try { - is.readFully(pos, buf); - } - finally { - readEnd(); - } - - total += buf.length; - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, pos, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - readStart(); - - try { - is.seek(pos); - } - finally { - readEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() throws IOException { - readStart(); - - try { - return is.getPos(); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { - readStart(); - - try { - return is.seekToNewSource(targetPos); - } - finally { - readEnd(); - } - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java deleted file mode 100644 index 41e80eb..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.ignite.internal.igfs.common.*; - -import java.io.*; - -/** - * Secondary Hadoop file system output stream wrapper. - */ -public class IgfsHadoopProxyOutputStream extends OutputStream { - /** Actual output stream. */ - private FSDataOutputStream os; - - /** Client logger. */ - private final IgfsLogger clientLog; - - /** Log stream ID. */ - private final long logStreamId; - - /** Read time. */ - private long writeTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of written bytes. */ - private long total; - - /** Closed flag. */ - private boolean closed; - - /** - * Constructor. - * - * @param os Actual output stream. - * @param clientLog Client logger. - * @param logStreamId Log stream ID. - */ - public IgfsHadoopProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { - assert os != null; - assert clientLog != null; - - this.os = os; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - lastTs = System.nanoTime(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(int b) throws IOException { - writeStart(); - - try { - os.write(b); - } - finally { - writeEnd(); - } - - total++; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b) throws IOException { - writeStart(); - - try { - os.write(b); - } - finally { - writeEnd(); - } - - total += b.length; - } - - /** {@inheritDoc} */ - @Override public synchronized void write(byte[] b, int off, int len) throws IOException { - writeStart(); - - try { - os.write(b, off, len); - } - finally { - writeEnd(); - } - - total += len; - } - - /** {@inheritDoc} */ - @Override public synchronized void flush() throws IOException { - writeStart(); - - try { - os.flush(); - } - finally { - writeEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - closed = true; - - writeStart(); - - try { - os.close(); - } - finally { - writeEnd(); - } - - if (clientLog.isLogEnabled()) - clientLog.logCloseOut(logStreamId, userTime, writeTime, total); - } - } - - /** - * Read start. - */ - private void writeStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void writeEnd() { - long now = System.nanoTime(); - - writeTime += now - lastTs; - - lastTs = now; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java deleted file mode 100644 index 3ab3acc..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly - * requested. - *

- * The class is expected to be used only from synchronized context and therefore is not tread-safe. - */ -public class IgfsHadoopReader implements IgfsReader { - /** Secondary file system. */ - private final FileSystem fs; - - /** Path to the file to open. */ - private final Path path; - - /** Buffer size. */ - private final int bufSize; - - /** Actual input stream. */ - private FSDataInputStream in; - - /** Cached error occurred during output stream open. */ - private IOException err; - - /** Flag indicating that the stream was already opened. */ - private boolean opened; - - /** - * Constructor. - * - * @param fs Secondary file system. - * @param path Path to the file to open. - * @param bufSize Buffer size. - */ - public IgfsHadoopReader(FileSystem fs, Path path, int bufSize) { - assert fs != null; - assert path != null; - - this.fs = fs; - this.path = path; - this.bufSize = bufSize; - } - - /** Get input stream. */ - private PositionedReadable in() throws IOException { - if (opened) { - if (err != null) - throw err; - } - else { - opened = true; - - try { - in = fs.open(path, bufSize); - - if (in == null) - throw new IOException("Failed to open input stream (file system returned null): " + path); - } - catch (IOException e) { - err = e; - - throw err; - } - } - - return in; - } - - /** - * Close wrapped input stream in case it was previously opened. - */ - @Override public void close() { - U.closeQuiet(in); - } - - /** {@inheritDoc} */ - @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { - return in().read(pos, buf, off, len); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java deleted file mode 100644 index 9aaab4c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamDelegate.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * IGFS Hadoop stream descriptor. - */ -public class IgfsHadoopStreamDelegate { - /** RPC handler. */ - private final IgfsHadoopEx hadoop; - - /** Target. */ - private final Object target; - - /** Optional stream length. */ - private final long len; - - /** - * Constructor. - * - * @param target Target. - */ - public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target) { - this(hadoop, target, -1); - } - - /** - * Constructor. - * - * @param target Target. - * @param len Optional length. - */ - public IgfsHadoopStreamDelegate(IgfsHadoopEx hadoop, Object target, long len) { - assert hadoop != null; - assert target != null; - - this.hadoop = hadoop; - this.target = target; - this.len = len; - } - - /** - * @return RPC handler. - */ - public IgfsHadoopEx hadoop() { - return hadoop; - } - - /** - * @return Stream target. - */ - @SuppressWarnings("unchecked") - public T target() { - return (T) target; - } - - /** - * @return Length. - */ - public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return System.identityHashCode(target); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof IgfsHadoopStreamDelegate && - target == ((IgfsHadoopStreamDelegate)obj).target; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopStreamDelegate.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java deleted file mode 100644 index 20d7f2a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopStreamEventListener.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; - -/** - * IGFS input stream event listener. - */ -public interface IgfsHadoopStreamEventListener { - /** - * Callback invoked when the stream is being closed. - * - * @throws IgniteCheckedException If failed. - */ - public void onClose() throws IgniteCheckedException; - - /** - * Callback invoked when remote error occurs. - * - * @param errMsg Error message. - */ - public void onError(String errMsg); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java deleted file mode 100644 index bd96e60..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopUtils.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Utility constants and methods for IGFS Hadoop file system. - */ -public class IgfsHadoopUtils { - /** Parameter name for endpoint no embed mode flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed"; - - /** Parameter name for endpoint no shared memory flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem"; - - /** Parameter name for endpoint no local TCP flag. */ - public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp"; - - /** - * Get string parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return String value. - */ - public static String parameter(Configuration cfg, String name, String authority, String dflt) { - return cfg.get(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Get integer parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Integer value. - * @throws IOException In case of parse exception. - */ - public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException { - String name0 = String.format(name, authority != null ? authority : ""); - - try { - return cfg.getInt(name0, dflt); - } - catch (NumberFormatException ignore) { - throw new IOException("Failed to parse parameter value to integer: " + name0); - } - } - - /** - * Get boolean parameter. - * - * @param cfg Configuration. - * @param name Parameter name. - * @param authority Authority. - * @param dflt Default value. - * @return Boolean value. - */ - public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) { - return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @return Casted exception. - */ - public static IOException cast(IgniteCheckedException e) { - return cast(e, null); - } - - /** - * Cast Ignite exception to appropriate IO exception. - * - * @param e Exception to cast. - * @param path Path for exceptions. - * @return Casted exception. - */ - @SuppressWarnings("unchecked") - public static IOException cast(IgniteCheckedException e, @Nullable String path) { - assert e != null; - - // First check for any nested IOException; if exists - re-throw it. - if (e.hasCause(IOException.class)) - return e.getCause(IOException.class); - else if (e.hasCause(IgfsFileNotFoundException.class)) - return new FileNotFoundException(path); // TODO: Or PathNotFoundException? - else if (e.hasCause(IgfsParentNotDirectoryException.class)) - return new ParentNotDirectoryException(path); - else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class)) - return new PathIsNotEmptyDirectoryException(path); - else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class)) - return new PathExistsException(path); - else - return new IOException(e); - } - - /** - * Constructor. - */ - private IgfsHadoopUtils() { - // No-op. - } -}