Skip to content

Commit

Permalink
Merge pull request #39 from vertexclique/add-ring-options
Browse files Browse the repository at this point in the history
Add ring options
  • Loading branch information
vertexclique authored Jan 26, 2024
2 parents 4926e29 + 4ec18ee commit 323b1d0
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 108 deletions.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ path = "h1-server.rs"
name = "h1-server-multishot"
path = "h1-server-multishot.rs"

[[example]]
name = "proactor-config-fwrite"
path = "proactor-config-fwrite.rs"

[[example]]
name = "tcp-server"
path = "tcp-server.rs"
Expand Down
69 changes: 69 additions & 0 deletions examples/proactor-config-fwrite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use nuclei::*;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::PathBuf;
use std::time::Duration;

use futures::io::SeekFrom;
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use nuclei::config::{IoUringConfiguration, NucleiConfig};

const DARK_MATTER_TEXT: &'static str = "\
Dark matter is a form of matter thought to account for approximately \
85% of the matter in the universe and about a quarter of its total \
mass–energy density or about 2.241×10−27 kg/m3. Its presence is implied \
in a variety of astrophysical observations, including gravitational effects \
that cannot be explained by accepted theories of gravity unless more matter \
is present than can be seen. For this reason, most experts think that dark \
matter is abundant in the universe and that it has had a strong influence \
on its structure and evolution. Dark matter is called dark because it does \
not appear to interact with the electromagnetic field, which means it doesn't \
absorb, reflect or emit electromagnetic radiation, and is therefore difficult \
to detect.[1]\
\
";

