--- libelwix/src/ring.c 2026/02/11 13:36:09 1.6 +++ libelwix/src/ring.c 2026/02/18 11:41:47 1.8 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: ring.c,v 1.6 2026/02/11 13:36:09 misho Exp $ +* $Id: ring.c,v 1.8 2026/02/18 11:41:47 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -61,6 +61,7 @@ rbuf_init(ringbuf_t *rbuf, int num) atomic_store_explicit((atomic_int*) &rbuf->rb_head, 0, memory_order_relaxed); atomic_store_explicit((atomic_int*) &rbuf->rb_tail, 0, memory_order_relaxed); + atomic_store_explicit((atomic_int*) &rbuf->rb_full, 0, memory_order_relaxed); rbuf->rb_buffer = e_calloc(num, sizeof(struct iovec)); if (!rbuf->rb_buffer) @@ -92,6 +93,7 @@ rbuf_free(ringbuf_t *rbuf) atomic_store_explicit((atomic_int*) &rbuf->rb_head, 0, memory_order_relaxed); atomic_store_explicit((atomic_int*) &rbuf->rb_tail, 0, memory_order_relaxed); + atomic_store_explicit((atomic_int*) &rbuf->rb_full, 0, memory_order_relaxed); } /* @@ -111,6 +113,7 @@ rbuf_purge(ringbuf_t *rbuf) atomic_store_explicit((atomic_int*) &rbuf->rb_head, 0, memory_order_relaxed); atomic_store_explicit((atomic_int*) &rbuf->rb_tail, 0, memory_order_relaxed); + atomic_store_explicit((atomic_int*) &rbuf->rb_full, 0, memory_order_relaxed); } /* @@ -122,11 +125,14 @@ rbuf_purge(ringbuf_t *rbuf) int rbuf_isempty(ringbuf_t *rbuf) { - if (!rbuf) + if (!rbuf || !rbuf->rb_bufnum) return -1; + if (atomic_load_explicit((atomic_int*) &rbuf->rb_full, memory_order_acquire)) + return 0; + return (atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_acquire) == - atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire)); + atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire)); } /* @@ -140,16 +146,14 @@ rbuf_isfull(ringbuf_t *rbuf) { int h, t; - if (!rbuf) + if (!rbuf || !rbuf->rb_bufnum) return -1; - if (!rbuf->rb_bufnum) - return 1; - t = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire); - h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_relaxed) + 1; - if (h >= rbuf->rb_bufnum) - h ^= h; + if (!atomic_load_explicit((atomic_int*) &rbuf->rb_full, memory_order_acquire)) + return 0; + t = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire); + h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_acquire); return (h == t); } @@ -159,31 +163,49 @@ rbuf_isfull(ringbuf_t *rbuf) * @rbuf = Ring buffer * @data = Data * @len = Length + * @lost = Permit to lost data * return: -1 error, 1 buffer is full or 0 ok */ int -rbuf_enqueue(ringbuf_t *rbuf, void *data, size_t len) +rbuf_enqueue(ringbuf_t *rbuf, void *data, size_t len, int lost) { - int h, t, n; + int h, t, f, n, t2, drop = 0; struct iovec *iov; - if (!rbuf || !rbuf->rb_buffer) + if (!rbuf || !rbuf->rb_buffer || !rbuf->rb_bufnum) return -1; - if (!rbuf->rb_bufnum) - return 1; - h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_relaxed); + f = atomic_load_explicit((atomic_int*) &rbuf->rb_full, memory_order_acquire); t = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire); - n = (h + 1) % rbuf->rb_bufnum; + h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_acquire); - if (n == t) - return 1; + if (f && h == t) { + if (!lost) + return 1; + else + drop = 1; + } + n = (h + 1) % rbuf->rb_bufnum; + iov = rbuf->rb_buffer + h; iov->iov_len = len; iov->iov_base = data; atomic_store_explicit((atomic_int*) &rbuf->rb_head, n, memory_order_release); + if (drop) { + t2 = (t + 1) % rbuf->rb_bufnum; + while (42) { + drop = t; + if (atomic_compare_exchange_weak_explicit((atomic_int*) &rbuf->rb_tail, + &drop, t2, memory_order_release, memory_order_relaxed)) + break; + t = drop; + t2 = (t + 1) % rbuf->rb_bufnum; + } + } else + t2 = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire); + atomic_store_explicit((atomic_int*) &rbuf->rb_full, (n == t2), memory_order_release); return 0; } @@ -197,24 +219,32 @@ rbuf_enqueue(ringbuf_t *rbuf, void *data, size_t len) int rbuf_dequeue(ringbuf_t *rbuf, struct iovec **out) { - int h, t, n; + int h, t, n, f; - if (!rbuf || !rbuf->rb_buffer) + if (!rbuf || !rbuf->rb_buffer || !rbuf->rb_bufnum) return -1; - if (!rbuf->rb_bufnum) - return 1; - h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_acquire); - t = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_relaxed); - n = (t + 1) % rbuf->rb_bufnum; + while (42) { + h = atomic_load_explicit((atomic_int*) &rbuf->rb_head, memory_order_acquire); + f = atomic_load_explicit((atomic_int*) &rbuf->rb_full, memory_order_acquire); + t = atomic_load_explicit((atomic_int*) &rbuf->rb_tail, memory_order_acquire); - if (h == t) - return 1; + if (!f && h == t) + return 1; - if (out) - *out = rbuf->rb_buffer + t; + n = (t + 1) % rbuf->rb_bufnum; - atomic_store_explicit((atomic_int*) &rbuf->rb_tail, n, memory_order_release); + f = t; + if (atomic_compare_exchange_weak_explicit((atomic_int*) &rbuf->rb_tail, + &f, n, memory_order_release, memory_order_relaxed)) { + if (out) + *out = rbuf->rb_buffer + t; + + atomic_store_explicit((atomic_int*) &rbuf->rb_full, 0, memory_order_release); + break; + } + } + return 0; } @@ -234,6 +264,7 @@ lrb_init(lrbuf_t *lrb, u_int size) atomic_store_explicit((atomic_int*) &lrb->lrb_head, 0, memory_order_relaxed); atomic_store_explicit((atomic_int*) &lrb->lrb_tail, 0, memory_order_relaxed); + atomic_store_explicit((atomic_int*) &lrb->lrb_full, 0, memory_order_relaxed); lrb->lrb_data = e_malloc(size); if (!lrb->lrb_data) @@ -265,6 +296,7 @@ lrb_free(lrbuf_t *lrb) atomic_store_explicit((atomic_int*) &lrb->lrb_head, 0, memory_order_relaxed); atomic_store_explicit((atomic_int*) &lrb->lrb_tail, 0, memory_order_relaxed); + atomic_store_explicit((atomic_int*) &lrb->lrb_full, 0, memory_order_relaxed); } /* @@ -296,12 +328,14 @@ lrb_purge(lrbuf_t *lrb) int lrb_isempty(lrbuf_t *lrb) { - if (!lrb) + if (!lrb || !lrb->lrb_size) return -1; - return (!atomic_load_explicit((atomic_int*) &lrb->lrb_full, memory_order_acquire) && - (atomic_load_explicit((atomic_int*) &lrb->lrb_head, memory_order_acquire) == - atomic_load_explicit((atomic_int*) &lrb->lrb_tail, memory_order_acquire))); + if (atomic_load_explicit((atomic_int*) &lrb->lrb_full, memory_order_acquire)) + return 0; + + return (atomic_load_explicit((atomic_int*) &lrb->lrb_head, memory_order_acquire) == + atomic_load_explicit((atomic_int*) &lrb->lrb_tail, memory_order_acquire)); } /* @@ -315,10 +349,8 @@ lrb_isfull(lrbuf_t *lrb) { int h, t; - if (!lrb) + if (!lrb || !lrb->lrb_size) return -1; - if (!lrb->lrb_size) - return 1; if (!atomic_load_explicit((atomic_int*) &lrb->lrb_full, memory_order_acquire)) return 0; @@ -366,9 +398,9 @@ lrb_enqueue(lrbuf_t *lrb, void *data, size_t len, int { int h, t = 0, n, t2 = 0, unused, drop = 0; - if (!lrb || !lrb->lrb_data) + if (!lrb || !lrb->lrb_data || !lrb->lrb_size) return -1; - if (!lrb->lrb_size || lrb->lrb_size <= len) + if (lrb->lrb_size <= len) return 1; lrb_unused(lrb, unused); @@ -453,9 +485,9 @@ lrb_dequeue(lrbuf_t *lrb, void *data, size_t len) { int h, t, t2, n, l, f; - if (!lrb) + if (!lrb || !lrb->lrb_size) return -1; - if (!lrb->lrb_size || !len || lrb_isempty(lrb)) + if (!len || lrb_isempty(lrb)) return 0; if (lrb->lrb_size <= len) len = lrb->lrb_size - 1; @@ -484,7 +516,7 @@ lrb_dequeue(lrbuf_t *lrb, void *data, size_t len) } else { if (data) { memcpy(data, lrb->lrb_data + t, n); - memcpy(data + n, lrb->lrb_data, l - n); + memcpy(((u_char*) data) + n, lrb->lrb_data, l - n); } t2 = l - n; }