apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] apex-core git commit: APEXCORE-608 Streaming Containers use stale RPC proxy after connection is closed
Date Sun, 19 Feb 2017 00:48:40 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master d80501bdc -> 32f229f21


APEXCORE-608 Streaming Containers use stale RPC proxy after connection is closed


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c687bb5e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c687bb5e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c687bb5e

Branch: refs/heads/master
Commit: c687bb5e69e1ab9f3510d68dd726a1de20430908
Parents: 4302929
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Thu Jan 19 17:55:28 2017 -0800
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Sat Feb 11 22:26:11 2017 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/RecoverableRpcProxy.java  | 147 ++++++++++---------
 .../datatorrent/stram/StramLocalCluster.java    |  10 +-
 .../stram/StreamingContainerParent.java         |  44 +++---
 .../StreamingContainerUmbilicalProtocol.java    |   4 +-
 .../stram/engine/StreamingContainer.java        |  60 +++++---
 .../datatorrent/stram/StramRecoveryTest.java    |  12 +-
 6 files changed, 150 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
index 1de581e..e454d49 100644
--- a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
+++ b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java
@@ -22,13 +22,15 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.ConnectException;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import javax.net.SocketFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,8 +42,6 @@ import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.client.utils.URLEncodedUtils;
 
-import com.google.common.base.Throwables;
-
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
 
 import static java.lang.Thread.sleep;
@@ -68,98 +68,111 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler,
   private static final int RPC_TIMEOUT_DEFAULT = 5000;
 
   private final Configuration conf;
-  private final String appPath;
   private StreamingContainerUmbilicalProtocol umbilical;
   private String lastConnectURI;
-  private long lastCompletedCallTms;
-  private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
-  private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
-  private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
-
-  public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException
+  private long retryTimeoutMillis;
+  private long retryDelayMillis;
+  private int rpcTimeout;
+  private final UserGroupInformation currentUser;
+  private final SocketFactory defaultSocketFactory;
+  private final FSRecoveryHandler fsRecoveryHandler;
+
+  public RecoverableRpcProxy(String appPath, Configuration conf)
   {
     this.conf = conf;
-    this.appPath = appPath;
-    connect();
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+      defaultSocketFactory = NetUtils.getDefaultSocketFactory(conf);
+      fsRecoveryHandler = new FSRecoveryHandler(appPath, conf);
+      connect(0);
+    } catch (IOException e) {
+      LOG.error("Fail to create RecoverableRpcProxy", e);
+      throw new RuntimeException(e);
+    }
   }
 
-  private void connect() throws IOException
+  private long connect(long timeMillis) throws IOException
   {
-    FSRecoveryHandler fsrh = new FSRecoveryHandler(appPath, conf);
-    String uriStr = fsrh.readConnectUri();
+    String uriStr = fsRecoveryHandler.readConnectUri();
     if (!uriStr.equals(lastConnectURI)) {
-      // reset timeout
       LOG.debug("Got new RPC connect address {}", uriStr);
-      lastCompletedCallTms = System.currentTimeMillis();
       lastConnectURI = uriStr;
-    }
-    URI heartbeatUri = URI.create(uriStr);
+      if (umbilical != null) {
+        RPC.stopProxy(umbilical);
+      }
 
-    String queryStr = heartbeatUri.getQuery();
-    List<NameValuePair> queryList = null;
-    if (queryStr != null) {
-      queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset());
-    }
-    if (queryList != null) {
-      for (NameValuePair pair : queryList) {
-        String value = pair.getValue();
-        String key = pair.getName();
-        if (QP_rpcTimeout.equals(key)) {
-          this.rpcTimeout = Integer.parseInt(value);
-        } else if (QP_retryTimeoutMillis.equals(key)) {
-          this.retryTimeoutMillis = Long.parseLong(value);
-        } else if (QP_retryDelayMillis.equals(key)) {
-          this.retryDelayMillis = Long.parseLong(value);
+      retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT);
+      retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT);
+      rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT);
+
+      URI heartbeatUri = URI.create(uriStr);
+
+      String queryStr = heartbeatUri.getQuery();
+      if (queryStr != null) {
+        List<NameValuePair> queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset());
+        if (queryList != null) {
+          for (NameValuePair pair : queryList) {
+            String value = pair.getValue();
+            String key = pair.getName();
+            if (QP_rpcTimeout.equals(key)) {
+              this.rpcTimeout = Integer.parseInt(value);
+            } else if (QP_retryTimeoutMillis.equals(key)) {
+              this.retryTimeoutMillis = Long.parseLong(value);
+            } else if (QP_retryDelayMillis.equals(key)) {
+              this.retryDelayMillis = Long.parseLong(value);
+            }
+          }
         }
       }
