bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [10/29] incubator-distributedlog git commit: Simplify the state transition on stream
Date Wed, 21 Dec 2016 08:00:18 GMT
Simplify the state transition on stream

* the stream is created in the INITIALIZING state
* when the stream is started, it begins the transition from INITIALIZING to INITIALIZED
* it serves stream operations once the stream is INITIALIZED
* it is moved to ERROR when it encounters an exception.
* the stream is closed when a service operation times out or when it encounters any exception. It is first removed from the acquired mapping.
* the stream is then removed from the cached mapping, depending on the probation time.

    RB_ID=848047


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f19e7564
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f19e7564
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f19e7564

Branch: refs/heads/merge/DL-98
Commit: f19e7564ff4a1ec1b5d6f2683db190d739df99bb
Parents: 0a18f56
Author: Leigh Stewart <lstewart@twitter.com>
Authored: Mon Dec 12 16:49:26 2016 -0800
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Mon Dec 12 16:49:26 2016 -0800

----------------------------------------------------------------------
 .../service/DistributedLogServiceImpl.java      |  27 +-
 .../service/stream/StreamImpl.java              | 550 +++++++------------
 .../service/stream/StreamManager.java           |   5 +-
 .../service/stream/StreamManagerImpl.java       |  15 +-
 .../service/TestDistributedLogService.java      |  20 +-
 5 files changed, 222 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 751e972..3a9b904 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -90,7 +90,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                     // if it is closed, we would not acquire stream again.
                     return null;
                 }
-                writer = streamManager.getOrCreateStream(stream);
+                writer = streamManager.getOrCreateStream(stream, true);
             } finally {
                 closeLock.readLock().unlock();
             }
@@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
     }
 
