Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extensions to log server disk queue (made in the context of version vector/unicast) #11777

Open
wants to merge 6 commits into
base: version-vector-disk-queue
Choose a base branch
from
80 changes: 74 additions & 6 deletions fdbserver/TLogServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@ struct TLogQueueEntryRef {
UID id;
Version version;
Version knownCommittedVersion;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;
StringRef messages;
TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {}
TLogQueueEntryRef() : version(0), knownCommittedVersion(0), prevVersion(0) {}
TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from)
: id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion),
messages(a, from.messages) {}
messages(a, from.messages), prevVersion(from.prevVersion), tLogLocIds(from.tLogLocIds) {}

// To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, messages, knownCommittedVersion, id);
serializer(ar, version, messages, knownCommittedVersion, prevVersion, tLogLocIds, id);
}
size_t expectedSize() const { return messages.expectedSize(); }
};
Expand All @@ -73,8 +75,11 @@ struct AlternativeTLogQueueEntryRef {
Version version;
Version knownCommittedVersion;
std::vector<TagsAndMessage>* alternativeMessages;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;

AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {}
AlternativeTLogQueueEntryRef()
: version(0), knownCommittedVersion(0), alternativeMessages(nullptr), prevVersion(0) {}

template <class Ar>
void serialize(Ar& ar) {
Expand All @@ -84,7 +89,7 @@ struct AlternativeTLogQueueEntryRef {
for (auto& msg : *alternativeMessages) {
ar.serializeBytes(msg.message);
}
serializer(ar, knownCommittedVersion, id);
serializer(ar, knownCommittedVersion, prevVersion, tLogLocIds, id);
}

uint32_t expectedSize() const {
Expand Down Expand Up @@ -217,6 +222,8 @@ static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef("TxsTags/"_sr, "TxsTag
static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
static const KeyRef persistUnicastRecoveryLocationKey = KeyRef("UnicastRecoveryLocation"_sr);
static const KeyRef persistSpillTargetLogDataIdKey = KeyRef("SpillTargetLogDataId"_sr);

static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr;

Expand Down Expand Up @@ -731,6 +738,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
tLogData->persistentData->clear(KeyRangeRef(msgRefKey, strinc(msgRefKey)));
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey)));
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistUnicastRecoveryLocationKey)));
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistSpillTargetLogDataIdKey)));
}
}

for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) {
Expand Down Expand Up @@ -1108,6 +1119,17 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
KeyValueRef(persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.first, Unversioned())));
}

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
auto kcvLocationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion);
if (kcvLocationIter != logData->versionLocation.end()) {
self->persistentData->set(KeyValueRef(persistUnicastRecoveryLocationKey,
BinaryWriter::toValue(kcvLocationIter->value.first, Unversioned())));
}

self->persistentData->set(
KeyValueRef(persistSpillTargetLogDataIdKey, BinaryWriter::toValue(logData->logId, Unversioned())));
}

self->persistentData->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
BinaryWriter::toValue(newPersistentDataVersion, Unversioned())));
Expand Down Expand Up @@ -2430,6 +2452,8 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = req.messages;
qe.id = logData->logId;
qe.prevVersion = req.seqPrevVersion;
qe.tLogLocIds = req.tLogLocIds;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand All @@ -2439,7 +2463,9 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT(req.tLogCount == req.tLogLocIds.size());
logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds);

while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
Expand Down Expand Up @@ -3058,6 +3084,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down Expand Up @@ -3213,10 +3240,13 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state Future<RangeResult> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<RangeResult> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<RangeResult> fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys);
state Future<Optional<Value>> fUnicastRecoveryLocation = storage->readValue(persistUnicastRecoveryLocationKey);
state Future<Optional<Value>> fSpillTargetLogDataId = storage->readValue(persistSpillTargetLogDataIdKey);

// FIXME: metadata in queue?

wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode }));
wait(waitForAll(std::vector{
fFormat, fRecoveryLocation, fEncryptionAtRestMode, fUnicastRecoveryLocation, fSpillTargetLogDataId }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
Expand Down Expand Up @@ -3385,6 +3415,22 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
}

state Optional<UID> spillTargetLogDataId;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && fUnicastRecoveryLocation.get().present() &&
fSpillTargetLogDataId.get().present()) {
spillTargetLogDataId = BinaryReader::fromStringRef<UID>(fSpillTargetLogDataId.get().get(), Unversioned());
auto iter = self->id_data.find(spillTargetLogDataId.get());
if (iter != self->id_data.end()) {
Reference<LogData> spillTargetLogData = iter->second;
if (spillTargetLogData->knownCommittedVersion < spillTargetLogData->persistentDataDurableVersion) {
minimumRecoveryLocation = BinaryReader::fromStringRef<IDiskQueue::location>(
fUnicastRecoveryLocation.get().get(), Unversioned());
}
} else {
spillTargetLogDataId.reset();
}
}

std::sort(logsByVersion.begin(), logsByVersion.end());
for (const auto& pair : logsByVersion) {
// TLogs that have been fully spilled won't have queue entries read in the loop below.
Expand Down Expand Up @@ -3425,11 +3471,32 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// logData->version.get());

if (logData) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && spillTargetLogDataId.present() &&
qe.id == spillTargetLogDataId.get() && qe.version < logData->persistentDataDurableVersion) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);

while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a function for these lines for code reuse?
There are similar code below and in tLogCommit().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done now.

continue;
}

if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) {
self->spillOrder.push_back(qe.id);
}
logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);

while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
}

if (qe.version > logData->version.get()) {
commitMessages(self, logData, qe.version, qe.arena(), qe.messages);
logData->version.set(qe.version);
Expand Down Expand Up @@ -3707,6 +3774,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down