#include #include #include #include #include #include #include #include #include "memcache-lite-private.h" MC(return_t) MC(io_connect_unix)(const char *path, int *fd) { struct sockaddr_un socket_address; struct sockaddr *addr_ptr = (struct sockaddr *) &socket_address; if (path == NULL) DLOGR(ERROR_NULL, "path"); if (fd == NULL) DLOGR(ERROR_NULL, "fd"); /* setup socket and connect */ *fd = socket(AF_UNIX, SOCK_STREAM, 0); if (*fd < 0) DLOGR(ERROR_CONNECT, "socket() error"); socket_address.sun_family = AF_UNIX; strcpy(socket_address.sun_path, path); if (connect(*fd, addr_ptr, sizeof(struct sockaddr_un)) < 0) { close(*fd); DLOGR(ERROR_CONNECT, "connect() error"); } return RETURN_SUCCESS; } MC(return_t) MC(io_readn)(MC(state_t) *state, char *dst, size_t size) { ssize_t ret = 0; size_t length = 0; if (state == NULL) FAILURE(ERROR_NULL, "state"); if (size > MEMCACHE_BUFFER_MAX_SIZE) FAILURE(ERROR_TOOBIG, "size"); if (size == 0) return RETURN_SUCCESS; for (;;) { MC(buffer_used)(&state->read, &length); if (length >= size) break; ret = MC(buffer_read_fd)(&state->read, state->socket); if (ret != RETURN_SUCCESS) { state->connected = 0; close(state->socket); FAILURE(ret, "could not read from fd"); } } if (dst == NULL) /* just eat the data */ MC(buffer_flush)(&state->read, size); else MC(buffer_getn)(&state->read, dst, size); return RETURN_SUCCESS; } MC(return_t) MC(io_readn_until)(MC(state_t) *state, char end, char *dst, size_t max) { ssize_t ret = 0; MC(return_t) buffer_ret = ERROR_NONE; size_t start_length = 0, amount_read = 0, length = 0; char *match = NULL; if (state == NULL) FAILURE(ERROR_NULL, "state"); // TODO: move into helper buffer_match(char) func MC(buffer_used)(&state->read, &start_length); if (memchr(state->read.data, end, start_length) != NULL) return RETURN_SUCCESS; do { MC(buffer_used)(&state->read, &start_length); if (start_length >= max) return RETURN_FAILURE; ret = MC(buffer_read_fd)(&state->read, state->socket); if (ret != RETURN_SUCCESS) { state->connected = 0; close(state->socket); FAILURE(ret, "could not read from fd"); } MC(buffer_used)(&state->read, &length); amount_read = length - start_length; /* Only scan the last read block for the character */ DLOG("total_length: %zu", length); DLOG("amount_read: %zu", amount_read); match = memchr(state->read.cursor - amount_read, end, amount_read); DLOG("match: %p", match); DLOG("cursor: %p", state->read.cursor); } while(match == NULL); buffer_ret = MC(buffer_getn)(&state->read, dst, (unsigned long) (match - state->read.data)); if (buffer_ret != ERROR_NONE) FAILURE(buffer_ret, "failed to copy read data"); return RETURN_SUCCESS; } MC(return_t) MC(io_write)(MC(state_t) *state, const char *src, size_t len) { MC(return_t) ret = ERROR_NULL; if (state == NULL) FAILURE(ERROR_NULL, "state"); ret = MC(buffer_put)(&state->write, src, len); if (ret != RETURN_SUCCESS) FAILURE(ret, "cannot write to buffer"); return RETURN_SUCCESS; } MC(return_t) MC(io_write_sync)(MC(state_t) *state) { size_t used = 0; int max_tries = 100; MC(return_t) ret = ERROR_NULL; if (state == NULL) FAILURE(ERROR_NULL, "state"); do { ret = MC(buffer_write_fd)(&state->write, state->socket); if (ret != ERROR_NONE || max_tries <= 0) { state->connected = 0; close(state->socket); FAILURE(ret, "failed to write to fd"); } MC(buffer_used)(&state->write, &used); max_tries--; } while (used > 0); return RETURN_SUCCESS; }