// #[nuclei::main]
fn main() -> io::Result<()> {
let nuclei_config = NucleiConfig {
// Other options for IO_URING are:
// * low_latency_driven,
// * kernel_poll_only
// * io_poll
iouring: IoUringConfiguration::interrupt_driven(1 << 11),
};
let _ = Proactor::with_config(nuclei_config);

// Approximately ~75,9 MB
let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n");

let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("data");
path.push("dark-matter");

drive(async {
// Approximately ~75,9 MB
let dark_matter = vec![DARK_MATTER_TEXT; 100_000].join("\n");

let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("data");
path.push("dark-matter");

let fo = OpenOptions::new()
.read(true)
.write(true)
.open(&path)
.unwrap();
let mut file = Handle::<File>::new(fo).unwrap();
file.write_all(dark_matter.as_bytes()).await.unwrap();

let mut buf = vec![];
assert!(file.seek(SeekFrom::Start(0)).await.is_ok());
assert_eq!(file.read_to_end(&mut buf).await.unwrap(), dark_matter.len());
assert_eq!(&buf[0..dark_matter.len()], dark_matter.as_bytes());

println!("Length of file is {}", buf.len());
});

Ok(())
}
28 changes: 14 additions & 14 deletions src/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ impl AsyncRead for Handle<File> {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (_, pos) = store_file.bufpair();
Expand Down Expand Up @@ -239,10 +239,10 @@ const NON_READ: &[u8] = &[];

#[cfg(all(feature = "iouring", target_os = "linux"))]
impl AsyncBufRead for Handle<File> {
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let mut store = &mut self.get_mut().store_file;
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (bufp, pos) = store_file.bufpair();
Expand All @@ -267,7 +267,7 @@ impl AsyncBufRead for Handle<File> {
}

fn consume(self: Pin<&mut Self>, amt: usize) {
let mut store = self.get_mut().store_file.as_mut().unwrap();
let store = self.get_mut().store_file.as_mut().unwrap();
store.buf().consume(amt);
}
}
Expand All @@ -279,9 +279,9 @@ impl AsyncWrite for Handle<File> {
cx: &mut Context<'_>,
bufslice: &[u8],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (bufp, pos) = store_file.bufpair();
Expand Down Expand Up @@ -319,9 +319,9 @@ impl AsyncWrite for Handle<File> {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();
let (_, pos) = store_file.bufpair();
Expand Down Expand Up @@ -349,9 +349,9 @@ impl AsyncWrite for Handle<File> {
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut store = &mut self.get_mut().store_file;
let store = &mut self.get_mut().store_file;

if let Some(mut store_file) = store.as_mut() {
if let Some(store_file) = store.as_mut() {
let fd: RawFd = store_file.receive_fd();
let op_state = store_file.op_state();

Expand All @@ -377,7 +377,7 @@ impl AsyncSeek for Handle<File> {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let mut store = &mut self.get_mut().store_file.as_mut().unwrap();
let store = &mut self.get_mut().store_file.as_mut().unwrap();

let (cursor, offset) = match pos {
io::SeekFrom::Start(n) => {
Expand All @@ -392,7 +392,7 @@ impl AsyncSeek for Handle<File> {
}
};
let valid_seek = if offset.is_negative() {
match cursor.checked_sub(offset.abs() as usize) {
match cursor.checked_sub(offset.unsigned_abs() as usize) {
Some(valid_seek) => valid_seek,
None => {
let invalid = io::Error::from(io::ErrorKind::InvalidInput);
Expand Down
73 changes: 73 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,89 @@ pub struct IoUringConfiguration {
/// If [None] passed unbounded workers will be limited by the process task limit,
/// as indicated by the rlimit [RLIMIT_NPROC](https://man7.org/linux/man-pages/man2/getrlimit.2.html) limit.
pub per_numa_unbounded_worker_count: Option<u32>,

/// This argument allows aggressively waiting on CQ(completion queue) to have low latency of IO completions.
/// Basically, this argument polls CQEs(completion queue events) directly on cq at userspace.
/// Mind that, using this increase pressure on CPUs from userspace side. By default, nuclei reaps the CQ with
/// aggressive wait. This is double polling approach for nuclei where Kernel gets submissions by itself (SQPOLL),
/// processes it and puts completions to completion queue and we immediately pick up without latency
/// (aggressive_poll).
///
/// **[default]**: `true`.
pub aggressive_poll: bool,

/// Perform busy-waiting for I/O completion events, as opposed to getting notifications via an
/// asynchronous IRQ (Interrupt Request). This will reduce latency, but increases CPU usage.
/// This is only usable on file systems that support polling and files opened with `O_DIRECT`.
///
/// **[default]**: `false`.
pub iopoll_enabled: bool,
// XXX: `redrive_kthread_wake` = bool, syncs queue changes so kernel threads got awakened. increased cpu usage.
}

impl IoUringConfiguration {
///
/// Standard way to use IO_URING. No polling, purely IRQ awaken IO completion.
/// This is a normal way to process IO, mind that with this approach
/// `actual completion time != userland reception of completion`.
/// Throughput is low compared to all the other config alternatives.
///
/// **NOTE:** If you don't know what to select as configuration, please select this.
pub fn interrupt_driven(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: None,
per_numa_bounded_worker_count: None,
per_numa_unbounded_worker_count: None,
aggressive_poll: false,
iopoll_enabled: false,
}
}

///
/// Low Latency Driven version of IO_URING, where it is suitable for high traffic environments.
/// High throughput low latency solution where it consumes a lot of resources.
pub fn low_latency_driven(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: Some(2),
aggressive_poll: true,
..Self::default()
}
}

///
/// Kernel poll only version of IO_URING, where it is suitable for high traffic environments.
/// This version won't allow aggressive polling on completion queue(CQ).
pub fn kernel_poll_only(queue_len: u32) -> Self {
Self {
queue_len,
sqpoll_wake_interval: Some(2),
aggressive_poll: true,
..Self::default()
}
}

///
/// IOPOLL enabled ring configuration for operating on files with low-latency.
pub fn io_poll(queue_len: u32) -> Self {
Self {
queue_len,
iopoll_enabled: true,
..Self::default()
}
}
}

impl Default for IoUringConfiguration {
fn default() -> Self {
Self {
queue_len: 1 << 11,
sqpoll_wake_interval: Some(2),
per_numa_bounded_worker_count: Some(1 << 8),
per_numa_unbounded_worker_count: Some(1 << 9),
aggressive_poll: true,
iopoll_enabled: false,
}
}
}
13 changes: 6 additions & 7 deletions src/proactor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::ops::DerefMut;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, io};

use crate::config::NucleiConfig;
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::OnceCell;

use super::syscore::*;
use super::waker::*;
Expand All @@ -25,7 +24,7 @@ impl Proactor {
/// Returns a reference to the proactor.
pub fn get() -> &'static Proactor {
unsafe {
&PROACTOR.get_or_init(|| {
PROACTOR.get_or_init(|| {
Proactor(
SysProactor::new(NucleiConfig::default())
.expect("cannot initialize IO backend"),
Expand All @@ -37,15 +36,14 @@ impl Proactor {
/// Builds a proactor instance with given config and returns a reference to it.
pub fn with_config(config: NucleiConfig) -> &'static Proactor {
unsafe {
let mut proactor =
let proactor =
Proactor(SysProactor::new(config.clone()).expect("cannot initialize IO backend"));
PROACTOR
.set(proactor)
.map_err(|e| "Proactor instance not being able to set.")
.unwrap();
let proactor =
Proactor(SysProactor::new(config).expect("cannot initialize IO backend"));
&PROACTOR.get_or_init(|| proactor)

PROACTOR.wait()
}
}

Expand Down Expand Up @@ -133,6 +131,7 @@ mod proactor_tests {
sqpoll_wake_interval: Some(11),
per_numa_bounded_worker_count: Some(12),
per_numa_unbounded_worker_count: Some(13),
..IoUringConfiguration::default()
},
};
let new = Proactor::with_config(config);
Expand Down
4 changes: 2 additions & 2 deletions src/syscore/linux/iouring/fs/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Buffer {
unsafe {
let data: *mut u8 = self.data.cast().as_ptr();
let cap = self.cap - self.pos;
slice::from_raw_parts(data.offset(self.pos as isize), cap as usize)
slice::from_raw_parts(data.add(self.pos), cap)
}
} else {
&[]
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Buffer {

#[inline(always)]
pub fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt as usize, self.cap);
self.pos = cmp::min(self.pos + amt, self.cap);
}

#[inline(always)]
Expand Down
2 changes: 0 additions & 2 deletions src/syscore/linux/iouring/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ pub(crate) mod buffer;
pub(crate) mod cancellation;
pub(crate) mod store_file;

pub(crate) use buffer::*;
pub(crate) use cancellation::*;
pub(crate) use store_file::*;
8 changes: 2 additions & 6 deletions src/syscore/linux/iouring/fs/store_file.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use crate::Handle;
use lever::sync::prelude::TTas;
use std::fs::File;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::io::{FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::Arc;

use super::buffer::Buffer;
use crate::syscore::Processor;
use lever::sync::atomics::AtomicBox;
use pin_utils::unsafe_pinned;
use std::task::{Context, Poll};

pub struct StoreFile {
fd: RawFd,
Expand Down Expand Up @@ -87,7 +83,7 @@ impl StoreFile {
&mut self.pos
}

pub(crate) fn guard_op(self: &mut Self, op: Op) {
pub(crate) fn guard_op(&mut self, op: Op) {
// let this = unsafe { Pin::get_unchecked_mut(self) };
// if *self.op_state.get() != Op::Pending && *self.op_state.get() != op {
// self.cancel();
Expand Down
Loading

0 comments on commit 323b1d0

Please sign in to comment.