Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New backup consolidated commit #11760

Draft
wants to merge 17 commits into
base: release-7.3
Choose a base branch
from
5 changes: 4 additions & 1 deletion fdbbackup/FileConverter.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCounted<MutationFilesReadProg
if (fp->empty()) {
self->fileProgress.erase(self->fileProgress.begin());
} else {
// Keep fileProgress sorted
// Keep fileProgress sorted; because only the first element can be changed, this single pass is enough
for (int i = 1; i < self->fileProgress.size(); i++) {
if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) {
break;
Expand Down Expand Up @@ -489,6 +489,9 @@ ACTOR Future<Void> convert(ConvertParams params) {
arena = Arena();
}

// Keep getting data until a new version is encountered, then flush all buffered data and start buffering
// for the new version.

ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion()));
MutationRef m;
rd >> m;
Expand Down
34 changes: 17 additions & 17 deletions fdbbackup/backup.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2397,23 +2397,23 @@ ACTOR Future<Void> runRestore(Database db,
}

if (performRestore) {
Version restoredVersion = wait(backupAgent.restore(db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
verbose,
KeyRef(addPrefix),
KeyRef(removePrefix),
LockDB::True,
UnlockDB::True,
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
beginVersion,
encryptionKeyFile));
Version restoredVersion = wait(backupAgent.restoreConstructVersion(db,
origDb,
KeyRef(tagName),
KeyRef(container),
proxy,
ranges,
waitForDone,
targetVersion,
verbose,
KeyRef(addPrefix),
KeyRef(removePrefix),
LockDB::True,
UnlockDB::True,
onlyApplyMutationLogs,
inconsistentSnapshotOnly,
beginVersion,
encryptionKeyFile));

if (waitForDone && verbose) {
// If restore is now complete then report version restored
Expand Down
68 changes: 64 additions & 4 deletions fdbclient/BackupAgentBase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,12 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
return ret;
}

// given a begin and end version, get the prefix in the database for this range
// which is applyLogKeys.begin/backupUid/hash(uint8)/version(64 bits)/part
// returns multiple key ranges, each should be of length APPLY_BLOCK_SIZE
// (64, 200) -> [(64, 128), (128, 192), (192, 200)]
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;

Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);

//TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix);
Expand Down Expand Up @@ -292,6 +295,7 @@ void _addResult(bool* tenantMapChanging,
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
*/
// hfu5: value is each Param2
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
VectorRef<MutationRef>* result,
VectorRef<Optional<MutationRef>>* encryptedResult,
Expand All @@ -311,16 +315,19 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001) {
// it fails here now
TraceEvent(SevError, "DecodeBackupLogValue")
.detail("IncompatibleProtocolVersion", protocolVersion)
.detail("ValueSize", value.size())
.detail("Value", value);
throw incompatible_protocol_version();
}

state uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
// hfu5: this is the format for Param2
// totalBytes was widened to 64 bits in generateOldFormatMutations
state uint64_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
state uint32_t consumed = 0;

if (totalBytes + offset > value.size())
Expand All @@ -331,18 +338,24 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;

while (consumed < totalBytes) {
// fmt::print(stderr, "DecodeProcess11111, offset={}\n", offset);
uint32_t type = 0;
// hfu5: format should be type|kLen|vLen|Key|Value
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

state uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
state uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);

// fmt::print(stderr, "DecodeProcess, offset={}, len1={}, len2={}, size={}, type={}, valid={}\n",
// offset, len1, len2, value.size(), type, isValidMutationType(type));
ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

// mutationref is constructed here
state MutationRef logValue;
state Arena tempArena;
logValue.type = type;
Expand Down Expand Up @@ -448,6 +461,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
// version: the version of this mutation as decoded from the log
// ver: the old version stored in keyVersionMap
// As a result, only apply this log mutation when its version is larger (to work with range files).
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
Expand Down Expand Up @@ -587,6 +603,7 @@ ACTOR Future<Void> readCommitted(Database cx,
}
}

// hfu5: read each version, potentially multiple parts within the same version
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
Expand Down Expand Up @@ -639,14 +656,23 @@ ACTOR Future<Void> readCommitted(Database cx,
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());

