Return to stream.c CVS log | Up to [ELWIX - Embedded LightWeight unIX -] / embedaddon / strongswan / src / libstrongswan / networking / streams |
1.1 misho 1: /* 2: * Copyright (C) 2013 Martin Willi 3: * Copyright (C) 2013 revosec AG 4: * 5: * This program is free software; you can redistribute it and/or modify it 6: * under the terms of the GNU General Public License as published by the 7: * Free Software Foundation; either version 2 of the License, or (at your 8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. 9: * 10: * This program is distributed in the hope that it will be useful, but 11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13: * for more details. 14: */ 15: 16: #include <library.h> 17: #include <errno.h> 18: #include <unistd.h> 19: 20: #include "stream.h" 21: 22: typedef struct private_stream_t private_stream_t; 23: 24: /** 25: * Private data of an stream_t object. 26: */ 27: struct private_stream_t { 28: 29: /** 30: * Public stream_t interface. 31: */ 32: stream_t public; 33: 34: /** 35: * Underlying socket 36: */ 37: int fd; 38: 39: /** 40: * Callback if data is ready to read 41: */ 42: stream_cb_t read_cb; 43: 44: /** 45: * Data for read-ready callback 46: */ 47: void *read_data; 48: 49: /** 50: * Callback if write is non-blocking 51: */ 52: stream_cb_t write_cb; 53: 54: /** 55: * Data for write-ready callback 56: */ 57: void *write_data; 58: }; 59: 60: METHOD(stream_t, read_, ssize_t, 61: private_stream_t *this, void *buf, size_t len, bool block) 62: { 63: while (TRUE) 64: { 65: ssize_t ret; 66: 67: if (block) 68: { 69: ret = recv(this->fd, buf, len, 0); 70: } 71: else 72: { 73: ret = recv(this->fd, buf, len, MSG_DONTWAIT); 74: if (ret == -1 && errno == EAGAIN) 75: { 76: /* unify EGAIN and EWOULDBLOCK */ 77: errno = EWOULDBLOCK; 78: } 79: } 80: if (ret == -1 && errno == EINTR) 81: { /* interrupted, try again */ 82: continue; 83: } 84: return ret; 85: } 86: } 87: 88: METHOD(stream_t, read_all, bool, 89: private_stream_t *this, void *buf, size_t len) 90: { 91: ssize_t ret; 92: 93: while (len) 94: { 95: ret = read_(this, buf, len, TRUE); 96: if (ret < 0) 97: { 98: return FALSE; 99: } 100: if (ret == 0) 101: { 102: errno = ECONNRESET; 103: return FALSE; 104: } 105: len -= ret; 106: buf += ret; 107: } 108: return TRUE; 109: } 110: 111: METHOD(stream_t, write_, ssize_t, 112: private_stream_t *this, void *buf, size_t len, bool block) 113: { 114: ssize_t ret; 115: 116: while (TRUE) 117: { 118: if (block) 119: { 120: ret = send(this->fd, buf, len, 0); 121: } 122: else 123: { 124: ret = send(this->fd, buf, len, MSG_DONTWAIT); 125: if (ret == -1 && errno == EAGAIN) 126: { 127: /* unify EGAIN and EWOULDBLOCK */ 128: errno = EWOULDBLOCK; 129: } 130: } 131: if (ret == -1 && errno == EINTR) 132: { /* interrupted, try again */ 133: continue; 134: } 135: return ret; 136: } 137: } 138: 139: METHOD(stream_t, write_all, bool, 140: private_stream_t *this, void *buf, size_t len) 141: { 142: ssize_t ret; 143: 144: while (len) 145: { 146: ret = write_(this, buf, len, TRUE); 147: if (ret < 0) 148: { 149: return FALSE; 150: } 151: if (ret == 0) 152: { 153: errno = ECONNRESET; 154: return FALSE; 155: } 156: len -= ret; 157: buf += ret; 158: } 159: return TRUE; 160: } 161: 162: /** 163: * Watcher callback 164: */ 165: static bool watch(private_stream_t *this, int fd, watcher_event_t event) 166: { 167: bool keep = FALSE; 168: stream_cb_t cb; 169: 170: switch (event) 171: { 172: case WATCHER_READ: 173: cb = this->read_cb; 174: this->read_cb = NULL; 175: keep = cb(this->read_data, &this->public); 176: if (keep) 177: { 178: this->read_cb = cb; 179: } 180: break; 181: case WATCHER_WRITE: 182: cb = this->write_cb; 183: this->write_cb = NULL; 184: keep = cb(this->write_data, &this->public); 185: if (keep) 186: { 187: this->write_cb = cb; 188: } 189: break; 190: case WATCHER_EXCEPT: 191: break; 192: } 193: return keep; 194: } 195: 196: /** 197: * Register watcher for stream callbacks 198: */ 199: static void add_watcher(private_stream_t *this) 200: { 201: watcher_event_t events = 0; 202: 203: if (this->read_cb) 204: { 205: events |= WATCHER_READ; 206: } 207: if (this->write_cb) 208: { 209: events |= WATCHER_WRITE; 210: } 211: if (events) 212: { 213: lib->watcher->add(lib->watcher, this->fd, events, 214: (watcher_cb_t)watch, this); 215: } 216: } 217: 218: METHOD(stream_t, on_read, void, 219: private_stream_t *this, stream_cb_t cb, void *data) 220: { 221: lib->watcher->remove(lib->watcher, this->fd); 222: 223: this->read_cb = cb; 224: this->read_data = data; 225: 226: add_watcher(this); 227: } 228: 229: METHOD(stream_t, on_write, void, 230: private_stream_t *this, stream_cb_t cb, void *data) 231: { 232: lib->watcher->remove(lib->watcher, this->fd); 233: 234: this->write_cb = cb; 235: this->write_data = data; 236: 237: add_watcher(this); 238: } 239: 240: METHOD(stream_t, get_file, FILE*, 241: private_stream_t *this) 242: { 243: FILE *file; 244: int fd; 245: 246: /* fclose() closes the FD passed to fdopen(), so dup() it */ 247: fd = dup(this->fd); 248: if (fd == -1) 249: { 250: return NULL; 251: } 252: file = fdopen(fd, "w+"); 253: if (!file) 254: { 255: close(fd); 256: } 257: return file; 258: } 259: 260: METHOD(stream_t, destroy, void, 261: private_stream_t *this) 262: { 263: lib->watcher->remove(lib->watcher, this->fd); 264: close(this->fd); 265: free(this); 266: } 267: 268: /** 269: * See header 270: */ 271: stream_t *stream_create_from_fd(int fd) 272: { 273: private_stream_t *this; 274: 275: INIT(this, 276: .public = { 277: .read = _read_, 278: .read_all = _read_all, 279: .on_read = _on_read, 280: .write = _write_, 281: .write_all = _write_all, 282: .on_write = _on_write, 283: .get_file = _get_file, 284: .destroy = _destroy, 285: }, 286: .fd = fd, 287: ); 288: 289: return &this->public; 290: }