Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROCESS_COMPLETIONS_ON_SUBMIT: acquire CQ before submission #50

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions tokio-epoll-uring/src/system/submission/op_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ where
};

fn do_submit(mut open_guard: SubmitSideOpenGuard, sqe: io_uring::squeue::Entry) {
// If PROCESS_COMPLETIONS_ON_SUBMIT is enabled, acquire the completion queue
// guard before submission so that we, not the poller task, are guaranteed to
// observe an immediate completion. TODO: the poller task may still be woken
// up through the epoll of the io_uring fd, investigate whether that actually
// happens and whether there's a way around it.

#[allow(unused_assignments)]
let mut cq_owned = None;
let cq_guard = if *crate::env_tunables::PROCESS_COMPLETIONS_ON_SUBMIT {
let cq = Arc::clone(&open_guard.completion_side);
cq_owned = Some(cq);
Some(cq_owned.as_ref().expect("we just set it").lock().unwrap())
} else {
None
};

if open_guard.submit_raw(sqe).is_err() {
// TODO: DESIGN: io_uring can deal have more ops inflight than the SQ.
// So, we could just submit_and_wait here. But, that'd prevent the
Expand All @@ -80,24 +96,11 @@ where
unreachable!("the `ops` has same size as the SQ, so, if SQ is full, we wouldn't have been able to get this slot");
}

// this allows us to keep the possible guard in cq_guard because the arc lives on stack
#[allow(unused_assignments)]
let mut cq_owned = None;

let cq_guard = if *crate::env_tunables::PROCESS_COMPLETIONS_ON_SUBMIT {
let cq = Arc::clone(&open_guard.completion_side);
cq_owned = Some(cq);
Some(cq_owned.as_ref().expect("we just set it").lock().unwrap())
} else {
None
};
drop(open_guard); // drop it asap to enable timely shutdown

if let Some(mut cq) = cq_guard {
// opportunistically process completion immediately
// TODO do it during ::poll() as well?
//
// FIXME: why are we doing this while holding the SubmitSideOpen
cq.process_completions(ProcessCompletionsCause::Regular);
}
}
Expand Down
Loading