package nbio

import "core:container/queue"
import "core:log"
import "core:net"
import "core:os"
import "core:time"

import win "core:sys/windows"

// Sets up the Windows (IOCP) backend state stored in `io`.
//
// Inputs:
// - io:        the IO instance to initialize; its allocator, completion pool, completed queue,
//              timeouts list, offsets map and completion port are all assigned here.
// - allocator: used for every allocation made on behalf of `io`.
//
// Returns:
// - err: the platform error reported when the I/O completion port could not be created.
_init :: proc(io: ^IO, allocator := context.allocator) -> (err: os.Errno) {
	io.allocator = allocator

	pool_init(&io.completion_pool, allocator = allocator)
	queue.init(&io.completed, allocator = allocator)

	io.timeouts = make([dynamic]^Completion, allocator)
	io.offsets = make(map[os.Handle]u32, allocator = allocator)

	win.ensure_winsock_initialized()
	defer if err != nil {
		// Keep the side effect outside of `assert`: when assertions are disabled the whole
		// call expression may be dropped, which would silently skip the cleanup.
		cleanup_result := win.WSACleanup()
		assert(cleanup_result == win.NO_ERROR)
	}

	io.iocp = win.CreateIoCompletionPort(win.INVALID_HANDLE_VALUE, nil, 0, 0)
	if io.iocp == nil {
		err = os.Platform_Error(win.GetLastError())
		return
	}

	return
}

// Releases everything `_init` allocated for `io` and closes its completion port.
// The allocator stored in `io` is installed as the context allocator for the deletions.
_destroy :: proc(io: ^IO) {
	context.allocator = io.allocator

	delete(io.timeouts)
	queue.destroy(&io.completed)
	pool_destroy(&io.completion_pool)
	delete(io.offsets)

	// TODO: error handling.
	win.CloseHandle(io.iocp)
	// win.WSACleanup()
}

// Returns the `num_waiting` counter of the completion pool.
_num_waiting :: #force_inline proc(io: ^IO) -> int {
	return io.completion_pool.num_waiting
}

// Runs one iteration of the event loop.
//
// When no completions are queued yet, timeouts are flushed and the completion port is polled
// for up to 256 entries, which are appended to `io.completed`. Afterwards every completion that
// was queued at that point is handled (with the context it was submitted under).
//
// Returns:
// - err: the platform error from `GetQueuedCompletionStatusEx`, a wait timeout is not an error.
_tick :: proc(io: ^IO) -> (err: os.Errno) {
	if queue.len(io.completed) == 0 {
		next_timeout := flush_timeouts(io)

		// Wait a maximum of a ms if there is nothing to do.
		// TODO: this is pretty naive, a typical server always has accept completions pending and will be at 100% cpu.
		wait_ms: win.DWORD = 1 if io.io_pending == 0 else 0

		// But, to counter inaccuracies in low timeouts,
		// let's make the call exit immediately if the next timeout is close.
		if nt, ok := next_timeout.?; ok && nt <= time.Millisecond * 15 {
			wait_ms = 0
		}

		events: [256]win.OVERLAPPED_ENTRY
		entries_removed: win.ULONG
		if !win.GetQueuedCompletionStatusEx(io.iocp, &events[0], len(events), &entries_removed, wait_ms, false) {
			if terr := win.GetLastError(); terr != win.WAIT_TIMEOUT {
				err = os.Platform_Error(terr)
				return
			}
		}

		// assert(io.io_pending >= int(entries_removed))
		io.io_pending -= int(entries_removed)

		for event in events[:entries_removed] {
			if event.lpOverlapped == nil {
				// Only warn once, the flag is static so it persists across calls.
				@static logged: bool
				if !logged {
					log.warn("You have ran into a strange error some users have ran into on Windows 10 but I can't reproduce, I try to recover from the error but please chime in at https://github.com/laytan/odin-http/issues/34")
					logged = true
				}

				// This entry does not belong to a completion, undo the decrement done for it above.
				io.io_pending += 1
				continue
			}

			// This is actually pointing at the Completion.over field, but because it is the first field
			// it is also a valid pointer to the Completion struct.
			completion := cast(^Completion)event.lpOverlapped
			queue.push_back(&io.completed, completion)
		}
	}

	// Prevent infinite loop when callback adds to completed by storing length.
	n := queue.len(io.completed)
	for _ in 0 ..< n {
		completion := queue.pop_front(&io.completed)
		context = completion.ctx

		handle_completion(io, completion)
	}
	return
}

