Merge branch 'master' into dartuino-bringup

This commit is contained in:
Eric Holland
2015-11-20 15:05:57 -08:00
9 changed files with 1051 additions and 1 deletions

View File

@@ -26,6 +26,7 @@
#include <lib/console.h>
int thread_tests(void);
int port_tests(void);
void printf_tests(void);
void printf_tests_float(void);
void clock_tests(void);

422
app/tests/port_tests.c Normal file
View File

@@ -0,0 +1,422 @@
/*
* Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include <debug.h>
#include <err.h>
#include <string.h>
#include <rand.h>
#include <kernel/port.h>
#include <kernel/thread.h>
#include <platform.h>
void* context1 = (void*) 0x53;
static void dump_port_result(const port_result_t* result)
{
const port_packet_t* p = &result->packet;
printf("[%02x %02x %02x %02x %02x %02x %02x %02x]\n",
p->value[0], p->value[1], p->value[2], p->value[3],
p->value[4], p->value[5], p->value[6], p->value[7]);
}
static int single_thread_basic(void)
{
port_t w_port;
status_t st = port_create("sh_prt1", PORT_MODE_UNICAST, &w_port);
if (st < 0) {
printf("could not create port, status = %d\n", st);
return __LINE__;
}
port_t r_port;
st = port_open("sh_prt0", context1, &r_port);
if (st != ERR_NOT_FOUND) {
printf("expected not to find port, status = %d\n", st);
return __LINE__;
}
st = port_open("sh_prt1", context1, &r_port);
if (st < 0) {
printf("could not open port, status = %d\n", st);
return __LINE__;
}
port_packet_t packet[3] = {
{{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}},
{{0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}},
{{0x33, 0x66, 0x99, 0xcc, 0x33, 0x66, 0x99, 0xcc}},
};
st = port_write(w_port, &packet[0], 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
printf("reading from port:\n");
port_result_t res = {0};
st = port_read(r_port, 0, &res);
if (st < 0) {
printf("could not read port, status = %d\n", st);
return __LINE__;
}
if (res.ctx != context1) {
printf("bad context! = %p\n", res.ctx);
return __LINE__;
}
st = port_read(r_port, 0, &res);
if (st != ERR_TIMED_OUT) {
printf("expected timeout, status = %d\n", st);
return __LINE__;
}
st = port_write(w_port, &packet[1], 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
st = port_write(w_port, &packet[0], 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
st = port_write(w_port, &packet[2], 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
int expected_count = 3;
while (true) {
st = port_read(r_port, 0, &res);
if (st < 0)
break;
dump_port_result(&res);
--expected_count;
}
if (expected_count != 0) {
printf("invalid read count = %d\n", expected_count);
return __LINE__;
}
printf("\n");
// port should be empty. should be able to write 8 packets.
expected_count = 8;
while (true) {
st = port_write(w_port, &packet[1], 1);
if (st < 0)
break;
--expected_count;
st = port_write(w_port, &packet[2], 1);
if (st < 0)
break;
--expected_count;
}
if (expected_count != 0) {
printf("invalid write count = %d\n", expected_count);
return __LINE__;
}
// tod(cpu) fix this possibly wrong error.
if (st != ERR_PARTIAL_WRITE) {
printf("expected buffer error, status =%d\n", st);
return __LINE__;
}
// read 3 packets.
for (int ix = 0; ix != 3; ++ix) {
st = port_read(r_port, 0, &res);
if (st < 0) {
printf("could not read port, status = %d\n", st);
return __LINE__;
}
}
// there are 5 packets, now we add another 3.
st = port_write(w_port, packet, 3);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
expected_count = 8;
while (true) {
st = port_read(r_port, 0, &res);
if (st < 0)
break;
dump_port_result(&res);
--expected_count;
}
if (expected_count != 0) {
printf("invalid read count = %d\n", expected_count);
return __LINE__;
}
// attempt to use the wrong port.
st = port_write(r_port, &packet[1], 1);
if (st != ERR_BAD_HANDLE) {
printf("expected bad handle error, status = %d\n", st);
return __LINE__;
}
st = port_read(w_port, 0, &res);
if (st != ERR_BAD_HANDLE) {
printf("expected bad handle error, status = %d\n", st);
return __LINE__;
}
st = port_close(r_port);
if (st < 0) {
printf("could not close read port, status = %d\n", st);
return __LINE__;
}
st = port_close(w_port);
if (st < 0) {
printf("could not close write port, status = %d\n", st);
return __LINE__;
}
st = port_close(r_port);
if (st != ERR_BAD_HANDLE) {
printf("expected bad handle error, status = %d\n", st);
return __LINE__;
}
st = port_close(w_port);
if (st != ERR_BAD_HANDLE) {
printf("expected bad handle error, status = %d\n", st);
return __LINE__;
}
st = port_destroy(w_port);
if (st < 0) {
printf("could not destroy port, status = %d\n", st);
return __LINE__;
}
printf("single_thread_basic : ok\n");
return 0;
}
static int ping_pong_thread(void *arg)
{
port_t r_port;
status_t st = port_open("ping_port", NULL, &r_port);
if (st < 0) {
printf("thread: could not open port, status = %d\n", st);
return __LINE__;
}
bool should_dispose_pong_port = true;
port_t w_port;
st = port_create("pong_port", PORT_MODE_UNICAST, &w_port);
if (st == ERR_ALREADY_EXISTS) {
// won the race to create the port.
should_dispose_pong_port = false;
} else if (st < 0) {
printf("thread: could not open port, status = %d\n", st);
return __LINE__;
}
port_result_t pr;
// the loop is read-mutate-write until the write port
// is closed by the master thread.
while (true) {
st = port_read(r_port, INFINITE_TIME, &pr);
if (st == ERR_CANCELLED) {
break;
} else if (st < 0) {
printf("thread: could not read port, status = %d\n", st);
return __LINE__;
}
pr.packet.value[0]++;
pr.packet.value[5]--;
st = port_write(w_port, &pr.packet, 1);
if (st < 0) {
printf("thread: could not write port, status = %d\n", st);
return __LINE__;
}
}
port_close(r_port);
if (should_dispose_pong_port) {
port_close(w_port);
port_destroy(w_port);
}
return 0;
bail:
return __LINE__;
}
int two_threads_basic(void)
{
port_t w_port;
status_t st = port_create("ping_port", PORT_MODE_BROADCAST, &w_port);
if (st < 0) {
printf("could not create port, status = %d\n", st);
return __LINE__;
}
thread_t* t1 = thread_create(
"worker1", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
thread_t* t2 = thread_create(
"worker2", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
thread_resume(t1);
thread_resume(t2);
// wait for the pong port to be created, the two threads race to do it.
port_t r_port;
while (true) {
status_t st = port_open("pong_port", NULL, &r_port);
if (st == NO_ERROR) {
break;
} else if (st == ERR_NOT_FOUND) {
thread_sleep(100);
} else {
printf("could not open port, status = %d\n", st);
return __LINE__;
}
}
// We have two threads listening to the ping port. Which both reply
// on the pong port, so we get two packets in per packet out.
const int passes = 256;
printf("two_threads_basic test, %d passes\n", passes);
port_packet_t packet_out = {{0xaf, 0x77, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05}};
port_result_t pr;
for (int ix = 0; ix != passes; ++ix) {
const size_t count = 1 + ((unsigned int)rand() % 3);
for (size_t jx = 0; jx != count; ++jx) {
st = port_write(w_port, &packet_out, 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
return __LINE__;
}
}
packet_out.value[0]++;
packet_out.value[5]--;
for (size_t jx = 0; jx != count * 2; ++jx) {
st = port_read(r_port, INFINITE_TIME, &pr);
if (st < 0) {
printf("could not read port, status = %d\n", st);
return __LINE__;
}
if ((pr.packet.value[0] != packet_out.value[0]) ||
(pr.packet.value[5] != packet_out.value[5])) {
printf("unexpected data in packet, loop %d", ix);
return __LINE__;
}
}
}
thread_sleep(100);
// there should be no more packets to read.
st = port_read(r_port, 0, &pr);
if (st != ERR_TIMED_OUT) {
printf("unexpected packet, status = %d\n", st);
return __LINE__;
}
printf("two_threads_basic master shutdown\n");
st = port_close(r_port);
if (st < 0) {
printf("could not close port, status = %d\n", st);
return __LINE__;
}
st = port_close(w_port);
if (st < 0) {
printf("could not close port, status = %d\n", st);
return __LINE__;
}
st = port_destroy(w_port);
if (st < 0) {
printf("could not destroy port, status = %d\n", st);
return __LINE__;
}
int retcode = -1;
thread_join(t1, &retcode, INFINITE_TIME);
if (retcode)
goto fail;
thread_join(t2, &retcode, INFINITE_TIME);
if (retcode)
goto fail;
return 0;
fail:
printf("child thread exited with %d\n", retcode);
return __LINE__;
}
#define RUN_TEST(t) result = t(); if (result) goto fail
int port_tests(void)
{
int result;
int count = 2;
while (count--) {
RUN_TEST(single_thread_basic);
RUN_TEST(two_threads_basic);
}
printf("all tests passed\n");
return 0;
fail:
printf("test failed at line %d\n", result);
return 1;
}
#undef RUN_TEST

View File

@@ -14,6 +14,7 @@ MODULE_SRCS += \
$(LOCAL_DIR)/printf_tests.c \
$(LOCAL_DIR)/tests.c \
$(LOCAL_DIR)/thread_tests.c \
$(LOCAL_DIR)/port_tests.c \
MODULE_ARM_OVERRIDE_SRCS := \

View File

@@ -32,6 +32,7 @@ STATIC_COMMAND_START
STATIC_COMMAND("printf_tests", "test printf", (console_cmd)&printf_tests)
STATIC_COMMAND("printf_tests_float", "test printf with floating point", (console_cmd)&printf_tests_float)
STATIC_COMMAND("thread_tests", "test the scheduler", (console_cmd)&thread_tests)
STATIC_COMMAND("port_tests", "test the ports", (console_cmd)&port_tests)
STATIC_COMMAND("clock_tests", "test clocks", (console_cmd)&clock_tests)
STATIC_COMMAND("bench", "miscellaneous benchmarks", (console_cmd)&benchmarks)
STATIC_COMMAND("fibo", "threaded fibonacci", (console_cmd)&fibo)

View File

@@ -71,6 +71,7 @@
#define ERR_NO_RESOURCES (-41)
#define ERR_BAD_HANDLE (-42)
#define ERR_ACCESS_DENIED (-43)
#define ERR_PARTIAL_WRITE (-44)
#define ERR_USER_BASE (-16384)

101
include/kernel/port.h Normal file
View File

@@ -0,0 +1,101 @@
/*
* Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef __KERNEL_PORT_H
#define __KERNEL_PORT_H
#include <sys/types.h>
#include <compiler.h>
__BEGIN_CDECLS;
/* Ports are named, opaque objects and come in tree flavors, the
* write-side, the read-side and a port group which is a collection
* of read-side ports.
*/
#define PORT_NAME_LEN 12
typedef void* port_t;
typedef struct {
char value[8];
} port_packet_t;
typedef struct {
void* ctx;
port_packet_t packet;
} port_result_t;
typedef enum {
PORT_MODE_BROADCAST = 0,
PORT_MODE_UNICAST = 1,
PORT_MODE_BIG_BUFFER = 2,
} port_mode_t;
/* Inits the port subsystem
*/
void port_init(void);
/* Make a named write-side port. broadcast ports can be opened by any
* number of read-clients. |name| can be up to PORT_NAME_LEN chars. If
* the write port exists it is returned even if the |mode| does not match.
*/
status_t port_create(const char* name, port_mode_t mode, port_t* port);
/* Make a read-side port. Only non-destroyed existing write ports can
* be opened with this api. Unicast ports can only be opened once. For
* broadcast ports, each call if successful returns a new port.
*/
status_t port_open(const char* name, void* ctx, port_t* port);
/* Creates a read-side port group which behaves just like a regular
* read-side port. A given port can only be assoicated with one port group.
*/
status_t port_group(port_t* ports, size_t count, port_t* group);
/* Write to a port |count| packets, non-blocking, all or none atomic success.
*/
status_t port_write(port_t port, const port_packet_t* pk, size_t count);
/* Read one packet from the port or port group, blocking. The |result| contains
* the port that the message was read from. If |timeout| is zero the call
* does not block.
*/
status_t port_read(port_t port, lk_time_t timeout, port_result_t* result);
/* Destroy the write-side port, flush queued packets and release all resources,
* all calls will now fail on that port. Only a closed port can be destroyed.
*/
status_t port_destroy(port_t port);
/* Close the read-side port or the write side port. A closed write side port
* can be opened and the pending packets read. closing a port group does not
* close the included ports.
*/
status_t port_close(port_t port);
__END_CDECLS;
#endif

