Skip to content

Commit 200a8c7

Browse files
committed
implement wait and triggering
1 parent 1a71fcf commit 200a8c7

File tree

1 file changed

+128
-10
lines changed
  • crates/symmetric_executor/src

1 file changed

+128
-10
lines changed

crates/symmetric_executor/src/lib.rs

Lines changed: 128 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
};
77

88
use executor::exports::symmetric::runtime::symmetric_executor::{
9-
self, CallbackData, CallbackState,
9+
self, CallbackData, CallbackState, GuestEventSubscription,
1010
};
1111

1212
mod executor;
@@ -23,10 +23,18 @@ impl symmetric_executor::GuestEventSubscription for EventSubscription {
2323
fn ready(&self) -> bool {
2424
match &self.inner {
2525
EventType::Triggered {
26-
last_counter: _,
26+
last_counter,
2727
event_fd: _,
28-
object: _,
29-
} => todo!(),
28+
event,
29+
} => {
30+
let current_counter = event.lock().unwrap().counter;
31+
let active =
32+
current_counter != last_counter.load(std::sync::atomic::Ordering::Acquire);
33+
if active {
34+
last_counter.store(current_counter, std::sync::atomic::Ordering::Release);
35+
}
36+
active
37+
}
3038
EventType::SystemTime(system_time) => *system_time <= SystemTime::now(),
3139
}
3240
}
@@ -55,14 +63,32 @@ impl symmetric_executor::GuestEventGenerator for EventGenerator {
5563
inner: EventType::Triggered {
5664
last_counter: AtomicU32::new(0),
5765
event_fd,
58-
object: Arc::clone(&self.0),
66+
event: Arc::clone(&self.0),
5967
},
6068
callback: None,
6169
})
6270
}
6371

64-
fn activate(&self) -> () {
65-
todo!()
72+
fn activate(&self) {
73+
if let Ok(mut event) = self.0.lock() {
74+
event.counter += 1;
75+
let file_signal: u64 = 1;
76+
event.waiting.iter().for_each(|fd| {
77+
let result = unsafe {
78+
libc::write(
79+
*fd,
80+
core::ptr::from_ref(&file_signal).cast(),
81+
core::mem::size_of_val(&file_signal),
82+
)
83+
};
84+
if result >= 0 {
85+
assert_eq!(
86+
result,
87+
core::mem::size_of_val(&file_signal).try_into().unwrap()
88+
);
89+
}
90+
});
91+
}
6692
}
6793
}
6894

@@ -80,8 +106,100 @@ impl symmetric_executor::Guest for Guest {
80106
type EventSubscription = EventSubscription;
81107
type EventGenerator = EventGenerator;
82108

83-
fn run() -> () {
84-
todo!()
109+
fn run() {
110+
loop {
111+
let mut wait = libc::timeval {
112+
tv_sec: i64::MAX,
113+
tv_usec: 999999,
114+
};
115+
let mut tvptr = core::ptr::null_mut();
116+
let mut maxfd = 0;
117+
let now = SystemTime::now();
118+
let mut rfds = core::mem::MaybeUninit::<libc::fd_set>::uninit();
119+
let rfd_ptr = unsafe { core::ptr::from_mut(rfds.assume_init_mut()) };
120+
unsafe { libc::FD_ZERO(rfd_ptr) };
121+
{
122+
let mut ex = EXECUTOR.lock().unwrap();
123+
ex.active_tasks.iter_mut().for_each(|task| {
124+
if task.ready() {
125+
task.callback.take_if(|(f, data)| {
126+
matches!((f)(data.handle() as *mut ()), CallbackState::Ready)
127+
});
128+
} else {
129+
match &task.inner {
130+
EventType::Triggered {
131+
last_counter: _,
132+
event_fd,
133+
event: _,
134+
} => {
135+
unsafe { libc::FD_SET(*event_fd, rfd_ptr) };
136+
if *event_fd > maxfd {
137+
maxfd = *event_fd + 1;
138+
}
139+
}
140+
EventType::SystemTime(system_time) => {
141+
if *system_time > now {
142+
let diff = system_time.duration_since(now).unwrap_or_default(); //.as_micros();
143+
let secs = diff.as_secs() as i64;
144+
let usecs = diff.subsec_micros() as i64;
145+
if secs < wait.tv_sec
146+
|| (secs == wait.tv_sec && usecs < wait.tv_usec)
147+
{
148+
wait.tv_sec = secs;
149+
wait.tv_usec = usecs;
150+
// timeoutindex = n;
151+
}
152+
tvptr = core::ptr::from_mut(&mut wait);
153+
} else {
154+
task.callback.take_if(|(f, data)| {
155+
matches!(
156+
(f)(data.handle() as *mut ()),
157+
CallbackState::Ready
158+
)
159+
});
160+
}
161+
}
162+
}
163+
}
164+
});
165+
ex.active_tasks.retain(|task| task.callback.is_some());
166+
if ex.active_tasks.is_empty() {
167+
break;
168+
}
169+
}
170+
// with no work left the break should have occured
171+
assert!(!tvptr.is_null() || maxfd > 0);
172+
let selectresult = unsafe {
173+
libc::select(
174+
maxfd,
175+
rfd_ptr,
176+
std::ptr::null_mut(),
177+
std::ptr::null_mut(),
178+
tvptr,
179+
)
180+
};
181+
// we could look directly for the timeout
182+
if selectresult > 0 {
183+
let mut dummy: u64 = 0;
184+
// reset active file descriptors
185+
for i in 0..maxfd {
186+
if unsafe { libc::FD_ISSET(i, rfd_ptr) } {
187+
let readresult = unsafe {
188+
libc::read(
189+
i,
190+
core::ptr::from_mut(&mut dummy).cast(),
191+
core::mem::size_of_val(&dummy),
192+
)
193+
};
194+
assert!(
195+
readresult <= 0
196+
|| readresult
197+
== isize::try_from(core::mem::size_of_val(&dummy)).unwrap()
198+
);
199+
}
200+
}
201+
}
202+
}
85203
}
86204

87205
fn register(
@@ -124,7 +242,7 @@ enum EventType {
124242
Triggered {
125243
last_counter: AtomicU32,
126244
event_fd: EventFd,
127-
object: Arc<Mutex<EventInner>>,
245+
event: Arc<Mutex<EventInner>>,
128246
},
129247
SystemTime(SystemTime),
130248
}

0 commit comments

Comments
 (0)