bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [30/31] incubator-distributedlog git commit: DL-164: Create stream operation should not be submitted by StreamImpl
Date Fri, 30 Dec 2016 00:07:44 GMT
DL-164: Create stream operation should not be submitted by StreamImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/63d6bde1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/63d6bde1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/63d6bde1

Branch: refs/heads/master
Commit: 63d6bde1929085780171e63cc5b0c95581daa564
Parents: cfc049c
Author: Sijie Guo <sijieg@twitter.com>
Authored: Wed Dec 28 19:09:59 2016 -0800
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Thu Dec 29 02:14:02 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReader.java        | 21 ++++++++++++------
 .../distributedlog/ReadAheadEntryReader.java    | 14 ++++++++++--
 .../tools/DistributedLogTool.java               |  6 ++---
 distributedlog-service/pom.xml                  |  2 +-
 .../service/DistributedLogServiceImpl.java      |  1 +
 .../placement/LeastLoadPlacementPolicy.java     |  9 ++++++--
 .../service/placement/ServerLoad.java           | 23 +++++++++++---------
 .../service/placement/StreamLoad.java           |  3 +++
 .../src/main/resources/findbugsExclude.xml      |  4 ++++
 9 files changed, 57 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
index cebbc33..aee4103 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -28,6 +28,7 @@ import com.twitter.distributedlog.exceptions.IdleReaderException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.exceptions.ReadCancelledException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -83,10 +84,11 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
                 }
             };
 
+    private final String streamName;
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler readHandler;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
-    private final ScheduledExecutorService executorService;
+    private final OrderedScheduler scheduler;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
     private final Object scheduleLock = new Object();
     private final AtomicLong scheduleCount = new AtomicLong(0);
@@ -208,13 +210,14 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
     }
 
     BKAsyncLogReader(BKDistributedLogManager bkdlm,
-                     ScheduledExecutorService executorService,
+                     OrderedScheduler scheduler,
                      DLSN startDLSN,
                      Optional<String> subscriberId,
                      boolean returnEndOfStreamRecord,
                      StatsLogger statsLogger) {
+        this.streamName = bkdlm.getStreamName();
         this.bkDistributedLogManager = bkdlm;
-        this.executorService = executorService;
+        this.scheduler = scheduler;
         this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId,
                 this, true);
         LOG.debug("Starting async reader at {}", startDLSN);
@@ -251,7 +254,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
             // Except when idle reader threshold is less than a second (tests?)
             period = Math.min(period, idleErrorThresholdMillis / 5);
 
