Commit 5f6abcae authored by Andrew Price's avatar Andrew Price

Replace the body pointer in ipc_message_t with flags, format

flags can be used to set attributes for the message (e.g. replayed)
format can be used to version the on-disk structure of the messages
when we want to add more fields.
parent 02023ddd
......@@ -602,7 +602,7 @@ static void display_content(ipc_message_t *msg)
static void accept_pipe_cmd(ipc_message_t *msg, struct user *mesg_user)
{
enum ipc_types state = msg->head.type;
char *newbuff = msg->body;
char *newbuff = (char *)(msg + 1);
/*printf("\n<message type is %d>\n", state);*/
switch (state) {
......
......@@ -94,7 +94,7 @@ void broadcast(int state, const char *fmt, ...)
json_t * j = json_init(NULL);
json_addstring(j, "text", buff);
json_addstring(j, "type", "status");
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
......
......@@ -23,7 +23,7 @@ int announce_logon(const char *usr, int type, int quiet)
msg = ipcmsg_create(IPC_CHECKONOFF, user->posn);
ipcmsg_destination(msg, SYSTEM_USER);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
return 0;
......@@ -46,7 +46,7 @@ int announce_logoff(const char *usr, int type, const char *agent, const char *re
msg = ipcmsg_create(IPC_CHECKONOFF, user->posn);
ipcmsg_destination(msg, SYSTEM_USER);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
return 0;
......@@ -68,7 +68,7 @@ int announce_join(const char *usr, int channel, int type, const char *agent, int
msg = ipcmsg_create(IPC_CHECKONOFF, user->posn);
ipcmsg_destination(msg, SYSTEM_USER);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
return 0;
......@@ -92,7 +92,7 @@ int announce_leave(const char *usr, int channel, int type, const char *agent, co
msg = ipcmsg_create(IPC_CHECKONOFF, user->posn);
ipcmsg_destination(msg, SYSTEM_USER);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
return 0;
......
......@@ -55,7 +55,7 @@ static void talk_send_to_room(const char * text, int channel, const char * type,
json_addstring(j, "text", text);
json_addstring(j, "type", type);
if (plural > -1) json_addint(j, "plural", plural);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -65,7 +65,7 @@ void talk_send_shout(char * text){
ipc_message_t * msg = ipcmsg_create(IPC_SAYTOALL, user->posn);
json_t * j = json_init(NULL);
json_addstring(j, "text", text);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -101,7 +101,7 @@ void talk_sayto(char *text, const char *to, const char *type)
json_addstring(j, "target", to);
json_addstring(j, "type", type);
json_addstring(j, "text", text);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
......@@ -421,7 +421,7 @@ void t_notsayto(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "text", text);
json_addstring(j, "type", "notsayto");
json_addstring(j, "exclude", argv[1]);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -860,7 +860,7 @@ void t_gag(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "target", argv[1]);
json_addstring(j, "type", "gag");
json_addstring(j, "gag", argv[2]);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -871,7 +871,7 @@ void t_ungag(CommandList *cm, int argc, const char **argv, char *args)
json_t * j = json_init(NULL);
json_addstring(j, "target", argv[1]);
json_addstring(j, "type", "ungag");
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -885,7 +885,7 @@ void t_zod(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "type", "zod");
if (excuse!=NULL && !allspace(excuse))
json_addstring(j, "reason", excuse);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -899,7 +899,7 @@ void t_mrod(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "type", "mrod");
if (excuse!=NULL && !allspace(excuse))
json_addstring(j, "reason", excuse);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -914,7 +914,7 @@ void t_kick(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "admin", "yes");
if (excuse!=NULL && !allspace(excuse))
json_addstring(j, "reason", excuse);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -929,7 +929,7 @@ void t_remove(CommandList *cm, int argc, const char **argv, char *args)
json_addstring(j, "admin", "yes");
if (excuse!=NULL && !allspace(excuse))
json_addstring(j, "reason", excuse);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......@@ -1303,7 +1303,7 @@ void t_replay(CommandList *cm, int argc, const char **argv, char *args)
ipc_message_t * msg = ipcmsg_create(IPC_REPLAY, user->posn);
json_t * j = json_init(NULL);
json_addint(j, argv[1], atoll(argv[2]));
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
......
......@@ -88,9 +88,9 @@ int ipc_connect(const char *host, const char *port, struct user *user)
ipc_message_t * msg = ipcmsg_create(IPC_HELLO, ipc_user->posn);
uint32_t pid = getpid();
ipcmsg_append(msg, &pid, sizeof(pid));
ipcmsg_append(&msg, &pid, sizeof(pid));
const char *nonce = get_nonce();
ipcmsg_append(msg, nonce, strlen(nonce));
ipcmsg_append(&msg, nonce, strlen(nonce));
ipcmsg_send(msg, ipcsock);
/* now send any queued up messages */
......@@ -160,7 +160,7 @@ unsigned int ipc_send_to_username(const char * dest, enum ipc_types msgtype, con
ssize_t dgram_len = strlen(buff);
ipc_message_t *msg = ipcmsg_create(msgtype, ipc_user->posn);
ipcmsg_destination(msg, uposn);
ipcmsg_append(msg, buff, dgram_len);
ipcmsg_append(&msg, buff, dgram_len);
ipcmsg_send(msg, ipcsock);
} else {
fprintf(stderr, "Failed to utf8_cleanup the message\n");
......
......@@ -103,7 +103,7 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
}
ipc_message_t *update = ipcmsg_create(IPC_KICK, msg->head.src);
ipcmsg_destination(update, toid);
ipcmsg_append(update, buff, strlen(buff));
ipcmsg_append(&update, buff, strlen(buff));
msg_attach_to_username(update, victim_name);
ipcmsg_destroy(update);
......@@ -111,7 +111,7 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
json_vaddstring(ej, "text", "%s just tried to MROD %s and failed",
attacker_name, victim_name);
}
ipcmsg_json_encode(event, ej);
ipcmsg_json_encode(&event, ej);
json_decref(ej);
// annouce what happened to everyone else
......@@ -145,14 +145,14 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
}
ipc_message_t *update = ipcmsg_create(IPC_KICK, msg->head.src);
ipcmsg_destination(update, toid);
ipcmsg_append(update, buff, strlen(buff));
ipcmsg_append(&update, buff, strlen(buff));
msg_attach_to_username(update, victim_name);
ipcmsg_destroy(update);
} else {
json_vaddstring(ej, "text", "%s just tried to ZoD %s and failed",
attacker_name, victim_name);
}
ipcmsg_json_encode(event, ej);
ipcmsg_json_encode(&event, ej);
json_decref(ej);
// annouce what happened to everyone else
......@@ -193,7 +193,7 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
ipc_message_t *update = ipcmsg_create(IPC_GAG, msg->head.src);
update->head.when = msg->head.when;
ipcmsg_destination(update, toid);
ipcmsg_append(update, buff, strlen(buff));
ipcmsg_append(&update, buff, strlen(buff));
msg_attach_to_username(update, victim_name);
ipcmsg_destroy(update);
......@@ -204,14 +204,14 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
json_addint(gj, "success", success);
json_addstring(gj, "type", "gag");
json_vaddstring(gj, "text", "%s", gag_gag_msg(gtnum));
ipcmsg_json_encode(gm, gj);
ipcmsg_json_encode(&gm, gj);
msg_attach_to_username(gm, victim_name);
ipcmsg_destroy(gm);
} else {
json_vaddstring(ej, "text", "%s just tried to gag %s with %s and failed",
attacker_name, victim_name, gag_type(gtnum));
}
ipcmsg_json_encode(event, ej);
ipcmsg_json_encode(&event, ej);
json_decref(ej);
// annouce what happened to everyone else
......@@ -241,7 +241,7 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
ipc_message_t *update = ipcmsg_create(IPC_GAG, msg->head.src);
update->head.when = msg->head.when;
ipcmsg_destination(update, toid);
ipcmsg_append(update, buff, strlen(buff));
ipcmsg_append(&update, buff, strlen(buff));
msg_attach_to_username(update, victim_name);
ipcmsg_destroy(update);
......@@ -252,14 +252,14 @@ void accept_action(ipc_connection_t *conn, ipc_message_t *msg)
json_addint(gj, "success", success);
json_addstring(gj, "type", "ungag");
json_vaddstring(gj, "text", "%s", gag_ungag_msg(oldg));
ipcmsg_json_encode(gm, gj);
ipcmsg_json_encode(&gm, gj);
msg_attach_to_username(gm, victim_name);
ipcmsg_destroy(gm);
} else {
json_vaddstring(ej, "text", "%s just tried to ungag %s and failed",
attacker_name, victim_name);
}
ipcmsg_json_encode(event, ej);
ipcmsg_json_encode(&event, ej);
json_decref(ej);
// annouce what happened to everyone else
......
......@@ -93,7 +93,7 @@ static int write_message(ipc_message_t *msg, unsigned n)
.iov_len = sizeof(*msg)
};
iov[1] = (struct iovec) {
.iov_base = msg->body,
.iov_base = (char *)(msg + 1),
.iov_len = msg->bodylen
};
if (pwritev(fd, iov, 2, 0) != (iov[0].iov_len + iov[1].iov_len))
......@@ -279,7 +279,9 @@ int replay_init(void)
for (unsigned i = 0; i < STORE_SIZE; i++) {
char pathname[STORE_FILE_NAME_LEN];
ipc_message_t *msg;
ipc_message_t mh;
struct stat st;
char *body;
int fd;
replay_fds[i] = -1;
......@@ -298,32 +300,25 @@ int replay_init(void)
close(fd);
continue;
}
msg = malloc(sizeof(*msg));
if (msg == NULL) {
perror("Failed to allocate message header");
close(fd);
return 1;
}
if (pread(fd, msg, sizeof(*msg), 0) != sizeof(*msg)) {
if (pread(fd, &mh, sizeof(mh), 0) != sizeof(mh)) {
perror("Failed to read message file");
close(fd);
return 1;
}
msg->body = malloc(msg->bodylen + 1);
if (msg->body == NULL) {
msg = malloc(sizeof(*msg) + mh.bodylen + 1);
if (msg == NULL) {
perror("Failed to allocate message read buffer");
free(msg);
close(fd);
return 1;
}
if (pread(fd, msg->body, msg->bodylen, sizeof(*msg)) != msg->bodylen) {
if (pread(fd, msg, sizeof(*msg) + mh.bodylen, 0) != sizeof(*msg) + mh.bodylen) {
perror("Failed to read message file");
free(msg->body);
free(msg);
close(fd);
return 1;
}
msg->body[msg->bodylen] = '\0';
body = (char *)(msg + 1);
body[msg->bodylen] = '\0';
if (msg->head.serial > highest_serial) {
highest_serial = msg->head.serial;
store_next = store_wrap(i + 1);
......
......@@ -250,7 +250,7 @@ void send_error(ipc_connection_t *conn, ipc_message_t *orig, const char *format,
json_t * j = json_init(NULL);
json_addstring(j, "text", text);
json_addstring(j, "type", type);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
msg_attach(msg, conn);
ipcmsg_destroy(msg);
json_decref(j);
......@@ -264,7 +264,7 @@ static ipc_message_t *msg_gaglist(void)
for (GagInfo *gi = gaglist; gi->name != NULL; gi++)
json_array_append_new(arr, json_string(gi->name));
ipcmsg_json_encode(msg, arr);
ipcmsg_json_encode(&msg, arr);
json_decref(arr);
return msg;
}
......@@ -272,6 +272,7 @@ static ipc_message_t *msg_gaglist(void)
void process_msg(ipc_connection_t *conn, ipc_message_t *msg)
{
ipcmsg_summary("PROCESS", msg);
char *body = (char *)(msg + 1);
/* client just told us who they are */
if (msg->head.type == IPC_HELLO) {
......@@ -284,10 +285,10 @@ void process_msg(ipc_connection_t *conn, ipc_message_t *msg)
/* user id / userposn is in the header src */
memcpy(&conn->user, &(msg->head.src), sizeof(conn->user));
/* first 4 bytes of body are PID / Session ID */
memcpy(&conn->addr, msg->body, sizeof(conn->addr));
memcpy(&conn->addr, body, sizeof(conn->addr));
/* rest of body is the NONCE,
* check this to make sure we have a matching client */
if (!match_nonce(&msg->body[4])) {
if (!match_nonce(&body[4])) {
printf("Mismatched nonce from fd=%d. dropping.\n", conn->fd);
ipcmsg_destroy(msg);
......@@ -331,7 +332,7 @@ void process_msg(ipc_connection_t *conn, ipc_message_t *msg)
json_t * j = json_init(NULL);
json_addint(j, "uptime", now - uptime);
json_addstring(j, "version", VERSION);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
msg_attach(msg, conn);
ipcmsg_destroy(msg);
json_decref(j);
......@@ -607,7 +608,7 @@ void msg_apply_gag(struct user *from, ipc_message_t * msg, const char * field)
return;
}
json_addstring(j, field, newtext);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
}
......@@ -763,7 +764,7 @@ ipc_message_t * msg_wholist(void)
json_array_append_new(list, j);
}
ipcmsg_json_encode(msg, list);
ipcmsg_json_encode(&msg, list);
json_decref(list);
close(users_fd);
return msg;
......@@ -804,7 +805,7 @@ ipc_message_t * msg_error(const char *type)
json_t * j = json_init(NULL);
json_addstring(j, "error", type);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
return msg;
}
......@@ -24,14 +24,20 @@ ipc_message_t * ipcmsg_create(uint32_t type, uint32_t src)
}
/* add body text to a message, +1 to make C string safe */
void ipcmsg_append(ipc_message_t *msg, const void * data, int len)
void ipcmsg_append(ipc_message_t **_msg, const void * data, int len)
{
if (msg == NULL) return;
int want = msg->bodylen + len + 1;
msg->body = realloc(msg->body, want);
memcpy(&msg->body[msg->bodylen], data, len);
ipc_message_t *msg = *_msg;
char *newseg;
if (msg == NULL)
return;
msg = realloc(msg, sizeof(*msg) + msg->bodylen + len + 1);
newseg = ((char *)(msg + 1)) + msg->bodylen;
memcpy(newseg, data, len);
msg->bodylen += len;
msg->body[ msg->bodylen ] = 0;
newseg[len] = '\0';
*_msg = msg;
}
void ipcmsg_summary(const char *intro, ipc_message_t *msg)
......@@ -60,7 +66,6 @@ void ipcmsg_destroy(ipc_message_t * msg)
if (msg == NULL) return;
if (msg->refcount > 0) return;
if (msg->body != NULL) free(msg->body);
memset(msg, 0, sizeof(ipc_message_t));
free(msg);
}
......@@ -98,7 +103,7 @@ void ipcmsg_send(ipc_message_t *msg, ipc_connection_t *conn)
iovused++;
msg->head.len = msg->bodylen;
if (msg->bodylen > 0) {
iov[1].iov_base = msg->body;
iov[1].iov_base = (char *)(msg + 1);
iov[1].iov_len = msg->bodylen;
iovused++;
}
......@@ -216,7 +221,7 @@ ipc_message_t * read_socket(ipc_connection_t * conn, int doread)
int wanted = conn->incoming->head.len - conn->incoming->bodylen;
int available = conn->i_buffer - used;
int using = _MIN(wanted, available);
ipcmsg_append(conn->incoming, &conn->p_buffer[used], using);
ipcmsg_append(&conn->incoming, &conn->p_buffer[used], using);
used += using;
}
......@@ -246,7 +251,7 @@ json_t * ipcmsg_json_decode(ipc_message_t *msg)
if (msg == NULL) return NULL;
if (msg->bodylen == 0) return NULL;
json_error_t err;
json_t * js = json_loads(msg->body, 0, &err);
json_t * js = json_loads((char *)(msg + 1), 0, &err);
if (js == NULL) {
fprintf(stderr, "JSON error at byte %d: %s\n", err.position, err.text);
......@@ -257,10 +262,11 @@ json_t * ipcmsg_json_decode(ipc_message_t *msg)
}
/** write json object into a message */
int ipcmsg_json_encode(ipc_message_t *msg, json_t *js)
int ipcmsg_json_encode(ipc_message_t **_msg, json_t *js)
{
ipc_message_t *msg = *_msg;
if (msg == NULL) return -1;
// if (msg->bodylen != 0) return -1;
if (js == NULL) return -1;
char * newtext = json_dumps(js, 0);
......@@ -268,13 +274,10 @@ int ipcmsg_json_encode(ipc_message_t *msg, json_t *js)
fprintf(stderr, "JSON error on encode\n");
return -1;
}
if (msg->body != NULL) {
free(msg->body);
msg->body = NULL;
msg->bodylen = 0;
}
msg->body = newtext;
msg->bodylen = strlen(newtext);
msg = realloc(msg, sizeof(*msg) + msg->bodylen + 1);
memcpy(msg + 1, newtext, msg->bodylen + 1);
*_msg = msg;
return 0;
}
......
......@@ -33,7 +33,8 @@ typedef struct {
typedef struct {
ipc_msghead_t head;
char * body;
uint32_t format;
uint32_t flags;
int bodylen;
int refcount;
} ipc_message_t;
......@@ -68,7 +69,7 @@ typedef struct {
/* socket.c */
ipc_message_t *ipcmsg_create(uint32_t type,uint32_t src);
void ipcmsg_append(ipc_message_t *msg, const void *data, int len);
void ipcmsg_append(ipc_message_t **msg, const void *data, int len);
void ipcmsg_destination(ipc_message_t *msg, uint32_t dest);
int msg_queue(ipc_message_t *msg, ipc_connection_t *conn);
void ipcmsg_destroy(ipc_message_t *msg);
......@@ -78,7 +79,7 @@ ipc_connection_t *ipcconn_create(void);
int ipcconn_connect(const char *host, const char *port);
ipc_message_t *read_socket(ipc_connection_t *conn, int doread);
json_t *ipcmsg_json_decode(ipc_message_t *msg);
int ipcmsg_json_encode(ipc_message_t *msg, json_t *js);
int ipcmsg_json_encode(ipc_message_t **msg, json_t *js);
json_t *json_init(ipc_message_t *msg);
int json_vaddstring(json_t *js, const char *key, const char *value, ...);
int json_addstring(json_t *js, const char *key, const char *value);
......
......@@ -133,7 +133,7 @@ static void accept_pipe_cmd(ipc_message_t *ipc, struct user *mesg_user)
/* keep a copy for later */
if (whoinfo != NULL)
free(whoinfo);
whoinfo = strdup(ipc->body);
whoinfo = strdup((char *)(ipc + 1));
whowhen = time(NULL);
/* if we want it, fall through, otherwise stop now */
if (whowant != 0)
......@@ -146,13 +146,14 @@ static void accept_pipe_cmd(ipc_message_t *ipc, struct user *mesg_user)
msg->state = ipc->head.type;
//msg->pid = ipc->head.src;
msg->user = *mesg_user;
msg->text = json_escape(ipc->body);
msg->text = json_escape((char *)(ipc + 1));
msg->serial = ipc->head.serial;
msg->when = ipc->head.when;
list_add_tail(&msg->list, &msglist);
printf("From=%s type=%d msg='%s'\n", mesg_user->record.name, ipc->head.type, ipc->body);
printf("From=%s type=%d msg='%s'\n", mesg_user->record.name, ipc->head.type,
(char *)(ipc + 1));
}
static int handle_command(CONNECTION *co);
......@@ -449,7 +450,7 @@ static int handle_command(CONNECTION *co)
json_addstring(j, "target", to);
json_addstring(j, "type", type);
json_addstring(j, "text", text);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
......
......@@ -82,7 +82,7 @@ void talk_send_to_room(const char * text, int channel, const char * type, int pl
json_addstring(j, "text", text);
json_addstring(j, "type", type);
if (plural > -1) json_addint(j, "plural", plural);
ipcmsg_json_encode(msg, j);
ipcmsg_json_encode(&msg, j);
json_decref(j);
ipcmsg_transmit(msg);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment