init: initial commit

This commit is contained in:
Upkeep 2022-10-15 17:24:32 +02:00
commit 6a366d1c43
Signed by: dvdrw
GPG Key ID: 044B8425E3CD03E0
16 changed files with 2219 additions and 0 deletions

56
.gitignore vendored Normal file
View File

@ -0,0 +1,56 @@
.ccls-cache
/build
# http://www.gnu.org/software/automake
Makefile.in
/ar-lib
/mdate-sh
/py-compile
/test-driver
/ylwrap
.deps/
.dirstamp
# http://www.gnu.org/software/autoconf
autom4te.cache
/autoscan.log
/autoscan-*.log
/aclocal.m4
/compile
/config.cache
/config.guess
/config.h.in
/config.log
/config.status
/config.sub
/configure
/configure.scan
/configure~
/depcomp
/install-sh
/missing
/stamp-h1
# https://www.gnu.org/software/libtool/
/ltmain.sh
# http://www.gnu.org/software/texinfo
/texinfo.tex
# http://www.gnu.org/software/m4/
m4/libtool.m4
m4/ltoptions.m4
m4/ltsugar.m4
m4/ltversion.m4
m4/lt~obsolete.m4
# Generated Makefile
# (meta build system like autotools,
# can automatically generate from config.status script
# (which is called by configure script))
Makefile

5
Makefile.am Normal file
View File

@ -0,0 +1,5 @@
AC_CFLAGS = -fno-strict-aliasing
SUBDIRS = src
ACLOCAL_AMFLAGS = -I m4

27
configure.ac Normal file
View File

@ -0,0 +1,27 @@
AC_INIT([ipsp-rpc], [0.1], [bug-report@address])
AM_INIT_AUTOMAKE([foreign subdir-objects -Wall -Werror])
AC_PROG_C
AM_PROG_AR
LT_INIT
AC_CONFIG_MACRO_DIRS([m4])
AC_SEARCH_LIBS([uv_loop_init], [uv], [],
[AC_MSG_ERROR(Missing the libuv library.)])
AC_ARG_WITH([mpack],
[AS_HELP_STRING([--without-mpack],
[disable support for mpack])],
[],
[with_mpack=yes])
AS_IF([test "x$with_mpack" = xyes],
[dnl AC_CONFIG_SUBDIRS([mpack])
AC_SUBST([HAVE_MPACK], [1])],
[AC_SUBST([HAVE_MPACK], [0])])
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([Makefile src/Makefile])
AC_OUTPUT

8
src/Makefile.am Normal file
View File

@ -0,0 +1,8 @@
lib_LTLIBRARIES = libipsp.la
libipsp_la_SOURCES = htonl.h uthash.h dynarr.h dynarr.c linked_list.h linked_list.c transport/tcp.h transport/tcp.c server.c server.h
pkginclude_HEADERS = server.h
bin_PROGRAMS = ipsptest
ipsptest_SOURCES = test/main.c
ipsptest_LDADD = libipsp.la

5
src/defer.h Normal file
View File

@ -0,0 +1,5 @@
#ifndef RPC_DEFER_H
#define RPC_DEFER_H
#endif // RPC_DEFER_H

67
src/dynarr.c Normal file
View File

@ -0,0 +1,67 @@
#include "dynarr.h"
#include <string.h>
int
dynarr_init(dynarr_t *d, size_t n)
{
void *data = calloc(n, 1);
if(data == NULL) return -1;
d->data = data;
d->cap = n;
d->sz = 0;
return 0;
}
int
dynarr_release(dynarr_t *d)
{
free(d->data);
return 0;
}
int
dynarr_reserve(dynarr_t *d, size_t cap)
{
if(d->cap >= cap) return 0;
void *ndata = realloc(d->data, cap);
if(ndata == NULL) return -1;
d->cap = cap;
d->data = ndata;
return 0;
}
int
dynarr_clear(dynarr_t *d, u_int8_t c)
{
memset(d->data, c, d->cap);
d->sz = 0;
return 0;
}
int
dynarr_shrink(dynarr_t *d)
{
if(d->sz == d->cap) return 0;
void *ndata = realloc(d->data, d->sz);
if(ndata == NULL) return -1;
d->cap = d->sz;
d->data = ndata;
return 0;
}
int
dynarr_append(dynarr_t *d, void *buf, size_t sz)
{
if(d->sz + sz > d->cap)
dynarr_reserve(d,(d->sz + sz) < (2 * d->sz)
? (2 * d->sz) : (d->sz + sz));
memcpy(d->data + d->sz, buf, sz);
d->sz += sz;
return 0;
}

