Merge branch 'port_race_test' of github.com:redpig/lk
edit: add a few quickie warning fixes
This commit is contained in:
@@ -275,6 +275,248 @@ bail:
|
||||
return __LINE__;
|
||||
}
|
||||
|
||||
static const char *kStatusPortNames[] = {
|
||||
"status0",
|
||||
"status1",
|
||||
};
|
||||
static const char *kRacePortName = "racer_port";
|
||||
|
||||
static const port_packet_t kRepeat =
|
||||
{{'R', 'E', 'P', 'E', 'A', 'T', 0, 0}};
|
||||
static const port_packet_t kQuit =
|
||||
{{'Q', 'U', 'I', 'T', 0, 0, 0, 0}};
|
||||
|
||||
static event_t race_evt;
|
||||
|
||||
static int race_thread(void *arg)
|
||||
{
|
||||
port_t r_port;
|
||||
int tid = (int)(intptr_t)arg;
|
||||
|
||||
printf("thread %d: connecting to control port\n", tid);
|
||||
status_t st = port_open("race_ctl", NULL, &r_port);
|
||||
if (st < 0) {
|
||||
printf("thread %d: could not open control port, status = %d\n", tid, st);
|
||||
return __LINE__;
|
||||
}
|
||||
|
||||
printf("thread %d: creating status port\n", tid);
|
||||
port_t w_port;
|
||||
st = port_create(kStatusPortNames[tid], PORT_MODE_UNICAST, &w_port);
|
||||
if (st < 0) {
|
||||
printf("thread %d: could not create status port, status = %d\n", tid, st);
|
||||
port_close(r_port);
|
||||
return __LINE__;
|
||||
}
|
||||
|
||||
// Loop is meant to coordinate a port_create() race.
|
||||
// The event triggers the race.
|
||||
// The thread sleeps briefly then cleans up
|
||||
// The thread reports its claim to its status port.
|
||||
// It then waits for a repeat or quit message.
|
||||
int ret = -1;
|
||||
while (ret < 0) {
|
||||
LTRACEF_LEVEL(1, "thread %d: waiting at the starting line\n", tid);
|
||||
if (event_wait_timeout(&race_evt, INFINITE_TIME) != NO_ERROR) {
|
||||
ret = __LINE__;
|
||||
break;
|
||||
}
|
||||
|
||||
port_t race_port;
|
||||
while(true) {
|
||||
st = port_create(kRacePortName, PORT_MODE_UNICAST, &race_port);
|
||||
if (st != ERR_BUSY)
|
||||
break;
|
||||
thread_sleep(25);
|
||||
} // EINTR all over again . . .
|
||||
LTRACEF_LEVEL(1, "thread %d: sampling chronochip (%p)\n", tid, race_port);
|
||||
if (st == ERR_ALREADY_EXISTS) {
|
||||
// lost the race to create the port.
|
||||
} else if (st < 0) {
|
||||
LTRACEF_LEVEL(1, "thread %d: could not open port, status = %d\n", tid, st);
|
||||
ret = __LINE__;
|
||||
break;
|
||||
} else { // Dispose of it now.
|
||||
thread_sleep(25);
|
||||
port_close(race_port);
|
||||
port_destroy(race_port);
|
||||
}
|
||||
|
||||
// Now send the stale pointer address as a status.
|
||||
port_packet_t claimed_port = {{0}};
|
||||
int len = sizeof(claimed_port.value);
|
||||
if (sizeof(race_port) < (size_t)len)
|
||||
len = sizeof(race_port);
|
||||
for (int i = 0; i < len; ++i) {
|
||||
claimed_port.value[i] = 0xff & ((uintptr_t)race_port >> (i * 8));
|
||||
}
|
||||
LTRACEF_LEVEL(1, "thread %d: reporting status\n", tid);
|
||||
st = port_write(w_port, &claimed_port, 1);
|
||||
if (st < 0) {
|
||||
printf("thread %d: could not write port, status = %d\n", tid, st);
|
||||
ret = __LINE__;
|
||||
break;
|
||||
}
|
||||
|
||||
LTRACEF_LEVEL(1, "thread %d: awaiting instructions\n", tid);
|
||||
port_result_t pr;
|
||||
st = port_read(r_port, INFINITE_TIME, &pr);
|
||||
if (st == ERR_CANCELLED) {
|
||||
printf("thread %d: could not read port, status = %d (CANCELLED)\n", tid, st);
|
||||
ret = __LINE__;
|
||||
break;
|
||||
} else if (st < 0) {
|
||||
printf("thread %d: could not read port, status = %d\n", tid, st);
|
||||
ret = __LINE__;
|
||||
break;
|
||||
}
|
||||
if (memcmp(pr.packet.value, kQuit.value, sizeof(pr.packet.value)) == 0) {
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
if (memcmp(pr.packet.value, kRepeat.value, sizeof(pr.packet.value)) == 0) {
|
||||
continue;
|
||||
}
|
||||
printf("thread %d: got a weird message from the control port\n", tid);
|
||||
ret = __LINE__;
|
||||
}
|
||||
thread_sleep((1+tid) * 5); // Make console output orderly.
|
||||
printf("thread %d: shutting down (ret=%d)\n", tid, ret);
|
||||
|
||||
port_close(r_port);
|
||||
port_close(w_port);
|
||||
port_destroy(w_port);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int two_threads_race(void)
|
||||
{
|
||||
printf("two_threads_race test . . .\n");
|
||||
// Used to tell the threads what to do.
|
||||
port_t w_port;
|
||||
status_t st = port_create("race_ctl", PORT_MODE_BROADCAST, &w_port);
|
||||
if (st < 0) {
|
||||
printf("could not create port, status = %d\n", st);
|
||||
return __LINE__;
|
||||
}
|
||||
|
||||
event_init(&race_evt, false, 0);
|
||||
|
||||
thread_t *t1 = thread_create(
|
||||
"rt0", &race_thread, (void *)0, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
|
||||
thread_t *t2 = thread_create(
|
||||
"rt1", &race_thread, (void *)1, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
|
||||
thread_set_real_time(t1);
|
||||
thread_set_real_time(t2);
|
||||
thread_resume(t1);
|
||||
thread_resume(t2);
|
||||
|
||||
// wait for each status port to be created so we can
|
||||
// track behavior.
|
||||
port_t r_port0, r_port1;
|
||||
printf("control: connecting to thread 0 . . .\n");
|
||||
while (true) {
|
||||
st = port_open(kStatusPortNames[0], NULL, &r_port0);
|
||||
if (st == NO_ERROR) {
|
||||
break;
|
||||
} else if (st == ERR_NOT_FOUND) {
|
||||
thread_sleep(100);
|
||||
} else {
|
||||
printf("could not open port, status = %d\n", st);
|
||||
// XXX: clean up...
|
||||
break;
|
||||
}
|
||||
}
|
||||
printf("control: connecting to thread 1 . . .\n");
|
||||
while (true) {
|
||||
st = port_open(kStatusPortNames[1], NULL, &r_port1);
|
||||
if (st == NO_ERROR) {
|
||||
break;
|
||||
} else if (st == ERR_NOT_FOUND) {
|
||||
thread_sleep(100);
|
||||
} else {
|
||||
printf("could not open port, status = %d\n", st);
|
||||
return __LINE__;
|
||||
}
|
||||
}
|
||||
|
||||
// control port says: 0 "REPEAT, or 1 "QUIT"
|
||||
int ret = 0;
|
||||
int count = 0;
|
||||
while (ret == 0) {
|
||||
LTRACEF_LEVEL(1, "Go!\n");
|
||||
printf(".");
|
||||
event_signal(&race_evt, false);
|
||||
port_result_t pr0, pr1;
|
||||
LTRACEF_LEVEL(1, "Collecting status from thread 0 . . .\n");
|
||||
st = port_read(r_port0, INFINITE_TIME, &pr0);
|
||||
if (st < 0) {
|
||||
printf("could not read port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
LTRACEF_LEVEL(1, "Collecting status from thread 1 . . .\n");
|
||||
st = port_read(r_port1, INFINITE_TIME, &pr1);
|
||||
if (st < 0) {
|
||||
printf("could not read port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
LTRACEF_LEVEL(1, "Checking responses . . .\n");
|
||||
if (memcmp(pr0.packet.value, pr1.packet.value, sizeof(pr0.packet.value)) != 0) {
|
||||
printf("Race detected on iteration %d!\n", count);
|
||||
ret = __LINE__;
|
||||
}
|
||||
event_unsignal(&race_evt);
|
||||
int repeat = (ret == 0 && count++ < 99 ? 1 : 0);
|
||||
LTRACEF_LEVEL(1, "Telling threads to %s\n", (repeat ? "repeat" : "quit"));
|
||||
st = port_write(w_port, (repeat ? &kRepeat : &kQuit), 1);
|
||||
if (st < 0) {
|
||||
printf("could not write port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
if (!repeat) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
printf("\n%d passes completed with result %d\n", count, ret);
|
||||
|
||||
st = port_close(r_port0);
|
||||
if (st < 0) {
|
||||
printf("could not close port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
|
||||
st = port_close(r_port1);
|
||||
if (st < 0) {
|
||||
printf("could not close port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
|
||||
st = port_close(w_port);
|
||||
if (st < 0) {
|
||||
printf("could not close port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
|
||||
int retcode = -1;
|
||||
thread_join(t1, &retcode, INFINITE_TIME);
|
||||
if (retcode)
|
||||
ret = retcode;
|
||||
|
||||
thread_join(t2, &retcode, INFINITE_TIME);
|
||||
if (retcode)
|
||||
ret = retcode;
|
||||
|
||||
st = port_destroy(w_port);
|
||||
if (st < 0) {
|
||||
printf("could not destroy port, status = %d\n", st);
|
||||
ret = __LINE__;
|
||||
}
|
||||
|
||||
printf("two_thread_race: %d\n", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static int two_threads_basic(void) {
|
||||
port_t w_port;
|
||||
@@ -713,6 +955,7 @@ int port_tests(int argc, const console_cmd_args *argv) {
|
||||
while (count--) {
|
||||
RUN_TEST(single_thread_basic);
|
||||
RUN_TEST(two_threads_basic);
|
||||
RUN_TEST(two_threads_race);
|
||||
RUN_TEST(group_basic);
|
||||
RUN_TEST(group_dynamic);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
|
||||
#define READPORT_MAGIC (0x70727472) // 'prtr'
|
||||
#define PORTGROUP_MAGIC (0x70727467) // 'prtg'
|
||||
#define PORTHOLD_MAGIC (0x70727467) // 'prth'
|
||||
|
||||
#define PORT_BUFF_SIZE 8
|
||||
#define PORT_BUFF_SIZE_BIG 64
|
||||
@@ -129,16 +130,24 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) {
|
||||
return ERR_INVALID_ARGS;
|
||||
}
|
||||
|
||||
if (strlen(name) >= PORT_NAME_LEN)
|
||||
if (strnlen(name, PORT_NAME_LEN) >= PORT_NAME_LEN)
|
||||
return ERR_INVALID_ARGS;
|
||||
|
||||
|
||||
// Add a stack-allocated port to the list until we can
|
||||
// replace it with a heap-allocated port.
|
||||
write_port_t stack_wp = { .magic = PORTHOLD_MAGIC };
|
||||
// We waste a few cycles here with a throwaway copy.
|
||||
strlcpy(stack_wp.name, name, sizeof(stack_wp.name));
|
||||
|
||||
// lookup for existing port, return that if found.
|
||||
write_port_t *wp = NULL;
|
||||
THREAD_LOCK(state1);
|
||||
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
|
||||
if (strcmp(wp->name, name) == 0) {
|
||||
// can't return closed ports.
|
||||
if (wp->magic == WRITEPORT_MAGIC_X)
|
||||
// can't return closed or partial ports.
|
||||
if (wp->magic == WRITEPORT_MAGIC_X ||
|
||||
wp->magic == PORTHOLD_MAGIC)
|
||||
wp = NULL;
|
||||
THREAD_UNLOCK(state1);
|
||||
if (wp) {
|
||||
@@ -149,12 +158,17 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) {
|
||||
}
|
||||
}
|
||||
}
|
||||
list_add_tail(&write_port_list, &stack_wp.node);
|
||||
THREAD_UNLOCK(state1);
|
||||
|
||||
// not found, create the write port and the circular buffer.
|
||||
wp = calloc(1, sizeof(write_port_t));
|
||||
if (!wp)
|
||||
if (!wp) {
|
||||
THREAD_LOCK(state2);
|
||||
list_delete(&stack_wp.node);
|
||||
THREAD_UNLOCK(state2);
|
||||
return ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
wp->magic = WRITEPORT_MAGIC_W;
|
||||
wp->mode = mode;
|
||||
@@ -164,13 +178,18 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port) {
|
||||
wp->buf = make_buf(mode & PORT_MODE_BIG_BUFFER);
|
||||
if (!wp->buf) {
|
||||
free(wp);
|
||||
THREAD_LOCK(state2);
|
||||
list_delete(&stack_wp.node);
|
||||
THREAD_UNLOCK(state2);
|
||||
return ERR_NO_MEMORY;
|
||||
}
|
||||
|
||||
// todo: race condtion! a port with the same name could have been created
|
||||
// by another thread at is point.
|
||||
// Avoid a name collision by swapping the temporary placeholder out of the
|
||||
// list for the actual port.
|
||||
THREAD_LOCK(state2);
|
||||
// Let's reserve a stack allocated entry then swap it for the allocated one.
|
||||
list_add_tail(&write_port_list, &wp->node);
|
||||
list_delete(&stack_wp.node);
|
||||
THREAD_UNLOCK(state2);
|
||||
|
||||
*port = (void *)wp;
|
||||
@@ -205,7 +224,8 @@ status_t port_open(const char *name, void *ctx, port_t *port) {
|
||||
THREAD_LOCK(state);
|
||||
write_port_t *wp = NULL;
|
||||
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
|
||||
if (strcmp(wp->name, name) == 0) {
|
||||
if (strcmp(wp->name, name) == 0 &&
|
||||
wp->magic != PORTHOLD_MAGIC) {
|
||||
// found; add read port to write port list.
|
||||
rp->wport = wp;
|
||||
if (wp->buf) {
|
||||
|
||||
Reference in New Issue
Block a user