lph-11/odin-http/nbio/nbio_darwin.odin
2025-03-13 18:14:21 +13:00

270 lines
6.1 KiB
Odin

package nbio
import "core:container/queue"
import "core:net"
import "core:os"
import "core:time"
import kqueue "_kqueue"
_init :: proc(io: ^IO, allocator := context.allocator) -> (err: os.Errno) {
qerr: kqueue.Queue_Error
io.kq, qerr = kqueue.kqueue()
if qerr != .None do return kq_err_to_os_err(qerr)
pool_init(&io.completion_pool, allocator = allocator)
io.timeouts = make([dynamic]^Completion, allocator)
io.io_pending = make([dynamic]^Completion, allocator)
queue.init(&io.completed, allocator = allocator)
io.allocator = allocator
return
}
_num_waiting :: #force_inline proc(io: ^IO) -> int {
return io.completion_pool.num_waiting
}
_destroy :: proc(io: ^IO) {
context.allocator = io.allocator
delete(io.timeouts)
delete(io.io_pending)
queue.destroy(&io.completed)
os.close(io.kq)
pool_destroy(&io.completion_pool)
}
_tick :: proc(io: ^IO) -> os.Errno {
return flush(io)
}
_listen :: proc(socket: net.TCP_Socket, backlog := 1000) -> net.Network_Error {
errno := os.listen(os.Socket(socket), backlog)
if errno != nil {
return net.Listen_Error(errno.(os.Platform_Error))
}
return nil
}
_accept :: proc(io: ^IO, socket: net.TCP_Socket, user: rawptr, callback: On_Accept) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Accept{
callback = callback,
sock = socket,
}
queue.push_back(&io.completed, completion)
return completion
}
// Wraps os.close using the kqueue.
_close :: proc(io: ^IO, fd: Closable, user: rawptr, callback: On_Close) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Close{
callback = callback,
}
op := &completion.operation.(Op_Close)
switch h in fd {
case net.TCP_Socket: op.handle = os.Handle(h)
case net.UDP_Socket: op.handle = os.Handle(h)
case net.Socket: op.handle = os.Handle(h)
case os.Handle: op.handle = h
}
queue.push_back(&io.completed, completion)
return completion
}
// TODO: maybe call this dial?
_connect :: proc(io: ^IO, endpoint: net.Endpoint, user: rawptr, callback: On_Connect) -> (^Completion, net.Network_Error) {
if endpoint.port == 0 {
return nil, net.Dial_Error.Port_Required
}
family := net.family_from_endpoint(endpoint)
sock, err := net.create_socket(family, .TCP)
if err != nil {
return nil, err
}
if err = _prepare_socket(sock); err != nil {
close(io, net.any_socket_to_socket(sock))
return nil, err
}
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Connect {
callback = callback,
socket = sock.(net.TCP_Socket),
sockaddr = _endpoint_to_sockaddr(endpoint),
}
queue.push_back(&io.completed, completion)
return completion, nil
}
_read :: proc(
io: ^IO,
fd: os.Handle,
offset: Maybe(int),
buf: []byte,
user: rawptr,
callback: On_Read,
all := false,
) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Read {
callback = callback,
fd = fd,
buf = buf,
offset = offset.? or_else -1,
all = all,
len = len(buf),
}
queue.push_back(&io.completed, completion)
return completion
}
_recv :: proc(io: ^IO, socket: net.Any_Socket, buf: []byte, user: rawptr, callback: On_Recv, all := false) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Recv {
callback = callback,
socket = socket,
buf = buf,
all = all,
len = len(buf),
}
queue.push_back(&io.completed, completion)
return completion
}
_send :: proc(
io: ^IO,
socket: net.Any_Socket,
buf: []byte,
user: rawptr,
callback: On_Sent,
endpoint: Maybe(net.Endpoint) = nil,
all := false,
) -> ^Completion {
if _, ok := socket.(net.UDP_Socket); ok {
assert(endpoint != nil)
}
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Send {
callback = callback,
socket = socket,
buf = buf,
endpoint = endpoint,
all = all,
len = len(buf),
}
queue.push_back(&io.completed, completion)
return completion
}
_write :: proc(
io: ^IO,
fd: os.Handle,
offset: Maybe(int),
buf: []byte,
user: rawptr,
callback: On_Write,
all := false,
) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Write {
callback = callback,
fd = fd,
buf = buf,
offset = offset.? or_else -1,
all = all,
len = len(buf),
}
queue.push_back(&io.completed, completion)
return completion
}
// Runs the callback after the timeout, using the kqueue.
_timeout :: proc(io: ^IO, dur: time.Duration, user: rawptr, callback: On_Timeout) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Timeout {
callback = callback,
expires = time.time_add(time.now(), dur),
}
append(&io.timeouts, completion)
return completion
}
_next_tick :: proc(io: ^IO, user: rawptr, callback: On_Next_Tick) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Next_Tick {
callback = callback,
}
queue.push_back(&io.completed, completion)
return completion
}
_poll :: proc(io: ^IO, fd: os.Handle, event: Poll_Event, multi: bool, user: rawptr, callback: On_Poll) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.user_data = user
completion.operation = Op_Poll{
callback = callback,
fd = fd,
event = event,
multi = multi,
}
append(&io.io_pending, completion)
return completion
}
_poll_remove :: proc(io: ^IO, fd: os.Handle, event: Poll_Event) -> ^Completion {
completion := pool_get(&io.completion_pool)
completion.ctx = context
completion.operation = Op_Poll_Remove{
fd = fd,
event = event,
}
append(&io.io_pending, completion)
return completion
}