bookkeeper-distributedlog-dev mailing list archives

From si...@apache.org
Subject [15/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException
Date Mon, 12 Jun 2017 15:45:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
new file mode 100644
index 0000000..862f05a
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.placement;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.distributedlog.util.Utils;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.TreeSet;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the PlacementStateManager that saves data to and loads from ZooKeeper,
+ * avoiding the need for an additional system to manage resource placement.
+ */
+public class ZKPlacementStateManager implements PlacementStateManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
+
+    private static final String SERVER_LOAD_DIR = "/.server-load";
+
+    private final String serverLoadPath;
+    private final ZooKeeperClient zkClient;
+
+    private boolean watching = false;
+
+    public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+        zkClient = BKNamespaceDriver.createZKClientBuilder(
+            String.format("ZKPlacementStateManager-%s", zkServers),
+            conf,
+            zkServers,
+            statsLogger.scope("placement_state_manager")).build();
+        serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
+    }
+
+    private void createServerLoadPathIfNoExists(byte[] data)
+        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+        try {
+            Utils.zkCreateFullPathOptimistic(
+                zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+        }
+    }
+
+    @Override
+    public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+        logger.info("saving ownership");
+        try {
+            ZooKeeper zk = zkClient.get();
+            // use timestamp as data so watchers will see any changes
+            byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+            if (zk.exists(serverLoadPath, false) == null) { // create the path to the root node if it does not yet exist
+                createServerLoadPathIfNoExists(timestamp);
+            }
+
+            Transaction tx = zk.transaction();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            HashSet<String> servers = new HashSet<String>(children);
+            tx.setData(serverLoadPath, timestamp, -1); // touch the root node so watchers are notified of the update
+            for (ServerLoad serverLoad : serverLoads) {
+                String server = serverToZkFormat(serverLoad.getServer());
+                String serverPath = serverPath(server);
+                if (servers.contains(server)) {
+                    servers.remove(server);
+                    tx.setData(serverPath, serverLoad.serialize(), -1);
+                } else {
+                    tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+                }
+            }
+            for (String server : servers) {
+                tx.delete(serverPath(server), -1);
+            }
+            tx.commit();
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerSaveException(e);
+        }
+    }
+
+    @Override
+    public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        try {
+            ZooKeeper zk = zkClient.get();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            for (String server : children) {
+                ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+            }
+            return ownerships;
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerLoadException(e);
+        }
+    }
+
+    @Override
+    public synchronized void watch(final PlacementCallback callback) {
+        if (watching) {
+            return; // do not double watch
+        }
+        watching = true;
+
+        try {
+            ZooKeeper zk = zkClient.get();
+            try {
+                zk.getData(serverLoadPath, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent watchedEvent) {
+                        try {
+                            callback.callback(loadOwnership());
+                        } catch (StateManagerLoadException e) {
+                            logger.error("Watch of Ownership failed", e);
+                        } finally {
+                            watching = false;
+                            watch(callback);
+                        }
+                    }
+                }, new Stat());
+            } catch (KeeperException.NoNodeException nee) {
+                byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+                createServerLoadPathIfNoExists(timestamp);
+                watching = false;
+                watch(callback);
+            }
+        } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+            logger.error("Watch of Ownership failed", e);
+            watching = false;
+            watch(callback);
+        }
+    }
+
+    public String serverPath(String server) {
+        return String.format("%s/%s", serverLoadPath, server);
+    }
+
+    protected String serverToZkFormat(String server) {
+        return server.replaceAll("/", "--");
+    }
+
+    protected String zkFormatToServer(String zkFormattedServer) {
+        return zkFormattedServer.replaceAll("--", "/");
+    }
+}
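
For illustration, a minimal usage sketch of the ZooKeeper-backed state manager above. This is not part of the patch: the ServerLoad(String) constructor and the nested PlacementStateManager.PlacementCallback interface are assumptions inferred from how saveOwnership() and watch() reference them, and NullStatsLogger is used only to keep the example self-contained.

    import java.net.URI;
    import java.util.TreeSet;
    import org.apache.bookkeeper.stats.NullStatsLogger;
    import org.apache.distributedlog.DistributedLogConfiguration;
    import org.apache.distributedlog.service.placement.PlacementStateManager;
    import org.apache.distributedlog.service.placement.ServerLoad;
    import org.apache.distributedlog.service.placement.ZKPlacementStateManager;

    public class PlacementStateSketch {

        public static void main(String[] args) throws Exception {
            URI uri = URI.create("distributedlog://zk1:2181/messaging/distributedlog");
            DistributedLogConfiguration conf = new DistributedLogConfiguration();
            ZKPlacementStateManager manager =
                new ZKPlacementStateManager(uri, conf, NullStatsLogger.INSTANCE);

            // Persist a snapshot of ownership: each ServerLoad becomes one znode
            // under <namespace-path>/.server-load (constructor shape assumed).
            TreeSet<ServerLoad> loads = new TreeSet<ServerLoad>();
            loads.add(new ServerLoad("proxy-host-1:8000"));
            manager.saveOwnership(loads);

            // Watch for changes; the watcher re-registers itself after every
            // notification and hands the freshly loaded snapshot to the callback.
            manager.watch(new PlacementStateManager.PlacementCallback() {
                @Override
                public void callback(TreeSet<ServerLoad> serverLoads) {
                    System.out.println("placement updated: " + serverLoads.size() + " servers");
                }
            });
        }
    }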

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java
new file mode 100644
index 0000000..ea79251
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Placement Policy to place streams across proxy services.
+ */
+package org.apache.distributedlog.service.placement;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
new file mode 100644
index 0000000..83ac668
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.exceptions.ChecksumFailedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Try;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * Abstract Stream Operation.
+ */
+public abstract class AbstractStreamOp<Response> implements StreamOp {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
+    protected final String stream;
+    protected final OpStatsLogger opStatsLogger;
+    private final Promise<Response> result = new Promise<Response>();
+    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    protected final Long checksum;
+    protected final Feature checksumDisabledFeature;
+
+    public AbstractStreamOp(String stream,
+                            OpStatsLogger statsLogger,
+                            Long checksum,
+                            Feature checksumDisabledFeature) {
+        this.stream = stream;
+        this.opStatsLogger = statsLogger;
+        // start the stopwatch here in case the operation fails before executing.
+        stopwatch.reset().start();
+        this.checksum = checksum;
+        this.checksumDisabledFeature = checksumDisabledFeature;
+    }
+
+    @Override
+    public String streamName() {
+        return stream;
+    }
+
+    @Override
+    public Stopwatch stopwatch() {
+        return stopwatch;
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+            Long serverChecksum = computeChecksum();
+            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+                throw new ChecksumFailedException();
+            }
+        }
+    }
+
+    @Override
+    public Long computeChecksum() {
+        return null;
+    }
+
+    @Override
+    public Future<Void> execute(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) {
+        stopwatch.reset().start();
+        return executeOp(writer, sequencer, txnLock)
+                .addEventListener(new FutureEventListener<Response>() {
+            @Override
+            public void onSuccess(Response response) {
+                opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                setResponse(response);
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+            }
+        }).voided();
+    }
+
+    /**
+     * Fail the operation with the given reason <i>cause</i>. Ownership failures
+     * are reported back with the current owner rather than counted as errors.
+     *
+     * @param cause
+     *          failure reason
+     */
+    @Override
+    public void fail(Throwable cause) {
+        if (cause instanceof OwnershipAcquireFailedException) {
+            // Ownership exception is a control exception, not an error, so we don't stat
+            // it with the other errors.
+            OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
+            fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner()));
+        } else {
+            opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+            fail(ResponseUtils.exceptionToHeader(cause));
+        }
+    }
+
+    protected void setResponse(Response response) {
+      Return<Response> responseTry = new Return<Response>(response);
+      boolean isEmpty = result.updateIfEmpty(responseTry);
+      if (!isEmpty) {
+        Option<Try<Response>> resultTry = result.poll();
+        logger.error("Result set multiple times. Value='{}', New='{}'", resultTry, responseTry);
+      }
+    }
+
+    /**
+     * Return the full response, header and body.
+     *
+     * @return A future containing the response or the exception
+     *      encountered by the op if it failed.
+     */
+    public Future<Response> result() {
+        return result;
+    }
+
+    /**
+     * Execute the operation and return its corresponding response.
+     *
+     * @param writer
+     *          writer to execute the operation.
+     * @param sequencer
+     *          sequencer used for generating transaction id for stream operations
+     * @param txnLock
+     *          transaction lock to guarantee ordering of transaction id
+     * @return future representing the operation.
+     */
+    protected abstract Future<Response> executeOp(AsyncLogWriter writer,
+                                                  Sequencer sequencer,
+                                                  Object txnLock);
+
+    // fail the result with the given response header
+    protected abstract void fail(ResponseHeader header);
+
+    public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
+        return requestLogger(statsLogger).getOpStatsLogger(opName);
+    }
+
+    public static StatsLogger requestLogger(StatsLogger statsLogger) {
+        return statsLogger.scope("request");
+    }
+
+    public static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
+        return requestLogger(statsLogger).scope(scope);
+    }
+}
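
For illustration, a minimal concrete subclass sketch showing the template AbstractStreamOp expects: computeChecksum() feeds the preExecute() check, executeOp() performs the work, and fail() converts a ResponseHeader into the response type. This is not part of the patch; the NoopWriteOp name and its trivial behavior are hypothetical (AbstractWriteOp below is the real base for write operations).

    package org.apache.distributedlog.service.stream;

    import com.twitter.util.Future;
    import org.apache.bookkeeper.feature.Feature;
    import org.apache.bookkeeper.stats.OpStatsLogger;
    import org.apache.distributedlog.AsyncLogWriter;
    import org.apache.distributedlog.protocol.util.ProtocolUtils;
    import org.apache.distributedlog.service.ResponseUtils;
    import org.apache.distributedlog.thrift.service.ResponseHeader;
    import org.apache.distributedlog.thrift.service.WriteResponse;
    import org.apache.distributedlog.util.Sequencer;

    public class NoopWriteOp extends AbstractStreamOp<WriteResponse> {

        public NoopWriteOp(String stream,
                           OpStatsLogger statsLogger,
                           Long checksum,
                           Feature checksumDisabledFeature) {
            super(stream, statsLogger, checksum, checksumDisabledFeature);
        }

        @Override
        public Long computeChecksum() {
            // Server-side checksum; preExecute() rejects the request when the
            // client-supplied checksum disagrees and the feature is not disabled.
            return ProtocolUtils.streamOpCRC32(stream);
        }

        @Override
        protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
                                                  Sequencer sequencer,
                                                  Object txnLock) {
            // A real op would use the writer and sequencer under txnLock;
            // this one simply acknowledges the request.
            return Future.value(ResponseUtils.writeSuccess());
        }

        @Override
        protected void fail(ResponseHeader header) {
            setResponse(ResponseUtils.write(header));
        }
    }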

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
new file mode 100644
index 0000000..8befffc
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Abstract Write Operation.
+ */
+public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
+
+    protected AbstractWriteOp(String stream,
+                              OpStatsLogger statsLogger,
+                              Long checksum,
+                              Feature checksumDisabledFeature) {
+        super(stream, statsLogger, checksum, checksumDisabledFeature);
+    }
+
+    @Override
+    protected void fail(ResponseHeader header) {
+        setResponse(ResponseUtils.write(header));
+    }
+
+    @Override
+    public Long computeChecksum() {
+        return ProtocolUtils.streamOpCRC32(stream);
+    }
+
+    @Override
+    public Future<ResponseHeader> responseHeader() {
+        return result().map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
+            @Override
+            public ResponseHeader apply(WriteResponse response) {
+                return response.getHeader();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
new file mode 100644
index 0000000..6c98468
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.service.streamset.Partition;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.ConstFuture;
+import com.twitter.util.Future;
+import com.twitter.util.Future$;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Try;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Bulk Write Operation.
+ */
+public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
+    private final List<ByteBuffer> buffers;
+    private final long payloadSize;
+
+    // Stats
+    private final Counter deniedBulkWriteCounter;
+    private final Counter successRecordCounter;
+    private final Counter failureRecordCounter;
+    private final Counter redirectRecordCounter;
+    private final OpStatsLogger latencyStat;
+    private final Counter bytes;
+    private final Counter bulkWriteBytes;
+
+    private final AccessControlManager accessControlManager;
+
+    // We need to pass these through to preserve ownership change behavior in
+    // client/server. Only include failures which are guaranteed to have failed
+    // all subsequent writes.
+    private boolean isDefiniteFailure(Try<DLSN> result) {
+        boolean def = false;
+        try {
+            result.get();
+        } catch (Exception ex) {
+            if (ex instanceof OwnershipAcquireFailedException
+                || ex instanceof AlreadyClosedException
+                || ex instanceof LockingException) {
+                def = true;
+            }
+        }
+        return def;
+    }
+
+    public BulkWriteOp(String stream,
+                       List<ByteBuffer> buffers,
+                       StatsLogger statsLogger,
+                       StatsLogger perStreamStatsLogger,
+                       StreamPartitionConverter streamPartitionConverter,
+                       Long checksum,
+                       Feature checksumDisabledFeature,
+                       AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "bulkWrite"), checksum, checksumDisabledFeature);
+        this.buffers = buffers;
+        long total = 0;
+        // Compute the payload size eagerly because the ByteBuffers are mutable.
+        for (ByteBuffer bb : buffers) {
+          total += bb.remaining();
+        }
+        this.payloadSize = total;
+
+        final Partition partition = streamPartitionConverter.convert(stream);
+        // Write record stats
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
+        this.successRecordCounter = streamOpStats.recordsCounter("success");
+        this.failureRecordCounter = streamOpStats.recordsCounter("failure");
+        this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
+        this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
+
+        this.accessControlManager = accessControlManager;
+
+        final long size = getPayloadSize();
+        result().addEventListener(new FutureEventListener<BulkWriteResponse>() {
+            @Override
+            public void onSuccess(BulkWriteResponse response) {
+                if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+                    latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                    bytes.add(size);
+                    bulkWriteBytes.add(size);
+                } else {
+                    latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+                }
+            }
+            @Override
+            public void onFailure(Throwable cause) {
+                latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS));
+            }
+        });
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowWrite(stream)) {
+            deniedBulkWriteCounter.inc();
+            throw new RequestDeniedException(stream, "bulkWrite");
+        }
+        super.preExecute();
+    }
+
+    @Override
+    public long getPayloadSize() {
+      return payloadSize;
+    }
+
+    @Override
+    protected Future<BulkWriteResponse> executeOp(AsyncLogWriter writer,
+                                                  Sequencer sequencer,
+                                                  Object txnLock) {
+        // Need to convert input buffers to LogRecords.
+        List<LogRecord> records;
+        Future<List<Future<DLSN>>> futureList;
+        synchronized (txnLock) {
+            records = asRecordList(buffers, sequencer);
+            futureList = writer.writeBulk(records);
+        }
+
+        // Collect into a list of tries to make it easier to extract exception or DLSN.
+        Future<List<Try<DLSN>>> writes = asTryList(futureList);
+
+        Future<BulkWriteResponse> response = writes.flatMap(
+            new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() {
+                @Override
+                public Future<BulkWriteResponse> apply(List<Try<DLSN>> results) {
+
+                    // Considered a success at the batch level even if no individual writes succeed.
+                    // The reason is that it's impossible to make an appropriate decision about retries
+                    // without the individual buffer failure reasons.
+                    List<WriteResponse> writeResponses = new ArrayList<WriteResponse>(results.size());
+                    BulkWriteResponse bulkWriteResponse =
+                        ResponseUtils.bulkWriteSuccess().setWriteResponses(writeResponses);
+
+                    // Promote the first result to an op-level failure if we're sure all other writes have
+                    // failed.
+                    if (results.size() > 0) {
+                        Try<DLSN> firstResult = results.get(0);
+                        if (isDefiniteFailure(firstResult)) {
+                            return new ConstFuture(firstResult);
+                        }
+                    }
+
+                    // Translate all futures to write responses.
+                    Iterator<Try<DLSN>> iterator = results.iterator();
+                    while (iterator.hasNext()) {
+                        Try<DLSN> completedFuture = iterator.next();
+                        try {
+                            DLSN dlsn = completedFuture.get();
+                            WriteResponse writeResponse = ResponseUtils.writeSuccess().setDlsn(dlsn.serialize());
+                            writeResponses.add(writeResponse);
+                            successRecordCounter.inc();
+                        } catch (Exception ioe) {
+                            WriteResponse writeResponse = ResponseUtils.write(ResponseUtils.exceptionToHeader(ioe));
+                            writeResponses.add(writeResponse);
+                            if (StatusCode.FOUND == writeResponse.getHeader().getCode()) {
+                                redirectRecordCounter.inc();
+                            } else {
+                                failureRecordCounter.inc();
+                            }
+                        }
+                    }
+
+                    return Future.value(bulkWriteResponse);
+                }
+            }
+        );
+
+        return response;
+    }
+
+    private List<LogRecord> asRecordList(List<ByteBuffer> buffers, Sequencer sequencer) {
+        List<LogRecord> records = new ArrayList<LogRecord>(buffers.size());
+        for (ByteBuffer buffer : buffers) {
+            byte[] payload = new byte[buffer.remaining()];
+            buffer.get(payload);
+            records.add(new LogRecord(sequencer.nextId(), payload));
+        }
+        return records;
+    }
+
+    private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> futureList) {
+        return futureList.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() {
+            @Override
+            public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> results) {
+                return Future$.MODULE$.collectToTry(results);
+            }
+        });
+    }
+
+    @Override
+    protected void fail(ResponseHeader header) {
+        if (StatusCode.FOUND == header.getCode()) {
+            redirectRecordCounter.add(buffers.size());
+        } else {
+            failureRecordCounter.add(buffers.size());
+        }
+        setResponse(ResponseUtils.bulkWrite(header));
+    }
+
+    @Override
+    public Future<ResponseHeader> responseHeader() {
+        return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
+            @Override
+            public ResponseHeader apply(BulkWriteResponse response) {
+                return response.getHeader();
+            }
+        });
+    }
+}
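
For illustration, a standalone sketch of the collect-to-try pattern BulkWriteOp relies on; it is not part of the patch. collectToTry turns a list of per-record futures into per-record Trys, so one failed write surfaces as a failed WriteResponse rather than failing the whole batch, and only the definite failures recognized by isDefiniteFailure() are promoted to an op-level failure.

    import com.twitter.util.Await;
    import com.twitter.util.Future;
    import com.twitter.util.Future$;
    import com.twitter.util.Try;
    import java.util.Arrays;
    import java.util.List;

    public class CollectToTrySketch {

        public static void main(String[] args) throws Exception {
            List<Future<String>> writes = Arrays.asList(
                Future.value("dlsn-1"),
                Future.<String>exception(new RuntimeException("bookie unavailable")),
                Future.value("dlsn-3"));

            // One failed element does not fail the collected future; each entry
            // carries its own success or failure, mirroring the per-record
            // WriteResponse translation in BulkWriteOp.executeOp() above.
            List<Try<String>> results = Await.result(Future$.MODULE$.collectToTry(writes));
            for (Try<String> result : results) {
                System.out.println(result.isReturn() ? "ok: " + result.get() : "failed: " + result);
            }
        }
    }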

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
new file mode 100644
index 0000000..3ecb46f
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to delete a log stream.
+ */
+public class DeleteOp extends AbstractWriteOp {
+    private final StreamManager streamManager;
+    private final Counter deniedDeleteCounter;
+    private final AccessControlManager accessControlManager;
+
+    public DeleteOp(String stream,
+                    StatsLogger statsLogger,
+                    StatsLogger perStreamStatsLogger,
+                    StreamManager streamManager,
+                    Long checksum,
+                    Feature checksumEnabledFeature,
+                    AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "delete"), checksum, checksumEnabledFeature);
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedDeleteCounter = streamOpStats.requestDeniedCounter("delete");
+        this.accessControlManager = accessControlManager;
+        this.streamManager = streamManager;
+    }
+
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        Future<Void> result = streamManager.deleteAndRemoveAsync(streamName());
+        return result.map(new AbstractFunction1<Void, WriteResponse>() {
+            @Override
+            public WriteResponse apply(Void value) {
+                return ResponseUtils.writeSuccess();
+            }
+        });
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowTruncate(stream)) {
+            deniedDeleteCounter.inc();
+            throw new RequestDeniedException(stream, "delete");
+        }
+        super.preExecute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
new file mode 100644
index 0000000..0ffa619
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Heartbeat Operation.
+ */
+public class HeartbeatOp extends AbstractWriteOp {
+
+    static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);
+
+    private final AccessControlManager accessControlManager;
+    private final Counter deniedHeartbeatCounter;
+    private final byte dlsnVersion;
+
+    private boolean writeControlRecord = false;
+
+    public HeartbeatOp(String stream,
+                       StatsLogger statsLogger,
+                       StatsLogger perStreamStatsLogger,
+                       byte dlsnVersion,
+                       Long checksum,
+                       Feature checksumDisabledFeature,
+                       AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "heartbeat"), checksum, checksumDisabledFeature);
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedHeartbeatCounter = streamOpStats.requestDeniedCounter("heartbeat");
+        this.dlsnVersion = dlsnVersion;
+        this.accessControlManager = accessControlManager;
+    }
+
+    public HeartbeatOp setWriteControlRecord(boolean writeControlRecord) {
+        this.writeControlRecord = writeControlRecord;
+        return this;
+    }
+
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        // write a control record if heartbeat is the first request of the recovered log segment.
+        if (writeControlRecord) {
+            long txnId;
+            Future<DLSN> writeResult;
+            synchronized (txnLock) {
+                txnId = sequencer.nextId();
+                LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA);
+                hbRecord.setControl();
+                writeResult = writer.write(hbRecord);
+            }
+            return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
+                @Override
+                public WriteResponse apply(DLSN value) {
+                    return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion));
+                }
+            });
+        } else {
+            return Future.value(ResponseUtils.writeSuccess());
+        }
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowAcquire(stream)) {
+            deniedHeartbeatCounter.inc();
+            throw new RequestDeniedException(stream, "heartbeat");
+        }
+        super.preExecute();
+    }
+}
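
For illustration, how the proxy might construct the first heartbeat after acquiring a stream so that it also writes a control record, per the executeOp() branch above. This is not part of the patch; the wrapper class and parameter names are hypothetical.

    package org.apache.distributedlog.service.stream;

    import org.apache.bookkeeper.feature.Feature;
    import org.apache.bookkeeper.stats.StatsLogger;
    import org.apache.distributedlog.acl.AccessControlManager;

    public class HeartbeatSketch {

        static HeartbeatOp firstHeartbeat(String stream,
                                          StatsLogger statsLogger,
                                          StatsLogger perStreamStatsLogger,
                                          byte dlsnVersion,
                                          Long checksum,
                                          Feature checksumDisabledFeature,
                                          AccessControlManager accessControlManager) {
            // The first heartbeat after a stream is acquired also writes a control
            // record; subsequent heartbeats leave the flag at its default (false).
            return new HeartbeatOp(stream, statsLogger, perStreamStatsLogger,
                    dlsnVersion, checksum, checksumDisabledFeature, accessControlManager)
                .setWriteControlRecord(true);
        }
    }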

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
new file mode 100644
index 0000000..6ec8642
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.RequestDeniedException;
+import org.apache.distributedlog.service.ResponseUtils;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * Operation to release ownership of a log stream.
+ */
+public class ReleaseOp extends AbstractWriteOp {
+    private final StreamManager streamManager;
+    private final Counter deniedReleaseCounter;
+    private final AccessControlManager accessControlManager;
+
+    public ReleaseOp(String stream,
+                     StatsLogger statsLogger,
+                     StatsLogger perStreamStatsLogger,
+                     StreamManager streamManager,
+                     Long checksum,
+                     Feature checksumDisabledFeature,
+                     AccessControlManager accessControlManager) {
+        super(stream, requestStat(statsLogger, "release"), checksum, checksumDisabledFeature);
+        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
+        this.deniedReleaseCounter = streamOpStats.requestDeniedCounter("release");
+        this.accessControlManager = accessControlManager;
+        this.streamManager = streamManager;
+    }
+
+    @Override
+    protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
+                                              Sequencer sequencer,
+                                              Object txnLock) {
+        Future<Void> result = streamManager.closeAndRemoveAsync(streamName());
+        return result.map(new AbstractFunction1<Void, WriteResponse>() {
+            @Override
+            public WriteResponse apply(Void value) {
+                return ResponseUtils.writeSuccess();
+            }
+        });
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!accessControlManager.allowRelease(stream)) {
+            deniedReleaseCounter.inc();
+            throw new RequestDeniedException(stream, "release");
+        }
+        super.preExecute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java
new file mode 100644
index 0000000..3517a63
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.service.streamset.Partition;
+import com.twitter.util.Future;
+import java.io.IOException;
+
+/**
+ * Stream is the per stream request handler in the DL service layer.
+ *
+ * <p>The collection of Streams in the proxy are managed by StreamManager.
+ */
+public interface Stream {
+
+    /**
+     * Get the stream configuration for this stream.
+     *
+     * @return stream configuration
+     */
+    DynamicDistributedLogConfiguration getStreamConfiguration();
+
+    /**
+     * Get the stream's last recorded current owner (may be out of date). Used
+     * as a hint for the client.
+     * @return last known owner for the stream
+     */
+    String getOwner();
+
+    /**
+     * Get the stream name.
+     * @return stream name
+     */
+    String getStreamName();
+
+    /**
+     * Get the represented partition name.
+     *
+     * @return represented partition name.
+     */
+    Partition getPartition();
+
+    /**
+     * Expensive initialization code run after stream has been allocated in
+     * StreamManager.
+     *
+     * @throws IOException when encountered exception on initialization
+     */
+    void initialize() throws IOException;
+
+    /**
+     * Another initialize method (actually Thread.start). Should probably be
+     * moved to initialize().
+     */
+    void start();
+
+    /**
+     * Asynchronous close method.
+     * @param reason for closing
+     * @return future satisfied once close complete
+     */
+    Future<Void> requestClose(String reason);
+
+    /**
+     * Delete the stream from DL backend.
+     *
+     * @throws IOException when encountered exception on deleting the stream.
+     */
+    void delete() throws IOException;
+
+    /**
+     * Execute the stream operation against this stream.
+     *
+     * @param op operation to execute
+     */
+    void submit(StreamOp op);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
new file mode 100644
index 0000000..845ef21
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+
+/**
+ * Factory to create a stream with provided stream configuration {@code streamConf}.
+ */
+public interface StreamFactory {
+
+    /**
+     * Create a stream object.
+     *
+     * @param name stream name
+     * @param streamConf stream configuration
+     * @param streamManager manager of streams
+     * @return stream object
+     */
+    Stream create(String name,
+                  DynamicDistributedLogConfiguration streamConf,
+                  StreamManager streamManager);
+}
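
For illustration, a sketch tying together the Stream and StreamFactory interfaces above: the lifecycle a StreamManager implementation would drive. It is not part of the patch; the class and method names here are hypothetical.

    package org.apache.distributedlog.service.stream;

    import java.io.IOException;
    import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;

    public class StreamLifecycleSketch {

        static void acquireAndServe(StreamFactory factory,
                                    String name,
                                    DynamicDistributedLogConfiguration streamConf,
                                    StreamManager streamManager,
                                    StreamOp op) throws IOException {
            Stream stream = factory.create(name, streamConf, streamManager);
            stream.initialize();                       // expensive setup after allocation
            stream.start();                            // begin serving (currently Thread.start)
            stream.submit(op);                         // e.g. a write, heartbeat or delete op
            stream.requestClose("ownership released"); // asynchronous close
        }
    }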

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
new file mode 100644
index 0000000..2b90d55
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.stream;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.FatalErrorHandler;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.config.StreamConfigProvider;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Timer;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.jboss.netty.util.HashedWheelTimer;
+
+/**
+ * The implementation of {@link StreamFactory}.
+ */
+public class StreamFactoryImpl implements StreamFactory {
+    private final String clientId;
+    private final StreamOpStats streamOpStats;
+    private final ServerConfiguration serverConfig;
+    private final DistributedLogConfiguration dlConfig;
+    private final FeatureProvider featureProvider;
+    private final StreamConfigProvider streamConfigProvider;
+    private final StreamPartitionConverter streamPartitionConverter;
+    private final DistributedLogNamespace dlNamespace;
+    private final OrderedScheduler scheduler;
+    private final FatalErrorHandler fatalErrorHandler;
+    private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
+
+    public StreamFactoryImpl(String clientId,
+        StreamOpStats streamOpStats,
+        ServerConfiguration serverConfig,
+        DistributedLogConfiguration dlConfig,
+        FeatureProvider featureProvider,
+        StreamConfigProvider streamConfigProvider,
+        StreamPartitionConverter streamPartitionConverter,
+        DistributedLogNamespace dlNamespace,
+        OrderedScheduler scheduler,
+        FatalErrorHandler fatalErrorHandler,
+        HashedWheelTimer requestTimer) {
+
+        this.clientId = clientId;
+        this.streamOpStats = streamOpStats;
+        this.serverConfig = serverConfig;
+        this.dlConfig = dlConfig;
+        this.featureProvider = featureProvider;
+        this.streamConfigProvider = streamConfigProvider;
+        this.streamPartitionConverter = streamPartitionConverter;
+        this.dlNamespace = dlNamespace;
+        this.scheduler = scheduler;
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.requestTimer = requestTimer;
+        this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer);
+    }
+
+    @Override
+    public Stream create(String name,
+                         DynamicDistributedLogConfiguration streamConf,
+                         StreamManager streamManager) {
+        return new StreamImpl(name,
+            streamPartitionConverter.convert(name),
+            clientId,
+            streamManager,
+            streamOpStats,
+            serverConfig,
+            dlConfig,
+            streamConf,
+            featureProvider,
+            streamConfigProvider,
+            dlNamespace,
+            scheduler,
+            fatalErrorHandler,
+            requestTimer,
+            futureTimer);
+    }
+}

