Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72959200B82 for ; Fri, 16 Sep 2016 13:20:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 716F9160ADB; Fri, 16 Sep 2016 11:20:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C8E9E160AE6 for ; Fri, 16 Sep 2016 13:20:46 +0200 (CEST) Received: (qmail 39521 invoked by uid 500); 16 Sep 2016 11:20:45 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 39119 invoked by uid 99); 16 Sep 2016 11:20:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Sep 2016 11:20:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5CD00E0B49; Fri, 16 Sep 2016 11:20:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 16 Sep 2016 11:20:57 -0000 Message-Id: <4ba643707257461da7e056d124bcad1c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/51] [partial] ignite git commit: IGNITE-3916: Initial impl. archived-at: Fri, 16 Sep 2016 11:20:49 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java deleted file mode 100644 index 3220538..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.logging.Log; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.igfs.IgfsBlockLocation; -import org.apache.ignite.igfs.IgfsFile; -import org.apache.ignite.igfs.IgfsInputStream; -import org.apache.ignite.igfs.IgfsOutputStream; -import org.apache.ignite.igfs.IgfsPath; -import org.apache.ignite.igfs.IgfsPathSummary; -import org.apache.ignite.igfs.IgfsUserContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.igfs.IgfsEx; -import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; -import org.apache.ignite.internal.processors.igfs.IgfsStatus; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; -import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.lang.IgniteOutClosure; -import org.jetbrains.annotations.Nullable; - -/** - * Communication with grid in the same process. - */ -public class HadoopIgfsInProc implements HadoopIgfsEx { - /** Target IGFS. */ - private final IgfsEx igfs; - - /** Buffer size. */ - private final int bufSize; - - /** Event listeners. */ - private final Map lsnrs = - new ConcurrentHashMap<>(); - - /** Logger. */ - private final Log log; - - /** The user this Igfs works on behalf of. */ - private final String user; - - /** - * Constructor. - * - * @param igfs Target IGFS. - * @param log Log. - */ - public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { - this.user = IgfsUtils.fixUserName(userName); - - this.igfs = igfs; - - this.log = log; - - bufSize = igfs.configuration().getBlockSize() * 2; - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(final String logDir) { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public IgfsHandshakeResponse apply() { - igfs.clientLogDirectory(logDir); - - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); - } - }); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - // Perform cleanup. - for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to notify stream event listener", e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public IgfsFile apply() { - return igfs.info(path); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(final IgfsPath path, final Map props) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public IgfsFile apply() { - return igfs.update(path, props); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { - try { - IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public Void apply() { - igfs.setTimes(path, accessTime, modificationTime); - - return null; - } - }); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { - try { - IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public Void apply() { - igfs.rename(src, dest); - - return null; - } - }); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); - } - } - - /** {@inheritDoc} */ - @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public Boolean apply() { - return igfs.delete(path, recursive); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new Callable() { - @Override public IgfsStatus call() throws IgniteCheckedException { - return igfs.globalSpace(); - } - }); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + - "stopping."); - } - catch (IgniteCheckedException | RuntimeException | Error e) { - throw e; - } - catch (Exception e) { - throw new AssertionError("Must never go there."); - } - } - - /** {@inheritDoc} */ - @Override public Collection listPaths(final IgfsPath path) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure>() { - @Override public Collection apply() { - return igfs.listPaths(path); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection listFiles(final IgfsPath path) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure>() { - @Override public Collection apply() { - return igfs.listFiles(path); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(final IgfsPath path, final Map props) throws IgniteCheckedException { - try { - IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public Void apply() { - igfs.mkdirs(path, props); - - return null; - } - }); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public IgfsPathSummary apply() { - return igfs.summary(path); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Collection affinity(final IgfsPath path, final long start, final long len) - throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure>() { - @Override public Collection apply() { - return igfs.affinity(path, start, len); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public HadoopIgfsStreamDelegate apply() { - IgfsInputStream stream = igfs.open(path, bufSize); - - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) - throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public HadoopIgfsStreamDelegate apply() { - IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, - final int replication, final long blockSize, final @Nullable Map props) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public HadoopIgfsStreamDelegate apply() { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, - final @Nullable Map props) throws IgniteCheckedException { - try { - return IgfsUserContext.doAs(user, new IgniteOutClosure() { - @Override public HadoopIgfsStreamDelegate apply() { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - - return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); - } - }); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture readData(HadoopIgfsStreamDelegate delegate, long pos, int len, - @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStream stream = delegate.target(); - - try { - byte[] res = null; - - if (outBuf != null) { - int outTailLen = outBuf.length - outOff; - - if (len <= outTailLen) - stream.readFully(pos, outBuf, outOff, len); - else { - stream.readFully(pos, outBuf, outOff, outTailLen); - - int remainderLen = len - outTailLen; - - res = new byte[remainderLen]; - - stream.readFully(pos, res, 0, remainderLen); - } - } else { - res = new byte[len]; - - stream.readFully(pos, res, 0, len); - } - - return new GridFinishedFuture<>(res); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - return new GridFinishedFuture<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) - throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.write(data, off, len); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.flush(); - } - catch (IllegalStateException | IOException e) { - HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { - Closeable closeable = desc.target(); - - try { - closeable.close(); - } - catch (IllegalStateException e) { - throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(HadoopIgfsStreamDelegate delegate, - HadoopIgfsStreamEventListener lsnr) { - HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) { - HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java deleted file mode 100644 index 46b46d7..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java +++ /dev/null @@ -1,629 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import org.apache.commons.logging.Log; -import org.apache.hadoop.fs.PositionedReadable; -import org.apache.hadoop.fs.Seekable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.igfs.common.IgfsLogger; -import org.apache.ignite.internal.util.typedef.internal.A; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.NotNull; - -/** - * IGFS input stream wrapper for hadoop interfaces. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable, - HadoopIgfsStreamEventListener { - /** Minimum buffer size. */ - private static final int MIN_BUF_SIZE = 4 * 1024; - - /** Server stream delegate. */ - private HadoopIgfsStreamDelegate delegate; - - /** Stream ID used by logger. */ - private long logStreamId; - - /** Stream position. */ - private long pos; - - /** Stream read limit. */ - private long limit; - - /** Mark position. */ - private long markPos = -1; - - /** Prefetch buffer. */ - private DoubleFetchBuffer buf = new DoubleFetchBuffer(); - - /** Buffer half size for double-buffering. */ - private int bufHalfSize; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Logger. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** - * Creates input stream. - * - * @param delegate Server stream delegate. - * @param limit Read limit. - * @param bufSize Buffer size. - * @param log Log. - * @param clientLog Client logger. - */ - public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log, - IgfsLogger clientLog, long logStreamId) { - assert limit >= 0; - - this.delegate = delegate; - this.limit = limit; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - checkClosed(); - - readStart(); - - try { - if (eof()) - return -1; - - buf.refreshAhead(pos); - - int res = buf.atPosition(pos); - - pos++; - total++; - - buf.refreshAhead(pos); - - return res; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - checkClosed(); - - if (eof()) - return -1; - - readStart(); - - try { - long remaining = limit - pos; - - int read = buf.flatten(b, pos, off, len); - - pos += read; - total += read; - remaining -= read; - - if (remaining > 0 && read != len) { - int readAmt = (int)Math.min(remaining, len - read); - - delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); - - read += readAmt; - pos += readAmt; - total += readAmt; - } - - buf.refreshAhead(pos); - - return read; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, n); - - long oldPos = pos; - - if (pos + n <= limit) - pos += n; - else - pos = limit; - - buf.refreshAhead(pos); - - return pos - oldPos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - checkClosed(); - - int available = buf.available(pos); - - assert available >= 0; - - return available; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - readStart(); - - if (log.isDebugEnabled()) - log.debug("Closing input stream: " + delegate); - - delegate.hadoop().closeStream(delegate); - - readEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - - markClosed(false); - - if (log.isDebugEnabled()) - log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + - ", userTime=" + userTime + ']'); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - markPos = pos; - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - - if (markPos == -1) - throw new IOException("Stream was not marked."); - - pos = markPos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return true; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - int read = (int)Math.min(len, remaining); - - // Return -1 at EOF. - if (read == 0) - return -1; - - readFully(position, buf, off, read); - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - checkClosed(); - - if (len > remaining) - throw new EOFException("End of stream reached before data was fully read."); - - readStart(); - - try { - int read = this.buf.flatten(buf, position, off, len); - - total += read; - - if (read != len) { - int readAmt = len - read; - - delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); - - total += readAmt; - } - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, position, len); - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void readFully(long position, byte[] buf) throws IOException { - readFully(position, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - A.ensure(pos >= 0, "position must be non-negative"); - - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - - if (pos > limit) - pos = limit; - - if (log.isDebugEnabled()) - log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); - - this.pos = pos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** {@inheritDoc} */ - @Override public void onClose() { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - // No-op. - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. - */ - private void markClosed(boolean connBroken) { - // It is ok to have race here. - if (!closed) { - closed = true; - - this.connBroken = connBroken; - - delegate.hadoop().removeEventListener(delegate); - } - } - - /** - * @throws IOException If check failed. - */ - private void checkClosed() throws IOException { - if (closed) { - if (connBroken) - throw new IOException("Server connection was lost."); - else - throw new IOException("Stream is closed."); - } - } - - /** - * @return {@code True} if end of stream reached. - */ - private boolean eof() { - return limit == pos; - } - - /** - * Asynchronous prefetch buffer. - */ - private static class FetchBufferPart { - /** Read future. */ - private IgniteInternalFuture readFut; - - /** Position of cached chunk in file. */ - private long pos; - - /** Prefetch length. Need to store as read future result might be not available yet. */ - private int len; - - /** - * Creates fetch buffer part. - * - * @param readFut Read future for this buffer. - * @param pos Read position. - * @param len Chunk length. - */ - private FetchBufferPart(IgniteInternalFuture readFut, long pos, int len) { - this.readFut = readFut; - this.pos = pos; - this.len = len; - } - - /** - * Copies cached data if specified position matches cached region. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Offset in destination buffer from which start writing. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If read future failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - // If read start position is within cached boundaries. - if (contains(pos)) { - byte[] data = readFut.get(); - - int srcPos = (int)(pos - this.pos); - int cpLen = Math.min(len, data.length - srcPos); - - U.arrayCopy(data, srcPos, dst, dstOff, cpLen); - - return cpLen; - } - - return 0; - } - - /** - * @return {@code True} if data is ready to be read. - */ - public boolean ready() { - return readFut.isDone(); - } - - /** - * Checks if current buffer part contains given position. - * - * @param pos Position to check. - * @return {@code True} if position matches buffer region. - */ - public boolean contains(long pos) { - return this.pos <= pos && this.pos + len > pos; - } - } - - private class DoubleFetchBuffer { - /** */ - private FetchBufferPart first; - - /** */ - private FetchBufferPart second; - - /** - * Copies fetched data from both buffers to destination array if cached region matched read position. - * - * @param dst Destination buffer. - * @param pos Read position in file. - * @param dstOff Destination buffer offset. - * @param len Maximum number of bytes to copy. - * @return Number of bytes copied. - * @throws IgniteCheckedException If any read operation failed. - */ - public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { - assert dstOff >= 0; - assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + - ", len=" + len + ']'; - - int bytesCopied = 0; - - if (first != null) { - bytesCopied += first.flatten(dst, pos, dstOff, len); - - if (bytesCopied != len && second != null) { - assert second.pos == first.pos + first.len; - - bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); - } - } - - return bytesCopied; - } - - /** - * Gets byte at specified position in buffer. - * - * @param pos Stream position. - * @return Read byte. - * @throws IgniteCheckedException If read failed. - */ - public int atPosition(long pos) throws IgniteCheckedException { - // Should not reach here if stream contains no data. - assert first != null; - - if (first.contains(pos)) { - byte[] bytes = first.readFut.get(); - - return bytes[((int)(pos - first.pos))] & 0xFF; - } - else { - assert second != null; - assert second.contains(pos); - - byte[] bytes = second.readFut.get(); - - return bytes[((int)(pos - second.pos))] & 0xFF; - } - } - - /** - * Starts asynchronous buffer refresh if needed, depending on current position. - * - * @param pos Current stream position. - */ - public void refreshAhead(long pos) { - if (fullPrefetch(pos)) { - first = fetch(pos, bufHalfSize); - second = fetch(pos + bufHalfSize, bufHalfSize); - } - else if (needFlip(pos)) { - first = second; - - second = fetch(first.pos + first.len, bufHalfSize); - } - } - - /** - * @param pos Position from which read is expected. - * @return Number of bytes available to be read without blocking. - */ - public int available(long pos) { - int available = 0; - - if (first != null) { - if (first.contains(pos)) { - if (first.ready()) { - available += (pos - first.pos); - - if (second != null && second.ready()) - available += second.len; - } - } - else { - if (second != null && second.contains(pos) && second.ready()) - available += (pos - second.pos); - } - } - - return available; - } - - /** - * Checks if position shifted enough to forget previous buffer. - * - * @param pos Current position. - * @return {@code True} if need flip buffers. - */ - private boolean needFlip(long pos) { - // Return true if we read more then half of second buffer. - return second != null && second.contains(pos); - } - - /** - * Determines if all cached bytes should be discarded and new region should be - * prefetched. - * - * @param curPos Current stream position. - * @return {@code True} if need to refresh both blocks. - */ - private boolean fullPrefetch(long curPos) { - // If no data was prefetched yet, return true. - return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); - } - - /** - * Starts asynchronous fetch for given region. - * - * @param pos Position to read from. - * @param size Number of bytes to read. - * @return Fetch buffer part. - */ - private FetchBufferPart fetch(long pos, int size) { - long remaining = limit - pos; - - size = (int)Math.min(size, remaining); - - return size <= 0 ? null : - new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java deleted file mode 100644 index 70f645f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.igfs.common.IgfsMessage; -import org.jetbrains.annotations.Nullable; - -/** - * IO abstraction layer for IGFS client. Two kind of messages are expected to be sent: requests with response - * and request without response. - */ -public interface HadoopIgfsIo { - /** - * Sends given IGFS client message and asynchronously awaits for response. - * - * @param msg Message to send. - * @return Future that will be completed. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public IgniteInternalFuture send(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Sends given IGFS client message and asynchronously awaits for response. When IO detects response - * beginning for given message it stops reading data and passes input stream to closure which can read - * response in a specific way. - * - * @param msg Message to send. - * @param outBuf Output buffer. If {@code null}, the output buffer is not used. - * @param outOff Output buffer offset. - * @param outLen Output buffer length. - * @return Future that will be completed when response is returned from closure. - * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). - */ - public IgniteInternalFuture send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) - throws IgniteCheckedException; - - /** - * Sends given message and does not wait for response. - * - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; - - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param lsnr Event listener. - */ - public void addEventListener(HadoopIgfsIpcIoListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param lsnr Event listener. - */ - public void removeEventListener(HadoopIgfsIpcIoListener lsnr); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java deleted file mode 100644 index b0a4135..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ /dev/null @@ -1,624 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import java.io.BufferedOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.logging.Log; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.igfs.IgfsException; -import org.apache.ignite.internal.GridLoggerProxy; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.igfs.common.IgfsControlResponse; -import org.apache.ignite.internal.igfs.common.IgfsDataInputStream; -import org.apache.ignite.internal.igfs.common.IgfsDataOutputStream; -import org.apache.ignite.internal.igfs.common.IgfsIpcCommand; -import org.apache.ignite.internal.igfs.common.IgfsMarshaller; -import org.apache.ignite.internal.igfs.common.IgfsMessage; -import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest; -import org.apache.ignite.internal.util.GridConcurrentHashSet; -import org.apache.ignite.internal.util.GridStripedLock; -import org.apache.ignite.internal.util.ipc.IpcEndpoint; -import org.apache.ignite.internal.util.ipc.IpcEndpointFactory; -import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException; -import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; - -/** - * IO layer implementation based on blocking IPC streams. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class HadoopIgfsIpcIo implements HadoopIgfsIo { - /** Logger. */ - private final Log log; - - /** Request futures map. */ - private ConcurrentMap reqMap = - new ConcurrentHashMap8<>(); - - /** Request ID counter. */ - private AtomicLong reqIdCnt = new AtomicLong(); - - /** Endpoint. */ - private IpcEndpoint endpoint; - - /** Endpoint output stream. */ - private IgfsDataOutputStream out; - - /** Protocol. */ - private final IgfsMarshaller marsh; - - /** Client reader thread. */ - private Thread reader; - - /** Lock for graceful shutdown. */ - private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); - - /** Stopping flag. */ - private volatile boolean stopping; - - /** Server endpoint address. */ - private final String endpointAddr; - - /** Number of open file system sessions. */ - private final AtomicInteger activeCnt = new AtomicInteger(1); - - /** Event listeners. */ - private final Collection lsnrs = - new GridConcurrentHashSet<>(); - - /** Cached connections. */ - private static final ConcurrentMap ipcCache = - new ConcurrentHashMap8<>(); - - /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */ - private static final GridStripedLock initLock = new GridStripedLock(32); - - /** - * @param endpointAddr Endpoint. - * @param marsh Protocol. - * @param log Logger to use. - */ - public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) { - assert endpointAddr != null; - assert marsh != null; - - this.endpointAddr = endpointAddr; - this.marsh = marsh; - this.log = log; - } - - /** - * Returns a started and valid instance of this class - * for a given endpoint. - * - * @param log Logger to use for new instance. - * @param endpoint Endpoint string. - * @return New or existing cached instance, which is started and operational. - * @throws IOException If new instance was created but failed to start. - */ - public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException { - while (true) { - HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint); - - if (clientIo != null) { - if (clientIo.acquire()) - return clientIo; - else - // If concurrent close. - ipcCache.remove(endpoint, clientIo); - } - else { - Lock lock = initLock.getLock(endpoint); - - lock.lock(); - - try { - clientIo = ipcCache.get(endpoint); - - if (clientIo != null) { // Perform double check. - if (clientIo.acquire()) - return clientIo; - else - // If concurrent close. - ipcCache.remove(endpoint, clientIo); - } - - // Otherwise try creating a new one. - clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log); - - try { - clientIo.start(); - } - catch (IgniteCheckedException e) { - throw new IOException(e.getMessage(), e); - } - - HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo); - - // Put in exclusive lock. - assert old == null; - - return clientIo; - } - finally { - lock.unlock(); - } - } - } - } - - /** - * Increases usage count for this instance. - * - * @return {@code true} if usage count is greater than zero. - */ - private boolean acquire() { - while (true) { - int cnt = activeCnt.get(); - - if (cnt == 0) { - if (log.isDebugEnabled()) - log.debug("IPC IO not acquired (count was 0): " + this); - - return false; - } - - // Need to make sure that no-one decremented count in between. - if (activeCnt.compareAndSet(cnt, cnt + 1)) { - if (log.isDebugEnabled()) - log.debug("IPC IO acquired: " + this); - - return true; - } - } - } - - /** - * Releases this instance, decrementing usage count. - *

