Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 60430106AC for ; Thu, 5 Mar 2015 09:07:48 +0000 (UTC) Received: (qmail 57422 invoked by uid 500); 5 Mar 2015 09:07:48 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 57387 invoked by uid 500); 5 Mar 2015 09:07:48 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 57378 invoked by uid 99); 5 Mar 2015 09:07:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 09:07:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 05 Mar 2015 09:05:06 +0000 Received: (qmail 42476 invoked by uid 99); 5 Mar 2015 09:05:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 09:05:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3A0FE10A2; Thu, 5 Mar 2015 09:05:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 05 Mar 2015 09:05:36 -0000 Message-Id: <26fb906e7f7549b1a4a3676e683d0e33@git.apache.org> In-Reply-To: <56d156eb01174f0b88f954783fd4b143@git.apache.org> References: <56d156eb01174f0b88f954783fd4b143@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java deleted file mode 100644 index 0759203..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/IgfsHadoopFileSystem.java +++ /dev/null @@ -1,982 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs.hadoop.v2; - -import org.apache.commons.logging.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.igfs.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.IgniteFs.*; -import static org.apache.ignite.configuration.IgfsConfiguration.*; -import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; - -/** - * {@code IGFS} Hadoop 2.x file system driver over file system API. To use - * {@code IGFS} as Hadoop file system, you should configure this class - * in Hadoop's {@code core-site.xml} as follows: - *
- *  <property>
- *      <name>fs.default.name</name>
- *      <value>igfs://ipc</value>
- *  </property>
- *
- *  <property>
- *      <name>fs.igfs.impl</name>
- *      <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value>
- *  </property>
- * 
- * You should also add Ignite JAR and all libraries to Hadoop classpath. To - * do this, add following lines to {@code conf/hadoop-env.sh} script in Hadoop - * distribution: - *
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * 
- *

Data vs Clients Nodes

- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on - * data nodes. Client nodes are responsible for basic file system operations as well as - * accessing data nodes remotely. Usually, client nodes are started together - * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually - * started together with Hadoop {@code task-tracker} processes. - *

- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} - * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. - */ -public class IgfsHadoopFileSystem extends AbstractFileSystem implements Closeable { - /** Logger. */ - private static final Log LOG = LogFactory.getLog(IgfsHadoopFileSystem.class); - - /** Ensures that close routine is invoked at most once. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Grid remote client. */ - private IgfsHadoopWrapper rmtClient; - - /** Working directory. */ - private IgfsPath workingDir; - - /** URI. */ - private URI uri; - - /** Authority. */ - private String uriAuthority; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Server block size. */ - private long grpBlockSize; - - /** Default replication factor. */ - private short dfltReplication; - - /** Secondary URI string. */ - private URI secondaryUri; - - /** Mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** Secondary file system instance. */ - private AbstractFileSystem secondaryFs; - - /** Whether custom sequential reads before prefetch value is provided. */ - private boolean seqReadsBeforePrefetchOverride; - - /** Custom-provided sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** Flag that controls whether file writes should be colocated on data node. */ - private boolean colocateFileWrites; - - /** Prefer local writes. */ - private boolean preferLocFileWrites; - - /** - * @param name URI for file system. - * @param cfg Configuration. - * @throws URISyntaxException if name has invalid syntax. - * @throws IOException If initialization failed. - */ - public IgfsHadoopFileSystem(URI name, Configuration cfg) throws URISyntaxException, IOException { - super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1); - - uri = name; - - try { - initialize(name, cfg); - } - catch (IOException e) { - // Close client if exception occurred. - if (rmtClient != null) - rmtClient.close(false); - - throw e; - } - - workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** {@inheritDoc} */ - @Override public void checkPath(Path path) { - URI uri = path.toUri(); - - if (uri.isAbsolute()) { - if (!F.eq(uri.getScheme(), IGFS_SCHEME)) - throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + - uri.getAuthority() + ']'); - - if (!F.eq(uri.getAuthority(), uriAuthority)) - throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + - uri.getAuthority() + ']'); - } - } - - /** - * Public setter that can be used by direct users of FS or Visor. - * - * @param colocateFileWrites Whether all ongoing file writes should be colocated. - */ - @SuppressWarnings("UnusedDeclaration") - public void colocateFileWrites(boolean colocateFileWrites) { - this.colocateFileWrites = colocateFileWrites; - } - - /** - * Enter busy state. - * - * @throws IOException If file system is stopped. - */ - private void enterBusy() throws IOException { - if (closeGuard.get()) - throw new IOException("File system is stopped."); - } - - /** - * Leave busy state. - */ - private void leaveBusy() { - // No-op. - } - - /** - * @param name URI passed to constructor. - * @param cfg Configuration passed to constructor. - * @throws IOException If initialization failed. - */ - private void initialize(URI name, Configuration cfg) throws IOException { - enterBusy(); - - try { - if (rmtClient != null) - throw new IOException("File system is already initialized: " + rmtClient); - - A.notNull(name, "name"); - A.notNull(cfg, "cfg"); - - if (!IGFS_SCHEME.equals(name.getScheme())) - throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + - "://[name]/[optional_path], actual=" + name + ']'); - - uriAuthority = name.getAuthority(); - - // Override sequential reads before prefetch if needed. - seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); - - if (seqReadsBeforePrefetch > 0) - seqReadsBeforePrefetchOverride = true; - - // In Ignite replication factor is controlled by data cache affinity. - // We use replication factor to force the whole file to be stored on local node. - dfltReplication = (short)cfg.getInt("dfs.replication", 3); - - // Get file colocation control flag. - colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); - preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); - - // Get log directory. - String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); - - File logDirFile = U.resolveIgnitePath(logDirCfg); - - String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - - rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG); - - // Handshake. - IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); - - grpBlockSize = handshake.blockSize(); - - IgfsPaths paths = handshake.secondaryPaths(); - - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); - - if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { - // Initiate client logger. - if (logDir == null) - throw new IOException("Failed to resolve log directory: " + logDirCfg); - - Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); - - clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); - } - else - clientLog = IgfsLogger.disabledLogger(); - - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (!initSecondary && paths.pathModes() != null) { - for (T2 pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - if (mode == PROXY) { - initSecondary = true; - - break; - } - } - } - - if (initSecondary) { - Map props = paths.properties(); - - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - - try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - - secondaryFs = secProvider.createAbstractFileSystem(); - secondaryUri = secProvider.uri(); - } - catch (IOException e) { - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); - } - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (rmtClient == null) - return; - - rmtClient.close(false); - - if (clientLog.isLogEnabled()) - clientLog.close(); - - // Reset initialized resources. - rmtClient = null; - } - } - - /** {@inheritDoc} */ - @Override public URI getUri() { - return uri; - } - - /** {@inheritDoc} */ - @Override public int getUriDefaultPort() { - return -1; - } - - /** {@inheritDoc} */ - @Override public FsServerDefaults getServerDefaults() throws IOException { - return new FsServerDefaults(grpBlockSize, (int)grpBlockSize, (int)grpBlockSize, dfltReplication, 64 * 1024, - false, 0, DataChecksum.Type.NULL); - } - - /** {@inheritDoc} */ - @Override public boolean setReplication(Path f, short replication) throws IOException { - return mode(f) == PROXY && secondaryFs.setReplication(f, replication); - } - - /** {@inheritDoc} */ - @Override public void setTimes(Path f, long mtime, long atime) throws IOException { - if (mode(f) == PROXY) - secondaryFs.setTimes(f, mtime, atime); - else { - if (mtime == -1 && atime == -1) - return; - - rmtClient.setTimes(convert(f), atime, mtime); - } - } - - /** {@inheritDoc} */ - @Override public FsStatus getFsStatus() throws IOException { - IgfsStatus status = rmtClient.fsStatus(); - - return new FsStatus(status.spaceTotal(), status.spaceUsed(), status.spaceTotal() - status.spaceUsed()); - } - - /** {@inheritDoc} */ - @Override public void setPermission(Path p, FsPermission perm) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) - secondaryFs.setPermission(toSecondary(p), perm); - else { - if (rmtClient.update(convert(p), permission(perm)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setOwner(Path p, String usr, String grp) throws IOException { - A.notNull(p, "p"); - A.notNull(usr, "username"); - A.notNull(grp, "grpName"); - - enterBusy(); - - try { - if (mode(p) == PROXY) - secondaryFs.setOwner(toSecondary(p), usr, grp); - else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, usr, PROP_GROUP_NAME, grp)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", username=" + usr + ", grpName=" + grp + ']'); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); - - long size = status != null ? status.getLen() : -1; - - long logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, PROXY, bufSize, size); - - return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId)); - } - else - return is; - } - else { - IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ? - rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); - - IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); - - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - - return new FSDataInputStream(igfsIn); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream createInternal( - Path f, - EnumSet flag, - FsPermission perm, - int bufSize, - short replication, - long blockSize, - Progressable progress, - Options.ChecksumOpt checksumOpt, - boolean createParent - ) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - boolean overwrite = flag.contains(CreateFlag.OVERWRITE); - boolean append = flag.contains(CreateFlag.APPEND); - boolean create = flag.contains(CreateFlag.CREATE); - - OutputStream out = null; - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + - path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - FSDataOutputStream os = secondaryFs.createInternal(toSecondary(f), flag, perm, bufSize, - replication, blockSize, progress, checksumOpt, createParent); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - if (append) - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - else - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - Map permMap = F.asMap(PROP_PERMISSION, toString(perm), - PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites)); - - // Create stream and close it in the 'finally' section if any sequential operation failed. - IgfsHadoopStreamDelegate stream; - - long logId = -1; - - if (append) { - stream = rmtClient.append(path, create, permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, mode, bufSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - } - else { - stream = rmtClient.create(path, overwrite, colocateFileWrites, replication, blockSize, - permMap); - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - } - - assert stream != null; - - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, - clientLog, logId); - - bufSize = Math.max(64 * 1024, bufSize); - - out = new BufferedOutputStream(igfsOut, bufSize); - - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - - // Mark stream created successfully. - out = null; - - return res; - } - } - finally { - // Close if failed during stream creation. - if (out != null) - U.closeQuiet(out); - - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean supportsSymlinks() { - return false; - } - - /** {@inheritDoc} */ - @Override public void renameInternal(Path src, Path dst) throws IOException { - A.notNull(src, "src"); - A.notNull(dst, "dst"); - - enterBusy(); - - try { - IgfsPath srcPath = convert(src); - IgfsPath dstPath = convert(dst); - Set childrenModes = modeRslvr.resolveChildrenModes(srcPath); - - if (childrenModes.contains(PROXY)) { - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, PROXY, dstPath); - - secondaryFs.renameInternal(toSecondary(src), toSecondary(dst)); - } - - rmtClient.rename(srcPath, dstPath); - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, modeRslvr.resolveMode(srcPath), dstPath); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - Set childrenModes = modeRslvr.resolveChildrenModes(path); - - if (childrenModes.contains(PROXY)) { - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFs.delete(toSecondary(f), recursive); - } - - boolean res = rmtClient.delete(path, recursive); - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); - - return res; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setVerifyChecksum(boolean verifyChecksum) throws IOException { - // Checksum has effect for secondary FS only. - if (secondaryFs != null) - secondaryFs.setVerifyChecksum(verifyChecksum); - } - - /** {@inheritDoc} */ - @Override public FileChecksum getFileChecksum(Path f) throws IOException { - if (mode(f) == PROXY) - return secondaryFs.getFileChecksum(f); - - return null; - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); - - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void mkdir(Path f, FsPermission perm, boolean createParent) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = modeRslvr.resolveMode(path); - - if (mode == PROXY) { - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, PROXY); - - secondaryFs.mkdir(toSecondary(f), perm, createParent); - } - else { - rmtClient.mkdirs(path, permission(perm)); - - if (clientLog.isLogEnabled()) - clientLog.logMakeDirectory(path, mode); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus getFileStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - if (mode(f) == PROXY) - return toPrimary(secondaryFs.getFileStatus(toSecondary(f))); - else { - IgfsFile info = rmtClient.info(convert(f)); - - if (info == null) - throw new FileNotFoundException("File not found: " + f); - - return convert(info); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException { - A.notNull(path, "path"); - - IgfsPath igfsPath = convert(path); - - enterBusy(); - - try { - if (modeRslvr.resolveMode(igfsPath) == PROXY) - return secondaryFs.getFileBlockLocations(path, start, len); - else { - long now = System.currentTimeMillis(); - - List affinity = new ArrayList<>( - rmtClient.affinity(igfsPath, start, len)); - - BlockLocation[] arr = new BlockLocation[affinity.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(affinity.get(i)); - - if (LOG.isDebugEnabled()) - LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" + - (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']'); - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** - * Resolve path mode. - * - * @param path HDFS path. - * @return Path mode. - */ - public IgfsMode mode(Path path) { - return modeRslvr.resolveMode(convert(path)); - } - - /** - * Convert the given path to path acceptable by the primary file system. - * - * @param path Path. - * @return Primary file system path. - */ - private Path toPrimary(Path path) { - return convertPath(path, getUri()); - } - - /** - * Convert the given path to path acceptable by the secondary file system. - * - * @param path Path. - * @return Secondary file system path. - */ - private Path toSecondary(Path path) { - assert secondaryFs != null; - assert secondaryUri != null; - - return convertPath(path, secondaryUri); - } - - /** - * Convert path using the given new URI. - * - * @param path Old path. - * @param newUri New URI. - * @return New path. - */ - private Path convertPath(Path path, URI newUri) { - assert newUri != null; - - if (path != null) { - URI pathUri = path.toUri(); - - try { - return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null, - pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null)); - } - catch (URISyntaxException e) { - throw new IgniteException("Failed to construct secondary file system path from the primary file " + - "system path: " + path, e); - } - } - else - return null; - } - - /** - * Convert a file status obtained from the secondary file system to a status of the primary file system. - * - * @param status Secondary file system status. - * @return Primary file system status. - */ - private FileStatus toPrimary(FileStatus status) { - return status != null ? new FileStatus(status.getLen(), status.isDirectory(), status.getReplication(), - status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(), - status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null; - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - return new Path(IGFS_SCHEME, uriAuthority, path.toString()); - } - - /** - * Convert Hadoop path into IGFS path. - * - * @param path Hadoop path. - * @return IGFS path. - */ - @Nullable private IgfsPath convert(Path path) { - if (path == null) - return null; - - return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(workingDir, path.toUri().getPath()); - } - - /** - * Convert IGFS affinity block location into Hadoop affinity block location. - * - * @param block IGFS affinity block location. - * @return Hadoop affinity block location. - */ - private BlockLocation convert(IgfsBlockLocation block) { - Collection names = block.names(); - Collection hosts = block.hosts(); - - return new BlockLocation( - names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */, - hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */, - block.start(), block.length() - ) { - @Override public String toString() { - try { - return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() + - ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']'; - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }; - } - - /** - * Convert IGFS file information into Hadoop file status. - * - * @param file IGFS file information. - * @return Hadoop file status. - */ - private FileStatus convert(IgfsFile file) { - return new FileStatus( - file.length(), - file.isDirectory(), - dfltReplication, - file.groupBlockSize(), - file.modificationTime(), - file.accessTime(), - permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), - file.property(PROP_GROUP_NAME, "users"), - convert(file.path())) { - @Override public String toString() { - return "FileStatus [path=" + getPath() + ", isDir=" + isDirectory() + ", len=" + getLen() + "]"; - } - }; - } - - /** - * Convert Hadoop permission into IGFS file attribute. - * - * @param perm Hadoop permission. - * @return IGFS attributes. - */ - private Map permission(FsPermission perm) { - if (perm == null) - perm = FsPermission.getDefault(); - - return F.asMap(PROP_PERMISSION, toString(perm)); - } - - /** - * @param perm Permission. - * @return String. - */ - private static String toString(FsPermission perm) { - return String.format("%04o", perm.toShort()); - } - - /** - * Convert IGFS file attributes into Hadoop permission. - * - * @param file File info. - * @return Hadoop permission. - */ - private FsPermission permission(IgfsFile file) { - String perm = file.property(PROP_PERMISSION, null); - - if (perm == null) - return FsPermission.getDefault(); - - try { - return new FsPermission((short)Integer.parseInt(perm, 8)); - } - catch (NumberFormatException ignore) { - return FsPermission.getDefault(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopFileSystem.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html deleted file mode 100644 index 6df66f4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v2/package.html +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - Contains Hadoop 2.x FileSystem wrapper for Ignite file system. - - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html deleted file mode 100644 index ec38a21..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/package.html +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - Contains IGnite File System APIs. - - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java deleted file mode 100644 index 27d6e33..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoop.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Facade for communication with grid. - */ -public interface IgfsHadoop { - /** - * Perform handshake. - * - * @param logDir Log directory. - * @return Future with handshake result. - * @throws IgniteCheckedException If failed. - */ - public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException, IOException; - - /** - * Close connection. - * - * @param force Force flag. - */ - public void close(boolean force); - - /** - * Command to retrieve file info for some IGFS path. - * - * @param path Path to get file info for. - * @return Future for info operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile info(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to update file properties. - * - * @param path IGFS path to update properties. - * @param props Properties to update. - * @return Future for update operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsFile update(IgfsPath path, Map props) throws IgniteCheckedException, IOException; - - /** - * Sets last access time and last modification time for a file. - * - * @param path Path to update times. - * @param accessTime Last access time to set. - * @param modificationTime Last modification time to set. - * @throws IgniteCheckedException If failed. - */ - public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException, - IOException; - - /** - * Command to rename given path. - * - * @param src Source path. - * @param dest Destination path. - * @return Future for rename operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException, IOException; - - /** - * Command to delete given path. - * - * @param path Path to delete. - * @param recursive {@code True} if deletion is recursive. - * @return Future for delete operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException, IOException; - - /** - * Command to get affinity for given path, offset and length. - * - * @param path Path to get affinity for. - * @param start Start position (offset). - * @param len Data length. - * @return Future for affinity command. - * @throws IgniteCheckedException If failed. - */ - public Collection affinity(IgfsPath path, long start, long len) throws IgniteCheckedException, - IOException; - - /** - * Gets path summary. - * - * @param path Path to get summary for. - * @return Future that will be completed when summary is received. - * @throws IgniteCheckedException If failed. - */ - public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to create directories. - * - * @param path Path to create. - * @return Future for mkdirs operation. - * @throws IgniteCheckedException If failed. - */ - public Boolean mkdirs(IgfsPath path, Map props) throws IgniteCheckedException, IOException; - - /** - * Command to get list of files in directory. - * - * @param path Path to list. - * @return Future for listFiles operation. - * @throws IgniteCheckedException If failed. - */ - public Collection listFiles(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to get directory listing. - * - * @param path Path to list. - * @return Future for listPaths operation. - * @throws IgniteCheckedException If failed. - */ - public Collection listPaths(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Performs status request. - * - * @return Status response. - * @throws IgniteCheckedException If failed. - */ - public IgfsStatus fsStatus() throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException, IOException; - - /** - * Command to open file for reading. - * - * @param path File path to open. - * @return Future for open operation. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) throws IgniteCheckedException, - IOException; - - /** - * Command to create file and open it for output. - * - * @param path Path to file. - * @param overwrite If {@code true} then old file contents will be lost. - * @param colocate If {@code true} and called on data node, file will be written on that node. - * @param replication Replication factor. - * @param props File properties for creation. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map props) throws IgniteCheckedException, IOException; - - /** - * Open file for output appending data to the end of a file. - * - * @param path Path to file. - * @param create If {@code true}, file will be created if does not exist. - * @param props File properties. - * @return Stream descriptor. - * @throws IgniteCheckedException If failed. - */ - public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map props) throws IgniteCheckedException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java deleted file mode 100644 index 03bf733..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopCommunicationException.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; - -/** - * Communication exception indicating a problem between file system and IGFS instance. - */ -public class IgfsHadoopCommunicationException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Creates new exception with given throwable as a nested cause and - * source of error message. - * - * @param cause Non-null throwable cause. - */ - public IgfsHadoopCommunicationException(Exception cause) { - super(cause); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - */ - public IgfsHadoopCommunicationException(String msg) { - super(msg); - } - - /** - * Creates a new exception with given error message and optional nested cause exception. - * - * @param msg Error message. - * @param cause Cause. - */ - public IgfsHadoopCommunicationException(String msg, Exception cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java deleted file mode 100644 index 35638ea..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEndpoint.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * IGFS endpoint abstraction. - */ -public class IgfsHadoopEndpoint { - /** Localhost. */ - public static final String LOCALHOST = "127.0.0.1"; - - /** IGFS name. */ - private final String igfsName; - - /** Grid name. */ - private final String gridName; - - /** Host. */ - private final String host; - - /** Port. */ - private final int port; - - /** - * Normalize IGFS URI. - * - * @param uri URI. - * @return Normalized URI. - * @throws IOException If failed. - */ - public static URI normalize(URI uri) throws IOException { - try { - if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme())) - throw new IOException("Failed to normalize UIR because it has non IGFS scheme: " + uri); - - IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(uri.getAuthority()); - - StringBuilder sb = new StringBuilder(); - - if (endpoint.igfs() != null) - sb.append(endpoint.igfs()); - - if (endpoint.grid() != null) - sb.append(":").append(endpoint.grid()); - - return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(), - uri.getPath(), uri.getQuery(), uri.getFragment()); - } - catch (URISyntaxException | IgniteCheckedException e) { - throw new IOException("Failed to normalize URI: " + uri, e); - } - } - - /** - * Constructor. - * - * @param connStr Connection string. - * @throws IgniteCheckedException If failed to parse connection string. - */ - public IgfsHadoopEndpoint(@Nullable String connStr) throws IgniteCheckedException { - if (connStr == null) - connStr = ""; - - String[] tokens = connStr.split("@", -1); - - IgniteBiTuple hostPort; - - if (tokens.length == 1) { - igfsName = null; - gridName = null; - - hostPort = hostPort(connStr, connStr); - } - else if (tokens.length == 2) { - String authStr = tokens[0]; - - if (authStr.isEmpty()) { - gridName = null; - igfsName = null; - } - else { - String[] authTokens = authStr.split(":", -1); - - igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0]; - - if (authTokens.length == 1) - gridName = null; - else if (authTokens.length == 2) - gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1]; - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - } - - hostPort = hostPort(connStr, tokens[1]); - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - host = hostPort.get1(); - - assert hostPort.get2() != null; - - port = hostPort.get2(); - } - - /** - * Parse host and port. - * - * @param connStr Full connection string. - * @param hostPortStr Host/port connection string part. - * @return Tuple with host and port. - * @throws IgniteCheckedException If failed to parse connection string. - */ - private IgniteBiTuple hostPort(String connStr, String hostPortStr) throws IgniteCheckedException { - String[] tokens = hostPortStr.split(":", -1); - - String host = tokens[0]; - - if (F.isEmpty(host)) - host = LOCALHOST; - - int port; - - if (tokens.length == 1) - port = DFLT_IPC_PORT; - else if (tokens.length == 2) { - String portStr = tokens[1]; - - try { - port = Integer.valueOf(portStr); - - if (port < 0 || port > 65535) - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - catch (NumberFormatException e) { - throw new IgniteCheckedException("Invalid port number: " + connStr); - } - } - else - throw new IgniteCheckedException("Invalid connection string format: " + connStr); - - return F.t(host, port); - } - - /** - * @return IGFS name. - */ - @Nullable public String igfs() { - return igfsName; - } - - /** - * @return Grid name. - */ - @Nullable public String grid() { - return gridName; - } - - /** - * @return Host. - */ - public String host() { - return host; - } - - /** - * @return Host. - */ - public boolean isLocal() { - return F.eq(LOCALHOST, host); - } - - /** - * @return Port. - */ - public int port() { - return port; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsHadoopEndpoint.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java deleted file mode 100644 index da86e37..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopEx.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Extended IGFS server interface. - */ -public interface IgfsHadoopEx extends IgfsHadoop { - /** - * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. - * If connection is closed already, callback will be invoked synchronously inside this method. - * - * @param delegate Stream delegate. - * @param lsnr Event listener. - */ - public void addEventListener(IgfsHadoopStreamDelegate delegate, IgfsHadoopStreamEventListener lsnr); - - /** - * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. - * - * @param delegate Stream delegate. - */ - public void removeEventListener(IgfsHadoopStreamDelegate delegate); - - /** - * Asynchronously reads specified amount of bytes from opened input stream. - * - * @param delegate Stream delegate. - * @param pos Position to read from. - * @param len Data length to read. - * @param outBuf Optional output buffer. If buffer length is less then {@code len}, all remaining - * bytes will be read into new allocated buffer of length {len - outBuf.length} and this buffer will - * be the result of read future. - * @param outOff Output offset. - * @param outLen Output length. - * @return Read data. - */ - public GridPlainFuture readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable final byte[] outBuf, final int outOff, final int outLen); - - /** - * Writes data to the stream with given streamId. This method does not return any future since - * no response to write request is sent. - * - * @param delegate Stream delegate. - * @param data Data to write. - * @param off Offset. - * @param len Length. - * @throws IOException If failed. - */ - public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) throws IOException; - - /** - * Close server stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void closeStream(IgfsHadoopStreamDelegate delegate) throws IOException; - - /** - * Flush output stream. - * - * @param delegate Stream delegate. - * @throws IOException If failed. - */ - public void flush(IgfsHadoopStreamDelegate delegate) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java deleted file mode 100644 index c9d1322..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFSProperties.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.hadoop.fs.permission.*; -import org.apache.ignite.*; - -import java.util.*; - -import static org.apache.ignite.IgniteFs.*; - -/** - * Hadoop file system properties. - */ -public class IgfsHadoopFSProperties { - /** Username. */ - private String usrName; - - /** Group name. */ - private String grpName; - - /** Permissions. */ - private FsPermission perm; - - /** - * Constructor. - * - * @param props Properties. - * @throws IgniteException In case of error. - */ - public IgfsHadoopFSProperties(Map props) throws IgniteException { - usrName = props.get(PROP_USER_NAME); - grpName = props.get(PROP_GROUP_NAME); - - String permStr = props.get(PROP_PERMISSION); - - if (permStr != null) { - try { - perm = new FsPermission((short)Integer.parseInt(permStr, 8)); - } - catch (NumberFormatException ignore) { - throw new IgniteException("Permissions cannot be parsed: " + permStr); - } - } - } - - /** - * Get user name. - * - * @return User name. - */ - public String userName() { - return usrName; - } - - /** - * Get group name. - * - * @return Group name. - */ - public String groupName() { - return grpName; - } - - /** - * Get permission. - * - * @return Permission. - */ - public FsPermission permission() { - return perm; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java deleted file mode 100644 index 476641c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopFuture.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -/** - * IGFS client future that holds response parse closure. - */ -public class IgfsHadoopFuture extends GridPlainFutureAdapter { - /** Output buffer. */ - private byte[] outBuf; - - /** Output offset. */ - private int outOff; - - /** Output length. */ - private int outLen; - - /** Read future flag. */ - private boolean read; - - /** - * @return Output buffer. - */ - public byte[] outputBuffer() { - return outBuf; - } - - /** - * @param outBuf Output buffer. - */ - public void outputBuffer(@Nullable byte[] outBuf) { - this.outBuf = outBuf; - } - - /** - * @return Offset in output buffer to write from. - */ - public int outputOffset() { - return outOff; - } - - /** - * @param outOff Offset in output buffer to write from. - */ - public void outputOffset(int outOff) { - this.outOff = outOff; - } - - /** - * @return Length to write to output buffer. - */ - public int outputLength() { - return outLen; - } - - /** - * @param outLen Length to write to output buffer. - */ - public void outputLength(int outLen) { - this.outLen = outLen; - } - - /** - * @param read {@code True} if this is a read future. - */ - public void read(boolean read) { - this.read = read; - } - - /** - * @return {@code True} if this is a read future. - */ - public boolean read() { - return read; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java deleted file mode 100644 index 8245125..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInProc.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Communication with grid in the same process. - */ -public class IgfsHadoopInProc implements IgfsHadoopEx { - /** Target IGFS. */ - private final IgfsEx igfs; - - /** Buffer size. */ - private final int bufSize; - - /** Event listeners. */ - private final Map lsnrs = - new ConcurrentHashMap<>(); - - /** Logger. */ - private final Log log; - - /** - * Constructor. - * - * @param igfs Target IGFS. - * @param log Log. - */ - public IgfsHadoopInProc(IgfsEx igfs, Log log) { - this.igfs = igfs; - this.log = log; - - bufSize = igfs.configuration().getBlockSize() * 2; - } - - /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); - - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); - } - - /** {@inheritDoc} */ - @Override public void close(boolean force) { - // Perform cleanup. - for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { - try { - lsnr.onClose(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to notify stream event listener", e); - } - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.info(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file info because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map props) throws IgniteCheckedException { - try { - return igfs.update(path, props); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to update file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { - try { - igfs.setTimes(path, accessTime, modificationTime); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to set path times because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { - try { - igfs.rename(src, dest); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to rename path because Grid is stopping: " + src); - } - } - - /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { - try { - return igfs.delete(path, recursive); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to delete path because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsStatus fsStatus() throws IgniteCheckedException { - try { - return igfs.globalSpace(); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get file system status because Grid is " + - "stopping."); - } - } - - /** {@inheritDoc} */ - @Override public Collection listPaths(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listPaths(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list paths because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection listFiles(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.listFiles(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to list files because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map props) throws IgniteCheckedException { - try { - igfs.mkdirs(path, props); - - return true; - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create directory because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { - try { - return igfs.summary(path); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get content summary because Grid is stopping: " + - path); - } - } - - /** {@inheritDoc} */ - @Override public Collection affinity(IgfsPath path, long start, long len) - throws IgniteCheckedException { - try { - return igfs.affinity(path, start, len); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to get affinity because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) - throws IgniteCheckedException { - try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - - return new IgfsHadoopStreamDelegate(this, stream, stream.fileInfo().length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to open file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to create file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map props) throws IgniteCheckedException { - try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - - return new IgfsHadoopStreamDelegate(this, stream); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - catch (IllegalStateException e) { - throw new IgfsHadoopCommunicationException("Failed to append file because Grid is stopping: " + path); - } - } - - /** {@inheritDoc} */ - @Override public GridPlainFuture readData(IgfsHadoopStreamDelegate delegate, long pos, int len, - @Nullable byte[] outBuf, int outOff, int outLen) { - IgfsInputStreamAdapter stream = delegate.target(); - - try { - byte[] res = null; - - if (outBuf != null) { - int outTailLen = outBuf.length - outOff; - - if (len <= outTailLen) - stream.readFully(pos, outBuf, outOff, len); - else { - stream.readFully(pos, outBuf, outOff, outTailLen); - - int remainderLen = len - outTailLen; - - res = new byte[remainderLen]; - - stream.readFully(pos, res, 0, remainderLen); - } - } else { - res = new byte[len]; - - stream.readFully(pos, res, 0, len); - } - - return new GridPlainFutureAdapter<>(res); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - return new GridPlainFutureAdapter<>(e); - } - } - - /** {@inheritDoc} */ - @Override public void writeData(IgfsHadoopStreamDelegate delegate, byte[] data, int off, int len) - throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.write(data, off, len); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void flush(IgfsHadoopStreamDelegate delegate) throws IOException { - try { - IgfsOutputStream stream = delegate.target(); - - stream.flush(); - } - catch (IllegalStateException | IOException e) { - IgfsHadoopStreamEventListener lsnr = lsnrs.get(delegate); - - if (lsnr != null) - lsnr.onError(e.getMessage()); - - if (e instanceof IllegalStateException) - throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e); - else - throw e; - } - } - - /** {@inheritDoc} */ - @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { - Closeable closeable = desc.target(); - - try { - closeable.close(); - } - catch (IllegalStateException e) { - throw new IOException("Failed to close IGFS stream because Grid is stopping.", e); - } - } - - /** {@inheritDoc} */ - @Override public void addEventListener(IgfsHadoopStreamDelegate delegate, - IgfsHadoopStreamEventListener lsnr) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr); - - assert lsnr0 == null || lsnr0 == lsnr; - - if (log.isDebugEnabled()) - log.debug("Added stream event listener [delegate=" + delegate + ']'); - } - - /** {@inheritDoc} */ - @Override public void removeEventListener(IgfsHadoopStreamDelegate delegate) { - IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(delegate); - - if (lsnr0 != null && log.isDebugEnabled()) - log.debug("Removed stream event listener [delegate=" + delegate + ']'); - } -}