From notifications-return-110124-archive-asf-public=cust-asf.ponee.io@skywalking.apache.org Sat Dec 26 04:25:58 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id DBF79180634 for ; Sat, 26 Dec 2020 05:25:57 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 17A2C417AF for ; Sat, 26 Dec 2020 04:25:57 +0000 (UTC) Received: (qmail 25770 invoked by uid 500); 26 Dec 2020 04:25:56 -0000 Mailing-List: contact notifications-help@skywalking.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@skywalking.apache.org Delivered-To: mailing list notifications@skywalking.apache.org Received: (qmail 25760 invoked by uid 99); 26 Dec 2020 04:25:56 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Dec 2020 04:25:56 +0000 From: =?utf-8?q?GitBox?= To: notifications@skywalking.apache.org Subject: =?utf-8?q?=5BGitHub=5D_=5Bskywalking-satellite=5D_EvanLjp_commented_on_a_cha?= =?utf-8?q?nge_in_pull_request_=2310=3A_mmap-queue-plugin?= Message-ID: <160895675605.15632.9670040349300598465.asfpy@gitbox.apache.org> Date: Sat, 26 Dec 2020 04:25:56 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit References: In-Reply-To: EvanLjp commented on a change in pull request #10: URL: https://github.com/apache/skywalking-satellite/pull/10#discussion_r548941349 ########## File path: plugins/queue/mmap/queue_opreation.go ########## @@ -0,0 +1,167 @@ +// MIT License +// +// Copyright (c) 2018 Aman Mangal +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software 
and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package mmap + +import ( + "fmt" +) + +// Because the design of the mmap-queue in Satellite references the design of the +// bigqueue(https://github.com/grandecola/bigqueue), the queue operation file retains +// the original author license. +// +// The reason why we references the source codes of bigqueue rather than using the lib +// is the file queue in Satellite is like following. +// 1. Only one consumer and publisher in the Satellite queue. +// 2. Reusing files strategy is required to reduce the creation times in the Satellite queue. +// 3. More complex OFFSET design is needed to ensure the final stability of data. + +const uInt64Size = 8 + +// push writes the data into the file system. It first writes the length of the data, +// then the data itself. It means the whole data may not exist in the one segments. 
+func (q *Queue) push(bytes []byte) error { + if q.isFull() { + return fmt.Errorf("cannot push data when the queue is full") + } + id, offset := q.meta.GetWritingOffset() + id, offset, err := q.writeLength(len(bytes), id, offset) + if err != nil { + return err + } + id, offset, err = q.writeBytes(bytes, id, offset) + if err != nil { + return err + } + q.meta.PutWritingOffset(id, offset) + q.unflushedNum++ + if q.unflushedNum == q.FlushCeilingNum { + q.flushChannel <- struct{}{} + q.unflushedNum = 0 + } + return nil +} + +// pop reads the data from the file system. It first reads the length of the data, +// then the data itself. It means the whole data may not exist in the one segments. +func (q *Queue) pop() (data []byte, rid, roffset int64, err error) { + if q.isEmpty() { + return nil, 0, 0, fmt.Errorf("cannot read data when the queue is empty") + } + id, offset := q.meta.GetReadingOffset() + id, offset, length, err := q.readLength(id, offset) + if err != nil { + return nil, 0, 0, err + } + bytes, id, offset, err := q.readBytes(id, offset, length) + if err != nil { + return nil, 0, 0, err + } + q.meta.PutReadingOffset(id, offset) + return bytes, id, offset, nil +} + +// readBytes reads bytes into the memory mapped file. +func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) { + counter := 0 + res := make([]byte, length) + for { + segment, err := q.GetSegment(id) + if err != nil { + return nil, 0, 0, err + } + readBytes, err := segment.ReadAt(res[counter:], offset) + if err != nil { + return nil, 0, 0, err + } + counter += readBytes + offset += int64(readBytes) + if offset == int64(q.SegmentSize) { + id, offset = id+1, 0 + } + if counter == length { + break + } + } + return res, id, offset, nil +} + +// readLength reads the data length with 8 Bits spaces. 
+func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int, err error) { + if offset+uInt64Size > int64(q.SegmentSize) { + id, offset = id+1, 0 + } + segment, err := q.GetSegment(id) + if err != nil { + return 0, 0, 0, err + } + num := segment.ReadUint64At(offset) + offset += uInt64Size + if offset == int64(q.SegmentSize) { + id, offset = id+1, 0 + } + return id, offset, int(num), nil +} + +// writeLength write the data length with 8 Bits spaces. +func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int64, err error) { + if offset+uInt64Size > int64(q.SegmentSize) { Review comment: the method is for storing the data size rather than the Segment size ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org