Skip to content

Commit

Permalink
Task AB# 1310789: [LevelDB] Add mechanism to request suspension of ba…
Browse files Browse the repository at this point in the history
…ckground (BG Thread) tasks, to allow the db to go into a suspended state and fix deadlock caused by mechanism (#6)

* Added mechanism to request suspension of background (BG Thread) tasks, to allow the db to go into a suspended state

* Fix deadlock in leveldbd that could occur when attempting to shut down while db was in the middle of compaction phase.

* fixing tests

---------

Co-authored-by: tedzu <[email protected]>
  • Loading branch information
yabmek-msft and tzuvich authored Dec 20, 2024
1 parent 336b207 commit 12d8819
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 3 deletions.
36 changes: 33 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,13 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
&internal_comparator_)),
suspending_compaction_(false) {}

DBImpl::~DBImpl() {
// Wait for background work to finish.
mutex_.Lock();
suspending_compaction_.store(false, std::memory_order_release); // make sure that the suspend flag is clear
shutting_down_.store(true, std::memory_order_release);
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
Expand Down Expand Up @@ -670,6 +672,8 @@ void DBImpl::MaybeScheduleCompaction() {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (imm_ == nullptr && suspending_compaction_.load(std::memory_order_acquire)) {
// DB is being suspended; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
Expand All @@ -681,6 +685,23 @@ void DBImpl::MaybeScheduleCompaction() {
}
}

void DBImpl::SuspendCompaction() {
// set suspend flag and wait for any currently executing bg tasks to complete
Log(options_.info_log, "BG suspend compaction\n");
mutex_.Lock();
suspending_compaction_.store(true, std::memory_order_release);
mutex_.Unlock();
Log(options_.info_log, "BG suspended\n");
}

void DBImpl::ResumeCompaction() {
Log(options_.info_log, "BG resume compaction\n");
mutex_.Lock();
suspending_compaction_.store(false, std::memory_order_release);
mutex_.Unlock();
Log(options_.info_log, "db BG resumed\n");
}

void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
Expand All @@ -700,7 +721,9 @@ void DBImpl::BackgroundCall() {

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
if (!suspending_compaction_.load(std::memory_order_acquire)) {
MaybeScheduleCompaction();
}
background_work_finished_signal_.SignalAll();
}

Expand Down Expand Up @@ -766,6 +789,8 @@ void DBImpl::BackgroundCompaction() {
// Done
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else if (suspending_compaction_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during suspend
} else {
Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
}
Expand Down Expand Up @@ -1353,6 +1378,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
} else if (suspending_compaction_.load(std::memory_order_acquire)) {
// suspending, don't do this now
break;
} else if (imm_ != nullptr) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Expand Down Expand Up @@ -1397,7 +1425,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
if (!suspending_compaction_.load(std::memory_order_acquire)) {
MaybeScheduleCompaction();
}
}
}
return s;
Expand Down
9 changes: 9 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
// Set the suspend flag, which tells the database not to schedule background
// work until resume
// Waits for any currently executing BG work to complete before returning
void SuspendCompaction() override;
// Clears the suspend flag, so that the database can schedule background work
void ResumeCompaction() override;

// Extra methods (for testing) that are not in the public DB interface

Expand Down Expand Up @@ -195,6 +201,9 @@ class DBImpl : public DB {
// Has a background compaction been scheduled or is running?
bool background_compaction_scheduled_ GUARDED_BY(mutex_);

// Has anyone issued a request to suspend background work?
std::atomic<bool> suspending_compaction_ GUARDED_BY(mutex_);

ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);

VersionSet* const versions_ GUARDED_BY(mutex_);
Expand Down
4 changes: 4 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,10 @@ class ModelDB : public DB {
}
void CompactRange(const Slice* start, const Slice* end) override {}

void SuspendCompaction() override {}

void ResumeCompaction() override {}

private:
class ModelIter : public Iterator {
public:
Expand Down
6 changes: 6 additions & 0 deletions include/leveldb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;

// Allows the underlying storage to prepare for an application suspend event
virtual void SuspendCompaction() = 0;

// Allow the underlying storage to react to an application resume event
virtual void ResumeCompaction() = 0;
};

// Destroy the contents of the specified database.
Expand Down

0 comments on commit 12d8819

Please sign in to comment.