+      InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(),
heartbeatUri.getPort());
+      umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID,
address, currentUser, conf, defaultSocketFactory, rpcTimeout);
+      // reset timeout
+      return System.currentTimeMillis() + retryTimeoutMillis;
     }
-    InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(),
heartbeatUri.getPort());
-    umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID,
address, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf),
rpcTimeout);
+    return timeMillis;
   }
 
-  public StreamingContainerUmbilicalProtocol getProxy()
+  public StreamingContainerUmbilicalProtocol getProxy() throws IOException
   {
-    StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)java.lang.reflect.Proxy.newProxyInstance(umbilical.getClass().getClassLoader(),
umbilical.getClass().getInterfaces(), this);
+    if (umbilical == null) {
+      throw new IOException("RecoverableRpcProxy is closed.");
+    }
+    StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)Proxy.newProxyInstance(umbilical.getClass().getClassLoader(),
umbilical.getClass().getInterfaces(), this);
     return recoverableProxy;
   }
 
   @Override
   @SuppressWarnings("SleepWhileInLoop")
-  public Object invoke(Object proxy, Method method, Object[] args) throws ConnectException,
SocketTimeoutException, InterruptedException, IllegalAccessException
+  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
   {
-    Object result;
+    long endTimeMillis = System.currentTimeMillis() + retryTimeoutMillis;
+    if (umbilical == null) {
+      endTimeMillis = connect(endTimeMillis);
+    }
     while (true) {
+      if (umbilical == null) {
+        throw new IOException("RecoverableRpcProxy is closed.");
+      }
       try {
-        if (umbilical == null) {
-          connect();
-        }
-        //long start = System.nanoTime();
-        result = method.invoke(umbilical, args);
-        lastCompletedCallTms = System.currentTimeMillis();
-        //long end = System.nanoTime();
-        //LOG.info(String.format("%s took %d ns", method.getName(), (end - start)));
-        return result;
-      } catch (InvocationTargetException e) {
+        return method.invoke(umbilical, args);
+      } catch (Throwable t) {
         // handle RPC failure
-        Throwable targetException = e.getTargetException();
-        long connectMillis = System.currentTimeMillis() - lastCompletedCallTms;
-        if (connectMillis < retryTimeoutMillis) {
-          LOG.warn("RPC failure, attempting reconnect after {} ms (remaining {} ms)", retryDelayMillis,
retryTimeoutMillis - connectMillis, targetException);
-          close();
+        while (t instanceof InvocationTargetException || t instanceof UndeclaredThrowableException)
{
+          Throwable cause = t.getCause();
+          if (cause != null) {
+            t = cause;
+          }
+        }
+        final long currentTimeMillis = System.currentTimeMillis();
+        if (currentTimeMillis < endTimeMillis) {
+          LOG.warn("RPC failure, will retry after {} ms (remaining {} ms)", retryDelayMillis,
endTimeMillis - currentTimeMillis, t);
           sleep(retryDelayMillis);
+          endTimeMillis = connect(endTimeMillis);
         } else {
-          LOG.error("Giving up RPC connection recovery after {} ms", connectMillis, targetException);
-          if (targetException instanceof java.net.ConnectException) {
-            throw (java.net.ConnectException)targetException;
-          } else if (targetException instanceof java.net.SocketTimeoutException) {
-            throw (java.net.SocketTimeoutException)targetException;
-          } else {
-            throw Throwables.propagate(targetException);
-          }
+          LOG.error("Giving up RPC connection recovery after {} ms", currentTimeMillis -
endTimeMillis + retryTimeoutMillis, t);
+          close();
+          throw t;
         }
-      } catch (IOException ex) {
-        close();
-        throw new RuntimeException(ex);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 2ffbabd..7e4b78a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -105,13 +105,9 @@ public class StramLocalCluster implements Runnable, Controller
     }
 
     @Override
-    public void reportError(String containerId, int[] operators, String msg)
+    public void reportError(String containerId, int[] operators, String msg) throws IOException
     {
-      try {
-        log(containerId, msg);
-      } catch (IOException ex) {
-        // ignore
-      }
+      log(containerId, msg);
     }
 
     @Override
@@ -131,7 +127,7 @@ public class StramLocalCluster implements Runnable, Controller
     }
 
     @Override
-    public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
+    public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException
     {
       if (injectShutdown.containsKey(msg.getContainerId())) {
         ContainerHeartbeatResponse r = new ContainerHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index b36529a..e5d8a97 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -163,11 +163,16 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   @Override
   public void log(String containerId, String msg) throws IOException
   {
-    LOG.info("child msg: {} context: {}", msg, dagManager.getContainerAgent(containerId).container);
+    final StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
+    if (sca != null) {
+      LOG.info("child msg: {} context: {}", msg, sca.container);
+    } else {
+      LOG.info("unknown container {} msg: {}", containerId, msg);
+    }
   }
 
   @Override
-  public void reportError(String containerId, int[] operators, String msg)
+  public void reportError(String containerId, int[] operators, String msg) throws IOException
   {
     if (operators == null || operators.length == 0) {
       dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg));
@@ -179,24 +184,23 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
         }
       }
     }
-    try {
-      log(containerId, msg);
-    } catch (IOException ex) {
-      // ignore
-    }
+    log(containerId, msg);
   }
 
   @Override
   public StreamingContainerContext getInitContext(String containerId)
       throws IOException
   {
+    StreamingContainerContext scc = null;
     StreamingContainerAgent sca = dagManager.getContainerAgent(containerId);
-
-    return sca.getInitContext();
+    if (sca != null) {
+      scc = sca.getInitContext();
+    }
+    return scc;
   }
 
   @Override