-    @VisibleForTesting
-    java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
-        closeLock.readLock().lock();
-        try {
-            if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
-                return null;
-            } else if (delayMs > 0) {
-                return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
-            } else {
-                return scheduler.submit(runnable);
-            }
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
     // Test methods.
 
     private DynamicDistributedLogConfiguration getDynConf(String streamName) {
@@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     @VisibleForTesting
-    Stream newStream(String name) {
-        return streamFactory.create(name, getDynConf(name), streamManager);
+    Stream newStream(String name) throws IOException {
+        return streamManager.getOrCreateStream(name, false);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 1204d39..3d5b9e7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamNotReadyException;
@@ -70,24 +69,23 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class StreamImpl implements Stream {
     static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
 
+    /**
+     * The status of the stream.
+     *
+     * The status change of the stream should just go in one direction. If a stream hits
+     * any error, the stream should be put in error state. If a stream is in error state,
+     * it should be removed and not reused anymore.
+     */
     public static enum StreamStatus {
         UNINITIALIZED(-1),
         INITIALIZING(0),
         INITIALIZED(1),
-        // if a stream is in failed state, it could be retried immediately.
-        // a stream will be put in failed state when encountered any stream exception.
-        FAILED(-2),
-        // if a stream is in backoff state, it would backoff for a while.
-        // a stream will be put in backoff state when failed to acquire the ownership.
-        BACKOFF(-3),
         CLOSING(-4),
         CLOSED(-5),
         // if a stream is in error state, it should be abort during closing.
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream {
     private final Partition partition;
     private DistributedLogManager manager;
 
-    // A write has been attempted since the last stream acquire.
-    private volatile boolean writeSinceLastAcquire = false;
     private volatile AsyncLogWriter writer;
     private volatile StreamStatus status;
     private volatile String owner;
     private volatile Throwable lastException;
-    private volatile boolean running = true;
-    private volatile boolean suspended = false;
     private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
 
     private final Promise<Void> closePromise = new Promise<Void>();
     private final Object txnLock = new Object();
     private final TimeSequencer sequencer = new TimeSequencer();
-    // last acquire time
-    private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
-    // last acquire failure time
-    private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted();
-    private final long nextAcquireWaitTimeMs;
-    private ScheduledFuture<?> tryAcquireScheduledFuture = null;
-    private long scheduledAcquireDelayMs = 0L;
     private final StreamRequestLimiter limiter;
     private final DynamicDistributedLogConfiguration dynConf;
     private final DistributedLogConfiguration dlConfig;
@@ -165,7 +152,7 @@ public class StreamImpl implements Stream {
         new ConcurrentHashMap<String, Counter>();
 
     // Since we may create and discard streams at initialization if there's a race,
-    // must not do any expensive intialization here (particularly any locking or
+    // must not do any expensive initialization here (particularly any locking or
     // significant resource allocation etc.).
     StreamImpl(final String name,
                final Partition partition,
@@ -189,7 +176,6 @@ public class StreamImpl implements Stream {
         this.partition = partition;
         this.status = StreamStatus.UNINITIALIZED;
         this.lastException = new IOException("Fail to write record to stream " + name);
-        this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5;
         this.streamConfigProvider = streamConfigProvider;
         this.dlNamespace = dlNamespace;
         this.featureRateLimitDisabled = featureProvider.getFeature(
@@ -275,54 +261,16 @@ public class StreamImpl implements Stream {
         return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status);
     }
 
-    // schedule stream acquistion
-    private void tryAcquireStreamOnce() {
-        if (!running) {
-            return;
-        }
-
-        boolean needAcquire = false;
-        boolean checkNextTime = false;
-        synchronized (this) {
-            switch (this.status) {
-            case INITIALIZING:
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case FAILED:
-                this.status = StreamStatus.INITIALIZING;
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case BACKOFF:
-                // We may end up here after timeout on streamLock. To avoid acquire on every timeout
-                // we should only try again if a write has been attempted since the last acquire
-                // attempt. If we end up here because the request handler woke us up, the flag will
-                // be set and we will try to acquire as intended.
-                if (writeSinceLastAcquire) {
-                    this.status = StreamStatus.INITIALIZING;
-                    streamManager.notifyReleased(this);
-                    needAcquire = true;
-                } else {
-                    checkNextTime = true;
-                }
-                break;
-            default:
-                break;
-            }
-        }
-        if (needAcquire) {
-            lastAcquireWatch.reset().start();
-            acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+    @Override
+    public void start() {
+        // acquire the stream
+        acquireStream().addEventListener(new FutureEventListener<Boolean>() {
                 @Override
                 public void onSuccess(Boolean success) {
-                    synchronized (StreamImpl.this) {
-                        scheduledAcquireDelayMs = 0L;
-                        tryAcquireScheduledFuture = null;
-                    }
                     if (!success) {
-                        // schedule acquire in nextAcquireWaitTimeMs
-                        scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
+                        // failed to acquire the stream. set the stream in error status and close it.
+                        setStreamInErrorStatus();
+                        requestClose("Failed to acquire the ownership");
                     }
                 }
 
@@ -330,65 +278,40 @@ public class StreamImpl implements Stream {
                 public void onFailure(Throwable cause) {
                     // unhandled exceptions
                     logger.error("Stream {} threw unhandled exception : ", name, cause);
+                    // failed to acquire the stream. set the stream in error status and close it.
                     setStreamInErrorStatus();
                     requestClose("Unhandled exception");
                 }
             });
-        } else if (StreamStatus.isUnavailable(status)) {
-            // if the stream is unavailable, stop the thread and close the stream
-            requestClose("Stream is unavailable anymore");
-        } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
-            // if the stream isn't in initialized state and no writes coming in, then close the stream
-            requestClose("Stream not used anymore");
-        } else if (checkNextTime) {
-            synchronized (StreamImpl.this) {
-                scheduledAcquireDelayMs = 0L;
-                tryAcquireScheduledFuture = null;
-            }
-            // schedule acquire in nextAcquireWaitTimeMs
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
     }
 
-    private synchronized void scheduleTryAcquireOnce(long delayMs) {
-        if (null != tryAcquireScheduledFuture) {
-            if (delayMs <= 0) {
-                if (scheduledAcquireDelayMs <= 0L ||
-                        (scheduledAcquireDelayMs > 0L
-                                && !tryAcquireScheduledFuture.cancel(false))) {
-                    return;
-                }
-                // if the scheduled one could be cancelled, re-submit one
-            } else {
-                return;
+    //
+    // Stats Operations
+    //
+
+    void countException(Throwable t, StatsLogger streamExceptionLogger) {
+        String exceptionName = null == t ? "null" : t.getClass().getName();
+        Counter counter = exceptionCounters.get(exceptionName);
+        if (null == counter) {
+            counter = exceptionStatLogger.getCounter(exceptionName);
+            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+            if (null != oldCounter) {
+                counter = oldCounter;
             }
         }
-        tryAcquireScheduledFuture = schedule(new Runnable() {
-            @Override
-            public void run() {
-                tryAcquireStreamOnce();
-            }
-        }, delayMs);
-        scheduledAcquireDelayMs = delayMs;
+        counter.inc();
+        streamExceptionLogger.getCounter(exceptionName).inc();
     }
 
-    @Override
-    public void start() {
-        scheduleTryAcquireOnce(0);
+    boolean isCriticalException(Throwable cause) {
+        return !(cause instanceof OwnershipAcquireFailedException);
     }
 
-    ScheduledFuture<?> schedule(Runnable runnable, long delayMs) {
-        if (!running) {
-            return null;
-        }
-        try {
-            return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        }
-    }
+    //
+    // Service Timeout:
+    // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+    // - if the operation is completed within timeout period, cancel the timeout.
+    //
 
     void scheduleTimeout(final StreamOp op) {
         final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@@ -418,12 +341,14 @@ public class StreamImpl implements Stream {
      * stream off the proxy for a period of time, hopefully long enough for the
      * issues to be resolved, or for whoop to kick in and kill the shard.
      */
-    synchronized void handleServiceTimeout(String reason) {
-        if (StreamStatus.isUnavailable(status)) {
-            return;
+    void handleServiceTimeout(String reason) {
+        synchronized (this) {
+            if (StreamStatus.isUnavailable(status)) {
+                return;
+            }
+            // Mark stream in error state
+            setStreamInErrorStatus();
         }
-        // Mark stream in error state
-        setStreamInErrorStatus();
 
         // Async close request, and schedule eviction when its done.
         Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
@@ -436,6 +361,10 @@ public class StreamImpl implements Stream {
         });
     }
 
+    //
+    // Submit the operation to the stream.
+    //
+
     /**
      * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
      * execution once complete.
@@ -445,9 +374,6 @@ public class StreamImpl implements Stream {
      */
     @Override
     public void submit(StreamOp op) {
-        // Let stream acquire thread know a write has been attempted.
-        writeSinceLastAcquire = true;
-
         try {
             limiter.apply(op);
         } catch (OverCapacityException ex) {
@@ -460,36 +386,28 @@ public class StreamImpl implements Stream {
             scheduleTimeout(op);
         }
 
-        boolean notifyAcquireThread = false;
         boolean completeOpNow = false;
         boolean success = true;
         if (StreamStatus.isUnavailable(status)) {
             // Stream is closed, fail the op immediately
             op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             return;
-        } if (StreamStatus.INITIALIZED == status && writer != null) {
+        } else if (StreamStatus.INITIALIZED == status && writer != null) {
             completeOpNow = true;
             success = true;
         } else {
             synchronized (this) {
                 if (StreamStatus.isUnavailable(status)) {
-                    // complete the write op as {@link #executeOp(op, success)} will handle closed case.
-                    completeOpNow = true;
-                    success = true;
+                    // Stream is closed, fail the op immediately
+                    op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+                    return;
                 } if (StreamStatus.INITIALIZED == status) {
                     completeOpNow = true;
                     success = true;
-                } else if (StreamStatus.BACKOFF == status &&
-                        lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) {
-                    completeOpNow = true;
-                    success = false;
                 } else if (failFastOnStreamNotReady) {
-                    notifyAcquireThread = true;
-                    completeOpNow = false;
-                    success = false;
                     op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
-                } else { // closing & initializing
-                    notifyAcquireThread = true;
+                    return;
+                } else { // the stream is still initializing
                     pendingOps.add(op);
                     pendingOpsCounter.inc();
                     if (1 == pendingOps.size()) {
@@ -500,14 +418,15 @@ public class StreamImpl implements Stream {
                 }
             }
         }
-        if (notifyAcquireThread && !suspended) {
-            scheduleTryAcquireOnce(0L);
-        }
         if (completeOpNow) {
             executeOp(op, success);
         }
     }
 
+    //
+    // Execute operations and handle exceptions on operations
+    //
+
     /**
      * Execute the <i>op</i> immediately.
      *
@@ -516,20 +435,7 @@ public class StreamImpl implements Stream {
      * @param success
      *          whether the operation is success or not.
      */
-    void executeOp(StreamOp op, boolean success) {
-        closeLock.readLock().lock();
-        try {
-            if (StreamStatus.isUnavailable(status)) {
-                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
-                return;
-            }
-            doExecuteOp(op, success);
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    private void doExecuteOp(final StreamOp op, boolean success) {
+    void executeOp(final StreamOp op, boolean success) {
         final AsyncLogWriter writer;
         final Throwable lastException;
         synchronized (this) {
@@ -552,7 +458,7 @@ public class StreamImpl implements Stream {
                         case FOUND:
                             assert(cause instanceof OwnershipAcquireFailedException);
                             countAsException = false;
-                            handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         case ALREADY_CLOSED:
                             assert(cause instanceof AlreadyClosedException);
@@ -573,13 +479,14 @@ public class StreamImpl implements Stream {
                         case OVER_CAPACITY:
                             op.fail(cause);
                             break;
-                        // exceptions that *could* / *might* be recovered by creating a new writer
+                        // the DL writer hits exception, simple set the stream to error status
+                        // and fail the request
                         default:
-                            handleRecoverableDLException(op, cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         }
                     } else {
-                        handleUnknownException(op, cause);
+                        handleExceptionOnStreamOp(op, cause);
                     }
                     if (countAsException) {
                         countException(cause, streamExceptionStatLogger);
@@ -587,88 +494,41 @@ public class StreamImpl implements Stream {
                 }
             });
         } else {
-            op.fail(lastException);
-        }
-    }
-
-    /**
-     * Handle recoverable dl exception.
-     *
-     * @param op
-     *          stream operation executing
-     * @param cause
-     *          exception received when executing <i>op</i>
-     */
-    private void handleRecoverableDLException(StreamOp op, final Throwable cause) {
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
-                statusChanged = true;
+            if (null != lastException) {
+                op.fail(lastException);
+            } else {
+                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             }
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
     }
 
     /**
-     * Handle unknown exception when executing <i>op</i>.
+     * Handle exception when executing <i>op</i>.
      *
      * @param op
      *          stream operation executing
      * @param cause
      *          exception received when executing <i>op</i>
      */
-    private void handleUnknownException(StreamOp op, final Throwable cause) {
+    private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
         AsyncLogWriter oldWriter = null;
         boolean statusChanged = false;
         synchronized (this) {
             if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
+                oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
                 statusChanged = true;
             }
         }
         if (statusChanged) {
             Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
-    }
-
-    /**
-     * Handle losing ownership during executing <i>op</i>.
-     *
-     * @param op
-     *          stream operation executing
-     * @param oafe
-     *          the ownership exception received when executing <i>op</i>
-     */
-    private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) {
-        logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}",
-                new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter =
-                    setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED,
-                            null, oafe.getCurrentOwner(), oafe);
-                statusChanged = true;
+            if (isCriticalException(cause)) {
+                logger.error("Failed to write data into stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
             }
+            requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
-        op.fail(oafe);
+        op.fail(cause);
     }
 
     /**
@@ -680,129 +540,126 @@ public class StreamImpl implements Stream {
         fatalErrorHandler.notifyFatalError();
     }
 
-    void countException(Throwable t, StatsLogger streamExceptionLogger) {
-        String exceptionName = null == t ? "null" : t.getClass().getName();
-        Counter counter = exceptionCounters.get(exceptionName);
-        if (null == counter) {
-            counter = exceptionStatLogger.getCounter(exceptionName);
-            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
-            if (null != oldCounter) {
-                counter = oldCounter;
-            }
-        }
-        counter.inc();
-        streamExceptionLogger.getCounter(exceptionName).inc();
-    }
+    //
+    // Acquire streams
+    //
 
     Future<Boolean> acquireStream() {
-        // Reset this flag so the acquire thread knows whether re-acquire is needed.
-        writeSinceLastAcquire = false;
-
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final Promise<Boolean> acquirePromise = new Promise<Boolean>();
         manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
 
             @Override
             public void onSuccess(AsyncLogWriter w) {
-                synchronized (txnLock) {
-                    sequencer.setLastId(w.getLastTxId());
-                }
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                synchronized (StreamImpl.this) {
-                    oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
-                            StreamStatus.INITIALIZING, w, null, null);
-                    oldPendingOps = pendingOps;
-                    pendingOps = new ArrayDeque<StreamOp>();
-                    success = true;
-                }
-                // check if the stream is allowed to be acquired
-                if (!streamManager.allowAcquire(StreamImpl.this)) {
-                    if (null != oldWriter) {
-                        Abortables.asyncAbort(oldWriter, true);
-                    }
-                    int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
-                    StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
-                            + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
-                    countException(sue, exceptionStatLogger);
-                    logger.error("Failed to acquire stream {} because it is unavailable : {}",
-                            name, sue.getMessage());
-                    synchronized (this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZED, null, null, sue);
-                        // we don't switch the pending ops since they are already switched
-                        // when setting the status to initialized
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamSuccess(w, stopwatch, acquirePromise);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                if (cause instanceof AlreadyClosedException) {
-                    countException(cause, streamExceptionStatLogger);
-                    handleAlreadyClosedException((AlreadyClosedException) cause);
-                    return;
-                } else if (cause instanceof OwnershipAcquireFailedException) {
-                    OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
-                    logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}",
-                            new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.BACKOFF,
-                                StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else if (cause instanceof InvalidStreamNameException) {
-                    InvalidStreamNameException isne = (InvalidStreamNameException) cause;
-                    countException(isne, streamExceptionStatLogger);
-                    logger.error("Failed to acquire stream {} due to its name is invalid", name);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZING, null, null, isne);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else {
-                    countException(cause, streamExceptionStatLogger);
-                    logger.error("Failed to initialize stream {} : ", name, cause);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.FAILED,
-                                StreamStatus.INITIALIZING, null, null, cause);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamFailure(cause, stopwatch, acquirePromise);
             }
 
-            void processPendingRequestsAfterOpen(boolean success,
-                                                 AsyncLogWriter oldWriter,
-                                                 Queue<StreamOp> oldPendingOps) {
-                if (success) {
-                    streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                } else {
-                    streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                }
-                for (StreamOp op : oldPendingOps) {
-                    executeOp(op, success);
-                    pendingOpsCounter.dec();
-                }
-                Abortables.asyncAbort(oldWriter, true);
-                FutureUtils.setValue(acquirePromise, success);
-            }
         }, scheduler, getStreamName()));
         return acquirePromise;
     }
 
+    private void onAcquireStreamSuccess(AsyncLogWriter w,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        synchronized (txnLock) {
+            sequencer.setLastId(w.getLastTxId());
+        }
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        synchronized (StreamImpl.this) {
+            oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+                    StreamStatus.INITIALIZING, w, null);
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<StreamOp>();
+            success = true;
+        }
+        // check if the stream is allowed to be acquired
+        if (!streamManager.allowAcquire(StreamImpl.this)) {
+            if (null != oldWriter) {
+                Abortables.asyncAbort(oldWriter, true);
+            }
+            int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+            StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+                    + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+            countException(sue, exceptionStatLogger);
+            logger.error("Failed to acquire stream {} because it is unavailable : {}",
+                    name, sue.getMessage());
+            synchronized (this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZED, null, sue);
+                // we don't switch the pending ops since they are already switched
+                // when setting the status to initialized
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    private void onAcquireStreamFailure(Throwable cause,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        if (cause instanceof AlreadyClosedException) {
+            countException(cause, streamExceptionStatLogger);
+            handleAlreadyClosedException((AlreadyClosedException) cause);
+            return;
+        } else {
+            if (isCriticalException(cause)) {
+                countException(cause, streamExceptionStatLogger);
+                logger.error("Failed to acquire stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+            }
+            synchronized (StreamImpl.this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZING, null, cause);
+                oldPendingOps = pendingOps;
+                pendingOps = new ArrayDeque<StreamOp>();
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    /**
+     * Process the pending requests once the stream acquisition has completed.
+     *
+     * @param success whether the acquisition succeeded or not
+     * @param oldWriter the old writer to abort
+     * @param oldPendingOps the old pending ops to execute
+     * @param stopwatch stopwatch to measure the time spent on acquisition
+     * @param acquirePromise the promise to complete the acquire operation
+     */
+    void processPendingRequestsAfterAcquire(boolean success,
+                                            AsyncLogWriter oldWriter,
+                                            Queue<StreamOp> oldPendingOps,
+                                            Stopwatch stopwatch,
+                                            Promise<Boolean> acquirePromise) {
+        if (success) {
+            streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } else {
+            streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+        for (StreamOp op : oldPendingOps) {
+            executeOp(op, success);
+            pendingOpsCounter.dec();
+        }
+        Abortables.asyncAbort(oldWriter, true);
+        FutureUtils.setValue(acquirePromise, success);
+    }
+
+    //
+    // Stream Status Changes
+    //
+
     synchronized void setStreamInErrorStatus() {
         if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
             return;
@@ -819,8 +676,6 @@ public class StreamImpl implements Stream {
      *          old status
      * @param writer
      *          new log writer
-     * @param owner
-     *          new owner
      * @param t
      *          new exception
      * @return old writer if it exists
@@ -828,7 +683,6 @@ public class StreamImpl implements Stream {
     synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
                                                 StreamStatus oldStatus,
                                                 AsyncLogWriter writer,
-                                                String owner,
                                                 Throwable t) {
         if (oldStatus != this.status) {
             logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
@@ -836,6 +690,11 @@ public class StreamImpl implements Stream {
             return null;
         }
 
+        String owner = null;
+        if (t instanceof OwnershipAcquireFailedException) {
+            owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+        }
+
         AsyncLogWriter oldWriter = this.writer;
         this.writer = writer;
         if (null != owner && owner.equals(clientId)) {
@@ -852,10 +711,6 @@ public class StreamImpl implements Stream {
         }
         this.lastException = t;
         this.status = newStatus;
-        if (StreamStatus.BACKOFF == newStatus && null != owner) {
-            // start failure watch
-            this.lastAcquireFailureWatch.reset().start();
-        }
         if (StreamStatus.INITIALIZED == newStatus) {
             streamManager.notifyAcquired(this);
             logger.info("Inserted acquired stream {} -> writer {}", name, this);
@@ -866,12 +721,16 @@ public class StreamImpl implements Stream {
         return oldWriter;
     }
 
+    //
+    // Stream Close Functions
+    //
+
     void close(DistributedLogManager dlm) {
         if (null != dlm) {
             try {
                 dlm.close();
             } catch (IOException ioe) {
-                logger.warn("Failed to close dlm for {} : ", ioe);
+                logger.warn("Failed to close dlm for {} : ", name, ioe);
             }
         }
     }
@@ -902,12 +761,16 @@ public class StreamImpl implements Stream {
         // them.
         close(abort);
         if (uncache) {
+            final long probationTimeoutMs;
+            if (null != owner) {
+                probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+            } else {
+                probationTimeoutMs = 0L;
+            }
             closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
                 @Override
                 public BoxedUnit apply(Void result) {
-                    if (streamManager.notifyRemoved(StreamImpl.this)) {
-                        logger.info("Removed cached stream {} after closed.", name);
-                    }
+                    streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs);
                     return BoxedUnit.UNIT;
                 }
             });
@@ -949,14 +812,6 @@ public class StreamImpl implements Stream {
             closeLock.writeLock().unlock();
         }
         logger.info("Closing stream {} ...", name);
-        running = false;
-        // stop any outstanding ownership acquire actions first
-        synchronized (this) {
-            if (null != tryAcquireScheduledFuture) {
-                tryAcquireScheduledFuture.cancel(true);
-            }
-        }
-        logger.info("Stopped threads of stream {}.", name);
         // Close the writers to release the locks before failing the requests
         Future<Void> closeWriterFuture;
         if (abort) {
@@ -1016,19 +871,6 @@ public class StreamImpl implements Stream {
     // Test-only apis
 
     @VisibleForTesting
-    public StreamImpl suspendAcquiring() {
-        suspended = true;
-        return this;
-    }
-
-    @VisibleForTesting
-    public StreamImpl resumeAcquiring() {
-        suspended = false;
-        scheduleTryAcquireOnce(0L);
-        return this;
-    }
-
-    @VisibleForTesting
     public int numPendingOps() {
         Queue<StreamOp> queue = pendingOps;
         return null == queue ? 0 : queue.size();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index 972eb55..e171e46 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -43,10 +43,11 @@ public interface StreamManager {
 
     /**
      * Get a cached stream and create a new one if it doesnt exist.
-     * @param stream name
+     * @param streamName stream name
+     * @param start whether to start the stream after it is created.
      * @return future satisfied once close complete
      */
-    Stream getOrCreateStream(String stream) throws IOException;
+    Stream getOrCreateStream(String streamName, boolean start) throws IOException;
 
     /**
      * Asynchronously create a new stream.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index aa08a24..df336fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.util.Future;
 import com.twitter.util.Promise;
-import java.io.IOException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager {
     }
 
     @Override
-    public Stream getOrCreateStream(String streamName) throws IOException {
+    public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
         Stream stream = streams.get(streamName);
         if (null == stream) {
             closeLock.readLock().lock();
@@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager {
                     numCached.getAndIncrement();
                     logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
                     stream.initialize();
-                    stream.start();
+                    if (start) {
+                        stream.start();
+                    }
                 }
             } finally {
                 closeLock.readLock().unlock();
@@ -283,8 +284,10 @@ public class StreamManagerImpl implements StreamManager {
 
     @Override
     public void scheduleRemoval(final Stream stream, long delayMs) {
-        logger.info("Scheduling removal of stream {} from cache after {} sec.",
-            stream.getStreamName(), delayMs);
+        if (delayMs > 0) {
+            logger.info("Scheduling removal of stream {} from cache after {} sec.",
+                    stream.getStreamName(), delayMs);
+        }
         schedule(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 17fae4a..4195ed3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         dlConf.addConfiguration(conf);
         dlConf.setLockTimeout(0)
                 .setOutputBufferSize(0)
-                .setPeriodicFlushFrequencyMilliSeconds(10);
+                .setPeriodicFlushFrequencyMilliSeconds(10)
+                .setSchedulerShutdownTimeoutMs(100);
         serverConf = newLocalServerConf();
         uri = createDLMURI("/" + testName.getMethodName());
         ensureURICreated(uri);
@@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase {
     public void testAcquireStreams() throws Exception {
         String streamName = testName.getMethodName();
         StreamImpl s0 = createUnstartedStream(service, streamName);
-        s0.suspendAcquiring();
-        DistributedLogServiceImpl service1 = createService(serverConf, dlConf);
+        ServerConfiguration serverConf1 = new ServerConfiguration();
+        serverConf1.addConfiguration(serverConf);
+        serverConf1.setServerPort(9999);
+        DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
         StreamImpl s1 = createUnstartedStream(service1, streamName);
-        s1.suspendAcquiring();
 
         // create write ops
         WriteOp op0 = createWriteOp(service, streamName, 0L);
@@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                 1, s1.numPendingOps());
 
         // start acquiring s0
-        s0.resumeAcquiring().start();
+        s0.start();
         WriteResponse wr0 = Await.result(op0.result());
         assertEquals("Op 0 should succeed",
                 StatusCode.SUCCESS, wr0.getHeader().getCode());
@@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         assertNull(s0.getLastException());
 
         // start acquiring s1
-        s1.resumeAcquiring().start();
+        s1.start();
         WriteResponse wr1 = Await.result(op1.result());
         assertEquals("Op 1 should fail",
                 StatusCode.FOUND, wr1.getHeader().getCode());
-        assertEquals("Service 1 should be in BACKOFF state",
-                StreamStatus.BACKOFF, s1.getStatus());
+        assertEquals("Service 1 should be in ERROR state",
+                StreamStatus.ERROR, s1.getStatus());
         assertNotNull(s1.getManager());
         assertNull(s1.getWriter());
         assertNotNull(s1.getLastException());
@@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         for (Stream s : streamManager.getAcquiredStreams().values()) {
             StreamImpl stream = (StreamImpl) s;
-            stream.setStatus(StreamStatus.FAILED);
+            stream.setStatus(StreamStatus.ERROR);
         }
 
         Future<List<Void>> closeResult = localService.closeStreams();


Mime
View raw message