Skip to content

Commit

Permalink
bulkdump restore framework
Browse files Browse the repository at this point in the history
  • Loading branch information
kakaiu committed Dec 12, 2024
1 parent 4ef6bca commit 295f750
Show file tree
Hide file tree
Showing 17 changed files with 621 additions and 65 deletions.
2 changes: 1 addition & 1 deletion documentation/sphinx/source/bulkdump.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ManagementAPI provides following interfaces to do the operations:
1. Submit a job: submitBulkDumpJob(BulkDumpState job); // For generating the input job metadata, see the point 4.
2. Clear a job: clearBulkDumpJob();
3. Enable the feature: setBulkDumpMode(int mode); // Set mode = 1 to enable; Set mode = 0 to disable.
4. BulkDump job metadata is generated by newBulkDumpTaskLocalSST(KeyRange range, std::string remoteRoot); // Will include more APIs to generate the metadata as the funcationality expands (sp of functionality).
4. BulkDump job metadata is generated by newBulkDumpJobLocalSST(KeyRange range, std::string remoteRoot); // Will include more APIs to generate the metadata as the functionality expands.

Mechanisms
==========
Expand Down
2 changes: 1 addition & 1 deletion fdbcli/BulkDumpCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ ACTOR Future<UID> bulkDumpCommandActor(Reference<IClusterConnectionRecord> clust
}
std::string remoteRoot = tokens[4].toString();
KeyRange range = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
state BulkDumpState bulkDumpJob = newBulkDumpTaskLocalSST(range, remoteRoot);
state BulkDumpState bulkDumpJob = newBulkDumpJobLocalSST(range, remoteRoot);
wait(submitBulkDumpJob(cx, bulkDumpJob));
return bulkDumpJob.getJobId();

Expand Down
3 changes: 2 additions & 1 deletion fdbcli/BulkLoadCommand.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

