Commit d6b35f9f authored by Andrew Price's avatar Andrew Price
Browse files

Defer closing replay files

close()ing replay files after storing a message can block on i/o under
certain conditions. Stuff the fds of the opened files into a list and
process that list only when a poll_wait() times out, which means there
are no messages waiting to be processed. Under flood conditions where
the list is full of open fds, they'll be reused when storing new
messages.
parent e27f8078
Loading
Loading
Loading
Loading
Loading
+33 −3
Original line number Diff line number Diff line
@@ -47,13 +47,42 @@ static int store_wrap(int index)
	return index;
}

static int replay_fds[STORE_SIZE];

void replay_commit(void)
{
	/* If a user pastes a novel into the talker we don't want to be stuck
	   doing i/o for a long time afterwards, so a rate limit is needed. */
#define COMMIT_RATELIMIT (10)
	static unsigned i = 0;
	unsigned closed = 0;

	for (; i < STORE_SIZE && closed < COMMIT_RATELIMIT; i++) {
		int fd = replay_fds[i];
		if (fd >= 0) {
			replay_fds[i] = -1;
			fsync(fd);
			close(fd);
			closed++;
		}
	}
	if (i == STORE_SIZE)
		i = 0;
}

static void defer_close(unsigned n, int fd)
{
	replay_fds[n] = fd;
}

static int write_message(ipc_message_t *msg, unsigned n)
{
	char pathname[STORE_FILE_NAME_LEN];
	int fd = replay_fds[n];
	struct iovec iov[2];
	int fd;

	sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, n);
	if (fd < 0)
		fd = open(pathname, O_RDWR|O_CLOEXEC|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR);
	if (fd < 0) {
		perror(pathname);
@@ -69,7 +98,7 @@ static int write_message(ipc_message_t *msg, unsigned n)
	};
	if (pwritev(fd, iov, 2, 0) != (iov[0].iov_len + iov[1].iov_len))
		perror(pathname);
	close(fd);
	defer_close(n, fd);
	return 0;
}

@@ -255,6 +284,7 @@ int replay_init(void)
		struct stat st;
		int fd;

		replay_fds[i] = -1;
		sprintf(pathname, REPLAY_DIR "/%0*u", (int)STORE_SIZE_LEN, i);
		fd = open(pathname, O_RDONLY|O_CLOEXEC|O_CREAT, S_IRUSR|S_IWUSR);
		if (fd < 0) {
+1 −0
Original line number Diff line number Diff line
@@ -5,5 +5,6 @@ int replay_init(void);
void store_message(ipc_message_t *msg);
void assign_serial(ipc_message_t *msg);
void replay(ipc_connection_t *conn, ipc_message_t *msg);
void replay_commit(void);

#endif /* REPLAY_H */
+4 −2
Original line number Diff line number Diff line
@@ -143,7 +143,7 @@ static int mainsock_event_cb(poll_event_t *events, int nmemb, void *data)
		/* event on mainsock */
		if (ev->data == NULL) {
			if (ev->is_error) {
				return 1;
				return -1;
			} else
			if (ev->is_read) {
				accept_connection(*mainsock);
@@ -182,7 +182,7 @@ static int mainsock_event_cb(poll_event_t *events, int nmemb, void *data)
			}
		}
	}
	return 0;
	return nmemb;
}

void watch_mainsock(int mainsock)
@@ -199,6 +199,8 @@ void watch_mainsock(int mainsock)
	do {
		while ((ret = poll_wait(1000, mainsock_event_cb, &mainsock)) >= 0) {
			/* end of events handling, do periodic stuff here */
			if (ret == 0)
				replay_commit();
		}
		/* poll got interrupted, not actually an error, go around again */
	} while (ret == -1 && errno == EINTR);