// Puts `socket` into the listening state with the given `backlog`.
//
// Returns:
// - err: the WinSock error converted to a `net.Listen_Error` when `listen` fails.
_listen :: proc(socket: net.TCP_Socket, backlog := 1000) -> (err: net.Network_Error) {
	if res := win.listen(win.SOCKET(socket), i32(backlog)); res == win.SOCKET_ERROR {
		err = net.Listen_Error(win.WSAGetLastError())
	}
	return
}

// Basically a copy of `os.open`, where a flag is added to signal async io, and creation of IOCP.
// Specifically the FILE_FLAG_OVERLAPPED flag.
_open :: proc(io: ^IO, path: string, mode, perm: int) -> (os.Handle, os.Errno) {
	// Opens `path` for overlapped (asynchronous) use and associates the handle with `io.iocp`.
	//
	// Inputs:
	// - path: file to open, an empty path is rejected with `os.ERROR_FILE_NOT_FOUND`.
	// - mode: combination of the `os.O_*` flags, translated below to the Win32 equivalents.
	// - perm: not used by this implementation.
	//
	// Returns the opened handle, or `os.INVALID_HANDLE` together with the platform error.
	if len(path) == 0 {
		return os.INVALID_HANDLE, os.ERROR_FILE_NOT_FOUND
	}

	access: u32
	//odinfmt:disable
	switch mode & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) {
	case os.O_RDONLY: access = win.FILE_GENERIC_READ
	case os.O_WRONLY: access = win.FILE_GENERIC_WRITE
	case os.O_RDWR:   access = win.FILE_GENERIC_READ | win.FILE_GENERIC_WRITE
	}
	//odinfmt:enable

	if mode & os.O_CREATE != 0 {
		access |= win.FILE_GENERIC_WRITE
	}
	if mode & os.O_APPEND != 0 {
		access &~= win.FILE_GENERIC_WRITE
		access |= win.FILE_APPEND_DATA
	}

	share_mode := win.FILE_SHARE_READ | win.FILE_SHARE_WRITE
	sa: ^win.SECURITY_ATTRIBUTES = nil
	sa_inherit := win.SECURITY_ATTRIBUTES {
		nLength        = size_of(win.SECURITY_ATTRIBUTES),
		bInheritHandle = true,
	}
	// Without O_CLOEXEC the handle is made inheritable.
	if mode & os.O_CLOEXEC == 0 {
		sa = &sa_inherit
	}

	create_mode: u32
	switch {
	case mode & (os.O_CREATE | os.O_EXCL) == (os.O_CREATE | os.O_EXCL):
		create_mode = win.CREATE_NEW
	case mode & (os.O_CREATE | os.O_TRUNC) == (os.O_CREATE | os.O_TRUNC):
		create_mode = win.CREATE_ALWAYS
	case mode & os.O_CREATE == os.O_CREATE:
		create_mode = win.OPEN_ALWAYS
	case mode & os.O_TRUNC == os.O_TRUNC:
		create_mode = win.TRUNCATE_EXISTING
	case:
		create_mode = win.OPEN_EXISTING
	}

	flags := win.FILE_ATTRIBUTE_NORMAL | win.FILE_FLAG_BACKUP_SEMANTICS

	// This line is the only thing different from the `os.open` procedure.
	// This makes it an asynchronous file that can be used in nbio.
	flags |= win.FILE_FLAG_OVERLAPPED

	wide_path := win.utf8_to_wstring(path)
	handle := os.Handle(win.CreateFileW(wide_path, access, share_mode, sa, create_mode, flags, nil))

	if handle == os.INVALID_HANDLE {
		err := os.Platform_Error(win.GetLastError())
		return os.INVALID_HANDLE, err
	}

	// Everything past here is custom/not from `os.open`.

	handle_iocp := win.CreateIoCompletionPort(win.HANDLE(handle), io.iocp, 0, 0)
	assert(handle_iocp == io.iocp)

	cmode: byte
	cmode |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
	cmode |= FILE_SKIP_SET_EVENT_ON_HANDLE
	if !win.SetFileCompletionNotificationModes(win.HANDLE(handle), cmode) {
		// Capture the error code before cleaning up, `CloseHandle` may overwrite the
		// thread's last-error value and hide the actual failure reason.
		err := os.Platform_Error(win.GetLastError())
		win.CloseHandle(win.HANDLE(handle))
		return os.INVALID_HANDLE, err
	}

	if mode & os.O_APPEND != 0 {
		// NOTE(review): the result of this seek is ignored, a failure to query the file size
		// leaves the tracked offset untouched.
		_seek(io, handle, 0, .End)
	}

	return handle, os.ERROR_NONE
}

