Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5B7018903 for ; Mon, 25 Jan 2016 17:03:46 +0000 (UTC) Received: (qmail 78223 invoked by uid 500); 25 Jan 2016 17:03:46 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 78058 invoked by uid 500); 25 Jan 2016 17:03:46 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 77489 invoked by uid 99); 25 Jan 2016 17:03:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jan 2016 17:03:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E478E0A31; Mon, 25 Jan 2016 17:03:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Mon, 25 Jan 2016 17:03:53 -0000 Message-Id: <7ffbe78dd48e42f5ae592623acd930ad@git.apache.org> In-Reply-To: <559c9bd584d2468fa814e7b2d13a1662@git.apache.org> References: <559c9bd584d2468fa814e7b2d13a1662@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/51] [partial] hbase-site git commit: Published site at a87d9560fcf4803bdd7a01b6e4ec21435d4e11b9. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f8d6f420/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.Entry.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.Entry.html b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.Entry.html index 0ba2dff..3541594 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.Entry.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.Entry.html @@ -30,683 +30,719 @@ 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; -025import org.apache.hadoop.hbase.classification.InterfaceAudience; -026import org.apache.hadoop.hbase.classification.InterfaceStability; -027import org.apache.hadoop.fs.FSDataInputStream; -028import org.apache.hadoop.hbase.procedure2.Procedure; -029import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; -030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; -031import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; -032import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; -033 -034/** -035 * Helper class that loads the procedures stored in a WAL -036 */ -037@InterfaceAudience.Private -038@InterfaceStability.Evolving -039public class ProcedureWALFormatReader { -040 private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class); -041 -042 // ============================================================================================== -043 // We read the WALs in reverse order. from the newest to the oldest. -044 // We have different entry types: -045 // - INIT: Procedure submitted by the user (also known as 'root procedure') -046 // - INSERT: Children added to the procedure <parentId>:[<childId>, ...] -047 // - UPDATE: The specified procedure was updated -048 // - DELETE: The procedure was removed (completed/rolledback and result TTL expired) -049 // -050 // In the WAL we can find multiple times the same procedure as UPDATE or INSERT. -051 // We read the WAL from top to bottom, so every time we find an entry of the -052 // same procedure, that will be the "latest" update. -053 // -054 // We keep two in-memory maps: -055 // - localProcedureMap: is the map containing the entries in the WAL we are processing -056 // - procedureMap: is the map containing all the procedures we found up to the WAL in process. -057 // localProcedureMap is merged with the procedureMap once we reach the WAL EOF. -058 // -059 // Since we are reading the WALs in reverse order (newest to oldest), -060 // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it. -061 // -062 // The WAL is append-only so the last procedure in the WAL is the one that -063 // was in execution at the time we crashed/closed the server. -064 // given that, the procedure replay order can be inferred by the WAL order. -065 // -066 // Example: -067 // WAL-2: [A, B, A, C, D] -068 // WAL-1: [F, G, A, F, B] -069 // Replay-Order: [D, C, A, B, F, G] -070 // -071 // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the -072 // record to the map that record is moved to the head of the "replayOrder" list. -073 // Using the example above: -074 // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] -075 // WAL-1 localProcedureMap.replayOrder is [F, G] -076 // -077 // each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' -078 // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. -079 // -080 // Fast Start: INIT/INSERT record and StackIDs -081 // --------------------------------------------- -082 // We have to special record, INIT and INSERT that tracks the first time -083 // the procedure was added to the WAL. We can use that information to be able -084 // to start procedures before reaching the end of the WAL, or before reading all the WALs. -085 // but in some cases the WAL with that record can be already gone. -086 // In alternative we can use the stackIds on each procedure, -087 // to identify when a procedure is ready to start. -088 // If there are gaps in the sum of the stackIds we need to read more WALs. -089 // -090 // Example (all procs child of A): -091 // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5] -092 // WAL-1: [A, B, C, D] -093 // -094 // In the case above we need to read one more WAL to be able to consider -095 // the root procedure A and all children as ready. -096 // ============================================================================================== -097 private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); -098 private final WalProcedureMap procedureMap = new WalProcedureMap(1024); -099 -100 //private long compactionLogId; -101 private long maxProcId = 0; -102 -103 private final ProcedureStoreTracker tracker; -104 private final boolean hasFastStartSupport; -105 -106 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) { -107 this.tracker = tracker; -108 // we support fast-start only if we have a clean shutdown. -109 this.hasFastStartSupport = !tracker.isEmpty(); -110 } -111 -112 public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { -113 FSDataInputStream stream = log.getStream(); -114 try { -115 boolean hasMore = true; -116 while (hasMore) { -117 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); -118 if (entry == null) { -119 LOG.warn("nothing left to decode. exiting with missing EOF"); -120 hasMore = false; -121 break; -122 } -123 switch (entry.getType()) { -124 case PROCEDURE_WAL_INIT: -125 readInitEntry(entry); -126 break; -127 case PROCEDURE_WAL_INSERT: -128 readInsertEntry(entry); -129 break; -130 case PROCEDURE_WAL_UPDATE: -131 case PROCEDURE_WAL_COMPACT: -132 readUpdateEntry(entry); -133 break; -134 case PROCEDURE_WAL_DELETE: -135 readDeleteEntry(entry); -136 break; -137 case PROCEDURE_WAL_EOF: -138 hasMore = false; -139 break; -140 default: -141 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry); -142 } -143 } -144 } catch (IOException e) { -145 LOG.error("got an exception while reading the procedure WAL: " + log, e); -146 loader.markCorruptedWAL(log, e); -147 } -148 -149 if (localProcedureMap.isEmpty()) { -150 LOG.info("No active entry found in state log " + log + ". removing it"); -151 loader.removeLog(log); -152 } else { -153 log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); -154 procedureMap.mergeTail(localProcedureMap); -155 //if (hasFastStartSupport) { -156 // TODO: Some procedure may be already runnables (see readInitEntry()) -157 // (we can also check the "update map" in the log trackers) -158 // -------------------------------------------------- -159 //EntryIterator iter = procedureMap.fetchReady(); -160 //if (iter != null) loader.load(iter); -161 // -------------------------------------------------- -162 //} -163 } -164 } -165 -166 public void finalize(ProcedureWALFormat.Loader loader) throws IOException { -167 // notify the loader about the max proc ID -168 loader.setMaxProcId(maxProcId); -169 -170 // fetch the procedure ready to run. -171 ProcedureIterator procIter = procedureMap.fetchReady(); -172 if (procIter != null) loader.load(procIter); -173 -174 // remaining procedures have missing link or dependencies -175 // consider them as corrupted, manual fix is probably required. -176 procIter = procedureMap.fetchAll(); -177 if (procIter != null) loader.handleCorrupted(procIter); -178 } -179 -180 private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { -181 maxProcId = Math.max(maxProcId, proc.getProcId()); -182 if (isRequired(proc.getProcId())) { -183 if (LOG.isTraceEnabled()) { -184 LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); -185 } -186 localProcedureMap.add(proc); -187 tracker.setDeleted(proc.getProcId(), false); -188 } -189 } -190 -191 private void readInitEntry(final ProcedureWALEntry entry) -192 throws IOException { -193 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; -194 loadProcedure(entry, entry.getProcedure(0)); -195 } -196 -197 private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { -198 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; -199 loadProcedure(entry, entry.getProcedure(0)); -200 for (int i = 1; i < entry.getProcedureCount(); ++i) { -201 loadProcedure(entry, entry.getProcedure(i)); -202 } -203 } -204 -205 private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { -206 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; -207 loadProcedure(entry, entry.getProcedure(0)); -208 } -209 -210 private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { -211 assert entry.getProcedureCount() == 0 : "Expected no procedures"; -212 assert entry.hasProcId() : "expected ProcID"; -213 if (LOG.isTraceEnabled()) { -214 LOG.trace("read delete entry " + entry.getProcId()); -215 } -216 maxProcId = Math.max(maxProcId, entry.getProcId()); -217 localProcedureMap.remove(entry.getProcId()); -218 tracker.setDeleted(entry.getProcId(), true); -219 } -220 -221 private boolean isDeleted(final long procId) { -222 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; -223 } -224 -225 private boolean isRequired(final long procId) { -226 return !isDeleted(procId) && !procedureMap.contains(procId); -227 } -228 -229 // ========================================================================== -230 // We keep an in-memory map of the procedures sorted by replay order. -231 // (see the details in the beginning of the file) -232 // _______________________________________________ -233 // procedureMap = | A | | E | | C | | | | | G | | | -234 // D B -235 // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G -236 // -237 // We also have a lazy grouping by "root procedure", and a list of -238 // unlinked procedure. If after reading all the WALs we have unlinked -239 // procedures it means that we had a missing WAL or a corruption. -240 // rootHead = A <-> D <-> G -241 // B E -242 // C -243 // unlinkFromLinkList = None -244 // ========================================================================== -245 private static class Entry { -246 // hash-table next -247 protected Entry hashNext; -248 // child head -249 protected Entry childHead; -250 // double-link for rootHead or childHead -251 protected Entry linkNext; -252 protected Entry linkPrev; -253 // replay double-linked-list -254 protected Entry replayNext; -255 protected Entry replayPrev; -256 // procedure-infos -257 protected Procedure procedure; -258 protected ProcedureProtos.Procedure proto; -259 protected boolean ready = false; -260 -261 public Entry(Entry hashNext) { this.hashNext = hashNext; } -262 -263 public long getProcId() { return proto.getProcId(); } -264 public long getParentId() { return proto.getParentId(); } -265 public boolean hasParent() { return proto.hasParentId(); } -266 public boolean isReady() { return ready; } -267 -268 public Procedure convert() throws IOException { -269 if (procedure == null) { -270 procedure = Procedure.convert(proto); -271 } -272 return procedure; -273 } -274 -275 @Override -276 public String toString() { -277 return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")"; -278 } -279 } +025import org.apache.hadoop.fs.FSDataInputStream; +026import org.apache.hadoop.hbase.classification.InterfaceAudience; +027import org.apache.hadoop.hbase.classification.InterfaceStability; +028import org.apache.hadoop.hbase.ProcedureInfo; +029import org.apache.hadoop.hbase.procedure2.Procedure; +030import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; +031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +032import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; +033import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry; +034 +035/** +036 * Helper class that loads the procedures stored in a WAL +037 */ +038@InterfaceAudience.Private +039@InterfaceStability.Evolving +040public class ProcedureWALFormatReader { +041 private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class); +042 +043 // ============================================================================================== +044 // We read the WALs in reverse order. from the newest to the oldest. +045 // We have different entry types: +046 // - INIT: Procedure submitted by the user (also known as 'root procedure') +047 // - INSERT: Children added to the procedure <parentId>:[<childId>, ...] +048 // - UPDATE: The specified procedure was updated +049 // - DELETE: The procedure was removed (completed/rolledback and result TTL expired) +050 // +051 // In the WAL we can find multiple times the same procedure as UPDATE or INSERT. +052 // We read the WAL from top to bottom, so every time we find an entry of the +053 // same procedure, that will be the "latest" update. +054 // +055 // We keep two in-memory maps: +056 // - localProcedureMap: is the map containing the entries in the WAL we are processing +057 // - procedureMap: is the map containing all the procedures we found up to the WAL in process. +058 // localProcedureMap is merged with the procedureMap once we reach the WAL EOF. +059 // +060 // Since we are reading the WALs in reverse order (newest to oldest), +061 // if we find an entry related to a procedure we already have in 'procedureMap' we can discard it. +062 // +063 // The WAL is append-only so the last procedure in the WAL is the one that +064 // was in execution at the time we crashed/closed the server. +065 // given that, the procedure replay order can be inferred by the WAL order. +066 // +067 // Example: +068 // WAL-2: [A, B, A, C, D] +069 // WAL-1: [F, G, A, F, B] +070 // Replay-Order: [D, C, A, B, F, G] +071 // +072 // The "localProcedureMap" keeps a "replayOrder" list. Every time we add the +073 // record to the map that record is moved to the head of the "replayOrder" list. +074 // Using the example above: +075 // WAL-2 localProcedureMap.replayOrder is [D, C, A, B] +076 // WAL-1 localProcedureMap.replayOrder is [F, G] +077 // +078 // each time we reach the WAL-EOF, the "replayOrder" list is merged/appended in 'procedureMap' +079 // so using the example above we end up with: [D, C, A, B] + [F, G] as replay order. +080 // +081 // Fast Start: INIT/INSERT record and StackIDs +082 // --------------------------------------------- +083 // We have two special record, INIT and INSERT that tracks the first time +084 // the procedure was added to the WAL. We can use that information to be able +085 // to start procedures before reaching the end of the WAL, or before reading all the WALs. +086 // but in some cases the WAL with that record can be already gone. +087 // In alternative we can use the stackIds on each procedure, +088 // to identify when a procedure is ready to start. +089 // If there are gaps in the sum of the stackIds we need to read more WALs. +090 // +091 // Example (all procs child of A): +092 // WAL-2: [A, B] A stackIds = [0, 4], B stackIds = [1, 5] +093 // WAL-1: [A, B, C, D] +094 // +095 // In the case above we need to read one more WAL to be able to consider +096 // the root procedure A and all children as ready. +097 // ============================================================================================== +098 private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); +099 private final WalProcedureMap procedureMap = new WalProcedureMap(1024); +100 +101 //private long compactionLogId; +102 private long maxProcId = 0; +103 +104 private final ProcedureStoreTracker tracker; +105 private final boolean hasFastStartSupport; +106 +107 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) { +108 this.tracker = tracker; +109 // we support fast-start only if we have a clean shutdown. +110 this.hasFastStartSupport = !tracker.isEmpty(); +111 } +112 +113 public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { +114 FSDataInputStream stream = log.getStream(); +115 try { +116 boolean hasMore = true; +117 while (hasMore) { +118 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); +119 if (entry == null) { +120 LOG.warn("nothing left to decode. exiting with missing EOF"); +121 hasMore = false; +122 break; +123 } +124 switch (entry.getType()) { +125 case PROCEDURE_WAL_INIT: +126 readInitEntry(entry); +127 break; +128 case PROCEDURE_WAL_INSERT: +129 readInsertEntry(entry); +130 break; +131 case PROCEDURE_WAL_UPDATE: +132 case PROCEDURE_WAL_COMPACT: +133 readUpdateEntry(entry); +134 break; +135 case PROCEDURE_WAL_DELETE: +136 readDeleteEntry(entry); +137 break; +138 case PROCEDURE_WAL_EOF: +139 hasMore = false; +140 break; +141 default: +142 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry); +143 } +144 } +145 } catch (IOException e) { +146 LOG.error("got an exception while reading the procedure WAL: " + log, e); +147 loader.markCorruptedWAL(log, e); +148 } +149 +150 if (!localProcedureMap.isEmpty()) { +151 log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); +152 procedureMap.mergeTail(localProcedureMap); +153 //if (hasFastStartSupport) { +154 // TODO: Some procedure may be already runnables (see readInitEntry()) +155 // (we can also check the "update map" in the log trackers) +156 // -------------------------------------------------- +157 //EntryIterator iter = procedureMap.fetchReady(); +158 //if (iter != null) loader.load(iter); +159 // -------------------------------------------------- +160 //} +161 } +162 } +163 +164 public void finalize(ProcedureWALFormat.Loader loader) throws IOException { +165 // notify the loader about the max proc ID +166 loader.setMaxProcId(maxProcId); +167 +168 // fetch the procedure ready to run. +169 ProcedureIterator procIter = procedureMap.fetchReady(); +170 if (procIter != null) loader.load(procIter); +171 +172 // remaining procedures have missing link or dependencies +173 // consider them as corrupted, manual fix is probably required. +174 procIter = procedureMap.fetchAll(); +175 if (procIter != null) loader.handleCorrupted(procIter); +176 } +177 +178 private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { +179 maxProcId = Math.max(maxProcId, proc.getProcId()); +180 if (isRequired(proc.getProcId())) { +181 if (LOG.isTraceEnabled()) { +182 LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); +183 } +184 localProcedureMap.add(proc); +185 tracker.setDeleted(proc.getProcId(), false); +186 } +187 } +188 +189 private void readInitEntry(final ProcedureWALEntry entry) +190 throws IOException { +191 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; +192 loadProcedure(entry, entry.getProcedure(0)); +193 } +194 +195 private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { +196 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; +197 loadProcedure(entry, entry.getProcedure(0)); +198 for (int i = 1; i < entry.getProcedureCount(); ++i) { +199 loadProcedure(entry, entry.getProcedure(i)); +200 } +201 } +202 +203 private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { +204 assert entry.getProcedureCount() == 1 : "Expected only one procedure"; +205 loadProcedure(entry, entry.getProcedure(0)); +206 } +207 +208 private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { +209 assert entry.getProcedureCount() == 0 : "Expected no procedures"; +210 assert entry.hasProcId() : "expected ProcID"; +211 if (LOG.isTraceEnabled()) { +212 LOG.trace("read delete entry " + entry.getProcId()); +213 } +214 maxProcId = Math.max(maxProcId, entry.getProcId()); +215 localProcedureMap.remove(entry.getProcId()); +216 assert !procedureMap.contains(entry.getProcId()); +217 tracker.setDeleted(entry.getProcId(), true); +218 } +219 +220 private boolean isDeleted(final long procId) { +221 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; +222 } +223 +224 private boolean isRequired(final long procId) { +225 return !isDeleted(procId) && !procedureMap.contains(procId); +226 } +227 +228 // ========================================================================== +229 // We keep an in-memory map of the procedures sorted by replay order. +230 // (see the details in the beginning of the file) +231 // _______________________________________________ +232 // procedureMap = | A | | E | | C | | | | | G | | | +233 // D B +234 // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G +235 // +236 // We also have a lazy grouping by "root procedure", and a list of +237 // unlinked procedure. If after reading all the WALs we have unlinked +238 // procedures it means that we had a missing WAL or a corruption. +239 // rootHead = A <-> D <-> G +240 // B E +241 // C +242 // unlinkFromLinkList = None +243 // ========================================================================== +244 private static class Entry { +245 // hash-table next +246 protected Entry hashNext; +247 // child head +248 protected Entry childHead; +249 // double-link for rootHead or childHead +250 protected Entry linkNext; +251 protected Entry linkPrev; +252 // replay double-linked-list +253 protected Entry replayNext; +254 protected Entry replayPrev; +255 // procedure-infos +256 protected Procedure procedure; +257 protected ProcedureProtos.Procedure proto; +258 protected boolean ready = false; +259 +260 public Entry(Entry hashNext) { this.hashNext = hashNext; } +261 +262 public long getProcId() { return proto.getProcId(); } +263 public long getParentId() { return proto.getParentId(); } +264 public boolean hasParent() { return proto.hasParentId(); } +265 public boolean isReady() { return ready; } +266 +267 public boolean isCompleted() { +268 if (!hasParent()) { +269 switch (proto.getState()) { +270 case ROLLEDBACK: +271 return true; +272 case FINISHED: +273 return !proto.hasException(); +274 default: +275 break; +276 } +277 } +278 return false; +279 } 280 -281 private static class EntryIterator implements ProcedureIterator { -282 private final Entry replayHead; -283 private Entry current; -284 -285 public EntryIterator(Entry replayHead) { -286 this.replayHead = replayHead; -287 this.current = replayHead; -288 } -289 -290 @Override -291 public void reset() { -292 this.current = replayHead; -293 } -294 -295 @Override -296 public boolean hasNext() { -297 return current != null; -298 } -299 -300 @Override -301 public Procedure next() throws IOException { -302 try { -303 return current.convert(); -304 } finally { -305 current = current.replayNext; -306 } -307 } -308 } -309 -310 private static class WalProcedureMap { -311 // procedure hash table -312 private Entry[] procedureMap; -313 -314 // replay-order double-linked-list -315 private Entry replayOrderHead; -316 private Entry replayOrderTail; -317 -318 // root linked-list -319 private Entry rootHead; -320 -321 // pending unlinked children (root not present yet) -322 private Entry childUnlinkedHead; -323 -324 // Track ProcId range -325 private long minProcId = Long.MAX_VALUE; -326 private long maxProcId = Long.MIN_VALUE; -327 -328 public WalProcedureMap(int size) { -329 procedureMap = new Entry[size]; -330 replayOrderHead = null; -331 replayOrderTail = null; -332 rootHead = null; -333 childUnlinkedHead = null; +281 public Procedure convert() throws IOException { +282 if (procedure == null) { +283 procedure = Procedure.convert(proto); +284 } +285 return procedure; +286 } +287 +288 public ProcedureInfo convertToInfo() { +289 return ProcedureInfo.convert(proto); +290 } +291 +292 @Override +293 public String toString() { +294 return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")"; +295 } +296 } +297 +298 private static class EntryIterator implements ProcedureIterator { +299 private final Entry replayHead; +300 private Entry current; +301 +302 public EntryIterator(Entry replayHead) { +303 this.replayHead = replayHead; +304 this.current = replayHead; +305 } +306 +307 @Override +308 public void reset() { +309 this.current = replayHead; +310 } +311 +312 @Override +313 public boolean hasNext() { +314 return current != null; +315 } +316 +317 @Override +318 public boolean isNextCompleted() { +319 return current != null && current.isCompleted(); +320 } +321 +322 @Override +323 public void skipNext() { +324 current = current.replayNext; +325 } +326 +327 @Override +328 public Procedure nextAsProcedure() throws IOException { +329 try { +330 return current.convert(); +331 } finally { +332 current = current.replayNext; +333 } 334 } 335 -336 public void add(ProcedureProtos.Procedure procProto) { -337 trackProcIds(procProto.getProcId()); -338 Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); -339 boolean isNew = entry.proto == null; -340 entry.proto = procProto; -341 addToReplayList(entry); -342 -343 if (isNew) { -344 if (procProto.hasParentId()) { -345 childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); -346 } else { -347 rootHead = addToLinkList(entry, rootHead); -348 } -349 } -350 } -351 -352 public boolean remove(long procId) { -353 trackProcIds(procId); -354 Entry entry = removeFromMap(procId); -355 if (entry != null) { -356 unlinkFromReplayList(entry); -357 unlinkFromLinkList(entry); -358 return true; -359 } -360 return false; -361 } -362 -363 private void trackProcIds(long procId) { -364 minProcId = Math.min(minProcId, procId); -365 maxProcId = Math.max(maxProcId, procId); -366 } -367 -368 public long getMinProcId() { -369 return minProcId; +336 @Override +337 public ProcedureInfo nextAsProcedureInfo() { +338 try { +339 return current.convertToInfo(); +340 } finally { +341 current = current.replayNext; +342 } +343 } +344 } +345 +346 private static class WalProcedureMap { +347 // procedure hash table +348 private Entry[] procedureMap; +349 +350 // replay-order double-linked-list +351 private Entry replayOrderHead; +352 private Entry replayOrderTail; +353 +354 // root linked-list +355 private Entry rootHead; +356 +357 // pending unlinked children (root not present yet) +358 private Entry childUnlinkedHead; +359 +360 // Track ProcId range +361 private long minProcId = Long.MAX_VALUE; +362 private long maxProcId = Long.MIN_VALUE; +363 +364 public WalProcedureMap(int size) { +365 procedureMap = new Entry[size]; +366 replayOrderHead = null; +367 replayOrderTail = null; +368 rootHead = null; +369 childUnlinkedHead = null; 370 } 371 -372 public long getMaxProcId() { -373 return maxProcId; -374 } -375 -376 public boolean contains(long procId) { -377 return getProcedure(procId) != null; -378 } -379 -380 public boolean isEmpty() { -381 return replayOrderHead == null; -382 } -383 -384 public void clear() { -385 for (int i = 0; i < procedureMap.length; ++i) { -386 procedureMap[i] = null; -387 } -388 replayOrderHead = null; -389 replayOrderTail = null; -390 rootHead = null; -391 childUnlinkedHead = null; -392 minProcId = Long.MAX_VALUE; -393 maxProcId = Long.MIN_VALUE; -394 } -395 -396 /* -397 * Merges two WalProcedureMap, -398 * the target is the "global" map, the source is the "local" map. -399 * - The entries in the hashtables are guaranteed to be unique. -400 * On replay we don't load procedures that already exist in the "global" -401 * map (the one we are merging the "local" in to). -402 * - The replayOrderList of the "local" nao will be appended to the "global" -403 * map replay list. -404 * - The "local" map will be cleared at the end of the operation. -405 */ -406 public void mergeTail(WalProcedureMap other) { -407 for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { -408 int slotIndex = getMapSlot(p.getProcId()); -409 p.hashNext = procedureMap[slotIndex]; -410 procedureMap[slotIndex] = p; -411 } -412 -413 if (replayOrderHead == null) { -414 replayOrderHead = other.replayOrderHead; -415 replayOrderTail = other.replayOrderTail; -416 rootHead = other.rootHead; -417 childUnlinkedHead = other.childUnlinkedHead; -418 } else { -419 // append replay list -420 assert replayOrderTail.replayNext == null; -421 assert other.replayOrderHead.replayPrev == null; -422 replayOrderTail.replayNext = other.replayOrderHead; -423 other.replayOrderHead.replayPrev = replayOrderTail; -424 replayOrderTail = other.replayOrderTail; -425 -426 // merge rootHead -427 if (rootHead == null) { -428 rootHead = other.rootHead; -429 } else if (other.rootHead != null) { -430 Entry otherTail = findLinkListTail(other.rootHead); -431 otherTail.linkNext = rootHead; -432 rootHead.linkPrev = otherTail; -433 rootHead = other.rootHead; -434 } -435 -436 // merge childUnlinkedHead -437 if (childUnlinkedHead == null) { -438 childUnlinkedHead = other.childUnlinkedHead; -439 } else if (other.childUnlinkedHead != null) { -440 Entry otherTail = findLinkListTail(other.childUnlinkedHead); -441 otherTail.linkNext = childUnlinkedHead; -442 childUnlinkedHead.linkPrev = otherTail; -443 childUnlinkedHead = other.childUnlinkedHead; -444 } -445 } -446 -447 other.clear(); -448 } -449 -450 /* -451 * Returns an EntryIterator with the list of procedures ready -452 * to be added to the executor. -453 * A Procedure is ready if its children and parent are ready. -454 */ -455 public EntryIterator fetchReady() { -456 buildGraph(); -457 -458 Entry readyHead = null; -459 Entry readyTail = null; -460 Entry p = replayOrderHead; -461 while (p != null) { -462 Entry next = p.replayNext; -463 if (p.isReady()) { -464 unlinkFromReplayList(p); -465 if (readyTail != null) { -466 readyTail.replayNext = p; -467 p.replayPrev = readyTail; -468 } else { -469 p.replayPrev = null; -470 readyHead = p; -471 } -472 readyTail = p; -473 p.replayNext = null; -474 } -475 p = next; -476 } -477 // we need the hash-table lookups for parents, so this must be done -478 // out of the loop where we check isReadyToRun() -479 for (p = readyHead; p != null; p = p.replayNext) { -480 removeFromMap(p.getProcId()); -481 unlinkFromLinkList(p); -482 } -483 return readyHead != null ? new EntryIterator(readyHead) : null; +372 public void add(ProcedureProtos.Procedure procProto) { +373 trackProcIds(procProto.getProcId()); +374 Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); +375 boolean isNew = entry.proto == null; +376 entry.proto = procProto; +377 addToReplayList(entry); +378 +379 if (isNew) { +380 if (procProto.hasParentId()) { +381 childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); +382 } else { +383 rootHead = addToLinkList(entry, rootHead); +384 } +385 } +386 } +387 +388 public boolean remove(long procId) { +389 trackProcIds(procId); +390 Entry entry = removeFromMap(procId); +391 if (entry != null) { +392 unlinkFromReplayList(entry); +393 unlinkFromLinkList(entry); +394 return true; +395 } +396 return false; +397 } +398 +399 private void trackProcIds(long procId) { +400 minProcId = Math.min(minProcId, procId); +401 maxProcId = Math.max(maxProcId, procId); +402 } +403 +404 public long getMinProcId() { +405 return minProcId; +406 } +407 +408 public long getMaxProcId() { +409 return maxProcId; +410 } +411 +412 public boolean contains(long procId) { +413 return getProcedure(procId) != null; +414 } +415 +416 public boolean isEmpty() { +417 return replayOrderHead == null; +418 } +419 +420 public void clear() { +421 for (int i = 0; i < procedureMap.length; ++i) { +422 procedureMap[i] = null; +423 } +424 replayOrderHead = null; +425 replayOrderTail = null; +426 rootHead = null; +427 childUnlinkedHead = null; +428 minProcId = Long.MAX_VALUE; +429 maxProcId = Long.MIN_VALUE; +430 } +431 +432 /* +433 * Merges two WalProcedureMap, +434 * the target is the "global" map, the source is the "local" map. +435 * - The entries in the hashtables are guaranteed to be unique. +436 * On replay we don't load procedures that already exist in the "global" +437 * map (the one we are merging the "local" in to). +438 * - The replayOrderList of the "local" nao will be appended to the "global" +439 * map replay list. +440 * - The "local" map will be cleared at the end of the operation. +441 */ +442 public void mergeTail(WalProcedureMap other) { +443 for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { +444 int slotIndex = getMapSlot(p.getProcId()); +445 p.hashNext = procedureMap[slotIndex]; +446 procedureMap[slotIndex] = p; +447 } +448 +449 if (replayOrderHead == null) { +450 replayOrderHead = other.replayOrderHead; +451 replayOrderTail = other.replayOrderTail; +452 rootHead = other.rootHead; +453 childUnlinkedHead = other.childUnlinkedHead; +454 } else { +455 // append replay list +456 assert replayOrderTail.replayNext == null; +457 assert other.replayOrderHead.replayPrev == null; +458 replayOrderTail.replayNext = other.replayOrderHead; +459 other.replayOrderHead.replayPrev = replayOrderTail; +460 replayOrderTail = other.replayOrderTail; +461 +462 // merge rootHead +463 if (rootHead == null) { +464 rootHead = other.rootHead; +465 } else if (other.rootHead != null) { +466 Entry otherTail = findLinkListTail(other.rootHead); +467 otherTail.linkNext = rootHead; +468 rootHead.linkPrev = otherTail; +469 rootHead = other.rootHead; +470 } +471 +472 // merge childUnlinkedHead +473 if (childUnlinkedHead == null) { +474 childUnlinkedHead = other.childUnlinkedHead; +475 } else if (other.childUnlinkedHead != null) { +476 Entry otherTail = findLinkListTail(other.childUnlinkedHead); +477 otherTail.linkNext = childUnlinkedHead; +478 childUnlinkedHead.linkPrev = otherTail; +479 childUnlinkedHead = other.childUnlin