asterixdb-notifications mailing list archives

From "Murtadha Hubail (Code Review)" <>
Subject Change in asterixdb[master]: Introducing Data Replication To AsterixDB
Date Wed, 23 Sep 2015 01:33:22 GMT
Murtadha Hubail has posted comments on this change.

Change subject: Introducing Data Replication To AsterixDB

Patch Set 6:



Thanks for the thorough review so far. I have addressed your comments and answered your questions.
I also fixed the license header on all the files. Please let me know if anything isn't clear.
I will hold off on uploading the new patch until I get the rest of your comments.
File asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/

Line 162:             //Note: this is a hack since each node right now maintains its own copy
of the cluster configuration.
> Who provides the original configuration? When is the copy created? and Wher
Right now each NC gets a copy of the cluster configuration file during the binary distribution.
The file is static and never modified locally.

Line 187:                 recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
> It will be simpler if we can do checkpoint right after LifeCycleComponentMa
Done. The checkpoint in both cases now happens after LifeCycleComponentManager().stopAll().
File asterix-common/src/main/java/org/apache/asterix/common/config/

Line 14:  */
> This license should be changed.

Line 173:     public Set<String> getNodeReplicas(String nodeId) {
> It's better to change the method name as something like getNodeReplicaIds()
Did the renaming. This method was changed to just add the node itself and the result of getRemoteReplicasIds().

Line 197:             for (int i = nodeIndex - 1; i >= 0; i--) {
> Why is the direction of iteration different from the for clause in  getRemo
This code doesn't exist anymore.

Line 208:                 for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
> same comment for this
This code doesn't exist anymore.

Line 221:     public Set<String> getNodeDataReplicas(String nodeId) {
> Please help me understand the difference between this method and the method
In one of my code iterations getNodeReplicas() excluded the node itself and this one included
it. Right now, they are the same. I removed this method and refactored getNodeReplicas to
just add the node itself and the result of getRemoteReplicasIds().
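
To make the refactoring concrete, here is a minimal sketch of the shape described above. Only getRemoteReplicasIds() is a name taken from the patch; the class name, the getNodeReplicasIds() name, the replication factor field, and the "next nodes in configuration order" placement policy are assumptions made for illustration, not the actual implementation.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the replication properties class under review.
class ReplicaLookupSketch {
    private final List<String> nodeIds;   // cluster nodes in configuration order (assumed)
    private final int replicationFactor;  // total copies, including the node itself (assumed)

    ReplicaLookupSketch(List<String> nodeIds, int replicationFactor) {
        this.nodeIds = nodeIds;
        this.replicationFactor = replicationFactor;
    }

    // Assumed placement policy: the next nodes in configuration order, wrapping around.
    Set<String> getRemoteReplicasIds(String nodeId) {
        Set<String> remote = new LinkedHashSet<>();
        int nodeIndex = nodeIds.indexOf(nodeId);
        if (nodeIndex < 0) {
            return remote; // unknown node: no replicas
        }
        int wanted = Math.min(replicationFactor - 1, nodeIds.size() - 1);
        for (int i = 1; i <= wanted; i++) {
            remote.add(nodeIds.get((nodeIndex + i) % nodeIds.size()));
        }
        return remote;
    }

    // As described in the reply: the node itself plus its remote replicas.
    Set<String> getNodeReplicasIds(String nodeId) {
        Set<String> replicas = new LinkedHashSet<>();
        replicas.add(nodeId);
        replicas.addAll(getRemoteReplicasIds(nodeId));
        return replicas;
    }
}
```

With this shape there is a single place that decides which nodes are remote replicas, so a second method such as getNodeDataReplicas() has nothing left to do.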
File asterix-common/src/main/java/org/apache/asterix/common/replication/

Line 22: 
> Please explain the difference between the LSMIndexReplicationJob and Asteri
LSMIndexReplicationJob is used only for LSM Components at the Hyracks level. AsterixReplicationJob
is used for everything else. Currently it is used to transfer index metadata files.
File asterix-common/src/main/java/org/apache/asterix/common/replication/

Line 20: 
> What's the purpose of having a separate Interface without having any added 
File asterix-common/src/main/java/org/apache/asterix/common/replication/

Line 14:  */
> license should be changed.

Line 59:         String replicaIPAddreess = node.getClusterIp();
> typo: replicaIPAddreess --> replicaIPAddress

Line 73:         output.writeUTF(node.getId());
> why UTF?
why not? what do you have against UTF? :)
File asterix-common/src/main/java/org/apache/asterix/common/replication/

Line 24:     public enum ReplicaEventType {
> Let's explain the meansing of each event and when each event can happen.
SPLIT: remote replica failed.

MERGE: remote replica is rejoining the cluster.

SHUTDOWN: remote replica is shutting down normally.
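
One way the explanations above could be attached to the enum itself is sketched below. The three constants and their meanings come from this reply; the description field and its accessor are hypothetical additions for illustration only.

```java
// Sketch of the event enum with the meanings from this reply attached to each constant.
enum ReplicaEventType {
    SPLIT("remote replica failed"),
    MERGE("remote replica is rejoining the cluster"),
    SHUTDOWN("remote replica is shutting down normally");

    // Hypothetical field: keeps the documentation next to the constant.
    private final String description;

    ReplicaEventType(String description) {
        this.description = description;
    }

    String description() {
        return description;
    }
}
```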
File asterix-common/src/main/java/org/apache/asterix/common/transactions/

Line 40:  * NodeIdLength(4)
> NodeIdLength is first written and then NodeId is written.

Line 133:             buffer.put(nodeId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
> why UTF?
why not? I might be in a mood to name my cluster nodes in Arabic :)
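
For what it is worth, a rough sketch of the length-prefixed layout discussed above (NodeIdLength(4) followed by the node id bytes) is shown here. The class and method names are hypothetical; the point is that the prefix must hold the UTF-8 byte count, which differs from String.length() as soon as the id contains non-ASCII characters.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a 4-byte length prefix followed by the UTF-8 bytes of the node id.
class NodeIdCodecSketch {

    // Bytes needed in the log record: the integer prefix plus the encoded id.
    static int serializedSize(String nodeId) {
        return Integer.BYTES + nodeId.getBytes(StandardCharsets.UTF_8).length;
    }

    static void writeNodeId(ByteBuffer buffer, String nodeId) {
        byte[] bytes = nodeId.getBytes(StandardCharsets.UTF_8);
        buffer.putInt(bytes.length); // byte count, not character count
        buffer.put(bytes);
    }

    static String readNodeId(ByteBuffer buffer) {
        int length = buffer.getInt();
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```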

Line 169:     public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
> In order to avoid duplicated code, we should create an private method which
Done. Refactored as writeLogRecordCommonFields(buffer), readLogHeader(buffer), and readLogBody(buffer,
boolean allocateTupleBuffer). The read is split in two because the decision whether to allocate
the tuple buffer comes from the log header.

Line 204: 
> From here, please reflect the newly added fields to the LogRecordFormat at 
These fields are not written to the log record. They are only put in the copy that is sent to
the remote replica. Only the FLUSH log has a dataset id, which is not new and is already reflected
in the size comments. The reader of this class will be confused with or without the comments :)

Line 347:     public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit)
> formXXXLogRecord() methods can be refactored to avoid duplicated code.

Line 437:         logSize += nodeIdLength;
> nodeIdLength can be included in the existing constants such as XXX_SIZE ins
This is the length of the variable-size node id string, not the length of the integer. The integer
length is already added to the constants.

Line 463:     public int serialize(ByteBuffer buffer) {
> Again, this method should be refactored.

Line 498:     public void deserialize(ByteBuffer buffer) {
> Again, this should be refactored.
Done. Refactored as deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId).
The new parameters are there to avoid allocating a tuple buffer when there is no need for it.

Line 539:                         tupleBuffer = ByteBuffer.allocate(newValueSize);
> You don't want to allocate buffer for every update log record. It's better 
Done. This buffer should never be reused since it's allocated only for remote recovery logs
that belong to the local node. I changed it to be a local variable.

Line 652:     public int getSerializedLogSize() {
This did not account for the extra fields of the FLUSH log. I fixed that as well.
File asterix-replication/src/main/java/org/apache/asterix/replication/functions/

Line 42:     public final static int REPLICATION_FUNCATION_SIZE = INTEGER_SIZE;

Line 45:     public enum ReplicationFunctions {
> It's better to change the enum class name into something like  ReplicationF
Renamed to ReplicationRequestType. Added comments for roles. I changed OK to ACK to better
describe its function.

REPLICATE_LOG: txn log replication.

REPLICATE_FILE: replicate a file(s).

DELETE_FILE: delete a file(s).

GET_REPLICA_FILES: used during remote recovery to request lost LSM Components.

GET_REPLICA_LOGS: used during remote recovery to request lost txn logs.

GET_REPLICA_MAX_LSN: used during remote recovery to initialize the log manager LSN.

GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica.

UPDATE_REPLICA: used to update replica info such as IP Address change.

GOODBYE: used to notify replicas that the replication request has been completed.

REPLICA_EVENT: used to notify replicas about a remote replica split/merge.

LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical
files are sent.

ACK: used to notify the requesting replica that the request has been completed successfully.

FLUSH_INDEX: request remote replica to flush an LSM component.
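
Putting the list above into code, the renamed enum could look roughly like the sketch below. The constant names and their roles are taken from this reply. Encoding the request type as a 4-byte ordinal is an assumption (suggested by REPLICATION_FUNCATION_SIZE = INTEGER_SIZE), and toBuffer()/fromBuffer() are hypothetical helper names.

```java
import java.nio.ByteBuffer;

// Sketch of the request-type enum; roles are summarized from the list in this reply.
enum ReplicationRequestType {
    REPLICATE_LOG,            // txn log replication
    REPLICATE_FILE,           // replicate a file(s)
    DELETE_FILE,              // delete a file(s)
    GET_REPLICA_FILES,        // remote recovery: request lost LSM components
    GET_REPLICA_LOGS,         // remote recovery: request lost txn logs
    GET_REPLICA_MAX_LSN,      // remote recovery: initialize the log manager LSN
    GET_REPLICA_MIN_LSN,      // remote recovery: low water mark per replica
    UPDATE_REPLICA,           // update replica info such as an IP address change
    GOODBYE,                  // the replication request has been completed
    REPLICA_EVENT,            // a remote replica split/merge
    LSM_COMPONENT_PROPERTIES, // properties of an LSM component, sent before its files
    ACK,                      // the request completed successfully
    FLUSH_INDEX;              // ask a remote replica to flush an LSM component

    static final int REQUEST_TYPE_SIZE = Integer.BYTES;

    // Assumed wire format: the ordinal as a 4-byte integer.
    ByteBuffer toBuffer() {
        ByteBuffer bb = ByteBuffer.allocate(REQUEST_TYPE_SIZE);
        bb.putInt(ordinal());
        bb.flip(); // make the buffer readable from position 0
        return bb;
    }

    static ReplicationRequestType fromBuffer(ByteBuffer bb) {
        return values()[bb.getInt()];
    }
}
```

An ordinal-based encoding breaks if constants are ever reordered, so an explicit code per constant may be the safer choice if the format has to stay stable across versions.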

Line 67:             dataBuffer = ByteBuffer.allocate(requestSize);
> This doesn't work, right? The newly allocated buffer is not visible to the 
That's right and a great catch. I used to do this locally in the methods. I missed this when
I factored out the code here. I changed all of them to return the buffer.
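
A small sketch of the problem and the fix, for anyone following along. Java passes object references by value, so assigning a new buffer to the parameter only changes the callee's local copy of the reference. The class and method names below are hypothetical and not the ones in the patch.

```java
import java.nio.ByteBuffer;

class BufferResizeSketch {

    // Broken: the caller keeps pointing at the old, too-small buffer.
    static void ensureCapacityBroken(ByteBuffer dataBuffer, int requestSize) {
        if (dataBuffer.capacity() < requestSize) {
            dataBuffer = ByteBuffer.allocate(requestSize); // only the local reference changes
        }
    }

    // Fixed: return the buffer so the caller can pick up the replacement.
    static ByteBuffer ensureCapacity(ByteBuffer dataBuffer, int requestSize) {
        if (dataBuffer.capacity() < requestSize) {
            return ByteBuffer.allocate(requestSize);
        }
        dataBuffer.clear(); // reuse the existing buffer from position 0
        return dataBuffer;
    }
}
```

The caller then writes `buffer = BufferResizeSketch.ensureCapacity(buffer, requestSize);` and uses the returned reference from there on.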

Line 74:     public static void constructLSMComponentPropertiesRequest(LSMComponentProperties
lsmCompProp, ByteBuffer buffer)
> All constuctXXXRequest() methods write a given object to a destination buff

Line 77:         DataOutputStream oos = new DataOutputStream(outputStream);
> Better be avoiding creating these two object in every call unless this call
It is one call per component. The two memory writes could be avoided by using the byte array
of the outputStream but the socket calls will need to change to be like this:

which would eliminate the efficiency of sending ByteBuffers. I'm planning to do profiling
and other optimizations after the code is merged. This could be a case that I should evaluate.

Line 83:             buffer = ByteBuffer.allocate(requestSize);
> This doesn't work, right? The newly allocated buffer is not visible to the 

Line 93:     public static ReplicationFunctions getRequestFunction(SocketChannel socketChannel,
ByteBuffer byteBuffer)
> better be getRequestType() or getReplicationRequestType()
Done. Changed to getRequestType()

Line 109:         ByteBuffer bb = ByteBuffer.allocate(REPLICATION_FUNCATION_SIZE);
> Why this bb and the bb in getOKBuffer() methods are allocated every time?
Goodbye buffer is sent at the end of every LSM component request to indicate the end of the
request. It cannot be a single reusable buffer since multiple threads might use it unless
it is synchronized. I don't think allocating 4 bytes is going to be a problem for that frequency.
However, I created a single instance of it in ReplicationManager and ReplicationChannel and
reused it with synchronization. Same for OKBuffer.
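
Roughly, the shared buffer could be reused as in the sketch below. This is an illustration under assumptions, not the code in ReplicationManager: the class name, the numeric GOODBYE code, and the use of a generic WritableByteChannel instead of a SocketChannel are all hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class GoodbyeSenderSketch {
    private static final int GOODBYE_CODE = 8; // hypothetical wire value

    // Single pre-filled instance shared by all sender threads.
    private final ByteBuffer goodbyeBuffer;

    GoodbyeSenderSketch() {
        goodbyeBuffer = ByteBuffer.allocate(Integer.BYTES);
        goodbyeBuffer.putInt(GOODBYE_CODE);
        goodbyeBuffer.flip();
    }

    void sendGoodbye(WritableByteChannel channel) throws IOException {
        // The buffer's position is shared state, so only one thread may drain it at a time.
        synchronized (goodbyeBuffer) {
            goodbyeBuffer.rewind(); // reset the position left behind by the previous send
            while (goodbyeBuffer.hasRemaining()) {
                channel.write(goodbyeBuffer);
            }
        }
    }
}
```

The lock is held for the duration of the write, which is the trade-off against allocating four bytes per request.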

Line 139:         requestBuffer.putInt(serializedLog.length);
> serializedLog.length means the allocated byte array size, not the limit pos
I changed the clone to the following which would eliminate this issue:

byte[] serializedLog = new byte[logRecord.getSerializedLog().remaining()];

logRecord.getSerializedLog().get(serializedLog, 0, serializedLog.length);


I could've also used logRecord.getSerializedLogSize() but this one is more dynamic in case
of changes to the log record fields.
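
To illustrate the difference, here is a self-contained sketch of copying only the readable bytes. The class and method names are hypothetical. Unlike the two lines above, it reads through duplicate() so the source buffer's position is left untouched, which is a choice made for this sketch and not necessarily what the patch does.

```java
import java.nio.ByteBuffer;

class SerializedLogCopySketch {

    // Copies the bytes between position and limit, ignoring unused capacity.
    static byte[] copyRemaining(ByteBuffer serializedLog) {
        ByteBuffer view = serializedLog.duplicate(); // shares content, has its own position and limit
        byte[] copy = new byte[view.remaining()];
        view.get(copy, 0, copy.length);
        return copy;
    }
}
```

For a 64-byte buffer holding a 12-byte record, the backing array length is 64, while remaining() after flip() is 12, so the size written to the request matches the record and not the allocation.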

Line 151: 
> same comments as constructLSMComponentPropertiesRequest().
File asterix-replication/src/main/java/org/apache/asterix/replication/storage/

Line 57:     }
> This can be removed.
It is used in the create method below.
File asterix-replication/src/main/java/org/apache/asterix/replication/storage/

Line 153: 
> What will happen if the system crashes right after the file is deleted, but
Your point is correct if we care about nodes recovering (catch up) after they crash. However,
currently failed nodes will perform remote recovery when they return.

Line 278:     public Set<String> getLaggingReplicaIndexes(String replicaId, long targetLSN)
throws IOException {
> Can we name it as getLaggingReplicaIndexPaths()?
Actually this method is not used anymore. Its usage was replaced by the one below. Removed.

Line 297:     public HashMap<Long, String> getLaggingReplicaIndexes2(String replicaId,
long targetLSN) throws IOException {
> I understand the naming is hard. :) 
That's an ugly name but I changed it to it anyway :)


Gerrit-MessageType: comment
Gerrit-Change-Id: I729fdd1144dbc9ff039b4bc414494860d7553810
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <>
Gerrit-Reviewer: Ian Maxon <>
Gerrit-Reviewer: Jenkins <>
Gerrit-Reviewer: Murtadha Hubail <>
Gerrit-Reviewer: Yingyi Bu <>
Gerrit-Reviewer: Young-Seok Kim <>
Gerrit-Reviewer: abdullah alamoudi <>
Gerrit-HasComments: Yes
