distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [1/2] incubator-distributedlog git commit: DL-97: Remove unused methods in BKLogHandler
Date Wed, 21 Dec 2016 08:09:58 GMT
Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 5b55bee23 -> 74a33029c


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index aa08a24..df336fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.util.Future;
 import com.twitter.util.Promise;
-import java.io.IOException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager {
     }
 
     @Override
-    public Stream getOrCreateStream(String streamName) throws IOException {
+    public Stream getOrCreateStream(String streamName, boolean start) throws IOException
{
         Stream stream = streams.get(streamName);
         if (null == stream) {
             closeLock.readLock().lock();
@@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager {
                     numCached.getAndIncrement();
                     logger.info("Inserted mapping stream name {} -> stream {}", streamName,
stream);
                     stream.initialize();
-                    stream.start();
+                    if (start) {
+                        stream.start();
+                    }
                 }
             } finally {
                 closeLock.readLock().unlock();
@@ -283,8 +284,10 @@ public class StreamManagerImpl implements StreamManager {
 
     @Override
     public void scheduleRemoval(final Stream stream, long delayMs) {
-        logger.info("Scheduling removal of stream {} from cache after {} sec.",
-            stream.getStreamName(), delayMs);
+        if (delayMs > 0) {
+            logger.info("Scheduling removal of stream {} from cache after {} sec.",
+                    stream.getStreamName(), delayMs);
+        }
         schedule(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 17fae4a..4195ed3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         dlConf.addConfiguration(conf);
         dlConf.setLockTimeout(0)
                 .setOutputBufferSize(0)
-                .setPeriodicFlushFrequencyMilliSeconds(10);
+                .setPeriodicFlushFrequencyMilliSeconds(10)
+                .setSchedulerShutdownTimeoutMs(100);
         serverConf = newLocalServerConf();
         uri = createDLMURI("/" + testName.getMethodName());
         ensureURICreated(uri);
@@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
     public void testAcquireStreams() throws Exception {
         String streamName = testName.getMethodName();
         StreamImpl s0 = createUnstartedStream(service, streamName);
-        s0.suspendAcquiring();
-        DistributedLogServiceImpl service1 = createService(serverConf, dlConf);
+        ServerConfiguration serverConf1 = new ServerConfiguration();
+        serverConf1.addConfiguration(serverConf);
+        serverConf1.setServerPort(9999);
+        DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
         StreamImpl s1 = createUnstartedStream(service1, streamName);
-        s1.suspendAcquiring();
 
         // create write ops
         WriteOp op0 = createWriteOp(service, streamName, 0L);
@@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
                 1, s1.numPendingOps());
 
         // start acquiring s0
-        s0.resumeAcquiring().start();
+        s0.start();
         WriteResponse wr0 = Await.result(op0.result());
         assertEquals("Op 0 should succeed",
                 StatusCode.SUCCESS, wr0.getHeader().getCode());
@@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
         assertNull(s0.getLastException());
 
         // start acquiring s1
-        s1.resumeAcquiring().start();
+        s1.start();
         WriteResponse wr1 = Await.result(op1.result());
         assertEquals("Op 1 should fail",
                 StatusCode.FOUND, wr1.getHeader().getCode());
-        assertEquals("Service 1 should be in BACKOFF state",
-                StreamStatus.BACKOFF, s1.getStatus());
+        assertEquals("Service 1 should be in ERROR state",
+                StreamStatus.ERROR, s1.getStatus());
         assertNotNull(s1.getManager());
         assertNull(s1.getWriter());
         assertNotNull(s1.getLastException());
@@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase
{
 
         for (Stream s : streamManager.getAcquiredStreams().values()) {
             StreamImpl stream = (StreamImpl) s;
-            stream.setStatus(StreamStatus.FAILED);
+            stream.setStatus(StreamStatus.ERROR);
         }
 
         Future<List<Void>> closeResult = localService.closeStreams();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a7f60b1..52414da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <zookeeper.version>3.5.1-alpha</zookeeper.version>
-    <bookkeeper.version>4.3.5-TWTTR-OSS</bookkeeper.version>
+    <bookkeeper.version>4.3.6-TWTTR-OSS</bookkeeper.version>
     <birdcage.sha>6.34.0</birdcage.sha>
     <scrooge.version>4.6.0</scrooge.version>
     <scrooge-maven-plugin.version>3.17.0</scrooge-maven-plugin.version>


Mime
View raw message