#include #include #include #include #include #include #include #include #include "memcache-lite-private.h" MC(return_t) MC(buffer_initialize)(MC(buffer_t) *buffer) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); buffer->data = malloc(MEMCACHE_BUFFER_START_SIZE); if (buffer->data == NULL) DLOGR(ERROR_MEM, "malloc()"); buffer->remaining = MEMCACHE_BUFFER_START_SIZE; buffer->allocated = MEMCACHE_BUFFER_START_SIZE; buffer->length = 0; buffer->cursor = buffer->data; return RETURN_SUCCESS; } MC(return_t) MC(buffer_resize)(MC(buffer_t) *buffer, size_t size) { /* buffer size is in block sizes, not arbitrary sizes */ size_t new_size = MEMCACHE_BUFFER_BLOCK * ((size/MEMCACHE_BUFFER_BLOCK)+1); if (buffer == NULL) DLOGR(ERROR_NULL, "buffer == NULL"); if (new_size < MEMCACHE_BUFFER_START_SIZE) return RETURN_SUCCESS; if (new_size > MEMCACHE_BUFFER_MAX_SIZE) DLOGR(ERROR_TOOBIG, "size"); /* TODO: consider using posix_memalign and sysconf(_SC_PAGESIZE) */ if (new_size > buffer->allocated) { /* ENLARGE */ char *new_buffer = NULL; DLOG("increasing size: %zu --> %zu", buffer->allocated, new_size); new_buffer = realloc(buffer->data, new_size); if (new_buffer == NULL) DLOGR(ERROR_MEM, "realloc() failed"); buffer->data = new_buffer; } else if (new_size < buffer->allocated && new_size < buffer->length) { /* SHRINK */ char *new_buffer = NULL; DLOG("shrinking size: %zu --> %zu", buffer->allocated, new_size); new_buffer = realloc(buffer->data, new_size); if (new_buffer == NULL) DLOGR(ERROR_MEM, "realloc() failed"); buffer->data = new_buffer; } buffer->allocated = new_size; buffer->remaining = buffer->allocated - buffer->length; /* reset the cursor position */ buffer->cursor = buffer->data + buffer->length; return RETURN_SUCCESS; } MC(return_t) MC(buffer_resize_if_needed)(MC(buffer_t) *buffer) { MC(return_t) ret = ERROR_NONE; if (buffer == NULL) DLOGR(ERROR_NULL, "buffer == NULL"); if (buffer->remaining != 0) return RETURN_SUCCESS; /* double the size if space is needed */ ret = MC(buffer_resize)(buffer, buffer->allocated * 2); if (ret != RETURN_SUCCESS) DLOGR(ret, "failed to resize buffer"); return RETURN_SUCCESS; } MC(return_t) MC(buffer_put)(MC(buffer_t) *buffer, const char *data, size_t size) { MC(return_t) ret = ERROR_NONE; if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (buffer->data == NULL) DLOGR(ERROR_NULL, "buffer->data"); if (data == NULL) DLOGR(ERROR_NULL, "data"); if (size > buffer->remaining) { DLOG("resizing is necessary"); ret = MC(buffer_resize)(buffer, size); } if (ret != ERROR_NONE) DLOGR(ret, "not enough space"); DLOG("buffer pre memcpy: %p", buffer); DLOG("-----------> data: %p", buffer->data); DLOG("---------> cursor: %p", buffer->cursor); memcpy(buffer->cursor, data, size); buffer->remaining -= size; buffer->length += size; buffer->cursor += size; return RETURN_SUCCESS; } MC(return_t) MC(buffer_getn)(MC(buffer_t) *buffer, char *data, size_t size) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (data == NULL) DLOGR(ERROR_NULL, "data"); if (size > buffer->length) DLOGR(ERROR_TOOBIG, "size:%zu", size); memcpy(data, buffer->data, size); MC(buffer_flush)(buffer, size); return RETURN_SUCCESS; } MC(return_t) MC(buffer_get)(MC(buffer_t) *buffer, char **dst, size_t *size) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (buffer->data == NULL) DLOGR(ERROR_NULL, "buffer->data"); if (dst == NULL) DLOGR(ERROR_NULL, "dst"); if (size == NULL) DLOGR(ERROR_NULL, "size"); *dst = malloc(buffer->length); if (*dst == NULL) DLOGR(ERROR_MEM, "malloc()"); *size = buffer->length; memcpy(*dst, buffer->data, buffer->length); MC(buffer_flush_all)(buffer); return RETURN_SUCCESS; } MC(return_t) MC(buffer_read_fd)(MC(buffer_t) *buffer, int fd) { long ret = -1; ssize_t bytes_read; struct io_event event; // num of events the ctx is for. TODO:DEFINE! struct iocb iocb; struct iocb *iocbs[] = { &iocb, NULL }; /* TODO: move into config */ struct timespec timeout = { .tv_sec = 0, .tv_nsec = 5000000 }; if (buffer == NULL) DLOGR(ERROR_NULL, "buffer == NULL"); /* Expect non-blocking and attempt to read as much as possible*/ if ((ret = MC(buffer_resize_if_needed)(buffer)) != RETURN_SUCCESS) DLOGR(ret, "call to buffer_resize_if_needed() failed"); /* TODO error handling */ io_prep_pread(&iocb, fd, buffer->cursor, buffer->remaining, 0); assert(1 == io_submit(*buffer->io_context, 1, iocbs)); switch ((ret = io_getevents(*buffer->io_context, 1, 1, &event, &timeout))) { case 1: /* success */ bytes_read = event.res; // process events[0] if (bytes_read > 0) { buffer->remaining -= bytes_read; buffer->cursor += bytes_read; buffer->length += bytes_read; } break; default: /* timeout */ DLOG("timeout or error occurred"); io_cancel(*buffer->io_context, &iocb, &event); DLOGR(ERROR_SOCKET, "connection error has occurred"); // case 0: /* error */ } return RETURN_SUCCESS; } MC(return_t) MC(buffer_write_fd)(MC(buffer_t) *buffer, int fd) { ssize_t bytes_written; struct io_event event; struct iocb iocb; struct iocb *iocbs[] = { &iocb, NULL }; /* TODO: move into config */ struct timespec timeout = { .tv_sec = 0, .tv_nsec = 5000000 }; long ret; if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (buffer->data == NULL) DLOGR(ERROR_NULL, "buffer->data"); if (fd < 0) DLOGR(ERROR_SOCKET, "bad fd"); if (buffer->length == 0) return RETURN_SUCCESS; bzero(&event, sizeof(event)); /* TODO error handling */ io_prep_pwrite(&iocb, fd, buffer->data, buffer->length, 0); iocb.data = NULL; if((ret = io_submit(*buffer->io_context, 1, iocbs)) != 1) { DLOGR(ERROR_SOCKET, "io_submit(): %s", strerror(-ret)); } switch ((ret = io_getevents(*buffer->io_context, 1, 1, &event, &timeout))) { case 1: /* success */ bytes_written = event.res; // process events[0] if (bytes_written > 0) MC(buffer_flush)(buffer, bytes_written); break; default: /* timeout or error */ DLOG("timeout or error occurred"); io_cancel(*buffer->io_context, &iocb, &event); DLOGR(ERROR_SOCKET, "connection error has occurred"); } return RETURN_SUCCESS; } MC(return_t) MC(buffer_flush_all)(MC(buffer_t) *buffer) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); /* does not bzero() any memory. */ buffer->length = 0; buffer->cursor = buffer->data; buffer->remaining = buffer->allocated; return RETURN_SUCCESS; } MC(return_t) MC(buffer_flush)(MC(buffer_t) *buffer, size_t size) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (buffer->data == NULL) DLOGR(ERROR_NULL, "buffer->data"); buffer->remaining += size; // IS THIS RIGHT? BUGGY TODO buffer->cursor = buffer->data + size + 1; buffer->length -= size; memmove(buffer->data, buffer->cursor, buffer->length); buffer->cursor = buffer->data + buffer->length; return RETURN_SUCCESS; } MC(return_t) MC(buffer_used)(MC(buffer_t) *buffer, size_t *size) { if (size == NULL) DLOGR(ERROR_NULL, "size"); if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); *size = buffer->length; return RETURN_SUCCESS; } MC(return_t) MC(buffer_available)(MC(buffer_t) *buffer, size_t *size) { if (size == NULL) DLOGR(ERROR_NULL, "size"); if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); *size = buffer->remaining; return RETURN_SUCCESS; } MC(return_t) MC(buffer_destroy)(MC(buffer_t) *buffer) { if (buffer == NULL) DLOGR(ERROR_NULL, "buffer"); if (buffer->data == NULL) return RETURN_SUCCESS; buffer->allocated = 0; buffer->remaining = 0; buffer->cursor = NULL; free(buffer->data); return RETURN_SUCCESS; }