-  public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg)
+  public ContainerHeartbeatResponse processHeartbeat(final ContainerHeartbeat msg) throws
IOException
   {
     // -- TODO
     // Change to use some sort of a annotation that developers can use to specify secure
code
@@ -208,20 +212,14 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
     //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)",
msg.getContainerId(),
     // now - msg.sentTms);
     dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms);
-    try {
-      final ContainerHeartbeat fmsg = msg;
-      return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>()
+    return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>()
+    {
+      @Override
+      public ContainerHeartbeatResponse run()
       {
-        @Override
-        public ContainerHeartbeatResponse run()
-        {
-          return dagManager.processHeartbeat(fmsg);
-        }
-      });
-    } catch (IOException ex) {
-      LOG.error("Error processing heartbeat", ex);
-      return null;
-    }
+        return dagManager.processHeartbeat(msg);
+      }
+    });
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index 77a33e6..13832ba 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -431,7 +431,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
    * @param operators
    * @param msg
    */
-  void reportError(String containerId, int[] operators, String msg);
+  void reportError(String containerId, int[] operators, String msg) throws IOException;
 
   /**
    * To be called periodically by child for heartbeat protocol.
@@ -439,6 +439,6 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
    * @param msg
    * @return
    */
-  ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg);
+  ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index c3886b4..38963c4 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -297,10 +297,12 @@ public class StreamingContainer extends YarnContainerMain
 
     int exitStatus = 1; // interpreted as unrecoverable container failure
 
-    RecoverableRpcProxy rpcProxy = new RecoverableRpcProxy(appPath, new Configuration());
-    final StreamingContainerUmbilicalProtocol umbilical = rpcProxy.getProxy();
+    RecoverableRpcProxy rpcProxy = null;
+    StreamingContainerUmbilicalProtocol umbilical = null;
     final String childId = System.getProperty(StreamingApplication.DT_PREFIX + "cid");
     try {
+      rpcProxy = new RecoverableRpcProxy(appPath, new Configuration());
+      umbilical = rpcProxy.getProxy();
       StreamingContainerContext ctx = umbilical.getInitContext(childId);
       StreamingContainer stramChild = new StreamingContainer(childId, umbilical);
       logger.debug("Container Context = {}", ctx);
@@ -312,25 +314,24 @@ public class StreamingContainer extends YarnContainerMain
       } finally {
         stramChild.teardown();
       }
-    } catch (Error error) {
-      logger.error("Fatal error in container!", error);
+    } catch (Error | Exception e) {
+      logger.error("Fatal {} in container!", (e instanceof Error) ? "Error" : "Exception",
e);
       /* Report back any failures, for diagnostic purposes */
-      String msg = ExceptionUtils.getStackTrace(error);
-      umbilical.reportError(childId, null, "FATAL: " + msg);
-    } catch (Exception exception) {
-      logger.error("Fatal exception in container!", exception);
-      /* Report back any failures, for diagnostic purposes */
-      String msg = ExceptionUtils.getStackTrace(exception);
-      umbilical.reportError(childId, null, msg);
+      try {
+        umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e));
+      } catch (Exception ex) {
+        logger.debug("Fail to log", ex);
+      }
     } finally {
-      rpcProxy.close();
+      if (rpcProxy != null) {
+        rpcProxy.close();
+      }
       DefaultMetricsSystem.shutdown();
       logger.info("Exit status for container: {}", exitStatus);
       LogManager.shutdown();
-    }
-
-    if (exitStatus != 0) {
-      System.exit(exitStatus);
+      if (exitStatus != 0) {
+        System.exit(exitStatus);
+      }
     }
   }
 