Expand Down Expand Up @@ -128,7 +129,7 @@ ACTOR Future<UID> bulkLoadCommandActor(Reference<IClusterConnectionRecord> clust
std::string byteSampleFile = tokens[6].toString(); // TODO(BulkLoad): reject if the input bytes sampling file is
// not same as the configuration as FDB cluster
KeyRange range = Standalone(KeyRangeRef(rangeBegin, rangeEnd));
state BulkLoadState bulkLoadTask = newBulkLoadTaskLocalSST(range, folder, dataFile, byteSampleFile);
state BulkLoadState bulkLoadTask = newBulkLoadTaskLocalSST(UID(), range, folder, dataFile, byteSampleFile);
wait(submitBulkLoadTask(cx, bulkLoadTask));
return bulkLoadTask.getTaskId();

Expand Down
32 changes: 31 additions & 1 deletion fdbclient/BulkDumping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,37 @@

#include "fdbclient/BulkDumping.h"

BulkDumpState newBulkDumpTaskLocalSST(const KeyRange& range, const std::string& remoteRoot) {
// Strips `prefix` from the front of `str` and returns the remainder.
// Returns an empty string when `str` does not begin with `prefix`
// (which is indistinguishable from `str` being exactly equal to `prefix`).
std::string stringRemovePrefix(std::string str, const std::string& prefix) {
    const size_t prefixLen = prefix.size();
    const bool startsWithPrefix = str.compare(0, prefixLen, prefix) == 0;
    if (!startsWithPrefix) {
        return "";
    }
    return str.substr(prefixLen);
}

// Decodes a textual key made of two-character hex bytes separated by single spaces
// (e.g. "01 ab ff") into a Key. An empty input yields a default-constructed Key.
// NOTE(review): presumably the inverse of toFullHexStringPlain() used by the manifest — confirm.
Key getKeyFromHexString(const std::string& rawString) {
    if (rawString.empty()) {
        return Key();
    }
    std::vector<uint8_t> decodedBytes;
    // N bytes occupy 3N - 1 characters: two hex digits per byte plus N - 1 separators
    ASSERT((rawString.size() + 1) % 3 == 0);
    size_t pos = 0;
    while (pos < rawString.size()) {
        const std::string hexPair = rawString.substr(pos, 2);
        decodedBytes.push_back(static_cast<uint8_t>(std::stoul(hexPair, nullptr, 16)));
        // Every byte except the last one must be followed by exactly one space
        const size_t separatorPos = pos + 2;
        ASSERT(separatorPos >= rawString.size() || rawString[separatorPos] == ' ');
        pos += 3;
    }
    return Standalone(StringRef(decodedBytes.data(), decodedBytes.size()));
}

// Builds the metadata of a bulkdump job over `range` that writes to `remoteRoot`,
// fixed to the SST file type, the CP transport method and the File export method.
BulkDumpState newBulkDumpJobLocalSST(const KeyRange& range, const std::string& remoteRoot) {
    const BulkDumpFileType fileType = BulkDumpFileType::SST;
    const BulkDumpTransportMethod transportMethod = BulkDumpTransportMethod::CP;
    const BulkDumpExportMethod exportMethod = BulkDumpExportMethod::File;
    return BulkDumpState(range, fileType, transportMethod, exportMethod, remoteRoot);
}

// Builds the metadata of a bulkdump restore job that restores `range` from `remoteRoot`
// for the bulkdump job identified by `jobId`, fixed to the CP transport method.
BulkDumpRestoreState newBulkDumpRestoreJobLocalSST(const UID& jobId,
                                                   const KeyRange& range,
                                                   const std::string& remoteRoot) {
    const BulkDumpTransportMethod transportMethod = BulkDumpTransportMethod::CP;
    return BulkDumpRestoreState(jobId, remoteRoot, range, transportMethod);
}
6 changes: 4 additions & 2 deletions fdbclient/BulkLoading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

#include "fdbclient/BulkLoading.h"

BulkLoadState newBulkLoadTaskLocalSST(KeyRange range,
BulkLoadState newBulkLoadTaskLocalSST(UID jobID,
KeyRange range,
std::string folder,
std::string dataFile,
std::string bytesSampleFile) {
Expand All @@ -32,5 +33,6 @@ BulkLoadState newBulkLoadTaskLocalSST(KeyRange range,
BulkLoadInjectMethod::File,
folder,
dataFiles,
bytesSampleFile);
bytesSampleFile,
jobID);
}
85 changes: 85 additions & 0 deletions fdbclient/ManagementAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3215,6 +3215,91 @@ ACTOR Future<size_t> getBulkDumpCompleteTaskCount(Database cx, KeyRange rangeToR
return completeTaskCount;
}

// Reads the bulkDumpRestore key range map over normalKeys and returns the first job found
// in it, or an empty Optional when every returned range carries an empty value.
// Any error is routed through tr.onError() and the read is retried.
ACTOR Future<Optional<BulkDumpRestoreState>> getOngoingBulkDumpRestoreJob(Database cx) {
    state RangeResult rangeResult;
    state Transaction tr(cx);
    loop {
        try {
            wait(store(rangeResult,
                       krmGetRanges(&tr,
                                    bulkDumpRestorePrefix,
                                    normalKeys,
                                    CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
                                    CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
            // At most one job at a time, so the first non-empty range is the answer.
            // Entry idx is the begin boundary of a range and entry idx + 1 its end boundary.
            for (int idx = 0; idx < rangeResult.size() - 1; idx++) {
                if (!rangeResult[idx].value.empty()) {
                    BulkDumpRestoreState job = decodeBulkDumpRestoreState(rangeResult[idx].value);
                    KeyRange jobRange = job.getRange();
                    // The persisted job must cover exactly the range it is stored under
                    ASSERT(!jobRange.empty() && jobRange.begin == rangeResult[idx].key &&
                           jobRange.end == rangeResult[idx + 1].key);
                    return job;
                }
            }
            return Optional<BulkDumpRestoreState>();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}

// Returns true if the bulkDumpRestore key range map over normalKeys holds a restore job.
// When `jobId` is present, only a job with that id counts; otherwise any job counts.
// Runs inside the caller's transaction and does no error handling or retry of its own.
ACTOR Future<bool> isBulkDumpRestoreAlive(Transaction* tr, Optional<UID> jobId = Optional<UID>()) {
    state RangeResult rangeResult;
    // At most one job at a time, so looking at the first returned range is sufficient
    wait(store(rangeResult,
               krmGetRanges(tr,
                            bulkDumpRestorePrefix,
                            normalKeys,
                            CLIENT_KNOBS->KRM_GET_RANGE_LIMIT,
                            CLIENT_KNOBS->KRM_GET_RANGE_LIMIT_BYTES)));
    for (int idx = 0; idx < rangeResult.size() - 1; idx++) {
        if (rangeResult[idx].value.empty()) {
            continue; // no job metadata stored on this range
        }
        BulkDumpRestoreState job = decodeBulkDumpRestoreState(rangeResult[idx].value);
        KeyRange jobRange = job.getRange();
        // The persisted job must cover exactly the range it is stored under
        ASSERT(!jobRange.empty() && jobRange.begin == rangeResult[idx].key &&
               jobRange.end == rangeResult[idx + 1].key);
        const bool matchesFilter = !jobId.present() || job.getJobId() == jobId.get();
        if (matchesFilter) {
            return true;
        }
    }
    return false;
}

// Persists `jobState` on its range in the bulkDumpRestore key range map, unless a restore
// job already exists, in which case nothing is written and the actor still returns normally.
// Errors are routed through tr.onError() and the whole attempt is retried.
ACTOR Future<Void> submitBulkDumpRestore(Database cx, BulkDumpRestoreState jobState) {
    state Transaction tr(cx);
    loop {
        try {
            bool jobAlreadyExists = wait(isBulkDumpRestoreAlive(&tr));
            if (jobAlreadyExists) {
                return Void(); // early exit
            }
            wait(krmSetRange(&tr, bulkDumpRestorePrefix, jobState.getRange(), bulkDumpRestoreValue(jobState)));
            wait(tr.commit());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}

// Clears the whole bulkDumpRestoreKeys range and commits.
// Errors are routed through tr.onError() and the clear is retried.
ACTOR Future<Void> clearBulkDumpRestore(Database cx) {
    state Transaction tr(cx);
    loop {
        try {
            tr.clear(bulkDumpRestoreKeys);
            wait(tr.commit());
            return Void();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}

// Persist a new owner if input uniqueId is not existing; Update description if input uniqueId exists
ACTOR Future<Void> registerRangeLockOwner(Database cx, std::string uniqueId, std::string description) {
if (uniqueId.empty() || description.empty()) {
Expand Down
15 changes: 15 additions & 0 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,21 @@ BulkDumpState decodeBulkDumpState(const ValueRef& value) {
return bulkDumpState;
}

// Bulk dumping restore keys
// Key range ["\xff/bulkDumpRestore/", "\xff/bulkDumpRestore0"); since '0' is the byte right after '/',
// this covers every key that starts with "\xff/bulkDumpRestore/".
const KeyRangeRef bulkDumpRestoreKeys = KeyRangeRef("\xff/bulkDumpRestore/"_sr, "\xff/bulkDumpRestore0"_sr);
// Prefix passed to krmGetRanges()/krmSetRange() when reading or writing BulkDumpRestoreState values.
const KeyRef bulkDumpRestorePrefix = bulkDumpRestoreKeys.begin;

// Serializes a BulkDumpRestoreState with ObjectWriter, including the protocol version,
// into the value form decoded by decodeBulkDumpRestoreState().
const Value bulkDumpRestoreValue(const BulkDumpRestoreState& bulkDumpRestoreState) {
    Value serialized = ObjectWriter::toValue(bulkDumpRestoreState, IncludeVersion());
    return serialized;
}

// Deserializes a value produced by bulkDumpRestoreValue() back into a BulkDumpRestoreState.
BulkDumpRestoreState decodeBulkDumpRestoreState(const ValueRef& value) {
    BulkDumpRestoreState decoded;
    ObjectReader objectReader(value.begin(), IncludeVersion());
    objectReader.deserialize(decoded);
    return decoded;
}

// Range Lock
const std::string rangeLockNameForBulkLoad = "BulkLoad";

Expand Down
114 changes: 110 additions & 4 deletions fdbclient/include/fdbclient/BulkDumping.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

#ifndef FDBCLIENT_BULKDUMPING_H
#define FDBCLIENT_BULKDUMPING_H
#include "flow/Trace.h"
#pragma once

#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "flow/TDMetric.actor.h"

// Returns `str` with the leading `prefix` removed.
// Returns an empty string when `str` does not start with `prefix`.
std::string stringRemovePrefix(std::string str, const std::string& prefix);

// Decodes a string of two-character hex bytes separated by single spaces (e.g. "01 ab ff") into a Key.
// An empty string decodes to a default-constructed Key; a string with any other layout fails an ASSERT.
Key getKeyFromHexString(const std::string& rawString);

// Define the configuration of bytes sampling
// Use for setting manifest file
Expand Down Expand Up @@ -141,6 +145,7 @@ struct BulkDumpManifest {

BulkDumpManifest() = default;

// For dumping
BulkDumpManifest(const BulkDumpFileSet& fileSet,
const Key& beginKey,
const Key& endKey,
Expand All @@ -153,6 +158,29 @@ struct BulkDumpManifest {
ASSERT(isValid());
}

// For restoring dumped data
BulkDumpManifest(const std::string& rawString) {
std::vector<std::string> parts = splitString(rawString, ", ");
ASSERT(parts.size() == 15);
std::string rootPath = stringRemovePrefix(parts[0], "[RootPath]: ");
std::string relativePath = stringRemovePrefix(parts[1], "[RelativePath]: ");
std::string manifestFileName = stringRemovePrefix(parts[2], "[ManifestFileName]: ");
std::string dataFileName = stringRemovePrefix(parts[3], "[DataFileName]: ");
std::string byteSampleFileName = stringRemovePrefix(parts[4], "[ByteSampleFileName]: ");
fileSet = BulkDumpFileSet(rootPath, relativePath, manifestFileName, dataFileName, byteSampleFileName);
beginKey = getKeyFromHexString(stringRemovePrefix(parts[5], "[BeginKey]: "));
endKey = getKeyFromHexString(stringRemovePrefix(parts[6], "[EndKey]: "));
version = std::stoll(stringRemovePrefix(parts[7], "[Version]: "));
checksum = stringRemovePrefix(parts[8], "[Checksum]: ");
bytes = std::stoull(stringRemovePrefix(parts[9], "[Bytes]: "));
int version = std::stoi(stringRemovePrefix(parts[10], "[ByteSampleVersion]: "));
std::string method = stringRemovePrefix(parts[11], "[ByteSampleMethod]: ");
int factor = std::stoi(stringRemovePrefix(parts[12], "[ByteSampleFactor]: "));
int overhead = std::stoi(stringRemovePrefix(parts[13], "[ByteSampleOverhead]: "));
double minimalProbability = std::stod(stringRemovePrefix(parts[14], "[ByteSampleMinimalProbability]: "));
byteSampleSetting = ByteSampleSetting(version, method, factor, overhead, minimalProbability);
}

bool isValid() const {
if (beginKey >= endKey) {
return false;
Expand All @@ -166,6 +194,8 @@ struct BulkDumpManifest {
return true;
}

// Returns the key range spanned by this manifest's beginKey and endKey.
KeyRange getRange() const {
    return Standalone(KeyRangeRef(beginKey, endKey));
}

std::string getBeginKeyString() const { return beginKey.toFullHexStringPlain(); }

std::string getEndKeyString() const { return endKey.toFullHexStringPlain(); }
Expand Down Expand Up @@ -219,7 +249,7 @@ struct BulkDumpState {
BulkDumpState() = default;

// The only public interface to create a valid task
// This constructor is call when users submitting a task, e.g. by newBulkDumpTaskLocalSST()
// This constructor is called when users submit a task, e.g. by newBulkDumpJobLocalSST()
BulkDumpState(KeyRange range,
BulkDumpFileType fileType,
BulkDumpTransportMethod transportMethod,
Expand Down Expand Up @@ -365,10 +395,86 @@ struct BulkDumpState {
Optional<BulkDumpManifest> bulkDumpManifest; // Resulting remote bulkDumpManifest after the dumping task completes
};

// User API to create bulkDump task metadata
// User API to create bulkDump job metadata
// The dumped data is within the input range
// The data is dumped to the input remoteRoot
// The remoteRoot can be either a local root or a remote blobstore root string
BulkDumpState newBulkDumpTaskLocalSST(const KeyRange& range, const std::string& remoteRoot);
BulkDumpState newBulkDumpJobLocalSST(const KeyRange& range, const std::string& remoteRoot);

// Phase of a bulkdump restore job/task, stored in BulkDumpRestoreState and serialized as a uint8_t.
// The numeric values are part of the persisted metadata and are printed by BulkDumpRestoreState::toString().
enum class BulkDumpRestorePhase : uint8_t {
// Placeholder value (zero); not assigned by any code visible in this header
Invalid = 0,
// Assigned by the four-argument BulkDumpRestoreState constructor when a job is created
Submitted = 1,
// Assigned by BulkDumpRestoreState::getTaskToTrigger() on the per-manifest task it returns
Triggered = 2,
// NOTE(review): presumably set once the restore finishes; no code in this header assigns it — confirm
Complete = 3,
};

// Metadata of a bulkdump restore job, or of a per-manifest restore task derived from it.
// Serialized with bulkDumpRestoreValue() / decodeBulkDumpRestoreState().
struct BulkDumpRestoreState {
    constexpr static FileIdentifier file_identifier = 1384496;

    // Creates an empty state; every enum member starts at its Invalid value.
    BulkDumpRestoreState() = default;

    // Creates the metadata of a newly submitted job (phase = Submitted).
    //   jobId: id of the job
    //   remoteRoot: root folder to restore from
    //   range: key range to restore
    //   transportMethod: how files are transported from remoteRoot
    // Members are initialized in declaration order.
    BulkDumpRestoreState(const UID& jobId,
                         const std::string& remoteRoot,
                         const KeyRange& range,
                         BulkDumpTransportMethod transportMethod)
      : jobId(jobId), range(range), transportMethod(transportMethod), remoteRoot(remoteRoot),
        phase(BulkDumpRestorePhase::Submitted) {}

    // Human-readable one-line dump of every field; enum fields are printed as their numeric value.
    std::string toString() const {
        return "[BulkDumpRestoreState]: [JobId]: " + jobId.toString() + ", [RemoteRoot]: " + remoteRoot +
               ", [Range]: " + range.toString() + ", [Phase]: " + std::to_string(static_cast<uint8_t>(phase)) +
               ", [TransportMethod]: " + std::to_string(static_cast<uint8_t>(transportMethod)) +
               ", [ManifestPath]: " + manifestPath + ", [DataPath]: " + dataPath +
               ", [ByteSamplePath]: " + byteSamplePath;
    }

    std::string getRemoteRoot() const { return remoteRoot; }

    BulkDumpTransportMethod getTransportMethod() const { return transportMethod; }

    UID getJobId() const { return jobId; }

    BulkDumpRestorePhase getPhase() const { return phase; }

    KeyRange getRange() const { return range; }

    // Derives the task for one dumped manifest from this job: copies the job metadata,
    // points the manifest/data/byte-sample paths at the manifest's files, narrows the range
    // to the manifest's range and marks the result as Triggered. `*this` is left unchanged.
    BulkDumpRestoreState getTaskToTrigger(const BulkDumpManifest& manifest) const {
        BulkDumpRestoreState task = *this;
        // Folder holding the manifest's files: <rootPath>/<relativePath>
        const std::string manifestFolder = joinPath(manifest.fileSet.rootPath, manifest.fileSet.relativePath);
        task.manifestPath = joinPath(manifestFolder, manifest.fileSet.manifestFileName);
        task.dataPath = joinPath(manifestFolder, manifest.fileSet.dataFileName);
        task.byteSamplePath = joinPath(manifestFolder, manifest.fileSet.byteSampleFileName);
        task.range = manifest.getRange();
        task.phase = BulkDumpRestorePhase::Triggered;
        return task;
    }

    std::string getDataFilePath() const { return dataPath; }

    std::string getBytesSampleFilePath() const { return byteSamplePath; }

    // The field order below defines the serialized layout; do not reorder.
    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, jobId, range, transportMethod, remoteRoot, phase, manifestPath, dataPath, byteSamplePath);
    }

private:
    UID jobId;
    KeyRange range;
    BulkDumpTransportMethod transportMethod = BulkDumpTransportMethod::Invalid;
    std::string remoteRoot;
    // Defaulted so that a default-constructed state never exposes an indeterminate phase
    BulkDumpRestorePhase phase = BulkDumpRestorePhase::Invalid;
    // The three paths are empty until getTaskToTrigger() fills them in
    std::string manifestPath;
    std::string dataPath;
    std::string byteSamplePath;
};

// User API to create bulkDumpRestore job metadata
// The restored data is the data within the input range, read from the remoteRoot
// The remoteRoot can be either a local folder or a remote blobstore folder string
// JobId is the job ID of the bulkdump job
// All data of the bulkdump job is uploaded to the folder <remoteRoot>/<jobId>
// Note the parameter order (jobId, range, remoteRoot) differs from the
// BulkDumpRestoreState constructor's (jobId, remoteRoot, range, transportMethod)
BulkDumpRestoreState newBulkDumpRestoreJobLocalSST(const UID& jobId,
const KeyRange& range,
const std::string& remoteRoot);

#endif
Loading

0 comments on commit 295f750

Please sign in to comment.