添加服务端并发支持,还未测试

This commit is contained in:
zhangzheng
2024-01-01 22:57:49 +08:00
parent 0bc4a32d6f
commit 1e8d407664
16 changed files with 280 additions and 146 deletions

View File

@@ -33,6 +33,62 @@ typedef struct ipc_wait_bind_entry
slist_head_t node;
thread_t *th;
} ipc_wait_bind_entry_t;
int ipc_bind(ipc_t *ipc, obj_handler_t th_hd, umword_t user_id, thread_t *th_kobj)
{
int ret = -EINVAL;
task_t *cur_task = thread_get_current_task();
/*TODO:原子操作,绑定其他线程不一定是当前线程*/
if (ipc->svr_th == NULL)
{
mword_t status = spinlock_lock(&cur_task->kobj.lock); //!< 锁住当前的task
if (status < 0)
{
return -EACCES;
}
ref_counter_inc(&cur_task->ref_cn); //!< task引用计数+1
thread_t *recv_kobj;
if (!th_kobj)
{
recv_kobj = (thread_t *)obj_space_lookup_kobj_cmp_type(&cur_task->obj_space, th_hd, THREAD_TYPE);
}
else
{
recv_kobj = th_kobj;
}
if (!recv_kobj)
{
ret = -ENOENT;
goto end_bind;
}
ref_counter_inc(&recv_kobj->ref); //!< 绑定后线程的引用计数+1防止被删除
ipc->svr_th = recv_kobj;
ipc->user_id = user_id;
ipc_wait_bind_entry_t *pos;
slist_foreach_not_next(pos, &ipc->wait_bind, node) //!< 唤醒所有等待绑定的线程
{
ipc_wait_bind_entry_t *next = slist_next_entry(pos, &ipc->wait_bind, node);
assert(pos->th->status == THREAD_SUSPEND);
slist_del(&next->node);
thread_ready(pos->th, TRUE);
pos = next;
}
ret = 0;
end_bind:
//!< 先解锁然后在给task的引用计数-1
spinlock_set(&cur_task->kobj.lock, status);
ref_counter_dec_and_release(&cur_task->ref_cn, &cur_task->kobj);
}
else
{
ret = -ECANCELED;
}
return ret;
}
/**
* @brief ipc的系统调用
*
@@ -66,48 +122,8 @@ static void ipc_syscall(kobject_t *kobj, syscall_prot_t sys_p, msg_tag_t in_tag,
tag = msg_tag_init4(0, 0, 0, -EPROTO);
break;
}
/*TODO:原子操作,绑定其他线程不一定是当前线程*/
if (ipc->svr_th == NULL)
{
mword_t status = spinlock_lock(&cur_task->kobj.lock); //!< 锁住当前的task
if (status < 0)
{
tag = msg_tag_init4(0, 0, 0, -EACCES);
break;
}
ref_counter_inc(&cur_task->ref_cn); //!< task引用计数+1
thread_t *recv_kobj = (thread_t *)obj_space_lookup_kobj_cmp_type(&cur_task->obj_space, f->r[0], THREAD_TYPE);
if (!recv_kobj)
{
ret = -ENOENT;
goto end_bind;
}
ref_counter_inc(&recv_kobj->ref); //!< 绑定后线程的引用计数+1防止被删除
ipc->svr_th = recv_kobj;
ipc->user_id = f->r[1];
ipc_wait_bind_entry_t *pos;
slist_foreach_not_next(pos, &ipc->wait_bind, node) //!< 唤醒所有等待绑定的线程
{
ipc_wait_bind_entry_t *next = slist_next_entry(pos, &ipc->wait_bind, node);
assert(pos->th->status == THREAD_SUSPEND);
slist_del(&next->node);
thread_ready(pos->th, TRUE);
pos = next;
}
ret = 0;
end_bind:
//!< 先解锁然后在给task的引用计数-1
spinlock_set(&cur_task->kobj.lock, status);
ref_counter_dec_and_release(&cur_task->ref_cn, &cur_task->kobj);
tag = msg_tag_init4(0, 0, 0, ret);
}
else
{
tag = msg_tag_init4(0, 0, 0, -ECANCELED);
}
ret = ipc_bind(ipc, f->r[0], f->r[1], NULL);
tag = msg_tag_init4(0, 0, 0, ret);
}
break;
case IPC_DO:
@@ -177,12 +193,21 @@ static void ipc_release_stage2(kobject_t *kobj)
mm_limit_free(ipc->lim, kobj);
printk("ipc release 0x%x\n", kobj);
}
static bool_t ipc_put(kobject_t *kobj)
{
ipc_t *ipc = container_of(kobj, ipc_t, kobj);
return ref_counter_dec(&ipc->ref) == 1;
}
static void ipc_init(ipc_t *ipc, ram_limit_t *lim)
{
kobject_init(&ipc->kobj, IPC_TYPE);
slist_init(&ipc->wait_bind);
spinlock_init(&ipc->lock);
ref_counter_init(&ipc->ref);
ref_counter_inc(&ipc->ref);
ipc->lim = lim;
ipc->kobj.put_func = ipc_put;
ipc->kobj.invoke_func = ipc_syscall;
ipc->kobj.stage_1_func = ipc_release_stage1;
ipc->kobj.stage_2_func = ipc_release_stage2;

View File

@@ -25,7 +25,7 @@
#include "err.h"
#include "map.h"
#include "access.h"
#include "ipc.h"
enum thread_op
{
SET_EXEC_REGS,
@@ -423,13 +423,13 @@ static int ipc_data_copy(thread_t *dst_th, thread_t *src_th, msg_tag_t tag)
dst_th->msg.tag = tag;
return 0;
}
/**
* @brief 当前线程接收数据
*
* @return int
*/
static int thread_ipc_recv(msg_tag_t *ret_msg, ipc_timeout_t timeout, umword_t *ret_user_id)
static int thread_ipc_recv(msg_tag_t *ret_msg, ipc_timeout_t timeout,
umword_t *ret_user_id, ipc_t *ipc_kobj)
{
int ret = 0;
assert(ret_msg);
@@ -458,11 +458,22 @@ static int thread_ipc_recv(msg_tag_t *ret_msg, ipc_timeout_t timeout, umword_t *
//!< 加入等待队列
if (timeout.recv_timeout)
{
thread_wait_entry_init(&wait, cur_th, timeout.recv_timeout);
slist_add_append(&wait_recv_queue, &wait.node); //!< 放到等待队列中
}
}
if (ipc_kobj)
{
/*IPC对象的引用计数+1*/
ref_counter_inc(&ipc_kobj->ref);
cur_th->ipc_kobj = &ipc_kobj->kobj;
}
else
{
cur_th->ipc_kobj = NULL;
}
thread_suspend(cur_th); //!< 挂起
preemption(); //!< 进行调度
if (cur_th->ipc_status == THREAD_IPC_ABORT)
@@ -533,70 +544,11 @@ static int thread_ipc_reply(msg_tag_t in_tag)
cpulock_set(status);
return ret;
}
static int thread_ipc_send(thread_t *to_th, msg_tag_t in_tag, ipc_timeout_t timout)
int thread_ipc_call(thread_t *to_th, msg_tag_t in_tag, msg_tag_t *ret_tag,
ipc_timeout_t timout, umword_t *ret_user_id, bool_t is_call)
{
int ret = -EINVAL;
thread_t *cur_th = thread_get_current();
thread_t *recv_kobj = to_th;
mword_t lock_stats = spinlock_lock(&cur_th->kobj.lock);
if (lock_stats < 0)
{
//!< 锁已经无效了
return -EACCES;
}
again_check:
if (recv_kobj->status == THREAD_READY)
{
thread_wait_entry_t wait;
cur_th->ipc_status = THREAD_SEND; //!< 因为发送挂起
thread_wait_entry_init(&wait, cur_th, timout.send_timeout);
slist_add_append(&recv_kobj->wait_send_head, &wait.node); //!< 放到线程的等待队列中
slist_add_append(&wait_send_queue, &wait.node_timeout);
thread_suspend(cur_th); //!< 挂起
preemption(); //!< 进行调度
if (cur_th->ipc_status == THREAD_IPC_ABORT)
{
cur_th->ipc_status = THREAD_NONE;
ret = -ESHUTDOWN;
goto end;
}
else if (cur_th->ipc_status == THREAD_TIMEOUT)
{
ret = -EWTIMEDOUT;
goto end;
}
cur_th->ipc_status = THREAD_NONE;
goto again_check;
}
else if (recv_kobj->status == THREAD_SUSPEND && recv_kobj->ipc_status == THREAD_RECV)
{
// if (slist_in_list(&recv_kobj->wait_node))
// {
// //!< 如果已经在队列中,则删除
// slist_del(&recv_kobj->wait_node);
// }
//!< 开始发送数据
ret = ipc_data_copy(recv_kobj, cur_th, in_tag); //!< 拷贝数据
if (ret < 0)
{
//!< 拷贝失败
goto end;
}
recv_kobj->last_send_th = cur_th; //!< 设置接收者的上一次发送者是谁
ref_counter_inc(&cur_th->ref); //!< 作为发送者增加一次引用
thread_ready(recv_kobj, TRUE); //!< 直接唤醒接受者
preemption(); //!< 进行调度
}
ret = 0;
end:
spinlock_set(&cur_th->kobj.lock, lock_stats);
}
int thread_ipc_call(thread_t *to_th, msg_tag_t in_tag, msg_tag_t *ret_tag, ipc_timeout_t timout, umword_t *ret_user_id)
{
assert(ret_tag);
assert(is_call && ret_tag);
int ret = -EINVAL;
thread_t *cur_th = thread_get_current();
thread_t *recv_kobj = to_th;
@@ -635,11 +587,6 @@ again_check:
}
else if (recv_kobj->status == THREAD_SUSPEND && recv_kobj->ipc_status == THREAD_RECV)
{
// if (slist_in_list(&recv_kobj->wait_node))
// {
// //!< 如果已经在队列中,则删除
// slist_del(&recv_kobj->wait_node);
// }
//!< 开始发送数据
ret = ipc_data_copy(recv_kobj, cur_th, in_tag); //!< 拷贝数据
if (ret < 0)
@@ -647,14 +594,29 @@ again_check:
//!< 拷贝失败
goto end;
}
recv_kobj->last_send_th = cur_th; //!< 设置接收者的上一次发送者是谁
ref_counter_inc(&cur_th->ref); //!< 作为发送者增加一次引用
thread_ready(recv_kobj, TRUE); //!< 直接唤醒接受者
ret = thread_ipc_recv(ret_tag, timout, ret_user_id); //!< 当前线程进行接收
if (ret < 0)
if (recv_kobj->ipc_kobj)
{
//!< 接收超时
goto end;
// 绑定回复的ipc到当前的线程
assert(ipc_bind(((ipc_t *)(recv_kobj->ipc_kobj)), -1, 0, cur_th) >= 0);
ref_counter_dec_and_release(&((ipc_t *)(recv_kobj->ipc_kobj))->ref,
recv_kobj->ipc_kobj);
recv_kobj->ipc_kobj = NULL;
recv_kobj->last_send_th = NULL;
}
else
{
recv_kobj->last_send_th = cur_th; //!< 设置接收者的上一次发送者是谁
ref_counter_inc(&cur_th->ref); //!< 作为发送者增加一次引用
}
thread_ready(recv_kobj, TRUE); //!< 直接唤醒接受者
if (is_call)
{
ret = thread_ipc_recv(ret_tag, timout, ret_user_id, NULL); //!< 当前线程进行接收
if (ret < 0)
{
//!< 接收超时
goto end;
}
}
preemption(); //!< 进行调度
}
@@ -691,7 +653,7 @@ msg_tag_t thread_do_ipc(kobject_t *kobj, entry_frame_t *f, umword_t user_id)
ipc_timeout_t ipc_tm_out = ipc_timeout_create(f->r[3]);
to_th->user_id = user_id;
ret = thread_ipc_call(to_th, in_tag, &recv_tag, ipc_tm_out, &f->r[1]);
ret = thread_ipc_call(to_th, in_tag, &recv_tag, ipc_tm_out, &f->r[1], TRUE);
if (ret < 0)
{
return msg_tag_init4(0, 0, 0, ret);
@@ -710,8 +672,9 @@ msg_tag_t thread_do_ipc(kobject_t *kobj, entry_frame_t *f, umword_t user_id)
{
msg_tag_t ret_msg;
ipc_timeout_t ipc_tm_out = ipc_timeout_create(f->r[3]);
kobject_t *ipc_kobj = obj_space_lookup_kobj_cmp_type(&cur_task->obj_space, f->r[4], IPC_PROT);
int ret = thread_ipc_recv(&ret_msg, ipc_tm_out, &f->r[1]);
int ret = thread_ipc_recv(&ret_msg, ipc_tm_out, &f->r[1], (ipc_t *)ipc_kobj);
if (ret < 0)
{
return msg_tag_init4(0, 0, 0, ret);
@@ -726,7 +689,7 @@ msg_tag_t thread_do_ipc(kobject_t *kobj, entry_frame_t *f, umword_t user_id)
ipc_timeout_t ipc_tm_out = ipc_timeout_create(f->r[3]);
to_th->user_id = user_id;
ret = thread_ipc_send(to_th, in_tag, ipc_tm_out);
ret = thread_ipc_call(to_th, in_tag, NULL, ipc_tm_out, NULL, FALSE);
return msg_tag_init4(0, 0, 0, ret);
}
default:

View File

@@ -64,7 +64,7 @@ static void knl_main(void)
msg->msg_buf[1] = pos->pid;
msg->msg_buf[2] = 0;
int ret = thread_ipc_call(init_thread, msg_tag_init4(0, 3, 0, 0x0005 /*PM_PROT*/),
&tag, ipc_timeout_create2(3000, 3000), &user_id);
&tag, ipc_timeout_create2(3000, 3000), &user_id, TRUE);
if (ret < 0)
{