bookkeeper-distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiazhai <...@git.apache.org>
Subject [GitHub] incubator-distributedlog pull request #133: DL-124: Use Java8 Future rather ...
Date Sat, 03 Jun 2017 11:01:49 GMT
Github user jiazhai commented on a diff in the pull request:

    https://github.com/apache/incubator-distributedlog/pull/133#discussion_r119984594
  
    --- Diff: distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java ---
    @@ -525,75 +495,63 @@ public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException
{
          */
         @Override
         public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
    -        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
    +        return (BKAsyncLogWriter) Utils.ioResult(openAsyncLogWriter());
         }
     
         @Override
    -    public Future<AsyncLogWriter> openAsyncLogWriter() {
    +    public CompletableFuture<AsyncLogWriter> openAsyncLogWriter() {
             try {
                 checkClosedOrInError("startLogSegmentNonPartitioned");
             } catch (AlreadyClosedException e) {
    -            return Future.exception(e);
    +            return FutureUtils.exception(e);
             }
     
    -        Future<BKLogWriteHandler> createWriteHandleFuture;
    +        CompletableFuture<BKLogWriteHandler> createWriteHandleFuture;
             synchronized (this) {
                 // 1. create the locked write handler
                 createWriteHandleFuture = asyncCreateWriteHandler(true);
             }
    -        return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler,
Future<AsyncLogWriter>>() {
    -            @Override
    -            public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler)
{
    -                final BKAsyncLogWriter writer;
    -                synchronized (BKDistributedLogManager.this) {
    -                    // 2. create the writer with the handler
    -                    writer = new BKAsyncLogWriter(
    -                            conf,
    -                            dynConf,
    -                            BKDistributedLogManager.this,
    -                            writeHandler,
    -                            featureProvider,
    -                            statsLogger);
    -                }
    -                // 3. recover the incomplete log segments
    -                return writeHandler.recoverIncompleteLogSegments()
    -                        .map(new AbstractFunction1<Long, AsyncLogWriter>() {
    -                            @Override
    -                            public AsyncLogWriter apply(Long lastTxId) {
    -                                // 4. update last tx id if successfully recovered
    -                                writer.setLastTxId(lastTxId);
    -                                return writer;
    -                            }
    -                        }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>()
{
    -                            @Override
    -                            public BoxedUnit apply(Throwable cause) {
    -                                // 5. close the writer if recovery failed
    -                                writer.asyncAbort();
    -                                return BoxedUnit.UNIT;
    -                            }
    -                        });
    +        return createWriteHandleFuture.thenCompose(writeHandler -> {
    +            final BKAsyncLogWriter writer;
    +            synchronized (BKDistributedLogManager.this) {
    +                // 2. create the writer with the handler
    +                writer = new BKAsyncLogWriter(
    +                        conf,
    +                        dynConf,
    +                        BKDistributedLogManager.this,
    +                        writeHandler,
    +                        featureProvider,
    +                        statsLogger);
                 }
    +            // 3. recover the incomplete log segments
    +            return writeHandler.recoverIncompleteLogSegments()
    +                .thenApply(lastTxId -> {
    +                    // 4. update last tx id if successfully recovered
    +                    writer.setLastTxId(lastTxId);
    +                    return (AsyncLogWriter) writer;
    +                })
    +                .whenComplete((lastTxId, cause) -> {
    +                    if (null != cause) {
    +                        // 5. close the writer if recovery failed
    +                        writer.asyncAbort();
    +                    }
    +                });
             });
         }
     
         @Override
    -    public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
    -        return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>,
Future<DLSN>>() {
    -            @Override
    -            public Future<DLSN> apply(List<LogSegmentMetadata> segments)
{
    -                return getDLSNNotLessThanTxId(fromTxnId, segments);
    -            }
    -        });
    +    public CompletableFuture<DLSN> getDLSNNotLessThanTxId(final long fromTxnId)
{
    +        return getLogSegmentsAsync().thenCompose(segments -> getDLSNNotLessThanTxId(fromTxnId,
segments));
         }
     
    -    private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
    +    private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
                                                     final List<LogSegmentMetadata>
segments) {
    --- End diff --
    
    Please also fix the code alignment in this file, as well as at lines 565, 707, and 838.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Mime
View raw message