Annotation of embedaddon/ntp/ports/winnt/ntpd/ntp_iocompletionport.c, revision 1.1.1.1

1.1       misho       1: #ifdef HAVE_CONFIG_H
                      2: # include <config.h>
                      3: #endif
                      4: 
                      5: #if defined (HAVE_IO_COMPLETION_PORT)
                      6: 
                      7: #include <stddef.h>
                      8: #include <stdio.h>
                      9: #include <process.h>
                     10: #include <syslog.h>
                     11: 
                     12: #include "ntp_stdlib.h"
                     13: #include "ntp_machine.h"
                     14: #include "ntp_fp.h"
                     15: #include "ntp.h"
                     16: #include "ntpd.h"
                     17: #include "ntp_refclock.h"
                     18: #include "ntp_iocompletionport.h"
                     19: #include "transmitbuff.h"
                     20: #include "ntp_request.h"
                     21: #include "ntp_assert.h"
                     22: #include "clockstuff.h"
                     23: #include "ntp_io.h"
                     24: #include "ntp_lists.h"
                     25: #include "clockstuff.h"
                     26: 
                     27: /*
                     28:  * Request types
                     29:  */
                     30: enum {
                     31:        SOCK_RECV,
                     32:        SOCK_SEND,
                     33:        SERIAL_WAIT,
                     34:        SERIAL_READ,
                     35:        SERIAL_WRITE
                     36: };
                     37: 
                     38: #ifdef _MSC_VER
                     39: # pragma warning(push)
                     40: # pragma warning(disable: 201)         /* nonstd extension nameless union */
                     41: #endif
                     42: 
                     43: typedef struct IoCompletionInfo {
                     44:        OVERLAPPED              overlapped;     /* must be first */
                     45:        int                     request_type;
                     46:        union {
                     47:                recvbuf_t *     recv_buf;
                     48:                transmitbuf_t * trans_buf;
                     49:        };
                     50: #ifdef DEBUG
                     51:        struct IoCompletionInfo *link;
                     52: #endif
                     53: } IoCompletionInfo;
                     54: 
                     55: #ifdef _MSC_VER
                     56: # pragma warning(pop)
                     57: #endif
                     58: 
                     59: /*
                     60:  * local function definitions
                     61:  */
                     62: static int QueueSerialWait(struct refclockio *, recvbuf_t *buff, IoCompletionInfo *lpo, BOOL clear_timestamp);
                     63: 
                     64: static int OnSocketRecv(ULONG_PTR, IoCompletionInfo *, DWORD, int);
                     65: static int OnSerialWaitComplete(ULONG_PTR, IoCompletionInfo *, DWORD, int);
                     66: static int OnSerialReadComplete(ULONG_PTR, IoCompletionInfo *, DWORD, int);
                     67: static int OnWriteComplete(ULONG_PTR, IoCompletionInfo *, DWORD, int);
                     68: 
                     69: /* keep a list to traverse to free memory on debug builds */
                     70: #ifdef DEBUG
                     71: static void free_io_completion_port_mem(void);
                     72: IoCompletionInfo *     compl_info_list;
                     73: CRITICAL_SECTION       compl_info_lock;
                     74: #define LOCK_COMPL()   EnterCriticalSection(&compl_info_lock);
                     75: #define UNLOCK_COMPL() LeaveCriticalSection(&compl_info_lock);
                     76: #endif
                     77: 
                     78: /* #define USE_HEAP */
                     79: 
                     80: #ifdef USE_HEAP
                     81: static HANDLE hHeapHandle = NULL;
                     82: #endif
                     83: 
                     84: static HANDLE hIoCompletionPort = NULL;
                     85: 
                     86: static HANDLE WaitableIoEventHandle = NULL;
                     87: static HANDLE WaitableExitEventHandle = NULL;
                     88: 
                     89: #ifdef NTPNEEDNAMEDHANDLE
                     90: #define WAITABLEIOEVENTHANDLE "WaitableIoEventHandle"
                     91: #else
                     92: #define WAITABLEIOEVENTHANDLE NULL
                     93: #endif
                     94: 
                     95: #define MAXHANDLES 3
                     96: HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL };
                     97: 
                     98: IoCompletionInfo *
                     99: GetHeapAlloc(char *fromfunc)
                    100: {
                    101:        IoCompletionInfo *lpo;
                    102: 
                    103: #ifdef USE_HEAP
                    104:        lpo = HeapAlloc(hHeapHandle,
                    105:                        HEAP_ZERO_MEMORY,
                    106:                        sizeof(IoCompletionInfo));
                    107: #else
                    108:        lpo = emalloc(sizeof(*lpo));
                    109:        memset(lpo, 0, sizeof(*lpo));
                    110: #endif
                    111:        DPRINTF(3, ("Allocation %d memory for %s, ptr %x\n", sizeof(IoCompletionInfo), fromfunc, lpo));
                    112: 
                    113: #ifdef DEBUG
                    114:        LOCK_COMPL();
                    115:        LINK_SLIST(compl_info_list, lpo, link);
                    116:        UNLOCK_COMPL();
                    117: #endif
                    118: 
                    119:        return (lpo);
                    120: }
                    121: 
                    122: void
                    123: FreeHeap(IoCompletionInfo *lpo, char *fromfunc)
                    124: {
                    125: #ifdef DEBUG
                    126:        IoCompletionInfo *unlinked;
                    127: 
                    128:        DPRINTF(3, ("Freeing memory for %s, ptr %x\n", fromfunc, lpo));
                    129: 
                    130:        LOCK_COMPL();
                    131:        UNLINK_SLIST(unlinked, compl_info_list, lpo, link,
                    132:            IoCompletionInfo);
                    133:        UNLOCK_COMPL();
                    134: #endif
                    135: 
                    136: #ifdef USE_HEAP
                    137:        HeapFree(hHeapHandle, 0, lpo);
                    138: #else
                    139:        free(lpo);
                    140: #endif
                    141: }
                    142: 
                    143: transmitbuf_t *
                    144: get_trans_buf()
                    145: {
                    146:        transmitbuf_t *tb  = emalloc(sizeof(*tb));
                    147:        return (tb);
                    148: }
                    149: 
                    150: void
                    151: free_trans_buf(transmitbuf_t *tb)
                    152: {
                    153:        free(tb);
                    154: }
                    155: 
                    156: HANDLE
                    157: get_io_event()
                    158: {
                    159:        return( WaitableIoEventHandle );
                    160: }
                    161: HANDLE
                    162: get_exit_event()
                    163: {
                    164:        return( WaitableExitEventHandle );
                    165: }
                    166: 
                    167: /*  This function will add an entry to the I/O completion port
                    168:  *  that will signal the I/O thread to exit (gracefully)
                    169:  */
                    170: static void
                    171: signal_io_completion_port_exit()
                    172: {
                    173:        if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
                    174:                msyslog(LOG_ERR, "Can't request service thread to exit: %m");
                    175:                exit(1);
                    176:        }
                    177: }
                    178: 
                    179: static unsigned WINAPI
                    180: iocompletionthread(void *NotUsed)
                    181: {
                    182:        BOOL bSuccess = FALSE;
                    183:        int errstatus = 0;
                    184:        DWORD BytesTransferred = 0;
                    185:        ULONG_PTR Key = 0;
                    186:        IoCompletionInfo * lpo = NULL;
                    187:        u_long time_next_ifscan_after_error = 0;
                    188: 
                    189:        UNUSED_ARG(NotUsed);
                    190: 
                    191:        /*
                    192:         *      socket and refclock receive call gettimeofday()
                    193:         *      so the I/O thread needs to be on the same 
                    194:         *      processor as the main and timing threads
                    195:         *      to ensure consistent QueryPerformanceCounter()
                    196:         *      results.
                    197:         */
                    198:        lock_thread_to_processor(GetCurrentThread());
                    199: 
                    200:        /*      Set the thread priority high enough so I/O will
                    201:         *      preempt normal recv packet processing, but not
                    202:         *      higher than the timer sync thread.
                    203:         */
                    204:        if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) {
                    205:                msyslog(LOG_ERR, "Can't set thread priority: %m");
                    206:        }
                    207: 
                    208:        while (TRUE) {
                    209:                bSuccess = GetQueuedCompletionStatus(
                    210:                                        hIoCompletionPort, 
                    211:                                        &BytesTransferred, 
                    212:                                        &Key, 
                    213:                                        (LPOVERLAPPED *) &lpo, 
                    214:                                        INFINITE);
                    215:                if (lpo == NULL)
                    216:                {
                    217:                        DPRINTF(2, ("Overlapped IO Thread Exiting\n"));
                    218:                        break; /* fail */
                    219:                }
                    220:                
                    221:                /*
                    222:                 * Deal with errors
                    223:                 */
                    224:                if (bSuccess)
                    225:                        errstatus = 0;
                    226:                else
                    227:                {
                    228:                        errstatus = GetLastError();
                    229:                        if (BytesTransferred == 0)
                    230:                        {
                    231:                                if (WSA_OPERATION_ABORTED == errstatus) {
                    232:                                        DPRINTF(4, ("Transfer Operation aborted\n"));
                    233:                                } else if (ERROR_UNEXP_NET_ERR == errstatus) {
                    234:                                        /*
                    235:                                         * We get this error when trying to send an the network
                    236:                                         * interface is gone or has lost link.  Rescan interfaces
                    237:                                         * to catch on sooner, but no more than once per minute.
                    238:                                         * Once ntp is able to detect changes without polling
                    239:                                         * this should be unneccessary
                    240:                                         */
                    241:                                        if (time_next_ifscan_after_error < current_time) {
                    242:                                                time_next_ifscan_after_error = current_time + 60;
                    243:                                                timer_interfacetimeout(current_time);
                    244:                                        }
                    245:                                        DPRINTF(4, ("sendto unexpected network error, interface may be down\n"));
                    246:                                }
                    247:                        }
                    248:                        else
                    249:                        {
                    250:                                msyslog(LOG_ERR, "sendto error after %d bytes: %m", BytesTransferred);
                    251:                        }
                    252:                }
                    253: 
                    254:                /*
                    255:                 * Invoke the appropriate function based on
                    256:                 * the value of the request_type
                    257:                 */
                    258:                switch(lpo->request_type)
                    259:                {
                    260:                case SERIAL_WAIT:
                    261:                        OnSerialWaitComplete(Key, lpo, BytesTransferred, errstatus);
                    262:                        break;
                    263:                case SERIAL_READ:
                    264:                        OnSerialReadComplete(Key, lpo, BytesTransferred, errstatus);
                    265:                        break;
                    266:                case SOCK_RECV:
                    267:                        OnSocketRecv(Key, lpo, BytesTransferred, errstatus);
                    268:                        break;
                    269:                case SOCK_SEND:
                    270:                case SERIAL_WRITE:
                    271:                        OnWriteComplete(Key, lpo, BytesTransferred, errstatus);
                    272:                        break;
                    273:                default:
                    274:                        DPRINTF(1, ("Unknown request type %d found in completion port\n",
                    275:                                    lpo->request_type));
                    276:                        break;
                    277:                }
                    278:        }
                    279: 
                    280:        return 0;
                    281: }
                    282: 
                    283: /*  Create/initialise the I/O creation port
                    284:  */
                    285: void
                    286: init_io_completion_port(
                    287:        void
                    288:        )
                    289: {
                    290:        unsigned tid;
                    291:        HANDLE thread;
                    292: 
                    293: #ifdef DEBUG
                    294:        InitializeCriticalSection(&compl_info_lock);
                    295:        atexit(&free_io_completion_port_mem);
                    296: #endif
                    297: 
                    298: #ifdef USE_HEAP
                    299:        /*
                    300:         * Create a handle to the Heap
                    301:         */
                    302:        hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0);
                    303:        if (hHeapHandle == NULL)
                    304:        {
                    305:                msyslog(LOG_ERR, "Can't initialize Heap: %m");
                    306:                exit(1);
                    307:        }
                    308: #endif
                    309: 
                    310: #if 0  /* transmitbuff.c unused, no need to initialize it */
                    311:        init_transmitbuff();
                    312: #endif
                    313: 
                    314:        /* Create the event used to signal an IO event
                    315:         */
                    316:        WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, WAITABLEIOEVENTHANDLE);
                    317:        if (WaitableIoEventHandle == NULL) {
                    318:                msyslog(LOG_ERR,
                    319:                "Can't create I/O event handle: %m - another process may be running - EXITING");
                    320:                exit(1);
                    321:        }
                    322:        /* Create the event used to signal an exit event
                    323:         */
                    324:        WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
                    325:        if (WaitableExitEventHandle == NULL) {
                    326:                msyslog(LOG_ERR,
                    327:                "Can't create exit event handle: %m - EXITING");
                    328:                exit(1);
                    329:        }
                    330: 
                    331:        /* Create the IO completion port
                    332:         */
                    333:        hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
                    334:        if (hIoCompletionPort == NULL) {
                    335:                msyslog(LOG_ERR, "Can't create I/O completion port: %m");
                    336:                exit(1);
                    337:        }
                    338: 
                    339:        /*
                    340:         * Initialize the Wait Handles
                    341:         */
                    342:        WaitHandles[0] = WaitableIoEventHandle;
                    343:        WaitHandles[1] = WaitableExitEventHandle; /* exit request */
                    344:        WaitHandles[2] = get_timer_handle();
                    345: 
                    346:        /* Have one thread servicing I/O - there were 4, but this would 
                    347:         * somehow cause NTP to stop replying to ntpq requests; TODO
                    348:         */
                    349:        thread = (HANDLE)_beginthreadex(
                    350:                NULL, 
                    351:                0, 
                    352:                iocompletionthread, 
                    353:                NULL, 
                    354:                CREATE_SUSPENDED, 
                    355:                &tid);
                    356:        ResumeThread(thread);
                    357:        CloseHandle(thread);
                    358: }
                    359:        
                    360: 
                    361: #ifdef DEBUG
                    362: static void
                    363: free_io_completion_port_mem(
                    364:        void
                    365:        )
                    366: {
                    367:        IoCompletionInfo *      pci;
                    368: 
                    369: #if defined(_MSC_VER) && defined (_DEBUG)
                    370:        _CrtCheckMemory();
                    371: #endif
                    372:        LOCK_COMPL();
                    373:        while ((pci = compl_info_list) != NULL) {
                    374: 
                    375: #if 0  /* sockaddr with received-from address in recvbuf */
                    376:        /* is sometimes modified by system after we free it  */
                    377:        /* triggering heap corruption warning -- find a */
                    378:        /* better way to free it after I/O is surely done */
                    379:                /* this handles both xmit and recv buffs */
                    380:                if (pci->recv_buf != NULL) {
                    381:                        DPRINTF(1, ("freeing xmit/recv buff %p\n", pci->recv_buf));
                    382:                        free(pci->recv_buf);
                    383:                }
                    384: #endif
                    385: 
                    386:                FreeHeap(pci, "free_io_completion_port_mem");
                    387:                /* FreeHeap() removed this item from compl_info_list */
                    388:        }
                    389:        UNLOCK_COMPL()
                    390: 
                    391: #if defined(_MSC_VER) && defined (_DEBUG)
                    392:        _CrtCheckMemory();
                    393: #endif
                    394: }
                    395: #endif /* DEBUG */
                    396: 
                    397: 
                    398: void
                    399: uninit_io_completion_port(
                    400:        void
                    401:        )
                    402: {
                    403:        if (hIoCompletionPort != NULL) {
                    404:                /*  Get each of the service threads to exit
                    405:                */
                    406:                signal_io_completion_port_exit();
                    407:        }
                    408: }
                    409: 
                    410: 
                    411: static int
                    412: QueueSerialWait(
                    413:        struct refclockio *     rio,
                    414:        recvbuf_t *             buff,
                    415:        IoCompletionInfo *      lpo,
                    416:        BOOL                    clear_timestamp
                    417:        )
                    418: {
                    419:        lpo->request_type = SERIAL_WAIT;
                    420:        lpo->recv_buf = buff;
                    421: 
                    422:        if (clear_timestamp)
                    423:                memset(&buff->recv_time, 0, sizeof(buff->recv_time));
                    424: 
                    425:        buff->fd = _get_osfhandle(rio->fd);
                    426:        if (!WaitCommEvent((HANDLE) buff->fd, (DWORD *)&buff->recv_buffer, (LPOVERLAPPED) lpo)) {
                    427:                if (ERROR_IO_PENDING != GetLastError()) {
                    428:                        msyslog(LOG_ERR, "Can't wait on Refclock: %m");
                    429:                        freerecvbuf(buff);
                    430:                        return 0;
                    431:                }
                    432:        }
                    433:        return 1;
                    434: }
                    435: 
                    436: 
                    437: static int 
                    438: OnSerialWaitComplete(ULONG_PTR i, IoCompletionInfo *lpo, DWORD Bytes, int errstatus)
                    439: {
                    440:        recvbuf_t *buff;
                    441:        struct refclockio * rio = (struct refclockio *) i;
                    442:        struct peer *pp;
                    443:        l_fp arrival_time;
                    444:        DWORD comm_mask;
                    445:        DWORD modem_status;
                    446:        static const l_fp zero_time = { 0 };
                    447:        BOOL rc;
                    448: 
                    449:        get_systime(&arrival_time);
                    450: 
                    451:        /*
                    452:         * Get the recvbuf pointer from the overlapped buffer.
                    453:         */
                    454:        buff = lpo->recv_buf;
                    455:        comm_mask = (*(DWORD *)&buff->recv_buffer);
                    456: #ifdef DEBUG
                    457:                if (errstatus || comm_mask & ~(EV_RXFLAG | EV_RLSD)) {
                    458:                        msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x errstatus %d",
                    459:                                comm_mask, errstatus);
                    460:                        exit(-1);
                    461:                }
                    462: #endif
                    463:                if (comm_mask & EV_RLSD) { 
                    464:                        modem_status = 0;
                    465:                        GetCommModemStatus((HANDLE)buff->fd, &modem_status);
                    466:                        if (modem_status & MS_RLSD_ON) {
                    467:                                /*
                    468:                                 * Use the timestamp from this PPS CD not
                    469:                                 * the later end of line.
                    470:                                 */
                    471:                                buff->recv_time = arrival_time;
                    472:                        }
                    473: 
                    474:                        if (!(comm_mask & EV_RXFLAG)) {
                    475:                                /*
                    476:                                 * if we didn't see an end of line yet
                    477:                                 * issue another wait for it.
                    478:                                 */
                    479:                                QueueSerialWait(rio, buff, lpo, FALSE);
                    480:                                return 1;
                    481:                        }
                    482:                }
                    483: 
                    484:                /*
                    485:                 * We've detected the end of line of serial input.
                    486:                 * Use this timestamp unless we already have a CD PPS
                    487:                 * timestamp in buff->recv_time.
                    488:                 */
                    489:                if (memcmp(&buff->recv_time, &zero_time, sizeof buff->recv_time)) {
                    490:                        /*
                    491:                         * We will first see a user PPS timestamp here on either
                    492:                         * the first or second line of text.  Log a one-time
                    493:                         * message while processing the second line.
                    494:                         */
                    495:                        if (1 == rio->recvcount) {
                    496:                                pp = (struct peer *)rio->srcclock;
                    497:                                msyslog(LOG_NOTICE, "Using user-mode PPS timestamp for %s",
                    498:                                        refnumtoa(&pp->srcadr));
                    499:                        }
                    500:                } else {
                    501:                        buff->recv_time = arrival_time;
                    502:                }
                    503: 
                    504:                /*
                    505:                 * Now that we have a complete line waiting, read it.
                    506:                 * There is still a race here, but we're likely to win.
                    507:                 */
                    508: 
                    509:                lpo->request_type = SERIAL_READ;
                    510: 
                    511:                rc = ReadFile(
                    512:                        (HANDLE)buff->fd,
                    513:                        buff->recv_buffer,
                    514:                        sizeof(buff->recv_buffer),
                    515:                        NULL,
                    516:                        (LPOVERLAPPED)lpo);
                    517: 
                    518:                if (!rc && ERROR_IO_PENDING != GetLastError()) {
                    519:                        msyslog(LOG_ERR, "Can't read from Refclock: %m");
                    520:                        freerecvbuf(buff);
                    521:                        return 0;
                    522:                }
                    523: 
                    524:        return 1;
                    525: }
                    526: 
                    527: /* Return 1 on Successful Read */
                    528: static int 
                    529: OnSerialReadComplete(ULONG_PTR i, IoCompletionInfo *lpo, DWORD Bytes, int errstatus)
                    530: {
                    531:        recvbuf_t *             buff;
                    532:        l_fp                    cr_time;
                    533:        struct refclockio *     rio;
                    534: 
                    535:        rio = (struct refclockio *)i;
                    536:        /*
                    537:         * Get the recvbuf pointer from the overlapped buffer.
                    538:         */
                    539:        buff = lpo->recv_buf;
                    540: 
                    541:        /*
                    542:         * ignore 0 bytes read due to timeout's and closure on fd
                    543:         */
                    544:        if (!errstatus && Bytes) {
                    545:                buff->recv_length = (int) Bytes;
                    546:                buff->receiver = rio->clock_recv;
                    547:                buff->dstadr = NULL;
                    548:                buff->recv_srcclock = rio->srcclock;
                    549:                packets_received++;
                    550:                /*
                    551:                 * Eat the first line of input as it's possibly
                    552:                 * partial and if a PPS is present, it may not 
                    553:                 * have fired since the port was opened.
                    554:                 */
                    555:                if (rio->recvcount++) {
                    556:                        cr_time = buff->recv_time;
                    557:                        add_full_recv_buffer(buff);
                    558:                        /*
                    559:                         * Mimic Unix line discipline and assume CR/LF
                    560:                         * line termination.  On Unix the CR terminates
                    561:                         * the line containing the timecode, and
                    562:                         * immediately after the LF terminates an empty
                    563:                         * line.  So synthesize the empty LF-terminated
                    564:                         * line using the same CR timestamp.  Both CR
                    565:                         * and LF are stripped by refclock_gtlin().
                    566:                         */
                    567:                        buff = get_free_recv_buffer_alloc();
                    568:                        buff->recv_time = cr_time;
                    569:                        buff->recv_length = 0;
                    570:                        buff->fd = _get_osfhandle(rio->fd);
                    571:                        buff->receiver = rio->clock_recv;
                    572:                        buff->dstadr = NULL;
                    573:                        buff->recv_srcclock = rio->srcclock;
                    574:                        add_full_recv_buffer(buff);
                    575:                        /*
                    576:                         * Now signal we have something to process
                    577:                         */
                    578:                        SetEvent(WaitableIoEventHandle);
                    579:                        buff = get_free_recv_buffer_alloc();
                    580:                }
                    581:        }
                    582: 
                    583:        QueueSerialWait(rio, buff, lpo, TRUE);
                    584: 
                    585:        return 1;
                    586: }
                    587: 
                    588: /*  Add a reference clock data structures I/O handles to
                    589:  *  the I/O completion port. Return 1 if any error.
                    590:  */  
                    591: int
                    592: io_completion_port_add_clock_io(
                    593:        struct refclockio *rio
                    594:        )
                    595: {
                    596:        IoCompletionInfo *lpo;
                    597:        recvbuf_t *buff;
                    598: 
                    599:        if (NULL == CreateIoCompletionPort(
                    600:                        (HANDLE)_get_osfhandle(rio->fd), 
                    601:                        hIoCompletionPort, 
                    602:                        (ULONG_PTR)rio,
                    603:                        0)) {
                    604:                msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
                    605:                return 1;
                    606:        }
                    607: 
                    608:        lpo = GetHeapAlloc("io_completion_port_add_clock_io");
                    609:        if (NULL == lpo) {
                    610:                msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
                    611:                return 1;
                    612:        }
                    613: 
                    614:        buff = get_free_recv_buffer_alloc();
                    615:        QueueSerialWait(rio, buff, lpo, TRUE);
                    616:        return 0;
                    617: }
                    618: 
                    619: /*
                    620:  * Queue a receiver on a socket. Returns 0 if no buffer can be queued 
                    621:  *
                    622:  *  Note: As per the winsock documentation, we use WSARecvFrom. Using
                    623:  *       ReadFile() is less efficient.
                    624:  */
                    625: static unsigned long 
                    626: QueueSocketRecv(
                    627:        SOCKET s,
                    628:        recvbuf_t *buff,
                    629:        IoCompletionInfo *lpo
                    630:        )
                    631: {
                    632:        WSABUF wsabuf;
                    633:        DWORD Flags;
                    634:        DWORD Result;
                    635: 
                    636:        lpo->request_type = SOCK_RECV;
                    637:        lpo->recv_buf = buff;
                    638: 
                    639:        if (buff != NULL) {
                    640:                Flags = 0;
                    641:                buff->fd = s;
                    642:                buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
                    643:                wsabuf.buf = (char *)buff->recv_buffer;
                    644:                wsabuf.len = sizeof(buff->recv_buffer);
                    645: 
                    646:                if (SOCKET_ERROR == WSARecvFrom(buff->fd, &wsabuf, 1, 
                    647:                                                NULL, &Flags, 
                    648:                                                &buff->recv_srcadr.sa, 
                    649:                                                &buff->recv_srcadr_len, 
                    650:                                                (LPOVERLAPPED)lpo, NULL)) {
                    651:                        Result = GetLastError();
                    652:                        switch (Result) {
                    653:                                case NO_ERROR :
                    654:                                case WSA_IO_PENDING :
                    655:                                        break ;
                    656: 
                    657:                                case WSAENOTSOCK :
                    658:                                        msyslog(LOG_ERR, "Can't read from non-socket fd %d: %m", (int)buff->fd);
                    659:                                        /* return the buffer */
                    660:                                        freerecvbuf(buff);
                    661:                                        return 0;
                    662:                                        break;
                    663: 
                    664:                                case WSAEFAULT :
                    665:                                        msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
                    666:                                        /* return the buffer */
                    667:                                        freerecvbuf(buff);
                    668:                                        return 0;
                    669:                                break;
                    670: 
                    671:                                default :
                    672:                                  /* nop */ ;
                    673:                        }
                    674:                }
                    675:        }
                    676:        else 
                    677:                return 0;
                    678:        return 1;
                    679: }
                    680: 
                    681: 
                    682: /* Returns 0 if any Error */
                    683: static int 
                    684: OnSocketRecv(ULONG_PTR i, IoCompletionInfo *lpo, DWORD Bytes, int errstatus)
                    685: {
                    686:        struct recvbuf *buff = NULL;
                    687:        recvbuf_t *newbuff;
                    688:        l_fp arrival_time;
                    689:        struct interface * inter = (struct interface *) i;
                    690:        
                    691:        get_systime(&arrival_time);
                    692: 
                    693:        NTP_REQUIRE(NULL != lpo);
                    694:        NTP_REQUIRE(NULL != lpo->recv_buf);
                    695: 
                    696:        /*
                    697:         * Convert the overlapped pointer back to a recvbuf pointer.
                    698:         */
                    699:        buff = lpo->recv_buf;
                    700: 
                    701:        /*
                    702:         * If the socket is closed we get an Operation Aborted error
                    703:         * Just clean up
                    704:         */
                    705:        if (errstatus == WSA_OPERATION_ABORTED)
                    706:        {
                    707:                freerecvbuf(buff);
                    708:                lpo->recv_buf = NULL;
                    709:                FreeHeap(lpo, "OnSocketRecv: Socket Closed");
                    710:                return (1);
                    711:        }
                    712: 
                    713:        /*
                    714:         * Get a new recv buffer for the replacement socket receive
                    715:         */
                    716:        newbuff = get_free_recv_buffer_alloc();
                    717:        QueueSocketRecv(inter->fd, newbuff, lpo);
                    718: 
                    719:        DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", 
                    720:                    (MODE_BROADCAST == get_packet_mode(buff))
                    721:                        ? " **** Broadcast "
                    722:                        : "",
                    723:                    (int)buff->fd, stoa(&buff->recv_srcadr),
                    724:                    get_packet_mode(buff)));
                    725: 
                    726:        /*
                    727:         * If we keep it add some info to the structure
                    728:         */
                    729:        if (Bytes && !inter->ignore_packets) {
                    730:                memcpy(&buff->recv_time, &arrival_time, sizeof buff->recv_time);        
                    731:                buff->recv_length = (int) Bytes;
                    732:                buff->receiver = receive; 
                    733:                buff->dstadr = inter;
                    734: 
                    735:                DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s\n", 
                    736:                            Bytes, (int)buff->fd, buff, stoa(&buff->recv_srcadr)));
                    737: 
                    738:                packets_received++;
                    739:                inter->received++;
                    740:                add_full_recv_buffer(buff);
                    741:                /*
                    742:                 * Now signal we have something to process
                    743:                 */
                    744:                SetEvent(WaitableIoEventHandle);
                    745:        } else
                    746:                freerecvbuf(buff);
                    747: 
                    748:        return 1;
                    749: }
                    750: 
                    751: 
                    752: /*  Add a socket handle to the I/O completion port, and send 
                    753:  *  NTP_RECVS_PER_SOCKET recv requests to the kernel.
                    754:  */
                    755: extern int
                    756: io_completion_port_add_socket(SOCKET fd, struct interface *inter)
                    757: {
                    758:        IoCompletionInfo *lpo;
                    759:        recvbuf_t *buff;
                    760:        int n;
                    761: 
                    762:        if (fd != INVALID_SOCKET) {
                    763:                if (NULL == CreateIoCompletionPort((HANDLE)fd, 
                    764:                    hIoCompletionPort, (ULONG_PTR)inter, 0)) {
                    765:                        msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
                    766:                        return 1;
                    767:                }
                    768:        }
                    769: 
                    770:        /*
                    771:         * Windows 2000 bluescreens with bugcheck 0x76
                    772:         * PROCESS_HAS_LOCKED_PAGES at ntpd process
                    773:         * termination when using more than one pending
                    774:         * receive per socket.  A runtime version test
                    775:         * would allow using more on newer versions
                    776:         * of Windows.
                    777:         */
                    778: 
                    779: #define WINDOWS_RECVS_PER_SOCKET 1
                    780: 
                    781:        for (n = 0; n < WINDOWS_RECVS_PER_SOCKET; n++) {
                    782: 
                    783:                buff = get_free_recv_buffer_alloc();
                    784:                lpo = (IoCompletionInfo *) GetHeapAlloc("io_completion_port_add_socket");
                    785:                if (lpo == NULL)
                    786:                {
                    787:                        msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
                    788:                        return 1;
                    789:                }
                    790: 
                    791:                QueueSocketRecv(fd, buff, lpo);
                    792: 
                    793:        }
                    794:        return 0;
                    795: }
                    796: 
                    797: static int 
                    798: OnWriteComplete(ULONG_PTR i, IoCompletionInfo *lpo, DWORD Bytes, int errstatus)
                    799: {
                    800:        transmitbuf_t *buff;
                    801:        struct interface *inter;
                    802: 
                    803:        UNUSED_ARG(Bytes);
                    804: 
                    805:        buff = lpo->trans_buf;
                    806: 
                    807:        free_trans_buf(buff);
                    808:        lpo->trans_buf = NULL;
                    809: 
                    810:        if (SOCK_SEND == lpo->request_type) {
                    811:                switch (errstatus) {
                    812:                case WSA_OPERATION_ABORTED:
                    813:                case NO_ERROR:
                    814:                        break;
                    815: 
                    816:                default:
                    817:                        inter = (struct interface *)i;
                    818:                        packets_notsent++;
                    819:                        inter->notsent++;
                    820:                        break;
                    821:                }
                    822:        }
                    823: 
                    824:        if (errstatus == WSA_OPERATION_ABORTED)
                    825:                FreeHeap(lpo, "OnWriteComplete: Socket Closed");
                    826:        else
                    827:                FreeHeap(lpo, "OnWriteComplete");
                    828:        return 1;
                    829: }
                    830: 
                    831: 
                    832: /*
                    833:  * mimic sendto() interface
                    834:  */
                    835: int
                    836: io_completion_port_sendto(
                    837:        int             fd,
                    838:        void  *         pkt,
                    839:        size_t          len,
                    840:        sockaddr_u *    dest
                    841:        )
                    842: {
                    843:        WSABUF                  wsabuf;
                    844:        transmitbuf_t *         buff;
                    845:        DWORD                   Result;
                    846:        int                     errval;
                    847:        int                     AddrLen;
                    848:        IoCompletionInfo *      lpo;
                    849:        DWORD                   Flags;
                    850: 
                    851:        Result = ERROR_SUCCESS;
                    852:        lpo = (IoCompletionInfo *)GetHeapAlloc("io_completion_port_sendto");
                    853:        if (lpo == NULL) {
                    854:                SetLastError(ERROR_OUTOFMEMORY);
                    855:                return -1;
                    856:        }
                    857: 
                    858:        if (len <= sizeof(buff->pkt)) {
                    859:                buff = get_trans_buf();
                    860: 
                    861:                if (buff == NULL) {
                    862:                        msyslog(LOG_ERR, "No more transmit buffers left - data discarded");
                    863:                        FreeHeap(lpo, "io_completion_port_sendto");
                    864:                        SetLastError(ERROR_OUTOFMEMORY);
                    865:                        return -1;
                    866:                }
                    867: 
                    868:                memcpy(&buff->pkt, pkt, len);
                    869:                wsabuf.buf = buff->pkt;
                    870:                wsabuf.len = len;
                    871: 
                    872:                AddrLen = SOCKLEN(dest);
                    873:                lpo->request_type = SOCK_SEND;
                    874:                lpo->trans_buf = buff;
                    875:                Flags = 0;
                    876: 
                    877:                Result = WSASendTo(fd, &wsabuf, 1, NULL, Flags,
                    878:                                   &dest->sa, AddrLen,
                    879:                                   (LPOVERLAPPED)lpo, NULL);
                    880:                if (Result == SOCKET_ERROR) {
                    881:                        errval = WSAGetLastError();
                    882:                        switch (errval) {
                    883: 
                    884:                        case NO_ERROR :
                    885:                        case WSA_IO_PENDING :
                    886:                                Result = ERROR_SUCCESS;
                    887:                                break ;
                    888: 
                    889:                        /*
                    890:                         * Something bad happened
                    891:                         */
                    892:                        default :
                    893:                                msyslog(LOG_ERR,
                    894:                                        "WSASendTo(%s) error %d: %s",
                    895:                                        stoa(dest), errval, strerror(errval));
                    896:                                free_trans_buf(buff);
                    897:                                lpo->trans_buf = NULL;
                    898:                                FreeHeap(lpo, "io_completion_port_sendto");
                    899:                                break;
                    900:                        }
                    901:                }
                    902: #ifdef DEBUG
                    903:                if (debug > 3)
                    904:                        printf("WSASendTo - %d bytes to %s : %d\n", len, stoa(dest), Result);
                    905: #endif
                    906:                if (ERROR_SUCCESS == Result)
                    907:                        return len;
                    908:                SetLastError(Result);
                    909:                return -1;
                    910:        } else {
                    911: #ifdef DEBUG
                    912:                if (debug) printf("Packet too large: %d Bytes\n", len);
                    913: #endif
                    914:                SetLastError(ERROR_INSUFFICIENT_BUFFER);
                    915:                return -1;
                    916:        }
                    917: }
                    918: 
                    919: 
                    920: /*
                    921:  * async_write, clone of write(), used by some reflock drivers
                    922:  */
                    923: int    
                    924: async_write(
                    925:        int fd,
                    926:        const void *data,
                    927:        unsigned int count
                    928:        )
                    929: {
                    930:        transmitbuf_t *buff;
                    931:        IoCompletionInfo *lpo;
                    932:        DWORD BytesWritten;
                    933: 
                    934:        if (count > sizeof buff->pkt) {
                    935: #ifdef DEBUG
                    936:                if (debug) {
                    937:                        printf("async_write: %d bytes too large, limit is %d\n",
                    938:                                count, sizeof buff->pkt);
                    939:                        exit(-1);
                    940:                }
                    941: #endif
                    942:                errno = ENOMEM;
                    943:                return -1;
                    944:        }
                    945: 
                    946:        buff = get_trans_buf();
                    947:        lpo = (IoCompletionInfo *) GetHeapAlloc("async_write");
                    948: 
                    949:        if (! buff || ! lpo) {
                    950:                if (buff) {
                    951:                        free_trans_buf(buff);
                    952:                        DPRINTF(1, ("async_write: out of memory\n"));
                    953:                } else
                    954:                        msyslog(LOG_ERR, "No more transmit buffers left - data discarded");
                    955: 
                    956:                errno = ENOMEM;
                    957:                return -1;
                    958:        }
                    959: 
                    960:        lpo->request_type = SERIAL_WRITE;
                    961:        lpo->trans_buf = buff;
                    962:        memcpy(&buff->pkt, data, count);
                    963: 
                    964:        if (!WriteFile((HANDLE)_get_osfhandle(fd), buff->pkt, count,
                    965:                &BytesWritten, (LPOVERLAPPED)lpo)
                    966:                && ERROR_IO_PENDING != GetLastError()) {
                    967: 
                    968:                msyslog(LOG_ERR, "async_write - error %m");
                    969:                free_trans_buf(buff);
                    970:                lpo->trans_buf = NULL;
                    971:                FreeHeap(lpo, "async_write");
                    972:                errno = EBADF;
                    973:                return -1;
                    974:        }
                    975: 
                    976:        return count;
                    977: }
                    978: 
                    979: 
                    980: /*
                    981:  * GetReceivedBuffers
                    982:  * Note that this is in effect the main loop for processing requests
                    983:  * both send and receive. This should be reimplemented
                    984:  */
                    985: int GetReceivedBuffers()
                    986: {
                    987:        isc_boolean_t have_packet = ISC_FALSE;
                    988:        while (!have_packet) {
                    989:                DWORD Index = WaitForMultipleObjects(MAXHANDLES, WaitHandles, FALSE, INFINITE);
                    990:                switch (Index) {
                    991:                case WAIT_OBJECT_0 + 0 : /* Io event */
                    992: # ifdef DEBUG
                    993:                        if ( debug > 3 )
                    994:                        {
                    995:                                printf( "IoEvent occurred\n" );
                    996:                        }
                    997: # endif
                    998:                        have_packet = ISC_TRUE;
                    999:                        break;
                   1000:                case WAIT_OBJECT_0 + 1 : /* exit request */
                   1001:                        exit(0);
                   1002:                        break;
                   1003:                case WAIT_OBJECT_0 + 2 : /* timer */
                   1004:                        timer();
                   1005:                        break;
                   1006:                case WAIT_IO_COMPLETION : /* loop */
                   1007:                case WAIT_TIMEOUT :
                   1008:                        break;
                   1009:                case WAIT_FAILED:
                   1010:                        msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects Failed: Error: %m");
                   1011:                        break;
                   1012: 
                   1013:                        /* For now do nothing if not expected */
                   1014:                default:
                   1015:                        break;          
                   1016:                                
                   1017:                } /* switch */
                   1018:        }
                   1019: 
                   1020:        return (full_recvbuffs());      /* get received buffers */
                   1021: }
                   1022: 
                   1023: #else
                   1024:   static int NonEmptyCompilationUnit;
                   1025: #endif
                   1026: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>