avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r894946 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/io/ src/java/org/apache/avro/tool/ src/test/java/org/apache/avro/tool/
Date Thu, 31 Dec 2009 21:52:35 GMT
Author: cutting
Date: Thu Dec 31 21:52:35 2009
New Revision: 894946

URL: http://svn.apache.org/viewvc?rev=894946&view=rev
Log:
AVRO-267.  Add two new avroj commands: rpcsend and rpcreceive.  Contributed by Philip Zeyliger.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java
    hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java
    hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Dec 31 21:52:35 2009
@@ -45,6 +45,9 @@
 
     AVRO-258. Add GenAvro language tool.  (Todd Lipcon via cutting)
 
+    AVRO-267. Add two new avroj commands: rpcsend and rpcreceive.
+    (Philip Zeyliger via cutting)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Thu Dec 31 21:52:35 2009
@@ -156,7 +156,8 @@
   </target>
 
   <target name="compile-java" depends="javacc,schemata,ivy-retrieve">
-    <java-compiler excludes="**/ipc/** **/*Requestor.java **/*Responder.java" >
+    <java-compiler
+       excludes="**/ipc/** **/*Requestor.java **/*Responder.java **/tool/**">
       <src path="${build.dir}/src"/>
       <src path="${java.src.dir}"/>
     </java-compiler>

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java Thu Dec 31 21:52:35 2009
@@ -43,10 +43,21 @@
     super(root);
     init(in);
   }
+  
+  JsonDecoder(Symbol root, String in) throws IOException {
+    super(root);
+    init(in);
+  }
 
+  /** Creates a new JsonDecoder based on an InputStream. */
   public JsonDecoder(Schema schema, InputStream in) throws IOException {
     this(new JsonGrammarGenerator().generate(schema), in);
   }
+  
+  /** Creates a new JsonDecoder based on a String input. */
+  public JsonDecoder(Schema schema, String in) throws IOException {
+    this(new JsonGrammarGenerator().generate(schema), in);
+  }
 
   private void advance(Symbol symbol) throws IOException {
     if (in.getCurrentToken() == null && this.parser.depth() == 1)
@@ -60,6 +71,13 @@
     this.in = new JsonFactory().createJsonParser(in);
     this.in.nextToken();
   }
