From: ijuma@apache.org
To: commits@kafka.apache.org
Message-Id: <701c774df96a4c128a8eba01b7438c85@git.apache.org>
Subject: kafka git commit: MINOR: Rename baseTimestamp to firstTimestamp to clarify usage
Date: Fri, 30 Jun 2017 05:48:49 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk a16475eb3 -> c4193cd1a


MINOR: Rename baseTimestamp to firstTimestamp to clarify usage

Author: Jason Gustafson
Reviewers: Jun Rao, Ismael Juma

Closes #3457 from hachikuji/rename-base-timestamp

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4193cd1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4193cd1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4193cd1

Branch: refs/heads/trunk
Commit: c4193cd1ad8eeed8383a1c2660383c455e848929
Parents: a16475e
Author: Jason Gustafson
Authored: Fri Jun 30 06:48:45 2017 +0100
Committer: Ismael Juma
Committed: Fri Jun 30 06:48:45 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecordBatch.java | 80 ++++++++++++--------
 .../common/record/MemoryRecordsBuilder.java     | 12 +--
 .../kafka/common/record/MutableRecordBatch.java |  2 +-
 .../common/record/DefaultRecordBatchTest.java   |  2 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  2 +-
 5 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2262b33..ff8d3b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -48,7 +48,7 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  * CRC => Uint32
  * Attributes => Int16
  * LastOffsetDelta => Int32 // also serves as LastSequenceDelta
- * BaseTimestamp => Int64
+ * FirstTimestamp => Int64
  * MaxTimestamp => Int64
  * ProducerId => Int64
  * ProducerEpoch => Int16
@@ -64,12 +64,28 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  * computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by
  * the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
  *
- * On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
+ * On Compaction: Unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
  * numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
- * producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then
- * after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
- * be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying
- * that the first and last sequence numbers of the incoming batch match the last from that producer).
+ * producer's state when the log is reloaded. If we did not retain the last sequence number, then following
+ * a partition leader failure, once the new leader has rebuilt the producer state from the log, the next sequence
+ * expected number would no longer be in sync with what was written by the client. This would cause an
+ * unexpected OutOfOrderSequence error, which is typically fatal. The base sequence number must be preserved for
+ * duplicate checking: the broker checks incoming Produce requests for duplicates by verifying that the first and
+ * last sequence numbers of the incoming batch match the last from that producer.
+ *
+ * Note that if all of the records in a batch are removed during compaction, the broker may still retain an empty
+ * batch header in order to preserve the producer sequence information as described above. These empty batches
+ * are retained only until either a new sequence number is written by the corresponding producer or the producerId
+ * is expired from lack of activity.
+ *
+ * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp
+ * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the
+ * FirstTimestamp will be set to -1 (NO_TIMESTAMP).
+ *
+ * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type
+ * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set
+ * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
+ * the previous value prior to becoming empty.
  *
  * The current attributes are given below:
  *
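[Editorial illustration, not part of this patch: to make the header layout and the FirstTimestamp semantics above concrete, the following standalone Java sketch derives the field offsets with the same cumulative arithmetic the class uses and reads the two timestamp fields from a serialized v2 batch header. The starting offset of the attributes field (21) and the -1 sentinel are assumptions based on the layout described above, not values quoted in this commit.]

    import java.nio.ByteBuffer;

    // Standalone sketch: locate FirstTimestamp/MaxTimestamp in a serialized v2 batch header.
    // ATTRIBUTES_OFFSET = 21 is an assumption about the header fields that precede the portion
    // of the layout quoted in this commit (BaseOffset 8 + BatchLength 4 + PartitionLeaderEpoch 4
    // + Magic 1 + CRC 4).
    public class BatchTimestampReader {
        static final int ATTRIBUTES_OFFSET = 21;
        static final int ATTRIBUTE_LENGTH = 2;
        static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
        static final int LAST_OFFSET_DELTA_LENGTH = 4;
        static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
        static final int FIRST_TIMESTAMP_LENGTH = 8;
        static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
        static final long NO_TIMESTAMP = -1L; // sentinel stored in FirstTimestamp for an empty batch

        // Timestamp of the first record (always its create time), or NO_TIMESTAMP if the batch is empty.
        static long firstTimestamp(ByteBuffer batch) {
            return batch.getLong(batch.position() + FIRST_TIMESTAMP_OFFSET);
        }

        // Max create timestamp, or the broker-assigned timestamp when the type is LOG_APPEND_TIME.
        static long maxTimestamp(ByteBuffer batch) {
            return batch.getLong(batch.position() + MAX_TIMESTAMP_OFFSET);
        }
    }
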
@@ -92,9 +108,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int ATTRIBUTE_LENGTH = 2;
     static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
     static final int LAST_OFFSET_DELTA_LENGTH = 4;
-    static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
-    static final int BASE_TIMESTAMP_LENGTH = 8;
-    static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + BASE_TIMESTAMP_LENGTH;
+    static final int FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    static final int FIRST_TIMESTAMP_LENGTH = 8;
+    static final int MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH;
     static final int MAX_TIMESTAMP_LENGTH = 8;
     static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH;
     static final int PRODUCER_ID_LENGTH = 8;
@@ -138,10 +154,10 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
      * Get the timestamp of the first record in this batch. It is always the create time of the record even if the
      * timestamp type of the batch is log append time.
      *
-     * @return The base timestamp
+     * @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
      */
-    public long baseTimestamp() {
-        return buffer.getLong(BASE_TIMESTAMP_OFFSET);
+    public long firstTimestamp() {
+        return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
     }

     @Override
@@ -243,9 +259,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         return new RecordIterator() {
             @Override
-            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                 } catch (EOFException e) {
                     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                 } catch (IOException e) {
@@ -278,9 +294,9 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         buffer.position(RECORDS_OFFSET);
         return new RecordIterator() {
             @Override
-            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
+            protected Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) {
                 try {
-                    return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
+                    return DefaultRecord.readFrom(buffer, baseOffset, firstTimestamp, baseSequence, logAppendTime);
                 } catch (BufferUnderflowException e) {
                     throw new InvalidRecordException("Incorrect declared batch size, premature EOF reached");
                 }
@@ -420,7 +436,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                                    byte magic,
                                    CompressionType compressionType,
                                    TimestampType timestampType,
-                                   long baseTimestamp,
+                                   long firstTimestamp,
                                    long maxTimestamp,
                                    long producerId,
                                    short epoch,
@@ -431,8 +447,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                                    int numRecords) {
        if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
            throw new IllegalArgumentException("Invalid magic value " + magic);
-       if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
-           throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);
+       if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
+           throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

        short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
@@ -442,7 +458,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
        buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
        buffer.put(position + MAGIC_OFFSET, magic);
        buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
-       buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
+       buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp);
        buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
        buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
        buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -466,13 +482,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
            return 0;

        int size = RECORD_BATCH_OVERHEAD;
-       Long baseTimestamp = null;
+       Long firstTimestamp = null;
        while (iterator.hasNext()) {
            Record record = iterator.next();
            int offsetDelta = (int) (record.offset() - baseOffset);
-           if (baseTimestamp == null)
-               baseTimestamp = record.timestamp();
-           long timestampDelta = record.timestamp() - baseTimestamp;
+           if (firstTimestamp == null)
+               firstTimestamp = record.timestamp();
+           long timestampDelta = record.timestamp() - firstTimestamp;
            size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
                    record.headers());
        }
@@ -486,12 +502,12 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
        int size = RECORD_BATCH_OVERHEAD;
        int offsetDelta = 0;
-       Long baseTimestamp = null;
+       Long firstTimestamp = null;
        while (iterator.hasNext()) {
            SimpleRecord record = iterator.next();
-           if (baseTimestamp == null)
-               baseTimestamp = record.timestamp();
-           long timestampDelta = record.timestamp() - baseTimestamp;
+           if (firstTimestamp == null)
+               firstTimestamp = record.timestamp();
+           long timestampDelta = record.timestamp() - firstTimestamp;
            size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
                    record.headers());
        }
@@ -516,7 +532,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
    private abstract class RecordIterator implements CloseableIterator {
        private final Long logAppendTime;
        private final long baseOffset;
-       private final long baseTimestamp;
+       private final long firstTimestamp;
        private final int baseSequence;
        private final int numRecords;
        private int readRecords = 0;
@@ -524,7 +540,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
        public RecordIterator() {
            this.logAppendTime = timestampType() == TimestampType.LOG_APPEND_TIME ? maxTimestamp() : null;
            this.baseOffset = baseOffset();
-           this.baseTimestamp = baseTimestamp();
+           this.firstTimestamp = firstTimestamp();
            this.baseSequence = baseSequence();
            int numRecords = count();
            if (numRecords < 0)
@@ -544,7 +560,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                throw new NoSuchElementException();

            readRecords++;
-           Record rec = readNext(baseOffset, baseTimestamp, baseSequence, logAppendTime);
+           Record rec = readNext(baseOffset, firstTimestamp, baseSequence, logAppendTime);
            if (readRecords == numRecords) {
                // Validate that the actual size of the batch is equal to declared size
                // by checking that after reading declared number of items, there no items left
@@ -555,7 +571,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
            return rec;
        }

-       protected abstract Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime);
+       protected abstract Record readNext(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime);

        protected abstract boolean ensureNoneRemaining();


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 68b0bf0..19d25d7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -71,7 +71,7 @@ public class MemoryRecordsBuilder {
    private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
    private long offsetOfMaxTimestamp = -1;
    private Long lastOffset = null;
-   private Long baseTimestamp = null;
+   private Long firstTimestamp = null;
    private MemoryRecords builtRecords;
    private boolean aborted = false;
@@ -342,7 +342,7 @@ public class MemoryRecordsBuilder {
            maxTimestamp = this.maxTimestamp;

        DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
-               baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
+               firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
                partitionLeaderEpoch, numRecords);

        buffer.position(pos);
@@ -389,8 +389,8 @@ public class MemoryRecordsBuilder {
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
            throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

-       if (baseTimestamp == null)
-           baseTimestamp = timestamp;
+       if (firstTimestamp == null)
+           firstTimestamp = timestamp;

        if (magic > RecordBatch.MAGIC_VALUE_V1) {
            appendDefaultRecord(offset, timestamp, key, value, headers);
@@ -605,7 +605,7 @@ public class MemoryRecordsBuilder {
                                     Header[] headers) throws IOException {
        ensureOpenForRecordAppend();
        int offsetDelta = (int) (offset - baseOffset);
-       long timestampDelta = timestamp - baseTimestamp;
+       long timestampDelta = timestamp - firstTimestamp;
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
        recordWritten(offset, timestamp, sizeInBytes);
    }
@@ -710,7 +710,7 @@ public class MemoryRecordsBuilder {
            recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
        } else {
            int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
-           long timestampDelta = baseTimestamp == null ? 0 : timestamp - baseTimestamp;
+           long timestampDelta = firstTimestamp == null ? 0 : timestamp - firstTimestamp;
            recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
        }


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index 8049469..c13bb5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -33,7 +33,7 @@ public interface MutableRecordBatch extends RecordBatch {
    /**
     * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
     * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
-    * by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not
+    * by this method, but clients ignore them if the timestamp time is log append time. Note that firstTimestamp is not
     * updated by this method.
     *
     * This typically requires re-computation of the batch's CRC.


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 587bf14..ab8cbb7 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -63,7 +63,7 @@ public class DefaultRecordBatchTest {
            assertEquals(isTransactional, batch.isTransactional());
            assertEquals(timestampType, batch.timestampType());
            assertEquals(timestamp, batch.maxTimestamp());
-           assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
+           assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
            assertEquals(isControlBatch, batch.isControlBatch());
        }
    }


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4193cd1/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 2225ca6..a5d415e 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -991,7 +991,7 @@ class LogValidatorTest {
  def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = {
    batch match {
      case b: DefaultRecordBatch =>
-       assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.baseTimestamp)
+       assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.firstTimestamp)
      case _ => // no-op
    }
  }
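
[Editorial illustration, not part of this patch: the rename does not change the wire format. As the MemoryRecordsBuilder and DefaultRecordBatch changes above show, every record still carries its timestamp as a delta against the first timestamp of its batch; the new name simply states what that anchor value is. A minimal Java sketch of that round trip follows; the class and method names here are hypothetical and not part of the Kafka API.]

    // Editorial sketch of the delta encoding that firstTimestamp anchors; not code from this commit.
    public class TimestampDeltaExample {

        // Write path: the builder stores (timestamp - firstTimestamp) with each record.
        static long encodeDelta(long recordTimestamp, long firstTimestamp) {
            return recordTimestamp - firstTimestamp;
        }

        // Read path: the batch's firstTimestamp is added back to recover the absolute create time.
        static long decodeTimestamp(long firstTimestamp, long timestampDelta) {
            return firstTimestamp + timestampDelta;
        }

        public static void main(String[] args) {
            long firstTimestamp = 1_498_800_000_000L; // hypothetical create time of the batch's first record
            long later = firstTimestamp + 250L;       // a record appended 250 ms later
            long delta = encodeDelta(later, firstTimestamp);
            System.out.println("delta=" + delta + " restored=" + decodeTimestamp(firstTimestamp, delta));
        }
    }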