From 37485d27b47311566ab8a6d0c7f1fc3e3f77e2c9 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Wed, 23 Oct 2024 19:36:21 +0000 Subject: [PATCH 1/6] - Extend the disk queue entry data structure to hold the following information, needed by the recovery algorithm when unicast is enabled: - PrevVersion of a version - List of log servers that a commit proxy sends a commit version to And, extend the code to populate "unknownCommittedVersions" list (the list of commit versions whose commit status is not known, and to be determined by the recovery version computation algorithm) on a log server restart. NOTE: Please note that these changes will cause version incompatibility and so additional code/logic will need to be added to make sure that upgrades/restart related simulation tests work properly. --- fdbserver/TLogServer.actor.cpp | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 4be6ffb613b..e129910e6dc 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -53,17 +53,19 @@ struct TLogQueueEntryRef { UID id; Version version; Version knownCommittedVersion; + Version prevVersion; + std::vector tLogLocIds; StringRef messages; - TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {} + TLogQueueEntryRef() : version(0), knownCommittedVersion(0), prevVersion(0) {} TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from) : id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion), - messages(a, from.messages) {} + messages(a, from.messages), prevVersion(from.prevVersion), tLogLocIds(from.tLogLocIds) {} // To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be // considered template void serialize(Ar& ar) { - serializer(ar, version, messages, knownCommittedVersion, id); + serializer(ar, version, messages, knownCommittedVersion, prevVersion, tLogLocIds, id); } size_t expectedSize() const { return messages.expectedSize(); } }; @@ -73,8 +75,11 @@ struct AlternativeTLogQueueEntryRef { Version version; Version knownCommittedVersion; std::vector* alternativeMessages; + Version prevVersion; + std::vector tLogLocIds; - AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {} + AlternativeTLogQueueEntryRef() + : version(0), knownCommittedVersion(0), alternativeMessages(nullptr), prevVersion(0) {} template void serialize(Ar& ar) { @@ -84,7 +89,7 @@ struct AlternativeTLogQueueEntryRef { for (auto& msg : *alternativeMessages) { ar.serializeBytes(msg.message); } - serializer(ar, knownCommittedVersion, id); + serializer(ar, knownCommittedVersion, prevVersion, tLogLocIds, id); } uint32_t expectedSize() const { @@ -2430,6 +2435,8 @@ ACTOR Future tLogCommit(TLogData* self, qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = req.messages; qe.id = logData->logId; + qe.prevVersion = req.seqPrevVersion; + qe.tLogLocIds = req.tLogLocIds; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); @@ -3430,6 +3437,16 @@ ACTOR Future restorePersistentState(TLogData* self, } logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, qe.knownCommittedVersion); + + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { + logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds); + + while (!logData->unknownCommittedVersions.empty() && + logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) { + logData->unknownCommittedVersions.pop_back(); + } + } + if (qe.version > logData->version.get()) { commitMessages(self, logData, qe.version, qe.arena(), qe.messages); logData->version.set(qe.version); From 0b7fbaeea2667d38d2cb8109c2a949d96583c912 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Wed, 13 Nov 2024 18:44:09 +0000 Subject: [PATCH 2/6] - On restart, make log servers read disk queue entries from known committed version onwards in the case where the known committed version is behind LogData::persistentDataDurableVersion (and version vector unicast is enabled). This is so we will populate "LogData::unknownCommittedVersions", on log server restart, with all versions that are needed by unicast recovery. --- fdbserver/TLogServer.actor.cpp | 53 +++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index e129910e6dc..d3f24a5913c 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -222,6 +222,8 @@ static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef("TxsTags/"_sr, "TxsTag static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr); static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr); static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr); +static const KeyRef persistUnicastRecoveryLocationKey = KeyRef("UnicastRecoveryLocation"_sr); +static const KeyRef persistSpillTargetLogDataIdKey = KeyRef("SpillTargetLogDataId"_sr); static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr; @@ -736,6 +738,10 @@ struct LogData : NonCopyable, public ReferenceCounted { tLogData->persistentData->clear(KeyRangeRef(msgRefKey, strinc(msgRefKey))); Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin); tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey))); + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { + tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistUnicastRecoveryLocationKey))); + tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistSpillTargetLogDataIdKey))); + } } for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) { @@ -1113,6 +1119,17 @@ ACTOR Future updatePersistentData(TLogData* self, Reference logDa KeyValueRef(persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.first, Unversioned()))); } + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { + auto kcvLocationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion); + if (kcvLocationIter != logData->versionLocation.end()) { + self->persistentData->set(KeyValueRef(persistUnicastRecoveryLocationKey, + BinaryWriter::toValue(kcvLocationIter->value.first, Unversioned()))); + } + + self->persistentData->set( + KeyValueRef(persistSpillTargetLogDataIdKey, BinaryWriter::toValue(logData->logId, Unversioned()))); + } + self->persistentData->set( KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin), BinaryWriter::toValue(newPersistentDataVersion, Unversioned()))); @@ -2446,7 +2463,9 @@ ACTOR Future tLogCommit(TLogData* self, // Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors logData->version.set(req.version); if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { + ASSERT(req.tLogCount == req.tLogLocIds.size()); logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds); + while (!logData->unknownCommittedVersions.empty() && logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) { logData->unknownCommittedVersions.pop_back(); @@ -3065,6 +3084,7 @@ ACTOR Future pullAsyncData(TLogData* self, qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; + qe.prevVersion = 0; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); @@ -3220,10 +3240,13 @@ ACTOR Future restorePersistentState(TLogData* self, state Future fRecoverCounts = storage->readRange(persistRecoveryCountKeys); state Future fProtocolVersions = storage->readRange(persistProtocolVersionKeys); state Future fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys); + state Future> fUnicastRecoveryLocation = storage->readValue(persistUnicastRecoveryLocationKey); + state Future> fSpillTargetLogDataId = storage->readValue(persistSpillTargetLogDataIdKey); // FIXME: metadata in queue? - wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode })); + wait(waitForAll(std::vector{ + fFormat, fRecoveryLocation, fEncryptionAtRestMode, fUnicastRecoveryLocation, fSpillTargetLogDataId })); wait(waitForAll(std::vector{ fVers, fKnownCommitted, fLocality, @@ -3392,6 +3415,22 @@ ACTOR Future restorePersistentState(TLogData* self, } } + state Optional spillTargetLogDataId; + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && fUnicastRecoveryLocation.get().present() && + fSpillTargetLogDataId.get().present()) { + spillTargetLogDataId = BinaryReader::fromStringRef(fSpillTargetLogDataId.get().get(), Unversioned()); + auto iter = self->id_data.find(spillTargetLogDataId.get()); + if (iter != self->id_data.end()) { + Reference spillTargetLogData = iter->second; + if (spillTargetLogData->knownCommittedVersion < spillTargetLogData->persistentDataDurableVersion) { + minimumRecoveryLocation = BinaryReader::fromStringRef( + fUnicastRecoveryLocation.get().get(), Unversioned()); + } + } else { + spillTargetLogDataId.reset(); + } + } + std::sort(logsByVersion.begin(), logsByVersion.end()); for (const auto& pair : logsByVersion) { // TLogs that have been fully spilled won't have queue entries read in the loop below. @@ -3432,6 +3471,17 @@ ACTOR Future restorePersistentState(TLogData* self, // logData->version.get()); if (logData) { + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && spillTargetLogDataId.present() && + qe.id == spillTargetLogDataId.get() && qe.version < logData->persistentDataDurableVersion) { + logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds); + + while (!logData->unknownCommittedVersions.empty() && + logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) { + logData->unknownCommittedVersions.pop_back(); + } + continue; + } + if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) { self->spillOrder.push_back(qe.id); } @@ -3724,6 +3774,7 @@ ACTOR Future tLogStart(TLogData* self, InitializeTLogRequest req, Locality qe.knownCommittedVersion = logData->knownCommittedVersion; qe.messages = StringRef(); qe.id = logData->logId; + qe.prevVersion = 0; self->persistentQueue->push(qe, logData); self->diskQueueCommitBytes += qe.expectedSize(); From c6f99e29bec4b64159dceeb0620f13e5f7be958e Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Thu, 5 Dec 2024 20:17:00 +0000 Subject: [PATCH 3/6] If version vector is enabled then preserve all versions from "knownCommittedVersion" onwards in log server's disk queue. --- fdbserver/TLogServer.actor.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index d3f24a5913c..792a9b927d3 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -976,10 +976,19 @@ ACTOR Future popDiskQueue(TLogData* self, Reference logData) { IDiskQueue::location minLocation = 0; Version minVersion = 0; auto locationIter = logData->versionLocation.lower_bound(logData->persistentDataVersion); + // If version vector is enabled then we need to preserve all versions from "knownCommittedVersion" + // onwards (for recovery purpose). Adjust the iterator position accordingly. + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && + logData->knownCommittedVersion < logData->persistentDataVersion) { + locationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion); + } if (locationIter != logData->versionLocation.end()) { minLocation = locationIter->value.first; minVersion = locationIter->key; } + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { + ASSERT_WE_THINK(minVersion <= logData->knownCommittedVersion); + } logData->minPoppedTagVersion = std::numeric_limits::max(); for (int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) { From 4680e47ad26bd6e5d0b95d17afaa11db723796a9 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Thu, 5 Dec 2024 20:59:15 +0000 Subject: [PATCH 4/6] - Address a review comment --- fdbserver/TLogServer.actor.cpp | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 792a9b927d3..df056197aef 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -789,6 +789,12 @@ struct LogData : NonCopyable, public ReferenceCounted { stoppedPromise.send(Void()); } } + + void purgeUnknownCommittedVersions(Version upToVersion) { + while (!unknownCommittedVersions.empty() && unknownCommittedVersions.back().version <= upToVersion) { + unknownCommittedVersions.pop_back(); + } + } }; template @@ -2474,11 +2480,8 @@ ACTOR Future tLogCommit(TLogData* self, if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { ASSERT(req.tLogCount == req.tLogLocIds.size()); logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds); - - while (!logData->unknownCommittedVersions.empty() && - logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) { - logData->unknownCommittedVersions.pop_back(); - } + // Purge versions from "unknownCommittedVersions" list till "req.knownCommittedVersion". + logData->purgeUnknownCommittedVersions(req.knownCommittedVersion); } else { ASSERT(req.prevVersion == req.seqPrevVersion); // @todo remove this assert later } @@ -3483,11 +3486,8 @@ ACTOR Future restorePersistentState(TLogData* self, if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && spillTargetLogDataId.present() && qe.id == spillTargetLogDataId.get() && qe.version < logData->persistentDataDurableVersion) { logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds); - - while (!logData->unknownCommittedVersions.empty() && - logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) { - logData->unknownCommittedVersions.pop_back(); - } + // Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion". + logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion); continue; } @@ -3499,11 +3499,8 @@ ACTOR Future restorePersistentState(TLogData* self, if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) { logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds); - - while (!logData->unknownCommittedVersions.empty() && - logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) { - logData->unknownCommittedVersions.pop_back(); - } + // Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion". + logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion); } if (qe.version > logData->version.get()) { From dfb2d97765d89f2de0e684732c1d822a633b61f1 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Thu, 5 Dec 2024 21:31:50 +0000 Subject: [PATCH 5/6] - Make log servers preserve the disk queue positions of all versions from known committed version onwards, when version vector is enabled. --- fdbserver/TLogServer.actor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index df056197aef..715c4e851c2 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1223,7 +1223,9 @@ ACTOR Future updatePersistentData(TLogData* self, Reference logDa } if (minVersion != std::numeric_limits::max()) { self->persistentQueue->forgetBefore( - newPersistentDataVersion, + (!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST + ? newPersistentDataVersion + : std::min(newPersistentDataVersion, logData->knownCommittedVersion)), logData); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. // Should we limit the number of versions cleared at a time? } From 9249ca1a01ce0e3e118bec657bd35130d15b8421 Mon Sep 17 00:00:00 2001 From: Sreenath Bodagala Date: Tue, 10 Dec 2024 16:34:19 +0000 Subject: [PATCH 6/6] - This change is in the context of the change made in PR https://github.com/apple/foundationdb/pull/11815 in "main". In case of spill by reference, log servers should use the logic that is based over "TagData::popped" to decide how long to keep the disk queue positions of versions in memory (instead of using the logic that is based over "LogData::persistentDataVersion", which is applicable to spill by value case). --- fdbserver/TLogServer.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 715c4e851c2..4690b5796f0 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1225,7 +1225,7 @@ ACTOR Future updatePersistentData(TLogData* self, Reference logDa self->persistentQueue->forgetBefore( (!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST ? newPersistentDataVersion - : std::min(newPersistentDataVersion, logData->knownCommittedVersion)), + : std::min(minVersion, logData->knownCommittedVersion)), logData); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions. // Should we limit the number of versions cleared at a time? }