// Updates the offset tracked for `fd` in `io.offsets` and returns the new offset.
//
// - .Set:  the offset becomes `offset`.
// - .Curr: `offset` is added to the tracked offset.
// - .End:  the offset becomes the file size plus `offset`.
//
// NOTE(review): offsets are stored as `u32`, so sizes and offsets are truncated to 32 bits.
//
// Returns the platform error from `GetFileSizeEx` when the size can't be retrieved for `.End`.
_seek :: proc(io: ^IO, fd: os.Handle, offset: int, whence: Whence) -> (int, os.Errno) {
	switch whence {
	case .Set:
		io.offsets[fd] = u32(offset)
	case .Curr:
		io.offsets[fd] += u32(offset)
	case .End:
		size: win.LARGE_INTEGER
		ok := win.GetFileSizeEx(win.HANDLE(fd), &size)
		if !ok {
			return 0, os.Platform_Error(win.GetLastError())
		}

		io.offsets[fd] = u32(size) + u32(offset)
	}

	return int(io.offsets[fd]), os.ERROR_NONE
}

// Creates a socket through `net.create_socket` and passes it to `prepare_socket`.
// When preparing fails the socket is closed again, the error is returned either way.
_open_socket :: proc(
	io: ^IO,
	family: net.Address_Family,
	protocol: net.Socket_Protocol,
) -> (
	socket: net.Any_Socket,
	err: net.Network_Error,
) {
	socket, err = net.create_socket(family, protocol)
	if err != nil do return

	err = prepare_socket(io, socket)
	if err != nil do net.close(socket)
	return
}

// Submits an accept operation on `socket`, the client socket starts out as `win.INVALID_SOCKET`.
_accept :: proc(io: ^IO, socket: net.TCP_Socket, user: rawptr, callback: On_Accept) -> ^Completion {
	return submit(
		io,
		user,
		Op_Accept{
			callback = callback,
			socket   = win.SOCKET(socket),
			client   = win.INVALID_SOCKET,
		},
	)
}

// Submits a connect operation to `ep`.
// An endpoint without a port is rejected with `net.Dial_Error.Port_Required` and nothing is submitted.
_connect :: proc(io: ^IO, ep: net.Endpoint, user: rawptr, callback: On_Connect) -> (^Completion, net.Network_Error) {
	if ep.port == 0 {
		return nil, net.Dial_Error.Port_Required
	}

	return submit(io, user, Op_Connect{
		callback = callback,
		addr     = endpoint_to_sockaddr(ep),
	}), nil
}

