From: vozerov@apache.org
To: commits@ignite.incubator.apache.org
Reply-To: dev@ignite.incubator.apache.org
Date: Wed, 04 Mar 2015 15:35:39 -0000
Message-Id: <00141465a0394b4b9bdbb0c40132b2bf@git.apache.org>
Subject: [37/45] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
new file mode 100644
index 0000000..9a6b4d8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.hadoop.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.igfs.*; +import org.apache.ignite.internal.processors.hadoop.planner.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.IgniteFileSystem.*; + +/** + * Default map-reduce planner implementation. + */ +public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner { + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger. */ + @SuppressWarnings("UnusedDeclaration") + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { + // Convert collection of topology nodes to collection of topology node IDs. + Collection topIds = new HashSet<>(top.size(), 1.0f); + + for (ClusterNode topNode : top) + topIds.add(topNode.id()); + + Map> mappers = mappers(top, topIds, job.input()); + + int rdcCnt = job.info().reducers(); + + if (rdcCnt < 0) + throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); + + Map reducers = reducers(top, mappers, rdcCnt); + + return new HadoopDefaultMapReducePlan(mappers, reducers); + } + + /** + * Create plan for mappers. + * + * @param top Topology nodes. + * @param topIds Topology node IDs. + * @param splits Splits. + * @return Mappers map. + * @throws IgniteCheckedException If failed. + */ + private Map> mappers(Collection top, Collection topIds, + Iterable splits) throws IgniteCheckedException { + Map> mappers = new HashMap<>(); + + Map> nodes = hosts(top); + + Map nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. + + for (UUID nodeId : topIds) + nodeLoads.put(nodeId, 0); + + for (HadoopInputSplit split : splits) { + UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); + + if (log.isDebugEnabled()) + log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); + + Collection nodeSplits = mappers.get(nodeId); + + if (nodeSplits == null) { + nodeSplits = new ArrayList<>(); + + mappers.put(nodeId, nodeSplits); + } + + nodeSplits.add(split); + + // Updated node load. + nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); + } + + return mappers; + } + + /** + * Groups nodes by host names. + * + * @param top Topology to group. + * @return Map. + */ + private static Map> hosts(Collection top) { + Map> grouped = U.newHashMap(top.size()); + + for (ClusterNode node : top) { + for (String host : node.hostNames()) { + Collection nodeIds = grouped.get(host); + + if (nodeIds == null) { + // Expecting 1-2 nodes per host. + nodeIds = new ArrayList<>(2); + + grouped.put(host, nodeIds); + } + + nodeIds.add(node.id()); + } + } + + return grouped; + } + + /** + * Determine the best node for this split. + * + * @param split Split. + * @param topIds Topology node IDs. + * @param nodes Nodes. + * @param nodeLoads Node load tracker. + * @return Node ID. 
+ */ + @SuppressWarnings("unchecked") + private UUID nodeForSplit(HadoopInputSplit split, Collection topIds, Map> nodes, + Map nodeLoads) throws IgniteCheckedException { + if (split instanceof HadoopFileBlock) { + HadoopFileBlock split0 = (HadoopFileBlock)split; + + if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { + HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); + + IgfsEx igfs = null; + + if (F.eq(ignite.name(), endpoint.grid())) + igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); + + if (igfs != null && !igfs.isProxy(split0.file())) { + Collection blocks; + + try { + blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + + assert blocks != null; + + if (blocks.size() == 1) + // Fast-path, split consists of one IGFS block (as in most cases). + return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); + else { + // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. + Map nodeMap = new HashMap<>(); + + List bestNodeIds = null; + long bestLen = -1L; + + for (IgfsBlockLocation block : blocks) { + for (UUID blockNodeId : block.nodeIds()) { + if (topIds.contains(blockNodeId)) { + Long oldLen = nodeMap.get(blockNodeId); + long newLen = oldLen == null ? block.length() : oldLen + block.length(); + + nodeMap.put(blockNodeId, newLen); + + if (bestNodeIds == null || bestLen < newLen) { + bestNodeIds = new ArrayList<>(1); + + bestNodeIds.add(blockNodeId); + + bestLen = newLen; + } + else if (bestLen == newLen) { + assert !F.isEmpty(bestNodeIds); + + bestNodeIds.add(blockNodeId); + } + } + } + } + + if (bestNodeIds != null) { + return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : + bestNode(bestNodeIds, topIds, nodeLoads, true); + } + } + } + } + } + + // Cannot use local IGFS for some reason, try selecting the node by host. + Collection blockNodes = null; + + for (String host : split.hosts()) { + Collection hostNodes = nodes.get(host); + + if (!F.isEmpty(hostNodes)) { + if (blockNodes == null) + blockNodes = new ArrayList<>(hostNodes); + else + blockNodes.addAll(hostNodes); + } + } + + return bestNode(blockNodes, topIds, nodeLoads, false); + } + + /** + * Finds the best (the least loaded) node among the candidates. + * + * @param candidates Candidates. + * @param topIds Topology node IDs. + * @param nodeLoads Known node loads. + * @param skipTopCheck Whether to skip topology check. + * @return The best node. + */ + private UUID bestNode(@Nullable Collection candidates, Collection topIds, Map nodeLoads, + boolean skipTopCheck) { + UUID bestNode = null; + int bestLoad = Integer.MAX_VALUE; + + if (candidates != null) { + for (UUID candidate : candidates) { + if (skipTopCheck || topIds.contains(candidate)) { + int load = nodeLoads.get(candidate); + + if (bestNode == null || bestLoad > load) { + bestNode = candidate; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. + } + } + } + } + + if (bestNode == null) { + // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. + bestLoad = Integer.MAX_VALUE; + + for (UUID nodeId : topIds) { + int load = nodeLoads.get(nodeId); + + if (bestNode == null || bestLoad > load) { + bestNode = nodeId; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. 
+ } + } + } + + assert bestNode != null; + + return bestNode; + } + + /** + * Create plan for reducers. + * + * @param top Topology. + * @param mappers Mappers map. + * @param reducerCnt Reducers count. + * @return Reducers map. + */ + private Map reducers(Collection top, + Map> mappers, int reducerCnt) { + // Determine initial node weights. + int totalWeight = 0; + + List nodes = new ArrayList<>(top.size()); + + for (ClusterNode node : top) { + Collection split = mappers.get(node.id()); + + int weight = reducerNodeWeight(node, split != null ? split.size() : 0); + + nodes.add(new WeightedNode(node.id(), weight, weight)); + + totalWeight += weight; + } + + // Adjust weights. + int totalAdjustedWeight = 0; + + for (WeightedNode node : nodes) { + node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; + + node.weight = Math.round(node.floatWeight); + + totalAdjustedWeight += node.weight; + } + + // Apply redundant/lost reducers. + Collections.sort(nodes); + + if (totalAdjustedWeight > reducerCnt) { + // Too much reducers set. + ListIterator iter = nodes.listIterator(nodes.size() - 1); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasPrevious()) + iter = nodes.listIterator(nodes.size() - 1); + + WeightedNode node = iter.previous(); + + if (node.weight > 0) { + node.weight -= 1; + + totalAdjustedWeight--; + } + } + } + else if (totalAdjustedWeight < reducerCnt) { + // Not enough reducers set. + ListIterator iter = nodes.listIterator(0); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasNext()) + iter = nodes.listIterator(0); + + WeightedNode node = iter.next(); + + if (node.floatWeight > 0.0f) { + node.weight += 1; + + totalAdjustedWeight++; + } + } + } + + int idx = 0; + + Map reducers = new HashMap<>(nodes.size(), 1.0f); + + for (WeightedNode node : nodes) { + if (node.weight > 0) { + int[] arr = new int[node.weight]; + + for (int i = 0; i < arr.length; i++) + arr[i] = idx++; + + reducers.put(node.nodeId, arr); + } + } + + return reducers; + } + + /** + * Calculate node weight based on node metrics and data co-location. + * + * @param node Node. + * @param splitCnt Splits mapped to this node. + * @return Node weight. + */ + @SuppressWarnings("UnusedParameters") + protected int reducerNodeWeight(ClusterNode node, int splitCnt) { + return splitCnt; + } + + /** + * Weighted node. + */ + private static class WeightedNode implements Comparable { + /** Node ID. */ + private final UUID nodeId; + + /** Weight. */ + private int weight; + + /** Floating point weight. */ + private float floatWeight; + + /** + * Constructor. + * + * @param nodeId Node ID. + * @param weight Weight. + * @param floatWeight Floating point weight. + */ + private WeightedNode(UUID nodeId, int weight, float floatWeight) { + this.nodeId = nodeId; + this.weight = weight; + this.floatWeight = floatWeight; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return nodeId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull WeightedNode other) { + float res = other.floatWeight - floatWeight; + + return res > 0.0f ? 1 : res < 0.0f ? 
-1 : nodeId.compareTo(other.nodeId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package.html b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package.html new file mode 100644 index 0000000..e289841 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/package.html @@ -0,0 +1,24 @@ + + + + + + + Ignite Hadoop Accelerator map-reduce classes. + + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/package.html new file mode 100644 index 0000000..d687e32 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/package.html @@ -0,0 +1,24 @@ + + + + + + + Ignite Hadoop Accelerator API. + + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java deleted file mode 100644 index bdab61d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopFileSystemWrapper.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.ipc.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.igfs.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Adapter to use any Hadoop file system {@link org.apache.hadoop.fs.FileSystem} as {@link org.apache.ignite.igfs.Igfs}. - */ -public class IgfsHadoopFileSystemWrapper implements Igfs, AutoCloseable { - - /** Hadoop file system. 
*/ - private final FileSystem fileSys; - - /** Properties of file system */ - private final Map props = new HashMap<>(); - - /** - * Simple constructor that is to be used by default. - * - * @param uri URI of file system. - * @throws IgniteCheckedException In case of error. - */ - public IgfsHadoopFileSystemWrapper(String uri) throws IgniteCheckedException { - this(uri, null); - } - - /** - * Constructor. - * - * @param uri URI of file system. - * @param cfgPath Additional path to Hadoop configuration. - * @throws IgniteCheckedException In case of error. - */ - public IgfsHadoopFileSystemWrapper(@Nullable String uri, @Nullable String cfgPath) throws IgniteCheckedException { - try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath); - - fileSys = secProvider.createFileSystem(); - - uri = secProvider.uri().toString(); - - if (!uri.endsWith("/")) - uri += "/"; - - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); - props.put(SECONDARY_FS_URI, uri); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Convert IGFS path into Hadoop path. - * - * @param path IGFS path. - * @return Hadoop path. - */ - private Path convert(IgfsPath path) { - URI uri = fileSys.getUri(); - - return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); - } - - /** - * Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception. - * - * @param e Exception to check. - * @param detailMsg Detailed error message. - * @return Appropriate exception. - */ - private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - boolean wrongVer = X.hasCause(e, RemoteException.class) || - (e.getMessage() != null && e.getMessage().contains("Failed on local")); - - IgfsException igfsErr = !wrongVer ? cast(detailMsg, e) : - new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + - "version.", e); - - return igfsErr; - } - - /** - * Cast IO exception to IGFS exception. - * - * @param e IO exception. - * @return IGFS exception. - */ - public static IgfsException cast(String msg, IOException e) { - if (e instanceof FileNotFoundException) - return new IgfsFileNotFoundException(e); - else if (e instanceof ParentNotDirectoryException) - return new IgfsParentNotDirectoryException(msg, e); - else if (e instanceof PathIsNotEmptyDirectoryException) - return new IgfsDirectoryNotEmptyException(e); - else if (e instanceof PathExistsException) - return new IgfsPathAlreadyExistsException(msg, e); - else - return new IgfsException(msg, e); - } - - /** - * Convert Hadoop FileStatus properties to map. - * - * @param status File status. - * @return IGFS attributes. 
- */ - private static Map properties(FileStatus status) { - FsPermission perm = status.getPermission(); - - if (perm == null) - perm = FsPermission.getDefault(); - - return F.asMap(PROP_PERMISSION, String.format("%04o", perm.toShort()), PROP_USER_NAME, status.getOwner(), - PROP_GROUP_NAME, status.getGroup()); - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - try { - return fileSys.exists(convert(path)); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map props) { - IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props); - - try { - if (props0.userName() != null || props0.groupName() != null) - fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); - - if (props0.permission() != null) - fileSys.setPermission(convert(path), props0.permission()); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]"); - } - - //Result is not used in case of secondary FS. - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // Delegate to the secondary file system. - try { - if (!fileSys.rename(convert(src), convert(dest))) - throw new IgfsException("Failed to rename (secondary file system returned false) " + - "[src=" + src + ", dest=" + dest + ']'); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to rename file [src=" + src + ", dest=" + dest + ']'); - } - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - try { - return fileSys.delete(convert(path), recursive); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - try { - if (!fileSys.mkdirs(convert(path))) - throw new IgniteException("Failed to make directories [path=" + path + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map props) { - try { - if (!fileSys.mkdirs(convert(path), new IgfsHadoopFSProperties(props).permission())) - throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to make directories [path=" + path + ", props=" + props + "]"); - } - } - - /** {@inheritDoc} */ - @Override public Collection listPaths(IgfsPath path) { - try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); - - if (statuses == null) - throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); - - Collection res = new ArrayList<>(statuses.length); - - for (FileStatus status : statuses) - res.add(new IgfsPath(path, status.getPath().getName())); - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public Collection listFiles(IgfsPath path) { - try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); - - 
if (statuses == null) - throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); - - Collection res = new ArrayList<>(statuses.length); - - for (FileStatus status : statuses) { - IgfsFileInfo fsInfo = status.isDirectory() ? new IgfsFileInfo(true, properties(status)) : - new IgfsFileInfo((int)status.getBlockSize(), status.getLen(), null, null, false, - properties(status)); - - res.add(new IgfsFileImpl(new IgfsPath(path, status.getPath().getName()), fsInfo, 1)); - } - - return res; - } - catch (FileNotFoundException ignored) { - throw new IgfsFileNotFoundException("Failed to list files (path not found): " + path); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to list statuses due to secondary file system exception: " + path); - } - } - - /** {@inheritDoc} */ - @Override public IgfsReader open(IgfsPath path, int bufSize) { - return new IgfsHadoopReader(fileSys, convert(path), bufSize); - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, boolean overwrite) { - try { - return fileSys.create(convert(path), overwrite); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, - long blockSize, @Nullable Map props) { - IgfsHadoopFSProperties props0 = - new IgfsHadoopFSProperties(props != null ? props : Collections.emptyMap()); - - try { - return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, - null); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + - ", overwrite=" + overwrite + ", bufSize=" + bufSize + ", replication=" + replication + - ", blockSize=" + blockSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map props) { - try { - return fileSys.append(convert(path), bufSize); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); - } - } - - /** {@inheritDoc} */ - @Override public IgfsFile info(final IgfsPath path) { - try { - final FileStatus status = fileSys.getFileStatus(convert(path)); - - if (status == null) - return null; - - final Map props = properties(status); - - return new IgfsFile() { - @Override public IgfsPath path() { - return path; - } - - @Override public boolean isFile() { - return status.isFile(); - } - - @Override public boolean isDirectory() { - return status.isDirectory(); - } - - @Override public int blockSize() { - return (int)status.getBlockSize(); - } - - @Override public long groupBlockSize() { - return status.getBlockSize(); - } - - @Override public long accessTime() { - return status.getAccessTime(); - } - - @Override public long modificationTime() { - return status.getModificationTime(); - } - - @Override public String property(String name) throws IllegalArgumentException { - String val = props.get(name); - - if (val == null) - throw new IllegalArgumentException("File property not found [path=" + path + ", name=" + name + ']'); - - return val; - } - - @Nullable @Override public String property(String name, @Nullable String dfltVal) { - String val = props.get(name); - - return val == null ? 
dfltVal : val; - } - - @Override public long length() { - return status.getLen(); - } - - /** {@inheritDoc} */ - @Override public Map properties() { - return props; - } - }; - - } - catch (FileNotFoundException ignore) { - return null; - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get file status [path=" + path + "]"); - } - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - try { - return fileSys.getContentSummary(new Path(fileSys.getUri())).getSpaceConsumed(); - } - catch (IOException e) { - throw handleSecondaryFsError(e, "Failed to get used space size of file system."); - } - } - - /** {@inheritDoc} */ - @Nullable @Override public Map properties() { - return props; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - try { - fileSys.close(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopParameters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopParameters.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopParameters.java deleted file mode 100644 index 6237dd4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/IgfsHadoopParameters.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.igfs.hadoop; - -/** - * This class lists parameters that can be specified in Hadoop configuration. - * Hadoop configuration can be specified in {@code core-site.xml} file - * or passed to map-reduce task directly when using Hadoop driver for IGFS file system: - *
- * <ul>
- * <li>
- * {@code fs.igfs.[name].open.sequential_reads_before_prefetch} - this parameter overrides
- * the one specified in {@link org.apache.ignite.configuration.IgfsConfiguration#getSequentialReadsBeforePrefetch()}
- * IGFS data node configuration property.
- * </li>
- * <li>
- * {@code fs.igfs.[name].log.enabled} - specifies whether IGFS sampling logger is enabled. If
- * {@code true}, then all file system operations will be logged to a file.
- * </li>
- * <li>{@code fs.igfs.[name].log.dir} - specifies log directory where sampling log files should be placed.</li>
- * <li>
- * {@code fs.igfs.[name].log.batch_size} - specifies how many log entries are accumulated in a batch before
- * it gets flushed to log file. Higher values will imply greater performance, but will increase delay
- * before record appears in the log file.
- * </li>
- * <li>
- * {@code fs.igfs.[name].colocated.writes} - specifies whether written files should be colocated on data
- * node to which client is connected. If {@code true}, file will not be distributed and will be written
- * to a single data node. Default value is {@code true}.
- * </li>
- * <li>
- * {@code fs.igfs.prefer.local.writes} - specifies whether file preferably should be written to
- * local data node if it has enough free space. After some time it can be redistributed across nodes though.
- * </li>
- * </ul>
- * Where {@code [name]} is file system endpoint which you specify in file system URI authority part. E.g. in
- * case your file system URI is {@code igfs://127.0.0.1:10500} then {@code name} will be {@code 127.0.0.1:10500}.
- * <p>
- * Sample configuration that can be placed to {@code core-site.xml} file:
- * <pre>
- *     <property>
- *         <name>fs.igfs.127.0.0.1:10500.log.enabled</name>
- *         <value>true</value>
- *     </property>
- *     <property>
- *         <name>fs.igfs.127.0.0.1:10500.log.dir</name>
- *         <value>/home/apache/ignite/log/sampling</value>
- *     </property>
- *     <property>
- *         <name>fs.igfs.127.0.0.1:10500.log.batch_size</name>
- *         <value>16</value>
- *     </property>
- * </pre>
- * Parameters could also be specified per mapreduce job, e.g.
- * <pre>
- * hadoop jar myjarfile.jar MyMapReduceJob -Dfs.igfs.open.sequential_reads_before_prefetch=4
- * </pre>
- * If you want to use these parameters in code, then you have to substitute your file system name in it. The easiest
- * way to do that is {@code String.format(PARAM_IGFS_COLOCATED_WRITES, [name])}.
- */
-public class IgfsHadoopParameters {
-    /** Parameter name for control over file colocation write mode. */
-    public static final String PARAM_IGFS_COLOCATED_WRITES = "fs.igfs.%s.colocated.writes";
-
-    /** Parameter name for custom sequential reads before prefetch value. */
-    public static final String PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH =
-        "fs.igfs.%s.open.sequential_reads_before_prefetch";
-
-    /** Parameter name for client logger directory. */
-    public static final String PARAM_IGFS_LOG_DIR = "fs.igfs.%s.log.dir";
-
-    /** Parameter name for log batch size. */
-    public static final String PARAM_IGFS_LOG_BATCH_SIZE = "fs.igfs.%s.log.batch_size";
-
-    /** Parameter name for log enabled flag. */
-    public static final String PARAM_IGFS_LOG_ENABLED = "fs.igfs.%s.log.enabled";
-
-    /** Parameter name for prefer local writes flag. */
-    public static final String PARAM_IGFS_PREFER_LOCAL_WRITES = "fs.igfs.prefer.local.writes";
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/package.html
deleted file mode 100644
index 137055b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-    Contains common files for Hadoop 1.x and Hadoop 2.x distros.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
deleted file mode 100644
index c4d2f5e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/IgfsHadoopFileSystem.java
+++ /dev/null
@@ -1,1234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.ignite.igfs.hadoop.v1; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.igfs.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.IgniteFs.*; -import static org.apache.ignite.configuration.IgfsConfiguration.*; -import static org.apache.ignite.igfs.IgfsMode.*; -import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; -import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*; -import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; - -/** - * {@code IGFS} Hadoop 1.x file system driver over file system API. To use - * {@code IGFS} as Hadoop file system, you should configure this class - * in Hadoop's {@code core-site.xml} as follows: - *
- * <pre>
- *  <property>
- *      <name>fs.default.name</name>
- *      <value>igfs://ipc</value>
- *  </property>
- *
- *  <property>
- *      <name>fs.igfs.impl</name>
- *      <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value>
- *  </property>
- * </pre>
- * You should also add Ignite JAR and all libraries to Hadoop classpath. To
- * do this, add the following lines to {@code conf/hadoop-env.sh} script in Hadoop
- * distribution:
- * <pre>
- * export IGNITE_HOME=/path/to/Ignite/distribution
- * export HADOOP_CLASSPATH=$IGNITE_HOME/ignite*.jar
- *
- * for f in $IGNITE_HOME/libs/*.jar; do
- *  export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f;
- * done
- * </pre>
- * <h1>Data vs Clients Nodes</h1>
- * Hadoop needs to use its FileSystem remotely from client nodes as well as directly on - * data nodes. Client nodes are responsible for basic file system operations as well as - * accessing data nodes remotely. Usually, client nodes are started together - * with {@code job-submitter} or {@code job-scheduler} processes, while data nodes are usually - * started together with Hadoop {@code task-tracker} processes. - *

- * For sample client and data node configuration refer to {@code config/hadoop/default-config-client.xml} - * and {@code config/hadoop/default-config.xml} configuration files in Ignite installation. - */ -public class IgfsHadoopFileSystem extends FileSystem { - /** Internal property to indicate management connection. */ - public static final String IGFS_MANAGEMENT = "fs.igfs.management.connection"; - - /** Empty array of file block locations. */ - private static final BlockLocation[] EMPTY_BLOCK_LOCATIONS = new BlockLocation[0]; - - /** Empty array of file statuses. */ - public static final FileStatus[] EMPTY_FILE_STATUS = new FileStatus[0]; - - /** Ensures that close routine is invoked at most once. */ - private final AtomicBoolean closeGuard = new AtomicBoolean(); - - /** Grid remote client. */ - private IgfsHadoopWrapper rmtClient; - - /** User name for each thread. */ - private final ThreadLocal userName = new ThreadLocal(){ - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal workingDir = new ThreadLocal(){ - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** Default replication factor. */ - private short dfltReplication; - - /** Base file system uri. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private URI uri; - - /** Authority. */ - private String uriAuthority; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Secondary URI string. */ - private URI secondaryUri; - - /** IGFS mode resolver. */ - private IgfsModeResolver modeRslvr; - - /** Secondary file system instance. */ - private FileSystem secondaryFs; - - /** Management connection flag. */ - private boolean mgmt; - - /** Whether custom sequential reads before prefetch value is provided. */ - private boolean seqReadsBeforePrefetchOverride; - - /** IGFS group block size. */ - private long igfsGrpBlockSize; - - /** Flag that controls whether file writes should be colocated. */ - private boolean colocateFileWrites; - - /** Prefer local writes. */ - private boolean preferLocFileWrites; - - /** Custom-provided sequential reads before prefetch. */ - private int seqReadsBeforePrefetch; - - /** The cache was disabled when the instance was creating. */ - private boolean cacheEnabled; - - /** {@inheritDoc} */ - @Override public URI getUri() { - if (uri == null) - throw new IllegalStateException("URI is null (was IgfsHadoopFileSystem properly initialized?)."); - - return uri; - } - - /** - * Enter busy state. - * - * @throws IOException If file system is stopped. - */ - private void enterBusy() throws IOException { - if (closeGuard.get()) - throw new IOException("File system is stopped."); - } - - /** - * Leave busy state. - */ - private void leaveBusy() { - // No-op. - } - - /** - * Public setter that can be used by direct users of FS or Visor. - * - * @param colocateFileWrites Whether all ongoing file writes should be colocated. 
- */ - @SuppressWarnings("UnusedDeclaration") - public void colocateFileWrites(boolean colocateFileWrites) { - this.colocateFileWrites = colocateFileWrites; - } - - /** {@inheritDoc} */ - @Override public void initialize(URI name, Configuration cfg) throws IOException { - enterBusy(); - - try { - if (rmtClient != null) - throw new IOException("File system is already initialized: " + rmtClient); - - A.notNull(name, "name"); - A.notNull(cfg, "cfg"); - - super.initialize(name, cfg); - - setConf(cfg); - - String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); - - cacheEnabled = !cfg.getBoolean(disableCacheName, false); - - mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); - - if (!IGFS_SCHEME.equals(name.getScheme())) - throw new IOException("Illegal file system URI [expected=" + IGFS_SCHEME + - "://[name]/[optional_path], actual=" + name + ']'); - - uri = name; - - uriAuthority = uri.getAuthority(); - - setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - - // Override sequential reads before prefetch if needed. - seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); - - if (seqReadsBeforePrefetch > 0) - seqReadsBeforePrefetchOverride = true; - - // In Ignite replication factor is controlled by data cache affinity. - // We use replication factor to force the whole file to be stored on local node. - dfltReplication = (short)cfg.getInt("dfs.replication", 3); - - // Get file colocation control flag. - colocateFileWrites = parameter(cfg, PARAM_IGFS_COLOCATED_WRITES, uriAuthority, false); - preferLocFileWrites = cfg.getBoolean(PARAM_IGFS_PREFER_LOCAL_WRITES, false); - - // Get log directory. - String logDirCfg = parameter(cfg, PARAM_IGFS_LOG_DIR, uriAuthority, DFLT_IGFS_LOG_DIR); - - File logDirFile = U.resolveIgnitePath(logDirCfg); - - String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - - rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG); - - // Handshake. - IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); - - igfsGrpBlockSize = handshake.blockSize(); - - IgfsPaths paths = handshake.secondaryPaths(); - - // Initialize client logger. - Boolean logEnabled = parameter(cfg, PARAM_IGFS_LOG_ENABLED, uriAuthority, false); - - if (handshake.sampling() != null ? handshake.sampling() : logEnabled) { - // Initiate client logger. 
- if (logDir == null) - throw new IOException("Failed to resolve log directory: " + logDirCfg); - - Integer batchSize = parameter(cfg, PARAM_IGFS_LOG_BATCH_SIZE, uriAuthority, DFLT_IGFS_LOG_BATCH_SIZE); - - clientLog = IgfsLogger.logger(uriAuthority, handshake.igfsName(), logDir, batchSize); - } - else - clientLog = IgfsLogger.disabledLogger(); - - modeRslvr = new IgfsModeResolver(paths.defaultMode(), paths.pathModes()); - - boolean initSecondary = paths.defaultMode() == PROXY; - - if (!initSecondary && paths.pathModes() != null && !paths.pathModes().isEmpty()) { - for (T2 pathMode : paths.pathModes()) { - IgfsMode mode = pathMode.getValue(); - - if (mode == PROXY) { - initSecondary = true; - - break; - } - } - } - - if (initSecondary) { - Map props = paths.properties(); - - String secUri = props.get(SECONDARY_FS_URI); - String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - - try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); - - secondaryFs = secProvider.createFileSystem(); - secondaryUri = secProvider.uri(); - } - catch (IOException e) { - if (!mgmt) - throw new IOException("Failed to connect to the secondary file system: " + secUri, e); - else - LOG.warn("Visor failed to create secondary file system (operations on paths with PROXY mode " + - "will have no effect): " + e.getMessage()); - } - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override protected void checkPath(Path path) { - URI uri = path.toUri(); - - if (uri.isAbsolute()) { - if (!F.eq(uri.getScheme(), IGFS_SCHEME)) - throw new InvalidPathException("Wrong path scheme [expected=" + IGFS_SCHEME + ", actual=" + - uri.getAuthority() + ']'); - - if (!F.eq(uri.getAuthority(), uriAuthority)) - throw new InvalidPathException("Wrong path authority [expected=" + uriAuthority + ", actual=" + - uri.getAuthority() + ']'); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public short getDefaultReplication() { - return dfltReplication; - } - - /** {@inheritDoc} */ - @Override protected void finalize() throws Throwable { - super.finalize(); - - close0(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - if (cacheEnabled && get(getUri(), getConf()) == this) - return; - - close0(); - } - - /** - * Closes file system. - * - * @throws IOException If failed. - */ - private void close0() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (LOG.isDebugEnabled()) - LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); - - if (rmtClient == null) - return; - - super.close(); - - rmtClient.close(false); - - if (clientLog.isLogEnabled()) - clientLog.close(); - - if (secondaryFs != null) - U.closeQuiet(secondaryFs); - - // Reset initialized resources. - uri = null; - rmtClient = null; - } - } - - /** {@inheritDoc} */ - @Override public void setTimes(Path p, long mtime, long atime) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. 
- return; - } - - secondaryFs.setTimes(toSecondary(p), mtime, atime); - } - else { - IgfsPath path = convert(p); - - rmtClient.setTimes(path, atime, mtime); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setPermission(Path p, FsPermission perm) throws IOException { - enterBusy(); - - try { - A.notNull(p, "p"); - - if (mode(p) == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. - return; - } - - secondaryFs.setPermission(toSecondary(p), perm); - } - else if (rmtClient.update(convert(p), permission(perm)) == null) { - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", perm=" + perm + ']'); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public void setOwner(Path p, String username, String grpName) throws IOException { - A.notNull(p, "p"); - A.notNull(username, "username"); - A.notNull(grpName, "grpName"); - - enterBusy(); - - try { - if (mode(p) == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - // No-op for management connection. - return; - } - - secondaryFs.setOwner(toSecondary(p), username, grpName); - } - else if (rmtClient.update(convert(p), F.asMap(PROP_USER_NAME, username, PROP_GROUP_NAME, grpName)) == null) - throw new IOException("Failed to set file permission (file not found?)" + - " [path=" + p + ", userName=" + username + ", groupName=" + grpName + ']'); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FSDataInputStream open(Path f, int bufSize) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to open file (secondary file system is not initialized): " + f); - } - - FSDataInputStream is = secondaryFs.open(toSecondary(f), bufSize); - - if (clientLog.isLogEnabled()) { - // At this point we do not know file size, so we perform additional request to remote FS to get it. - FileStatus status = secondaryFs.getFileStatus(toSecondary(f)); - - long size = status != null ? status.getLen() : -1; - - long logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, PROXY, bufSize, size); - - return new FSDataInputStream(new IgfsHadoopProxyInputStream(is, clientLog, logId)); - } - else - return is; - } - else { - IgfsHadoopStreamDelegate stream = seqReadsBeforePrefetchOverride ? 
- rmtClient.open(path, seqReadsBeforePrefetch) : rmtClient.open(path); - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logOpen(logId, path, mode, bufSize, stream.length()); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + - ", bufSize=" + bufSize + ']'); - - IgfsHadoopInputStream igfsIn = new IgfsHadoopInputStream(stream, stream.length(), - bufSize, LOG, clientLog, logId); - - if (LOG.isDebugEnabled()) - LOG.debug("Opened input stream [path=" + path + ", delegate=" + stream + ']'); - - return new FSDataInputStream(igfsIn); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream create(Path f, FsPermission perm, boolean overwrite, int bufSize, - short replication, long blockSize, Progressable progress) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - OutputStream out = null; - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in create [thread=" + Thread.currentThread().getName() + "path=" + - path + ", overwrite=" + overwrite + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to create file (secondary file system is not initialized): " + f); - } - - FSDataOutputStream os = - secondaryFs.create(toSecondary(f), perm, overwrite, bufSize, replication, blockSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, PROXY, overwrite, bufSize, replication, blockSize); - - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - // Create stream and close it in the 'finally' section if any sequential operation failed. - IgfsHadoopStreamDelegate stream = rmtClient.create(path, overwrite, colocateFileWrites, - replication, blockSize, F.asMap(PROP_PERMISSION, toString(perm), - PROP_PREFER_LOCAL_WRITES, Boolean.toString(preferLocFileWrites))); - - assert stream != null; - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logCreate(logId, path, mode, overwrite, bufSize, replication, blockSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog, - logId); - - bufSize = Math.max(64 * 1024, bufSize); - - out = new BufferedOutputStream(igfsOut, bufSize); - - FSDataOutputStream res = new FSDataOutputStream(out, null, 0); - - // Mark stream created successfully. - out = null; - - return res; - } - } - finally { - // Close if failed during stream creation. 
- if (out != null) - U.closeQuiet(out); - - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (LOG.isDebugEnabled()) - LOG.debug("Opening output stream in append [thread=" + Thread.currentThread().getName() + - ", path=" + path + ", bufSize=" + bufSize + ']'); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - throw new IOException("Failed to append file (secondary file system is not initialized): " + f); - } - - FSDataOutputStream os = secondaryFs.append(toSecondary(f), bufSize, progress); - - if (clientLog.isLogEnabled()) { - long logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, PROXY, bufSize); // Don't have stream ID. - - return new FSDataOutputStream(new IgfsHadoopProxyOutputStream(os, clientLog, logId)); - } - else - return os; - } - else { - IgfsHadoopStreamDelegate stream = rmtClient.append(path, false, null); - - assert stream != null; - - long logId = -1; - - if (clientLog.isLogEnabled()) { - logId = IgfsLogger.nextId(); - - clientLog.logAppend(logId, path, mode, bufSize); - } - - if (LOG.isDebugEnabled()) - LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - - IgfsHadoopOutputStream igfsOut = new IgfsHadoopOutputStream(stream, LOG, clientLog, - logId); - - bufSize = Math.max(64 * 1024, bufSize); - - BufferedOutputStream out = new BufferedOutputStream(igfsOut, bufSize); - - return new FSDataOutputStream(out, null, 0); - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public boolean rename(Path src, Path dst) throws IOException { - A.notNull(src, "src"); - A.notNull(dst, "dst"); - - enterBusy(); - - try { - IgfsPath srcPath = convert(src); - IgfsPath dstPath = convert(dst); - IgfsMode mode = mode(srcPath); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - return false; - } - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, PROXY, dstPath); - - return secondaryFs.rename(toSecondary(src), toSecondary(dst)); - } - else { - // Will throw exception if failed. - rmtClient.rename(srcPath, dstPath); - - if (clientLog.isLogEnabled()) - clientLog.logRename(srcPath, mode, dstPath); - - return true; - } - } - catch (IOException e) { - // Intentionally ignore IGFS exceptions here to follow Hadoop contract. - if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || - !X.hasCause(e.getCause(), IgfsException.class))) - throw e; - else - return false; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean delete(Path f) throws IOException { - return delete(f, false); - } - - /** {@inheritDoc} */ - @Override public boolean delete(Path f, boolean recursive) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - return false; - } - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, PROXY, recursive); - - return secondaryFs.delete(toSecondary(f), recursive); - } - else { - // Will throw exception if delete failed. 
- boolean res = rmtClient.delete(path, recursive); - - if (clientLog.isLogEnabled()) - clientLog.logDelete(path, mode, recursive); - - return res; - } - } - catch (IOException e) { - // Intentionally ignore IGFS exceptions here to follow Hadoop contract. - if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null || - !X.hasCause(e.getCause(), IgfsException.class))) - throw e; - else - return false; - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public FileStatus[] listStatus(Path f) throws IOException { - A.notNull(f, "f"); - - enterBusy(); - - try { - IgfsPath path = convert(f); - IgfsMode mode = mode(path); - - if (mode == PROXY) { - if (secondaryFs == null) { - assert mgmt; - - return EMPTY_FILE_STATUS; - } - - FileStatus[] arr = secondaryFs.listStatus(toSecondary(f)); - - if (arr == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - for (int i = 0; i < arr.length; i++) - arr[i] = toPrimary(arr[i]); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, PROXY, fileArr); - } - - return arr; - } - else { - Collection list = rmtClient.listFiles(path); - - if (list == null) - throw new FileNotFoundException("File " + f + " does not exist."); - - List files = new ArrayList<>(list); - - FileStatus[] arr = new FileStatus[files.size()]; - - for (int i = 0; i < arr.length; i++) - arr[i] = convert(files.get(i)); - - if (clientLog.isLogEnabled()) { - String[] fileArr = new String[arr.length]; - - for (int i = 0; i < arr.length; i++) - fileArr[i] = arr[i].getPath().toString(); - - clientLog.logListDirectory(path, mode, fileArr); - } - - return arr; - } - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. 
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path newPath) {
-        if (newPath == null) {
-            Path homeDir = getHomeDirectory();
-
-            if (secondaryFs != null)
-                secondaryFs.setWorkingDirectory(toSecondary(homeDir));
-
-            workingDir.set(homeDir);
-        }
-        else {
-            Path fixedNewPath = fixRelativePart(newPath);
-
-            String res = fixedNewPath.toUri().getPath();
-
-            if (!DFSUtil.isValidName(res))
-                throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-            if (secondaryFs != null)
-                secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
-
-            workingDir.set(fixedNewPath);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean mkdirs(Path f, FsPermission perm) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(f);
-            IgfsMode mode = mode(path);
-
-            if (mode == PROXY) {
-                if (secondaryFs == null) {
-                    assert mgmt;
-
-                    return false;
-                }
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, PROXY);
-
-                return secondaryFs.mkdirs(toSecondary(f), perm);
-            }
-            else {
-                boolean mkdirRes = rmtClient.mkdirs(path, permission(perm));
-
-                if (clientLog.isLogEnabled())
-                    clientLog.logMakeDirectory(path, mode);
-
-                return mkdirRes;
-            }
-        }
-        catch (IOException e) {
-            // Intentionally ignore IGFS exceptions here to follow Hadoop contract.
-            if (F.eq(IOException.class, e.getClass()) && (e.getCause() == null ||
-                !X.hasCause(e.getCause(), IgfsException.class)))
-                throw e;
-            else
-                return false;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            if (mode(f) == PROXY) {
-                if (secondaryFs == null) {
-                    assert mgmt;
-
-                    throw new IOException("Failed to get file status (secondary file system is not initialized): " + f);
-                }
-
-                return toPrimary(secondaryFs.getFileStatus(toSecondary(f)));
-            }
-            else {
-                IgfsFile info = rmtClient.info(convert(f));
-
-                if (info == null)
-                    throw new FileNotFoundException("File not found: " + f);
-
-                return convert(info);
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ContentSummary getContentSummary(Path f) throws IOException {
-        A.notNull(f, "f");
-
-        enterBusy();
-
-        try {
-            if (mode(f) == PROXY) {
-                if (secondaryFs == null) {
-                    assert mgmt;
-
-                    throw new IOException("Failed to get content summary (secondary file system is not initialized): " +
-                        f);
-                }
-
-                return secondaryFs.getContentSummary(toSecondary(f));
-            }
-            else {
-                IgfsPathSummary sum = rmtClient.contentSummary(convert(f));
-
-                return new ContentSummary(sum.totalLength(), sum.filesCount(), sum.directoriesCount(),
-                    -1, sum.totalLength(), rmtClient.fsStatus().spaceTotal());
-            }
-        }
-        finally {
-            leaveBusy();
-        }
-    }
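The user name and working directory above are per-thread state: setUser() stores the name in a ThreadLocal and resets the working directory to /user/<name> by passing null to setWorkingDirectory(). A self-contained sketch of that mechanism, with field names borrowed from the code above and everything else simplified (the default user is an assumption here; the real class takes it from configuration):

    /** Sketch of the per-thread user/working-directory state behind setUser()/setWorkingDirectory() above. */
    public class ThreadLocalUserSketch {
        private final ThreadLocal<String> userName = new ThreadLocal<String>() {
            @Override protected String initialValue() {
                return "anonymous"; // Assumed default.
            }
        };

        private final ThreadLocal<String> workingDir = new ThreadLocal<>();

        /** Set user for the current thread and reset its working directory to the home directory. */
        public void setUser(String user) {
            userName.set(user);

            setWorkingDirectory(null);
        }

        /** Null resets to the home directory, mirroring setWorkingDirectory(Path) above. */
        public void setWorkingDirectory(String newPath) {
            workingDir.set(newPath == null ? "/user/" + userName.get() : newPath);
        }

        public String getWorkingDirectory() {
            return workingDir.get();
        }
    }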
-
-    /** {@inheritDoc} */
-    @Override public BlockLocation[] getFileBlockLocations(FileStatus status, long start, long len) throws IOException {
-        A.notNull(status, "status");
-
-        enterBusy();
-
-        try {
-            IgfsPath path = convert(status.getPath());
-
-            if (mode(status.getPath()) == PROXY) {
-                if (secondaryFs == null) {
-                    assert mgmt;
-
-                    return EMPTY_BLOCK_LOCATIONS;
-                }
-
-                Path secPath = toSecondary(status.getPath());
-
-                return secondaryFs.getFileBlockLocations(secondaryFs.getFileStatus(secPath), start, len);
-            }
-            else {
-                long now = System.currentTimeMillis();
-
-                List<IgfsBlockLocation> affinity = new ArrayList<>(rmtClient.affinity(path, start, len));
-
-                BlockLocation[] arr = new BlockLocation[affinity.size()];
-
-                for (int i = 0; i < arr.length; i++)
-                    arr[i] = convert(affinity.get(i));
-
-                if (LOG.isDebugEnabled())
-                    LOG.debug("Fetched file locations [path=" + path + ", fetchTime=" +
-                        (System.currentTimeMillis() - now) + ", locations=" + Arrays.asList(arr) + ']');
-
-                return arr;
-            }
-        }
-        catch (FileNotFoundException ignored) {
-            return EMPTY_BLOCK_LOCATIONS;
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override public long getDefaultBlockSize() {
-        return igfsGrpBlockSize;
-    }
-
-    /**
-     * Resolve path mode.
-     *
-     * @param path HDFS path.
-     * @return Path mode.
-     */
-    public IgfsMode mode(Path path) {
-        return mode(convert(path));
-    }
-
-    /**
-     * Resolve path mode.
-     *
-     * @param path IGFS path.
-     * @return Path mode.
-     */
-    public IgfsMode mode(IgfsPath path) {
-        return modeRslvr.resolveMode(path);
-    }
-
-    /**
-     * Convert the given path to a path acceptable to the primary file system.
-     *
-     * @param path Path.
-     * @return Primary file system path.
-     */
-    private Path toPrimary(Path path) {
-        return convertPath(path, uri);
-    }
-
-    /**
-     * Convert the given path to a path acceptable to the secondary file system.
-     *
-     * @param path Path.
-     * @return Secondary file system path.
-     */
-    private Path toSecondary(Path path) {
-        assert secondaryFs != null;
-        assert secondaryUri != null;
-
-        return convertPath(path, secondaryUri);
-    }
-
-    /**
-     * Convert path using the given new URI.
-     *
-     * @param path Old path.
-     * @param newUri New URI.
-     * @return New path.
-     */
-    private Path convertPath(Path path, URI newUri) {
-        assert newUri != null;
-
-        if (path != null) {
-            URI pathUri = path.toUri();
-
-            try {
-                return new Path(new URI(pathUri.getScheme() != null ? newUri.getScheme() : null,
-                    pathUri.getAuthority() != null ? newUri.getAuthority() : null, pathUri.getPath(), null, null));
-            }
-            catch (URISyntaxException e) {
-                throw new IgniteException("Failed to construct secondary file system path from the primary file " +
-                    "system path: " + path, e);
-            }
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Convert a file status obtained from the secondary file system to a status of the primary file system.
-     *
-     * @param status Secondary file system status.
-     * @return Primary file system status.
-     */
-    @SuppressWarnings("deprecation")
-    private FileStatus toPrimary(FileStatus status) {
-        return status != null ? new FileStatus(status.getLen(), status.isDir(), status.getReplication(),
-            status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
-            status.getOwner(), status.getGroup(), toPrimary(status.getPath())) : null;
-    }
-
-    /**
-     * Convert IGFS path into Hadoop path.
-     *
-     * @param path IGFS path.
-     * @return Hadoop path.
-     */
-    private Path convert(IgfsPath path) {
-        return new Path(IGFS_SCHEME, uriAuthority, path.toString());
-    }
-
-    /**
-     * Convert Hadoop path into IGFS path.
-     *
-     * @param path Hadoop path.
-     * @return IGFS path.
-     */
-    @Nullable private IgfsPath convert(@Nullable Path path) {
-        if (path == null)
-            return null;
-
-        return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
-    }
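convertPath() above rewrites only the scheme and authority of a path against the target URI, keeps the path component as-is, and drops any component the original path did not carry. The same trick in isolation, using only java.net.URI (the example URIs are illustrative, not taken from the commit):

    import java.net.URI;
    import java.net.URISyntaxException;

    /** Isolated sketch of the scheme/authority swap performed by convertPath() above. */
    public class ConvertPathSketch {
        /** Rebuild pathUri against newUri's scheme and authority, preserving the path component. */
        static URI convert(URI pathUri, URI newUri) throws URISyntaxException {
            return new URI(
                pathUri.getScheme() != null ? newUri.getScheme() : null,
                pathUri.getAuthority() != null ? newUri.getAuthority() : null,
                pathUri.getPath(), null, null);
        }

        public static void main(String[] args) throws URISyntaxException {
            URI primary = new URI("igfs://igfs@localhost/user/test/file.txt");
            URI secondary = new URI("hdfs://namenode:9000/");

            // Prints: hdfs://namenode:9000/user/test/file.txt
            System.out.println(convert(primary, secondary));
        }
    }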
-
-    /**
-     * Convert IGFS affinity block location into Hadoop affinity block location.
-     *
-     * @param block IGFS affinity block location.
-     * @return Hadoop affinity block location.
-     */
-    private BlockLocation convert(IgfsBlockLocation block) {
-        Collection<String> names = block.names();
-        Collection<String> hosts = block.hosts();
-
-        return new BlockLocation(
-            names.toArray(new String[names.size()]) /* hostname:portNumber of data nodes */,
-            hosts.toArray(new String[hosts.size()]) /* hostnames of data nodes */,
-            block.start(), block.length()
-        ) {
-            @Override public String toString() {
-                try {
-                    return "BlockLocation [offset=" + getOffset() + ", length=" + getLength() +
-                        ", hosts=" + Arrays.asList(getHosts()) + ", names=" + Arrays.asList(getNames()) + ']';
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-
-    /**
-     * Convert IGFS file information into Hadoop file status.
-     *
-     * @param file IGFS file information.
-     * @return Hadoop file status.
-     */
-    @SuppressWarnings("deprecation")
-    private FileStatus convert(IgfsFile file) {
-        return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
-            file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
-            convert(file.path())) {
-            @Override public String toString() {
-                return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
-                    ", mtime=" + getModificationTime() + ", atime=" + getAccessTime() + ']';
-            }
-        };
-    }
-
-    /**
-     * Convert Hadoop permission into IGFS file attribute.
-     *
-     * @param perm Hadoop permission.
-     * @return IGFS attributes.
-     */
-    private Map<String, String> permission(FsPermission perm) {
-        if (perm == null)
-            perm = FsPermission.getDefault();
-
-        return F.asMap(PROP_PERMISSION, toString(perm));
-    }
-
-    /**
-     * @param perm Permission.
-     * @return String.
-     */
-    private static String toString(FsPermission perm) {
-        return String.format("%04o", perm.toShort());
-    }
-
-    /**
-     * Convert IGFS file attributes into Hadoop permission.
-     *
-     * @param file File info.
-     * @return Hadoop permission.
-     */
-    private FsPermission permission(IgfsFile file) {
-        String perm = file.property(PROP_PERMISSION, null);
-
-        if (perm == null)
-            return FsPermission.getDefault();
-
-        try {
-            return new FsPermission((short)Integer.parseInt(perm, 8));
-        }
-        catch (NumberFormatException ignore) {
-            return FsPermission.getDefault();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsHadoopFileSystem.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/package.html
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/package.html b/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/package.html
deleted file mode 100644
index 4b62db1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/igfs/hadoop/v1/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-
-
-
-
-
-
-    Contains Hadoop 1.x FileSystem wrapper for Ignite file system.
-
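Finally, the permission helpers removed above store a Hadoop FsPermission as a four-digit octal string property (toString(FsPermission)) and parse it back with radix 8 (permission(IgfsFile)), falling back to the default permission on bad input. The round-trip in plain Java, with a short mask standing in for FsPermission.toShort() and 0755 assumed as the fallback for illustration:

    /** Sketch of the octal permission round-trip used by toString(FsPermission)/permission(IgfsFile) above. */
    public class PermissionCodecSketch {
        /** Encode a permission mask as a four-digit octal string, e.g. 0644 -> "0644". */
        static String encode(short permBits) {
            return String.format("%04o", permBits);
        }

        /** Decode an octal string back to the mask; bad input falls back to an assumed default of 0755. */
        static short decode(String prop) {
            try {
                return (short)Integer.parseInt(prop, 8);
            }
            catch (NumberFormatException ignore) {
                return (short)0755; // Octal literal: rwxr-xr-x.
            }
        }

        public static void main(String[] args) {
            short bits = 0644; // Octal literal: rw-r--r--.

            String encoded = encode(bits);

            // Prints: 0644 -> 0644 (the decoded value 420 is just 0644 in decimal).
            System.out.println(encoded + " -> " + encode(decode(encoded)));
        }
    }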