From commits-return-80743-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Tue Nov 20 15:52:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 37ED71807A5 for ; Tue, 20 Nov 2018 15:52:56 +0100 (CET) Received: (qmail 66233 invoked by uid 500); 20 Nov 2018 14:52:54 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 65752 invoked by uid 99); 20 Nov 2018 14:52:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Nov 2018 14:52:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B8BAE1325; Tue, 20 Nov 2018 14:52:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 20 Nov 2018 14:53:00 -0000 Message-Id: In-Reply-To: <44101e1d0ad0460587f2f524e57aa71f@git.apache.org> References: <44101e1d0ad0460587f2f524e57aa71f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/11] hbase-site git commit: Published site at 405bf5e6383a09f435baadbac6c389e9f6c43ac6. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c8b83ace/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.Loader.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.Loader.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.Loader.html index 257263c..7a81ab8 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.Loader.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.Loader.html @@ -96,183 +96,180 @@ 088 Loader loader) throws IOException { 089 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); 090 tracker.setKeepDeletes(true); -091 try { -092 // Ignore the last log which is current active log. -093 while (logs.hasNext()) { -094 ProcedureWALFile log = logs.next(); -095 log.open(); -096 try { -097 reader.read(log); -098 } finally { -099 log.close(); -100 } -101 } -102 reader.finish(); -103 -104 // The tracker is now updated with all the procedures read from the logs -105 if (tracker.isPartial()) { -106 tracker.setPartialFlag(false); -107 } -108 tracker.resetModified(); -109 } finally { -110 tracker.setKeepDeletes(false); -111 } -112 } -113 -114 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) -115 throws IOException { -116 header.writeDelimitedTo(stream); -117 } -118 -119 /* -120 * +-----------------+ -121 * | END OF WAL DATA | <---+ -122 * +-----------------+ | -123 * | | | -124 * | Tracker | | -125 * | | | -126 * +-----------------+ | -127 * | version | | -128 * +-----------------+ | -129 * | TRAILER_MAGIC | | -130 * +-----------------+ | -131 * | offset |-----+ -132 * +-----------------+ -133 */ -134 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) -135 throws IOException { -136 long offset = stream.getPos(); -137 -138 // Write EOF Entry -139 ProcedureWALEntry.newBuilder() -140 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) -141 .build().writeDelimitedTo(stream); +091 // Ignore the last log which is current active log. +092 while (logs.hasNext()) { +093 ProcedureWALFile log = logs.next(); +094 log.open(); +095 try { +096 reader.read(log); +097 } finally { +098 log.close(); +099 } +100 } +101 reader.finish(); +102 +103 // The tracker is now updated with all the procedures read from the logs +104 if (tracker.isPartial()) { +105 tracker.setPartialFlag(false); +106 } +107 tracker.resetModified(); +108 tracker.setKeepDeletes(false); +109 } +110 +111 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) +112 throws IOException { +113 header.writeDelimitedTo(stream); +114 } +115 +116 /* +117 * +-----------------+ +118 * | END OF WAL DATA | <---+ +119 * +-----------------+ | +120 * | | | +121 * | Tracker | | +122 * | | | +123 * +-----------------+ | +124 * | version | | +125 * +-----------------+ | +126 * | TRAILER_MAGIC | | +127 * +-----------------+ | +128 * | offset |-----+ +129 * +-----------------+ +130 */ +131 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) +132 throws IOException { +133 long offset = stream.getPos(); +134 +135 // Write EOF Entry +136 ProcedureWALEntry.newBuilder() +137 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) +138 .build().writeDelimitedTo(stream); +139 +140 // Write Tracker +141 tracker.toProto().writeDelimitedTo(stream); 142 -143 // Write Tracker -144 tracker.toProto().writeDelimitedTo(stream); -145 -146 stream.write(TRAILER_VERSION); -147 StreamUtils.writeLong(stream, TRAILER_MAGIC); -148 StreamUtils.writeLong(stream, offset); -149 return stream.getPos() - offset; -150 } -151 -152 public static ProcedureWALHeader readHeader(InputStream stream) -153 throws IOException { -154 ProcedureWALHeader header; -155 try { -156 header = ProcedureWALHeader.parseDelimitedFrom(stream); -157 } catch (InvalidProtocolBufferException e) { -158 throw new InvalidWALDataException(e); -159 } -160 -161 if (header == null) { -162 throw new InvalidWALDataException("No data available to read the Header"); -163 } -164 -165 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { -166 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() + -167 " expected " + HEADER_VERSION); -168 } -169 -170 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { -171 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); -172 } +143 stream.write(TRAILER_VERSION); +144 StreamUtils.writeLong(stream, TRAILER_MAGIC); +145 StreamUtils.writeLong(stream, offset); +146 return stream.getPos() - offset; +147 } +148 +149 public static ProcedureWALHeader readHeader(InputStream stream) +150 throws IOException { +151 ProcedureWALHeader header; +152 try { +153 header = ProcedureWALHeader.parseDelimitedFrom(stream); +154 } catch (InvalidProtocolBufferException e) { +155 throw new InvalidWALDataException(e); +156 } +157 +158 if (header == null) { +159 throw new InvalidWALDataException("No data available to read the Header"); +160 } +161 +162 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { +163 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() + +164 " expected " + HEADER_VERSION); +165 } +166 +167 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { +168 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); +169 } +170 +171 return header; +172 } 173 -174 return header; -175 } -176 -177 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) -178 throws IOException { -179 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset -180 long trailerPos = size - 17; -181 -182 if (trailerPos < startPos) { -183 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); -184 } -185 -186 stream.seek(trailerPos); -187 int version = stream.read(); -188 if (version != TRAILER_VERSION) { -189 throw new InvalidWALDataException("Invalid Trailer version. got " + version + -190 " expected " + TRAILER_VERSION); -191 } -192 -193 long magic = StreamUtils.readLong(stream); -194 if (magic != TRAILER_MAGIC) { -195 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic + -196 " expected " + TRAILER_MAGIC); -197 } +174 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) +175 throws IOException { +176 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset +177 long trailerPos = size - 17; +178 +179 if (trailerPos < startPos) { +180 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); +181 } +182 +183 stream.seek(trailerPos); +184 int version = stream.read(); +185 if (version != TRAILER_VERSION) { +186 throw new InvalidWALDataException("Invalid Trailer version. got " + version + +187 " expected " + TRAILER_VERSION); +188 } +189 +190 long magic = StreamUtils.readLong(stream); +191 if (magic != TRAILER_MAGIC) { +192 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic + +193 " expected " + TRAILER_MAGIC); +194 } +195 +196 long trailerOffset = StreamUtils.readLong(stream); +197 stream.seek(trailerOffset); 198 -199 long trailerOffset = StreamUtils.readLong(stream); -200 stream.seek(trailerOffset); -201 -202 ProcedureWALEntry entry = readEntry(stream); -203 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { -204 throw new InvalidWALDataException("Invalid Trailer begin"); -205 } -206 -207 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder() -208 .setVersion(version) -209 .setTrackerPos(stream.getPos()) -210 .build(); -211 return trailer; -212 } -213 -214 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { -215 return ProcedureWALEntry.parseDelimitedFrom(stream); -216 } -217 -218 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, -219 Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { -220 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -221 builder.setType(type); -222 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); -223 if (subprocs != null) { -224 for (int i = 0; i < subprocs.length; ++i) { -225 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); -226 } -227 } -228 builder.build().writeDelimitedTo(slot); -229 } -230 -231 public static void writeInsert(ByteSlot slot, Procedure<?> proc) -232 throws IOException { -233 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); -234 } -235 -236 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) -237 throws IOException { -238 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); -239 } -240 -241 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) -242 throws IOException { -243 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); -244 } -245 -246 public static void writeDelete(ByteSlot slot, long procId) -247 throws IOException { -248 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -249 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); -250 builder.setProcId(procId); -251 builder.build().writeDelimitedTo(slot); -252 } -253 -254 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) -255 throws IOException { -256 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -257 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); -258 builder.setProcId(proc.getProcId()); -259 if (subprocs != null) { -260 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); -261 for (int i = 0; i < subprocs.length; ++i) { -262 builder.addChildId(subprocs[i]); -263 } -264 } -265 builder.build().writeDelimitedTo(slot); -266 } -267} +199 ProcedureWALEntry entry = readEntry(stream); +200 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { +201 throw new InvalidWALDataException("Invalid Trailer begin"); +202 } +203 +204 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder() +205 .setVersion(version) +206 .setTrackerPos(stream.getPos()) +207 .build(); +208 return trailer; +209 } +210 +211 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { +212 return ProcedureWALEntry.parseDelimitedFrom(stream); +213 } +214 +215 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, +216 Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { +217 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +218 builder.setType(type); +219 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); +220 if (subprocs != null) { +221 for (int i = 0; i < subprocs.length; ++i) { +222 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); +223 } +224 } +225 builder.build().writeDelimitedTo(slot); +226 } +227 +228 public static void writeInsert(ByteSlot slot, Procedure<?> proc) +229 throws IOException { +230 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); +231 } +232 +233 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) +234 throws IOException { +235 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); +236 } +237 +238 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) +239 throws IOException { +240 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); +241 } +242 +243 public static void writeDelete(ByteSlot slot, long procId) +244 throws IOException { +245 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +246 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); +247 builder.setProcId(procId); +248 builder.build().writeDelimitedTo(slot); +249 } +250 +251 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) +252 throws IOException { +253 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +254 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); +255 builder.setProcId(proc.getProcId()); +256 if (subprocs != null) { +257 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); +258 for (int i = 0; i < subprocs.length; ++i) { +259 builder.addChildId(subprocs[i]); +260 } +261 } +262 builder.build().writeDelimitedTo(slot); +263 } +264} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/c8b83ace/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.html index 257263c..7a81ab8 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.html @@ -96,183 +96,180 @@ 088 Loader loader) throws IOException { 089 ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); 090 tracker.setKeepDeletes(true); -091 try { -092 // Ignore the last log which is current active log. -093 while (logs.hasNext()) { -094 ProcedureWALFile log = logs.next(); -095 log.open(); -096 try { -097 reader.read(log); -098 } finally { -099 log.close(); -100 } -101 } -102 reader.finish(); -103 -104 // The tracker is now updated with all the procedures read from the logs -105 if (tracker.isPartial()) { -106 tracker.setPartialFlag(false); -107 } -108 tracker.resetModified(); -109 } finally { -110 tracker.setKeepDeletes(false); -111 } -112 } -113 -114 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) -115 throws IOException { -116 header.writeDelimitedTo(stream); -117 } -118 -119 /* -120 * +-----------------+ -121 * | END OF WAL DATA | <---+ -122 * +-----------------+ | -123 * | | | -124 * | Tracker | | -125 * | | | -126 * +-----------------+ | -127 * | version | | -128 * +-----------------+ | -129 * | TRAILER_MAGIC | | -130 * +-----------------+ | -131 * | offset |-----+ -132 * +-----------------+ -133 */ -134 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) -135 throws IOException { -136 long offset = stream.getPos(); -137 -138 // Write EOF Entry -139 ProcedureWALEntry.newBuilder() -140 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) -141 .build().writeDelimitedTo(stream); +091 // Ignore the last log which is current active log. +092 while (logs.hasNext()) { +093 ProcedureWALFile log = logs.next(); +094 log.open(); +095 try { +096 reader.read(log); +097 } finally { +098 log.close(); +099 } +100 } +101 reader.finish(); +102 +103 // The tracker is now updated with all the procedures read from the logs +104 if (tracker.isPartial()) { +105 tracker.setPartialFlag(false); +106 } +107 tracker.resetModified(); +108 tracker.setKeepDeletes(false); +109 } +110 +111 public static void writeHeader(OutputStream stream, ProcedureWALHeader header) +112 throws IOException { +113 header.writeDelimitedTo(stream); +114 } +115 +116 /* +117 * +-----------------+ +118 * | END OF WAL DATA | <---+ +119 * +-----------------+ | +120 * | | | +121 * | Tracker | | +122 * | | | +123 * +-----------------+ | +124 * | version | | +125 * +-----------------+ | +126 * | TRAILER_MAGIC | | +127 * +-----------------+ | +128 * | offset |-----+ +129 * +-----------------+ +130 */ +131 public static long writeTrailer(FSDataOutputStream stream, ProcedureStoreTracker tracker) +132 throws IOException { +133 long offset = stream.getPos(); +134 +135 // Write EOF Entry +136 ProcedureWALEntry.newBuilder() +137 .setType(ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) +138 .build().writeDelimitedTo(stream); +139 +140 // Write Tracker +141 tracker.toProto().writeDelimitedTo(stream); 142 -143 // Write Tracker -144 tracker.toProto().writeDelimitedTo(stream); -145 -146 stream.write(TRAILER_VERSION); -147 StreamUtils.writeLong(stream, TRAILER_MAGIC); -148 StreamUtils.writeLong(stream, offset); -149 return stream.getPos() - offset; -150 } -151 -152 public static ProcedureWALHeader readHeader(InputStream stream) -153 throws IOException { -154 ProcedureWALHeader header; -155 try { -156 header = ProcedureWALHeader.parseDelimitedFrom(stream); -157 } catch (InvalidProtocolBufferException e) { -158 throw new InvalidWALDataException(e); -159 } -160 -161 if (header == null) { -162 throw new InvalidWALDataException("No data available to read the Header"); -163 } -164 -165 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { -166 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() + -167 " expected " + HEADER_VERSION); -168 } -169 -170 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { -171 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); -172 } +143 stream.write(TRAILER_VERSION); +144 StreamUtils.writeLong(stream, TRAILER_MAGIC); +145 StreamUtils.writeLong(stream, offset); +146 return stream.getPos() - offset; +147 } +148 +149 public static ProcedureWALHeader readHeader(InputStream stream) +150 throws IOException { +151 ProcedureWALHeader header; +152 try { +153 header = ProcedureWALHeader.parseDelimitedFrom(stream); +154 } catch (InvalidProtocolBufferException e) { +155 throw new InvalidWALDataException(e); +156 } +157 +158 if (header == null) { +159 throw new InvalidWALDataException("No data available to read the Header"); +160 } +161 +162 if (header.getVersion() < 0 || header.getVersion() != HEADER_VERSION) { +163 throw new InvalidWALDataException("Invalid Header version. got " + header.getVersion() + +164 " expected " + HEADER_VERSION); +165 } +166 +167 if (header.getType() < 0 || header.getType() > LOG_TYPE_MAX_VALID) { +168 throw new InvalidWALDataException("Invalid header type. got " + header.getType()); +169 } +170 +171 return header; +172 } 173 -174 return header; -175 } -176 -177 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) -178 throws IOException { -179 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset -180 long trailerPos = size - 17; -181 -182 if (trailerPos < startPos) { -183 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); -184 } -185 -186 stream.seek(trailerPos); -187 int version = stream.read(); -188 if (version != TRAILER_VERSION) { -189 throw new InvalidWALDataException("Invalid Trailer version. got " + version + -190 " expected " + TRAILER_VERSION); -191 } -192 -193 long magic = StreamUtils.readLong(stream); -194 if (magic != TRAILER_MAGIC) { -195 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic + -196 " expected " + TRAILER_MAGIC); -197 } +174 public static ProcedureWALTrailer readTrailer(FSDataInputStream stream, long startPos, long size) +175 throws IOException { +176 // Beginning of the Trailer Jump. 17 = 1 byte version + 8 byte magic + 8 byte offset +177 long trailerPos = size - 17; +178 +179 if (trailerPos < startPos) { +180 throw new InvalidWALDataException("Missing trailer: size=" + size + " startPos=" + startPos); +181 } +182 +183 stream.seek(trailerPos); +184 int version = stream.read(); +185 if (version != TRAILER_VERSION) { +186 throw new InvalidWALDataException("Invalid Trailer version. got " + version + +187 " expected " + TRAILER_VERSION); +188 } +189 +190 long magic = StreamUtils.readLong(stream); +191 if (magic != TRAILER_MAGIC) { +192 throw new InvalidWALDataException("Invalid Trailer magic. got " + magic + +193 " expected " + TRAILER_MAGIC); +194 } +195 +196 long trailerOffset = StreamUtils.readLong(stream); +197 stream.seek(trailerOffset); 198 -199 long trailerOffset = StreamUtils.readLong(stream); -200 stream.seek(trailerOffset); -201 -202 ProcedureWALEntry entry = readEntry(stream); -203 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { -204 throw new InvalidWALDataException("Invalid Trailer begin"); -205 } -206 -207 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder() -208 .setVersion(version) -209 .setTrackerPos(stream.getPos()) -210 .build(); -211 return trailer; -212 } -213 -214 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { -215 return ProcedureWALEntry.parseDelimitedFrom(stream); -216 } -217 -218 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, -219 Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { -220 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -221 builder.setType(type); -222 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); -223 if (subprocs != null) { -224 for (int i = 0; i < subprocs.length; ++i) { -225 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); -226 } -227 } -228 builder.build().writeDelimitedTo(slot); -229 } -230 -231 public static void writeInsert(ByteSlot slot, Procedure<?> proc) -232 throws IOException { -233 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); -234 } -235 -236 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) -237 throws IOException { -238 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); -239 } -240 -241 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) -242 throws IOException { -243 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); -244 } -245 -246 public static void writeDelete(ByteSlot slot, long procId) -247 throws IOException { -248 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -249 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); -250 builder.setProcId(procId); -251 builder.build().writeDelimitedTo(slot); -252 } -253 -254 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) -255 throws IOException { -256 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); -257 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); -258 builder.setProcId(proc.getProcId()); -259 if (subprocs != null) { -260 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); -261 for (int i = 0; i < subprocs.length; ++i) { -262 builder.addChildId(subprocs[i]); -263 } -264 } -265 builder.build().writeDelimitedTo(slot); -266 } -267} +199 ProcedureWALEntry entry = readEntry(stream); +200 if (entry.getType() != ProcedureWALEntry.Type.PROCEDURE_WAL_EOF) { +201 throw new InvalidWALDataException("Invalid Trailer begin"); +202 } +203 +204 ProcedureWALTrailer trailer = ProcedureWALTrailer.newBuilder() +205 .setVersion(version) +206 .setTrackerPos(stream.getPos()) +207 .build(); +208 return trailer; +209 } +210 +211 public static ProcedureWALEntry readEntry(InputStream stream) throws IOException { +212 return ProcedureWALEntry.parseDelimitedFrom(stream); +213 } +214 +215 public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, +216 Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { +217 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +218 builder.setType(type); +219 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); +220 if (subprocs != null) { +221 for (int i = 0; i < subprocs.length; ++i) { +222 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i])); +223 } +224 } +225 builder.build().writeDelimitedTo(slot); +226 } +227 +228 public static void writeInsert(ByteSlot slot, Procedure<?> proc) +229 throws IOException { +230 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); +231 } +232 +233 public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) +234 throws IOException { +235 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); +236 } +237 +238 public static void writeUpdate(ByteSlot slot, Procedure<?> proc) +239 throws IOException { +240 writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); +241 } +242 +243 public static void writeDelete(ByteSlot slot, long procId) +244 throws IOException { +245 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +246 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); +247 builder.setProcId(procId); +248 builder.build().writeDelimitedTo(slot); +249 } +250 +251 public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) +252 throws IOException { +253 final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); +254 builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); +255 builder.setProcId(proc.getProcId()); +256 if (subprocs != null) { +257 builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); +258 for (int i = 0; i < subprocs.length; ++i) { +259 builder.addChildId(subprocs[i]); +260 } +261 } +262 builder.build().writeDelimitedTo(slot); +263 } +264}