incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Making some updates to the new stream impl.
Date Tue, 06 Oct 2015 13:41:14 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 9ba62e6c9 -> 28f574671


Making some updates to the new stream impl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/28f57467
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/28f57467
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/28f57467

Branch: refs/heads/master
Commit: 28f57467126fae8e5528dcce29e9bf7ce3ce62c3
Parents: 9ba62e6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 6 09:41:09 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 6 09:41:09 2015 -0400

----------------------------------------------------------------------
 .../blur/command/stream/StreamClient.java       |  6 +-
 .../apache/blur/command/stream/StreamError.java | 27 +++-----
 .../blur/command/stream/StreamException.java    | 26 ++++++++
 .../blur/command/stream/StreamProcessor.java    | 14 +++--
 .../blur/command/stream/StreamServer.java       | 39 +++++++-----
 .../blur/command/stream/StreamServerTest.java   | 65 ++++++++++++++++----
 6 files changed, 125 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
index 20d1bcd..70019e5 100644
--- a/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
@@ -123,7 +123,8 @@ public class StreamClient implements Closeable {
     loadJars(classLoaderId, Arrays.asList(testJars));
   }
 
-  public <T> Iterable<T> executeStream(StreamSplit streamSplit, StreamFunction<T> streamFunction) throws IOException {
+  public <T> Iterable<T> executeStream(StreamSplit streamSplit, StreamFunction<T> streamFunction)
+      throws StreamException {
     return new Iterable<T>() {
       @Override
       public Iterator<T> iterator() {
@@ -168,6 +169,9 @@ public class StreamClient implements Closeable {
             public T next() {
               T o = (T) _obj;
               _obj = null;
+              if (o instanceof StreamError) {
+                throw new StreamException(((StreamError) o).getThrowable());
+              }
               return o;
             }
           };

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
index 468cd56..ffb7bd1 100644
--- a/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
@@ -16,38 +16,25 @@
  */
 package org.apache.blur.command.stream;
 
-import java.io.PrintWriter;
 import java.io.Serializable;
-import java.io.StringWriter;
 
 public class StreamError implements Serializable {
 
   private static final long serialVersionUID = 5624998869726795714L;
-  
-  private final String _message;
-  private final String _stack;
-
-  public StreamError(Exception e) {
-    _message = e.getMessage();
-    Throwable cause = e.getCause();
-    StringWriter sw = new StringWriter();
-    PrintWriter pw = new PrintWriter(sw);
-    cause.printStackTrace(pw);
-    pw.close();
-    _stack = sw.toString();
-  }
 
-  public String getMessage() {
-    return _message;
+  private final Throwable _throwable;
+
+  public StreamError(Throwable t) {
+    _throwable = t;
   }
 
-  public String getStack() {
-    return _stack;
+  public Throwable getThrowable() {
+    return _throwable;
   }
 
   @Override
   public String toString() {
-    return "StreamError [_message=" + _message + ", _stack=" + _stack + "]";
+    return "StreamError [_throwable=" + _throwable + "]";
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/main/java/org/apache/blur/command/stream/StreamException.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamException.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamException.java
new file mode 100644
index 0000000..2ba32fd
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+public class StreamException extends RuntimeException {
+
+  private static final long serialVersionUID = -2692794357251949238L;
+
+  public StreamException(Throwable throwable) {
+    super(throwable.getMessage(), throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
index d1d09c1..4098a6b 100644
--- a/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.blur.command.stream;
 
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
@@ -43,6 +44,8 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.Tracer;
 import org.apache.commons.io.IOUtils;
 
 import com.google.common.cache.CacheBuilder;
@@ -106,16 +109,17 @@ public class StreamProcessor {
 
   public <T> void execute(StreamFunction<T> function, OutputStream outputStream, IndexContext indexContext)
       throws IOException {
-    final ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
+    final ObjectOutputStream objectOutputStream = new ObjectOutputStream(new BufferedOutputStream(outputStream));
     StreamWriter<T> writer = getWriter(objectOutputStream);
+    Tracer tracer = Trace.trace("stream - execute");
     try {
       function.call(indexContext, writer);
-    } catch (Exception e) {
-      LOG.error("Unknown error.", e);
-      objectOutputStream.writeObject(new StreamError(e));
-    } finally {
       objectOutputStream.writeObject(new StreamComplete());
+    } catch (Throwable t) {
+      objectOutputStream.writeObject(new StreamError(t));
+    } finally {
       objectOutputStream.close();
+      tracer.done();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
index 82bf6a2..b8f9dbd 100644
--- a/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.trace.Trace;
+import org.apache.blur.trace.Tracer;
 import org.apache.blur.user.User;
 import org.apache.blur.user.UserContext;
 import org.apache.commons.io.IOUtils;
@@ -158,23 +160,28 @@ public class StreamServer implements Closeable {
 
   public static void executeStream(StreamProcessor streamProcessor, InputStream in, OutputStream outputStream)
       throws IOException {
-    DataInputStream inputStream = new DataInputStream(in);
-    byte[] streamSplitBytes = getObjectBytes(inputStream);
-    byte[] functionBytes = getObjectBytes(inputStream);
-    StreamSplit streamSplit = getStreamSplit(toInputStream(streamSplitBytes));
-    String table = streamSplit.getTable();
-    String shard = streamSplit.getShard();
-    String classLoaderId = streamSplit.getClassLoaderId();
-    User user = new User(streamSplit.getUser(), streamSplit.getUserAttributes());
-    UserContext.setUser(user);
-    StreamIndexContext indexContext = null;
+    Tracer tracer = Trace.trace("stream - executeStream");
     try {
-      indexContext = streamProcessor.getIndexContext(table, shard);
-      StreamFunction<?> function = streamProcessor.getStreamFunction(classLoaderId, toInputStream(functionBytes));
-      streamProcessor.execute(function, outputStream, indexContext);
+      DataInputStream inputStream = new DataInputStream(in);
+      byte[] streamSplitBytes = getObjectBytes(inputStream);
+      byte[] functionBytes = getObjectBytes(inputStream);
+      StreamSplit streamSplit = getStreamSplit(toInputStream(streamSplitBytes));
+      String table = streamSplit.getTable();
+      String shard = streamSplit.getShard();
+      String classLoaderId = streamSplit.getClassLoaderId();
+      User user = new User(streamSplit.getUser(), streamSplit.getUserAttributes());
+      UserContext.setUser(user);
+      StreamIndexContext indexContext = null;
+      try {
+        indexContext = streamProcessor.getIndexContext(table, shard);
+        StreamFunction<?> function = streamProcessor.getStreamFunction(classLoaderId, toInputStream(functionBytes));
+        streamProcessor.execute(function, outputStream, indexContext);
+      } finally {
+        IOUtils.closeQuietly(indexContext);
+        UserContext.reset();
+      }
     } finally {
-      IOUtils.closeQuietly(indexContext);
-      UserContext.reset();
+      tracer.done();
     }
   }
 
@@ -224,6 +231,7 @@ public class StreamServer implements Closeable {
   }
 
   private static StreamSplit getStreamSplit(InputStream inputStream) throws IOException {
+    Tracer tracer = Trace.trace("stream - getStreamSplit");
     ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
     try {
       return (StreamSplit) objectInputStream.readObject();
@@ -231,6 +239,7 @@ public class StreamServer implements Closeable {
       throw new IOException(e);
     } finally {
       objectInputStream.close();
+      tracer.done();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/28f57467/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
index da582a3..d28f6c9 100644
--- a/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.blur.command.stream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -55,7 +53,7 @@ import com.google.common.io.Closer;
 public class StreamServerTest implements Serializable {
 
   @Test
-  public void testServer() throws IOException {
+  public void testServer() throws StreamException, IOException {
     Closer closer = Closer.create();
     try {
       File tmpFile = new File("./target/tmp/StreamServerTest");
@@ -93,6 +91,51 @@ public class StreamServerTest implements Serializable {
     }
   }
 
+  @Test
+  public void testServerInjectError() throws IOException {
+    Closer closer = Closer.create();
+    try {
+      File tmpFile = new File("./target/tmp/StreamServerTest");
+      tmpFile.mkdirs();
+      IndexServer indexServer = new TestIndexServer();
+      StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpFile);
+      int timeout = 3000000;
+      String classLoaderId = UUID.randomUUID().toString();
+
+      StreamServer server = closer.register(new StreamServer(0, 100, streamProcessor));
+      server.start();
+      int port = server.getPort();
+      StreamClient client = closer.register(new StreamClient("localhost", port, timeout));
+      assertFalse(client.isClassLoaderAvailable(classLoaderId));
+      client.loadJars(classLoaderId, getTestJar());
+
+      String table = "test";
+      String shard = "shard";
+      String user = "test";
+      Map<String, String> userAttributes = new HashMap<String, String>();
+      StreamSplit split = new StreamSplit(table, shard, classLoaderId, user, userAttributes);
+      try {
+        Iterable<String> it = client.executeStream(split, new StreamFunction<String>() {
+          @Override
+          public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception {
+            Class.forName("errorclass");
+          }
+        });
+        Iterator<String> iterator = it.iterator();
+        if (iterator.hasNext()) {
+          iterator.next();
+        }
+        fail();
+      } catch (StreamException e) {
+        Throwable cause = e.getCause();
+        cause.printStackTrace();
+      }
+
+    } finally {
+      closer.close();
+    }
+  }
+
   private String getTestJar() {
     String property = System.getProperty("java.class.path");
     Splitter splitter = Splitter.on(':');
@@ -249,32 +292,32 @@ public class StreamServerTest implements Serializable {
 
   protected static IndexSearcherCloseable getIndexSearcherCloseable() {
     return new IndexSearcherCloseable() {
-      
+
       @Override
       public void close() throws IOException {
-        
+
       }
-      
+
       @Override
       public void setSimilarity(Similarity similarity) {
         throw new RuntimeException("Not implemented.");
       }
-      
+
       @Override
       public void search(Query query, Collector collector) throws IOException {
         throw new RuntimeException("Not implemented.");
       }
-      
+
       @Override
       public TopDocs search(Query query, int i) throws IOException {
         throw new RuntimeException("Not implemented.");
       }
-      
+
       @Override
       public Query rewrite(Query query) throws IOException {
         throw new RuntimeException("Not implemented.");
       }
-      
+
       @Override
       public IndexReader getIndexReader() {
         return null;


Mime
View raw message