53
src/dynarr.h Normal file
View File

@ -0,0 +1,53 @@
#ifndef RPC_DYNARR_H
#define RPC_DYNARR_H
#include <stdlib.h>
typedef struct dynarr_s {
void *data;
size_t sz, cap;
} dynarr_t;
/**
* Initializes a dynarr_t with at least #n bytes of capacity.
* Returns -1 if unable to allocate enough memory.
*/
int
dynarr_init(dynarr_t *d, size_t n);
/**
* Releases allocated memory associated with dynarr_t #d.
*/
int
dynarr_release(dynarr_t *d);
/**
* Fill the buffer #d with character #c and set its size to 0.
*/
int
dynarr_clear(dynarr_t *d, u_int8_t c);
/**
* Reserves at least #cap bytes in #d's buffer.
* Returns -1 if unable to allocate enough memory.
*/
int
dynarr_reserve(dynarr_t *d, size_t cap);
/**
* Shrinks #d's buffer to be as small as the data inside. No-op if capacity
* matches size.
*/
int
dynarr_shrink(dynarr_t *d);
/**
* Copies over #sz bytes from #buf to the end of #d's data, possibly expanding
* the buffer by at least the amount necessary to store both the data and #buf.
*
* Returns -1 if unable to allocate enough memory.
*/
int
dynarr_append(dynarr_t *d, void *buf, size_t sz);
#endif // RPC_DYNARR_H

12
src/htonl.h Normal file
View File

@ -0,0 +1,12 @@
#ifndef RPC_HTONL_H
#define RPC_HTONL_H
#include <config.h>
#ifdef HAVE_UNISTD_H
#include <arpa/inet.h>
#else
#include <winsock.h>
#endif
#endif //RPC_HTONL_H

50
src/linked_list.c Normal file
View File

@ -0,0 +1,50 @@
#include "linked_list.h"
list_node_t *
llist_node(void *data)
{
list_node_t *node = malloc(sizeof(list_node_t));
if(node == NULL) return node;
node->data = data;
node->next = NULL;
return node;
}
list_node_t *
llist_append(list_node_t *list, void *data)
{
for(; list->next != NULL; list = list->next);
list_node_t* new = llist_node(data);
list->next = new;
return new;
}
list_node_t *
llist_prepend(list_node_t **list, void *data)
{
list_node_t* new = llist_node(data);
if(new != NULL) {
new->next = *list;
*list = new;
}
return new;
}
void *
llist_at(list_node_t *list, size_t n)
{
for(int i = 0; i < n && list != NULL; list = list->next, i++);
return list;
}
size_t
llist_length(const list_node_t *list)
{
int i = 0;
for(; list != NULL; list = list->next, ++i);
return i;
}

32
src/linked_list.h Normal file
View File

@ -0,0 +1,32 @@
#ifndef RPC_LINKED_LIST_H
#define RPC_LINKED_LIST_H
#include <stdlib.h>
typedef struct list_node_s {
struct list_node_s *next;
void *data;
} list_node_t;
list_node_t *
llist_node(void *data);
list_node_t *
llist_append(list_node_t *list, void *data);
list_node_t *
llist_prepend(list_node_t **list, void *data);
/**
* Inserts at #n, or appends if shorter than #n.
*/
list_node_t *
llist_insert(list_node_t **list, size_t n, void *data);
void *
llist_at(list_node_t *list, size_t n);
size_t
llist_length(const list_node_t *list);
#endif // RPC_LINKED_LIST_H

0
src/server.c Normal file
View File

10
src/server.h Normal file
View File

@ -0,0 +1,10 @@
#ifndef RPC_SERVER_H
#define RPC_SERVER_H
#include <uv.h>
typedef struct {
} server_t;
#endif //RPC_SERVER_H

67
src/test/main.c Normal file
View File