-            return executorService.scheduleAtFixedRate(new Runnable() {
+            return scheduler.scheduleAtFixedRate(streamName, new Runnable() {
                 @Override
                 public void run() {
                     PendingReadRequest nextRequest = pendingRequests.peek();
@@ -371,7 +374,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
 
     @Override
     public String getStreamName() {
-        return bkDistributedLogManager.getStreamName();
+        return streamName;
     }
 
     /**
@@ -470,7 +473,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
         long prevCount = scheduleCount.getAndIncrement();
         if (0 == prevCount) {
             scheduleDelayStopwatch.reset().start();
-            executorService.submit(this);
+            scheduler.submit(streamName, this);
         }
     }
 
@@ -659,7 +662,11 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification {
                         scheduleDelayStopwatch.reset().start();
                         scheduleCount.set(0);
                         // the request could still wait for more records
-                        backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit);
+                        backgroundScheduleTask = scheduler.schedule(
+                                streamName,
+                                BACKGROUND_READ_SCHEDULER,
+                                remainingWaitTime,
+                                nextRequest.deadlineTimeUnit);
                         return;
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
index 19f4497..40e3930 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java
@@ -54,6 +54,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -228,9 +229,12 @@ public class ReadAheadEntryReader implements
                 if (null != closePromise) {
                     return;
                 }
+            }
+            try {
                 safeRun();
+            } catch (Throwable cause) {
+                logger.error("Caught unexpected exception : ", cause);
             }
-
         }
 
         abstract void safeRun();
@@ -275,6 +279,7 @@ public class ReadAheadEntryReader implements
     // State of the reader
     //
 
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private boolean isInitialized = false;
     private boolean readAheadPaused = false;
     private Promise<Void> closePromise = null;
@@ -428,6 +433,7 @@ public class ReadAheadEntryReader implements
     public void start(final List<LogSegmentMetadata> segmentList) {
         logger.info("Starting the readahead entry reader for {} : segments = {}",
                 readHandler.getFullyQualifiedName(), segmentList);
+        started.set(true);
         processLogSegments(segmentList);
     }
 
@@ -530,7 +536,7 @@ public class ReadAheadEntryReader implements
 
     void setLastException(IOException cause) {
         if (!lastException.compareAndSet(null, cause)) {
-            return;
+            logger.debug("last exception has already been set to ", lastException.get());
         }
         // the exception is set and notify the state change
         notifyStateChangeOnFailure(cause);
@@ -829,6 +835,7 @@ public class ReadAheadEntryReader implements
             }
             skipTruncatedLogSegments = false;
             if (!isAllowedToPosition(segment, dlsnToStart)) {
+                logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart);
                 return;
             }
 
@@ -969,6 +976,9 @@ public class ReadAheadEntryReader implements
 
     @Override
     public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+        if (!started.get()) {
+            return;
+        }
         logger.info("segments is updated with {}", segments);
         processLogSegments(segments);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
index 4565921..03d70bd 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java
@@ -188,10 +188,8 @@ public class DistributedLogTool extends Tool {
             try {
                 return runCmd();
             } finally {
-                synchronized (this) {
-                    if (null != namespace) {
-                        namespace.close();
-                    }
+                if (null != namespace) {
+                    namespace.close();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index e74d486..052ce15 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -130,7 +130,7 @@
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>
-      <version>2.2.0-incubating</version>
+      <version>3.2.1</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index e7974c7..5dee7fd 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -59,6 +59,7 @@ import com.twitter.distributedlog.service.stream.StreamOp;
 import com.twitter.distributedlog.service.stream.StreamOpStats;
 import com.twitter.distributedlog.service.stream.TruncateOp;
 import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
 import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index e4c8128..8c8dc23 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -71,10 +71,15 @@ public class LeastLoadPlacementPolicy extends PlacementPolicy {
     });
   }
 
+  private synchronized String getStreamOwner(String stream) {
+    return streamToServer.get(stream);
+  }
+
   @Override
   public Future<String> placeStream(String stream) {
-    if (streamToServer.containsKey(stream)) {
-      return Future.value(streamToServer.get(stream));
+    String streamOwner = getStreamOwner(stream);
+    if (null != streamOwner) {
+      return Future.value(streamOwner);
     }
     Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
     return streamLoadFuture.map(new Function<StreamLoad, String>() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
index d7fbcf2..801e499 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -57,7 +57,7 @@ public class ServerLoad implements Comparable {
   synchronized public long removeStream(String stream) {
     for (StreamLoad streamLoad : streamLoads) {
       if (streamLoad.stream.equals(stream)) {
-        this.load -= load;
+        this.load -= streamLoad.getLoad();
         streamLoads.remove(streamLoad);
         return this.load;
       }
@@ -65,19 +65,19 @@ public class ServerLoad implements Comparable {
     return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
   }
 
-  public long getLoad() {
+  public synchronized long getLoad() {
     return load;
   }
 
-  public Set<StreamLoad> getStreamLoads() {
+  public synchronized Set<StreamLoad> getStreamLoads() {
     return streamLoads;
   }
 
-  public String getServer() {
+  public synchronized String getServer() {
     return server;
   }
 
-  protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+  protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
     com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
         = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
     tServerLoad.setServer(server);
@@ -125,9 +125,9 @@ public class ServerLoad implements Comparable {
   }
 
   @Override
-  public int compareTo(Object o) {
+  public synchronized int compareTo(Object o) {
     ServerLoad other = (ServerLoad) o;
-    if (load == other.load) {
+    if (load == other.getLoad()) {
       return server.compareTo(other.getServer());
     } else {
       return Long.compare(load, other.getLoad());
@@ -135,18 +135,21 @@ public class ServerLoad implements Comparable {
   }
 
   @Override
-  public boolean equals(Object o) {
+  public synchronized boolean equals(Object o) {
+    if (!(o instanceof ServerLoad)) {
+      return false;
+    }
     ServerLoad other = (ServerLoad) o;
     return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
   }
 
   @Override
-  public String toString() {
+  public synchronized String toString() {
     return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
   }
 
   @Override
-  public int hashCode() {
+  public synchronized int hashCode() {
     return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
index 4f3dc71..d7b7efd 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -93,6 +93,9 @@ public class StreamLoad implements Comparable {
 
   @Override
   public boolean equals(Object o) {
+    if (!(o instanceof StreamLoad)) {
+      return false;
+    }
     StreamLoad other = (StreamLoad) o;
     return stream.equals(other.getStream()) && load == other.getLoad();
   }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/resources/findbugsExclude.xml b/distributedlog-service/src/main/resources/findbugsExclude.xml
index 502befa..d28ea93 100644
--- a/distributedlog-service/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-service/src/main/resources/findbugsExclude.xml
@@ -21,6 +21,10 @@
     <Class name="~com\.twitter\.distributedlog\.thrift.*" />
   </Match>
   <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~com\.twitter\.distributedlog\.service\.placement\.thrift.*" />
+  </Match>
+  <Match>
     <!-- it is safe to cast exception here. //-->
     <Class name="com.twitter.distributedlog.service.DistributedLogServiceImpl$Stream$2"
/>
     <Method name="onFailure" />


Mime
View raw message