diff --git a/fdbbackup/FileConverter.actor.cpp b/fdbbackup/FileConverter.actor.cpp index 84cded889c2..14c7ab68d19 100644 --- a/fdbbackup/FileConverter.actor.cpp +++ b/fdbbackup/FileConverter.actor.cpp @@ -291,7 +291,7 @@ struct MutationFilesReadProgress : public ReferenceCountedempty()) { self->fileProgress.erase(self->fileProgress.begin()); } else { - // Keep fileProgress sorted + // Keep fileProgress sorted because only the first one can be changed, so this is enough for (int i = 1; i < self->fileProgress.size(); i++) { if (*self->fileProgress[i - 1] <= *self->fileProgress[i]) { break; @@ -489,6 +489,9 @@ ACTOR Future convert(ConvertParams params) { arena = Arena(); } + // keep getting data until a new version is encountered, then flush all data buffered and start to buffer for a + // new version. + ArenaReader rd(data.arena, data.message, AssumeVersion(g_network->protocolVersion())); MutationRef m; rd >> m; diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 7f9b91195cd..b19ba33d821 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -2397,23 +2397,23 @@ ACTOR Future runRestore(Database db, } if (performRestore) { - Version restoredVersion = wait(backupAgent.restore(db, - origDb, - KeyRef(tagName), - KeyRef(container), - proxy, - ranges, - waitForDone, - targetVersion, - verbose, - KeyRef(addPrefix), - KeyRef(removePrefix), - LockDB::True, - UnlockDB::True, - onlyApplyMutationLogs, - inconsistentSnapshotOnly, - beginVersion, - encryptionKeyFile)); + Version restoredVersion = wait(backupAgent.restoreConstructVersion(db, + origDb, + KeyRef(tagName), + KeyRef(container), + proxy, + ranges, + waitForDone, + targetVersion, + verbose, + KeyRef(addPrefix), + KeyRef(removePrefix), + LockDB::True, + UnlockDB::True, + onlyApplyMutationLogs, + inconsistentSnapshotOnly, + beginVersion, + encryptionKeyFile)); if (waitForDone && verbose) { // If restore is now complete then report version restored diff --git 
a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp index 201e33b3505..ae2a00ec759 100644 --- a/fdbclient/BackupAgentBase.actor.cpp +++ b/fdbclient/BackupAgentBase.actor.cpp @@ -187,9 +187,12 @@ Standalone> getLogRanges(Version beginVersion, return ret; } +// given a begin and end version, get the prefix in the database for this range +// which is applyLogKeys.begin/backupUid/hash(uint8)/version(64bites)/part +// returns multiple key ranges, each should be of length APPLY_BLOCK_SIZE +// (64, 200) -> [(64, 128), (128, 192), (192, 200)] Standalone> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) { Standalone> ret; - Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin); //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", baLogRangePrefix); @@ -292,6 +295,7 @@ void _addResult(bool* tenantMapChanging, each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the "result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector) */ +// hfu5: value is each Param2 ACTOR static Future decodeBackupLogValue(Arena* arena, VectorRef* result, VectorRef>* encryptedResult, @@ -311,6 +315,7 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, memcpy(&protocolVersion, value.begin(), sizeof(uint64_t)); offset += sizeof(uint64_t); if (protocolVersion <= 0x0FDB00A200090001) { + // it fails here now TraceEvent(SevError, "DecodeBackupLogValue") .detail("IncompatibleProtocolVersion", protocolVersion) .detail("ValueSize", value.size()) @@ -318,9 +323,11 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, throw incompatible_protocol_version(); } - state uint32_t totalBytes = 0; - memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t)); - offset += sizeof(uint32_t); + // hfu5: this is the format for Param2 + // change total bytes to 64 bytes in generateOldFormatMutations + state uint64_t totalBytes = 
0; + memcpy(&totalBytes, value.begin() + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); state uint32_t consumed = 0; if (totalBytes + offset > value.size()) @@ -331,9 +338,12 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace; while (consumed < totalBytes) { + // fmt::print(stderr, "DecodeProcess11111, offset={}\n", offset); uint32_t type = 0; + // hfu5: format should be type|kLen|vLen|Key|Value memcpy(&type, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); + state uint32_t len1 = 0; memcpy(&len1, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); @@ -341,8 +351,11 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, memcpy(&len2, value.begin() + offset, sizeof(uint32_t)); offset += sizeof(uint32_t); + // fmt::print(stderr, "DecodeProcess, offset={}, len1={}, len2={}, size={}, type={}, valid={}\n", + // offset, len1, len2, value.size(), type, isValidMutationType(type)); ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type)); + // mutationref is constructed here state MutationRef logValue; state Arena tempArena; logValue.type = type; @@ -448,6 +461,9 @@ ACTOR static Future decodeBackupLogValue(Arena* arena, } else { Version ver = key_version->rangeContaining(logValue.param1).value(); //TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion); + // version is the version of this mutation decoded from log + // ver is the old version stored in keyVersionMap + // as a result, only add this mutation in log when the version is larger(to work with range file) if (version > ver && ver != invalidVersion) { if (removePrefix.size()) { logValue.param1 = logValue.param1.removePrefix(removePrefix); @@ -587,6 +603,7 @@ ACTOR Future readCommitted(Database cx, } } +// hfu5: read each version, potentially multiple part within the 
same version ACTOR Future readCommitted(Database cx, PromiseStream results, Future active, @@ -639,7 +656,12 @@ ACTOR Future readCommitted(Database cx, wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize())); releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize()); + // iterate on a version range. + // each version - partition is a key-value pair + // hfu5 question: when in the edge case, two partitions of same key goes to two different blocks, so they + // cannot be combined here, what happens? for (auto& s : rangevalue) { + // hfu5 : (version, part) uint64_t groupKey = groupBy(s.key).first; //TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size()); if (groupKey != skipGroup) { @@ -647,6 +669,10 @@ ACTOR Future readCommitted(Database cx, rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } else if (rcGroup.groupKey != groupKey) { + // hfu5: if seeing a different version, then send result directly, and then create another + // rcGroup as a result, each rcgroup is for a single version, but a single version can span in + // different rcgroups + //TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size()); // state uint32_t len(0); // for (size_t j = 0; j < rcGroup.items.size(); ++j) { @@ -665,6 +691,7 @@ ACTOR Future readCommitted(Database cx, rcGroup.version = tr.getReadVersion().get(); rcGroup.groupKey = groupKey; } + // this is each item, so according to kvMutationLogToTransactions, each item should be a partition rcGroup.items.push_back_deep(rcGroup.items.arena(), s); } } @@ -706,6 +733,8 @@ Future readCommitted(Database cx, cx, results, Void(), lock, range, groupBy, 
Terminator::True, AccessSystemKeys::True, LockAware::True); } +// restore transaction has to be first in the batch, or it is the only txn in batch to make sure it never conflicts with +// others. ACTOR Future sendCommitTransactionRequest(CommitTransactionRequest req, Key uid, Version newBeginVersion, @@ -722,6 +751,9 @@ ACTOR Future sendCommitTransactionRequest(CommitTransactionRequest req, // mutations and encrypted mutations (and their relationship) is described in greater detail in the defenition of // CommitTransactionRef in CommitTransaction.h + fmt::print(stderr, "BackupAgentBase: newBeginVersion={}\n", newBeginVersion); + TraceEvent("BackupAgentBaseNewBeginVersion").detail("NewBeginVersion", newBeginVersion).log(); + req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey)); req.transaction.encryptedMutations.push_back_deep(req.arena, Optional()); req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin)); @@ -759,6 +791,9 @@ ACTOR Future kvMutationLogToTransactions(Database cx, state Version lastVersion = invalidVersion; state bool endOfStream = false; state int totalBytes = 0; + // two layer of loops, outside loop for each file range, + // inside look for each transaction(version) + // fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions-beforeLoop\n"); loop { state CommitTransactionRequest req; state Version newBeginVersion = invalidVersion; @@ -766,7 +801,9 @@ ACTOR Future kvMutationLogToTransactions(Database cx, state bool tenantMapChanging = false; loop { try { + // fmt::print(stderr, "BackupAgentBase-RCGroup-Before\n"); state RCGroup group = waitNext(results.getFuture()); + // fmt::print(stderr, "BackupAgentBase-RCGroup-After group={}\n", group.groupKey); state CommitTransactionRequest curReq; lock->release(group.items.expectedSize()); state int curBatchMutationSize = 0; @@ -774,10 +811,12 @@ ACTOR Future kvMutationLogToTransactions(Database cx, 
BinaryWriter bw(Unversioned()); for (int i = 0; i < group.items.size(); ++i) { + // hfu5 : each value should be a partition bw.serializeBytes(group.items[i].value); } // Parse a single transaction from the backup mutation log Standalone value = bw.toValue(); + // ref: https://github.com/apple/foundationdb/blob/release-6.2/design/backup-dataFormat.md wait(decodeBackupLogValue(&curReq.arena, &curReq.transaction.mutations, &curReq.transaction.encryptedMutations, @@ -827,6 +866,8 @@ ACTOR Future kvMutationLogToTransactions(Database cx, } mutationSize += curBatchMutationSize; newBeginVersion = group.groupKey + 1; + + // fmt::print(stderr, "BackupAgentBase-kvMutationLogToTransactions: newBeginVersion={}, groupKey={}\n", newBeginVersion, group.groupKey); // At this point if the tenant map changed we would have already sent any normalKey mutations // accumulated thus far, so all thats left to do is to send all the mutations in the the offending @@ -836,6 +877,7 @@ ACTOR Future kvMutationLogToTransactions(Database cx, break; } } catch (Error& e) { + fmt::print(stderr, "BackupAgentBaseError error={}\n", e.code()); if (e.code() == error_code_end_of_stream) { if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) { newBeginVersion = endVersion.get(); @@ -882,6 +924,13 @@ ACTOR Future coalesceKeyVersionCache(Key uid, lastVersion = it.value(); } else { Version ver = it.value(); + // ver: version from keyVersion + // endVersion: after applying a batch of versions from log files, the largest version + // if ver < endVersion, that means this key in keyVersion is outdated + // in this case, runClearRange on the keyVersionMapRange prefix for this key, + // so that the alog key is the truth, otherwise, keyVersionMapRange should be the truth + // each key needs to be individually checked, because even though range file is for a range, log file does + // not if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion && 
lastVersion != invalidVersion) { Key removeKey = it.range().begin.withPrefix(mapPrefix); @@ -928,6 +977,7 @@ ACTOR Future applyMutations(Database cx, state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES; keyVersion->insert(metadataVersionKey, 0); + // fmt::print(stderr, "BackupAgentBaseApplyMutationBegin: begin={}, end={}\n", beginVersion, *endVersion); try { loop { @@ -940,16 +990,25 @@ ACTOR Future applyMutations(Database cx, } int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes); + // this means newEndVersion can only be at most of size APPLY_BLOCK_SIZE state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE); + + // ranges each represent a partition of version, e.g. [100, 200], [201, 300], [301, 400] + // (64, 200) -> [(64, 128), (128, 192), (192, 200)] assuming block size is 64 state Standalone> ranges = getApplyRanges(beginVersion, newEndVersion, uid); + // fmt::print(stderr, "BackupAgentBaseApplyMutationRangeSize={}\n", ranges.size()); + // ranges have format: applyLogKeys.begin/uid/hash(uint8)/version(64bites)/part state size_t idx; state std::vector> results; state std::vector> rc; state std::vector> locks; + // each RCGroup is for a single version, each results[i] is for a single range + // one range might have multiple versions for (int i = 0; i < ranges.size(); ++i) { + // fmt::print(stderr, "BackupAgentBaseApplyMutationRangeRecord begin={}, end={}\n", ranges[i].begin, ranges[i].end); results.push_back(PromiseStream()); locks.push_back(makeReference( std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES))); @@ -957,6 +1016,7 @@ ACTOR Future applyMutations(Database cx, } maxBytes = std::max(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES); + for (idx = 0; idx < ranges.size(); ++idx) { int bytes = wait(kvMutationLogToTransactions(cx, diff --git 
a/fdbclient/BackupContainerFileSystem.actor.cpp b/fdbclient/BackupContainerFileSystem.actor.cpp index 62d35d82549..d944c7c775c 100644 --- a/fdbclient/BackupContainerFileSystem.actor.cpp +++ b/fdbclient/BackupContainerFileSystem.actor.cpp @@ -259,12 +259,16 @@ class BackupContainerFileSystemImpl { for (int idx : indices) { const LogFile& file = files[idx]; if (lastEnd == invalidVersion) { - if (file.beginVersion > begin) + if (file.beginVersion > begin) { + // flowguru: the first version of the first file must be smaller or equal to the desired + // beginVersion return false; + } if (file.endVersion > begin) { lastBegin = begin; lastTags = file.totalTags; } else { + // if endVersion of file is smaller than desired beginVersion, then do not include this file continue; } } else if (lastEnd < file.beginVersion) { @@ -904,7 +908,15 @@ class BackupContainerFileSystemImpl { // If "keyRangesFilter" is empty, the file set will cover all key ranges present in the backup. // It's generally a good idea to specify "keyRangesFilter" to reduce the number of files for // restore times. - // + // hfu5:1. it first reads and parses snapshot file, each snapshot file can map to a list of range files + // including ranges/ and kvranges/, then it collects range files that have intersecting keys + // 2. not sure why restorable.targetVersion < maxKeyRangeVersion it would continue + // 3. then it has a minKeyRangeVersion representing min version of all range files + // 4. then it reads all log files with start smaller than targetVersion and end larger than minKeyRangeVersion + // 4. if the first log file start version is smaller than minKeyRangeVersion, then we do not know the value, + // give up. + // otherwise return both range and log files. + // 5. 
LogFile object is created in BackupContainerFileSystem::listLogFiles, and tagID are populated for plog // If "logsOnly" is true, then only log files are returned and "keyRangesFilter" is ignored, // because the log can contain mutations of the whole key space, unlike range files that each // is limited to a smaller key range. @@ -943,6 +955,7 @@ class BackupContainerFileSystemImpl { state Version minKeyRangeVersion = MAX_VERSION; state Version maxKeyRangeVersion = -1; + // iterate each listed file, why still return a vector std::pair, std::map> results = wait(bc->readKeyspaceSnapshot(snapshots[i])); @@ -955,6 +968,7 @@ class BackupContainerFileSystemImpl { maxKeyRangeVersion = snapshots[i].endVersion; } else { for (const auto& rangeFile : results.first) { + // each file is a version on a [begin, end] key range const auto& keyRange = results.second.at(rangeFile.fileName); if (keyRange.intersects(keyRangesFilter)) { restorable.ranges.push_back(rangeFile); @@ -971,9 +985,20 @@ class BackupContainerFileSystemImpl { // 'latestVersion' represents using the minimum restorable version in a snapshot. restorable.targetVersion = targetVersion == latestVersion ? maxKeyRangeVersion : targetVersion; // Any version < maxKeyRangeVersion is not restorable. + // hfu5 question: why? what if target version is 8500, and this snapshot has [8000, 8200, 8800] + // do we give up directly? why it is not restorable? 
+ // not give up, try to find the next smaller one + // if max is 1000, target is 500, then try to find a smaller max + // if max is 300, target is 500, then do the restore + // as a result, find the first snapshot, whose max version is smaller than targetVersion, + // [1, 100], [101, 200], [201, 300], [301, 400], if i want to restore to 230, + // then continue on [201, 300] and [301, 400], and return on [101, 200] + // later will list all log files from [101, 230] if (restorable.targetVersion < maxKeyRangeVersion) continue; + // restorable.snapshot.beginVersion is set to the smallest(oldest) snapshot's beginVersion + // question: should i always find a smaller log file that is smaller than this range's version? restorable.snapshot = snapshots[i]; // No logs needed if there is a complete filtered key space snapshot at the target version. @@ -993,6 +1018,7 @@ class BackupContainerFileSystemImpl { store(plogs, bc->listLogFiles(minKeyRangeVersion, restorable.targetVersion, true))); if (plogs.size() > 0) { + // hfu5 : this is how files are decided logs.swap(plogs); // sort by tag ID so that filterDuplicates works. std::sort(logs.begin(), logs.end(), [](const LogFile& a, const LogFile& b) { @@ -1005,6 +1031,8 @@ class BackupContainerFileSystemImpl { restorable.logs.swap(filtered); // sort by version order again for continuous analysis std::sort(restorable.logs.begin(), restorable.logs.end()); + // sort by version, but isPartitionedLogsContinuous will sort each tag separately + // need to refactor. 
if (isPartitionedLogsContinuous(restorable.logs, minKeyRangeVersion, restorable.targetVersion)) { restorable.continuousBeginVersion = minKeyRangeVersion; restorable.continuousEndVersion = restorable.targetVersion + 1; // not inclusive diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index b2b43012ce9..29d7736c14b 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -182,6 +182,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BACKUP_DISPATCH_ADDTASK_SIZE, 50 ); init( RESTORE_DISPATCH_ADDTASK_SIZE, 150 ); init( RESTORE_DISPATCH_BATCH_SIZE, 30000 ); if( randomize && BUGGIFY ) RESTORE_DISPATCH_BATCH_SIZE = 20; + init (RESTORE_PARTITIONED_BATCH_VERSION_SIZE, 1000000); init( RESTORE_WRITE_TX_SIZE, 256 * 1024 ); init( APPLY_MAX_LOCK_BYTES, 1e9 ); init( APPLY_MIN_LOCK_BYTES, 11e6 ); //Must be bigger than TRANSACTION_SIZE_LIMIT diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index f0f02877e0c..56f7b29eed3 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -40,6 +40,7 @@ #include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" #include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/PartitionedLogIterator.h" #include "fdbclient/RestoreInterface.h" #include "fdbclient/Status.h" #include "fdbclient/SystemData.h" @@ -156,7 +157,6 @@ ACTOR Future> TagUidMap::getAll_impl(TagUidMap* tagsMa KeyBackedTag::KeyBackedTag(std::string tagName, StringRef tagMapPrefix) : KeyBackedProperty(TagUidMap(tagMapPrefix).getProperty(tagName)), tagName(tagName), tagMapPrefix(tagMapPrefix) {} - class RestoreConfig : public KeyBackedTaskConfig { public: RestoreConfig(UID uid = UID()) : KeyBackedTaskConfig(fileRestorePrefixRange.begin, uid) {} @@ -172,6 +172,7 @@ class RestoreConfig : public KeyBackedTaskConfig { KeyBackedProperty onlyApplyMutationLogs() { return configSpace.pack(__FUNCTION__sr); } KeyBackedProperty inconsistentSnapshotOnly() { return 
configSpace.pack(__FUNCTION__sr); } KeyBackedProperty unlockDBAfterRestore() { return configSpace.pack(__FUNCTION__sr); } + KeyBackedProperty transformPartitionedLog() { return configSpace.pack(__FUNCTION__sr); } // XXX: Remove restoreRange() once it is safe to remove. It has been changed to restoreRanges KeyBackedProperty restoreRange() { return configSpace.pack(__FUNCTION__sr); } // XXX: Changed to restoreRangeSet. It can be removed. @@ -232,16 +233,20 @@ class RestoreConfig : public KeyBackedTaskConfig { // Describes a file to load blocks from during restore. Ordered by version and then fileName to enable // incrementally advancing through the map, saving the version and path of the next starting point. + // question: do we want to add tag here? struct RestoreFile { - Version version; + Version version; // this is beginVersion, not endVersion std::string fileName; bool isRange{ false }; // false for log file int64_t blockSize{ 0 }; int64_t fileSize{ 0 }; Version endVersion{ ::invalidVersion }; // not meaningful for range files + int64_t tagId = -1; // only meaningful to log files, Log router tag. Non-negative for new backup format. + int64_t totalTags = -1; // only meaningful to log files, Total number of log router tags. 
Tuple pack() const { - return Tuple::makeTuple(version, fileName, (int)isRange, fileSize, blockSize, endVersion); + return Tuple::makeTuple( + version, fileName, (int64_t)isRange, fileSize, blockSize, endVersion, tagId, totalTags); } static RestoreFile unpack(Tuple const& t) { RestoreFile r; @@ -252,6 +257,8 @@ class RestoreConfig : public KeyBackedTaskConfig { r.fileSize = t.getInt(i++); r.blockSize = t.getInt(i++); r.endVersion = t.getInt(i++); + r.tagId = t.getInt(i++); + r.totalTags = t.getInt(i++); return r; } }; @@ -297,6 +304,8 @@ class RestoreConfig : public KeyBackedTaskConfig { Version beginVersion = BinaryReader::fromStringRef(beginVal.get().get(), Unversioned()); Version endVersion = BinaryReader::fromStringRef(endVal.get().get(), Unversioned()); + fmt::print(stderr, "GetLag internal: begin={}, end={}\n", beginVersion, endVersion); + return endVersion - beginVersion; } @@ -403,61 +412,62 @@ typedef RestoreConfig::RestoreFile RestoreFile; ACTOR Future RestoreConfig::getProgress_impl(RestoreConfig restore, Reference tr) { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - - state Future fileCount = restore.fileCount().getD(tr); - state Future fileBlockCount = restore.fileBlockCount().getD(tr); - state Future fileBlocksDispatched = restore.filesBlocksDispatched().getD(tr); - state Future fileBlocksFinished = restore.fileBlocksFinished().getD(tr); - state Future bytesWritten = restore.bytesWritten().getD(tr); - state Future status = restore.stateText(tr); - state Future currentVersion = restore.getCurrentVersion(tr); - state Future lag = restore.getApplyVersionLag(tr); - state Future firstConsistentVersion = restore.firstConsistentVersion().getD(tr); - state Future tag = restore.tag().getD(tr); - state Future> lastError = restore.lastError().getD(tr); - - // restore might no longer be valid after the first wait so make sure it is not needed anymore. 
- state UID uid = restore.getUid(); - wait(success(fileCount) && success(fileBlockCount) && success(fileBlocksDispatched) && - success(fileBlocksFinished) && success(bytesWritten) && success(status) && success(currentVersion) && - success(lag) && success(firstConsistentVersion) && success(tag) && success(lastError)); - - std::string errstr = "None"; - if (lastError.get().second != 0) - errstr = format("'%s' %" PRId64 "s ago.\n", - lastError.get().first.c_str(), - (tr->getReadVersion().get() - lastError.get().second) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND); - - TraceEvent("FileRestoreProgress") - .detail("RestoreUID", uid) - .detail("Tag", tag.get()) - .detail("State", status.get().toString()) - .detail("FileCount", fileCount.get()) - .detail("FileBlocksFinished", fileBlocksFinished.get()) - .detail("FileBlocksTotal", fileBlockCount.get()) - .detail("FileBlocksInProgress", fileBlocksDispatched.get() - fileBlocksFinished.get()) - .detail("BytesWritten", bytesWritten.get()) - .detail("CurrentVersion", currentVersion.get()) - .detail("FirstConsistentVersion", firstConsistentVersion.get()) - .detail("ApplyLag", lag.get()) - .detail("TaskInstance", THIS_ADDR); - - return format("Tag: %s UID: %s State: %s Blocks: %lld/%lld BlocksInProgress: %lld Files: %lld BytesWritten: " - "%lld CurrentVersion: %lld FirstConsistentVersion: %lld ApplyVersionLag: %lld LastError: %s", - tag.get().c_str(), - uid.toString().c_str(), - status.get().toString().c_str(), - fileBlocksFinished.get(), - fileBlockCount.get(), - fileBlocksDispatched.get() - fileBlocksFinished.get(), - fileCount.get(), - bytesWritten.get(), - currentVersion.get(), - firstConsistentVersion.get(), - lag.get(), - errstr.c_str()); + return ""; + // tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + // tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + // state Future fileCount = restore.fileCount().getD(tr); + // state Future fileBlockCount = restore.fileBlockCount().getD(tr); + // state Future 
fileBlocksDispatched = restore.filesBlocksDispatched().getD(tr); + // state Future fileBlocksFinished = restore.fileBlocksFinished().getD(tr); + // state Future bytesWritten = restore.bytesWritten().getD(tr); + // state Future status = restore.stateText(tr); + // state Future currentVersion = restore.getCurrentVersion(tr); + // state Future lag = restore.getApplyVersionLag(tr); + // state Future firstConsistentVersion = restore.firstConsistentVersion().getD(tr); + // state Future tag = restore.tag().getD(tr); + // state Future> lastError = restore.lastError().getD(tr); + + // // restore might no longer be valid after the first wait so make sure it is not needed anymore. + // state UID uid = restore.getUid(); + // wait(success(fileCount) && success(fileBlockCount) && success(fileBlocksDispatched) && + // success(fileBlocksFinished) && success(bytesWritten) && success(status) && success(currentVersion) && + // success(lag) && success(firstConsistentVersion) && success(tag) && success(lastError)); + + // std::string errstr = "None"; + // if (lastError.get().second != 0) + // errstr = format("'%s' %" PRId64 "s ago.\n", + // lastError.get().first.c_str(), + // (tr->getReadVersion().get() - lastError.get().second) / CLIENT_KNOBS->CORE_VERSIONSPERSECOND); + + // TraceEvent("FileRestoreProgress") + // .detail("RestoreUID", uid) + // .detail("Tag", tag.get()) + // .detail("State", status.get().toString()) + // .detail("FileCount", fileCount.get()) + // .detail("FileBlocksFinished", fileBlocksFinished.get()) + // .detail("FileBlocksTotal", fileBlockCount.get()) + // .detail("FileBlocksInProgress", fileBlocksDispatched.get() - fileBlocksFinished.get()) + // .detail("BytesWritten", bytesWritten.get()) + // .detail("CurrentVersion", currentVersion.get()) + // .detail("FirstConsistentVersion", firstConsistentVersion.get()) + // .detail("ApplyLag", lag.get()) + // .detail("TaskInstance", THIS_ADDR); + + // return format("Tag: %s UID: %s State: %s Blocks: %lld/%lld 
BlocksInProgress: %lld Files: %lld BytesWritten: " + // "%lld CurrentVersion: %lld FirstConsistentVersion: %lld ApplyVersionLag: %lld LastError: %s", + // tag.get().c_str(), + // uid.toString().c_str(), + // status.get().toString().c_str(), + // fileBlocksFinished.get(), + // fileBlockCount.get(), + // fileBlocksDispatched.get() - fileBlocksFinished.get(), + // fileCount.get(), + // bytesWritten.get(), + // currentVersion.get(), + // firstConsistentVersion.get(), + // lag.get(), + // errstr.c_str()); } ACTOR Future RestoreConfig::getFullStatus_impl(RestoreConfig restore, @@ -488,6 +498,407 @@ ACTOR Future RestoreConfig::getFullStatus_impl(RestoreConfig restor return returnStr; } +// two buffers are alternatively serving data and reading data from file +// thus when one buffer is serving data through peek() +// the other buffer is reading data from file to provide pipelining. +class TwoBuffers : public ReferenceCounted, NonCopyable { +public: + class IteratorBuffer : public ReferenceCounted { + public: + // std::shared_ptr data; + char* data; + // has_value means there is data, otherwise it means there is no data being fetched or ready + // is_valid means data is being fetched, is_ready means data is ready + std::optional> fetchingData; + size_t size; + int capacity; + IteratorBuffer(int _capacity) { + // data = std::shared_ptr(new char[capacity]()); + data = new char[_capacity](); + fmt::print(stderr, "Allocating {}\n", _capacity); + fetchingData.reset(); + size = 0; + capacity = _capacity; + } + ~IteratorBuffer() { + // data = std::shared_ptr(new char[capacity]()); + delete[] data; + } + bool is_valid() { return fetchingData.has_value(); } + }; + TwoBuffers(int capacity, Reference _bc, std::vector& _files); + // ready need to be called first before calling peek + // because a shared_ptr cannot be wrapped by a Future + // this method ensures the current buffer has available data + Future ready(); + ACTOR static Future ready(Reference self); + // fill buffer[index] 
with the next block of file + // it has side effects to change currentFileIndex and currentFilePosition + ACTOR static Future readNextBlock(Reference self, int index); + // peek can only be called after ready is called + // it returns the pointer to the active buffer + // std::shared_ptr peek(); + char* peek(); + + bool hasNext(); + + // discard the current buffer and swap to the next one + void discardAndSwap(); + + // try to fill the buffer[index] + // but no-op if the buffer have valid data or it is actively being filled + void fillBufferIfAbsent(int index); + + size_t getBufferSize(); + +private: + Reference buffers[2]; // Two buffers for alternating + size_t bufferCapacity; // Size of each buffer in bytes + Reference bc; + std::vector files; + + int cur; // Index of the current active buffer (0 or 1) + size_t currentFileIndex; // Index of the current file being read + size_t currentFilePosition; // Current read position in the current file +};

// Builds a double-buffered reader over `_files` stored in backup container `_bc`.
// Both buffers are allocated up front with `capacity` bytes; reading starts at
// file 0, offset 0, with buffer 0 active. `_files` is copied into the object.
// NOTE(review): the initializer list names `cur` last although the class declares
// it before currentFileIndex/currentFilePosition; members are initialized in
// declaration order, so this is harmless here but may trigger a reorder warning.
TwoBuffers::TwoBuffers(int capacity, Reference _bc, std::vector& _files)
  : currentFileIndex(0), currentFilePosition(0), cur(0) {
    bufferCapacity = capacity;
    files = _files;
    bc = _bc;
    buffers[0] = makeReference(capacity);
    buffers[1] = makeReference(capacity);
}

// Returns true when either buffer is valid and holds at least one byte, or when
// there is still an unread, non-empty file.
// Side effect: advances currentFileIndex past files whose fileSize is 0.
bool TwoBuffers::hasNext() {
    if (buffers[0]->is_valid() && buffers[0]->size > 0 || buffers[1]->is_valid() && buffers[1]->size > 0) {
        return true;
    }
    while (currentFileIndex < files.size() && files[currentFileIndex].fileSize == 0) {
        // skip empty files
        ++currentFileIndex;
    }
    return currentFileIndex != files.size();
}

// Convenience overload: runs the ready() actor on a new reference to this object.
Future TwoBuffers::ready() {
    return ready(Reference::addRef(this));
}

// Completes once the active buffer (`cur`) has been filled.
// - Returns immediately when hasNext() is false.
// - Otherwise starts a fill of the active buffer if none is in place, waits for
//   it, and then starts (without waiting for) a fill of the other buffer so the
//   next block is fetched while the caller consumes the current one.
// Because the second fill is only started after the first has completed, the two
// readNextBlock() calls issued from here never overlap.
ACTOR Future TwoBuffers::ready(Reference self) {
    if (!self->hasNext()) {
        return Void();
    }
    // Fill the current buffer if needed and wait until its data has arrived.
    // NOTE(review): fetchingData is used like an optional future
    // (value()/reset()); its type is declared outside this chunk.
    self->fillBufferIfAbsent(self->cur);
    wait(self->buffers[self->cur]->fetchingData.value());
    // Prefetch into the other buffer; deliberately not awaited.
    self->fillBufferIfAbsent(1 - self->cur);
    return Void();
}

// Returns the raw data pointer of the active buffer. Per the class declaration,
// this may only be called after ready() has completed.
char* TwoBuffers::peek() {
    return buffers[cur]->data;
}

// Drops the active buffer's fetch handle and makes the other buffer active.
void TwoBuffers::discardAndSwap() {
    // invalidate cur, then switch cur to the other buffer
    buffers[cur]->fetchingData.reset();
    cur = 1 - cur;
}

// Returns the number of bytes recorded for the active buffer.
size_t TwoBuffers::getBufferSize() {
    return buffers[cur]->size;
}

// Reads the next chunk (at most bufferCapacity bytes) of the current file into
// buffers[index] and advances currentFilePosition / currentFileIndex.
// - When all files are exhausted, sets the buffer size to 0 and returns.
// - Throws restore_bad_read() when fewer bytes than requested are returned.
// Only one readNextBlock may run at a time, otherwise the same block might be
// loaded twice (both invocations would read the same file index and position).
// NOTE(review): the file is re-opened through bc->readFile() for every block.
// NOTE(review): `bytesRead` is an int compared against size_t `bytesToRead`.
ACTOR Future TwoBuffers::readNextBlock(Reference self, int index) {
    state Reference asyncFile;
    if (self->currentFileIndex >= self->files.size()) {
        self->buffers[index]->size = 0;
        return Void();
    }
    Reference asyncFileTmp = wait(self->bc->readFile(self->files[self->currentFileIndex].fileName));
    asyncFile = asyncFileTmp;
    state size_t fileSize = self->files[self->currentFileIndex].fileSize;
    size_t remaining = fileSize - self->currentFilePosition;
    state size_t bytesToRead = std::min(self->bufferCapacity, remaining);
    state int bytesRead =
        wait(asyncFile->read(static_cast(self->buffers[index]->data), bytesToRead, self->currentFilePosition));
    if (bytesRead != bytesToRead)
        throw restore_bad_read();
    self->buffers[index]->size = bytesRead; // Set to actual bytes read
    self->currentFilePosition += bytesRead;
    if (self->currentFilePosition >= fileSize) {
        // Current file fully read: continue with the next file from offset 0.
        self->currentFileIndex++;
        self->currentFilePosition = 0;
    }
    return Void();
}

// Starts an asynchronous fill of buffers[index] unless that buffer already
// reports is_valid(). The resulting future is stored in the buffer's
// fetchingData so that ready() can wait on it.
// NOTE(review): is_valid() is defined outside this chunk; presumably it reports
// whether fetchingData is set - confirm.
void TwoBuffers::fillBufferIfAbsent(int index) {
    auto self = Reference::addRef(this);

    if (self->buffers[index]->is_valid()) {
        // if this buffer is valid, then do not overwrite it
        return;
    }
    self->buffers[index]->fetchingData = readNextBlock(self, index);
    return;
}

// Iterator over the partitioned mutation log files of a single tag. It reads the
// files through a TwoBuffers instance and returns, one version at a time, all
// mutations recorded for that version.
class PartitionedLogIteratorTwoBuffers : public PartitionedLogIterator {
private:
    Reference twobuffer;

    // Consume the data of a single version up to the end of the current buffer.
    // Stops when a record with a version different from the parameter is seen.
    // Side effect: advances bufferOffset past the records that were read.
    Future>> consumeData(Version firstVersion);
    ACTOR static Future>> consumeData(
        Reference self,
        Version v);

    // Each block starts with a header that precedes its mutations and has to be
    // skipped. This method checks whether bufferOffset sits on a block boundary
    // and, if so, advances it past the header.
    void removeBlockHeader();

public:
    // Number of blocks read into one buffer.
    // Note that each version has to be contained within 2 blocks.
    const int BATCH_READ_BLOCK_COUNT = 1;
    const int BLOCK_SIZE = CLIENT_KNOBS->BACKUP_LOGFILE_BLOCK_SIZE;
    // Per-record header: version (int64) + subsequence (int32) + mutation size (int32).
    const int mutationHeaderBytes = sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t);
    Reference bc;
    int tag;
    std::vector files;
    // Flag indicating if more data is available.
    // NOTE(review): not initialized by the constructor and not read anywhere in
    // the code visible in this chunk.
    bool hasMoreData;
    size_t bufferOffset; // Current read offset within the active buffer

    PartitionedLogIteratorTwoBuffers(Reference _bc,
                                     int _tag,
                                     std::vector _files);

    // whether there are more contents for this tag in all files specified
    bool hasNext() const;

    // find the next version without advancing the iterator
    Future peekNextVersion();
    ACTOR static Future peekNextVersion(Reference iterator);

    // Get all the mutations of the next version and advance the iterator.
    // This may issue multiple consumeData() calls when the data of one version
    // crosses a buffer boundary.
    Future>> getNext();
    ACTOR static Future>> getNext(
        Reference iterator);
};

// Convenience overload: runs the consumeData() actor on a new reference to this object.
Future>> PartitionedLogIteratorTwoBuffers::consumeData(Version firstVersion) {
    return consumeData(Reference::addRef(this), firstVersion);
}

// Returns true when the byte at start[offset] is 0xff, the value this reader
// treats as block padding.
bool endOfBlock(char* start, int offset) {
    unsigned char paddingChar = '\xff';
    return (unsigned char)*(start + offset) == paddingChar;
}

// Reads records from the active buffer, starting at bufferOffset, for as long as
// their version equals `firstVersion`, and returns them as VersionedMutations.
//
// Record layout as decoded below (integers converted with bigEndian64/32):
//   version (8 bytes) | subsequence (4 bytes) | mutationSize (4 bytes) | mutation bytes
// A 0xff byte at the read position marks padding; the offset is then moved to the
// next BLOCK_SIZE boundary.
//
// The function returns when a record with a different version is found or when
// the end of the buffer is reached; bufferOffset is left at the first unread byte.
// NOTE(review): the 16 header bytes are copied before any bounds check; only the
// ASSERT further down verifies that the whole record fits into the buffer.
// NOTE(review): `size` is an int while bufferOffset is a size_t, so the
// comparisons below mix signedness.
ACTOR Future>> PartitionedLogIteratorTwoBuffers::consumeData(
    Reference self,
    Version firstVersion) {
    state Standalone> mutations = Standalone>();
    wait(self->twobuffer->ready());
    char* start = self->twobuffer->peek();
    int size = self->twobuffer->getBufferSize();
    bool foundNewVersion = false;
    while (self->bufferOffset < size) {
        while (self->bufferOffset < size && !endOfBlock(start, self->bufferOffset)) {
            // Skip the block header when positioned on a block boundary.
            self->removeBlockHeader();

            Version version;
            std::memcpy(&version, start + self->bufferOffset, sizeof(Version));
            version = bigEndian64(version);
            if (version != firstVersion) {
                foundNewVersion = true;
                break; // Different version, stop here
            }

            int32_t subsequence;
            std::memcpy(&subsequence, start + self->bufferOffset + sizeof(Version), sizeof(int32_t));
            subsequence = bigEndian32(subsequence);

            int32_t mutationSize;
            std::memcpy(
                &mutationSize, start + self->bufferOffset + sizeof(Version) + sizeof(int32_t), sizeof(int32_t));
            mutationSize = bigEndian32(mutationSize);

            // assumption: the entire mutation is within the buffer
            size_t mutationTotalSize = self->mutationHeaderBytes + mutationSize;
            ASSERT(self->bufferOffset + mutationTotalSize <= size);

            // Copy the payload into its own arena and deserialize it as a MutationRef.
            Standalone mutationData = makeString(mutationSize);
            std::memcpy(
                mutateString(mutationData), start + self->bufferOffset + self->mutationHeaderBytes, mutationSize);
            ArenaReader reader(mutationData.arena(), mutationData, AssumeVersion(g_network->protocolVersion()));
            MutationRef mutation;
            reader >> mutation;

            VersionedMutation vm;
            vm.version = version;
            vm.subsequence = subsequence;
            vm.mutation = mutation;
            // Deep copy, so the result does not depend on mutationData's arena.
            mutations.push_back_deep(mutations.arena(), vm);
            // Move the bufferOffset to include this mutation
            self->bufferOffset += mutationTotalSize;
        }

        if (self->bufferOffset < size && endOfBlock(start, self->bufferOffset)) {
            // Padding found: jump to the start of the next block.
            int remain = self->BLOCK_SIZE - (self->bufferOffset % self->BLOCK_SIZE);
            self->bufferOffset += remain;
        }
        if (foundNewVersion) {
            break;
        }
    }

    return mutations;
}

// Skips the block header when bufferOffset is on a block boundary.
// The commented-out writer call kept below shows what the header is expected to
// be (PARTITIONED_MLOG_VERSION); the reader advances by sizeof(uint32_t).
void PartitionedLogIteratorTwoBuffers::removeBlockHeader() {
    // wait(logFile->append((uint8_t*)&PARTITIONED_MLOG_VERSION, sizeof(PARTITIONED_MLOG_VERSION)));
    if (bufferOffset % BLOCK_SIZE == 0) {
        bufferOffset += sizeof(uint32_t);
    }
}

// Creates the iterator for tag `_tag` over `_files` in container `_bc`. The
// underlying TwoBuffers uses buffers of BATCH_READ_BLOCK_COUNT * BLOCK_SIZE bytes.
PartitionedLogIteratorTwoBuffers::PartitionedLogIteratorTwoBuffers(Reference _bc,
                                                                   int _tag,
                                                                   std::vector _files)
  : bc(_bc), tag(_tag), files(std::move(_files)), bufferOffset(0) {
    int bufferCapacity = BATCH_READ_BLOCK_COUNT * BLOCK_SIZE;
    twobuffer = makeReference(bufferCapacity, _bc, files);
}

// Delegates to TwoBuffers::hasNext(): true while a buffer still holds data or an
// unread, non-empty file remains.
bool PartitionedLogIteratorTwoBuffers::hasNext() const {
    return twobuffer->hasNext();
}

// Convenience overload: runs the peekNextVersion() actor on a new reference to this object.
Future PartitionedLogIteratorTwoBuffers::peekNextVersion() {
    return peekNextVersion(Reference::addRef(this));
}

// Returns the version of the record at the current read position without
// consuming it. Returns Version(0) when hasNext() is false.
// Side effect: may advance bufferOffset past a block header (removeBlockHeader).
// Author's debugging note: a result of -1 means the 8 bytes read were all 0xff.
// NOTE(review): no check that bufferOffset + sizeof(Version) lies within the
// buffer before the memcpy.
ACTOR Future PartitionedLogIteratorTwoBuffers::peekNextVersion(
    Reference self) {
    // Read the first mutation's version
    if (!self->hasNext()) {
        return Version(0);
    }
    wait(self->twobuffer->ready());
    char* start = self->twobuffer->peek();
    self->removeBlockHeader();
    Version version;
    std::memcpy(&version, start + self->bufferOffset, sizeof(Version));
    version = bigEndian64(version);
    return version;
}

// Returns all mutations of the next version and advances the iterator past them.
// Logs "IteratorExhausted" and returns an empty result when hasNext() is false.
// When the active buffer is fully consumed, the buffers are swapped and reading
// continues in the next buffer, because the records of one version may be split
// across the buffer boundary.
ACTOR Future>> PartitionedLogIteratorTwoBuffers::getNext(
    Reference self) {
    state Standalone> mutations;
    if (!self->hasNext()) {
        TraceEvent(SevWarn, "IteratorExhausted").log();
        return mutations;
    }
    state Version firstVersion = wait(self->peekNextVersion());
    Standalone> firstBatch = wait(self->consumeData(firstVersion));
    mutations = firstBatch;

    // If the current buffer is fully consumed, check the next buffer in case the
    // version is sliced across this buffer boundary.
    while (self->bufferOffset >= self->twobuffer->getBufferSize()) {
        self->twobuffer->discardAndSwap();
        self->bufferOffset = 0;
        // Data for one version cannot exceed a single buffer size.
        if (self->twobuffer->hasNext()) {
            // This runs at every buffer end, although it is not necessary at the
            // last block of a file. hasMoreData cannot be consulted here because
            // the other buffer might hold the last piece.
            Standalone> batch = wait(self->consumeData(firstVersion));
            for (const VersionedMutation& vm : batch) {
                mutations.push_back_deep(mutations.arena(), vm);
            }
        } else {
            break;
        }
    }
    return mutations;
}

// Convenience overload: runs the getNext() actor on a new reference to this object.
Future>> PartitionedLogIteratorTwoBuffers::getNext() {
    return getNext(Reference::addRef(this));
}

FileBackupAgent::FileBackupAgent() : subspace(Subspace(fileBackupPrefixRange.begin)) // The other subspaces have logUID -> value @@ -1155,6 +1566,7 @@ ACTOR static Future decodeKVPairs(StringRefReader* reader, // If eof reached or first value len byte is 0xFF then a valid block end was reached. if (reader->eof() || *reader->rptr == 0xFF) { + // hfu5: last key is not included results->push_back(results->arena(), KeyValueRef(KeyRef(k, kLen), ValueRef())); break; } @@ -1368,6 +1780,8 @@ struct LogFileWriter { int64_t blockEnd; }; +// input: a string of [param1, param2], [param1, param2] ..., [param1, param2] +// output: a vector of [param1, param2] after removing the length info Standalone> decodeMutationLogFileBlock(const Standalone& buf) { Standalone> results({}, buf.arena()); StringRefReader reader(buf, restore_corrupted_data()); @@ -1640,6 +2054,7 @@ ACTOR static Future addBackupTask(StringRef name, state Reference task(new Task(name, version, doneKey, priority)); // Bind backup config to new task + // allow this new task to find the config(keyspace) of the parent task wait(config.toTask(tr, task, setValidation)); // Set task specific params @@ -3058,6 +3473,7 @@ struct BackupLogsDispatchTask : BackupTaskFuncBase { if (!partitionedLog.present() || !partitionedLog.get()) { // Add the initial log range task to read/copy the mutations and the next logs dispatch task which will // run after this batch is done + // read blog/ prefix and write those (param1, param2) into files wait(success(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, @@ -3065,6 +3481,7 @@ struct BackupLogsDispatchTask : 
BackupTaskFuncBase { beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture)))); + // issue the next key range wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, @@ -3771,6 +4188,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { state Reference inFile = wait(bc.get()->readFile(rangeFile.fileName)); state Standalone> blockData; try { + // data is each real KV, not encoded mutations Standalone> data = wait(decodeRangeFileBlock(inFile, readOffset, readLen, cx)); blockData = data; } catch (Error& e) { @@ -3858,6 +4276,8 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Clear the range we are about to set. // If start == 0 then use fileBegin for the start of the range, else data[start] // If iend == end then use fileEnd for the end of the range, else data[iend] + // it seems we are clear the raw key, without alog prefix, right? + // [80, 120], [100] state KeyRange trRange = KeyRangeRef( (start == 0) ? fileRange.begin : data[start].key.removePrefix(removePrefix.get()).withPrefix(addPrefix.get()), @@ -3945,8 +4365,10 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { // Update the KV range map if originalFileRange is set std::vector> updateMap; std::vector ranges = Params.getOriginalFileRanges(task); + // if want to restore((a, b), (e, f), (x, y)), then there are 3 ranges for (auto& range : ranges) { Value versionEncoded = BinaryWriter::toValue(Params.inputFile().get(task).version, Unversioned()); + // hfu5 : find how it is synced updateMap.push_back(krmSetRange(tr, restore.applyMutationsMapPrefix(), range, versionEncoded)); } @@ -3964,6 +4386,7 @@ struct RestoreRangeTaskFunc : RestoreFileTaskFuncBase { int64_t len, TaskCompletionKey completionKey, Reference waitFor = Reference()) { + // Key doneKey = wait(completionKey.get(tr, taskBucket)); state Reference task(new Task(RestoreRangeTaskFunc::name, RestoreRangeTaskFunc::version, doneKey)); @@ -4025,6 +4448,7 @@ std::pair decodeMutationLogKey(const 
StringRef& key) { // [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k], // where a mutation is encoded as: // [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2] +// noted version needs to be included here std::vector decodeMutationLogValue(const StringRef& value) { StringRefReader reader(value, restore_corrupted_data()); @@ -4061,6 +4485,7 @@ std::vector decodeMutationLogValue(const StringRef& value) { } void AccumulatedMutations::addChunk(int chunkNumber, const KeyValueRef& kv) { + // hfu5[important] : here it validates that partition(chunk) number has to be continuous if (chunkNumber == lastChunkNumber + 1) { lastChunkNumber = chunkNumber; serializedMutations += kv.value.toString(); @@ -4091,6 +4516,7 @@ bool AccumulatedMutations::isComplete() const { // range in ranges. // It is undefined behavior to run this if isComplete() does not return true. bool AccumulatedMutations::matchesAnyRange(const RangeMapFilters& filters) const { + // decode param2, so that each actual mutations are in mutations variable std::vector mutations = decodeMutationLogValue(serializedMutations); for (auto& m : mutations) { if (m.type == MutationRef::Encrypted) { @@ -4143,17 +4569,22 @@ bool RangeMapFilters::match(const KeyRangeRef& range) const { std::vector filterLogMutationKVPairs(VectorRef data, const RangeMapFilters& filters) { std::unordered_map mutationBlocksByVersion; + // first group mutations by version for (auto& kv : data) { + // each kv is a [param1, param2] auto versionAndChunkNumber = decodeMutationLogKey(kv.key); mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv); } std::vector output; + // then add each version to the output, and now each K in output is also a KeyValueRef, + // but mutations of the same versions stay together for (auto& vb : mutationBlocksByVersion) { AccumulatedMutations& m = vb.second; // If the mutations are incomplete or match one of the ranges, 
include in results. + // hfu5: incomplete, why? if (!m.isComplete() || m.matchesAnyRange(filters)) { output.insert(output.end(), m.kvs.begin(), m.kvs.end()); } @@ -4192,7 +4623,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state Reference tr(new ReadYourWritesTransaction(cx)); state Reference bc; - state std::vector ranges; + state std::vector ranges; // this is the actual KV, not version loop { try { @@ -4231,9 +4662,10 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { tr->reset(); loop { try { - if (start == end) + if (start == end) { + fmt::print(stderr, "Old Task Finish Log\n"); return Void(); - + } tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); @@ -4241,7 +4673,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state int txBytes = 0; for (; i < end && txBytes < dataSizeLimit; ++i) { Key k = dataFiltered[i].key.withPrefix(mutationLogPrefix); - ValueRef v = dataFiltered[i].value; + ValueRef v = dataFiltered[i].value; // each KV is a [param1 with added prefix -> param2] tr->set(k, v); txBytes += k.expectedSize(); txBytes += v.expectedSize(); @@ -4313,6 +4745,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { state Reference task(new Task(RestoreLogDataTaskFunc::name, RestoreLogDataTaskFunc::version, doneKey)); // Create a restore config from the current task and bind it to the new task. 
+ // RestoreConfig(parentTask) createsa prefix of : fileRestorePrefixRange.begin/uid->config/[uid] wait(RestoreConfig(parentTask).toTask(tr, task)); Params.inputFile().set(task, lf); Params.readOffset().set(task, offset); @@ -4342,6 +4775,621 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase { StringRef RestoreLogDataTaskFunc::name = "restore_log_data"_sr; REGISTER_TASKFUNC(RestoreLogDataTaskFunc); +// this method takes a version and a list of list of mutations of this verison, +// each list is returned from a iterator sorted by sub +// it will first add all mutations in subsequence order +// then combine them in old-format (param1, parma2) and return +// this method assumes that iterator can return a list of mutations +/* + mutations are serialized in file as below format: + `` + `` + `` + `…` + ` + + for now, assume each iterator returns a vector > + noted that the mutation's arena has to be valid during the execution + + according to BackupWorker::addMutation, version has 64, sub has 32 and mutation length has 32 + So iterator will combine all mutations in the same version and return a vector + iterator should also return the subsequence together with each mutation + as here we will do another mergeSort for subsequence again to decide the order + and here we will decode the stringref + + + Version currentVersion; + uint32_t sub; + uint32_t mutationSize; + BinaryReader rd(str, Unversioned()); + rd >> currentVersion >> sub >> mutationSize; +*/ + +// type|kLen|vLen|Key|Value +// similar to addBackupMutations( +// MutationList::push_back_deep +Standalone transformMutationToOldFormat(MutationRef m) { + // i need to customize the encoding here according to + /* + // hfu5: format should be type|kLen|vLen|Key|Value + memcpy(&type, value.begin() + offset, sizeof(uint32_t)); + offset += sizeof(uint32_t); + state uint32_t len1 = 0; + memcpy(&len1, value.begin() + offset, sizeof(uint32_t)); + offset += sizeof(uint32_t); + state uint32_t len2 = 0; + memcpy(&len2, 
value.begin() + offset, sizeof(uint32_t)); + offset += sizeof(uint32_t); + + // mutationref is constructed here + state MutationRef logValue; + state Arena tempArena; + logValue.type = type; + logValue.param1 = value.substr(offset, len1); + offset += len1; + logValue.param2 = value.substr(offset, len2); + offset += len2; + */ + BinaryWriter bw(Unversioned()); + uint32_t len1, len2, type; + type = m.type; + len1 = m.param1.size(); + len2 = m.param2.size(); + bw << type; + bw << len1; + bw << len2; + bw.serializeBytes(m.param1); // << is overloaded for stringref to write its size first, so + bw.serializeBytes(m.param2); + // next step to see if there are additional bytes added by binary writer + // fmt::print(stderr, "generate old format transaction, type={}, len1={}, len2={}, total={}\n", type, len1, len2, bw.toValue().size()); + return bw.toValue(); +} + +Standalone> generateOldFormatMutations( + Version commitVersion, + std::vector>>& newFormatMutations) { + // fmt::print(stderr, "StartTransform, version={}, mutationListSize={}\n", commitVersion, newFormatMutations.size()); + Standalone> results; + std::vector>> oldFormatMutations; + // mergeSort subversion here + // just do a global sort for everyone + int64_t totalBytes = 0; + std::map>> mutationsBySub; + int i = 0; + for (auto& vec : newFormatMutations) { + // fmt::print(stderr, "Transform mutationList[{}], size={}\n", i, vec.size()); + int j = 0; + for (auto& p : vec) { + uint32_t sub = p.subsequence; + // fmt::print(stderr, "Transform inner mutationList[{}], mutation[{}], subsequence={}\n", i, j, sub); + // fmt::print(stderr, "before transform each mutation\n"); + // fmt::print(stderr, "Transform each mutation, mutation={}\n", p.mutation.toString()); + // transform the mutation format and add to each subversion + // where is mutation written in new format + Standalone mutationOldFormat = transformMutationToOldFormat(p.mutation); + mutationsBySub[sub].push_back(mutationOldFormat); + totalBytes += 
mutationOldFormat.size(); + ++j; + } + ++i; + } + // the list of param2 needs to have the first 64 bites as 0x0FDB00A200090001 + BinaryWriter param2Writer(IncludeVersion(ProtocolVersion::withBackupMutations())); + param2Writer << totalBytes; + + for (auto& mutationsForSub : mutationsBySub) { + // concatenate them to param2Str + for (auto& m : mutationsForSub.second) { + // refer to transformMutationToOldFormat + // binary writer adds additional 8 bytes at the beginning for version, need to remove it + // because it is concatenated here and we will use memcpy to process this long string + // instead of binary reader + // fmt::print(stderr, "Combine param2, currentSize={}, eachSize={}\n", param2Writer.toValue().size(), m.size()); + param2Writer.serializeBytes(m); + } + } + Key param2Concat = param2Writer.toValue(); + // fmt::print(stderr, "param2Concat size={}\n", param2Concat.size()); + + // deal with param1 + int32_t hashBase = commitVersion / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE; + + BinaryWriter wrParam1(Unversioned()); // hash/commitVersion/part + wrParam1 << (uint8_t)hashlittle(&hashBase, sizeof(hashBase), 0); + wrParam1 << bigEndian64(commitVersion); + uint32_t* partBuffer = nullptr; + + // just generate a list of (param1, param2) + // are they mutations or are they key value + // param2 has format: length_of_the_mutation_group | encoded_mutation_1 | … | encoded_mutation_k + // each mutation has format type|kLen|vLen|Key|Value + for (int part = 0; part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE < param2Concat.size(); part++) { + KeyValueRef backupKV; + // Assign the second parameter as the part + backupKV.value = param2Concat.substr(part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + std::min(param2Concat.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, + CLIENT_KNOBS->MUTATION_BLOCK_SIZE)); + // Write the last part of the mutation to the serialization, if the buffer is not defined + if (!partBuffer) { + // part = 0 + wrParam1 << bigEndian32(part); + // Define the last 
buffer part + partBuffer = (uint32_t*)((char*)wrParam1.getData() + wrParam1.getLength() - sizeof(uint32_t)); + } else { + // part > 0 + *partBuffer = bigEndian32(part); + } + backupKV.key = wrParam1.toValue(); + results.push_back_deep(results.arena(), backupKV); + // fmt::print(stderr, "Pushed mutation, length={}, blockSize={}\n", wrParam1.getLength(), CLIENT_KNOBS->MUTATION_BLOCK_SIZE); + } + return results; +} + +struct RestoreLogDataPartitionedTaskFunc : RestoreFileTaskFuncBase { + static StringRef name; + static constexpr uint32_t version = 1; + StringRef getName() const override { return name; }; + + static struct { + static TaskParam maxTagID() { return __FUNCTION__sr; } + static TaskParam beginVersion() { return __FUNCTION__sr; } + static TaskParam endVersion() { return __FUNCTION__sr; } + static TaskParam> logs() { return __FUNCTION__sr; } + } Params; + + ACTOR static Future _execute(Database cx, + Reference taskBucket, + Reference futureBucket, + Reference task) { + state RestoreConfig restore(task); + + state int64_t maxTagID = Params.maxTagID().get(task); + state std::vector logs = Params.logs().get(task); + state Version begin = Params.beginVersion().get(task); + state Version end = Params.endVersion().get(task); + + state Reference tr(new ReadYourWritesTransaction(cx)); + state Reference bc; + state std::vector ranges; // this is the actual KV, not version + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + Reference _bc = wait(restore.sourceContainer().getOrThrow(tr)); + bc = getBackupContainerWithProxy(_bc); + + wait(store(ranges, restore.getRestoreRangesOrDefault(tr))); + + wait(checkTaskVersion(tr->getDatabase(), task, name, version)); + wait(taskBucket->keepRunning(tr, task)); + + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + + std::vector> filesByTag(maxTagID + 1); + for (RestoreConfig::RestoreFile& f : logs) { + // find the tag, aggregate files 
by tags + // fmt::print(stderr, "LogFile name={}, tag={}, size={}\n", f.fileName, f.tagId, f.fileSize); + if (f.tagId == -1) { + // inconsistent data + TraceEvent(SevError, "PartitionedLogFileNoTag") + .detail("FileName", f.fileName) + .detail("FileSize", f.fileSize) + .log(); + } else { + filesByTag[f.tagId].push_back(f); + } + } + + state std::vector> iterators(maxTagID + 1); + // for each tag, create an iterator + for (int k = 0; k < filesByTag.size(); k++) { + iterators[k] = makeReference(bc, k, filesByTag[k]); + } + + // mergeSort all iterator until all are exhausted + state int totalItereators = iterators.size(); + // it stores all mutations for the next min version, in new format + state std::vector>> mutationsSingleVersion; + state bool atLeastOneIteratorHasNext = true; + state int64_t minVersion; + state int k; + state int versionRestored = 0; + + fmt::print(stderr, "FlowguruLoopBefore\n"); + // TODO: set this to false + // now it stuck here + while (atLeastOneIteratorHasNext) { + // fmt::print(stderr, "FlowguruLoopStart totalItereators={}\n", atLeastOneIteratorHasNext, totalItereators); + atLeastOneIteratorHasNext = false; + minVersion = std::numeric_limits::max(); + k = 0; + for (;k < totalItereators; k++) { + // fmt::print(stderr, "FlowguruLoopNotHaveNext k={}, hasNext={}\n", k, iterators[k]->hasNext()); + if (!iterators[k]->hasNext()) { + TraceEvent("FlowguruLoopNotHaveNext").detail("K", k).log(); + continue; + } + // TODO: maybe embed filtering key into iterator, + // as a result, backup agent should not worry about key range filtering + atLeastOneIteratorHasNext = true; + // fmt::print(stderr, "FlowguruLoop1 k={}\n", k); + Version v = wait(iterators[k]->peekNextVersion()); + TraceEvent("FlowguruLoop2").detail("Version", v).log(); + // fmt::print(stderr, "FlowguruLoop2 k={}, v={}, minVersion={}\n", k, v, minVersion); + if (v < minVersion) { + minVersion = v; + mutationsSingleVersion.clear(); + Standalone> tmp = wait(iterators[k]->getNext()); + 
mutationsSingleVersion.push_back(tmp); + } else if (v == minVersion) { + Standalone> tmp = wait(iterators[k]->getNext()); + mutationsSingleVersion.push_back(tmp); + } + } + + // fmt::print(stderr, "after iteration k={}, atLeastOneIteratorHasNext={}\n", k, atLeastOneIteratorHasNext); + if (atLeastOneIteratorHasNext) { + // transform from new format to old format(param1, param2) + // in the current implementation, each version will trigger a mutation + // if each version data is too small, we might want to combine multiple versions + // for a single mutation + // fmt::print(stderr, "StartTransform\n"); + state Standalone> oldFormatMutations = + generateOldFormatMutations(minVersion, mutationsSingleVersion); + // fmt::print(stderr, "FinishTransform, size={}\n", oldFormatMutations.size()); + state int mutationIndex = 0; + state int txnCount = 0; + state int txBytes = 0; + state int totalMutation = oldFormatMutations.size(); + state int txBytesLimit = CLIENT_KNOBS->RESTORE_WRITE_TX_SIZE; + state Key mutationLogPrefix = restore.mutationLogPrefix(); + + // this method should be executed exactly once, the transaction parameter indicates this + // however, i need to KV into alog prefix, I need multiple transaction for this + // the good part is taht it is idempotent + // but i guess i still hve to extract it out to a execute method of another taskfunc + loop { + try { + // fmt::print(stderr, "Commit:, mutationIndex={}, total={}\n", mutationIndex, totalMutation); + if (mutationIndex == totalMutation) { + break; + } + txBytes = 0; + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + + while (mutationIndex + txnCount < totalMutation && txBytes < txBytesLimit) { + Key k = oldFormatMutations[mutationIndex + txnCount].key.withPrefix(mutationLogPrefix); + ValueRef v = oldFormatMutations[mutationIndex + txnCount] + .value; // each KV is a [param1 with added prefix -> param2] + tr->set(k, v); + txBytes += 
k.expectedSize(); + txBytes += v.expectedSize(); + ++txnCount; + } + wait(tr->commit()); + mutationIndex += txnCount; // update mutationIndex after commit + txnCount = 0; + } catch (Error& e) { + fmt::print(stderr, "CommitError={}, mutationIndex={}, total={}\n", e.code(), mutationIndex, totalMutation); + if (e.code() == error_code_transaction_too_large) { + txBytesLimit /= 2; + } else { + wait(tr->onError(e)); + } + } + } + ++versionRestored; + } + // fmt::print(stderr, "VeryEndOfLoop:, versionRestored={}, atLeastOneIteratorHasNext={}\n", versionRestored, atLeastOneIteratorHasNext); + mutationsSingleVersion.clear(); + } + fmt::print(stderr, "QuitLoop: begin={}, end={}, versionRestored={}\n", begin, end, versionRestored); + return Void(); + } + + ACTOR static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + RestoreConfig(task).fileBlocksFinished().atomicOp(tr, 1, MutationRef::Type::AddValue); + + state Reference taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + + // TODO: Check to see if there is a leak in the FutureBucket since an invalid task (validation key fails) + // will never set its taskFuture. + wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task)); + + return Void(); + } + + ACTOR static Future addTask(Reference tr, + Reference taskBucket, + Reference parentTask, + int64_t maxTagID, + std::vector logs, + Version begin, + Version end, + TaskCompletionKey completionKey, + Reference waitFor = Reference()) { + Key doneKey = wait(completionKey.get(tr, taskBucket)); + state Reference task(new Task(RestoreLogDataPartitionedTaskFunc::name, RestoreLogDataPartitionedTaskFunc::version, doneKey)); + + // Create a restore config from the current task and bind it to the new task. 
+ // RestoreConfig(parentTask) createsa prefix of : fileRestorePrefixRange.begin/uid->config/[uid] + wait(RestoreConfig(parentTask).toTask(tr, task)); + Params.maxTagID().set(task, maxTagID); + Params.beginVersion().set(task, begin); + Params.endVersion().set(task, end); + Params.logs().set(task, logs); + + if (!waitFor) { + return taskBucket->addTask(tr, task); + } + + wait(waitFor->onSetAddTask(tr, taskBucket, task)); + return "OnSetAddTask"_sr; + } + + Future execute(Database cx, + Reference tb, + Reference fb, + Reference task) override { + return _execute(cx, tb, fb, task); + }; + Future finish(Reference tr, + Reference tb, + Reference fb, + Reference task) override { + return _finish(tr, tb, fb, task); + }; +}; +StringRef RestoreLogDataPartitionedTaskFunc::name = "restore_log_data_partitioned"_sr; +REGISTER_TASKFUNC(RestoreLogDataPartitionedTaskFunc); + +// each task can be partitioned to smaller ranges because commit proxy would +// only start to commit alog/ prefix mutations to original prefix when +// the final version is set, but do it in a single task for now for simplicity +struct RestoreDispatchPartitionedTaskFunc : RestoreTaskFuncBase { + static StringRef name; + static constexpr uint32_t version = 1; + StringRef getName() const override { return name; }; + + static struct { + static TaskParam beginVersion() { return __FUNCTION__sr; } + static TaskParam endVersion() { return __FUNCTION__sr; } + } Params; + + ACTOR static Future _finish(Reference tr, + Reference taskBucket, + Reference futureBucket, + Reference task) { + state RestoreConfig restore(task); + + state Version beginVersion = Params.beginVersion().get(task); + state Version endVersion = Params.endVersion().get(task); + Reference _bc = wait(restore.sourceContainer().getOrThrow(tr)); + state Reference bc = getBackupContainerWithProxy(_bc); + state Reference onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + + state Version restoreVersion; + + 
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && + checkTaskVersion(tr->getDatabase(), task, name, version)); + + state int nextEndVersion = + std::min(restoreVersion, endVersion + CLIENT_KNOBS->RESTORE_PARTITIONED_BATCH_VERSION_SIZE); + fmt::print(stderr, "Very begin Begin={}, End={}, nextEnd={}, restoreVersion={}\n", beginVersion, endVersion, nextEndVersion, restoreVersion); + // update the apply mutations end version so the mutations from the + // previous batch can be applied. + // Only do this once beginVersion is > 0 (it will be 0 for the initial dispatch). + if (beginVersion > 0) { + // hfu5 : unblock apply alog to normal key space + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this call an additional call after last file + restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); + } + + // The applyLag must be retrieved AFTER potentially updating the apply end version. + state int64_t applyLag = wait(restore.getApplyVersionLag(tr)); + + fmt::print(stderr, "at begin, ApplyLag={}\n", applyLag); + // this is to guarantee commit proxy is catching up doing apply alog -> normal key + // with this backupFile -> alog process + // If starting a new batch and the apply lag is too large then re-queue and wait + if (applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) { + // Wait a small amount of time and then re-add this same task. + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "too_far_behind") + .detail("TaskInstance", THIS_ADDR); + + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // Get a batch of files. 
We're targeting batchSize blocks(30k) being dispatched so query for batchSize(150) + // files (each of which is 0 or more blocks). + int fileLimit = 1000; + state RestoreConfig::FileSetT::RangeResultType files = + wait(restore.fileSet().getRange(tr, + Optional({ beginVersion, "" }), + Optional({ endVersion, "" }), + fileLimit)); + + state int64_t maxTagID = 0; + state std::vector logs; + state std::vector ranges; + for (auto f : files.results) { + if (f.isRange) { + ranges.push_back(f); + } else { + logs.push_back(f); + maxTagID = std::max(maxTagID, f.tagId); + } + } + // allPartsDone will be set once all block tasks in the current batch are finished. + // create a new future for the new batch + state Reference allPartsDone = futureBucket->future(tr); + restore.batchFuture().set(tr, allPartsDone->pack()); + + // if there are no files, if i am not the last batch, then on to the next batch + // if there are no files and i am the last batch, then just wait for applying to finish + // do we need this files.results.size() == 0 at all? 
+ // if (files.results.size() == 0 && beginVersion >= restoreVersion) { + // fmt::print(stderr, "CheckBegin and restore, begin={}, restore={}, applyLag={}\n", beginVersion, restoreVersion, applyLag); + if (beginVersion >= restoreVersion) { + fmt::print(stderr, "Reaching end, beginVersion={}, restoreVersion={}, ApplyLag={}\n", beginVersion, restoreVersion, applyLag); + if (applyLag == 0) { + // i am the last batch + // If apply lag is 0 then we are done so create the completion task + wait(success(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "restore_complete") + .detail("TaskInstance", THIS_ADDR); + } else { + // i am the last batch, and applyLag is not zero, then I will create another dummy task to wait + // for apply log to be zero, then it will go into the branch above. + // Applying of mutations is not yet finished so wait a small amount of time and then re-add this + // same task. + // this is only to create a dummy one wait for it to finish + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + wait(success( + RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion))); + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "apply_still_behind") + .detail("TaskInstance", THIS_ADDR); + } + wait(taskBucket->finish(tr, task)); + return Void(); + } + + // if we reach here, this batch is not empty(i.e. we have range and/or mutation files in this) + + // Start moving through the file list and queuing up blocks. 
Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE + // blocks per Dispatch task and target batchSize total per batch but a batch must end on a complete version + // boundary so exceed the limit if necessary to reach the end of a version of files. + state std::vector> addTaskFutures; + state int i = 0; + // need to process all range files, because RestoreRangeTaskFunc takes a block offset, keep using ti here. + // this can be done first, because they are not overlap within a restore uid + // each task will read the file, restore those key to their original keys after clear that range + // also it will update the keyVersionMap[key -> versionFromRangeFile] + // by this time, corresponding mutation files within the same version range has not been applied yet + // because they are waiting for the singal of this RestoreDispatchPartitionedTaskFunc + // when log are being applied, they will compare version of key to the keyVersionMap updated by range file + // after each RestoreDispatchPartitionedTaskFunc, keyVersionMap will be clear if mutation version is larger. 
+ for (; i < ranges.size(); ++i) { + RestoreConfig::RestoreFile& f = ranges[i]; + // For each block of the file + for (int64_t j = 0; j < f.fileSize; j += f.blockSize) { + addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, + taskBucket, + task, + f, + j, + std::min(f.blockSize, f.fileSize - j), + TaskCompletionKey::joinWith(allPartsDone))); + } + } + bool is_set = wait(allPartsDone->isSet(tr)); + fmt::print(stderr, "Before add new task begin={}, end={}, nextEnd={}, isSet={} \n", beginVersion, endVersion, nextEndVersion, is_set); + // aggregate logs by tag id + addTaskFutures.push_back(RestoreLogDataPartitionedTaskFunc::addTask(tr, + taskBucket, + task, + maxTagID, + logs, + beginVersion, + endVersion, + TaskCompletionKey::joinWith(allPartsDone))); + // even if file exsists, but they are empty, in this case just start the next batch + fmt::print(stderr, "After add new task begin={}, end={}, nextEnd={} \n", beginVersion, endVersion, nextEndVersion); + + addTaskFutures.push_back(RestoreDispatchPartitionedTaskFunc::addTask( + tr, taskBucket, task, endVersion, nextEndVersion, TaskCompletionKey::noSignal(), allPartsDone)); + + wait(waitForAll(addTaskFutures)); + // fmt::print(stderr, "before wait finish begin={}, end={}, nextEnd={} \n", beginVersion, endVersion, nextEndVersion); + wait(taskBucket->finish(tr, task)); + // fmt::print(stderr, "Add parent task begin={}, end={}, nextEnd={}, should happen only after children are done \n", beginVersion, endVersion, nextEndVersion); + + + TraceEvent("RestorePartitionDispatch") + .detail("RestoreUID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("EndVersion", endVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "dispatch_batch_complete") + .detail("TaskInstance", THIS_ADDR) + .log(); + + return Void(); + } + + ACTOR static Future addTask(Reference tr, + Reference taskBucket, + Reference parentTask, + Version beginVersion, + Version endVersion, + TaskCompletionKey completionKey = 
TaskCompletionKey::noSignal(), + Reference waitFor = Reference()) { + Key doneKey = wait(completionKey.get(tr, taskBucket)); + + // Use high priority for dispatch tasks that have to queue more blocks for the current batch + unsigned int priority = 0; + state Reference task(new Task( + RestoreDispatchPartitionedTaskFunc::name, RestoreDispatchPartitionedTaskFunc::version, doneKey, priority)); + + // Create a config from the parent task and bind it to the new task + wait(RestoreConfig(parentTask).toTask(tr, task)); + Params.beginVersion().set(task, beginVersion); + Params.endVersion().set(task, endVersion); + + if (!waitFor) { + return taskBucket->addTask(tr, task); + } + + wait(waitFor->onSetAddTask(tr, taskBucket, task)); + return "OnSetAddTask"_sr; + } + + Future execute(Database cx, + Reference tb, + Reference fb, + Reference task) override { + return Void(); + }; + Future finish(Reference tr, + Reference tb, + Reference fb, + Reference task) override { + return _finish(tr, tb, fb, task); + }; +}; +StringRef RestoreDispatchPartitionedTaskFunc::name = "restore_dispatch_partitioned"_sr; +REGISTER_TASKFUNC(RestoreDispatchPartitionedTaskFunc); struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { static StringRef name; static constexpr uint32_t version = 1; @@ -4367,15 +5415,17 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t remainingInBatch = Params.remainingInBatch().get(task); state bool addingToExistingBatch = remainingInBatch > 0; state Version restoreVersion; - state Future> onlyApplyMutationLogs = restore.onlyApplyMutationLogs().get(tr); - wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && success(onlyApplyMutationLogs) && + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr)) && checkTaskVersion(tr->getDatabase(), task, name, version)); // If not adding to an existing batch then update the apply mutations end version so the mutations from the // previous batch can be applied. 
Only do this once beginVersion is > 0 (it will be 0 for the initial // dispatch). if (!addingToExistingBatch && beginVersion > 0) { + // hfu5 : unblock apply alog to normal key space + // if the last file is [80, 100] and the restoreVersion is 90, we should use 90 here + // this call an additional call after last file restore.setApplyEndVersion(tr, std::min(beginVersion, restoreVersion + 1)); } @@ -4402,6 +5452,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { return Void(); } + // question why do we need beginFile at all -- this to handle stop in the middle of version case state std::string beginFile = Params.beginFile().getOrDefault(task); // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files // (each of which is 0 or more blocks). @@ -4453,6 +5504,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { .detail("TaskInstance", THIS_ADDR); } else if (beginVersion < restoreVersion) { // If beginVersion is less than restoreVersion then do one more dispatch task to get there + // there are no more files between beginVersion and restoreVersion wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize))); TraceEvent("FileRestoreDispatch") @@ -4479,6 +5531,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { } else { // Applying of mutations is not yet finished so wait a small amount of time and then re-add this // same task. 
+ // this is only to create a dummy one wait for it to finish wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize))); @@ -4508,6 +5561,9 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { state int64_t beginBlock = Params.beginBlock().getOrDefault(task); state int i = 0; + // for each file + // not creating a new task at this level because restore files are read back together -- both range and log + // so i have to process range files anyway. for (; i < files.results.size(); ++i) { RestoreConfig::RestoreFile& f = files.results[i]; @@ -4517,6 +5573,14 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { if (f.version != endVersion && remainingInBatch <= 0) { // Next start will be at the first version after endVersion at the first file first block ++endVersion; + // beginFile set to empty to indicate we are not in the middle of a range + // by middle of a range, we mean that we have rangeFile v=80, and logFile v=[80, 100], + // then we have to include this log file too in this batch + + // if range comes first, say range=80, log=(81, 100), then its fine we stop before the log, + // what if log comes first: + // range=80, log=(60,90), and range should not be read, but + // what about the 80-90 part? we should not allow those to commit beginFile = ""; beginBlock = 0; break; @@ -4626,8 +5690,12 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { // If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we // cannot end the batch here because we do not know if we got all of the files and blocks from the last // version queued, so make sure remainingInBatch is at least 1. 
- if (!beginFile.empty()) + if (!beginFile.empty()) { + // this is to make sure if we stop in the middle of a version, we do not end this batch + // instead next RestoreDispatchTaskFunc should have addingToExistingBatch as true + // thus they are considered the same batch and alog will be committed only when all of them succeed remainingInBatch = std::max(1, remainingInBatch); + } // If more blocks need to be dispatched in this batch then add a follow-on task that is part of the // allPartsDone group which will won't wait to run and will add more block tasks. @@ -4654,6 +5722,7 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase { allPartsDone)); wait(waitForAll(addTaskFutures)); + fmt::print(stderr, "Old Task Add parent task begin={}, end={}, should happen only after children are done \n", beginVersion, endVersion); // If adding to existing batch then task is joined with a batch future so set done future. Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); @@ -4819,6 +5888,7 @@ ACTOR Future abortRestore(Database cx, Key tagName) { struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { static StringRef name; static constexpr uint32_t version = 1; + static constexpr uint32_t step = 1000000; static struct { static TaskParam firstVersion() { return __FUNCTION__sr; } @@ -4906,6 +5976,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { for (auto const& r : ranges) { keyRangesFilter.push_back_deep(keyRangesFilter.arena(), KeyRangeRef(r)); } + // hfu5 : all files are read from here state Optional restorable = wait(bc->getRestoreSet(restoreVersion, keyRangesFilter, logsOnly, beginVersion)); if (!restorable.present()) @@ -4916,6 +5987,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { state std::vector files; if (!logsOnly) { beginVersion = restorable.get().snapshot.beginVersion; + fmt::print(stderr, "FullRestoreTask, set beginVersion={}\n", beginVersion); + if (!inconsistentSnapshotOnly) { for (const RangeFile& f : 
restorable.get().ranges) { files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); @@ -4926,6 +5999,7 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } else { for (int i = 0; i < restorable.get().ranges.size(); ++i) { const RangeFile& f = restorable.get().ranges[i]; + // hfu5: insert range files first files.push_back({ f.version, f.fileName, true, f.blockSize, f.fileSize }); // In inconsistentSnapshotOnly mode, if all range files have the same version, then it is the // firstConsistentVersion, otherwise unknown (use -1). @@ -4942,10 +6016,13 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { } if (!inconsistentSnapshotOnly) { for (const LogFile& f : restorable.get().logs) { - files.push_back({ f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion }); + // hfu5: log files are added to files here + files.push_back( + { f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion, f.tagId, f.totalTags }); } } // First version for which log data should be applied + fmt::print(stderr, "FullRestoreTask:: beginVersion={}\n", beginVersion); Params.firstVersion().set(task, beginVersion); tr->reset(); @@ -4978,8 +6055,10 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { state int nFileBlocks = 0; state int nFiles = 0; auto fileSet = restore.fileSet(); + // as a result, fileSet has everything, including [beginVersion, endVersion] for each tag for (; i != end && txBytes < 1e6; ++i) { txBytes += fileSet.insert(tr, *i); + // handle the remaining nFileBlocks += (i->fileSize + i->blockSize - 1) / i->blockSize; ++nFiles; } @@ -5011,7 +6090,8 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { Reference futureBucket, Reference task) { state RestoreConfig restore(task); - + state bool transformPartitionedLog; + state Version restoreVersion; state Version firstVersion = Params.firstVersion().getOrDefault(task, invalidVersion); if (firstVersion == invalidVersion) { wait(restore.logError( @@ -5029,8 
+6109,20 @@ struct StartFullRestoreTaskFunc : RestoreTaskFuncBase { restore.setApplyEndVersion(tr, firstVersion); // Apply range data and log data in order - wait(success(RestoreDispatchTaskFunc::addTask( - tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE))); + wait(store(transformPartitionedLog, restore.transformPartitionedLog().getD(tr, Snapshot::False, false))); + wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr))); + + if (transformPartitionedLog) { + fmt::print(stderr, + "StartInitial task, firstVersion={}, begin={}, endVersion={}\n", + firstVersion, + 0, + restoreVersion); + wait(success(RestoreDispatchPartitionedTaskFunc::addTask(tr, taskBucket, task, 0, firstVersion + step))); + } else { + wait(success(RestoreDispatchTaskFunc::addTask( + tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE))); + } wait(taskBucket->finish(tr, task)); return Void(); @@ -5443,7 +6535,8 @@ class FileBackupAgentImpl { OnlyApplyMutationLogs onlyApplyMutationLogs, InconsistentSnapshotOnly inconsistentSnapshotOnly, Version beginVersion, - UID uid) { + UID uid, + TransformPartitionedLog transformPartitionedLog) { KeyRangeMap restoreRangeSet; for (auto& range : ranges) { restoreRangeSet.insert(range, 1); @@ -5516,6 +6609,7 @@ class FileBackupAgentImpl { restore.inconsistentSnapshotOnly().set(tr, inconsistentSnapshotOnly); restore.beginVersion().set(tr, beginVersion); restore.unlockDBAfterRestore().set(tr, unlockDB); + restore.transformPartitionedLog().set(tr, transformPartitionedLog); if (BUGGIFY && restoreRanges.size() == 1) { restore.restoreRange().set(tr, restoreRanges[0]); } else { @@ -6099,25 +7193,27 @@ class FileBackupAgentImpl { // When set to true, gives an inconsistent snapshot, thus not recommended // beginVersions: restore's begin version for each range // randomUid: the UID for lock the database - ACTOR static Future restore(FileBackupAgent* backupAgent, - Database cx, - Optional cxOrig, - Key tagName, - Key 
url, - Optional proxy, - Standalone> ranges, - Standalone> beginVersions, - WaitForComplete waitForComplete, - Version targetVersion, - Verbose verbose, - Key addPrefix, - Key removePrefix, - LockDB lockDB, - UnlockDB unlockDB, - OnlyApplyMutationLogs onlyApplyMutationLogs, - InconsistentSnapshotOnly inconsistentSnapshotOnly, - Optional encryptionKeyFileName, - UID randomUid) { + ACTOR static Future restore( + FileBackupAgent* backupAgent, + Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + Standalone> ranges, + Standalone> beginVersions, + WaitForComplete waitForComplete, + Version targetVersion, + Verbose verbose, + Key addPrefix, + Key removePrefix, + LockDB lockDB, + UnlockDB unlockDB, + OnlyApplyMutationLogs onlyApplyMutationLogs, + InconsistentSnapshotOnly inconsistentSnapshotOnly, + Optional encryptionKeyFileName, + UID randomUid, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False) { // The restore command line tool won't allow ranges to be empty, but correctness workloads somehow might. 
if (ranges.empty()) { throw restore_error(); @@ -6183,7 +7279,8 @@ class FileBackupAgentImpl { onlyApplyMutationLogs, inconsistentSnapshotOnly, beginVersion, - randomUid)); + randomUid, + transformPartitionedLog)); wait(tr->commit()); break; } catch (Error& e) { @@ -6545,7 +7642,8 @@ Future FileBackupAgent::restore(Database cx, UnlockDB unlockDB, OnlyApplyMutationLogs onlyApplyMutationLogs, InconsistentSnapshotOnly inconsistentSnapshotOnly, - Optional const& encryptionKeyFileName) { + Optional const& encryptionKeyFileName, + TransformPartitionedLog transformPartitionedLog) { return FileBackupAgentImpl::restore(this, cx, cxOrig, @@ -6564,26 +7662,28 @@ Future FileBackupAgent::restore(Database cx, onlyApplyMutationLogs, inconsistentSnapshotOnly, encryptionKeyFileName, - deterministicRandom()->randomUniqueID()); + deterministicRandom()->randomUniqueID(), + transformPartitionedLog); } -Future FileBackupAgent::restore(Database cx, - Optional cxOrig, - Key tagName, - Key url, - Optional proxy, - Standalone> ranges, - WaitForComplete waitForComplete, - Version targetVersion, - Verbose verbose, - Key addPrefix, - Key removePrefix, - LockDB lockDB, - UnlockDB unlockDB, - OnlyApplyMutationLogs onlyApplyMutationLogs, - InconsistentSnapshotOnly inconsistentSnapshotOnly, - Version beginVersion, - Optional const& encryptionKeyFileName) { +Future FileBackupAgent::restoreConstructVersion(Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + Standalone> ranges, + WaitForComplete waitForComplete, + Version targetVersion, + Verbose verbose, + Key addPrefix, + Key removePrefix, + LockDB lockDB, + UnlockDB unlockDB, + OnlyApplyMutationLogs onlyApplyMutationLogs, + InconsistentSnapshotOnly inconsistentSnapshotOnly, + Version beginVersion, + Optional const& encryptionKeyFileName, + TransformPartitionedLog transformPartitionedLog) { Standalone> beginVersions; for (auto i = 0; i < ranges.size(); ++i) { beginVersions.push_back(beginVersions.arena(), beginVersion); @@ 
-6604,25 +7704,26 @@ Future FileBackupAgent::restore(Database cx, unlockDB, onlyApplyMutationLogs, inconsistentSnapshotOnly, - encryptionKeyFileName); + encryptionKeyFileName, + transformPartitionedLog); } -Future FileBackupAgent::restore(Database cx, - Optional cxOrig, - Key tagName, - Key url, - Optional proxy, - WaitForComplete waitForComplete, - Version targetVersion, - Verbose verbose, - KeyRange range, - Key addPrefix, - Key removePrefix, - LockDB lockDB, - OnlyApplyMutationLogs onlyApplyMutationLogs, - InconsistentSnapshotOnly inconsistentSnapshotOnly, - Version beginVersion, - Optional const& encryptionKeyFileName) { +Future FileBackupAgent::restoreKeyRange(Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + WaitForComplete waitForComplete, + Version targetVersion, + Verbose verbose, + KeyRange range, + Key addPrefix, + Key removePrefix, + LockDB lockDB, + OnlyApplyMutationLogs onlyApplyMutationLogs, + InconsistentSnapshotOnly inconsistentSnapshotOnly, + Version beginVersion, + Optional const& encryptionKeyFileName) { Standalone> rangeRef; if (range.begin.empty() && range.end.empty()) { addDefaultBackupRanges(rangeRef); diff --git a/fdbclient/KeyRangeMap.actor.cpp b/fdbclient/KeyRangeMap.actor.cpp index a678c28e4a3..0d57cc280e8 100644 --- a/fdbclient/KeyRangeMap.actor.cpp +++ b/fdbclient/KeyRangeMap.actor.cpp @@ -199,10 +199,12 @@ ACTOR Future krmSetRange(Transaction* tr, Key mapPrefix, KeyRange range, V } ACTOR Future krmSetRange(Reference tr, Key mapPrefix, KeyRange range, Value value) { + // keyVersionMap, (a, b), v1 state KeyRange withPrefix = KeyRangeRef(mapPrefix.toString() + range.begin.toString(), mapPrefix.toString() + range.end.toString()); RangeResult old = wait(tr->getRange(lastLessOrEqual(withPrefix.end), firstGreaterThan(withPrefix.end), 1, Snapshot::True)); + // fetch [keyVersionMap/end, keyVersionMap/inc(end)] Value oldValue; bool hasResult = old.size() > 0 && old[0].key.startsWith(mapPrefix); @@ -213,8 +215,10 @@ 
ACTOR Future krmSetRange(Reference tr, Key mapP if (!conflictRange.empty()) tr->addReadConflictRange(conflictRange); - tr->clear(withPrefix); - tr->set(withPrefix.begin, value); + tr->clear(withPrefix); // clear [keyVersionMap/a, keyVersionMap/b) + tr->set(withPrefix.begin, value); // set [keyVersionMap/a, v1) + // set [keyVersionMap/b, preveiousVersion], because end is exclusive here, + // but starting from end it might be covered by another range file, so set it to old value tr->set(withPrefix.end, oldValue); return Void(); diff --git a/fdbclient/TaskBucket.actor.cpp b/fdbclient/TaskBucket.actor.cpp index 203b64453aa..145f3aeed37 100644 --- a/fdbclient/TaskBucket.actor.cpp +++ b/fdbclient/TaskBucket.actor.cpp @@ -1078,6 +1078,10 @@ class TaskFutureImpl { taskFuture->futureBucket->setOptions(tr); bool is_set = wait(isSet(tr, taskFuture)); + // this means that if the task future is already set, then just return + // so the taskFuture cannot be completed already if we want to join + // vectorFuture with taskFuture + fmt::print(stderr, "TaskFuture isSet={}\n", is_set); if (is_set) { return Void(); } @@ -1127,6 +1131,7 @@ class TaskFutureImpl { bool is_set = wait(isSet(tr, taskFuture)); + fmt::print(stderr, "TaskFuture::onSet, isSet={}\n", is_set); if (is_set) { CODE_PROBE(true, "is_set == true"); wait(performAction(tr, taskBucket, taskFuture, task)); @@ -1137,6 +1142,7 @@ class TaskFutureImpl { for (auto& v : task->params) { tr->set(callbackSpace.pack(v.key), v.value); } + // fmt::print(stderr, "TaskFuture::onSet callback added, isSet={}\n", is_set); } return Void(); @@ -1262,6 +1268,8 @@ class TaskFutureImpl { taskFuture->futureBucket->setOptions(tr); std::vector> vectorFuture; + // the next line means generate a new task future with different key, + // but share the same prefix of futureBucket with the input taskFuture state Reference future = taskFuture->futureBucket->future(tr); vectorFuture.push_back(future); wait(join(tr, taskBucket, taskFuture, vectorFuture)); 
@@ -1276,7 +1284,7 @@ TaskFuture::TaskFuture(const Reference bucket, Key k) : futureBuck key = deterministicRandom()->randomUniqueID().toString(); } - prefix = futureBucket->prefix.get(key); + prefix = futureBucket->prefix.get(key); // this ::get actually append the key to the taskBucket prefix blocks = prefix.get("bl"_sr); callbacks = prefix.get("cb"_sr); } @@ -1347,5 +1355,6 @@ ACTOR Future getCompletionKey(TaskCompletionKey* self, Future TaskCompletionKey::get(Reference tr, Reference taskBucket) { ASSERT(key.present() == (joinFuture.getPtr() == nullptr)); + // from the parent bucket, it generate a new taskfuture and returns the key of the new taskfuture return key.present() ? key.get() : getCompletionKey(this, joinFuture->joinedFuture(tr, taskBucket)); } diff --git a/fdbclient/include/fdbclient/BackupAgent.actor.h b/fdbclient/include/fdbclient/BackupAgent.actor.h index c8931750aad..6729706faec 100644 --- a/fdbclient/include/fdbclient/BackupAgent.actor.h +++ b/fdbclient/include/fdbclient/BackupAgent.actor.h @@ -45,6 +45,7 @@ FDB_BOOLEAN_PARAM(ForceAction); FDB_BOOLEAN_PARAM(Terminator); FDB_BOOLEAN_PARAM(IncrementalBackupOnly); FDB_BOOLEAN_PARAM(UsePartitionedLog); +FDB_BOOLEAN_PARAM(TransformPartitionedLog); FDB_BOOLEAN_PARAM(OnlyApplyMutationLogs); FDB_BOOLEAN_PARAM(SnapshotBackupUseTenantCache); FDB_BOOLEAN_PARAM(InconsistentSnapshotOnly); @@ -203,42 +204,47 @@ class FileBackupAgent : public BackupAgentBase { UnlockDB = UnlockDB::True, OnlyApplyMutationLogs = OnlyApplyMutationLogs::False, InconsistentSnapshotOnly = InconsistentSnapshotOnly::False, - Optional const& encryptionKeyFileName = {}); - - Future restore(Database cx, - Optional cxOrig, - Key tagName, - Key url, - Optional proxy, - WaitForComplete = WaitForComplete::True, - Version targetVersion = ::invalidVersion, - Verbose = Verbose::True, - KeyRange range = KeyRange(), - Key addPrefix = Key(), - Key removePrefix = Key(), - LockDB = LockDB::True, - OnlyApplyMutationLogs = OnlyApplyMutationLogs::False, 
- InconsistentSnapshotOnly = InconsistentSnapshotOnly::False, - Version beginVersion = ::invalidVersion, - Optional const& encryptionKeyFileName = {}); - - Future restore(Database cx, - Optional cxOrig, - Key tagName, - Key url, - Optional proxy, - Standalone> ranges, - WaitForComplete waitForComplete = WaitForComplete::True, - Version targetVersion = ::invalidVersion, - Verbose verbose = Verbose::True, - Key addPrefix = Key(), - Key removePrefix = Key(), - LockDB lockDB = LockDB::True, - UnlockDB unlockDB = UnlockDB::True, - OnlyApplyMutationLogs onlyApplyMutationLogs = OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly inconsistentSnapshotOnly = InconsistentSnapshotOnly::False, - Version beginVersion = ::invalidVersion, - Optional const& encryptionKeyFileName = {}); + Optional const& encryptionKeyFileName = {}, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::True); + + // this method will construct range and version vectors and then call restore() + Future restoreKeyRange(Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + WaitForComplete = WaitForComplete::True, + Version targetVersion = ::invalidVersion, + Verbose = Verbose::True, + KeyRange range = KeyRange(), + Key addPrefix = Key(), + Key removePrefix = Key(), + LockDB = LockDB::True, + OnlyApplyMutationLogs = OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly = InconsistentSnapshotOnly::False, + Version beginVersion = ::invalidVersion, + Optional const& encryptionKeyFileName = {}); + + // create a version vector of size ranges.size(), all elements are the same, i.e. 
beginVersion + Future restoreConstructVersion( + Database cx, + Optional cxOrig, + Key tagName, + Key url, + Optional proxy, + Standalone> ranges, + WaitForComplete waitForComplete = WaitForComplete::True, + Version targetVersion = ::invalidVersion, + Verbose verbose = Verbose::True, + Key addPrefix = Key(), + Key removePrefix = Key(), + LockDB lockDB = LockDB::True, + UnlockDB unlockDB = UnlockDB::True, + OnlyApplyMutationLogs onlyApplyMutationLogs = OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly inconsistentSnapshotOnly = InconsistentSnapshotOnly::False, + Version beginVersion = ::invalidVersion, + Optional const& encryptionKeyFileName = {}, + TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False); Future atomicRestore(Database cx, Key tagName, @@ -522,8 +528,8 @@ using RangeResultWithVersion = std::pair; struct RCGroup { RangeResult items; - Version version; - uint64_t groupKey; + Version version; // this is read version for this group + uint64_t groupKey; // this is the original version for this group RCGroup() : version(-1), groupKey(ULLONG_MAX){}; @@ -676,6 +682,7 @@ class KeyBackedTaskConfig : public KeyBackedClass { Reference task, SetValidation setValidation = SetValidation::True) { // Set the uid task parameter + // task's uid is set to my uid TaskParams.uid().set(task, uid); if (!setValidation) { diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 6847d5c5d55..e8e1b707c35 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -183,6 +183,7 @@ class ClientKnobs : public KnobsImpl { int RESTORE_DISPATCH_ADDTASK_SIZE; int RESTORE_DISPATCH_BATCH_SIZE; int RESTORE_WRITE_TX_SIZE; + int RESTORE_PARTITIONED_BATCH_VERSION_SIZE; int APPLY_MAX_LOCK_BYTES; int APPLY_MIN_LOCK_BYTES; int APPLY_BLOCK_SIZE; diff --git a/fdbclient/include/fdbclient/PartitionedLogIterator.h b/fdbclient/include/fdbclient/PartitionedLogIterator.h new 
file mode 100644 index 00000000000..dfd8ba5a9f4 --- /dev/null +++ b/fdbclient/include/fdbclient/PartitionedLogIterator.h @@ -0,0 +1,26 @@ +#ifndef FDBCLIENT_PARTITIONED_LOG_ITERATOR_H +#define FDBCLIENT_PARTITIONED_LOG_ITERATOR_H + +#include "fdbclient/FDBTypes.h" + +// Structure to represent each mutation entity +struct VersionedMutation { + Version version; + int32_t subsequence; + MutationRef mutation; + VersionedMutation(Arena& p, const VersionedMutation& toCopy) : mutation(p, toCopy.mutation) { + version = toCopy.version; + subsequence = toCopy.subsequence; + } + VersionedMutation() {} +}; + +class PartitionedLogIterator : public ReferenceCounted { +public: + virtual bool hasNext() const = 0; + + virtual Future peekNextVersion() = 0; + + virtual Future>> getNext() = 0; +}; +#endif \ No newline at end of file diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 4a5b247428f..23f7d2f867d 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -510,6 +510,7 @@ class ApplyMetadataMutationsImpl { } void checkSetApplyMutationsEndRange(MutationRef m) { + // only proceed when see mutation with applyMutationsEndRange if (!m.param1.startsWith(applyMutationsEndRange.begin)) { return; } @@ -532,6 +533,11 @@ class ApplyMetadataMutationsImpl { auto addPrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get(); auto removePrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get(); auto beginValue = txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get(); + fmt::print(stderr, "ApplyMedataMutationBegin: begin={}, end={}\n", beginValue.present() ? BinaryReader::fromStringRef(beginValue.get(), Unversioned()) : 0, p.endVersion); + TraceEvent("BackupAgentBaseApplyMutationsBegin") + .detail("BeginVersion", beginValue.present() ? 
BinaryReader::fromStringRef(beginValue.get(), Unversioned()) : 0) + .detail("EndVersion", p.endVersion) + .log(); p.worker = applyMutations( cx, uid, diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cbed292db6b..9199788e233 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -671,6 +671,7 @@ ACTOR Future addMutation(Reference logFile, StringRef mutation, int64_t* blockEnd, int blockSize) { + // version, subversion, messageSize, message state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size(); // Convert to big Endianness for version.version, version.sub, and msgSize diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 894b403a76e..a1c70566829 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -496,6 +496,7 @@ bool isWhitelisted(const std::vector>& binPathVec, StringR return std::find(binPathVec.begin(), binPathVec.end(), binPath) != binPathVec.end(); } +// hfu5 question is logRangeMutations Key version or actual key ACTOR Future addBackupMutations(ProxyCommitData* self, const std::map* logRangeMutations, LogPushData* toCommit, @@ -512,6 +513,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) { // FIXME: this is re-implementing the serialize function of MutationListRef in order to have a yield + // this is 0x0FDB00A200090001 valueWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withBackupMutations())); valueWriter << logRangeMutation->second.totalSize(); @@ -532,7 +534,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, Key val = valueWriter.toValue(); - BinaryWriter wr(Unversioned()); + BinaryWriter wr(Unversioned()); // backupName/hash/commitVersion/part, so wr is param1 // Serialize the log destination 
wr.serializeBytes(logRangeMutation->first); diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index e73d41fb157..ced75448229 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -372,18 +372,18 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { try { // TODO: Change to my restore agent code TraceEvent(SevError, "MXFastRestore").detail("RestoreFunction", "ShouldChangeToMyOwnRestoreLogic"); - wait(success(backupAgent->restore(cx, - cx, - self->backupTag, - KeyRef(lastBackupContainer), - {}, - WaitForComplete::True, - ::invalidVersion, - Verbose::True, - normalKeys, - Key(), - Key(), - self->locked))); + wait(success(backupAgent->restoreKeyRange(cx, + cx, + self->backupTag, + KeyRef(lastBackupContainer), + {}, + WaitForComplete::True, + ::invalidVersion, + Verbose::True, + normalKeys, + Key(), + Key(), + self->locked))); TraceEvent(SevError, "BARW_RestoreAllowedOverwrittingDatabase", randomID).log(); ASSERT(false); } catch (Error& e) { diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index d9722d916d6..b00ccb546e7 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -492,18 +492,18 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { // Try doing a restore without clearing the keys if (rowCount > 0) { try { - wait(success(backupAgent->restore(cx, - cx, - self->backupTag, - KeyRef(lastBackupContainer), - {}, - WaitForComplete::True, - ::invalidVersion, - Verbose::True, - normalKeys, - Key(), - Key(), - self->locked))); + wait(success(backupAgent->restoreKeyRange(cx, + cx, + self->backupTag, + KeyRef(lastBackupContainer), + {}, + WaitForComplete::True, + ::invalidVersion, + Verbose::True, + normalKeys, + Key(), + Key(), + 
self->locked))); TraceEvent(SevError, "BARW_RestoreAllowedOverwrittingDatabase", randomID).log(); ASSERT(false); } catch (Error& e) { @@ -531,23 +531,23 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { })); state Standalone restoreTag(self->backupTag.toString() + "_system"); printf("BackupCorrectness, backupAgent.restore is called for tag:%s\n", restoreTag.toString().c_str()); - wait(success(backupAgent->restore(cx, - cx, - restoreTag, - KeyRef(lastBackupContainer->getURL()), - lastBackupContainer->getProxy(), - systemRestoreRanges, - WaitForComplete::True, - targetVersion, - Verbose::True, - Key(), - Key(), - self->locked, - UnlockDB::True, - OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly::False, - ::invalidVersion, - self->encryptionKeyFileName))); + wait(success(backupAgent->restoreConstructVersion(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + systemRestoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName))); printf("BackupCorrectness, backupAgent.restore finished for tag:%s\n", restoreTag.toString().c_str()); return Void(); } @@ -731,22 +731,22 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { restoreIndex, range.toString().c_str(), restoreTag.toString().c_str()); - restores.push_back(backupAgent.restore(cx, - cx, - restoreTag, - KeyRef(lastBackupContainer->getURL()), - lastBackupContainer->getProxy(), - WaitForComplete::True, - targetVersion, - Verbose::True, - range, - Key(), - Key(), - self->locked, - OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly::False, - ::invalidVersion, - self->encryptionKeyFileName)); + restores.push_back(backupAgent.restoreKeyRange(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + 
WaitForComplete::True, + targetVersion, + Verbose::True, + range, + Key(), + Key(), + self->locked, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName)); } } else { multipleRangesInOneTag = true; @@ -755,23 +755,23 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { printf("BackupCorrectness, backupAgent.restore is called for restoreIndex:%d tag:%s\n", restoreIndex, restoreTag.toString().c_str()); - restores.push_back(backupAgent.restore(cx, - cx, - restoreTag, - KeyRef(lastBackupContainer->getURL()), - lastBackupContainer->getProxy(), - self->restoreRanges, - WaitForComplete::True, - targetVersion, - Verbose::True, - Key(), - Key(), - self->locked, - UnlockDB::True, - OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly::False, - ::invalidVersion, - self->encryptionKeyFileName)); + restores.push_back(backupAgent.restoreConstructVersion(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + self->restoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName)); } // Sometimes kill and restart the restore @@ -788,23 +788,24 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { tr->clear(range); return Void(); })); - restores[restoreIndex] = backupAgent.restore(cx, - cx, - restoreTags[restoreIndex], - KeyRef(lastBackupContainer->getURL()), - lastBackupContainer->getProxy(), - self->restoreRanges, - WaitForComplete::True, - ::invalidVersion, - Verbose::True, - Key(), - Key(), - self->locked, - UnlockDB::True, - OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly::False, - ::invalidVersion, - self->encryptionKeyFileName); + restores[restoreIndex] = + backupAgent.restoreConstructVersion(cx, + cx, + restoreTags[restoreIndex], + 
KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + self->restoreRanges, + WaitForComplete::True, + ::invalidVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName); } } else { for (restoreIndex = 0; restoreIndex < restores.size(); restoreIndex++) { @@ -820,22 +821,23 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { tr->clear(self->restoreRanges[restoreIndex]); return Void(); })); - restores[restoreIndex] = backupAgent.restore(cx, - cx, - restoreTags[restoreIndex], - KeyRef(lastBackupContainer->getURL()), - lastBackupContainer->getProxy(), - WaitForComplete::True, - ::invalidVersion, - Verbose::True, - self->restoreRanges[restoreIndex], - Key(), - Key(), - self->locked, - OnlyApplyMutationLogs::False, - InconsistentSnapshotOnly::False, - ::invalidVersion, - self->encryptionKeyFileName); + restores[restoreIndex] = + backupAgent.restoreKeyRange(cx, + cx, + restoreTags[restoreIndex], + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + WaitForComplete::True, + ::invalidVersion, + Verbose::True, + self->restoreRanges[restoreIndex], + Key(), + Key(), + self->locked, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName); } } } diff --git a/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp b/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp new file mode 100644 index 00000000000..1046ce92085 --- /dev/null +++ b/fdbserver/workloads/BackupCorrectnessPartitioned.actor.cpp @@ -0,0 +1,826 @@ +/* + * BackupCorrectness.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. 
and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/DatabaseConfiguration.h" +#include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include "fdbrpc/simulator.h" +#include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/BackupContainer.h" +#include "fdbclient/BackupContainerFileSystem.h" +#include "fdbclient/TenantManagement.actor.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "fdbserver/workloads/BulkSetup.actor.h" +#include "flow/IRandom.h" +#include "flow/actorcompiler.h" // This must be the last #include. 
+ +// A workload which test the correctness of backup and restore process +struct BackupAndRestorePartitionedCorrectnessWorkload : TestWorkload { + static constexpr auto NAME = "BackupAndRestorePartitionedCorrectness"; + double backupAfter, restoreAfter, abortAndRestartAfter; + double minBackupAfter; + double backupStartAt, restoreStartAfterBackupFinished, stopDifferentialAfter; + Key backupTag; + int backupRangesCount, backupRangeLengthMax; + bool differentialBackup, performRestore, agentRequest; + Standalone> backupRanges; + std::vector skippedRestoreRanges; + Standalone> restoreRanges; + static int backupAgentRequests; + LockDB locked{ false }; + bool allowPauses; + bool shareLogRange; + bool shouldSkipRestoreRanges; + bool defaultBackup; + Optional encryptionKeyFileName; + + BackupAndRestorePartitionedCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + locked.set(sharedRandomNumber % 2); + backupAfter = getOption(options, "backupAfter"_sr, 10.0); + double minBackupAfter = getOption(options, "minBackupAfter"_sr, backupAfter); + if (backupAfter > minBackupAfter) { + backupAfter = deterministicRandom()->random01() * (backupAfter - minBackupAfter) + minBackupAfter; + } + restoreAfter = getOption(options, "restoreAfter"_sr, 35.0); + performRestore = getOption(options, "performRestore"_sr, true); + backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag()); + backupRangesCount = getOption(options, "backupRangesCount"_sr, 5); + backupRangeLengthMax = getOption(options, "backupRangeLengthMax"_sr, 1); + abortAndRestartAfter = + getOption(options, + "abortAndRestartAfter"_sr, + deterministicRandom()->random01() < 0.5 + ? deterministicRandom()->random01() * (restoreAfter - backupAfter) + backupAfter + : 0.0); + differentialBackup = + getOption(options, "differentialBackup"_sr, deterministicRandom()->random01() < 0.5 ? true : false); + stopDifferentialAfter = + getOption(options, + "stopDifferentialAfter"_sr, + differentialBackup ? 
deterministicRandom()->random01() * + (restoreAfter - std::max(abortAndRestartAfter, backupAfter)) + + std::max(abortAndRestartAfter, backupAfter) + : 0.0); + agentRequest = getOption(options, "simBackupAgents"_sr, true); + allowPauses = getOption(options, "allowPauses"_sr, true); + shareLogRange = getOption(options, "shareLogRange"_sr, false); + defaultBackup = getOption(options, "defaultBackup"_sr, false); + + std::vector restorePrefixesToInclude = + getOption(options, "restorePrefixesToInclude"_sr, std::vector()); + + shouldSkipRestoreRanges = deterministicRandom()->random01() < 0.3 ? true : false; + if (getOption(options, "encrypted"_sr, deterministicRandom()->random01() < 0.1)) { + encryptionKeyFileName = "simfdb/" + getTestEncryptionFileName(); + } + + TraceEvent("BARW_ClientId").detail("Id", wcx.clientId); + UID randomID = nondeterministicRandom()->randomUniqueID(); + TraceEvent("BARW_PerformRestore", randomID).detail("Value", performRestore); + if (defaultBackup) { + addDefaultBackupRanges(backupRanges); + } else if (shareLogRange) { + bool beforePrefix = sharedRandomNumber & 1; + if (beforePrefix) + backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, "\xfe\xff\xfe"_sr)); + else + backupRanges.push_back_deep(backupRanges.arena(), + KeyRangeRef(strinc("\x00\x00\x01"_sr), normalKeys.end)); + } else if (backupRangesCount <= 0) { + backupRanges.push_back_deep(backupRanges.arena(), normalKeys); + } else { + // Add backup ranges + std::set rangeEndpoints; + while (rangeEndpoints.size() < backupRangesCount * 2) { + rangeEndpoints.insert(deterministicRandom()->randomAlphaNumeric( + deterministicRandom()->randomInt(1, backupRangeLengthMax + 1))); + } + + // Create ranges from the keys, in order, to prevent overlaps + std::vector sortedEndpoints(rangeEndpoints.begin(), rangeEndpoints.end()); + sort(sortedEndpoints.begin(), sortedEndpoints.end()); + for (auto i = sortedEndpoints.begin(); i != sortedEndpoints.end(); ++i) { + const 
std::string& start = *i++; + backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(start, *i)); + + // Track the added range + TraceEvent("BARW_BackupCorrectnessRange", randomID).detail("RangeBegin", start).detail("RangeEnd", *i); + } + } + + if (performRestore && !restorePrefixesToInclude.empty() && shouldSkipRestoreRanges) { + for (auto& range : backupRanges) { + bool intersection = false; + for (auto& prefix : restorePrefixesToInclude) { + KeyRange prefixRange(KeyRangeRef(prefix, strinc(prefix))); + if (range.intersects(prefixRange)) { + intersection = true; + } + TraceEvent("BARW_PrefixSkipRangeDetails") + .detail("PrefixMandatory", printable(prefix)) + .detail("BackupRange", printable(range)) + .detail("Intersection", intersection); + } + // If the backup range intersects with restorePrefixesToInclude or a coin flip is true then use it as a + // restore range as well, otherwise skip it. + if (intersection || deterministicRandom()->coinflip()) { + restoreRanges.push_back_deep(restoreRanges.arena(), range); + } else { + skippedRestoreRanges.push_back(range); + } + } + } else { + restoreRanges = backupRanges; + } + + // If no random backup ranges intersected with restorePrefixesToInclude or won the coin flip then restoreRanges + // will be empty, so move an item from skippedRestoreRanges to restoreRanges. 
+ if (restoreRanges.empty()) { + ASSERT(!skippedRestoreRanges.empty()); + restoreRanges.push_back_deep(restoreRanges.arena(), skippedRestoreRanges.back()); + skippedRestoreRanges.pop_back(); + } + + for (auto& range : restoreRanges) { + TraceEvent("BARW_RestoreRange", randomID) + .detail("RangeBegin", printable(range.begin)) + .detail("RangeEnd", printable(range.end)); + } + for (auto& range : skippedRestoreRanges) { + TraceEvent("BARW_SkipRange", randomID) + .detail("RangeBegin", printable(range.begin)) + .detail("RangeEnd", printable(range.end)); + } + } + + Future setup(Database const& cx) override { + if (clientId != 0) { + return Void(); + } + + return _setup(cx, this); + } + + ACTOR Future _setup(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state bool adjusted = false; + state TenantMapEntry entry; + + if (!self->defaultBackup && (cx->defaultTenant.present() || BUGGIFY)) { + if (cx->defaultTenant.present()) { + wait(store(entry, TenantAPI::getTenant(cx.getReference(), cx->defaultTenant.get()))); + + // If we are specifying sub-ranges (or randomly, if backing up normal keys), adjust them to be relative + // to the tenant + if (self->backupRanges.size() != 1 || self->backupRanges[0] != normalKeys || + deterministicRandom()->coinflip()) { + adjusted = true; + Standalone> modifiedBackupRanges; + for (int i = 0; i < self->backupRanges.size(); ++i) { + modifiedBackupRanges.push_back_deep( + modifiedBackupRanges.arena(), + self->backupRanges[i].withPrefix(entry.prefix, self->backupRanges.arena())); + } + self->backupRanges = modifiedBackupRanges; + } + } + for (auto r : getSystemBackupRanges()) { + self->backupRanges.push_back_deep(self->backupRanges.arena(), r); + } + + if (adjusted) { + Standalone> modifiedRestoreRanges; + for (int i = 0; i < self->restoreRanges.size(); ++i) { + modifiedRestoreRanges.push_back_deep( + modifiedRestoreRanges.arena(), + self->restoreRanges[i].withPrefix(entry.prefix, self->restoreRanges.arena())); + } + 
self->restoreRanges = modifiedRestoreRanges; + + for (int i = 0; i < self->skippedRestoreRanges.size(); ++i) { + self->skippedRestoreRanges[i] = self->skippedRestoreRanges[i].withPrefix(entry.prefix); + } + } + for (auto r : getSystemBackupRanges()) { + self->restoreRanges.push_back_deep(self->restoreRanges.arena(), r); + } + } + + return Void(); + } + + Future start(Database const& cx) override { + if (clientId != 0) + return Void(); + + TraceEvent(SevInfo, "BARW_Param").detail("Locked", locked); + TraceEvent(SevInfo, "BARW_Param").detail("BackupAfter", backupAfter); + TraceEvent(SevInfo, "BARW_Param").detail("RestoreAfter", restoreAfter); + TraceEvent(SevInfo, "BARW_Param").detail("PerformRestore", performRestore); + TraceEvent(SevInfo, "BARW_Param").detail("BackupTag", printable(backupTag).c_str()); + TraceEvent(SevInfo, "BARW_Param").detail("BackupRangesCount", backupRangesCount); + TraceEvent(SevInfo, "BARW_Param").detail("BackupRangeLengthMax", backupRangeLengthMax); + TraceEvent(SevInfo, "BARW_Param").detail("AbortAndRestartAfter", abortAndRestartAfter); + TraceEvent(SevInfo, "BARW_Param").detail("DifferentialBackup", differentialBackup); + TraceEvent(SevInfo, "BARW_Param").detail("StopDifferentialAfter", stopDifferentialAfter); + TraceEvent(SevInfo, "BARW_Param").detail("AgentRequest", agentRequest); + TraceEvent(SevInfo, "BARW_Param").detail("Encrypted", encryptionKeyFileName.present()); + + return _start(cx, this); + } + + Future check(Database const& cx) override { + if (clientId != 0) + return true; + else + return _check(cx, this); + } + + ACTOR static Future _check(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state Transaction tr(cx); + loop { + try { + state int restoreIndex; + for (restoreIndex = 0; restoreIndex < self->skippedRestoreRanges.size(); restoreIndex++) { + state KeyRangeRef range = self->skippedRestoreRanges[restoreIndex]; + Standalone restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex)); 
+ RangeResult res = wait(tr.getRange(range, GetRangeLimits::ROW_LIMIT_UNLIMITED)); + if (!res.empty()) { + TraceEvent(SevError, "BARW_UnexpectedRangePresent").detail("Range", printable(range)); + return false; + } + } + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + return true; + } + + void getMetrics(std::vector& m) override {} + + ACTOR static Future changePaused(Database cx, FileBackupAgent* backupAgent) { + loop { + wait(backupAgent->changePause(cx, true)); + wait(delay(30 * deterministicRandom()->random01())); + wait(backupAgent->changePause(cx, false)); + wait(delay(120 * deterministicRandom()->random01())); + } + } + + ACTOR static Future statusLoop(Database cx, std::string tag) { + state FileBackupAgent agent; + loop { + bool active = wait(agent.checkActive(cx)); + TraceEvent("BARW_AgentActivityCheck").detail("IsActive", active); + std::string status = wait(agent.getStatus(cx, ShowErrors::True, tag)); + puts(status.c_str()); + std::string statusJSON = wait(agent.getStatusJSON(cx, tag)); + puts(statusJSON.c_str()); + wait(delay(2.0)); + } + } + + ACTOR static Future doBackup(BackupAndRestorePartitionedCorrectnessWorkload* self, + double startDelay, + FileBackupAgent* backupAgent, + Database cx, + Key tag, + Standalone> backupRanges, + double stopDifferentialDelay, + Promise submittted) { + + state UID randomID = nondeterministicRandom()->randomUniqueID(); + + state Future stopDifferentialFuture = delay(stopDifferentialDelay); + wait(delay(startDelay)); + + if (startDelay || BUGGIFY) { + TraceEvent("BARW_DoBackupAbortBackup1", randomID) + .detail("Tag", printable(tag)) + .detail("StartDelay", startDelay); + + try { + wait(backupAgent->abortBackup(cx, tag.toString())); + } catch (Error& e) { + TraceEvent("BARW_DoBackupAbortBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded) + throw; + } + } + + TraceEvent("BARW_DoBackupSubmitBackup", randomID) + .detail("Tag", printable(tag)) + 
.detail("StopWhenDone", stopDifferentialDelay ? "False" : "True"); + + state std::string backupContainer = "file://simfdb/backups/"; + state Future status = statusLoop(cx, tag.toString()); + try { + wait(backupAgent->submitBackup(cx, + StringRef(backupContainer), + {}, + deterministicRandom()->randomInt(0, 60), + deterministicRandom()->randomInt(0, 2000), + tag.toString(), + backupRanges, + true, + StopWhenDone{ !stopDifferentialDelay }, + UsePartitionedLog::True, // enable partitioned log here + IncrementalBackupOnly::False, + self->encryptionKeyFileName)); + } catch (Error& e) { + TraceEvent("BARW_DoBackupSubmitBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) + throw; + } + + submittted.send(Void()); + + // Stop the differential backup, if enabled + if (stopDifferentialDelay) { + CODE_PROBE(!stopDifferentialFuture.isReady(), + "Restore starts at specified time - stopDifferential not ready"); + wait(stopDifferentialFuture); + TraceEvent("BARW_DoBackupWaitToDiscontinue", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + + try { + if (BUGGIFY) { + state KeyBackedTag backupTag = makeBackupTag(tag.toString()); + TraceEvent("BARW_DoBackupWaitForRestorable", randomID).detail("Tag", backupTag.tagName); + + // Wait until the backup is in a restorable state and get the status, URL, and UID atomically + state Reference lastBackupContainer; + state UID lastBackupUID; + state EBackupState resultWait = wait(backupAgent->waitBackup( + cx, backupTag.tagName, StopWhenDone::False, &lastBackupContainer, &lastBackupUID)); + + TraceEvent("BARW_DoBackupWaitForRestorable", randomID) + .detail("Tag", backupTag.tagName) + .detail("Result", BackupAgentBase::getStateText(resultWait)); + + state bool restorable = false; + if (lastBackupContainer) { + state Future fdesc = lastBackupContainer->describeBackup(); + wait(ready(fdesc)); + + 
if (!fdesc.isError()) { + state BackupDescription desc = fdesc.get(); + wait(desc.resolveVersionTimes(cx)); + printf("BackupDescription:\n%s\n", desc.toString().c_str()); + restorable = desc.maxRestorableVersion.present(); + } + } + + TraceEvent("BARW_LastBackupContainer", randomID) + .detail("BackupTag", printable(tag)) + .detail("LastBackupContainer", lastBackupContainer ? lastBackupContainer->getURL() : "") + .detail("LastBackupUID", lastBackupUID) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)) + .detail("Restorable", restorable); + + // Do not check the backup, if aborted + if (resultWait == EBackupState::STATE_ABORTED) { + } + // Ensure that a backup container was found + else if (!lastBackupContainer) { + TraceEvent(SevError, "BARW_MissingBackupContainer", randomID) + .detail("LastBackupUID", lastBackupUID) + .detail("BackupTag", printable(tag)) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)); + printf("BackupCorrectnessMissingBackupContainer tag: %s status: %s\n", + printable(tag).c_str(), + BackupAgentBase::getStateText(resultWait)); + } + // Check that backup is restorable + else if (!restorable) { + TraceEvent(SevError, "BARW_NotRestorable", randomID) + .detail("LastBackupUID", lastBackupUID) + .detail("BackupTag", printable(tag)) + .detail("BackupFolder", lastBackupContainer->getURL()) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)); + printf("BackupCorrectnessNotRestorable: tag: %s\n", printable(tag).c_str()); + } + + // Abort the backup, if not the first backup because the second backup may have aborted the backup + // by now + if (startDelay) { + TraceEvent("BARW_DoBackupAbortBackup2", randomID) + .detail("Tag", printable(tag)) + .detail("WaitStatus", BackupAgentBase::getStateText(resultWait)) + .detail("LastBackupContainer", lastBackupContainer ? 
lastBackupContainer->getURL() : "") + .detail("Restorable", restorable); + wait(backupAgent->abortBackup(cx, tag.toString())); + } else { + TraceEvent("BARW_DoBackupDiscontinueBackup", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + wait(backupAgent->discontinueBackup(cx, tag)); + } + } + + else { + TraceEvent("BARW_DoBackupDiscontinueBackup", randomID) + .detail("Tag", printable(tag)) + .detail("DifferentialAfter", stopDifferentialDelay); + wait(backupAgent->discontinueBackup(cx, tag)); + } + } catch (Error& e) { + TraceEvent("BARW_DoBackupDiscontinueBackupException", randomID).error(e).detail("Tag", printable(tag)); + if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) + throw; + } + } + + // Wait for the backup to complete + TraceEvent("BARW_DoBackupWaitBackup", randomID).detail("Tag", printable(tag)); + state EBackupState statusValue = wait(backupAgent->waitBackup(cx, tag.toString(), StopWhenDone::True)); + + state std::string statusText; + + std::string _statusText = wait(backupAgent->getStatus(cx, ShowErrors::True, tag.toString())); + statusText = _statusText; + // Can we validate anything about status? 
+ + TraceEvent("BARW_DoBackupComplete", randomID) + .detail("Tag", printable(tag)) + .detail("Status", statusText) + .detail("StatusValue", BackupAgentBase::getStateText(statusValue)); + + return Void(); + } + + ACTOR static Future clearAndRestoreSystemKeys(Database cx, + BackupAndRestorePartitionedCorrectnessWorkload* self, + FileBackupAgent* backupAgent, + Version targetVersion, + Reference lastBackupContainer, + Standalone> systemRestoreRanges) { + // restore system keys before restoring any other ranges + wait(runRYWTransaction(cx, [=](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + for (auto& range : systemRestoreRanges) + tr->clear(range); + return Void(); + })); + state Standalone restoreTag(self->backupTag.toString() + "_system"); + printf("BackupCorrectness, backupAgent.restore is called for tag:%s\n", restoreTag.toString().c_str()); + wait(success(backupAgent->restoreConstructVersion(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + systemRestoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName, + TransformPartitionedLog::True))); + printf("BackupCorrectness, backupAgent.restore finished for tag:%s\n", restoreTag.toString().c_str()); + return Void(); + } + + ACTOR static Future _start(Database cx, BackupAndRestorePartitionedCorrectnessWorkload* self) { + state FileBackupAgent backupAgent; + state bool extraTasks = false; + state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx)); + TraceEvent("BARW_Arguments") + .detail("BackupTag", printable(self->backupTag)) + .detail("PerformRestore", self->performRestore) + .detail("BackupAfter", self->backupAfter) + .detail("RestoreAfter", self->restoreAfter) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter) + 
.detail("DifferentialAfter", self->stopDifferentialAfter); + + state UID randomID = nondeterministicRandom()->randomUniqueID(); + if (self->allowPauses && BUGGIFY) { + state Future cp = changePaused(cx, &backupAgent); + } + + // Increment the backup agent requests + if (self->agentRequest) { + BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests++; + } + + if (self->encryptionKeyFileName.present()) { + wait(BackupContainerFileSystem::createTestEncryptionKeyFile(self->encryptionKeyFileName.get())); + } + + try { + state Future startRestore = delay(self->restoreAfter); + + // backup + wait(delay(self->backupAfter)); + + TraceEvent("BARW_DoBackup1", randomID).detail("Tag", printable(self->backupTag)); + state Promise submitted; + state Future b = doBackup( + self, 0, &backupAgent, cx, self->backupTag, self->backupRanges, self->stopDifferentialAfter, submitted); + + if (self->abortAndRestartAfter) { + TraceEvent("BARW_DoBackup2", randomID) + .detail("Tag", printable(self->backupTag)) + .detail("AbortWait", self->abortAndRestartAfter); + wait(submitted.getFuture()); + b = b && doBackup(self, + self->abortAndRestartAfter, + &backupAgent, + cx, + self->backupTag, + self->backupRanges, + self->stopDifferentialAfter, + Promise()); + } + + TraceEvent("BARW_DoBackupWait", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter); + try { + wait(b); + } catch (Error& e) { + if (e.code() != error_code_database_locked) + throw; + if (self->performRestore) + throw; + return Void(); + } + TraceEvent("BARW_DoBackupDone", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("AbortAndRestartAfter", self->abortAndRestartAfter); + + state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString()); + UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference())); + state UID logUid = uidFlag.first; + state Key destUidValue = 
wait(BackupConfig(logUid).destUidValue().getD(cx.getReference())); + state Reference lastBackupContainer = + wait(BackupConfig(logUid).backupContainer().getD(cx.getReference())); + + + + CODE_PROBE(!startRestore.isReady(), "Restore starts at specified time"); + wait(startRestore); + + if (lastBackupContainer && self->performRestore) { + wait(runRYWTransaction(cx, [=](Reference tr) -> Future { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + for (auto& kvrange : self->backupRanges) + tr->clear(kvrange); + return Void(); + })); + + // restore database + TraceEvent("BARW_Restore", randomID) + .detail("LastBackupContainer", lastBackupContainer->getURL()) + .detail("RestoreAfter", self->restoreAfter) + .detail("BackupTag", printable(self->backupTag)); + + auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(), + lastBackupContainer->getProxy(), + lastBackupContainer->getEncryptionKeyFileName()); + BackupDescription desc = wait(container->describeBackup()); + + state Version targetVersion = -1; + if (desc.maxRestorableVersion.present()) { + if (deterministicRandom()->random01() < 0.1) { + targetVersion = desc.minRestorableVersion.get(); + } else if (deterministicRandom()->random01() < 0.1) { + targetVersion = desc.maxRestorableVersion.get(); + } else if (deterministicRandom()->random01() < 0.5) { + targetVersion = deterministicRandom()->randomInt64(desc.minRestorableVersion.get(), + desc.contiguousLogEnd.get()); + } + } + + TraceEvent("BARW_RestoreDebug").detail("TargetVersion", targetVersion); + + state std::vector> restores; + state std::vector> restoreTags; + state bool multipleRangesInOneTag = false; + state int restoreIndex = 0; + // make sure system keys are not present in the restoreRanges as they will get restored first separately + // from the rest + Standalone> modifiedRestoreRanges; + Standalone> systemRestoreRanges; + for (int i = 0; i < self->restoreRanges.size(); ++i) { + if (config.tenantMode != TenantMode::REQUIRED || 
+ !self->restoreRanges[i].intersects(getSystemBackupRanges())) { + modifiedRestoreRanges.push_back_deep(modifiedRestoreRanges.arena(), self->restoreRanges[i]); + } else { + KeyRangeRef normalKeyRange = self->restoreRanges[i] & normalKeys; + KeyRangeRef systemKeyRange = self->restoreRanges[i] & systemKeys; + if (!normalKeyRange.empty()) { + modifiedRestoreRanges.push_back_deep(modifiedRestoreRanges.arena(), normalKeyRange); + } + if (!systemKeyRange.empty()) { + systemRestoreRanges.push_back_deep(systemRestoreRanges.arena(), systemKeyRange); + } + } + } + self->restoreRanges = modifiedRestoreRanges; + if (!systemRestoreRanges.empty()) { + // We are able to restore system keys first since we restore an entire cluster at once rather than + // partial key ranges. + wait(clearAndRestoreSystemKeys( + cx, self, &backupAgent, targetVersion, lastBackupContainer, systemRestoreRanges)); + } + + multipleRangesInOneTag = true; + Standalone restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex)); + restoreTags.push_back(restoreTag); + printf("BackupCorrectness, backupAgent.restore is called for restoreIndex:%d tag:%s\n", + restoreIndex, + restoreTag.toString().c_str()); + restores.push_back(backupAgent.restoreConstructVersion(cx, + cx, + restoreTag, + KeyRef(lastBackupContainer->getURL()), + lastBackupContainer->getProxy(), + self->restoreRanges, + WaitForComplete::True, + targetVersion, + Verbose::True, + Key(), + Key(), + self->locked, + UnlockDB::True, + OnlyApplyMutationLogs::False, + InconsistentSnapshotOnly::False, + ::invalidVersion, + self->encryptionKeyFileName, + TransformPartitionedLog::True)); + + wait(waitForAll(restores)); + + for (auto& restore : restores) { + ASSERT(!restore.isError()); + } + } + + state Key backupAgentKey = uidPrefixKey(logRangesRange.begin, logUid); + state Key backupLogValuesKey = destUidValue.withPrefix(backupLogKeys.begin); + state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix); + state 
Key backupLatestVersionsKey = uidPrefixKey(backupLatestVersionsPath, logUid); + state int displaySystemKeys = 0; + + // Ensure that there is no left over key within the backup subspace + loop { + state Reference tr(new ReadYourWritesTransaction(cx)); + + TraceEvent("BARW_CheckLeftoverKeys", randomID).detail("BackupTag", printable(self->backupTag)); + + try { + // Check the left over tasks + // We have to wait for the list to empty since an abort and get status + // can leave extra tasks in the queue + TraceEvent("BARW_CheckLeftoverTasks", randomID).detail("BackupTag", printable(self->backupTag)); + state int64_t taskCount = wait(backupAgent.getTaskCount(tr)); + state int waitCycles = 0; + + if ((taskCount) && false) { + TraceEvent("BARW_EndingNonzeroTaskCount", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("TaskCount", taskCount) + .detail("WaitCycles", waitCycles); + printf("EndingNonZeroTasks: %ld\n", (long)taskCount); + wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef())); + } + + while (taskCount > 0) { + waitCycles++; + + TraceEvent("BARW_NonzeroTaskWait", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("TaskCount", taskCount) + .detail("WaitCycles", waitCycles); + printf("%.6f %-10s Wait #%4d for %lld tasks to end\n", + now(), + randomID.toString().c_str(), + waitCycles, + (long long)taskCount); + + wait(delay(5.0)); + + tr = makeReference(cx); + wait(store(taskCount, backupAgent.getTaskCount(tr))); + } + + RangeResult agentValues = + wait(tr->getRange(KeyRange(KeyRangeRef(backupAgentKey, strinc(backupAgentKey))), 100)); + + // Error if the system keyspace for the backup tag is not empty + if (agentValues.size() > 0) { + displaySystemKeys++; + printf("BackupCorrectnessLeftOverMutationKeys: (%d) %s\n", + agentValues.size(), + printable(backupAgentKey).c_str()); + TraceEvent(SevError, "BackupCorrectnessLeftOverMutationKeys", randomID) + .detail("BackupTag", printable(self->backupTag)) + 
.detail("LeftOverKeys", agentValues.size()) + .detail("KeySpace", printable(backupAgentKey)); + for (auto& s : agentValues) { + TraceEvent("BARW_LeftOverKey", randomID) + .detail("Key", printable(StringRef(s.key.toString()))) + .detail("Value", printable(StringRef(s.value.toString()))); + printf(" Key: %-50s Value: %s\n", + printable(StringRef(s.key.toString())).c_str(), + printable(StringRef(s.value.toString())).c_str()); + } + } else { + printf("No left over backup agent configuration keys\n"); + } + + Optional latestVersion = wait(tr->get(backupLatestVersionsKey)); + if (latestVersion.present()) { + TraceEvent(SevError, "BackupCorrectnessLeftOverVersionKey", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("BackupLatestVersionsKey", backupLatestVersionsKey.printable()) + .detail("DestUidValue", destUidValue.printable()); + } else { + printf("No left over backup version key\n"); + } + + RangeResult versions = wait(tr->getRange( + KeyRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath))), 1)); + if (!self->shareLogRange || !versions.size()) { + RangeResult logValues = wait( + tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100)); + + // Error if the log/mutation keyspace for the backup tag is not empty + if (logValues.size() > 0) { + displaySystemKeys++; + printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", + logValues.size(), + printable(backupLogValuesKey).c_str()); + TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID) + .detail("BackupTag", printable(self->backupTag)) + .detail("LeftOverKeys", logValues.size()) + .detail("KeySpace", printable(backupLogValuesKey)); + } else { + printf("No left over backup log keys\n"); + } + } + + break; + } catch (Error& e) { + TraceEvent("BARW_CheckException", randomID).error(e); + wait(tr->onError(e)); + } + } + + if (displaySystemKeys) { + wait(TaskBucket::debugPrintRange(cx, normalKeys.end, StringRef())); + } + + 
TraceEvent("BARW_Complete", randomID).detail("BackupTag", printable(self->backupTag)); + + // Decrement the backup agent requets + if (self->agentRequest) { + BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests--; + } + + // SOMEDAY: Remove after backup agents can exist quiescently + if ((g_simulator->backupAgents == ISimulator::BackupAgentType::BackupToFile) && + (!BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests)) { + g_simulator->backupAgents = ISimulator::BackupAgentType::NoBackupAgents; + } + } catch (Error& e) { + TraceEvent(SevError, "BackupAndRestorePartitionedCorrectness").error(e).GetLastError(); + throw; + } + return Void(); + } +}; + +int BackupAndRestorePartitionedCorrectnessWorkload::backupAgentRequests = 0; + +// std::string getTestEncryptionFileName() { +// return "test_encryption_key_file"; +// } + +WorkloadFactory BackupAndRestorePartitionedCorrectnessWorkloadFactory; diff --git a/fdbserver/workloads/IncrementalBackup.actor.cpp b/fdbserver/workloads/IncrementalBackup.actor.cpp index 8712d09fe6b..cba4440d993 100644 --- a/fdbserver/workloads/IncrementalBackup.actor.cpp +++ b/fdbserver/workloads/IncrementalBackup.actor.cpp @@ -252,40 +252,40 @@ struct IncrementalBackupWorkload : TestWorkload { } if (!systemRestoreRange.empty()) { TraceEvent("IBackupSystemRestoreAttempt").detail("BeginVersion", beginVersion); - wait(success(self->backupAgent.restore(cx, - cx, - "system_restore"_sr, - backupURL, - {}, - systemRestoreRange, - WaitForComplete::True, - invalidVersion, - Verbose::True, - Key(), - Key(), - LockDB::True, - UnlockDB::True, - OnlyApplyMutationLogs::True, - InconsistentSnapshotOnly::False, - beginVersion))); + wait(success(self->backupAgent.restoreConstructVersion(cx, + cx, + "system_restore"_sr, + backupURL, + {}, + systemRestoreRange, + WaitForComplete::True, + invalidVersion, + Verbose::True, + Key(), + Key(), + LockDB::True, + UnlockDB::True, + OnlyApplyMutationLogs::True, + 
InconsistentSnapshotOnly::False, + beginVersion))); } TraceEvent("IBackupRestoreAttempt").detail("BeginVersion", beginVersion); - wait(success(self->backupAgent.restore(cx, - cx, - Key(self->tag.toString()), - backupURL, - {}, - restoreRange, - WaitForComplete::True, - invalidVersion, - Verbose::True, - Key(), - Key(), - LockDB::True, - UnlockDB::True, - OnlyApplyMutationLogs::True, - InconsistentSnapshotOnly::False, - beginVersion))); + wait(success(self->backupAgent.restoreConstructVersion(cx, + cx, + Key(self->tag.toString()), + backupURL, + {}, + restoreRange, + WaitForComplete::True, + invalidVersion, + Verbose::True, + Key(), + Key(), + LockDB::True, + UnlockDB::True, + OnlyApplyMutationLogs::True, + InconsistentSnapshotOnly::False, + beginVersion))); TraceEvent("IBackupRestoreSuccess").log(); } return Void(); diff --git a/fdbserver/workloads/MetaclusterRestoreWorkload.actor.cpp b/fdbserver/workloads/MetaclusterRestoreWorkload.actor.cpp index 1a0c3e2807f..0f40bb0c827 100644 --- a/fdbserver/workloads/MetaclusterRestoreWorkload.actor.cpp +++ b/fdbserver/workloads/MetaclusterRestoreWorkload.actor.cpp @@ -266,7 +266,8 @@ struct MetaclusterRestoreWorkload : TestWorkload { })); TraceEvent("MetaclusterRestoreWorkloadRestoreCluster").detail("ClusterName", clusterName); - wait(success(backupAgent.restore(dataDb, dataDb, clusterName, StringRef(backupUrl), {}, backupRanges))); + wait(success( + backupAgent.restoreConstructVersion(dataDb, dataDb, clusterName, StringRef(backupUrl), {}, backupRanges))); state std::vector messages; if (addToMetacluster) { diff --git a/fdbserver/workloads/RestoreBackup.actor.cpp b/fdbserver/workloads/RestoreBackup.actor.cpp index 7fdab3462fb..3c30c54a32c 100644 --- a/fdbserver/workloads/RestoreBackup.actor.cpp +++ b/fdbserver/workloads/RestoreBackup.actor.cpp @@ -121,34 +121,34 @@ struct RestoreBackupWorkload : TestWorkload { if (config.tenantMode == TenantMode::REQUIRED) { // restore system keys - wait(success(self->backupAgent.restore(cx, 
- cx, - "system_restore"_sr, - Key(self->backupContainer->getURL()), - self->backupContainer->getProxy(), - getSystemBackupRanges(), - WaitForComplete::True, - ::invalidVersion, - Verbose::True))); + wait(success(self->backupAgent.restoreConstructVersion(cx, + cx, + "system_restore"_sr, + Key(self->backupContainer->getURL()), + self->backupContainer->getProxy(), + getSystemBackupRanges(), + WaitForComplete::True, + ::invalidVersion, + Verbose::True))); // restore user data - wait(success(self->backupAgent.restore(cx, - cx, - self->tag, - Key(self->backupContainer->getURL()), - self->backupContainer->getProxy(), - WaitForComplete::True, - ::invalidVersion, - Verbose::True, - normalKeys))); + wait(success(self->backupAgent.restoreKeyRange(cx, + cx, + self->tag, + Key(self->backupContainer->getURL()), + self->backupContainer->getProxy(), + WaitForComplete::True, + ::invalidVersion, + Verbose::True, + normalKeys))); } else { - wait(success(self->backupAgent.restore(cx, - cx, - self->tag, - Key(self->backupContainer->getURL()), - self->backupContainer->getProxy(), - WaitForComplete::True, - ::invalidVersion, - Verbose::True))); + wait(success(self->backupAgent.restoreKeyRange(cx, + cx, + self->tag, + Key(self->backupContainer->getURL()), + self->backupContainer->getProxy(), + WaitForComplete::True, + ::invalidVersion, + Verbose::True))); } return Void(); diff --git a/fdbserver/workloads/RestoreFromBlob.actor.cpp b/fdbserver/workloads/RestoreFromBlob.actor.cpp index 834cefb8700..bf93adbc849 100644 --- a/fdbserver/workloads/RestoreFromBlob.actor.cpp +++ b/fdbserver/workloads/RestoreFromBlob.actor.cpp @@ -60,16 +60,16 @@ struct RestoreFromBlobWorkload : TestWorkload { wait(delay(self->restoreAfter)); if (config.tenantMode == TenantMode::REQUIRED) { // restore system keys followed by user keys - wait(success(backupAgent.restore( + wait(success(backupAgent.restoreConstructVersion( cx, {}, self->backupTag, self->backupURL, {}, getSystemBackupRanges(), 
self->waitForComplete))); Standalone> restoreRanges; restoreRanges.push_back_deep(restoreRanges.arena(), normalKeys); - wait(success(backupAgent.restore( + wait(success(backupAgent.restoreConstructVersion( cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete))); } else { Standalone> restoreRanges; addDefaultBackupRanges(restoreRanges); - wait(success(backupAgent.restore( + wait(success(backupAgent.restoreConstructVersion( cx, {}, self->backupTag, self->backupURL, {}, restoreRanges, self->waitForComplete))); } return Void(); diff --git a/fdbserver/workloads/RestoreMultiRanges.actor.cpp b/fdbserver/workloads/RestoreMultiRanges.actor.cpp index b4e7e0ff5b4..8bb674d66ef 100644 --- a/fdbserver/workloads/RestoreMultiRanges.actor.cpp +++ b/fdbserver/workloads/RestoreMultiRanges.actor.cpp @@ -150,15 +150,15 @@ struct RestoreMultiRangesWorkload : TestWorkload { state Standalone> ranges; ranges.push_back_deep(ranges.arena(), KeyRangeRef("a"_sr, "aaaaa"_sr)); ranges.push_back_deep(ranges.arena(), KeyRangeRef("bb"_sr, "bbbbb"_sr)); // Skip "b" - wait(success(self->backupAgent.restore(cx, - cx, - Key(tagName), - Key(container->getURL()), - {}, - ranges, - WaitForComplete::True, - ::invalidVersion, - Verbose::True))); + wait(success(self->backupAgent.restoreConstructVersion(cx, + cx, + Key(tagName), + Key(container->getURL()), + {}, + ranges, + WaitForComplete::True, + ::invalidVersion, + Verbose::True))); TraceEvent("RestoreMultiRanges_Success"); return Void(); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f93605683f9..80db0cdf822 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -135,6 +135,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/BackupAzureBlobCorrectness.toml IGNORE) add_fdb_test(TEST_FILES fast/BackupS3BlobCorrectness.toml IGNORE) add_fdb_test(TEST_FILES fast/BackupCorrectness.toml) + add_fdb_test(TEST_FILES fast/BackupCorrectnessPartitioned.toml) add_fdb_test(TEST_FILES 
fast/BackupCorrectnessWithEKPKeyFetchFailures.toml) add_fdb_test(TEST_FILES fast/BackupCorrectnessWithTenantDeletion.toml) add_fdb_test(TEST_FILES fast/EncryptedBackupCorrectness.toml) diff --git a/tests/fast/BackupCorrectnessPartitioned.toml b/tests/fast/BackupCorrectnessPartitioned.toml new file mode 100644 index 00000000000..bb7026e433f --- /dev/null +++ b/tests/fast/BackupCorrectnessPartitioned.toml @@ -0,0 +1,19 @@ +testClass = "Backup" + +[[test]] +testTitle = 'BackupAndRestore' +clearAfterTest = false +simBackupAgents = 'BackupToFile' + + [[test.workload]] + testName = 'Cycle' + nodeCount = 30000 + transactionsPerSecond = 10000.0 + testDuration = 300.0 + expectedRate = 0 + + [[test.workload]] + testName = 'BackupAndRestorePartitionedCorrectness' + backupAfter = 10.0 + restoreAfter = 60.0 + backupRangesCount = -1 \ No newline at end of file