+  
+  /** Re-initializes to start reading from a new String input. */
+  public void init(String in) throws IOException {
+    parser.reset();
+    this.in = new JsonFactory().createJsonParser(in);
+    this.in.nextToken();
+  }
 
   @Override
   public void readNull() throws IOException {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java Thu Dec 31 21:52:35 2009
@@ -43,7 +43,9 @@
         new DataFileReadTool(),
         new DataFileWriteTool(),
         new DataFileGetSchemaTool(),
-        new GenAvroTool()
+        new GenAvroTool(),
+        new RpcReceiveTool(),
+        new RpcSendTool()
         }) {
       Tool prev = tools.put(tool.getName(), tool);
       if (prev != null) {

Added: hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * Receives one RPC call and responds.  (The moral equivalent
+ * of "netcat".)
+ */
+public class RpcReceiveTool implements Tool {
+  private PrintStream out;
+  private Object response;
+  /** Used to communicate between server thread (responder) and run() */
+  private CountDownLatch latch;
+  private Message expectedMessage;
+  HttpServer server;
+
+  @Override
+  public String getName() {
+    return "rpcreceive";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Opens an HTTP RPC Server and listens for one message.";
+  }
+  
+  private class SinkResponder extends GenericResponder {
+
+    public SinkResponder(Protocol local) {
+      super(local);
+    }
+
+    @Override
+    public Object respond(Message message, Object request)
+    throws AvroRemoteException {
+      if (!message.equals(expectedMessage)) {
+        out.println(String.format("Expected message '%s' but received '%s'.", 
+            expectedMessage.getName(), message.getName()));
+        latch.countDown();
+        throw new IllegalArgumentException("Unexpected message.");
+      }
+      out.print(message.getName());
+      out.print("\t");
+      try {
+        JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(
+            out, JsonEncoding.UTF8);
+        JsonEncoder jsonEncoder = new JsonEncoder(message.getRequest(), jsonGenerator);
+
+        GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>(
+            message.getRequest());
+        writer.write(request, jsonEncoder);
+        jsonGenerator.flush();
+        jsonEncoder.flush();
+        out.flush();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      out.println();
+      latch.countDown();
+      return response;
+    }
+  }
+  
+  @Override
+  public int run(InputStream in, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+    // Split up into two functions for easier testing.
+    int r = run1(in, out, err, args);
+    if (r != 0) {
+      return r;
+    }
+    return run2(err);
+  }
+
+  int run1(InputStream in, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+    if (args.size() != 4) {
+      err.println("Expected four arguments: protocol port message_name json_response");
+      return 1;
+    }
+    Protocol protocol = Protocol.parse(args.get(0));
+    int port = Integer.parseInt(args.get(1));
+    String messageName = args.get(2);
+    expectedMessage = protocol.getMessages().get(messageName);
+    if (expectedMessage == null) {
+      err.println(String.format("No message named '%s' found in protocol '%s'.",
+          messageName, protocol));
+      return 1;
+    }
+    String jsonData = args.get(3);
+    this.out = out;
+    
+    this.response = Util.jsonToGenericDatum(expectedMessage.getResponse(), jsonData);
+    
+    latch = new CountDownLatch(1);
+    server = new HttpServer(new SinkResponder(protocol), port);
+    err.println("Listening on port " + server.getPort());
+    return 0;
+  }
+  
+  int run2(PrintStream err) throws InterruptedException {
+    latch.await();
+    err.println("Closing server.");
+    server.close();
+    return 0;
+  }
+
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * Sends a single RPC message.
+ */
+public class RpcSendTool implements Tool {
+  @Override
+  public String getName() {
+    return "rpcsend";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Sends a single RPC message.";
+  }
+
+  @Override
+  public int run(InputStream in, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+    if (args.size() != 5) {
+      err.println(
+          "Expected 5 arguments: protocol message_name host port json_data");
+      return 1;
+    }
+    Protocol protocol = Protocol.parse(args.get(0));
+    String messageName = args.get(1);
+    Message message = protocol.getMessages().get(messageName);
+    if (message == null) {
+      err.println(String.format("No message named '%s' found in protocol '%s'.",
+          messageName, protocol));
+      return 1;
+    }
+    String host = args.get(2);
+    int port = Integer.parseInt(args.get(3));
+    String jsonData = args.get(4);
+    
+    Object datum = Util.jsonToGenericDatum(message.getRequest(), jsonData);
+    GenericRequestor client = makeClient(protocol, host, port);
+    Object response = client.request(message.getName(), datum);
+    dumpJson(out, message.getResponse(), response);
+    return 0;
+  }
+
+  private void dumpJson(PrintStream out, Schema schema, Object datum) 
+  throws IOException {
+    DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
+    JsonGenerator g =
+      new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+    g.useDefaultPrettyPrinter();
+    writer.write(datum, new JsonEncoder(schema, g));
+    g.flush();
+    out.println();
+    out.flush();
+  }
+
+  private GenericRequestor makeClient(Protocol protocol, String host, int port) 
+  throws IOException {
+    HttpTransceiver transceiver = 
+      new HttpTransceiver(new URL("http", host, port, "/"));
+    GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
+    return requestor;
+  }
+}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java Thu Dec 31 21:52:35 2009
@@ -20,8 +20,13 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.JsonDecoder;
+
 /** Static utility methods for tools. */
 class Util {
   /**
@@ -36,4 +41,17 @@
       return new FileInputStream(new File(filename));
     }
   }
+  
+  /** 
+   * Converts a String JSON object into a generic datum.
+   * 
+   * This is inefficient (creates extra objects), so should be used 
+   * sparingly.
+   */
+  static Object jsonToGenericDatum(Schema schema, String jsonData) throws IOException {
+    GenericDatumReader<Object> reader = 
+      new GenericDatumReader<Object>(schema);
+    Object datum = reader.read(null, new JsonDecoder(schema, jsonData));
+    return datum;
+  }
 }

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+
+import org.apache.avro.Protocol;
+import org.junit.Test;
+
+public class TestRpcReceiveAndSendTools {
+  
+  /**
+   * Starts a server (using the tool) and sends a single message to it.
+   */
+  @Test
+  public void testServeAndSend() throws Exception {
+    Protocol protocol = Protocol.parse("" +
+                "{\"protocol\": \"Minimal\", " +
+                "\"messages\": { \"sink\": {" +
+                "   \"request\": [{\"name\": \"a\", \"type\": \"string\"}], " +
+                "   \"response\": \"string\"} } }");
+    ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
+    PrintStream p1 = new PrintStream(baos1);
+    RpcReceiveTool receive = new RpcReceiveTool();
+    receive.run1(null, p1, System.err, 
+        Arrays.asList(protocol.toString(), "0", "sink", "\"omega\""));
+    int port = receive.server.getPort();
+    ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+    PrintStream p2 = new PrintStream(baos2);
+    RpcSendTool send = new RpcSendTool();
+    send.run(null, p2, System.err,
+        Arrays.asList(protocol.toString(), "sink", "localhost", 
+            Integer.toString(port), "{ \"a\": \"alpha\" }"));
+    receive.run2(System.err);
+    
+    assertEquals("sink\t{\"a\":\"alpha\"}\n", baos1.toString("UTF-8"));
+    assertEquals("\"omega\"\n", baos2.toString("UTF-8"));
+  }
+}



Mime
View raw message