// iterate on a version range.
// each version - partition is a key-value pair
// hfu5 question: in the edge case where two partitions of the same key go to two different blocks,
// they cannot be combined here — what happens then?
for (auto& s : rangevalue) {
// hfu5 : (version, part)
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// hfu5: if a different version is seen, send the current result directly and then create another
// rcGroup. As a result, each rcGroup is for a single version, but a single version can span
// multiple rcGroups.

//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
Expand All @@ -665,6 +691,7 @@ ACTOR Future<Void> readCommitted(Database cx,
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
// this is each item, so according to kvMutationLogToTransactions, each item should be a partition
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
Expand Down Expand Up @@ -706,6 +733,8 @@ Future<Void> readCommitted(Database cx,
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
}

// The restore transaction has to be first in the batch, or be the only txn in the batch, to make sure it
// never conflicts with others.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
Key uid,
Version newBeginVersion,
Expand All @@ -722,6 +751,9 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,

// Mutations and encrypted mutations (and their relationship) are described in greater detail in the
// definition of CommitTransactionRef in CommitTransaction.h
fmt::print(stderr, "BackupAgentBase: newBeginVersion={}\n", newBeginVersion);
TraceEvent("BackupAgentBaseNewBeginVersion").detail("NewBeginVersion", newBeginVersion).log();

req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
Expand Down Expand Up @@ -759,25 +791,32 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
// Two layers of loops: the outer loop iterates over each file range,
// the inner loop over each transaction (version).
// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions-beforeLoop\n");
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
state bool tenantMapChanging = false;
loop {
try {
// fmt::print(stderr, "BackupAgentBase-RCGroup-Before\n");
state RCGroup group = waitNext(results.getFuture());
// fmt::print(stderr, "BackupAgentBase-RCGroup-After group={}\n", group.groupKey);
state CommitTransactionRequest curReq;
lock->release(group.items.expectedSize());
state int curBatchMutationSize = 0;
tenantMapChanging = false;

BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
// hfu5 : each value should be a partition
bw.serializeBytes(group.items[i].value);
}
// Parse a single transaction from the backup mutation log
Standalone<StringRef> value = bw.toValue();
// ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md
wait(decodeBackupLogValue(&curReq.arena,
&curReq.transaction.mutations,
&curReq.transaction.encryptedMutations,
Expand Down Expand Up @@ -827,6 +866,8 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
}
mutationSize += curBatchMutationSize;
newBeginVersion = group.groupKey + 1;

// fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions: newBeginVersion={}, groupKey={}\n", newBeginVersion, group.groupKey);

// At this point if the tenant map changed we would have already sent any normalKey mutations
// accumulated thus far, so all thats left to do is to send all the mutations in the the offending
Expand All @@ -836,6 +877,7 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
break;
}
} catch (Error& e) {
fmt::print(stderr, "BackupAgentBaseError error={}\n", e.code());
if (e.code() == error_code_end_of_stream) {
if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
Expand Down Expand Up @@ -882,6 +924,13 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
lastVersion = it.value();
} else {
Version ver = it.value();
// ver: the version stored in keyVersion for this key
// endVersion: the largest version after applying a batch of versions from log files
// If ver < endVersion, this key's entry in keyVersion is outdated; in that case, run
// ClearRange on the keyVersionMapRange prefix for this key so that the alog key becomes
// the source of truth; otherwise keyVersionMapRange remains the source of truth.
// Each key needs to be checked individually, because even though a range file covers a
// contiguous range, a log file does not.
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
Expand Down Expand Up @@ -928,6 +977,7 @@ ACTOR Future<Void> applyMutations(Database cx,
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;

keyVersion->insert(metadataVersionKey, 0);
// fmt::print(stderr, "BackupAgentBaseApplyMutationBegin: begin={}, end={}\n", beginVersion, *endVersion);

try {
loop {
Expand All @@ -940,23 +990,33 @@ ACTOR Future<Void> applyMutations(Database cx,
}

int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// this means newEndVersion can only be at most of size APPLY_BLOCK_SIZE
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);

// ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400]
// (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
// fmt::print(stderr, "BackupAgentBaseApplyMutationRangeSize={}\n", ranges.size());
// ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64 bits)/part
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;

// each RCGroup is for a single version, each results[i] is for a single range
// one range might have multiple versions
for (int i = 0; i < ranges.size(); ++i) {
// fmt::print(stderr, "BackupAgentBaseApplyMutationRangeRecord begin={}, end={}\n", ranges[i].begin, ranges[i].end);
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES)));
rc.push_back(readCommitted(cx, results[i], locks[i], ranges[i], decodeBKMutationLogKey));
}

maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);

for (idx = 0; idx < ranges.size(); ++idx) {
int bytes =
wait(kvMutationLogToTransactions(cx,
Expand Down
Loading