mirror of
https://github.com/bytecodealliance/wasm-micro-runtime.git
synced 2025-02-06 15:05:19 +00:00
257 lines
5.3 KiB
C
257 lines
5.3 KiB
C
/*
|
|
* Copyright (C) 2019 Intel Corporation. All rights reserved.
|
|
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
|
|
*/
|
|
|
|
#include "bh_queue.h"
|
|
|
|
typedef struct bh_queue_node {
|
|
struct bh_queue_node *next;
|
|
struct bh_queue_node *prev;
|
|
unsigned short tag;
|
|
unsigned int len;
|
|
void *body;
|
|
bh_msg_cleaner msg_cleaner;
|
|
} bh_queue_node;
|
|
|
|
struct bh_queue {
|
|
bh_queue_mutex queue_lock;
|
|
bh_queue_cond queue_wait_cond;
|
|
unsigned int cnt;
|
|
unsigned int max;
|
|
unsigned int drops;
|
|
bh_queue_node *head;
|
|
bh_queue_node *tail;
|
|
|
|
bool exit_loop_run;
|
|
};
|
|
|
|
char *
|
|
bh_message_payload(bh_message_t message)
|
|
{
|
|
return message->body;
|
|
}
|
|
|
|
uint32
|
|
bh_message_payload_len(bh_message_t message)
|
|
{
|
|
return message->len;
|
|
}
|
|
|
|
int
|
|
bh_message_type(bh_message_t message)
|
|
{
|
|
return message->tag;
|
|
}
|
|
|
|
bh_queue *
|
|
bh_queue_create()
|
|
{
|
|
int ret;
|
|
bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
|
|
|
|
if (queue) {
|
|
memset(queue, 0, sizeof(bh_queue));
|
|
queue->max = DEFAULT_QUEUE_LENGTH;
|
|
|
|
ret = bh_queue_mutex_init(&queue->queue_lock);
|
|
if (ret != 0) {
|
|
bh_queue_free(queue);
|
|
return NULL;
|
|
}
|
|
|
|
ret = bh_queue_cond_init(&queue->queue_wait_cond);
|
|
if (ret != 0) {
|
|
bh_queue_mutex_destroy(&queue->queue_lock);
|
|
bh_queue_free(queue);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return queue;
|
|
}
|
|
|
|
void
|
|
bh_queue_destroy(bh_queue *queue)
|
|
{
|
|
bh_queue_node *node;
|
|
|
|
if (!queue)
|
|
return;
|
|
|
|
bh_queue_mutex_lock(&queue->queue_lock);
|
|
while (queue->head) {
|
|
node = queue->head;
|
|
queue->head = node->next;
|
|
|
|
bh_free_msg(node);
|
|
}
|
|
bh_queue_mutex_unlock(&queue->queue_lock);
|
|
|
|
bh_queue_cond_destroy(&queue->queue_wait_cond);
|
|
bh_queue_mutex_destroy(&queue->queue_lock);
|
|
bh_queue_free(queue);
|
|
}
|
|
|
|
bool
|
|
bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
|
|
{
|
|
if (queue->cnt >= queue->max) {
|
|
queue->drops++;
|
|
bh_free_msg(msg);
|
|
return false;
|
|
}
|
|
|
|
bh_queue_mutex_lock(&queue->queue_lock);
|
|
|
|
if (queue->cnt == 0) {
|
|
bh_assert(queue->head == NULL);
|
|
bh_assert(queue->tail == NULL);
|
|
queue->head = queue->tail = msg;
|
|
msg->next = msg->prev = NULL;
|
|
queue->cnt = 1;
|
|
|
|
bh_queue_cond_signal(&queue->queue_wait_cond);
|
|
}
|
|
else {
|
|
msg->next = NULL;
|
|
msg->prev = queue->tail;
|
|
queue->tail->next = msg;
|
|
queue->tail = msg;
|
|
queue->cnt++;
|
|
}
|
|
|
|
bh_queue_mutex_unlock(&queue->queue_lock);
|
|
|
|
return true;
|
|
}
|
|
|
|
bool
|
|
bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len)
|
|
{
|
|
bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
|
|
if (msg == NULL) {
|
|
queue->drops++;
|
|
if (len != 0 && body)
|
|
BH_FREE(body);
|
|
return false;
|
|
}
|
|
|
|
if (!bh_post_msg2(queue, msg)) {
|
|
// bh_post_msg2 already freed the msg for failure
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bh_queue_node *
|
|
bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler)
|
|
{
|
|
bh_queue_node *msg =
|
|
(bh_queue_node *)bh_queue_malloc(sizeof(bh_queue_node));
|
|
if (msg == NULL)
|
|
return NULL;
|
|
memset(msg, 0, sizeof(bh_queue_node));
|
|
msg->len = len;
|
|
msg->body = body;
|
|
msg->tag = tag;
|
|
msg->msg_cleaner = (bh_msg_cleaner)handler;
|
|
|
|
return msg;
|
|
}
|
|
|
|
void
|
|
bh_free_msg(bh_queue_node *msg)
|
|
{
|
|
if (msg->msg_cleaner) {
|
|
msg->msg_cleaner(msg->body);
|
|
bh_queue_free(msg);
|
|
return;
|
|
}
|
|
|
|
// note: sometime we just use the payload pointer for a integer value
|
|
// len!=0 is the only indicator about the body is an allocated buffer.
|
|
if (msg->body && msg->len)
|
|
bh_queue_free(msg->body);
|
|
|
|
bh_queue_free(msg);
|
|
}
|
|
|
|
bh_message_t
|
|
bh_get_msg(bh_queue *queue, uint64 timeout_us)
|
|
{
|
|
bh_queue_node *msg = NULL;
|
|
bh_queue_mutex_lock(&queue->queue_lock);
|
|
|
|
if (queue->cnt == 0) {
|
|
bh_assert(queue->head == NULL);
|
|
bh_assert(queue->tail == NULL);
|
|
|
|
if (timeout_us == 0) {
|
|
bh_queue_mutex_unlock(&queue->queue_lock);
|
|
return NULL;
|
|
}
|
|
|
|
bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
|
|
timeout_us);
|
|
}
|
|
|
|
if (queue->cnt == 0) {
|
|
bh_assert(queue->head == NULL);
|
|
bh_assert(queue->tail == NULL);
|
|
}
|
|
else if (queue->cnt == 1) {
|
|
bh_assert(queue->head == queue->tail);
|
|
|
|
msg = queue->head;
|
|
queue->head = queue->tail = NULL;
|
|
queue->cnt = 0;
|
|
}
|
|
else {
|
|
msg = queue->head;
|
|
queue->head = queue->head->next;
|
|
queue->head->prev = NULL;
|
|
queue->cnt--;
|
|
}
|
|
|
|
bh_queue_mutex_unlock(&queue->queue_lock);
|
|
|
|
return msg;
|
|
}
|
|
|
|
unsigned
|
|
bh_queue_get_message_count(bh_queue *queue)
|
|
{
|
|
if (!queue)
|
|
return 0;
|
|
|
|
return queue->cnt;
|
|
}
|
|
|
|
void
|
|
bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb,
|
|
void *arg)
|
|
{
|
|
if (!queue)
|
|
return;
|
|
|
|
while (!queue->exit_loop_run) {
|
|
bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER);
|
|
|
|
if (message) {
|
|
handle_cb(message, arg);
|
|
bh_free_msg(message);
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
bh_queue_exit_loop_run(bh_queue *queue)
|
|
{
|
|
if (queue) {
|
|
queue->exit_loop_run = true;
|
|
bh_queue_cond_signal(&queue->queue_wait_cond);
|
|
}
|
|
}
|