tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [13/20] tajo git commit: TAJO-1181: Avro schema URL should support various protocols. (jinho)
Date Tue, 25 Nov 2014 14:45:54 GMT
TAJO-1181: Avro schema URL should support various protocols. (jinho)

Closes #252


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/965cbd90
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/965cbd90
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/965cbd90

Branch: refs/heads/hbase_storage
Commit: 965cbd90778e2c6872e04456e94a19dd2e4f27f5
Parents: c544ffc
Author: jhkim <jhkim@apache.org>
Authored: Thu Nov 20 10:29:20 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Thu Nov 20 10:29:20 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/storage/avro/AvroUtil.java  |  35 +++-
 .../java/org/apache/tajo/HttpFileServer.java    |  84 +++++++++
 .../org/apache/tajo/HttpFileServerHandler.java  | 184 +++++++++++++++++++
 .../tajo/HttpFileServerPipelineFactory.java     |  54 ++++++
 .../apache/tajo/storage/avro/TestAvroUtil.java  | 108 +++++++++++
 6 files changed, 463 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b197457..42ab10a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -63,6 +63,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1181: Avro schema URL should support various protocols. 
+    (jinho)
+
     TAJO-1200: Invalid shuffle data of multiple worker in same server.
     (jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
index c15d20b..0d14c3d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
@@ -18,21 +18,23 @@
 
 package org.apache.tajo.storage.avro;
 
-import java.io.IOException;
-
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.storage.StorageConstants;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
 
 public class AvroUtil {
   public static Schema getAvroSchema(TableMeta meta, Configuration conf)
       throws IOException {
 
-
     boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
     boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
     if (!isSchemaLiteral && !isSchemaUrl) {
@@ -44,9 +46,32 @@ public class AvroUtil {
     }
 
     String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
+    if (schemaURL.toLowerCase().startsWith("http")) {
+      return getAvroSchemaFromHttp(schemaURL);
+    } else {
+      return getAvroSchemaFromFileSystem(schemaURL, conf);
+    }
+  }
+
+  public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
+    InputStream inputStream = new URL(schemaURL).openStream();
+
+    try {
+      return new Schema.Parser().parse(inputStream);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
+  }
+
+  public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
     Path schemaPath = new Path(schemaURL);
     FileSystem fs = schemaPath.getFileSystem(conf);
     FSDataInputStream inputStream = fs.open(schemaPath);
-    return new Schema.Parser().parse(inputStream);
+
+    try {
+      return new Schema.Parser().parse(inputStream);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
new file mode 100644
index 0000000..cf8a54e
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+public class HttpFileServer {
+  private final static Log LOG = LogFactory.getLog(HttpFileServer.class);
+
+  private final InetSocketAddress addr;
+  private InetSocketAddress bindAddr;
+  private ServerBootstrap bootstrap = null;
+  private ChannelFactory factory = null;
+  private ChannelGroup channelGroup = null;
+
+  public HttpFileServer(final InetSocketAddress addr) {
+    this.addr = addr;
+    this.factory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+        2);
+
+    // Configure the server.
+    this.bootstrap = new ServerBootstrap(factory);
+    // Set up the event pipeline factory.
+    this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
+    this.channelGroup = new DefaultChannelGroup();
+  }
+
+  public HttpFileServer(String bindaddr) {
+    this(NetUtils.createSocketAddr(bindaddr));
+  }
+
+  public void start() {
+    // Bind and start to accept incoming connections.
+    Channel channel = bootstrap.bind(addr);
+    channelGroup.add(channel);    
+    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+    LOG.info("HttpFileServer starts up ("
+        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+        + ")");
+  }
+  
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddr;
+  }
+
+  public void stop() {
+    ChannelGroupFuture future = channelGroup.close();
+    future.awaitUninterruptibly();
+    factory.releaseExternalResources();
+
+    LOG.info("HttpFileServer shutdown ("
+        + this.bindAddr.getAddress().getHostAddress() + ":"
+        + this.bindAddr.getPort() + ")");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
new file mode 100644
index 0000000..6c77317
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
+ */
+public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    HttpRequest request = (HttpRequest) e.getMessage();
+    if (request.getMethod() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+
+    final String path = sanitizeUri(request.getUri());
+    if (path == null) {
+      sendError(ctx, FORBIDDEN);
+      return;
+    }
+
+    File file = new File(path);
+    if (file.isHidden() || !file.exists()) {
+      sendError(ctx, NOT_FOUND);
+      return;
+    }
+    if (!file.isFile()) {
+      sendError(ctx, FORBIDDEN);
+      return;
+    }
+
+    RandomAccessFile raf;
+    try {
+      raf = new RandomAccessFile(file, "r");
+    } catch (FileNotFoundException fnfe) {
+      sendError(ctx, NOT_FOUND);
+      return;
+    }
+    long fileLength = raf.length();
+
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    setContentLength(response, fileLength);
+    setContentTypeHeader(response);
+
+    Channel ch = e.getChannel();
+
+    // Write the initial line and the header.
+    ch.write(response);
+
+    // Write the content.
+    ChannelFuture writeFuture;
+    if (ch.getPipeline().get(SslHandler.class) != null) {
+      // Cannot use zero-copy with HTTPS.
+      writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+    } else {
+      // No encryption - use zero-copy.
+      final FileRegion region =
+          new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+      writeFuture = ch.write(region);
+      writeFuture.addListener(new ChannelFutureProgressListener() {
+        public void operationComplete(ChannelFuture future) {
+          region.releaseExternalResources();
+        }
+
+        public void operationProgressed(
+            ChannelFuture future, long amount, long current, long total) {
+          System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+        }
+      });
+    }
+
+    // Decide whether to close the connection or not.
+    if (!isKeepAlive(request)) {
+      // Close the connection when the whole content is written out.
+      writeFuture.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+      throws Exception {
+    Channel ch = e.getChannel();
+    Throwable cause = e.getCause();
+    if (cause instanceof TooLongFrameException) {
+      sendError(ctx, BAD_REQUEST);
+      return;
+    }
+
+    cause.printStackTrace();
+    if (ch.isConnected()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private static String sanitizeUri(String uri) {
+    // Decode the path.
+    try {
+      uri = URLDecoder.decode(uri, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      try {
+        uri = URLDecoder.decode(uri, "ISO-8859-1");
+      } catch (UnsupportedEncodingException e1) {
+        throw new Error();
+      }
+    }
+
+    // Convert file separators.
+    uri = uri.replace('/', File.separatorChar);
+
+    // Simplistic dumb security check.
+    // You will have to do something serious in the production environment.
+    if (uri.contains(File.separator + '.') ||
+        uri.contains('.' + File.separator) ||
+        uri.startsWith(".") || uri.endsWith(".")) {
+      return null;
+    }
+
+    return uri;
+  }
+
+  private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.setContent(ChannelBuffers.copiedBuffer(
+        "Failure: " + status.toString() + "\r\n",
+        CharsetUtil.UTF_8));
+
+    // Close the connection as soon as the error message is sent.
+    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+  }
+
+  /**
+   * Sets the content type header for the HTTP Response
+   *
+   * @param response
+   *            HTTP response
+   */
+  private static void setContentTypeHeader(HttpResponse response) {
+    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
new file mode 100644
index 0000000..cecf93b
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+// Uncomment the following lines if you want HTTPS
+//import javax.net.ssl.SSLEngine;
+//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
+//import org.jboss.netty.handler.ssl.SslHandler;
+
+//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
+public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
+  public ChannelPipeline getPipeline() throws Exception {
+    // Create a default pipeline implementation.
+    ChannelPipeline pipeline = pipeline();
+
+    // Uncomment the following lines if you want HTTPS
+    //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+    //engine.setUseClientMode(false);
+    //pipeline.addLast("ssl", new SslHandler(engine));
+
+    pipeline.addLast("decoder", new HttpRequestDecoder());
+    pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+    pipeline.addLast("encoder", new HttpResponseEncoder());
+    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+
+    pipeline.addLast("handler", new HttpFileServerHandler());
+    return pipeline;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/965cbd90/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
new file mode 100644
index 0000000..6186e9e
--- /dev/null
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.tajo.HttpFileServer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NetUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link org.apache.tajo.storage.avro.AvroUtil}.
+ */
+public class TestAvroUtil {
+  private Schema expected;
+  private URL schemaUrl;
+
+  @Before
+  public void setUp() throws Exception {
+    schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
+    assertNotNull(schemaUrl);
+
+    File file = new File(schemaUrl.getPath());
+    assertTrue(file.exists());
+
+    expected = new Schema.Parser().parse(file);
+  }
+
+  @Test
+  public void testGetSchema() throws IOException, URISyntaxException {
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, FileUtil.readTextFile(new File(schemaUrl.getPath())));
+    Schema schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+
+    meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+    meta.putOption(StorageConstants.AVRO_SCHEMA_URL, schemaUrl.getPath());
+    schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    assertEquals(expected, schema);
+
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+
+      String url = "http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath();
+      meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.AVRO);
+      meta.putOption(StorageConstants.AVRO_SCHEMA_URL, url);
+      schema = AvroUtil.getAvroSchema(meta, new TajoConf());
+    } finally {
+      server.stop();
+    }
+    assertEquals(expected, schema);
+  }
+
+  @Test
+  public void testGetSchemaFromHttp() throws IOException, URISyntaxException {
+    HttpFileServer server = new HttpFileServer(NetUtils.createSocketAddr("127.0.0.1:0"));
+    try {
+      server.start();
+      InetSocketAddress addr = server.getBindAddress();
+
+      Schema schema = AvroUtil.getAvroSchemaFromHttp("http://127.0.0.1:" + addr.getPort() + schemaUrl.getPath());
+      assertEquals(expected, schema);
+    } finally {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testGetSchemaFromFileSystem() throws IOException, URISyntaxException {
+    Schema schema = AvroUtil.getAvroSchemaFromFileSystem(schemaUrl.toString(), new TajoConf());
+
+    assertEquals(expected, schema);
+  }
+}


Mime
View raw message