@ -0,0 +1,67 @@
#include <stdio.h>
#include <uv.h>
#include <stdlib.h>
#include <signal.h>
#include "../server.h"
#include "../transport/tcp.h"
static uv_loop_t loop;
static uv_tcp_t server_sock;
static rpc_tcp_server_t *server;
static void
catch_sigint(int signal)
{
rpc_tcp_close(server, NULL, NULL);
uv_loop_close(&loop);
exit(0);
}
static void
on_msg(rpc_tcp_stream_t *stream, int status, void *buf, ssize_t n)
{
if(status < 0) {
fprintf(stderr, "on_msg() recieved status = %d, %s\n",
status, uv_err_name(status));
if(n == -1)
rpc_tcp_conn_close((rpc_tcp_conn_t*)stream, NULL, NULL);
return;
}
printf("on_msg() recieved message on stream (%d): ",
stream == NULL ? 0 : stream->id);
printf("%.*s\n", (int)n, (char *)buf);
/* rpc_tcp_conn_close(stream->owner, NULL, NULL); */
free(buf);
}
static void
on_new_conn(rpc_tcp_conn_t *conn, int status)
{
rpc_tcp_conn_read_start(conn, on_msg);
}
int
main()
{
uv_loop_init(&loop);
printf("rpc: main() started\n\n");
uv_tcp_t server_sock;
uv_tcp_init(&loop, &server_sock);
struct sockaddr_in s;
uv_ip4_addr("0.0.0.0", 4321, &s);
uv_tcp_bind(&server_sock, (const struct sockaddr*)&s, 0);
server = rpc_tcp_listen(&server_sock, 10, on_new_conn);
signal(SIGINT, catch_sigint);
uv_run(&loop, UV_RUN_DEFAULT);
uv_loop_close(&loop);
return 0;
}

530
src/transport/tcp.c Normal file
View File

