From 6264b6d43cf999a2899731abadf124672b7bf6af Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Mon, 9 Oct 2023 23:59:08 +0200 Subject: [PATCH] Use RunAsync in multiple socket operations (#7053) * Use RunAsync in multiple socket operations * EOF newline * Fix linux compilation * Fix compilation on macos --- src/core/hle/service/soc/soc_u.cpp | 327 ++++++++++++++++++----------- 1 file changed, 208 insertions(+), 119 deletions(-) diff --git a/src/core/hle/service/soc/soc_u.cpp b/src/core/hle/service/soc/soc_u.cpp index 031320eb26..99dfb938bf 100644 --- a/src/core/hle/service/soc/soc_u.cpp +++ b/src/core/hle/service/soc/soc_u.cpp @@ -1093,59 +1093,88 @@ void SOC_U::RecvFromOther(Kernel::HLERequestContext& ctx) { #endif // _WIN32 u32 addr_len = rp.Pop(); rp.PopPID(); - auto& buffer = rp.PopMappedBuffer(); - CTRSockAddr ctr_src_addr; - std::vector output_buff(len); - std::vector addr_buff(addr_len); - sockaddr src_addr; - socklen_t src_addr_len = sizeof(src_addr); - - s32 ret = -1; - if (GetSocketBlocking(fd_info->second) && !dont_wait) { - PreTimerAdjust(); - } - - if (addr_len > 0) { - ret = static_cast(::recvfrom(fd_info->second.socket_fd, - reinterpret_cast(output_buff.data()), len, flags, - &src_addr, &src_addr_len)); - if (ret >= 0 && src_addr_len > 0) { - ctr_src_addr = CTRSockAddr::FromPlatform(src_addr); - std::memcpy(addr_buff.data(), &ctr_src_addr, addr_len); - } - } else { - ret = static_cast(::recvfrom(fd_info->second.socket_fd, - reinterpret_cast(output_buff.data()), len, flags, - NULL, 0)); - addr_buff.resize(0); - } - int recv_error = (ret == SOCKET_ERROR_VALUE) ? GET_ERRNO : 0; - if (GetSocketBlocking(fd_info->second) && !dont_wait) { - PostTimerAdjust(ctx, "RecvFromOther"); - } + bool needs_async = GetSocketBlocking(fd_info->second) && !dont_wait; + struct AsyncData { + // Input + u32 len{}; + u32 flags{}; + u32 addr_len{}; + SocketHolder* fd_info; #ifdef _WIN32 - if (dont_wait && was_blocking) { - SetSocketBlocking(fd_info->second, true); - } + bool dont_wait; + bool was_blocking; #endif - if (ret == SOCKET_ERROR_VALUE) { - ret = TranslateError(recv_error); - } else { - buffer.Write(output_buff.data(), 0, ret); - } - IPC::RequestBuilder rb = rp.MakeBuilder(2, 4); - rb.Push(RESULT_SUCCESS); - rb.Push(ret); - rb.PushStaticBuffer(std::move(addr_buff), 0); - rb.PushMappedBuffer(buffer); + // Output + s32 ret{}; + int recv_error; + Kernel::MappedBuffer* buffer; + std::vector output_buff; + std::vector addr_buff; + }; + + auto async_data = std::make_shared(); + async_data->buffer = &rp.PopMappedBuffer(); + async_data->ret = -1; + async_data->len = len; + async_data->flags = flags; + async_data->addr_len = addr_len; + async_data->output_buff.resize(len); + async_data->addr_buff.resize(addr_len); + async_data->fd_info = &fd_info->second; +#ifdef _WIN32 + async_data->dont_wait = dont_wait; + async_data->was_blocking = was_blocking; +#endif + + ctx.RunAsync( + [async_data](Kernel::HLERequestContext& ctx) { + sockaddr src_addr; + socklen_t src_addr_len = sizeof(src_addr); + CTRSockAddr ctr_src_addr; + if (async_data->addr_len > 0) { + async_data->ret = static_cast( + ::recvfrom(async_data->fd_info->socket_fd, + reinterpret_cast(async_data->output_buff.data()), + async_data->len, async_data->flags, &src_addr, &src_addr_len)); + if (async_data->ret >= 0 && src_addr_len > 0) { + ctr_src_addr = CTRSockAddr::FromPlatform(src_addr); + std::memcpy(async_data->addr_buff.data(), &ctr_src_addr, async_data->addr_len); + } + } else { + async_data->ret = static_cast( + ::recvfrom(async_data->fd_info->socket_fd, + reinterpret_cast(async_data->output_buff.data()), + async_data->len, async_data->flags, NULL, 0)); + async_data->addr_buff.resize(0); + } + async_data->recv_error = (async_data->ret == SOCKET_ERROR_VALUE) ? GET_ERRNO : 0; + return 0; + }, + [this, async_data](Kernel::HLERequestContext& ctx) { + if (async_data->ret == SOCKET_ERROR_VALUE) { + async_data->ret = TranslateError(async_data->recv_error); + } else { + async_data->buffer->Write(async_data->output_buff.data(), 0, async_data->ret); + } +#ifdef _WIN32 + if (async_data->dont_wait && async_data->was_blocking) { + SetSocketBlocking(*async_data->fd_info, true); + } +#else + (void)this; +#endif + IPC::RequestBuilder rb(ctx, 0x07, 2, 4); + rb.Push(RESULT_SUCCESS); + rb.Push(async_data->ret); + rb.PushStaticBuffer(std::move(async_data->addr_buff), 0); + rb.PushMappedBuffer(*async_data->buffer); + }, + needs_async); } void SOC_U::RecvFrom(Kernel::HLERequestContext& ctx) { - // TODO(Subv): Calling this function on a blocking socket will block the emu thread, - // preventing graceful shutdown when closing the emulator, this can be fixed by always - // performing nonblocking operations and spinlock until the data is available IPC::RequestParser rp(ctx); u32 socket_handle = rp.Pop(); auto fd_info = open_sockets.find(socket_handle); @@ -1172,55 +1201,89 @@ void SOC_U::RecvFrom(Kernel::HLERequestContext& ctx) { u32 addr_len = rp.Pop(); rp.PopPID(); - CTRSockAddr ctr_src_addr; - std::vector output_buff(len); - std::vector addr_buff(addr_len); - sockaddr src_addr; - socklen_t src_addr_len = sizeof(src_addr); - - s32 ret = -1; - if (GetSocketBlocking(fd_info->second) && !dont_wait) { - PreTimerAdjust(); - } - if (addr_len > 0) { - // Only get src adr if input adr available - ret = static_cast(::recvfrom(fd_info->second.socket_fd, - reinterpret_cast(output_buff.data()), len, flags, - &src_addr, &src_addr_len)); - if (ret >= 0 && src_addr_len > 0) { - ctr_src_addr = CTRSockAddr::FromPlatform(src_addr); - std::memcpy(addr_buff.data(), &ctr_src_addr, addr_len); - } - } else { - ret = static_cast(::recvfrom(fd_info->second.socket_fd, - reinterpret_cast(output_buff.data()), len, flags, - NULL, 0)); - addr_buff.resize(0); - } - int recv_error = (ret == SOCKET_ERROR_VALUE) ? GET_ERRNO : 0; - if (GetSocketBlocking(fd_info->second) && !dont_wait) { - PostTimerAdjust(ctx, "RecvFrom"); - } + bool needs_async = GetSocketBlocking(fd_info->second) && !dont_wait; + struct AsyncData { + // Input + u32 len{}; + u32 flags{}; + u32 addr_len{}; + SocketHolder* fd_info; #ifdef _WIN32 - if (dont_wait && was_blocking) { - SetSocketBlocking(fd_info->second, true); - } + bool dont_wait; + bool was_blocking; #endif - s32 total_received = ret; - if (ret == SOCKET_ERROR_VALUE) { - ret = TranslateError(recv_error); - total_received = 0; - } - // Write only the data we received to avoid overwriting parts of the buffer with zeros - output_buff.resize(total_received); + // Output + s32 ret{}; + int recv_error; + std::vector output_buff; + std::vector addr_buff; + }; - IPC::RequestBuilder rb = rp.MakeBuilder(3, 4); - rb.Push(RESULT_SUCCESS); - rb.Push(ret); - rb.Push(total_received); - rb.PushStaticBuffer(std::move(output_buff), 0); - rb.PushStaticBuffer(std::move(addr_buff), 1); + auto async_data = std::make_shared(); + async_data->ret = -1; + async_data->len = len; + async_data->flags = flags; + async_data->addr_len = addr_len; + async_data->output_buff.resize(len); + async_data->addr_buff.resize(addr_len); + async_data->fd_info = &fd_info->second; +#ifdef _WIN32 + async_data->dont_wait = dont_wait; + async_data->was_blocking = was_blocking; +#endif + + ctx.RunAsync( + [async_data](Kernel::HLERequestContext& ctx) { + sockaddr src_addr; + socklen_t src_addr_len = sizeof(src_addr); + CTRSockAddr ctr_src_addr; + if (async_data->addr_len > 0) { + // Only get src adr if input adr available + async_data->ret = static_cast( + ::recvfrom(async_data->fd_info->socket_fd, + reinterpret_cast(async_data->output_buff.data()), + async_data->len, async_data->flags, &src_addr, &src_addr_len)); + if (async_data->ret >= 0 && src_addr_len > 0) { + ctr_src_addr = CTRSockAddr::FromPlatform(src_addr); + std::memcpy(async_data->addr_buff.data(), &ctr_src_addr, async_data->addr_len); + } + } else { + async_data->ret = static_cast( + ::recvfrom(async_data->fd_info->socket_fd, + reinterpret_cast(async_data->output_buff.data()), + async_data->len, async_data->flags, NULL, 0)); + async_data->addr_buff.resize(0); + } + async_data->recv_error = (async_data->ret == SOCKET_ERROR_VALUE) ? GET_ERRNO : 0; + return 0; + }, + [this, async_data](Kernel::HLERequestContext& ctx) { + +#ifdef _WIN32 + if (async_data->dont_wait && async_data->was_blocking) { + SetSocketBlocking(*async_data->fd_info, true); + } +#else + (void)this; +#endif + s32 total_received = async_data->ret; + if (async_data->ret == SOCKET_ERROR_VALUE) { + async_data->ret = TranslateError(async_data->recv_error); + total_received = 0; + } + + // Write only the data we received to avoid overwriting parts of the buffer with zeros + async_data->output_buff.resize(total_received); + + IPC::RequestBuilder rb(ctx, 0x08, 3, 4); + rb.Push(RESULT_SUCCESS); + rb.Push(async_data->ret); + rb.Push(total_received); + rb.PushStaticBuffer(std::move(async_data->output_buff), 0); + rb.PushStaticBuffer(std::move(async_data->addr_buff), 1); + }, + needs_async); } void SOC_U::Poll(Kernel::HLERequestContext& ctx) { @@ -1230,45 +1293,71 @@ void SOC_U::Poll(Kernel::HLERequestContext& ctx) { rp.PopPID(); auto input_fds = rp.PopStaticBuffer(); - std::vector ctr_fds(nfds); - std::memcpy(ctr_fds.data(), input_fds.data(), nfds * sizeof(CTRPollFD)); + struct AsyncData { + // Input + s32 timeout; + u32 nfds; + + // Input/Output + std::vector platform_pollfd; + std::vector has_libctru_bug; + std::vector ctr_fds; + + // Output + s32 ret; + int poll_error; + }; + auto async_data = std::make_shared(); + async_data->timeout = timeout; + async_data->nfds = nfds; + + async_data->ctr_fds.resize(nfds); + std::memcpy(async_data->ctr_fds.data(), input_fds.data(), nfds * sizeof(CTRPollFD)); // The 3ds_pollfd and the pollfd structures may be different (Windows/Linux have different // sizes) // so we have to copy the data in order - std::vector platform_pollfd(nfds); - std::vector has_libctru_bug(nfds, false); + async_data->platform_pollfd.resize(nfds); + async_data->has_libctru_bug.resize(nfds, false); for (u32 i = 0; i < nfds; i++) { - platform_pollfd[i] = CTRPollFD::ToPlatform(*this, ctr_fds[i], has_libctru_bug[i]); + async_data->platform_pollfd[i] = + CTRPollFD::ToPlatform(*this, async_data->ctr_fds[i], async_data->has_libctru_bug[i]); } - if (timeout) { - PreTimerAdjust(); - } - s32 ret = ::poll(platform_pollfd.data(), nfds, timeout); - if (timeout) { - PostTimerAdjust(ctx, "Poll"); - } + ctx.RunAsync( + [async_data](Kernel::HLERequestContext& ctx) { + async_data->ret = + ::poll(async_data->platform_pollfd.data(), async_data->nfds, async_data->timeout); + if (async_data->ret == SOCKET_ERROR_VALUE) { + async_data->poll_error = GET_ERRNO; + } + return 0; + }, + [this, async_data](Kernel::HLERequestContext& ctx) { + // Now update the output 3ds_pollfd structure + for (u32 i = 0; i < async_data->nfds; i++) { + async_data->ctr_fds[i] = CTRPollFD::FromPlatform( + *this, async_data->platform_pollfd[i], async_data->has_libctru_bug[i]); + } - // Now update the output 3ds_pollfd structure - for (u32 i = 0; i < nfds; i++) { - ctr_fds[i] = CTRPollFD::FromPlatform(*this, platform_pollfd[i], has_libctru_bug[i]); - } + std::vector output_fds(async_data->nfds * sizeof(CTRPollFD)); + std::memcpy(output_fds.data(), async_data->ctr_fds.data(), + async_data->nfds * sizeof(CTRPollFD)); - std::vector output_fds(nfds * sizeof(CTRPollFD)); - std::memcpy(output_fds.data(), ctr_fds.data(), nfds * sizeof(CTRPollFD)); + if (async_data->ret == SOCKET_ERROR_VALUE) { + int err = async_data->poll_error; + LOG_DEBUG(Service_SOC, "Socket error: {}", err); - if (ret == SOCKET_ERROR_VALUE) { - int err = GET_ERRNO; - LOG_ERROR(Service_SOC, "Socket error: {}", err); + async_data->ret = TranslateError(GET_ERRNO); + } - ret = TranslateError(GET_ERRNO); - } - - IPC::RequestBuilder rb = rp.MakeBuilder(2, 2); - rb.Push(RESULT_SUCCESS); - rb.Push(ret); - rb.PushStaticBuffer(std::move(output_fds), 0); + IPC::RequestBuilder rb(ctx, static_cast(ctx.CommandHeader().command_id.Value()), 2, + 2); + rb.Push(RESULT_SUCCESS); + rb.Push(async_data->ret); + rb.PushStaticBuffer(std::move(output_fds), 0); + }, + timeout != 0); } void SOC_U::GetSockName(Kernel::HLERequestContext& ctx) {