From: vozerov@apache.org
To: commits@ignite.apache.org
Date: Mon, 19 Sep 2016 10:27:37 -0000
Subject: [47/51] [abbrv] [partial] ignite git commit: IGNITE-3916: Created separate module.

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..2484492
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumFs; +import org.apache.hadoop.fs.DelegateToFileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.local.LocalConfigKeys; + +import static org.apache.hadoop.fs.FsConstants.LOCAL_FS_URI; + +/** + * Local file system replacement for Hadoop jobs. + */ +public class HadoopLocalFileSystemV2 extends ChecksumFs { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException { + super(new DelegateFS(cfg)); + } + + /** + * Creates new local file system. + * + * @param uri URI. + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException { + this(cfg); + } + + /** + * Delegate file system. + */ + private static class DelegateFS extends DelegateToFileSystem { + /** + * Creates new local file system. + * + * @param cfg Configuration. + * @throws IOException If failed. + * @throws URISyntaxException If failed. + */ + public DelegateFS(Configuration cfg) throws IOException, URISyntaxException { + super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false); + } + + /** {@inheritDoc} */ + @Override public int getUriDefaultPort() { + return -1; + } + + /** {@inheritDoc} */ + @Override public FsServerDefaults getServerDefaults() throws IOException { + return LocalConfigKeys.getServerDefaults(); + } + + /** {@inheritDoc} */ + @Override public boolean isValidName(String src) { + return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java new file mode 100644 index 0000000..0aac4a3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopParameters.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +/** + * This class lists parameters that can be specified in Hadoop configuration. + * Hadoop configuration can be specified in {@code core-site.xml} file + * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: + *
+ * <ul>
+ *     <li>
+ *         {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
+ *         the one specified in {@link org.apache.ignite.configuration.FileSystemConfiguration#getSequentialReadsBeforePrefetch()}
+ *         IGFS data node configuration property.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.enabled} - specifies whether the IGFS sampling logger is enabled. If
+ *         {@code true}, all file system operations will be logged to a file.
+ *     </li>
+ *     <li>{@code fs.igfs.[name].log.dir} - specifies the log directory where sampling log files should be placed.</li>
+ *     <li>
+ *         {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
+ *         it gets flushed to the log file. Higher values imply greater performance, but increase the delay
+ *         before a record appears in the log file.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on the data
+ *         node to which the client is connected. If {@code true}, the file will not be distributed and will be written
+ *         to a single data node. Default value is {@code true}.
+ *     </li>
+ *     <li>
+ *         {@code fs.igfs.prefer.local.writes} - specifies whether a file should preferably be written to
+ *         the local data node if it has enough free space. It may be redistributed across nodes later, though.
+ *     </li>
+ * </ul>
+ * Where {@code [name]} is the file system endpoint which you specify in the file system URI authority part. E.g. if
+ * your file system URI is {@code igfs://127.0.0.1:10500}, then {@code name} will be {@code 127.0.0.1:10500}.
+ * <p>
+ * Sample configuration that can be placed to {@code core-site.xml} file:
+ * <pre>
+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.enabled</name>
+ *         <value>true</value>
+ *     </property>
+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.dir</name>
+ *         <value>/home/apache/ignite/log/sampling</value>
+ *     </property>
+ *     <property>
+ *         <name>fs.igfs.127.0.0.1:10500.log.batch_size</name>
+ *         <value>16</value>
+ *     </property>
+ * </pre>
+ * Parameters could also be specified per mapreduce job, e.g.
+ * <pre>
+ * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
+ * </pre>
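+ * <p>
+ * The same settings can also be applied programmatically. A minimal illustrative sketch
+ * (the {@code conf} instance and the endpoint name below are assumptions for the example,
+ * not part of this commit):
+ * <pre>
+ * Configuration conf = new Configuration();
+ *
+ * // "127.0.0.1:10500" is a hypothetical endpoint; substitute your own URI authority.
+ * conf.setBoolean(String.format(PARAM_IGFS_LOG_ENABLED, "127.0.0.1:10500"), true);
+ * conf.setInt(String.format(PARAM_IGFS_LOG_BATCH_SIZE, "127.0.0.1:10500"), 16);
+ * </pre>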
+ * If you want to use these parameters in code, then you have to substitute your file system name into the pattern.
+ * The easiest way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
+ */
+public class HadoopParameters {
+    /** Parameter name for control over file colocation write mode. */
+    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
+
+    /** Parameter name for custom sequential reads before prefetch value. */
+    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
+        "fs.igfs.%s.open.sequential_reads_before_prefetch";
+
+    /** Parameter name for client logger directory. */
+    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
+
+    /** Parameter name for log batch size. */
+    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
+
+    /** Parameter name for log enabled flag. */
+    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
+
+    /** Parameter name for prefer local writes flag. */
+    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..b8fc8e7
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.nio.file.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Local file system implementation for Hadoop. + */ +public class HadoopRawLocalFileSystem extends FileSystem { + /** Working directory for each thread. */ + private final ThreadLocal workDir = new ThreadLocal() { + @Override protected Path initialValue() { + return getInitialWorkingDirectory(); + } + }; + + /** + * Converts Hadoop path to local path. + * + * @param path Hadoop path. + * @return Local path. + */ + File convert(Path path) { + checkPath(path); + + if (path.isAbsolute()) + return new File(path.toUri().getPath()); + + return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath()); + } + + /** {@inheritDoc} */ + @Override public Path getHomeDirectory() { + return makeQualified(new Path(System.getProperty("user.home"))); + } + + /** {@inheritDoc} */ + @Override public Path getInitialWorkingDirectory() { + File f = new File(System.getProperty("user.dir")); + + return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null); + } + + /** {@inheritDoc} */ + @Override public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + + setConf(conf); + + String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP); + + if (initWorkDir != null) + setWorkingDirectory(new Path(initWorkDir)); + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return FsConstants.LOCAL_FS_URI; + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return new FSDataInputStream(new InStream(checkExists(convert(f)))); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize, + short replication, long blockSize, Progressable progress) throws IOException { + File file = convert(f); + + if (!overwrite && !file.createNewFile()) + throw new IOException("Failed to create new file: " + f.toUri()); + + return out(file, false, bufSize); + } + + /** + * @param file File. + * @param append Append flag. + * @return Output stream. + * @throws IOException If failed. + */ + private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException { + return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append), + bufSize < 32 * 1024 ? 
32 * 1024 : bufSize), new Statistics(getUri().getScheme())); + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { + return out(convert(f), true, bufSize); + } + + /** {@inheritDoc} */ + @Override public boolean rename(Path src, Path dst) throws IOException { + return convert(src).renameTo(convert(dst)); + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + File file = convert(f); + + if (file.isDirectory() && !recursive) + throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri()); + + return U.delete(file); + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path dir) { + workDir.set(fixRelativePart(dir)); + + checkPath(dir); + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return workDir.get(); + } + + /** {@inheritDoc} */ + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if(f == null) + throw new IllegalArgumentException("mkdirs path arg is null"); + + Path parent = f.getParent(); + + File p2f = convert(f); + + if(parent != null) { + File parent2f = convert(parent); + + if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) + throw new FileAlreadyExistsException("Parent path is not a directory: " + parent); + + } + + return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + return fileStatus(checkExists(convert(f))); + } + + /** + * @return File status. + */ + private FileStatus fileStatus(File file) throws IOException { + boolean dir = file.isDirectory(); + + java.nio.file.Path path = dir ? null : file.toPath(); + + return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(), + /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ? + new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI())); + } + + /** + * @param file File. + * @return Same file. + * @throws FileNotFoundException If does not exist. + */ + private static File checkExists(File file) throws FileNotFoundException { + if (!file.exists()) + throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist."); + + return file; + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws IOException { + File file = convert(f); + + if (checkExists(file).isFile()) + return new FileStatus[] {fileStatus(file)}; + + File[] files = file.listFiles(); + + FileStatus[] res = new FileStatus[files.length]; + + for (int i = 0; i < res.length; i++) + res[i] = fileStatus(files[i]); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean supportsSymlinks() { + return true; + } + + /** {@inheritDoc} */ + @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException { + Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath()); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileLinkStatus(Path f) throws IOException { + return getFileStatus(getLinkTarget(f)); + } + + /** {@inheritDoc} */ + @Override public Path getLinkTarget(Path f) throws IOException { + File file = Files.readSymbolicLink(convert(f).toPath()).toFile(); + + return new Path(file.toURI()); + } + + /** + * Input stream. 
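+ * Wraps a {@link RandomAccessFile}. Positioned reads save the current file pointer, seek to
+ * the requested position, read, and then restore the pointer.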
+ */ + private static class InStream extends InputStream implements Seekable, PositionedReadable { + /** */ + private final RandomAccessFile file; + + /** + * @param f File. + * @throws IOException If failed. + */ + public InStream(File f) throws IOException { + file = new RandomAccessFile(f, "r"); + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + return file.read(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + return file.read(b, off, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + file.close(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + long pos0 = file.getFilePointer(); + + file.seek(pos); + int res = file.read(buf, off, len); + + file.seek(pos0); + + return res; + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + if (read(pos, buf, off, len) != len) + throw new IOException(); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + file.seek(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + return file.getFilePointer(); + } + + /** {@inheritDoc} */ + @Override public boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java new file mode 100644 index 0000000..fe43596 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfs.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.jetbrains.annotations.Nullable; + +/** + * Facade for communication with grid. + */ +public interface HadoopIgfs { + /** + * Perform handshake. + * + * @param logDir Log directory. + * @return Future with handshake result. + * @throws IgniteCheckedException If failed. + */ + public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; + + /** + * Close connection. + * + * @param force Force flag. + */ + public void close(boolean force); + + /** + * Command to retrieve file info for some IGFS path. + * + * @param path Path to get file info for. + * @return Future for info operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to update file properties. + * + * @param path IGFS path to update properties. + * @param props Properties to update. + * @return Future for update operation. + * @throws IgniteCheckedException If failed. + */ + public IgfsFile update(IgfsPath path, Map props) throws IgniteCheckedException, IOException; + + /** + * Sets last access time and last modification time for a file. + * + * @param path Path to update times. + * @param accessTime Last access time to set. + * @param modificationTime Last modification time to set. + * @throws IgniteCheckedException If failed. + */ + public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, + IOException; + + /** + * Command to rename given path. + * + * @param src Source path. + * @param dest Destination path. + * @return Future for rename operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; + + /** + * Command to delete given path. + * + * @param path Path to delete. + * @param recursive {@code True} if deletion is recursive. + * @return Future for delete operation. + * @throws IgniteCheckedException If failed. + */ + public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; + + /** + * Command to get affinity for given path, offset and length. + * + * @param path Path to get affinity for. + * @param start Start position (offset). + * @param len Data length. + * @return Future for affinity command. + * @throws IgniteCheckedException If failed. + */ + public Collection affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, + IOException; + + /** + * Gets path summary. + * + * @param path Path to get summary for. + * @return Future that will be completed when summary is received. + * @throws IgniteCheckedException If failed. + */ + public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to create directories. + * + * @param path Path to create. + * @return Future for mkdirs operation. + * @throws IgniteCheckedException If failed. 
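+ * @param props Properties to set on created directories.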
+ */ + public Boolean mkdirs(IgfsPath path, Map props) throws IgniteCheckedException, IOException; + + /** + * Command to get list of files in directory. + * + * @param path Path to list. + * @return Future for listFiles operation. + * @throws IgniteCheckedException If failed. + */ + public Collection listFiles(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to get directory listing. + * + * @param path Path to list. + * @return Future for listPaths operation. + * @throws IgniteCheckedException If failed. + */ + public Collection listPaths(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Performs status request. + * + * @return Status response. + * @throws IgniteCheckedException If failed. + */ + public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; + + /** + * Command to open file for reading. + * + * @param path File path to open. + * @return Future for open operation. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, + IOException; + + /** + * Command to create file and open it for output. + * + * @param path Path to file. + * @param overwrite If {@code true} then old file contents will be lost. + * @param colocate If {@code true} and called on data node, file will be written on that node. + * @param replication Replication factor. + * @param props File properties for creation. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map props) throws IgniteCheckedException, IOException; + + /** + * Open file for output appending data to the end of a file. + * + * @param path Path to file. + * @param create If {@code true}, file will be created if does not exist. + * @param props File properties. + * @return Stream descriptor. + * @throws IgniteCheckedException If failed. + */ + public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map props) throws IgniteCheckedException, IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java new file mode 100644 index 0000000..d610091 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsCommunicationException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Communication exception indicating a problem between file system and IGFS instance. + */ +public class HadoopIgfsCommunicationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given throwable as a nested cause and + * source of error message. + * + * @param cause Non-null throwable cause. + */ + public HadoopIgfsCommunicationException(Exception cause) { + super(cause); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + */ + public HadoopIgfsCommunicationException(String msg) { + super(msg); + } + + /** + * Creates a new exception with given error message and optional nested cause exception. + * + * @param msg Error message. + * @param cause Cause. + */ + public HadoopIgfsCommunicationException(String msg, Exception cause) { + super(msg, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java new file mode 100644 index 0000000..014e2a1 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.IOException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.jetbrains.annotations.Nullable; + +/** + * Extended IGFS server interface. + */ +public interface HadoopIgfsEx extends HadoopIgfs { + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. 
+     * If connection is closed already, callback will be invoked synchronously inside this method.
+     *
+     * @param delegate Stream delegate.
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsStreamDelegate delegate, HadoopIgfsStreamEventListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server is lost or remote error has occurred.
+     *
+     * @param delegate Stream delegate.
+     */
+    public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+    /**
+     * Asynchronously reads specified amount of bytes from opened input stream.
+     *
+     * @param delegate Stream delegate.
+     * @param pos Position to read from.
+     * @param len Data length to read.
+     * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+     *     bytes will be read into a newly allocated buffer of length {@code len - outBuf.length} and this
+     *     buffer will be the result of the read future.
+     * @param outOff Output offset.
+     * @param outLen Output length.
+     * @return Read data.
+     */
+    public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+        @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+    /**
+     * Writes data to the stream with given streamId. This method does not return any future since
+     * no response to write request is sent.
+     *
+     * @param delegate Stream delegate.
+     * @param data Data to write.
+     * @param off Offset.
+     * @param len Length.
+     * @throws IOException If failed.
+     */
+    public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) throws IOException;
+
+    /**
+     * Close server stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void closeStream(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * Flush output stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     *
+     * @return The user name.
+     */
+    public String user();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
new file mode 100644
index 0000000..5ff1b2e
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsFuture.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * IGFS client future that holds response parse closure. + */ +public class HadoopIgfsFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Output buffer. */ + private byte[] outBuf; + + /** Output offset. */ + private int outOff; + + /** Output length. */ + private int outLen; + + /** Read future flag. */ + private boolean read; + + /** + * @return Output buffer. + */ + public byte[] outputBuffer() { + return outBuf; + } + + /** + * @param outBuf Output buffer. + */ + public void outputBuffer(@Nullable byte[] outBuf) { + this.outBuf = outBuf; + } + + /** + * @return Offset in output buffer to write from. + */ + public int outputOffset() { + return outOff; + } + + /** + * @param outOff Offset in output buffer to write from. + */ + public void outputOffset(int outOff) { + this.outOff = outOff; + } + + /** + * @return Length to write to output buffer. + */ + public int outputLength() { + return outLen; + } + + /** + * @param outLen Length to write to output buffer. + */ + public void outputLength(int outLen) { + this.outLen = outLen; + } + + /** + * @param read {@code True} if this is a read future. + */ + public void read(boolean read) { + this.read = read; + } + + /** + * @return {@code True} if this is a read future. + */ + public boolean read() { + return read; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java new file mode 100644 index 0000000..3220538 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.igfs.IgfsBlockLocation; +import org.apache.ignite.igfs.IgfsFile; +import org.apache.ignite.igfs.IgfsInputStream; +import org.apache.ignite.igfs.IgfsOutputStream; +import org.apache.ignite.igfs.IgfsPath; +import org.apache.ignite.igfs.IgfsPathSummary; +import org.apache.ignite.igfs.IgfsUserContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.igfs.IgfsEx; +import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse; +import org.apache.ignite.internal.processors.igfs.IgfsStatus; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.lang.IgniteOutClosure; +import org.jetbrains.annotations.Nullable; + +/** + * Communication with grid in the same process. + */ +public class HadoopIgfsInProc implements HadoopIgfsEx { + /** Target IGFS. */ + private final IgfsEx igfs; + + /** Buffer size. */ + private final int bufSize; + + /** Event listeners. */ + private final Map lsnrs = + new ConcurrentHashMap<>(); + + /** Logger. */ + private final Log log; + + /** The user this Igfs works on behalf of. */ + private final String user; + + /** + * Constructor. + * + * @param igfs Target IGFS. + * @param log Log. + */ + public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { + this.user = IgfsUtils.fixUserName(userName); + + this.igfs = igfs; + + this.log = log; + + bufSize = igfs.configuration().getBlockSize() * 2; + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(final String logDir) { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public IgfsHandshakeResponse apply() { + igfs.clientLogDirectory(logDir); + + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + }); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + // Perform cleanup. 
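+        // Notify every registered stream listener so per-stream resources are released;
+        // a failing listener is logged at debug level and does not abort the close sequence.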
+ for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to notify stream event listener", e); + } + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public IgfsFile apply() { + return igfs.info(path); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(final IgfsPath path, final Map props) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public IgfsFile apply() { + return igfs.update(path, props); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { + try { + IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public Void apply() { + igfs.setTimes(path, accessTime, modificationTime); + + return null; + } + }); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { + try { + IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public Void apply() { + igfs.rename(src, dest); + + return null; + } + }); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); + } + } + + /** {@inheritDoc} */ + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public Boolean apply() { + return igfs.delete(path, recursive); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new Callable() { + @Override public IgfsStatus call() throws IgniteCheckedException { + return igfs.globalSpace(); + } + }); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + + "stopping."); + } + catch (IgniteCheckedException | RuntimeException | Error e) { + throw e; + } + catch (Exception e) { + throw new AssertionError("Must never go there."); + } + } + + /** {@inheritDoc} */ + @Override public Collection listPaths(final IgfsPath path) throws IgniteCheckedException { 
+ try { + return IgfsUserContext.doAs(user, new IgniteOutClosure>() { + @Override public Collection apply() { + return igfs.listPaths(path); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection listFiles(final IgfsPath path) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure>() { + @Override public Collection apply() { + return igfs.listFiles(path); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(final IgfsPath path, final Map props) throws IgniteCheckedException { + try { + IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public Void apply() { + igfs.mkdirs(path, props); + + return null; + } + }); + + return true; + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public IgfsPathSummary apply() { + return igfs.summary(path); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + + path); + } + } + + /** {@inheritDoc} */ + @Override public Collection affinity(final IgfsPath path, final long start, final long len) + throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure>() { + @Override public Collection apply() { + return igfs.affinity(path, start, len); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStream stream = igfs.open(path, bufSize); + + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) + throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length()); + } + }); + } + 
catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, + final int replication, final long blockSize, final @Nullable Map props) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + final @Nullable Map props) throws IgniteCheckedException { + try { + return IgfsUserContext.doAs(user, new IgniteOutClosure() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + catch (IllegalStateException e) { + throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture readData(HadoopIgfsStreamDelegate delegate, long pos, int len, + @Nullable byte[] outBuf, int outOff, int outLen) { + IgfsInputStream stream = delegate.target(); + + try { + byte[] res = null; + + if (outBuf != null) { + int outTailLen = outBuf.length - outOff; + + if (len <= outTailLen) + stream.readFully(pos, outBuf, outOff, len); + else { + stream.readFully(pos, outBuf, outOff, outTailLen); + + int remainderLen = len - outTailLen; + + res = new byte[remainderLen]; + + stream.readFully(pos, res, 0, remainderLen); + } + } else { + res = new byte[len]; + + stream.readFully(pos, res, 0, len); + } + + return new GridFinishedFuture<>(res); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + return new GridFinishedFuture<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len) + throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.write(data, off, len); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { + try { + IgfsOutputStream stream = delegate.target(); + + stream.flush(); + } + catch (IllegalStateException | IOException e) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate); + + if (lsnr != null) + 
lsnr.onError(e.getMessage()); + + if (e instanceof IllegalStateException) + throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); + else + throw e; + } + } + + /** {@inheritDoc} */ + @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { + Closeable closeable = desc.target(); + + try { + closeable.close(); + } + catch (IllegalStateException e) { + throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsStreamDelegate delegate, + HadoopIgfsStreamEventListener lsnr) { + HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [delegate=" + delegate + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsStreamDelegate delegate) { + HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [delegate=" + delegate + ']'); + } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java new file mode 100644 index 0000000..46b46d7 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInputStream.java @@ -0,0 +1,629 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.igfs; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import org.apache.commons.logging.Log; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.igfs.common.IgfsLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; + +/** + * IGFS input stream wrapper for hadoop interfaces. 
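+ * Supports {@code mark}/{@code reset} and positioned reads, and prefetches data ahead of the
+ * current position through an asynchronous double-fetch buffer.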
+ */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable, + HadoopIgfsStreamEventListener { + /** Minimum buffer size. */ + private static final int MIN_BUF_SIZE = 4 * 1024; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Stream ID used by logger. */ + private long logStreamId; + + /** Stream position. */ + private long pos; + + /** Stream read limit. */ + private long limit; + + /** Mark position. */ + private long markPos = -1; + + /** Prefetch buffer. */ + private DoubleFetchBuffer buf = new DoubleFetchBuffer(); + + /** Buffer half size for double-buffering. */ + private int bufHalfSize; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Logger. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** + * Creates input stream. + * + * @param delegate Server stream delegate. + * @param limit Read limit. + * @param bufSize Buffer size. + * @param log Log. + * @param clientLog Client logger. + */ + public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log, + IgfsLogger clientLog, long logStreamId) { + assert limit >= 0; + + this.delegate = delegate; + this.limit = limit; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. 
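+ * Accumulates the time spent inside read operations, as opposed to user time between reads.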
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        checkClosed();
+
+        readStart();
+
+        try {
+            if (eof())
+                return -1;
+
+            buf.refreshAhead(pos);
+
+            int res = buf.atPosition(pos);
+
+            pos++;
+            total++;
+
+            buf.refreshAhead(pos);
+
+            return res;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException {
+        checkClosed();
+
+        if (eof())
+            return -1;
+
+        readStart();
+
+        try {
+            long remaining = limit - pos;
+
+            int read = buf.flatten(b, pos, off, len);
+
+            pos += read;
+            total += read;
+            remaining -= read;
+
+            if (remaining > 0 && read != len) {
+                int readAmt = (int)Math.min(remaining, len - read);
+
+                delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get();
+
+                read += readAmt;
+                pos += readAmt;
+                total += readAmt;
+            }
+
+            buf.refreshAhead(pos);
+
+            return read;
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, n);
+
+        long oldPos = pos;
+
+        if (pos + n <= limit)
+            pos += n;
+        else
+            pos = limit;
+
+        buf.refreshAhead(pos);
+
+        return pos - oldPos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int available() throws IOException {
+        checkClosed();
+
+        int available = buf.available(pos);
+
+        assert available >= 0;
+
+        return available;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            readStart();
+
+            if (log.isDebugEnabled())
+                log.debug("Closing input stream: " + delegate);
+
+            delegate.hadoop().closeStream(delegate);
+
+            readEnd();
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+
+            markClosed(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime +
+                    ", userTime=" + userTime + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void mark(int readLimit) {
+        markPos = pos;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logMark(logStreamId, readLimit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void reset() throws IOException {
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+
+        if (markPos == -1)
+            throw new IOException("Stream was not marked.");
+
+        pos = markPos;
+
+        buf.refreshAhead(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean markSupported() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException {
+        long remaining = limit - position;
+
+        int read = (int)Math.min(len, remaining);
+
+        // Return -1 at EOF.
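+        // (A zero-length window here means the caller asked for zero bytes or the position is at the stream limit.)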
+        if (read == 0)
+            return -1;
+
+        readFully(position, buf, off, read);
+
+        return read;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException {
+        long remaining = limit - position;
+
+        checkClosed();
+
+        if (len > remaining)
+            throw new EOFException("End of stream reached before data was fully read.");
+
+        readStart();
+
+        try {
+            int read = this.buf.flatten(buf, position, off, len);
+
+            total += read;
+
+            if (read != len) {
+                int readAmt = len - read;
+
+                delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get();
+
+                total += readAmt;
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logRandomRead(logStreamId, position, len);
+        }
+        catch (IgniteCheckedException e) {
+            throw HadoopIgfsUtils.cast(e);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(long position, byte[] buf) throws IOException {
+        readFully(position, buf, 0, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        A.ensure(pos >= 0, "position must be non-negative");
+
+        checkClosed();
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+
+        if (pos > limit)
+            pos = limit;
+
+        if (log.isDebugEnabled())
+            log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']');
+
+        this.pos = pos;
+
+        buf.refreshAhead(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        markClosed(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onError(String errMsg) {
+        // No-op.
+    }
+
+    /**
+     * Marks stream as closed.
+     *
+     * @param connBroken {@code True} if connection with server was lost.
+     */
+    private void markClosed(boolean connBroken) {
+        // It is ok to have a race here.
+        if (!closed) {
+            closed = true;
+
+            this.connBroken = connBroken;
+
+            delegate.hadoop().removeEventListener(delegate);
+        }
+    }
+
+    /**
+     * @throws IOException If check failed.
+     */
+    private void checkClosed() throws IOException {
+        if (closed) {
+            if (connBroken)
+                throw new IOException("Server connection was lost.");
+            else
+                throw new IOException("Stream is closed.");
+        }
+    }
+
+    /**
+     * @return {@code True} if end of stream reached.
+     */
+    private boolean eof() {
+        return limit == pos;
+    }
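+
+    // Usage sketch (illustrative, not part of this class): Hadoop client code drives the
+    // Seekable/PositionedReadable contract implemented above roughly as follows. The endpoint
+    // URI and file path are placeholder assumptions.
+    //
+    //     FileSystem fs = FileSystem.get(URI.create("igfs://igfs@localhost"), new Configuration());
+    //
+    //     try (FSDataInputStream in = fs.open(new Path("/tmp/data.bin"))) {
+    //         byte[] header = new byte[16];
+    //
+    //         in.readFully(0, header); // Positioned read: does not move the stream pointer.
+    //
+    //         in.seek(1024);           // The sequential pointer moves only explicitly...
+    //
+    //         int b = in.read();       // ...so this byte comes from offset 1024, not 16.
+    //     }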
+
+    /**
+     * One asynchronously prefetched chunk of the stream.
+     */
+    private static class FetchBufferPart {
+        /** Read future. */
+        private IgniteInternalFuture<byte[]> readFut;
+
+        /** Position of cached chunk in file. */
+        private long pos;
+
+        /** Prefetch length. Stored separately because the read future result might not be available yet. */
+        private int len;
+
+        /**
+         * Creates fetch buffer part.
+         *
+         * @param readFut Read future for this buffer.
+         * @param pos Read position.
+         * @param len Chunk length.
+         */
+        private FetchBufferPart(IgniteInternalFuture<byte[]> readFut, long pos, int len) {
+            this.readFut = readFut;
+            this.pos = pos;
+            this.len = len;
+        }
+
+        /**
+         * Copies cached data if specified position matches cached region.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Offset in destination buffer from which to start writing.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied.
+         * @throws IgniteCheckedException If read future failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            // If read start position is within cached boundaries.
+            if (contains(pos)) {
+                byte[] data = readFut.get();
+
+                int srcPos = (int)(pos - this.pos);
+                int cpLen = Math.min(len, data.length - srcPos);
+
+                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);
+
+                return cpLen;
+            }
+
+            return 0;
+        }
+
+        /**
+         * @return {@code True} if data is ready to be read.
+         */
+        public boolean ready() {
+            return readFut.isDone();
+        }
+
+        /**
+         * Checks if current buffer part contains given position.
+         *
+         * @param pos Position to check.
+         * @return {@code True} if position matches buffer region.
+         */
+        public boolean contains(long pos) {
+            return this.pos <= pos && this.pos + len > pos;
+        }
+    }
+
+    /**
+     * Double-buffered prefetcher: {@code first} covers the region currently being read, {@code second} is
+     * fetched ahead of it.
+     */
+    private class DoubleFetchBuffer {
+        /** */
+        private FetchBufferPart first;
+
+        /** */
+        private FetchBufferPart second;
+
+        /**
+         * Copies fetched data from both buffers to destination array if cached region matched read position.
+         *
+         * @param dst Destination buffer.
+         * @param pos Read position in file.
+         * @param dstOff Destination buffer offset.
+         * @param len Maximum number of bytes to copy.
+         * @return Number of bytes copied.
+         * @throws IgniteCheckedException If any read operation failed.
+         */
+        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
+            assert dstOff >= 0;
+            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
+                ", len=" + len + ']';
+
+            int bytesCopied = 0;
+
+            if (first != null) {
+                bytesCopied += first.flatten(dst, pos, dstOff, len);
+
+                if (bytesCopied != len && second != null) {
+                    assert second.pos == first.pos + first.len;
+
+                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
+                }
+            }
+
+            return bytesCopied;
+        }
+
+        /**
+         * Gets byte at specified position in buffer.
+         *
+         * @param pos Stream position.
+         * @return Read byte.
+         * @throws IgniteCheckedException If read failed.
+         */
+        public int atPosition(long pos) throws IgniteCheckedException {
+            // Should not reach here if stream contains no data.
+            assert first != null;
+
+            if (first.contains(pos)) {
+                byte[] bytes = first.readFut.get();
+
+                return bytes[((int)(pos - first.pos))] & 0xFF;
+            }
+            else {
+                assert second != null;
+                assert second.contains(pos);
+
+                byte[] bytes = second.readFut.get();
+
+                return bytes[((int)(pos - second.pos))] & 0xFF;
+            }
+        }
+
+        /**
+         * Starts asynchronous buffer refresh if needed, depending on current position.
+         *
+         * @param pos Current stream position.
+         */
+        public void refreshAhead(long pos) {
+            if (fullPrefetch(pos)) {
+                first = fetch(pos, bufHalfSize);
+                second = fetch(pos + bufHalfSize, bufHalfSize);
+            }
+            else if (needFlip(pos)) {
+                first = second;
+
+                second = fetch(first.pos + first.len, bufHalfSize);
+            }
+        }
+
+        /**
+         * @param pos Position from which read is expected.
+         * @return Number of bytes available to be read without blocking.
+         */
+        public int available(long pos) {
+            int available = 0;
+
+            if (first != null) {
+                if (first.contains(pos)) {
+                    if (first.ready()) {
+                        // Bytes remaining in the first buffer from the current position onwards.
+                        available += (int)(first.len - (pos - first.pos));
+
+                        if (second != null && second.ready())
+                            available += second.len;
+                    }
+                }
+                else {
+                    if (second != null && second.contains(pos) && second.ready())
+                        // Bytes remaining in the second buffer from the current position onwards.
+                        available += (int)(second.len - (pos - second.pos));
+                }
+            }
+
+            return available;
+        }
+
+        /**
+         * Checks if position shifted enough to forget previous buffer.
+         *
+         * @param pos Current position.
+         * @return {@code True} if buffers need to be flipped.
+         */
+        private boolean needFlip(long pos) {
+            // Return true if we have read more than half of the double buffer, i.e. the position entered the second part.
+            return second != null && second.contains(pos);
+        }
+
+        /**
+         * Determines if all cached bytes should be discarded and a new region should be prefetched.
+         *
+         * @param curPos Current stream position.
+         * @return {@code True} if need to refresh both blocks.
+         */
+        private boolean fullPrefetch(long curPos) {
+            // If no data was prefetched yet, or the position fell outside of the prefetched region, return true.
+            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
+        }
+
+        /**
+         * Starts asynchronous fetch for given region.
+         *
+         * @param pos Position to read from.
+         * @param size Number of bytes to read.
+         * @return Fetch buffer part, or {@code null} if the requested region is past the read limit.
+         */
+        private FetchBufferPart fetch(long pos, int size) {
+            long remaining = limit - pos;
+
+            size = (int)Math.min(size, remaining);
+
+            return size <= 0 ? null :
+                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
+        }
+    }
+}
\ No newline at end of file
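The DoubleFetchBuffer above is a classic double-buffered read-ahead: while the caller consumes the first
half-buffer, the second is already being fetched, and the two are flipped once the read position crosses into
the second half. A minimal standalone sketch of the same idea over an in-memory "file", using only
java.util.concurrent; the class and constant names here are illustrative assumptions, not Ignite API:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class DoubleBufferDemo {
        /** Pretend remote file. */
        static final byte[] FILE = new byte[1 << 20];

        /** Half-buffer size. */
        static final int HALF = 64 * 1024;

        static final ExecutorService exec = Executors.newFixedThreadPool(2);

        /** Simulates an asynchronous chunk fetch (the readData() analogue). */
        static Future<byte[]> fetch(long pos, int len) {
            return exec.submit(() -> {
                byte[] chunk = new byte[len];

                System.arraycopy(FILE, (int)pos, chunk, 0, len);

                return chunk;
            });
        }

        public static void main(String[] args) throws Exception {
            long firstPos = 0;

            Future<byte[]> first = fetch(firstPos, HALF);
            Future<byte[]> second = fetch(firstPos + HALF, HALF);

            long sum = 0;

            for (long pos = 0; pos < FILE.length; pos++) {
                // Flip when the position enters the second half (the needFlip() analogue):
                // the consumed chunk is dropped and the next one is prefetched while reading continues.
                if (pos >= firstPos + HALF) {
                    first = second;
                    firstPos += HALF;

                    long nextPos = firstPos + HALF;

                    second = nextPos < FILE.length ? fetch(nextPos, HALF) : null;
                }

                byte[] data = first.get(); // Blocks only if prefetch lags behind the reader.

                sum += data[(int)(pos - firstPos)] & 0xFF; // The atPosition() analogue.
            }

            exec.shutdown();

            System.out.println("Read " + FILE.length + " bytes, checksum=" + sum);
        }
    }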
http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
new file mode 100644
index 0000000..70f645f
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.igfs.common.IgfsMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IO abstraction layer for IGFS client. Two kinds of messages are expected to be sent: requests with a response
+ * and requests without a response.
+ */
+public interface HadoopIgfsIo {
+    /**
+     * Sends given IGFS client message and asynchronously awaits the response.
+     *
+     * @param msg Message to send.
+     * @return Future that will be completed with the response message.
+     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+     */
+    public IgniteInternalFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException;
+
+    /**
+     * Sends given IGFS client message and asynchronously awaits the response. When IO detects the response
+     * beginning for the given message, it stops reading data and passes the input stream to a closure which can
+     * read the response in a specific way.
+     *
+     * @param msg Message to send.
+     * @param outBuf Output buffer. If {@code null}, the output buffer is not used.
+     * @param outOff Output buffer offset.
+     * @param outLen Output buffer length.
+     * @return Future that will be completed when the response is returned from the closure.
+     * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed).
+     */
+    public <T> IgniteInternalFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen)
+        throws IgniteCheckedException;
+
+    /**
+     * Sends given message and does not wait for response.
+     *
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;
+
+    /**
+     * Adds an event listener that will be invoked when the connection with the server is lost or a remote error
+     * occurs. If the connection is already closed, the callback is invoked synchronously inside this method.
+     *
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsIpcIoListener lsnr);
+
+    /**
+     * Removes an event listener previously added with {@link #addEventListener(HadoopIgfsIpcIoListener)}.
+     *
+     * @param lsnr Event listener.
+     */
+    public void removeEventListener(HadoopIgfsIpcIoListener lsnr);
+}
\ No newline at end of file
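At its core, HadoopIgfsIo describes a request/response correlator over a single connection: each request gets an
ID and a future, a reader thread completes the future when the matching response arrives, and listeners cover the
failure path. A toy JDK-only sketch of that pattern; the names are illustrative assumptions, not the actual
HadoopIgfsIpcIo implementation:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    /** Toy request/response correlator in the spirit of HadoopIgfsIo. */
    public class RequestResponseDemo {
        /** Pending requests keyed by request ID. */
        private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

        /** Request ID generator. */
        private final AtomicLong reqIdGen = new AtomicLong();

        /** Sends a request and returns a future that the reader side will complete. */
        public CompletableFuture<String> send(String msg) {
            long reqId = reqIdGen.incrementAndGet();

            CompletableFuture<String> fut = new CompletableFuture<>();

            pending.put(reqId, fut);

            // A real client would write (reqId, msg) to the socket here;
            // we simulate an immediate server echo on another thread.
            CompletableFuture.runAsync(() -> onResponse(reqId, "echo: " + msg));

            return fut;
        }

        /** Invoked by the reader loop when a response for the given request arrives. */
        private void onResponse(long reqId, String res) {
            CompletableFuture<String> fut = pending.remove(reqId);

            if (fut != null)
                fut.complete(res);
        }

        /** Fails all pending requests, much as listeners are notified on connection loss. */
        public void onConnectionLost() {
            pending.forEach((id, fut) -> fut.completeExceptionally(new java.io.IOException("Connection lost.")));

            pending.clear();
        }

        public static void main(String[] args) throws Exception {
            RequestResponseDemo io = new RequestResponseDemo();

            System.out.println(io.send("INFO /path").get()); // Blocks until the response arrives.
        }
    }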