ACCUMULO-378 Lower the batchwriter "batch" size, and make it configurable.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3243d2ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3243d2ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3243d2ff
Branch: refs/heads/ACCUMULO-378
Commit: 3243d2ff9209246e7b03453460dfd4a3f231b190
Parents: da0a228
Author: Josh Elser <elserj@apache.org>
Authored: Wed May 21 16:50:17 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Wed May 21 16:50:17 2014 -0400
----------------------------------------------------------------------
.../java/org/apache/accumulo/core/conf/Property.java | 2 ++
.../replication/BatchWriterReplicationReplayer.java | 12 ++++++++++--
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1ee499..f239756 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -281,6 +281,8 @@ public enum Property {
@Experimental
TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
+ @Experimental
+ TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory",
"25M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"),
// properties that are specific to logger server behavior
LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect
the behavior of the write-ahead logger servers"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 45c1409..ea50199 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -26,11 +26,15 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.KeyValues;
import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.slf4j.Logger;
@@ -45,8 +49,10 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
@Override
public long replicateLog(Connector conn, String tableName, WalEdits data) throws RemoteReplicationException
{
+ final AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
final LogFileKey key = new LogFileKey();
final LogFileValue value = new LogFileValue();
+ final long memoryInBytes = conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY);
BatchWriter bw = null;
long mutationsApplied = 0l;
@@ -63,14 +69,16 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
// Create the BatchWriter if we don't already have one.
if (null == bw) {
+ BatchWriterConfig bwConfig = new BatchWriterConfig();
+ bwConfig.setMaxMemory(memoryInBytes);
try {
- bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ bw = conn.createBatchWriter(tableName, bwConfig);
} catch (TableNotFoundException e) {
throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(),
"Table " + tableName + " does not exist");
}
}
- log.info("Applying {} updates to table {} as part of batch", value.mutations.size(),
tableName);
+ log.info("Applying {} mutations to table {} as part of batch", value.mutations.size(),
tableName);
try {
bw.addMutations(value.mutations);
|