Artifact 6587f7a7e41a5c023f1dd0c83f77b955569995388960380fea0d1285c8c912ae:
- File
src/OFKernelEventObserver_kqueue.m
— part of check-in
[85917ea0dd]
at
2016-03-20 14:19:43
on branch 0.8
— Use the locked queue for kqueue and epoll as well
_readObjects must only be changed from the thread running the observer
and not from a thread adding or removing objects to observe. This is
already handled by the locked queue used by poll and select, so the best
way to solve this is to use the locked queue for kqueue and epoll as
well. (user: js, size: 5152) [annotate] [blame] [check-ins using]
/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016
 *   Jonathan Schleifer <js@heap.zone>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>
#include <errno.h>
#include <math.h>
#include <string.h>

#include <fcntl.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#import "OFKernelEventObserver.h"
#import "OFKernelEventObserver+Private.h"
#import "OFKernelEventObserver_kqueue.h"
#import "OFDataArray.h"
#import "OFArray.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
#endif

#import "OFInitializationFailedException.h"
#import "OFObserveFailedException.h"
#import "OFOutOfRangeException.h"

/* Maximum number of events fetched from the kernel queue per kevent() call. */
#define EVENTLIST_SIZE 64

/*
 * Kernel event observer backed by kqueue.
 *
 * Objects are registered with the kernel queue by file descriptor, with the
 * object pointer stored as the event's udata so that it can be handed to the
 * delegate when the event fires.
 */
@implementation OFKernelEventObserver_kqueue
/*
 * Creates the kernel queue (close-on-exec where possible) and registers
 * _cancelFD[0] for reading on it.
 *
 * Throws OFInitializationFailedException if the queue cannot be created or
 * the cancel descriptor cannot be registered.
 */
- init
{
	self = [super init];

	@try {
		struct kevent event;

#ifdef HAVE_KQUEUE1
		if ((_kernelQueue = kqueue1(O_CLOEXEC)) == -1)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];
#else
		int flags;

		if ((_kernelQueue = kqueue()) == -1)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];

		/*
		 * No kqueue1() available: set FD_CLOEXEC after the fact. This
		 * is best effort, a failure here is deliberately ignored.
		 */
		if ((flags = fcntl(_kernelQueue, F_GETFD, 0)) != -1)
			fcntl(_kernelQueue, F_SETFD, flags | FD_CLOEXEC);
#endif

		/*
		 * Watch _cancelFD[0] so that a byte arriving on it makes a
		 * blocking kevent() call return; see the matching read() in
		 * -[observeForTimeInterval:].
		 */
		EV_SET(&event, _cancelFD[0], EVFILT_READ, EV_ADD, 0, 0, 0);

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

/*
 * Closes the kernel queue. The descriptor is -1 if kqueue() failed in -[init],
 * in which case there is nothing to close.
 */
- (void)dealloc
{
	if (_kernelQueue != -1)
		close(_kernelQueue);

	[super dealloc];
}

/*
 * Registers the object's reading file descriptor with the kernel queue.
 *
 * Throws OFObserveFailedException with the current errno if kevent() fails.
 */
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_ADD;
	/* udata carries the object; on NetBSD it is stored as an intptr_t. */
#ifndef __NetBSD__
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException
		    exceptionWithObserver: self
				    errNo: errno];
}

/*
 * Registers the object's writing file descriptor with the kernel queue.
 *
 * Throws OFObserveFailedException with the current errno if kevent() fails.
 */
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_ADD;
	/* udata carries the object; on NetBSD it is stored as an intptr_t. */
#ifndef __NetBSD__
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException
		    exceptionWithObserver: self
				    errNo: errno];
}

/*
 * Removes the read filter for the object's reading file descriptor from the
 * kernel queue.
 *
 * Throws OFObserveFailedException with the current errno if kevent() fails.
 */
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_DELETE;

	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException
		    exceptionWithObserver: self
				    errNo: errno];
}

/*
 * Removes the write filter for the object's writing file descriptor from the
 * kernel queue.
 *
 * Throws OFObserveFailedException with the current errno if kevent() fails.
 */
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_DELETE;

	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException
		    exceptionWithObserver: self
				    errNo: errno];
}

/*
 * Waits for events on the kernel queue and notifies the delegate for each
 * object that became ready for reading or writing.
 *
 * A time interval of -1 waits without a timeout; any other value is used as
 * the timeout in seconds. At most EVENTLIST_SIZE events are handled per call.
 *
 * Throws OFObserveFailedException if kevent() fails or if a returned event
 * has EV_ERROR set (in which case the event's data is used as the errNo).
 */
- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	struct timespec timeout;
	struct kevent eventList[EVENTLIST_SIZE];
	long nanoseconds;
	int i, events;

	/*
	 * NOTE(review): presumably applies the add / remove requests that were
	 * queued from other threads - confirm against the superclass.
	 */
	[self OF_processQueue];

	/* Nothing to wait for if the read buffers were already processed. */
	if ([self OF_processReadBuffers])
		return;

	timeout.tv_sec = (time_t)timeInterval;
	nanoseconds = lrint((timeInterval - timeout.tv_sec) * 1000000000);

	/*
	 * lrint() rounds to nearest, so a fractional part very close to 1
	 * yields exactly 1000000000, which is not a valid tv_nsec and would
	 * make kevent() fail with EINVAL. Carry it over into the seconds.
	 */
	if (nanoseconds >= 1000000000) {
		timeout.tv_sec++;
		nanoseconds -= 1000000000;
	}

	timeout.tv_nsec = nanoseconds;

	events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		@throw [OFObserveFailedException
		    exceptionWithObserver: self
				    errNo: errno];

	for (i = 0; i < events; i++) {
		void *pool;

		if (eventList[i].flags & EV_ERROR)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: (int)eventList[i].data];

		/*
		 * The cancel descriptor only serves to wake up kevent():
		 * consume the byte and do not notify the delegate.
		 */
		if (eventList[i].ident == _cancelFD[0]) {
			char buffer;

			assert(eventList[i].filter == EVFILT_READ);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		pool = objc_autoreleasePoolPush();

		/* udata holds the object that was stored when it was added. */
		switch (eventList[i].filter) {
		case EVFILT_READ:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    (id)eventList[i].udata];
			break;
		case EVFILT_WRITE:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    (id)eventList[i].udata];
			break;
		default:
			/* Only read and write filters are ever registered. */
			assert(0);
		}

		objc_autoreleasePoolPop(pool);
	}
}
@end