@ -0,0 +1,530 @@
#include "tcp.h"
#include "../htonl.h"
#include "../uthash.h"
#include "../dynarr.h"
#include <stdlib.h>
typedef struct {
rpc_tcp_conn_new_cb new_conn;
rpc_tcp_server_t *server;
} rpc_tcp_state_t;
static void
init_tcp_server(rpc_tcp_server_t *s, uv_tcp_t *sock)
{
s->_conns = NULL;
s->_sock = sock;
}
static void
init_tcp_state(rpc_tcp_state_t *s, rpc_tcp_conn_new_cb cb,
rpc_tcp_server_t *server)
{
s->new_conn = cb;
s->server = server;
return;
}
static void
init_tcp_conn(rpc_tcp_conn_t *c, rpc_tcp_server_t *_server,
UT_hash_handle *hh, uv_tcp_t *_client, dynarr_t *buf)
{
c->_client = _client;
c->_server = _server;
c->_hh = hh;
c->_buf = buf;
c->_streams = NULL;
c->stream_id = 0;
}
static void
on_new_connection(uv_stream_t *stream, int status)
{
rpc_tcp_conn_new_cb cb = ((rpc_tcp_state_t *)stream->data)->new_conn;
rpc_tcp_server_t *server = ((rpc_tcp_state_t *)stream->data)->server;
if(status < 0) {
goto ERR_CB;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
if(client == NULL) {
status = -1;
goto ERR_CB;
}
uv_tcp_init(stream->loop, client);
status = uv_accept(stream, (uv_stream_t*) client);
if(status < 0) {
goto FREE_CLIENT;
}
rpc_tcp_conn_t *conn = malloc(sizeof(rpc_tcp_conn_t));
if(conn == NULL) goto FREE_CLIENT;
dynarr_t *buf = malloc(sizeof(dynarr_t));
if(buf == NULL) { status = -1; goto FREE_CONNECTION; }
status = dynarr_init(buf, 65536);
if(status < 0) { goto FREE_BUF; }
UT_hash_handle *hh = malloc(sizeof(UT_hash_handle));
if(hh == NULL) { status = -1; goto FREE_BUF; }
init_tcp_conn(conn, server, hh, client, buf);
// Keep a map of all connections, indexed by their client socket ptr
HASH_ADD(_hh, server->_conns, _client, sizeof(void*), conn);
cb(conn, 0);
return;
FREE_BUF:
free(buf);
FREE_CONNECTION:
free(conn);
FREE_CLIENT:
free(client);
ERR_CB:
free(stream->data);
cb(NULL, status);
}
rpc_tcp_server_t *
rpc_tcp_listen(uv_tcp_t *sock, int backlog, rpc_tcp_conn_new_cb cb)
{
rpc_tcp_server_t *ret = NULL;
rpc_tcp_server_t *server = malloc(sizeof(rpc_tcp_server_t));
if(server == NULL) goto RET; else ret = server;
init_tcp_server(server, sock);
rpc_tcp_state_t *state = malloc(sizeof(rpc_tcp_state_t));
if(state == NULL) { ret = NULL; goto FREE_SERVER; }
init_tcp_state(state, cb, server);
sock->data = state;
int r = uv_listen((uv_stream_t *)sock, backlog, on_new_connection);
if(r < 0) { ret = NULL; goto FREE_STATE; }
// `ret' should contain a pointer to the server at this point
goto RET;
FREE_STATE:
free(state);
FREE_SERVER:
free(server);
RET:
return ret;
}
static void
cleanup_conn(uv_handle_t *h)
{
rpc_tcp_conn_t *conn = h->data;
dynarr_release(conn->_buf);
free(conn->_buf);
free(conn->_client);
free(conn->_hh);
free(conn);
}
int
rpc_tcp_conn_close(rpc_tcp_conn_t *conn, rpc_tcp_conn_close_cb conn_cb,
rpc_tcp_stream_close_cb stream_cb)
{
rpc_tcp_stream_t *stream, *tmp;
HASH_ITER(_hh, conn->_streams, stream, tmp) {
HASH_DELETE(_hh, conn->_streams, stream);
if(stream_cb != NULL) stream_cb(stream, 0);
free(stream);
free(stream->_hh);
}
HASH_DELETE(_hh, conn->_server->_conns, conn);
if(conn_cb != NULL) conn_cb(conn, 0);
conn->_client->data = conn;
uv_close((uv_handle_t*)conn->_client, cleanup_conn);
return 0;
}
static void
cleanup_server(uv_handle_t *h)
{
// All connections, streams, and hashtable should have been closed & freed
// at this point. This only leaves the struct itself.
rpc_tcp_server_t *s = h->data;
free(s);
}
int
rpc_tcp_close(rpc_tcp_server_t *server, rpc_tcp_conn_close_cb conn_cb,
rpc_tcp_stream_close_cb stream_cb)
{
rpc_tcp_conn_t *conn, *tmp;
HASH_ITER(_hh, server->_conns, conn, tmp) {
HASH_DELETE(_hh, server->_conns, conn);
// Copy-paste of rpc_tcp_conn_close(), sans removing itself from parent
// server
{
rpc_tcp_stream_t *stream, *tmp;
HASH_ITER(_hh, conn->_streams, stream, tmp) {
HASH_DELETE(_hh, conn->_streams, stream);
if(stream_cb != NULL) stream_cb(stream, 0);
free(stream);
free(stream->_hh);
}
if(conn_cb != NULL) conn_cb(conn, 0);
conn->_client->data = conn;
uv_close((uv_handle_t*)conn->_client, cleanup_conn);
}
}
// Allocated in `rpc_tcp_listen()' to pass user-provided callbacks, etc.
free(server->_sock->data);
server->_sock->data = server;
uv_close((uv_handle_t*)server->_sock, cleanup_server);
return 0;
}
typedef struct {
rpc_tcp_stream_new_cb cb;
rpc_tcp_stream_t *s;
int r;
} rpc_new_stream_state_t;
static void
init_tcp_stream(rpc_tcp_stream_t *s,
uv_stream_t *_stream,
rpc_tcp_conn_t *_owner,
UT_hash_handle *_hh,
uint16_t id)
{
s->_stream = _stream;
s->owner = _owner;
s->id = id;
}
static void
defer_new_stream_cb(uv_timer_t *t)
{
rpc_new_stream_state_t *state = t->data;
rpc_tcp_stream_new_cb cb = state->cb;
rpc_tcp_stream_t *s = state->s;
int r = state->r;
uv_close((uv_handle_t *)t, (uv_close_cb)free);
free(state);
cb(s, r);
}
int
rpc_tcp_stream_open(rpc_tcp_conn_t *conn, rpc_tcp_stream_new_cb cb)
{
int status = 0;
rpc_tcp_stream_t *stream = malloc(sizeof(rpc_tcp_stream_t));
if(stream == NULL) {
status = -1;
goto RET;
}
// TODO: keep track of and reuse freed stream ids
UT_hash_handle *hh = malloc(sizeof(UT_hash_handle));
if(hh == NULL) { status = -1; goto FREE_STREAM; }
init_tcp_stream(stream, (uv_stream_t *)conn->_client,
conn, hh, conn->stream_id++);
// Keep track of all streams by their id in a hashmap
HASH_ADD(_hh, conn->_streams, id, 2, stream);
// TODO: notify peer of new stream instead of just tagging frames
uv_timer_t *t = malloc(sizeof(uv_timer_t));
if(t == NULL) {
status = -1;
goto FREE_HH;
}
// Yield execution with a next-iteration timer
uv_timer_init(conn->_server->_sock->loop, t);
uv_timer_start(t, defer_new_stream_cb, 0, 0);
goto RET;
FREE_HH:
free(hh);
FREE_STREAM:
free(stream);
RET:
return status;
}
int
rpc_tcp_stream_close(rpc_tcp_stream_t *stream, rpc_tcp_stream_close_cb cb)
{
// Delete the stream from the owning connection's hashmap
HASH_DELETE(_hh, stream->owner->_streams, stream);
if(cb != NULL) cb(stream, 0);
free(stream->_hh);
free(stream);
return 0;
}
typedef struct {
rpc_tcp_stream_msg_cb cb;
rpc_tcp_stream_t *stream;
uv_buf_t *bufs;
} stream_send_state_t;
static void
on_write_complete(uv_write_t *handle, int status)
{
stream_send_state_t *data = handle->data;
rpc_tcp_stream_msg_cb cb = data->cb;
rpc_tcp_stream_t *stream = data->stream;
uv_buf_t buf = data->bufs[1];
free(data->bufs[0].base);
free(data->bufs);
free(data);
uv_close((uv_handle_t*)handle, (uv_close_cb)free);
cb(stream, status, buf.base, buf.len);
}
int
rpc_tcp_stream_send(rpc_tcp_stream_t *stream, void *buf, uint32_t n,
rpc_tcp_stream_msg_cb cb)
{
int status = 0;
uv_write_t *w = malloc(sizeof(uv_write_t));
if(w == NULL) {
status = -1;
goto RET;
}
uv_buf_t *bufs = malloc(sizeof(uv_buf_t) * 2);
if(bufs == NULL) {
status = -1;
goto FREE_REQ;
}
rpc_tcp_msg_header *header = malloc(sizeof(rpc_tcp_msg_header));
if(header == NULL) {
status = -1;
goto FREE_BUFS;
}
stream_send_state_t *data = malloc(sizeof(stream_send_state_t));
if(data == NULL) {
status = -1;
goto FREE_HEADER;
}
data->cb = cb;
data->stream = stream;
data->bufs = bufs;
header->sz = htonl(n);
header->id = htons(stream->id);
bufs[0].base = (char *)header;
bufs[0].len = sizeof(rpc_tcp_msg_header);
bufs[1].base = buf;
bufs[1].len = n;
uv_write(w, stream->_stream, bufs, 2, on_write_complete);
return 0;
FREE_HEADER:
free(header);
FREE_BUFS:
free(bufs);
FREE_REQ:
free(w);
RET:
return status;
}
typedef struct {
rpc_tcp_conn_t *owner;
size_t total_sz, left_sz;
uint16_t stream_id;
unsigned char is_reading_msg;
} read_state;
static void
read_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
static void
read_init(read_state *s, void *buf, ssize_t nread);
static void
read_body(read_state *s, void *buf, ssize_t nread)
{
rpc_tcp_conn_t *c = s->owner;
ssize_t leftover = s->left_sz - nread;
#define HAND_OFF_MSG \
void *message = malloc(s->total_sz); \
memcpy(message, c->_buf->data, c->_buf->sz); \
dynarr_clear(c->_buf, 0); \
if (s->stream_id == 0) { \
if(c->_recv != NULL) \
c->_recv(NULL, 0, message, s->total_sz); \
else free(message); \
} \
else if(s->stream_id != 0) { \
rpc_tcp_stream_t *stream = NULL; \
HASH_FIND(_hh, c->_streams, &s->stream_id, sizeof(s->stream_id), \
stream); \
if(stream != NULL) { \
if(stream->_recv != NULL) \
c->_recv(stream, 0, message, s->total_sz); \
else free(message); \
} else free(message); \
}
// We've read the entire message, exactly
if (leftover == 0) {
dynarr_append(c->_buf, buf, s->left_sz);
HAND_OFF_MSG;
}
// We've read a chunk of the message
else if(leftover > 0) {
dynarr_append(c->_buf, buf, s->left_sz);
}
// We've read more than just the current message
else if (leftover < 0) {
dynarr_append(c->_buf, buf, s->left_sz);
HAND_OFF_MSG;
s->is_reading_msg = 0;
read_init(s, buf + (nread + leftover), -leftover);
}
#undef HAND_OFF_MSG
}
static void
read_init(read_state *s, void *buf, ssize_t nread)
{
if(nread < sizeof(rpc_tcp_msg_header)) {
// TODO: handle this case
fprintf(stderr, "read_cb(stub): initial message packet smaller "
"than header");
exit(2);
}
s->is_reading_msg = 1;
s->total_sz = ntohl(*(uint32_t *)(buf + offsetof(rpc_tcp_msg_header, sz)));
s->stream_id = ntohs(*(uint16_t *)(buf + offsetof(rpc_tcp_msg_header, id)));
s->left_sz = s->total_sz;
size_t leftover = nread - sizeof(rpc_tcp_msg_header);
if(leftover > 0) {
read_body(s, buf + sizeof(rpc_tcp_msg_header), leftover);
}
}
static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
if(nread == 0) goto FREE_BUF;
if(nread < 0) {
read_state *s = stream->data;
s->is_reading_msg = 0;
dynarr_clear(s->owner->_buf, 0);
s->owner->_recv((rpc_tcp_stream_t*)s->owner, nread, NULL, -1);
// Pipe broken: prepare to destroy the connection
if(nread == UV_EOF) {
free(s);
}
goto FREE_BUF;
}
read_state *s = (read_state *)stream->data;
rpc_tcp_conn_t *c = s->owner;
if(s->is_reading_msg) {
// Already in the middle of reading a message
read_body(s, buf->base, nread);
} else {
// Read the message starting from the header
read_init(s, buf->base, nread);
}
FREE_BUF:
if(buf != NULL && buf->base != NULL)
free(buf->base);
}
int
rpc_tcp_conn_read_start(rpc_tcp_conn_t *conn, rpc_tcp_stream_msg_cb cb)
{
read_state *state = malloc(sizeof(read_state));
if(state == NULL) return -1;
*state = (read_state){
.owner = conn,
.total_sz = 0,
.left_sz = 0,
.is_reading_msg = 0
};
conn->_client->data = state;
conn->_recv = cb;
uv_read_start((uv_stream_t*)conn->_client, read_alloc, read_cb);
}
int
rpc_tcp_conn_read_stop(rpc_tcp_conn_t *conn)
{
conn->_recv = NULL;
rpc_tcp_conn_read_stop_all(conn);
free(conn->_client->data);
return 0;
}
int
rpc_tcp_conn_read_stop_all(rpc_tcp_conn_t *conn)
{
rpc_tcp_stream_t *stream, *tmp;
HASH_ITER(_hh, conn->_streams, stream, tmp) {
stream->_recv = NULL;
}
}
int
rpc_tcp_stream_read_start(rpc_tcp_stream_t *stream, rpc_tcp_stream_msg_cb cb)
{
stream->_recv = cb;
return 0;
}
int
rpc_tcp_stream_read_stop(rpc_tcp_stream_t *stream)
{
stream->_recv = NULL;
return 0;
}