@@ -601,8 +602,8 @@ public class StreamingContainer extends YarnContainerMain
 
   public void heartbeatLoop() throws Exception
   {
-    umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop..");
     logger.debug("Entering heartbeat loop (interval is {} ms)", this.heartbeatIntervalMillis);
+    umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop..");
     final YarnConfiguration conf = new YarnConfiguration();
     long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR)
* containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME));
     long expiryTime = System.currentTimeMillis();
@@ -718,7 +719,7 @@ public class StreamingContainer extends YarnContainerMain
       } while (rsp.hasPendingRequests);
 
     }
-    logger.debug("Exiting hearbeat loop");
+    logger.debug("[{}] Exiting heartbeat loop", containerId);
     umbilical.log(containerId, "[" + containerId + "] Exiting heartbeat loop..");
   }
 
@@ -827,7 +828,7 @@ public class StreamingContainer extends YarnContainerMain
         try {
           umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest +
" " + ExceptionUtils.getStackTrace(e));
         } catch (IOException ioe) {
-          // ignore
+          logger.debug("Fail to log", ioe);
         }
         this.exitHeartbeatLoop = true;
         throw new IllegalStateException("Deploy request failed: " + rsp.deployRequest, e);
@@ -1436,19 +1437,32 @@ public class StreamingContainer extends YarnContainerMain
               logger.error("Voluntary container termination due to an error in operator {}.",
currentdi, error);
               operators = new int[]{currentdi.id};
             }
-            umbilical.reportError(containerId, operators, "Voluntary container termination
due to an error. " + ExceptionUtils.getStackTrace(error));
-            System.exit(1);
+            try {
+              umbilical.reportError(containerId, operators, "Voluntary container termination
due to an error. " + ExceptionUtils.getStackTrace(error));
+            } catch (Exception e) {
+              logger.debug("Fail to log", e);
+            } finally {
+              System.exit(1);
+            }
           } catch (Exception ex) {
             if (currentdi == null) {
               failedNodes.add(ndi.id);
               logger.error("Operator set {} stopped running due to an exception.", setOperators,
ex);
               int[] operators = new int[]{ndi.id};
-              umbilical.reportError(containerId, operators, "Stopped running due to an exception.
" + ExceptionUtils.getStackTrace(ex));
+              try {
+                umbilical.reportError(containerId, operators, "Stopped running due to an
exception. " + ExceptionUtils.getStackTrace(ex));
+              } catch (Exception e) {
+                logger.debug("Fail to log", e);
+              }
             } else {
               failedNodes.add(currentdi.id);
               logger.error("Abandoning deployment of operator {} due to setup failure.",
currentdi, ex);
               int[] operators = new int[]{currentdi.id};
-              umbilical.reportError(containerId, operators, "Abandoning deployment due to
setup failure. " + ExceptionUtils.getStackTrace(ex));
+              try {
+                umbilical.reportError(containerId, operators, "Abandoning deployment due
to setup failure. " + ExceptionUtils.getStackTrace(ex));
+              } catch (Exception e) {
+                logger.debug("Fail to log", e);
+              }
             }
           } finally {
             if (setOperators.contains(ndi)) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 74e18ee..e8ec26c 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -474,7 +475,7 @@ public class StramRecoveryTest
 
     StreamingContainerUmbilicalProtocol impl = Mockito.mock(StreamingContainerUmbilicalProtocol.class,
Mockito.withSettings().extraInterfaces(Closeable.class));
 
-    Mockito.doAnswer(new org.mockito.stubbing.Answer<Void>()
+    final Answer<Void> answer = new Answer<Void>()
     {
       @Override
       public Void answer(InvocationOnMock invocation)
@@ -491,8 +492,9 @@ public class StramRecoveryTest
         }
         return null;
       }
-    })
-    .when(impl).log("containerId", "timeout");
+    };
+    Mockito.doAnswer(answer).when(impl).log("containerId", "timeout");
+    Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout");
 
     Server server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(impl)
         .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(1).setVerbose(false).build();
@@ -546,7 +548,7 @@ public class StramRecoveryTest
 
     rp = new RecoverableRpcProxy(appPath, conf);
     protocolProxy = rp.getProxy();
-    protocolProxy.log("containerId", "msg");
+    protocolProxy.reportError("containerId", null, "msg");
     try {
       protocolProxy.log("containerId", "timeout");
       Assert.fail("expected socket timeout");
@@ -562,7 +564,7 @@ public class StramRecoveryTest
     uri = RecoverableRpcProxy.toConnectURI(address);
     recoveryHandler.writeConnectUri(uri.toString());
 
-    protocolProxy.log("containerId", "timeout");
+    protocolProxy.reportError("containerId", null, "timeout");
     Assert.assertTrue("timedout", timedout.get());
 
     restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);


Mime
View raw message