// Submits a close operation for `fd`.
_close :: proc(io: ^IO, fd: Closable, user: rawptr, callback: On_Close) -> ^Completion {
	return submit(io, user, Op_Close{callback = callback, fd = fd})
}

// Submits a read of `fd` into `buf`.
// When no `offset` is given, -1 is stored in the operation instead.
_read :: proc(
	io: ^IO,
	fd: os.Handle,
	offset: Maybe(int),
	buf: []byte,
	user: rawptr,
	callback: On_Read,
	all := false,
) -> ^Completion {
	return submit(io, user, Op_Read{
		callback = callback,
		fd       = fd,
		offset   = offset.? or_else -1,
		buf      = buf,
		all      = all,
		len      = len(buf),
	})
}

// Submits a write of `buf` to `fd`.
// When no `offset` is given, -1 is stored in the operation instead.
_write :: proc(
	io: ^IO,
	fd: os.Handle,
	offset: Maybe(int),
	buf: []byte,
	user: rawptr,
	callback: On_Write,
	all := false,
) -> ^Completion {
	return submit(io, user, Op_Write{
		callback = callback,
		fd       = fd,
		offset   = offset.? or_else -1,
		buf      = buf,
		all      = all,
		len      = len(buf),
	})
}

// Submits a receive on `socket` into `buf`.
// UDP sockets are not supported yet and hit `unimplemented`.
_recv :: proc(io: ^IO, socket: net.Any_Socket, buf: []byte, user: rawptr, callback: On_Recv, all := false) -> ^Completion {
	// TODO: implement UDP.
	if _, ok := socket.(net.UDP_Socket); ok do unimplemented("nbio.recv with UDP sockets is not yet implemented")

	return submit(
		io,
		user,
		Op_Recv{
			callback = callback,
			socket   = socket,
			buf      = win.WSABUF{len = win.ULONG(len(buf)), buf = raw_data(buf)},
			all      = all,
			len      = len(buf),
		},
	)
}

// Submits a send of `buf` on `socket`.
// UDP sockets are not supported yet and hit `unimplemented`.
// NOTE(review): `endpoint` is not used by this implementation.
_send :: proc(
	io: ^IO,
	socket: net.Any_Socket,
	buf: []byte,
	user: rawptr,
	callback: On_Sent,
	endpoint: Maybe(net.Endpoint) = nil,
	all := false,
) -> ^Completion {
	// TODO: implement UDP.
	if _, ok := socket.(net.UDP_Socket); ok do unimplemented("nbio.send with UDP sockets is not yet implemented")

	return submit(
		io,
		user,
		Op_Send{
			callback = callback,
			socket   = socket,
			buf      = win.WSABUF{len = win.ULONG(len(buf)), buf = raw_data(buf)},
			all      = all,
			len      = len(buf),
		},
	)
}

// Registers a timeout that expires `dur` from now.
// The completion is taken from the pool, records the current context and is appended to
// `io.timeouts` directly, it does not go through `submit`.
_timeout :: proc(io: ^IO, dur: time.Duration, user: rawptr, callback: On_Timeout) -> ^Completion {
	completion := pool_get(&io.completion_pool)

	completion.op = Op_Timeout {
		callback = callback,
		expires  = time.time_add(time.now(), dur),
	}
	completion.user_data = user
	completion.ctx = context

	append(&io.timeouts, completion)
	return completion
}

// Not implemented on Windows, always panics.
_next_tick :: proc(io: ^IO, user: rawptr, callback: On_Next_Tick) -> ^Completion {
	panic("unimplemented on windows: next_tick")
}

// Not implemented on Windows, always panics.
_poll :: proc(io: ^IO, fd: os.Handle, event: Poll_Event, multi: bool, user: rawptr, callback: On_Poll) -> ^Completion {
	panic("unimplemented on windows: poll")
}

// Not implemented on Windows, always panics.
_poll_remove :: proc(io: ^IO, fd: os.Handle, event: Poll_Event) -> ^Completion {
	panic("unimplemented on windows: poll_remove")
}