Commit 822ce86e authored by Andrew Price's avatar Andrew Price
Browse files

Make the replay cache persistent

Just a simple store that uses one file per message.
parent f533e1f4
Loading
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -155,7 +155,8 @@ int main(int argc, char **argv)
	if (err)
		return err;

	init_server();
	if (init_server() != 0)
		return 1;

	mainsock = open_mainsock(cfg_get_int("port"));

+113 −16
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
@@ -23,7 +24,14 @@
#include "servsock.h"
#include "replay.h"

#define REPLAY_DIR STATEDIR "/replay"

#define _STR(x) #x
#define STR(x) _STR(x)
#define STORE_SIZE 1000
#define STORE_SIZE_STR STR(STORE_SIZE)
#define STORE_SIZE_LEN (sizeof(STORE_SIZE_STR) - 1)
#define STORE_FILE_NAME_LEN (sizeof(REPLAY_DIR) + STORE_SIZE_LEN + 1)

static uint64_t serial = 0;

@@ -31,14 +39,6 @@ static ipc_message_t ** store = NULL;
static int store_next = 0;
static int store_len = 0;

/* look at the history and set the serial number
 * appropriately.
 */
void load_serial(void)
{
	serial = 1;
}

static int store_wrap(int index)
{
	while (index < 0) index += STORE_SIZE;
@@ -46,16 +46,36 @@ static int store_wrap(int index)
	return index;
}

/* store the message for later replay */
void store_message(ipc_message_t *msg)
static int write_message(ipc_message_t *msg, unsigned n)
{
	if (store == NULL) {
		/* create the store */
		store = calloc(STORE_SIZE, sizeof(ipc_message_t *));
		store_next = 0;
		store_len = 0;
	char pathname[STORE_FILE_NAME_LEN];
	struct iovec iov[2];
	int fd;

	sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, n);
	fd = open(pathname, O_RDWR|O_CLOEXEC|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR);
	if (fd < 0) {
		perror(pathname);
		return 1;
	}
	iov[0] = (struct iovec) {
		.iov_base = msg,
		.iov_len = sizeof(*msg)
	};
	iov[1] = (struct iovec) {
		.iov_base = msg->body,
		.iov_len = msg->bodylen
	};
	if (pwritev(fd, iov, 2, 0) != (iov[0].iov_len + iov[1].iov_len))
		perror(pathname);
	fsync(fd);
	close(fd);
	return 0;
}

/* store the message for later replay */
void store_message(ipc_message_t *msg)
{
	/* only store info/message, not actions */
	if (msg->head.type <= 26 &&
	   !( msg->head.type == IPC_TEXT || msg->head.type == IPC_WIZ))
@@ -69,6 +89,7 @@ void store_message(ipc_message_t *msg)
		store[store_next] = NULL;
	}

	write_message(msg, store_next);
	/* add to ref count so it wont get cleaned away yet
	 * insert it at the current location and bump pointers
	 */
@@ -84,7 +105,6 @@ void store_message(ipc_message_t *msg)
 */
void assign_serial( ipc_message_t *msg )
{
	if (serial == 0) load_serial();
	msg->head.serial = serial++;
	msg->head.when = time(NULL);
}
@@ -203,3 +223,80 @@ void replay(ipc_connection_t *conn, ipc_message_t *msg)

	return;
}

int replay_init(void)
{
	uint64_t highest_serial = 0;
	int ret;

	store = calloc(STORE_SIZE, sizeof(ipc_message_t *));
	if (store == NULL) {
		perror("Failed to allocate replay store");
		return 1;
	}
	store_next = 0;
	store_len = 0;

	ret = mkdir(REPLAY_DIR, S_IRWXU|S_IXGRP|S_IXOTH);
	if (ret == -1 && errno != EEXIST) {
		perror(REPLAY_DIR);
		return 1;
	}
	for (unsigned i = 0; i < STORE_SIZE; i++) {
		char pathname[STORE_FILE_NAME_LEN];
		ipc_message_t *msg;
		struct stat st;
		int fd;

		sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, i);
		fd = open(pathname, O_RDONLY|O_CLOEXEC|O_CREAT, S_IRUSR|S_IWUSR);
		if (fd < 0) {
			perror(pathname);
			return 1;
		}
		if (fstat(fd, &st) != 0) {
			perror(pathname);
			close(fd);
			return 1;
		}
		if (st.st_size == 0) {
			close(fd);
			continue;
		}
		msg = malloc(sizeof(*msg));
		if (msg == NULL) {
			perror("Failed to allocate message header");
			close(fd);
			return 1;
		}
		if (pread(fd, msg, sizeof(*msg), 0) != sizeof(*msg)) {
			perror("Failed to read message file");
			close(fd);
			return 1;
		}
		msg->body = malloc(msg->bodylen);
		if (msg->body == NULL) {
			perror("Failed to allocate message read buffer");
			free(msg);
			close(fd);
			return 1;
		}
		if (pread(fd, msg->body, msg->bodylen, sizeof(*msg)) != msg->bodylen) {
			perror("Failed to read message file");
			free(msg->body);
			free(msg);
			close(fd);
			return 1;
		}
		if (msg->head.serial > highest_serial) {
			highest_serial = msg->head.serial;
			store_next = store_wrap(i + 1);
		}
		msg->refcount = 1;
		store_len++;
		store[i] = msg;
		close(fd);
	}
	serial = highest_serial + 1;
	return 0;
}
+6 −2
Original line number Diff line number Diff line
/* replay.c */
void load_serial(void);
#ifndef REPLAY_H
#define REPLAY_H

int replay_init(void);
void store_message(ipc_message_t *msg);
void assign_serial(ipc_message_t *msg);
void replay(ipc_connection_t *conn, ipc_message_t *msg);

#endif /* REPLAY_H */
+4 −1
Original line number Diff line number Diff line
@@ -729,10 +729,13 @@ void migrate_old_folders(void)
	}
}

void init_server()
int init_server()
{
	INIT_LIST_HEAD(&connection_list);
	if (replay_init() != 0)
		return 1;
	poll_init();
	return 0;
}

ipc_message_t * msg_wholist(void)
+1 −1
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@ void msg_attach_to_channel(ipc_message_t *msg, int channel, const char * exclude
void msg_attach_to_perm(ipc_message_t *msg, perm_t perm);
void msg_attach(ipc_message_t *msg, ipc_connection_t *conn);
void migrate_old_folders(void);
void init_server(void);
int init_server(void);
void send_error(ipc_connection_t *conn, ipc_message_t *orig, const char *format, ...);
void msg_apply_gag(struct user * from, ipc_message_t * msg, const char * field);
ipc_message_t * msg_wholist(void);