View File

@@ -26,6 +26,7 @@
#include <kernel/thread.h>
#include <kernel/timer.h>
#include <kernel/mp.h>
#include <kernel/port.h>
void kernel_init(void)
{
@@ -43,5 +44,9 @@ void kernel_init(void)
// initialize kernel timers
dprintf(SPEW, "initializing timers\n");
timer_init();
// initialize ports
dprintf(SPEW, "initializing ports\n");
port_init();
}

517
kernel/port.c Normal file
View File

@@ -0,0 +1,517 @@
/*
* Copyright (c) 2015 Carlos Pizano-Uribe cpu@chromium.org
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file
* @brief Port object functions
* @defgroup event Events
*
*/
#include <debug.h>
#include <list.h>
#include <malloc.h>
#include <string.h>
#include <pow2.h>
#include <err.h>
#include <kernel/thread.h>
#include <kernel/port.h>
// write ports can be in two states, open and closed, which have a
// different magic number.
#define WRITEPORT_MAGIC_W 'prtw'
#define WRITEPORT_MAGIC_X 'prtx'
#define READPORT_MAGIC 'prtr'
#define PORTGROUP_MAGIC 'prtg'
#define PORT_BUFF_SIZE 8
#define PORT_BUFF_SIZE_BIG 64
#define RESCHEDULE_POLICY 1
#define MAX_PORT_GROUP_COUNT 256
typedef struct {
uint log2;
uint avail;
uint head;
uint tail;
port_packet_t packet[1];
} port_buf_t;
typedef struct {
int magic;
struct list_node node;
port_buf_t* buf;
struct list_node rp_list;
port_mode_t mode;
char name[PORT_NAME_LEN];
} write_port_t;
typedef struct {
int magic;
wait_queue_t wait;
struct list_node rp_list;
} port_group_t;
typedef struct {
int magic;
struct list_node w_node;
struct list_node g_node;
port_buf_t* buf;
void* ctx;
wait_queue_t wait;
write_port_t* wport;
port_group_t* gport;
} read_port_t;
static struct list_node write_port_list;
static port_buf_t* make_buf(uint pk_count)
{
uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
port_buf_t* buf = (port_buf_t*) malloc(size);
if (!buf)
return NULL;
buf->log2 = log2_uint(pk_count);
buf->head = buf->tail = 0;
buf->avail = pk_count;
return buf;
}
static status_t buf_write(port_buf_t* buf, const port_packet_t* packets, size_t count)
{
if (buf->avail < count)
return ERR_NOT_ENOUGH_BUFFER;
for (size_t ix = 0; ix != count; ix++) {
buf->packet[buf->tail] = packets[ix];
buf->tail = modpow2(++buf->tail, buf->log2);
}
buf->avail -= count;
return NO_ERROR;
}
static status_t buf_read(port_buf_t* buf, port_result_t* pr)
{
if (buf->avail == valpow2(buf->log2))
return ERR_NO_MSG;
pr->packet = buf->packet[buf->head];
buf->head = modpow2(++buf->head, buf->log2);
++buf->avail;
return NO_ERROR;
}
// must be called before any use of ports.
void port_init(void)
{
list_initialize(&write_port_list);
}
status_t port_create(const char* name, port_mode_t mode, port_t* port)
{
if (!name || !port)
return ERR_INVALID_ARGS;
// only unicast ports can have a large buffer.
if (mode & PORT_MODE_BROADCAST) {
if (mode & PORT_MODE_BIG_BUFFER)
return ERR_INVALID_ARGS;
}
if (strlen(name) >= PORT_NAME_LEN)
return ERR_INVALID_ARGS;
// lookup for existing port, return that if found.
write_port_t* wp = NULL;
THREAD_LOCK(state1);
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
if (strcmp(wp->name, name) == 0) {
// can't return closed ports.
if (wp->magic == WRITEPORT_MAGIC_X)
wp = NULL;
THREAD_UNLOCK(state1);
if (wp) {
*port = (void*) wp;
return ERR_ALREADY_EXISTS;
} else {
return ERR_BUSY;
}
}
}
THREAD_UNLOCK(state1);
// not found, create the write port and the circular buffer.
wp = calloc(1, sizeof(write_port_t));
if (!wp)
return ERR_NO_MEMORY;
wp->magic = WRITEPORT_MAGIC_W;
wp->mode = mode;
strlcpy(wp->name, name, sizeof(wp->name));
list_initialize(&wp->rp_list);
uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
wp->buf = make_buf(size);
if (!wp->buf) {
free(wp);
return ERR_NO_MEMORY;
}
// todo: race condtion! a port with the same name could have been created
// by another thread at is point.
THREAD_LOCK(state2);
list_add_tail(&write_port_list, &wp->node);
THREAD_UNLOCK(state2);
*port = (void*)wp;
return NO_ERROR;
}
status_t port_open(const char* name, void* ctx, port_t* port)
{
if (!name || !port)
return ERR_INVALID_ARGS;
// assume success; create the read port and buffer now.
read_port_t* rp = calloc(1, sizeof(read_port_t));
if (!rp)
return ERR_NO_MEMORY;
rp->magic = READPORT_MAGIC;
wait_queue_init(&rp->wait);
rp->ctx = ctx;
// |buf| might not be needed, but we always allocate outside the lock.
// this buffer is only needed for broadcast ports, but we don't know
// that here.
port_buf_t* buf = make_buf(PORT_BUFF_SIZE);
if (!buf)
return ERR_NO_MEMORY;
// find the named write port and associate it with read port.
status_t rc = ERR_NOT_FOUND;
THREAD_LOCK(state);
write_port_t* wp = NULL;
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
if (strcmp(wp->name, name) == 0) {
// found; add read port to write port list.
rp->wport = wp;
if (wp->buf) {
// this is the first read port; transfer the circular buffer.
list_add_tail(&wp->rp_list, &rp->w_node);
rp->buf = wp->buf;
wp->buf = NULL;
rc = NO_ERROR;
} else if (buf) {
// not first read port.
if (wp->mode & PORT_MODE_UNICAST) {
// cannot add a second listener.
rc = ERR_NOT_ALLOWED;
break;
}
// use the new (small) circular buffer.
list_add_tail(&wp->rp_list, &rp->w_node);
rp->buf = buf;
buf = NULL;
rc = NO_ERROR;
} else {
// |buf| allocation failed and the buffer was needed.
rc = ERR_NO_MEMORY;
}
break;
}
}
THREAD_UNLOCK(state);
if (buf)
free(buf);
if (rc == NO_ERROR) {
*port = (void*)rp;
} else {
free(rp);
}
return rc;
}
status_t port_group(port_t* ports, size_t count, port_t* group)
{
if (count > MAX_PORT_GROUP_COUNT)
return ERR_TOO_BIG;
if (!ports || !group)
return ERR_INVALID_ARGS;
// assume success; create port group now.
port_group_t* pg = calloc(1, sizeof(port_group_t));
if (!pg)
return ERR_NO_MEMORY;
pg->magic = PORTGROUP_MAGIC;
wait_queue_init(&pg->wait);
list_initialize(&pg->rp_list);
status_t rc = NO_ERROR;
THREAD_LOCK(state);
for (size_t ix = 0; ix != count; ix++) {
read_port_t* rp = (read_port_t*)ports[ix];
if ((rp->magic != READPORT_MAGIC) || rp->gport) {
// wrong type of port, or port already part of a group,
// in any case, undo the changes to the previous read ports.
for (size_t jx = 0; jx != ix; jx++) {
((read_port_t*)ports[jx])->gport = NULL;
}
rc = ERR_BAD_HANDLE;
break;
}
// link port group and read port.
rp->gport = pg;
list_add_tail(&pg->rp_list, &rp->g_node);
}
THREAD_UNLOCK(state);
if (rc == NO_ERROR) {
*group = (port_t*)pg;
} else {
free(pg);
}
return rc;
}
status_t port_write(port_t port, const port_packet_t* pk, size_t count)
{
if (!port || !pk)
return ERR_INVALID_ARGS;
write_port_t* wp = (write_port_t*)port;
THREAD_LOCK(state);
if (wp->magic != WRITEPORT_MAGIC_W) {
// wrong port type.
THREAD_UNLOCK(state);
return ERR_BAD_HANDLE;
}
status_t status = NO_ERROR;
int awake_count = 0;
if (wp->buf) {
// there are no read ports, just write to the buffer.
status = buf_write(wp->buf, pk, count);
} else {
// there are read ports. for each, write and attempt to wake a thread
// from the port group or from the read port itself.
read_port_t* rp;
list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
if (buf_write(rp->buf, pk, count) < 0) {
// buffer full.
status = ERR_PARTIAL_WRITE;
continue;
}
int awaken = 0;
if (rp->gport) {
awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
}
if (!awaken) {
awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
}
awake_count += awaken;
}
}
THREAD_UNLOCK(state);
#if RESCHEDULE_POLICY
if (awake_count)
thread_yield();
#endif
return status;
}
static inline status_t read_no_lock(read_port_t* rp, lk_time_t timeout, port_result_t* result)
{
status_t status = buf_read(rp->buf, result);
result->ctx = rp->ctx;
if (status != ERR_NO_MSG)
return status;
// early return allows compiler to elide the rest for the group read case.
if (!timeout)
return ERR_TIMED_OUT;
status_t wr = wait_queue_block(&rp->wait, timeout);
if (wr != NO_ERROR)
return wr;
// recursive tail call is usually optimized away with a goto.
return read_no_lock(rp, timeout, result);
}
status_t port_read(port_t port, lk_time_t timeout, port_result_t* result)
{
if (!port || !result)
return ERR_INVALID_ARGS;
status_t rc = ERR_GENERIC;
read_port_t* rp = (read_port_t*)port;
THREAD_LOCK(state);
if (rp->magic == READPORT_MAGIC) {
// dealing with a single port.
rc = read_no_lock(rp, timeout, result);
} else if (rp->magic == PORTGROUP_MAGIC) {
// dealing with a port group.
port_group_t* pg = (port_group_t*)port;
do {
// read each port with no timeout.
// todo: this order is fixed, probably a bad thing.
list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
rc = read_no_lock(rp, 0, result);
if (rc != ERR_TIMED_OUT)
goto read_exit;
}
// no data, block on the group waitqueue.
rc = wait_queue_block(&pg->wait, timeout);
} while (rc == NO_ERROR);
} else {
// wrong port type.
rc = ERR_BAD_HANDLE;
}
read_exit:
THREAD_UNLOCK(state);
return rc;
}
status_t port_destroy(port_t port)
{
if (!port)
return ERR_INVALID_ARGS;
write_port_t* wp = (write_port_t*) port;
port_buf_t* buf = NULL;
THREAD_LOCK(state);
if (wp->magic != WRITEPORT_MAGIC_X) {
// wrong port type.
THREAD_UNLOCK(state);
return ERR_BAD_HANDLE;
}
// remove self from global named ports list.
list_delete(&wp->node);
if (wp->buf) {
// we have no readers.
buf = wp->buf;
} else {
// for each reader:
read_port_t* rp;
list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
// wake the read and group ports.
wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
if (rp->gport) {
wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
}
// remove self from reader ports.
rp->wport = NULL;
}
}
wp->magic = 0;
THREAD_UNLOCK(state);
free(buf);
free(wp);
return NO_ERROR;
}
status_t port_close(port_t port)
{
if (!port)
return ERR_INVALID_ARGS;
read_port_t* rp = (read_port_t*) port;
port_buf_t* buf = NULL;
THREAD_LOCK(state);
if (rp->magic == READPORT_MAGIC) {
// dealing with a read port.
if (rp->wport) {
// remove self from write port list and reassign the bufer if last.
list_delete(&rp->w_node);
if (list_is_empty(&rp->wport->rp_list)) {
rp->wport->buf = rp->buf;
rp->buf = NULL;
} else {
buf = rp->buf;
}
}
if (rp->gport) {
// remove self from port group list.
list_delete(&rp->g_node);
}
// wake up waiters, the return code is ERR_OBJECT_DESTROYED.
wait_queue_destroy(&rp->wait, true);
rp->magic = 0;
} else if (rp->magic == PORTGROUP_MAGIC) {
// dealing with a port group.
port_group_t* pg = (port_group_t*) port;
// wake up waiters.
wait_queue_destroy(&pg->wait, true);
// remove self from reader ports.
rp = NULL;
list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
rp->gport = NULL;
}
pg->magic = 0;
} else if (rp->magic == WRITEPORT_MAGIC_W) {
// dealing with a write port.
write_port_t* wp = (write_port_t*) port;
// mark it as closed. Now it can be read but not written to.
wp->magic = WRITEPORT_MAGIC_X;
THREAD_UNLOCK(state);
return NO_ERROR;
} else {
THREAD_UNLOCK(state);
return ERR_BAD_HANDLE;
}
THREAD_UNLOCK(state);
free(buf);
free(port);
return NO_ERROR;
}

View File

@@ -15,7 +15,8 @@ MODULE_SRCS := \
$(LOCAL_DIR)/thread.c \
$(LOCAL_DIR)/timer.c \
$(LOCAL_DIR)/semaphore.c \
$(LOCAL_DIR)/mp.c
$(LOCAL_DIR)/mp.c \
$(LOCAL_DIR)/port.c
ifeq ($(WITH_KERNEL_VM),1)
MODULE_DEPS += kernel/vm