mirror of
https://github.com/bytecodealliance/wasm-micro-runtime.git
synced 2025-05-11 20:21:11 +00:00
Fix wait_info data race for deletion and fix atomic_wait logic (#2016)
Fix a data race for test main_proc_exit_wait.c from #1963. And fix atomic_wait logic that was wrong before: - a thread 1 started executing wasm instruction wasm_atomic_wait but hasn't reached waiting on condition variable - a main thread calls proc_exit and notifies all the threads that reached waiting on condition variable Which leads to thread 1 hang on waiting on condition variable after that Now it's atomically checked whether proc_exit was already called.
This commit is contained in:
parent
578fbc5a55
commit
2de24587a8
|
@ -5,6 +5,9 @@
|
|||
|
||||
#include "bh_log.h"
|
||||
#include "wasm_shared_memory.h"
|
||||
#if WASM_ENABLE_THREAD_MGR != 0
|
||||
#include "../libraries/thread-mgr/thread_manager.h"
|
||||
#endif
|
||||
|
||||
static bh_list shared_memory_list_head;
|
||||
static bh_list *const shared_memory_list = &shared_memory_list_head;
|
||||
|
@ -21,6 +24,8 @@ typedef struct AtomicWaitInfo {
|
|||
korp_mutex wait_list_lock;
|
||||
bh_list wait_list_head;
|
||||
bh_list *wait_list;
|
||||
/* WARNING: insert to the list allowed only in acquire_wait_info
|
||||
otherwise there will be data race as described in PR #2016 */
|
||||
} AtomicWaitInfo;
|
||||
|
||||
typedef struct AtomicWaitNode {
|
||||
|
@ -298,7 +303,7 @@ notify_wait_list(bh_list *wait_list, uint32 count)
|
|||
}
|
||||
|
||||
static AtomicWaitInfo *
|
||||
acquire_wait_info(void *address, bool create)
|
||||
acquire_wait_info(void *address, AtomicWaitNode *wait_node)
|
||||
{
|
||||
AtomicWaitInfo *wait_info = NULL;
|
||||
bh_list_status ret;
|
||||
|
@ -308,7 +313,7 @@ acquire_wait_info(void *address, bool create)
|
|||
if (address)
|
||||
wait_info = (AtomicWaitInfo *)bh_hash_map_find(wait_map, address);
|
||||
|
||||
if (!create) {
|
||||
if (!wait_node) {
|
||||
os_mutex_unlock(&wait_map_lock);
|
||||
return wait_info;
|
||||
}
|
||||
|
@ -336,6 +341,12 @@ acquire_wait_info(void *address, bool create)
|
|||
}
|
||||
}
|
||||
|
||||
os_mutex_lock(&wait_info->wait_list_lock);
|
||||
ret = bh_list_insert(wait_info->wait_list, wait_node);
|
||||
os_mutex_unlock(&wait_info->wait_list_lock);
|
||||
bh_assert(ret == BH_LIST_SUCCESS);
|
||||
(void)ret;
|
||||
|
||||
os_mutex_unlock(&wait_map_lock);
|
||||
|
||||
bh_assert(wait_info);
|
||||
|
@ -376,16 +387,22 @@ destroy_wait_info(void *wait_info)
|
|||
}
|
||||
}
|
||||
|
||||
static bool
|
||||
map_remove_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info,
|
||||
void *address)
|
||||
static void
|
||||
map_try_release_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info,
|
||||
void *address)
|
||||
{
|
||||
os_mutex_lock(&wait_map_lock);
|
||||
os_mutex_lock(&wait_info->wait_list_lock);
|
||||
if (wait_info->wait_list->len > 0) {
|
||||
return false;
|
||||
os_mutex_unlock(&wait_info->wait_list_lock);
|
||||
os_mutex_unlock(&wait_map_lock);
|
||||
return;
|
||||
}
|
||||
os_mutex_unlock(&wait_info->wait_list_lock);
|
||||
|
||||
bh_hash_map_remove(wait_map_, address, NULL, NULL);
|
||||
return true;
|
||||
os_mutex_unlock(&wait_map_lock);
|
||||
destroy_wait_info(wait_info);
|
||||
}
|
||||
|
||||
uint32
|
||||
|
@ -396,7 +413,8 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
|
|||
AtomicWaitInfo *wait_info;
|
||||
AtomicWaitNode *wait_node;
|
||||
WASMSharedMemNode *node;
|
||||
bool check_ret, is_timeout, no_wait, removed_from_map;
|
||||
WASMExecEnv *exec_env;
|
||||
bool check_ret, is_timeout, no_wait;
|
||||
|
||||
bh_assert(module->module_type == Wasm_Module_Bytecode
|
||||
|| module->module_type == Wasm_Module_AoT);
|
||||
|
@ -418,14 +436,6 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
|
|||
return -1;
|
||||
}
|
||||
|
||||
/* acquire the wait info, create new one if not exists */
|
||||
wait_info = acquire_wait_info(address, true);
|
||||
|
||||
if (!wait_info) {
|
||||
wasm_runtime_set_exception(module, "failed to acquire wait_info");
|
||||
return -1;
|
||||
}
|
||||
|
||||
node = search_module((WASMModuleCommon *)module_inst->module);
|
||||
os_mutex_lock(&node->shared_mem_lock);
|
||||
no_wait = (!wait64 && *(uint32 *)address != (uint32)expect)
|
||||
|
@ -435,40 +445,59 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
|
|||
if (no_wait) {
|
||||
return 1;
|
||||
}
|
||||
else {
|
||||
bh_list_status ret;
|
||||
|
||||
if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) {
|
||||
wasm_runtime_set_exception(module, "failed to create wait node");
|
||||
return -1;
|
||||
}
|
||||
memset(wait_node, 0, sizeof(AtomicWaitNode));
|
||||
|
||||
if (0 != os_mutex_init(&wait_node->wait_lock)) {
|
||||
wasm_runtime_free(wait_node);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (0 != os_cond_init(&wait_node->wait_cond)) {
|
||||
os_mutex_destroy(&wait_node->wait_lock);
|
||||
wasm_runtime_free(wait_node);
|
||||
return -1;
|
||||
}
|
||||
|
||||
wait_node->status = S_WAITING;
|
||||
os_mutex_lock(&wait_info->wait_list_lock);
|
||||
ret = bh_list_insert(wait_info->wait_list, wait_node);
|
||||
os_mutex_unlock(&wait_info->wait_list_lock);
|
||||
bh_assert(ret == BH_LIST_SUCCESS);
|
||||
(void)ret;
|
||||
if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) {
|
||||
wasm_runtime_set_exception(module, "failed to create wait node");
|
||||
return -1;
|
||||
}
|
||||
memset(wait_node, 0, sizeof(AtomicWaitNode));
|
||||
|
||||
if (0 != os_mutex_init(&wait_node->wait_lock)) {
|
||||
wasm_runtime_free(wait_node);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (0 != os_cond_init(&wait_node->wait_cond)) {
|
||||
os_mutex_destroy(&wait_node->wait_lock);
|
||||
wasm_runtime_free(wait_node);
|
||||
return -1;
|
||||
}
|
||||
|
||||
wait_node->status = S_WAITING;
|
||||
|
||||
/* acquire the wait info, create new one if not exists */
|
||||
wait_info = acquire_wait_info(address, wait_node);
|
||||
|
||||
if (!wait_info) {
|
||||
os_mutex_destroy(&wait_node->wait_lock);
|
||||
wasm_runtime_free(wait_node);
|
||||
wasm_runtime_set_exception(module, "failed to acquire wait_info");
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if WASM_ENABLE_THREAD_MGR != 0
|
||||
exec_env =
|
||||
wasm_clusters_search_exec_env((WASMModuleInstanceCommon *)module_inst);
|
||||
bh_assert(exec_env);
|
||||
#endif
|
||||
|
||||
os_mutex_lock(&node->shared_mem_lock);
|
||||
no_wait = (!wait64 && *(uint32 *)address != (uint32)expect)
|
||||
|| (wait64 && *(uint64 *)address != expect);
|
||||
os_mutex_unlock(&node->shared_mem_lock);
|
||||
|
||||
/* condition wait start */
|
||||
os_mutex_lock(&wait_node->wait_lock);
|
||||
|
||||
os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock,
|
||||
timeout < 0 ? BHT_WAIT_FOREVER
|
||||
: (uint64)timeout / 1000);
|
||||
if (!no_wait
|
||||
#if WASM_ENABLE_THREAD_MGR != 0
|
||||
&& !wasm_cluster_is_thread_terminated(exec_env)
|
||||
#endif
|
||||
) {
|
||||
os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock,
|
||||
timeout < 0 ? BHT_WAIT_FOREVER
|
||||
: (uint64)timeout / 1000);
|
||||
}
|
||||
|
||||
is_timeout = wait_node->status == S_WAITING ? true : false;
|
||||
os_mutex_unlock(&wait_node->wait_lock);
|
||||
|
@ -486,14 +515,12 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
|
|||
wasm_runtime_free(wait_node);
|
||||
|
||||
/* Release wait info if no wait nodes attached */
|
||||
removed_from_map = map_remove_wait_info(wait_map, wait_info, address);
|
||||
os_mutex_unlock(&wait_info->wait_list_lock);
|
||||
if (removed_from_map)
|
||||
destroy_wait_info(wait_info);
|
||||
map_try_release_wait_info(wait_map, wait_info, address);
|
||||
os_mutex_unlock(&node->shared_mem_lock);
|
||||
|
||||
(void)check_ret;
|
||||
return is_timeout ? 2 : 0;
|
||||
return no_wait ? 1 : is_timeout ? 2 : 0;
|
||||
}
|
||||
|
||||
uint32
|
||||
|
@ -523,7 +550,7 @@ wasm_runtime_atomic_notify(WASMModuleInstanceCommon *module, void *address,
|
|||
return -1;
|
||||
}
|
||||
|
||||
wait_info = acquire_wait_info(address, false);
|
||||
wait_info = acquire_wait_info(address, NULL);
|
||||
|
||||
/* Nobody wait on this address */
|
||||
if (!wait_info) {
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#define _WASM_SHARED_MEMORY_H
|
||||
|
||||
#include "bh_common.h"
|
||||
#include "wasm_exec_env.h"
|
||||
#if WASM_ENABLE_INTERP != 0
|
||||
#include "wasm_runtime.h"
|
||||
#endif
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <pthread.h>
|
||||
#include <stdbool.h>
|
||||
#include <unistd.h>
|
||||
#include <limits.h>
|
||||
|
||||
#include "wasi_thread_start.h"
|
||||
|
||||
|
@ -23,7 +24,6 @@ static bool termination_by_trap;
|
|||
static bool termination_in_main_thread;
|
||||
static blocking_task_type_t blocking_task_type;
|
||||
|
||||
#define TIMEOUT_SECONDS 10ll
|
||||
#define NUM_THREADS 3
|
||||
static pthread_barrier_t barrier;
|
||||
|
||||
|
@ -36,15 +36,14 @@ void
|
|||
run_long_task()
|
||||
{
|
||||
if (blocking_task_type == BLOCKING_TASK_BUSY_WAIT) {
|
||||
for (int i = 0; i < TIMEOUT_SECONDS; i++)
|
||||
sleep(1);
|
||||
for (;;) {
|
||||
}
|
||||
}
|
||||
else if (blocking_task_type == BLOCKING_TASK_ATOMIC_WAIT) {
|
||||
__builtin_wasm_memory_atomic_wait32(
|
||||
0, 0, TIMEOUT_SECONDS * 1000 * 1000 * 1000);
|
||||
__builtin_wasm_memory_atomic_wait32(0, 0, -1);
|
||||
}
|
||||
else {
|
||||
sleep(TIMEOUT_SECONDS);
|
||||
sleep(UINT_MAX);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user