157
src/transport/tcp.h Normal file
View File

@ -0,0 +1,157 @@
#ifndef RPC_TRANSPORT_H
#define RPC_TRANSPORT_H
#include <uv.h>
#pragma pack(1)
typedef struct {
uint32_t sz;
uint16_t id;
} rpc_tcp_msg_header;
// Forward decl for opaque hashtable impl
typedef struct UT_hash_handle UT_hash_handle;
typedef struct rpc_tcp_conn_s rpc_tcp_conn_t;
typedef struct rpc_tcp_server_s rpc_tcp_server_t;
typedef struct rpc_tcp_stream_s rpc_tcp_stream_t;
typedef void(*rpc_tcp_stream_msg_cb)(rpc_tcp_stream_t*, int status,
void *buf, ssize_t n);
typedef rpc_tcp_stream_msg_cb rpc_tcp_msg_cb;
typedef struct rpc_tcp_stream_s {
void *data;
uint16_t id;
rpc_tcp_msg_cb _recv;
uv_stream_t *_stream;
UT_hash_handle *_hh;
rpc_tcp_conn_t *owner;
} rpc_tcp_stream_t;
typedef struct dynarr_s dynarr_t;
typedef struct rpc_tcp_conn_s {
void *data;
uint16_t stream_id;
dynarr_t *_buf;
rpc_tcp_msg_cb _recv;
UT_hash_handle *_hh;
rpc_tcp_server_t *_server;
uv_tcp_t *_client;
rpc_tcp_stream_t *_streams;
} rpc_tcp_conn_t;
typedef struct rpc_tcp_server_s {
rpc_tcp_conn_t *_conns;
uv_tcp_t *_sock;
} rpc_tcp_server_t;
typedef void(*rpc_tcp_server_close_cb)(rpc_tcp_server_t*, int status);
typedef void(*rpc_tcp_conn_new_cb)(rpc_tcp_conn_t*, int status);
typedef void(*rpc_tcp_conn_close_cb)(rpc_tcp_conn_t*, int status);
typedef void(*rpc_tcp_stream_new_cb)(rpc_tcp_stream_t*, int status);
typedef void(*rpc_tcp_stream_close_cb)(rpc_tcp_stream_t*, int status);
/**
* Upgrade a bound uv_tcp_t socket #sock into an #rpc_tcp_server_t.
* Callback #cb will be called on each new connection.
*/
rpc_tcp_server_t *
rpc_tcp_listen(uv_tcp_t *sock, int backlog, rpc_tcp_conn_new_cb cb);
/**
* Closes a TCP server #server.
*
* #conn_cb (#stream_cb) will be called when closing each connection (stream),
* to give you a chance to release any user-allocated data. You should free any
* user-allocated resources on the server before calling this function.
*
* All pointers to the server, and any connection or stream belonging to the
* server are invalidated after a call to this function.
*/
int
rpc_tcp_close(rpc_tcp_server_t *server, rpc_tcp_conn_close_cb conn_cb,
rpc_tcp_stream_close_cb stream_cb);
/**
* Closes the connection #conn and all its streams.
*
* #stream_cb will be called when closing each stream to give you a chance to
* release any user-allocated data. #conn_cb will be called when closing the
* connection to give you a chance to release any user-allocated data.
*
* The connection will be removed from its server after this call. All pointers
* to the connection or its streams are invalidated after a call to this
* function.
*
*/
int
rpc_tcp_conn_close(rpc_tcp_conn_t *conn, rpc_tcp_conn_close_cb conn_cb,
rpc_tcp_stream_close_cb stream_cb);
/**
* Starts reading for messages on the connection's default stream.
*
* Callback stream pointer argument will be NULL. Ownership of the returned
* buffer (if `status == 0') is transferred to the callback.
*/
int
rpc_tcp_conn_read_start(rpc_tcp_conn_t *conn, rpc_tcp_stream_msg_cb cb);
/**
* Stops reading for messages on the connection altogether.
*/
int
rpc_tcp_conn_read_stop(rpc_tcp_conn_t *conn);
/**
* Stops reading for messages on all of the connection's streams (but not the
* connection itself).
*/
int
rpc_tcp_conn_read_stop_all(rpc_tcp_conn_t *conn);
/**
* Opens a stream on this connection. #cb will be called with the opened stream
* if successful, or `status < 0' otherwise.
*/
int
rpc_tcp_stream_open(rpc_tcp_conn_t *conn, rpc_tcp_stream_new_cb cb);
/**
* Closes the stream #stream. #cb will called when closing the stream to give
* you a change to release any user-allocated data.
*
* The stream will be removed from its connection after this call. All pointers
* to the stream are invalidated after a call to this function.
*/
int
rpc_tcp_stream_close(rpc_tcp_stream_t *stream, rpc_tcp_stream_close_cb cb);
/**
* Send message #buf of length #n over stream #stream. Calls #cb after
* finishing.
*
* The provided buffer and length will be returned untouched to the callback,
* with `status == 0' if successful.
*/
int
rpc_tcp_stream_send(rpc_tcp_stream_t *stream, void *buf, uint32_t n,
rpc_tcp_stream_msg_cb cb);
/**
* Start listening to messages on #stream with #cb.
*/
int
rpc_tcp_stream_read_start(rpc_tcp_stream_t *stream, rpc_tcp_stream_msg_cb cb);
/**
* Stop listening to messages on #stream.
*/
int
rpc_tcp_stream_read_stop(rpc_tcp_stream_t *stream);
#endif //RPC_TRANSPORT_H

1140
src/uthash.h Normal file

File diff suppressed because it is too large Load Diff