- * If usage count becomes zero, the instance is stopped - * and removed from cache. - */ - public void release() { - while (true) { - int cnt = activeCnt.get(); - - if (cnt == 0) { - if (log.isDebugEnabled()) - log.debug("IPC IO not released (count was 0): " + this); - - return; - } - - if (activeCnt.compareAndSet(cnt, cnt - 1)) { - if (cnt == 1) { - ipcCache.remove(endpointAddr, this); - - if (log.isDebugEnabled()) - log.debug("IPC IO stopping as unused: " + this); - - stop(); - } - else if (log.isDebugEnabled()) - log.debug("IPC IO released: " + this); - - return; - } - } - } - - /** - * Closes this IO instance, removing it from cache. - */ - public void forceClose() { - if (ipcCache.remove(endpointAddr, this)) - stop(); - } - - /** - * Starts the IO. - * - * @throws IgniteCheckedException If failed to connect the endpoint. - */ - private void start() throws IgniteCheckedException { - boolean success = false; - - try { - endpoint = IpcEndpointFactory.connectEndpoint( - endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, "")); - - out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); - - reader = new ReaderThread(); - - // Required for Hadoop 2.x - reader.setDaemon(true); - - reader.start(); - - success = true; - } - catch (IgniteCheckedException e) { - IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); - - if (resEx != null) - throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); - - throw e; - } - finally { - if (!success) - stop(); - } - } - - /** - * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed. - * Close listeners will be invoked as if connection is closed by server. - */ - private void stop() { - close0(null); - - if (reader != null) { - try { - U.interrupt(reader); - U.join(reader); - - reader = null; - } - catch (IgniteInterruptedCheckedException ignored) { - Thread.currentThread().interrupt(); - - log.warn("Got interrupted while waiting for reader thread to shut down (will return)."); - } - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) { - if (!busyLock.readLock().tryLock()) { - lsnr.onClose(); - - return; - } - - boolean invokeNow = false; - - try { - invokeNow = stopping; - - if (!invokeNow) - lsnrs.add(lsnr); - } - finally { - busyLock.readLock().unlock(); - - if (invokeNow) - lsnr.onClose(); - } - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) { - lsnrs.remove(lsnr); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture send(IgfsMessage msg) throws IgniteCheckedException { - return send(msg, null, 0, 0); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, - int outLen) throws IgniteCheckedException { - assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; - - if (!busyLock.readLock().tryLock()) - throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + - "closed)."); - - try { - if (stopping) - throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + - "closed)."); - - long reqId = reqIdCnt.getAndIncrement(); - - HadoopIgfsFuture fut = new HadoopIgfsFuture<>(); - - fut.outputBuffer(outBuf); - fut.outputOffset(outOff); - fut.outputLength(outLen); - fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK); - - HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut); - - assert oldFut == null; - - if (log.isDebugEnabled()) - log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']'); - - byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command()); - - IgniteCheckedException err = null; - - try { - synchronized (this) { - marsh.marshall(msg, hdr, out); - - out.flush(); // Blocking operation + sometimes system call. - } - } - catch (IgniteCheckedException e) { - err = e; - } - catch (IOException e) { - err = new HadoopIgfsCommunicationException(e); - } - - if (err != null) { - reqMap.remove(reqId, fut); - - fut.onDone(err); - } - - return fut; - } - finally { - busyLock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException { - if (!busyLock.readLock().tryLock()) - throw new HadoopIgfsCommunicationException("Failed to send message (client is being " + - "concurrently closed)."); - - try { - if (stopping) - throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed)."); - - assert msg.command() == IgfsIpcCommand.WRITE_BLOCK; - - IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; - - byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK); - - U.longToBytes(req.streamId(), hdr, 12); - U.intToBytes(req.length(), hdr, 20); - - synchronized (this) { - out.write(hdr); - out.write(req.data(), (int)req.position(), req.length()); - - out.flush(); - } - } - catch (IOException e) { - throw new HadoopIgfsCommunicationException(e); - } - finally { - busyLock.readLock().unlock(); - } - } - - /** - * Closes client but does not wait. - * - * @param err Error. - */ - private void close0(@Nullable Throwable err) { - busyLock.writeLock().lock(); - - try { - if (stopping) - return; - - stopping = true; - } - finally { - busyLock.writeLock().unlock(); - } - - if (err == null) - err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " + - "is received)."); - - // Clean up resources. - U.closeQuiet(out); - - if (endpoint != null) - endpoint.close(); - - // Unwind futures. We can safely iterate here because no more futures will be added. - Iterator it = reqMap.values().iterator(); - - while (it.hasNext()) { - HadoopIgfsFuture fut = it.next(); - - fut.onDone(err); - - it.remove(); - } - - for (HadoopIgfsIpcIoListener lsnr : lsnrs) - lsnr.onClose(); - } - - /** - * Do not extend {@code GridThread} to minimize class dependencies. - */ - private class ReaderThread extends Thread { - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run() { - // Error to fail pending futures. - Throwable err = null; - - try { - InputStream in = endpoint.inputStream(); - - IgfsDataInputStream dis = new IgfsDataInputStream(in); - - byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; - byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE]; - - while (!Thread.currentThread().isInterrupted()) { - dis.readFully(hdr); - - long reqId = U.bytesToLong(hdr, 0); - - // We don't wait for write responses, therefore reqId is -1. - if (reqId == -1) { - // We received a response which normally should not be sent. It must contain an error. - dis.readFully(msgHdr); - - assert msgHdr[4] != 0; - - String errMsg = dis.readUTF(); - - // Error code. - dis.readInt(); - - long streamId = dis.readLong(); - - for (HadoopIgfsIpcIoListener lsnr : lsnrs) - lsnr.onError(streamId, errMsg); - } - else { - HadoopIgfsFuture fut = reqMap.remove(reqId); - - if (fut == null) { - String msg = "Failed to read response from server: response closure is unavailable for " + - "requestId (will close connection):" + reqId; - - log.warn(msg); - - err = new IgniteCheckedException(msg); - - break; - } - else { - try { - IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8)); - - if (log.isDebugEnabled()) - log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']'); - - Object res = null; - - if (fut.read()) { - dis.readFully(msgHdr); - - boolean hasErr = msgHdr[4] != 0; - - if (hasErr) { - String errMsg = dis.readUTF(); - - // Error code. - Integer errCode = dis.readInt(); - - IgfsControlResponse.throwError(errCode, errMsg); - } - - int blockLen = U.bytesToInt(msgHdr, 5); - - int readLen = Math.min(blockLen, fut.outputLength()); - - if (readLen > 0) { - assert fut.outputBuffer() != null; - - dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen); - } - - if (readLen != blockLen) { - byte[] buf = new byte[blockLen - readLen]; - - dis.readFully(buf); - - res = buf; - } - } - else - res = marsh.unmarshall(cmd, hdr, dis); - - fut.onDone(res); - } - catch (IgfsException | IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to apply response closure (will fail request future): " + - e.getMessage()); - - fut.onDone(e); - - err = e; - } - catch (Throwable t) { - fut.onDone(t); - - throw t; - } - } - } - } - } - catch (EOFException ignored) { - err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer)."); - } - catch (IOException e) { - if (!stopping) - log.error("Failed to read data (connection will be closed)", e); - - err = new HadoopIgfsCommunicationException(e); - } - catch (Throwable e) { - if (!stopping) - log.error("Failed to obtain endpoint input stream (connection will be closed)", e); - - err = e; - - if (e instanceof Error) - throw (Error)e; - } - finally { - close0(err); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt + - ", stopping=" + stopping + ']'; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java deleted file mode 100644 index c26e896..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIoListener.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -/** - * Listens to the events of {@link HadoopIgfsIpcIo}. - */ -public interface HadoopIgfsIpcIoListener { - /** - * Callback invoked when the IO is being closed. - */ - public void onClose(); - - /** - * Callback invoked when remote error occurs. - * - * @param streamId Stream ID. - * @param errMsg Error message. - */ - public void onError(long streamId, String errMsg); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java deleted file mode 100644 index 3a7f45b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.igfs; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * JCL logger wrapper for Hadoop. - */ -public class HadoopIgfsJclLogger implements IgniteLogger { - /** JCL implementation proxy. */ - @GridToStringInclude - private Log impl; - - /** - * Constructor. - * - * @param impl JCL implementation to use. - */ - HadoopIgfsJclLogger(Log impl) { - assert impl != null; - - this.impl = impl; - } - - /** {@inheritDoc} */ - @Override public IgniteLogger getLogger(Object ctgr) { - return new HadoopIgfsJclLogger(LogFactory.getLog( - ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr))); - } - - /** {@inheritDoc} */ - @Override public void trace(String msg) { - impl.trace(msg); - } - - /** {@inheritDoc} */ - @Override public void debug(String msg) { - impl.debug(msg); - } - - /** {@inheritDoc} */ - @Override public void info(String msg) { - impl.info(msg); - } - - /** {@inheritDoc} */ - @Override public void warning(String msg) { - impl.warn(msg); - } - - /** {@inheritDoc} */ - @Override public void warning(String msg, @Nullable Throwable e) { - impl.warn(msg, e); - } - - /** {@inheritDoc} */ - @Override public void error(String msg) { - impl.error(msg); - } - - /** {@inheritDoc} */ - @Override public boolean isQuiet() { - return !isInfoEnabled() && !isDebugEnabled(); - } - - /** {@inheritDoc} */ - @Override public void error(String msg, @Nullable Throwable e) { - impl.error(msg, e); - } - - /** {@inheritDoc} */ - @Override public boolean isTraceEnabled() { - return impl.isTraceEnabled(); - } - - /** {@inheritDoc} */ - @Override public boolean isDebugEnabled() { - return impl.isDebugEnabled(); - } - - /** {@inheritDoc} */ - @Override public boolean isInfoEnabled() { - return impl.isInfoEnabled(); - } - - /** {@inheritDoc} */ - @Nullable @Override public String fileName() { - return null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopIgfsJclLogger.class, this); - } -} \ No newline at end of file