Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r564804 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs: FileDataServlet.java HftpFileSystem.java ListPathsServlet.java
Date: Fri, 10 Aug 2007 23:41:10 -0000
To: hadoop-commits@lucene.apache.org
From: omalley@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20070810234111.1FB121A981A@eris.apache.org>

Author: omalley
Date: Fri Aug 10 16:41:10 2007
New Revision: 564804

URL: http://svn.apache.org/viewvc?view=rev&rev=564804
Log:
HADOOP-1568. Missed files

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileDataServlet.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/** Redirect queries about the hosted filesystem to an appropriate datanode.
+ * @see org.apache.hadoop.dfs.HftpFileSystem
+ */
+public class FileDataServlet extends HttpServlet {
+
+  static URI getUri(DFSFileInfo i, NameNode nn)
+      throws IOException, URISyntaxException {
+    final DatanodeInfo host = pickSrcDatanode(i, nn);
+    return new URI("http", null, host.getHostName(), host.getInfoPort(),
+        "/streamFile", "filename=" + i.getPath(), null);
+  }
+
+  private final static int BLOCK_SAMPLE = 5;
+
+  /** Select a datanode to service this request.
+   * Currently, this looks at no more than the first five blocks of a file,
+   * selecting a datanode randomly from the most represented.
+   */
+  protected static DatanodeInfo pickSrcDatanode(DFSFileInfo i, NameNode nn)
+      throws IOException {
+    long sample;
+    if (i.getLen() == 0) sample = 1;
+    else sample = i.getLen() / i.getBlockSize() > BLOCK_SAMPLE
+        ? i.getBlockSize() * BLOCK_SAMPLE - 1
+        : i.getLen();
+    final LocatedBlocks blks = nn.getBlockLocations(
+        i.getPath().toUri().getPath(), 0, sample);
+    HashMap<DatanodeInfo, Integer> count = new HashMap<DatanodeInfo, Integer>();
+    for (LocatedBlock b : blks.getLocatedBlocks()) {
+      for (DatanodeInfo d : b.getLocations()) {
+        if (!count.containsKey(d)) {
+          count.put(d, 0);
+        }
+        count.put(d, count.get(d) + 1);
+      }
+    }
+    ArrayList<DatanodeInfo> loc = new ArrayList<DatanodeInfo>();
+    int max = 0;
+    for (Map.Entry<DatanodeInfo, Integer> e : count.entrySet()) {
+      if (e.getValue() > max) {
+        loc.clear();
+        max = e.getValue();
+      }
+      if (e.getValue() == max) {
+        loc.add(e.getKey());
+      }
+    }
+    final Random r = new Random();
+    return loc.get(r.nextInt(loc.size()));
+  }
+
+  /**
+   * Service a GET request as described below.
+   * Request:
+   * {@code
+   * GET http://<nn>:<port>/data[/<path>] HTTP/1.1
+   * }
+   */
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+
+    try {
+      final String path = request.getPathInfo() != null
+          ? request.getPathInfo() : "/";
+      final NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+      DFSFileInfo info = nn.getFileInfo(path);
+      if (!info.isDir()) {
+        response.sendRedirect(getUri(info, nn).toURL().toString());
+      } else {
+        response.sendError(400, "cat: " + path + ": is a directory");
+      }
+    } catch (URISyntaxException e) {
+      response.getWriter().println(e.toString());
+    } catch (IOException e) {
+      response.sendError(400, e.getMessage());
+    }
+  }
+
+}
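
The request/redirect flow above can be exercised with a plain HTTP client. The following standalone sketch is not part of the commit: the namenode host, port, and file path are hypothetical placeholders, and the JDK's HttpURLConnection is left to follow the servlet's 302 redirect to the chosen datanode's /streamFile URL.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class DataServletClient {
      public static void main(String[] args) throws Exception {
        // GET http://<nn>:<port>/data/<path>; the servlet answers with a
        // redirect to /streamFile on a datanode holding the file's early blocks.
        HttpURLConnection.setFollowRedirects(true);  // follow the 302 automatically
        URL url = new URL("http://namenode.example.com:50070/data/user/foo/part-0");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), "UTF-8"));
        for (String line; (line = in.readLine()) != null; ) {
          System.out.println(line);  // file bytes streamed by the datanode
        }
        in.close();
      }
    }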
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/HftpFileSystem.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TimeZone;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/** An implementation of a protocol for accessing filesystems over HTTP.
+ * The following implementation provides a limited, read-only interface
+ * to a filesystem over HTTP.
+ * @see org.apache.hadoop.dfs.ListPathsServlet
+ * @see org.apache.hadoop.dfs.FileDataServlet
+ */
+public class HftpFileSystem extends FileSystem {
+  static {
+    HttpURLConnection.setFollowRedirects(true);
+  }
+
+  private String fshostname = "";
+  private int fsport = -1;
+  protected static final SimpleDateFormat df = ListPathsServlet.df;
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    setConf(conf);
+    this.fshostname = name.getHost();
+    this.fsport = name.getPort() != -1
+        ? name.getPort()
+        : conf.getInt("dfs.info.port", -1);
+  }
+
+  @Override
+  public URI getUri() {
+    try {
+      return new URI("hftp", null, fshostname, fsport, null, null, null);
+    } catch (URISyntaxException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int buffersize) throws IOException {
+    HttpURLConnection connection = null;
+    try {
+      final URL url = new URI("http", null, fshostname, fsport,
+          "/data" + f.toUri().getPath(), null, null).toURL();
+      connection = (HttpURLConnection)url.openConnection();
+      connection.setRequestMethod("GET");
+      connection.connect();
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    final InputStream in = connection.getInputStream();
+    return new FSDataInputStream(new FSInputStream() {
+        public int read() throws IOException {
+          return in.read();
+        }
+        public int read(byte[] b, int off, int len) throws IOException {
+          return in.read(b, off, len);
+        }
+
+        public void close() throws IOException {
+          in.close();
+        }
+
+        public void seek(long pos) throws IOException {
+          throw new IOException("Can't seek!");
+        }
+        public long getPos() throws IOException {
+          throw new IOException("Position unknown!");
+        }
+        public boolean seekToNewSource(long targetPos) throws IOException {
+          return false;
+        }
+      });
+  }
+
+  /** Class to parse and store a listing reply from the server. */
+  class LsParser extends DefaultHandler {
+
+    ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
+
+    public void startElement(String ns, String localname, String qname,
+        Attributes attrs) throws SAXException {
+      if ("listing".equals(qname)) return;
+      if (!"file".equals(qname) && !"directory".equals(qname)) {
+        throw new SAXException("Unrecognized entry: " + qname);
+      }
+      long modif;
+      try {
+        modif = df.parse(attrs.getValue("modified")).getTime();
+      } catch (ParseException e) { throw new SAXException(e); }
+      FileStatus fs = "file".equals(qname)
+        ? new FileStatus(
+            Long.valueOf(attrs.getValue("size")).longValue(), false,
+            Short.valueOf(attrs.getValue("replication")).shortValue(),
+            Long.valueOf(attrs.getValue("blocksize")).longValue(),
+            modif, new Path("hftp", fshostname + ":" + fsport,
+              attrs.getValue("path")))
+        : new FileStatus(0L, true, 0, 0L,
+            modif, new Path("hftp", fshostname + ":" + fsport,
+              attrs.getValue("path")));
+      fslist.add(fs);
+    }
+
+    private void fetchList(String path, boolean recur) throws IOException {
+      try {
+        XMLReader xr = XMLReaderFactory.createXMLReader();
+        xr.setContentHandler(this);
+        final URL url = new URI("http", null, fshostname, fsport,
+            "/listPaths" + path, recur ? "recursive=yes" : null, null).toURL();
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+        connection.setRequestMethod("GET");
+        connection.connect();
+
+        InputStream resp = connection.getInputStream();
+        xr.parse(new InputSource(resp));
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
"recursive=yes" : null , null).toURL(); + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("GET"); + connection.connect(); + + InputStream resp = connection.getInputStream(); + xr.parse(new InputSource(resp)); + } catch (Exception e) { + throw new IOException(e); + } + } + + public FileStatus getFileStatus(Path f) throws IOException { + fetchList(f.toUri().getPath(), false); + if (fslist.size() == 0) { + throw new IOException("File does not exist"); + } + return fslist.get(0); + } + + public FileStatus[] listStatus(Path f, boolean recur) throws IOException { + fetchList(f.toUri().getPath(), recur); + if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDir())) { + fslist.remove(0); + } + return fslist.toArray(new FileStatus[0]); + } + + public FileStatus[] listStatus(Path f) throws IOException { + return listStatus(f, false); + } + } + + @Override + public boolean exists(Path f) throws IOException { + LsParser lsparser = new LsParser(); + return lsparser.getFileStatus(f) != null; + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + LsParser lsparser = new LsParser(); + return lsparser.listStatus(f); + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + LsParser lsparser = new LsParser(); + return lsparser.getFileStatus(f); + } + + @Override + public Path getWorkingDirectory() { + return new Path("/").makeQualified(this); + } + + @Override + public void setWorkingDirectory(Path f) { } + + @Override + public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progress) throws IOException { + throw new IOException("Not supported"); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + throw new IOException("Not supported"); + } + + @Override + public boolean delete(Path f) throws IOException { + throw new IOException("Not supported"); + } + + @Override + public boolean mkdirs(Path f) throws IOException { + throw new IOException("Not supported"); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java?view=auto&rev=564804 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java Fri Aug 10 16:41:10 2007 @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java?view=auto&rev=564804
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ListPathsServlet.java Fri Aug 10 16:41:10 2007
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.util.VersionInfo;
+
+import org.znerd.xmlenc.*;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Obtain meta-information about a filesystem.
+ * @see org.apache.hadoop.dfs.HftpFileSystem
+ */
+public class ListPathsServlet extends HttpServlet {
+
+  static final SimpleDateFormat df =
+    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+  static {
+    df.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  /**
+   * Write a node to output.
+   * Dir: path, modification
+   * File: path, size, replication, blocksize, and modification
+   */
+  protected void writeItem(DFSFileInfo i, XMLOutputter doc, NameNode nn)
+      throws IOException, URISyntaxException {
+    doc.startTag(i.isDir() ? "directory" : "file");
+    doc.attribute("path", i.getPath().toUri().getPath());
+    doc.attribute("modified", df.format(new Date(i.getModificationTime())));
+    if (!i.isDir()) {
+      doc.attribute("size", String.valueOf(i.getLen()));
+      doc.attribute("replication", String.valueOf(i.getReplication()));
+      doc.attribute("blocksize", String.valueOf(i.getBlockSize()));
+    }
+    doc.endTag();
+  }
+
+  /**
+   * Build a map from the query string, setting values and defaults.
+   */
+  protected Map<String, String> buildRoot(HttpServletRequest request,
+      XMLOutputter doc) {
+    final String path = request.getPathInfo() != null
+      ? request.getPathInfo() : "/";
+    final String exclude = request.getParameter("exclude") != null
+      ? request.getParameter("exclude") : "\\..*\\.crc";
+    final String filter = request.getParameter("filter") != null
+      ? request.getParameter("filter") : ".*";
+    final boolean recur = request.getParameter("recursive") != null
+      && "yes".equals(request.getParameter("recursive"));
+
+    Map<String, String> root = new HashMap<String, String>();
+    root.put("path", path);
+    root.put("recursive", recur ? "yes" : "no");
+    root.put("filter", filter);
+    root.put("exclude", exclude);
+    root.put("time", df.format(new Date()));
+    root.put("version", VersionInfo.getVersion());
+    return root;
+  }
+
+  /**
+   * Service a GET request as described below.
+   * Request:
+   * {@code
+   * GET http://<nn>:<port>/listPaths[/<path>][<?option>[&option]*] HTTP/1.1
+   * }
+   *
+   * Where option (default) in:
+   *   recursive ("no")
+   *   filter (".*")
+   *   exclude ("\..*\.crc")
+   *
+   * Response: A flat list of files/directories in the following format:
+   * {@code
+   * <listing path="..." recursive="(yes|no)" filter="..." exclude="..."
+   *          time="yyyy-MM-dd'T'HH:mm:ssZ" version="...">
+   *   <directory path="..." modified="yyyy-MM-dd'T'HH:mm:ssZ"/>
+   *   <file path="..." modified="yyyy-MM-dd'T'HH:mm:ssZ" size="..."
+   *         replication="..." blocksize="..."/>
+   * </listing>
+   * }
+   */
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+
+    final PrintWriter out = response.getWriter();
+    final XMLOutputter doc = new XMLOutputter(out, "UTF-8");
+    try {
+      final Map<String, String> root = buildRoot(request, doc);
+      final String path = root.get("path");
+      final boolean recur = "yes".equals(root.get("recursive"));
+      final Pattern filter = Pattern.compile(root.get("filter"));
+      final Pattern exclude = Pattern.compile(root.get("exclude"));
+      final NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
+      doc.declaration();
+      doc.startTag("listing");
+      for (Map.Entry<String, String> m : root.entrySet()) {
+        doc.attribute(m.getKey(), m.getValue());
+      }
+
+      DFSFileInfo base = nn.getFileInfo(path);
+      if (base.isDir()) {
+        writeItem(base, doc, nn);
+      }
+
+      Stack<String> pathstack = new Stack<String>();
+      pathstack.push(path);
+      while (!pathstack.empty()) {
+        for (DFSFileInfo i : nn.getListing(pathstack.pop())) {
+          if (exclude.matcher(i.getName()).matches()
+              || !filter.matcher(i.getName()).matches()) {
+            continue;
+          }
+          if (recur && i.isDir()) {
+            pathstack.push(i.getPath().toUri().getPath());
+          }
+          writeItem(i, doc, nn);
+        }
+      }
+
+    } catch (URISyntaxException e) {
+      out.println(e.toString());
+    } catch (PatternSyntaxException e) {
+      out.println(e.toString());
+    } finally {
+      if (doc != null) {
+        doc.endDocument();
+      }
+
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+}
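
The XML produced by this servlet can be consumed with the JDK's DOM APIs. The sketch below is not part of the commit, and its host, port, and path are placeholders; it prints the path, size, and modification time of each file entry in a recursive listing, using the attribute names written by writeItem above.

    import java.net.URL;
    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.Element;
    import org.w3c.dom.NodeList;

    public class ListPathsClient {
      public static void main(String[] args) throws Exception {
        // GET http://<nn>:<port>/listPaths/<path>?recursive=yes
        URL url = new URL(
            "http://namenode.example.com:50070/listPaths/user/foo?recursive=yes");
        Element listing = DocumentBuilderFactory.newInstance()
            .newDocumentBuilder().parse(url.openStream()).getDocumentElement();
        NodeList files = listing.getElementsByTagName("file");
        for (int i = 0; i < files.getLength(); i++) {
          Element f = (Element) files.item(i);
          // Attributes emitted by ListPathsServlet.writeItem
          System.out.println(f.getAttribute("path") + "\t"
              + f.getAttribute("size") + " bytes, modified "
              + f.getAttribute("modified"));
        }
      }
    }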