Skip to content

Commit

Permalink
Allow per-platform log buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
dango-msft authored and yabmek-msft committed Dec 9, 2024
1 parent 23e35d7 commit a5e5a7c
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 15 deletions.
2 changes: 0 additions & 2 deletions db/log_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ enum RecordType {
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

Expand Down
14 changes: 7 additions & 7 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
backing_store_(new char[port::kLogBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0),
Expand All @@ -31,12 +31,12 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
Reader::~Reader() { delete[] backing_store_; }

bool Reader::SkipToInitialBlock() {
const size_t offset_in_block = initial_offset_ % kBlockSize;
const size_t offset_in_block = initial_offset_ % port::kLogBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;

// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
block_start_location += kBlockSize;
if (offset_in_block > port::kLogBlockSize - 6) {
block_start_location += port::kLogBlockSize;
}

end_of_buffer_offset_ = block_start_location;
Expand Down Expand Up @@ -192,14 +192,14 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
Status status = file_->Read(port::kLogBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
ReportDrop(port::kLogBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
} else if (buffer_.size() < port::kLogBlockSize) {
eof_ = true;
}
continue;
Expand Down
2 changes: 1 addition & 1 deletion db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Reader {
bool const checksum_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool eof_; // Last Read() indicated EOF by returning < port::kLogBlockSize

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
Expand Down
10 changes: 5 additions & 5 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
}

Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
: dest_(dest), block_offset_(dest_length % port::kLogBlockSize) {
InitTypeCrc(type_crc_);
}

Expand All @@ -41,7 +41,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
const int leftover = port::kLogBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
Expand All @@ -54,9 +54,9 @@ Status Writer::AddRecord(const Slice& slice) {
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
assert(port::kLogBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t avail = port::kLogBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
Expand All @@ -82,7 +82,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
assert(block_offset_ + kHeaderSize + length <= port::kLogBlockSize);

// Format the header
char buf[kHeaderSize];
Expand Down
3 changes: 3 additions & 0 deletions port/port_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace port {
// TODO(jorlow): Many of these belong more in the environment class rather than
// here. We should try moving them and see if it affects perf.

// Buffer size for log
static const int kLogBlockSize = 32768;

// ------------------ Threading -------------------

// A Mutex represents an exclusive lock.
Expand Down
127 changes: 127 additions & 0 deletions port/port_posix.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.

#ifndef STORAGE_LEVELDB_PORT_PORT_POSIX_H_
#define STORAGE_LEVELDB_PORT_PORT_POSIX_H_

#undef PLATFORM_IS_LITTLE_ENDIAN
#if defined(OS_MACOSX)
#include <machine/endian.h>
#if defined(__DARWIN_LITTLE_ENDIAN) && defined(__DARWIN_BYTE_ORDER)
#define PLATFORM_IS_LITTLE_ENDIAN \
(__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN)
#endif
#elif defined(OS_SOLARIS)
#include <sys/isa_defs.h>
#ifdef _LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN true
#else
#define PLATFORM_IS_LITTLE_ENDIAN false
#endif
#elif defined(OS_FREEBSD) || defined(OS_OPENBSD) ||\
defined(OS_NETBSD) || defined(OS_DRAGONFLYBSD)
#include <sys/types.h>
#include <sys/endian.h>
#define PLATFORM_IS_LITTLE_ENDIAN (_BYTE_ORDER == _LITTLE_ENDIAN)
#elif defined(OS_HPUX)
#define PLATFORM_IS_LITTLE_ENDIAN false
#elif defined(OS_ANDROID)
// Due to a bug in the NDK x86 <sys/endian.h> definition,
// _BYTE_ORDER must be used instead of __BYTE_ORDER on Android.
// See http://code.google.com/p/android/issues/detail?id=39824
#include <endian.h>
#define PLATFORM_IS_LITTLE_ENDIAN (_BYTE_ORDER == _LITTLE_ENDIAN)
#else
#include <endian.h>
#endif

#include <pthread.h>
#ifdef SNAPPY
#include <snappy.h>
#endif
#include <stdint.h>
#include <string>
#include "port/atomic_pointer.h"

#ifndef PLATFORM_IS_LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN)
#endif

#if defined(OS_MACOSX) || defined(OS_SOLARIS) || defined(OS_FREEBSD) ||\
defined(OS_NETBSD) || defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD) ||\
defined(OS_ANDROID) || defined(OS_HPUX) || defined(CYGWIN)
// Use fread/fwrite/fflush on platforms without _unlocked variants
#define fread_unlocked fread
#define fwrite_unlocked fwrite
#define fflush_unlocked fflush
#endif

#if defined(OS_MACOSX) || defined(OS_FREEBSD) ||\
defined(OS_OPENBSD) || defined(OS_DRAGONFLYBSD)
// Use fsync() on platforms without fdatasync()
#define fdatasync fsync
#endif

#if defined(OS_ANDROID) && __ANDROID_API__ < 9
// fdatasync() was only introduced in API level 9 on Android. Use fsync()
// when targetting older platforms.
#define fdatasync fsync
#endif

namespace leveldb {
namespace port {

static const bool kLittleEndian = PLATFORM_IS_LITTLE_ENDIAN;
#undef PLATFORM_IS_LITTLE_ENDIAN

static const int kLogBlockSize = 32768;

class CondVar;

class Mutex {
public:
Mutex();
~Mutex();

void Lock();
void Unlock();
void AssertHeld() { }

private:
friend class CondVar;
pthread_mutex_t mu_;

// No copying
Mutex(const Mutex&);
void operator=(const Mutex&);
};

class CondVar {
public:
explicit CondVar(Mutex* mu);
~CondVar();
void Wait();
void Signal();
void SignalAll();
private:
pthread_cond_t cv_;
Mutex* mu_;
};

typedef pthread_once_t OnceType;
#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT
extern void InitOnce(OnceType* once, void (*initializer)());

inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
return false;
}

uint32_t AcceleratedCRC32C(uint32_t crc, const char* buf, size_t size);

} // namespace port
} // namespace leveldb

#endif // STORAGE_LEVELDB_PORT_PORT_POSIX_H_
2 changes: 2 additions & 0 deletions port/port_stdcxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace port {

class CondVar;

static const int kLogBlockSize = 32768;

// Thinly wraps std::mutex.
class LOCKABLE Mutex {
public:
Expand Down
133 changes: 133 additions & 0 deletions port/port_win.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// LevelDB Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.

// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the University of California, Berkeley nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

#ifndef STORAGE_LEVELDB_PORT_PORT_WIN_H_
#define STORAGE_LEVELDB_PORT_PORT_WIN_H_

#define close _close
#define fread_unlocked _fread_nolock

#include <string>
#include <mutex>
#include <stdint.h>
#include <cassert>
#include <condition_variable>

namespace leveldb {
namespace port {

// Windows is little endian (for now :p)
static const bool kLittleEndian = true;

static const int kLogBlockSize = 32768;

class CondVar;

class Mutex {
public:
Mutex() {

}

void Lock() {
mutex.lock();
}
void Unlock() {
mutex.unlock();
}

void AssertHeld() {
//TODO
}

private:
friend class CondVar;

std::mutex mutex;
};

// Thinly wraps std::condition_variable.
class CondVar {
public:
explicit CondVar(Mutex* mu) : mu_(mu) { assert(mu != nullptr); }
~CondVar() = default;

CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;

void Wait() {
std::unique_lock<std::mutex> lock(mu_->mutex, std::adopt_lock);
cv_.wait(lock);
lock.release();
}
void Signal() { cv_.notify_one(); }
void SignalAll() { cv_.notify_all(); }
private:
std::condition_variable cv_;
Mutex* const mu_;
};

// Storage for a lock-free pointer
class AtomicPointer {
private:
void * rep_;
public:
AtomicPointer() : rep_(nullptr) { }
explicit AtomicPointer(void* v);
void* Acquire_Load() const;

void Release_Store(void* v);

void* NoBarrier_Load() const;

void NoBarrier_Store(void* v);
};

// Thread-safe initialization.
// Used as follows:
// static port::OnceType init_control = LEVELDB_ONCE_INIT;
// static void Initializer() { ... do something ...; }
// ...
// port::InitOnce(&init_control, &Initializer);
typedef intptr_t OnceType;
#define LEVELDB_ONCE_INIT 0
inline void InitOnce(port::OnceType*, void(*initializer)()) {
initializer();
}

inline bool GetHeapProfile(void(*func)(void*, const char*, int), void* arg) {
return false;
}

uint32_t AcceleratedCRC32C(uint32_t crc, const char* buf, size_t size);
}
}

#endif // STORAGE_LEVELDB_PORT_PORT_WIN_H_

0 comments on commit a5e5a7c

Please sign in to comment.