Artifact 80c8d89772d7007cec3076aa4ddaedad0de035541aee5c19d1fb7b78dfab116f:
- File
src/OFKernelEventObserver_kqueue.m
— part of check-in
[b84490ab4f]
at
2016-03-20 12:05:46
on branch 0.8
— Never block when the read buffer is non-empty
This was broken by 88f2f03. The problem only existed when something was
in the read buffer, but not processed completely, as after processing
the read buffer, it would go on to wait for data - but since not the
entire read buffer had been processed, it meant there was still data
left there that needed to be handled first. (user: js, size: 5923) [annotate] [blame] [check-ins using]
/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016
 *   Jonathan Schleifer <js@heap.zone>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>
#include <errno.h>
#include <math.h>
#include <string.h>

#include <fcntl.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#import "OFKernelEventObserver.h"
#import "OFKernelEventObserver+Private.h"
#import "OFKernelEventObserver_kqueue.h"
#import "OFDataArray.h"
#import "OFArray.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
#endif

#import "OFInitializationFailedException.h"
#import "OFObserveFailedException.h"
#import "OFOutOfRangeException.h"

/* Maximum number of events fetched from the kernel queue per kevent() call. */
#define EVENTLIST_SIZE 64

/* Largest value permitted for the tv_nsec member of a struct timespec. */
#define MAX_NANOSECONDS 999999999L

/*
 * Kernel event observer backed by kqueue(2).
 *
 * The ivars used here but not declared in this file (_cancelFD, _mutex,
 * _readObjects, _writeObjects, _delegate) are presumably provided by the
 * superclass - confirm against OFKernelEventObserver.h.
 */
@implementation OFKernelEventObserver_kqueue
/*
 * Creates the kernel queue, marks its descriptor close-on-exec and registers
 * the read end of the cancel FD pair so that it shows up as a read event.
 *
 * Throws OFInitializationFailedException if the queue cannot be created or
 * the cancel FD cannot be registered. In that case self is released before
 * the exception is rethrown.
 */
- init
{
	self = [super init];

	@try {
		struct kevent event;

#ifdef HAVE_KQUEUE1
		/* kqueue1() creates the queue with close-on-exec already set. */
		if ((_kernelQueue = kqueue1(O_CLOEXEC)) == -1)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];
#else
		int flags;

		if ((_kernelQueue = kqueue()) == -1)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];

		/* Best effort: a failure to set FD_CLOEXEC is not fatal. */
		if ((flags = fcntl(_kernelQueue, F_GETFD, 0)) != -1)
			fcntl(_kernelQueue, F_SETFD, flags | FD_CLOEXEC);
#endif

		EV_SET(&event, _cancelFD[0], EVFILT_READ, EV_ADD, 0, 0, 0);

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

/* Closes the kernel queue descriptor. */
- (void)dealloc
{
	close(_kernelQueue);

	[super dealloc];
}

/*
 * Starts observing the object's reading file descriptor.
 *
 * The object is added to _readObjects and an EVFILT_READ filter carrying the
 * object as user data is added to the kernel queue. If the kernel rejects the
 * filter, the object is removed from _readObjects again and
 * OFObserveFailedException is thrown with the errno of the failed call.
 */
- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_ADD;
	/* The NetBSD branch stores the user data as an intptr_t. */
#ifndef __NetBSD__
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_readObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
			/* Undo the bookkeeping so it matches the kernel queue. */
			[_readObjects removeObjectIdenticalTo: object];

			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Starts observing the object's writing file descriptor.
 *
 * The object is added to _writeObjects and an EVFILT_WRITE filter carrying the
 * object as user data is added to the kernel queue. If the kernel rejects the
 * filter, the object is removed from _writeObjects again and
 * OFObserveFailedException is thrown with the errno of the failed call.
 */
- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_ADD;
	/* The NetBSD branch stores the user data as an intptr_t. */
#ifndef __NetBSD__
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_writeObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
			/* Undo the bookkeeping so it matches the kernel queue. */
			[_writeObjects removeObjectIdenticalTo: object];

			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Stops observing the object's reading file descriptor.
 *
 * The EVFILT_READ filter is deleted from the kernel queue first; only if that
 * succeeds is the object removed from _readObjects. Throws
 * OFObserveFailedException with the errno of the failed call otherwise.
 */
- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_DELETE;

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

		[_readObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Stops observing the object's writing file descriptor.
 *
 * The EVFILT_WRITE filter is deleted from the kernel queue first; only if that
 * succeeds is the object removed from _writeObjects. Throws
 * OFObserveFailedException with the errno of the failed call otherwise.
 */
- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_DELETE;

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

		[_writeObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Waits for events on the kernel queue and dispatches them to the delegate.
 *
 * If -[OF_processReadBuffers] returns true, the method returns immediately
 * without waiting on the kernel queue.
 *
 * A timeInterval of -1 waits without a timeout; any other value is converted
 * to a struct timespec and used as the maximum time to wait. At most
 * EVENTLIST_SIZE events are handled per call.
 *
 * Throws OFObserveFailedException if kevent() fails or if one of the returned
 * events carries EV_ERROR.
 */
- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	struct timespec timeout;
	struct kevent eventList[EVENTLIST_SIZE];
	int i, events;

	if ([self OF_processReadBuffers])
		return;

	timeout.tv_sec = (time_t)timeInterval;
	/*
	 * Truncate instead of rounding to nearest: rounding a fractional part
	 * of 0.9999999995 or more yields 1000000000, which is not a valid
	 * tv_nsec and makes kevent() fail with EINVAL. The clamp guards
	 * against any remaining floating point surprise.
	 */
	timeout.tv_nsec = (long)((timeInterval - timeout.tv_sec) * 1000000000);
	if (timeout.tv_nsec > MAX_NANOSECONDS)
		timeout.tv_nsec = MAX_NANOSECONDS;

	events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	for (i = 0; i < events; i++) {
		void *pool;

		/* For EV_ERROR events, the data member holds the error code. */
		if (eventList[i].flags & EV_ERROR)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: (int)eventList[i].data];

		/*
		 * The cancel FD only serves to interrupt the wait: consume the
		 * byte that triggered the event and do not tell the delegate.
		 */
		if (eventList[i].ident == (uintptr_t)_cancelFD[0]) {
			char buffer;

			assert(eventList[i].filter == EVFILT_READ);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		pool = objc_autoreleasePoolPush();

		/* udata holds the object that was registered for this event. */
		switch (eventList[i].filter) {
		case EVFILT_READ:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    (id)eventList[i].udata];
			break;
		case EVFILT_WRITE:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    (id)eventList[i].udata];
			break;
		default:
			/* Only read and write filters are ever registered. */
			assert(0);
		}

		objc_autoreleasePoolPop(pool);
	}
}
@end