Index: src/Makefile ================================================================== --- src/Makefile +++ src/Makefile @@ -89,19 +89,20 @@ OFINIFile.m \ OFSettings.m \ OFZIPArchive.m \ OFZIPArchiveEntry.m SRCS_PLUGINS = OFPlugin.m -SRCS_SOCKETS = OFHTTPClient.m \ - OFHTTPRequest.m \ - OFHTTPResponse.m \ - OFHTTPServer.m \ - OFKernelEventObserver.m \ - OFStreamSocket.m \ - OFTCPSocket.m \ - OFUDPSocket.m \ - resolver.m \ +SRCS_SOCKETS = OFHTTPClient.m \ + OFHTTPRequest.m \ + OFHTTPResponse.m \ + OFHTTPServer.m \ + OFKernelEventObserver.m \ + OFKernelEventObserver_LockedQueue.m \ + OFStreamSocket.m \ + OFTCPSocket.m \ + OFUDPSocket.m \ + resolver.m \ socket.m SRCS_THREADS = OFCondition.m \ OFMutex.m \ OFRecursiveMutex.m \ OFThreadPool.m \ Index: src/OFKernelEventObserver+Private.h ================================================================== --- src/OFKernelEventObserver+Private.h +++ src/OFKernelEventObserver+Private.h @@ -17,14 +17,9 @@ #import "OFKernelEventObserver.h" OF_ASSUME_NONNULL_BEGIN @interface OFKernelEventObserver () -- (void)OF_addObjectForReading: (id)object; -- (void)OF_addObjectForWriting: (id)object; -- (void)OF_removeObjectForReading: (id)object; -- (void)OF_removeObjectForWriting: (id)object; -- (void)OF_processQueueAndStoreRemovedIn: (nullable OFMutableArray*)removed; - (void)OF_processReadBuffers; @end OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver.h ================================================================== --- src/OFKernelEventObserver.h +++ src/OFKernelEventObserver.h @@ -19,15 +19,10 @@ #import "socket.h" OF_ASSUME_NONNULL_BEGIN @class OFMutableArray OF_GENERIC(ObjectType); -@class OFMutableDictionary OF_GENERIC(KeyType, ObjectType); -@class OFDataArray; -#ifdef OF_HAVE_THREADS -@class OFMutex; -#endif @class OFDate; /*! 
* @protocol OFKernelEventObserverDelegate * OFKernelEventObserver.h ObjFW/OFKernelEventObserver.h @@ -110,22 +105,17 @@ { OFMutableArray OF_GENERIC(id ) *_readObjects; OFMutableArray OF_GENERIC(id ) *_writeObjects; - OFMutableArray *_queue; - OFDataArray *_queueActions; id _delegate; #ifdef OF_HAVE_PIPE int _cancelFD[2]; #else of_socket_t _cancelFD[2]; struct sockaddr_in _cancelAddr; #endif -#ifdef OF_HAVE_THREADS - OFMutex *_mutex; -#endif } /*! * The delegate for the OFKernelEventObserver. */ Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -16,25 +16,18 @@ #define __NO_EXT_QNX #include "config.h" -#include - #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFArray.h" -#import "OFDictionary.h" #import "OFStream.h" #import "OFStream+Private.h" -#import "OFDataArray.h" #ifndef OF_HAVE_PIPE # import "OFStreamSocket.h" #endif -#ifdef OF_HAVE_THREADS -# import "OFMutex.h" -#endif #import "OFDate.h" #ifdef HAVE_KQUEUE # import "OFKernelEventObserver_kqueue.h" #endif @@ -53,18 +46,10 @@ #import "OFOutOfRangeException.h" #import "socket.h" #import "socket_helpers.h" -enum { - QUEUE_ADD = 0, - QUEUE_REMOVE = 1, - QUEUE_READ = 0, - QUEUE_WRITE = 2 -}; -#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE) - @implementation OFKernelEventObserver @synthesize delegate = _delegate; + (void)initialize { @@ -108,13 +93,10 @@ socklen_t cancelAddrLen; #endif _readObjects = [[OFMutableArray alloc] init]; _writeObjects = [[OFMutableArray alloc] init]; - _queue = [[OFMutableArray alloc] init]; - _queueActions = [[OFDataArray alloc] - initWithItemSize: sizeof(int)]; #ifdef OF_HAVE_PIPE if (pipe(_cancelFD)) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; @@ -146,14 +128,10 @@ &cancelAddrLen) != 0) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; # endif #endif - -#ifdef OF_HAVE_THREADS 
- _mutex = [[OFMutex alloc] init]; -#endif } @catch (id e) { [self release]; @throw e; } @@ -170,168 +148,32 @@ of_socket_port_free(_cancelAddr.sin_port, SOCK_DGRAM); #endif [_readObjects release]; [_writeObjects release]; - [_queue release]; - [_queueActions release]; -#ifdef OF_HAVE_THREADS - [_mutex release]; -#endif [super dealloc]; } - (void)addObjectForReading: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - int qi = QUEUE_ADD | QUEUE_READ; - - [_queue addObject: object]; - [_queueActions addItem: &qi]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; + OF_UNRECOGNIZED_SELECTOR } - (void)addObjectForWriting: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - int qi = QUEUE_ADD | QUEUE_WRITE; - - [_queue addObject: object]; - [_queueActions addItem: &qi]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; + OF_UNRECOGNIZED_SELECTOR } - (void)removeObjectForReading: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - int qi = QUEUE_REMOVE | QUEUE_READ; - - [_queue addObject: object]; - [_queueActions addItem: &qi]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; + OF_UNRECOGNIZED_SELECTOR } - (void)removeObjectForWriting: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - int qi = QUEUE_REMOVE | QUEUE_WRITE; - - [_queue addObject: object]; - [_queueActions addItem: &qi]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; -} - -- (void)OF_addObjectForReading: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_addObjectForWriting: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_removeObjectForReading: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_removeObjectForWriting: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_processQueueAndStoreRemovedIn: (OFMutableArray*)removed -{ 
-#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - id const *queueObjects = [_queue objects]; - int *queueActionItems = [_queueActions items]; - size_t i, count = [_queue count]; - - for (i = 0; i < count; i++) { - id object = queueObjects[i]; - int action = queueActionItems[i]; - - switch (action) { - case QUEUE_ADD | QUEUE_READ: - [_readObjects addObject: object]; - - [self OF_addObjectForReading: object]; - - break; - case QUEUE_ADD | QUEUE_WRITE: - [_writeObjects addObject: object]; - - [self OF_addObjectForWriting: object]; - - break; - case QUEUE_REMOVE | QUEUE_READ: - [self OF_removeObjectForReading: object]; - - [removed addObject: object]; - [_readObjects removeObjectIdenticalTo: object]; - - break; - case QUEUE_REMOVE | QUEUE_WRITE: - [self OF_removeObjectForWriting: object]; - - [removed addObject: object]; - [_writeObjects removeObjectIdenticalTo: object]; - - break; - default: - assert(0); - } - } - - [_queue removeAllObjects]; - [_queueActions removeAllItems]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } + OF_UNRECOGNIZED_SELECTOR } - (void)observe { [self observeForTimeInterval: -1]; ADDED src/OFKernelEventObserver_LockedQueue.h Index: src/OFKernelEventObserver_LockedQueue.h ================================================================== --- src/OFKernelEventObserver_LockedQueue.h +++ src/OFKernelEventObserver_LockedQueue.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 + * Jonathan Schleifer + * + * All rights reserved. + * + * This file is part of ObjFW. It may be distributed under the terms of the + * Q Public License 1.0, which can be found in the file LICENSE.QPL included in + * the packaging of this file. + * + * Alternatively, it may be distributed under the terms of the GNU General + * Public License, either version 2 or 3, which can be found in the file + * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this + * file. 
+ */ + +#import "OFKernelEventObserver.h" + +OF_ASSUME_NONNULL_BEGIN + +@class OFMutableArray OF_GENERIC(ObjectType); +@class OFDataArray; +#ifdef OF_HAVE_THREADS +@class OFMutex; +#endif + +@interface OFKernelEventObserver_LockedQueue: OFKernelEventObserver +{ + OFDataArray *_queueActions, *_queueFDs; + OFMutableArray *_queueObjects; +#ifdef OF_HAVE_THREADS + OFMutex *_mutex; +#endif +} + +- (void)OF_addObjectForReading: (id)object + fileDescriptor: (int)fd; +- (void)OF_addObjectForWriting: (id)object + fileDescriptor: (int)fd; +- (void)OF_removeObjectForReading: (id)object + fileDescriptor: (int)fd; +- (void)OF_removeObjectForWriting: (id)object + fileDescriptor: (int)fd; +- (void)OF_processQueue; +@end + +OF_ASSUME_NONNULL_END ADDED src/OFKernelEventObserver_LockedQueue.m Index: src/OFKernelEventObserver_LockedQueue.m ================================================================== --- src/OFKernelEventObserver_LockedQueue.m +++ src/OFKernelEventObserver_LockedQueue.m @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 + * Jonathan Schleifer + * + * All rights reserved. + * + * This file is part of ObjFW. It may be distributed under the terms of the + * Q Public License 1.0, which can be found in the file LICENSE.QPL included in + * the packaging of this file. + * + * Alternatively, it may be distributed under the terms of the GNU General + * Public License, either version 2 or 3, which can be found in the file + * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this + * file. 
+ */ + +#include "config.h" + +#import "OFKernelEventObserver_LockedQueue.h" +#import "OFArray.h" +#import "OFDataArray.h" +#ifdef OF_HAVE_THREADS +# import "OFMutex.h" +#endif + +#import "OFInitializationFailedException.h" + +enum { + QUEUE_ADD = 0, + QUEUE_REMOVE = 1, + QUEUE_READ = 0, + QUEUE_WRITE = 2 +}; +#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE) + +@implementation OFKernelEventObserver_LockedQueue +- init +{ + self = [super init]; + + @try { + _queueActions = [[OFDataArray alloc] + initWithItemSize: sizeof(int)]; + _queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; + _queueObjects = [[OFMutableArray alloc] init]; + +#ifdef OF_HAVE_THREADS + _mutex = [[OFMutex alloc] init]; +#endif + } @catch (id e) { + [self release]; /* do not leak the half-initialized instance */ + @throw e; + } + + return self; +} + +- (void)dealloc +{ + [_queueActions release]; + [_queueFDs release]; + [_queueObjects release]; +#ifdef OF_HAVE_THREADS + [_mutex release]; +#endif + + [super dealloc]; +} + +- (void)addObjectForReading: (id )object +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int qi = QUEUE_ADD | QUEUE_READ; + int fd = [object fileDescriptorForReading]; + + [_queueActions addItem: &qi]; + [_queueFDs addItem: &fd]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)addObjectForWriting: (id )object +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int qi = QUEUE_ADD | QUEUE_WRITE; + int fd = [object fileDescriptorForWriting]; + + [_queueActions addItem: &qi]; + [_queueFDs addItem: &fd]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)removeObjectForReading: (id )object +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int qi = QUEUE_REMOVE | QUEUE_READ; + int fd = [object fileDescriptorForReading]; + + [_queueActions addItem: &qi]; +
[_queueFDs addItem: &fd]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)removeObjectForWriting: (id )object +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int qi = QUEUE_REMOVE | QUEUE_WRITE; + int fd = [object fileDescriptorForWriting]; + + [_queueActions addItem: &qi]; + [_queueFDs addItem: &fd]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)OF_addObjectForReading: (id)object + fileDescriptor: (int)fd +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_addObjectForWriting: (id)object + fileDescriptor: (int)fd +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForReading: (id)object + fileDescriptor: (int)fd +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForWriting: (id)object + fileDescriptor: (int)fd +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_processQueue +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int *queueActions = [_queueActions items]; + int *queueFDs = [_queueFDs items]; + id const *queueObjects = [_queueObjects objects]; + size_t i, count = [_queueActions count]; + + OF_ENSURE([_queueFDs count] == count); + OF_ENSURE([_queueObjects count] == count); + + for (i = 0; i < count; i++) { + int action = queueActions[i]; + int fd = queueFDs[i]; + id object = queueObjects[i]; + + switch (action) { + case QUEUE_ADD | QUEUE_READ: + [_readObjects addObject: object]; + + [self OF_addObjectForReading: object + fileDescriptor: fd]; + + break; + case QUEUE_ADD | QUEUE_WRITE: + [_writeObjects addObject: object]; + + [self OF_addObjectForWriting: object + fileDescriptor: fd]; + + break; + case QUEUE_REMOVE | QUEUE_READ: + [self OF_removeObjectForReading: object + fileDescriptor: fd]; + + [_readObjects removeObjectIdenticalTo: object]; + + break; + case QUEUE_REMOVE | QUEUE_WRITE: + [self OF_removeObjectForWriting: object + 
fileDescriptor: fd]; + + [_writeObjects removeObjectIdenticalTo: object]; + + break; + default: + OF_ENSURE(0); + } + } + + [_queueActions removeAllItems]; + [_queueFDs removeAllItems]; + [_queueObjects removeAllObjects]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif +} +@end Index: src/OFKernelEventObserver_epoll.m ================================================================== --- src/OFKernelEventObserver_epoll.m +++ src/OFKernelEventObserver_epoll.m @@ -154,32 +154,32 @@ [_FDToEvents setValue: (void*)events forKey: (void*)(intptr_t)fd]; } } -- (void)OF_addObjectForReading: (id)object +- (void)addObjectForReading: (id )object { [self OF_addObject: object fileDescriptor: [object fileDescriptorForReading] events: EPOLLIN]; } -- (void)OF_addObjectForWriting: (id)object +- (void)addObjectForWriting: (id )object { [self OF_addObject: object fileDescriptor: [object fileDescriptorForWriting] events: EPOLLOUT]; } -- (void)OF_removeObjectForReading: (id)object +- (void)removeObjectForReading: (id )object { [self OF_removeObject: object fileDescriptor: [object fileDescriptorForReading] events: EPOLLIN]; } -- (void)OF_removeObjectForWriting: (id)object +- (void)removeObjectForWriting: (id )object { [self OF_removeObject: object fileDescriptor: [object fileDescriptorForWriting] events: EPOLLOUT]; } @@ -189,11 +189,10 @@ OFNull *nullObject = [OFNull null]; void *pool = objc_autoreleasePoolPush(); struct epoll_event eventList[EVENTLIST_SIZE]; int i, events; - [self OF_processQueueAndStoreRemovedIn: nil]; [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE, Index: src/OFKernelEventObserver_kqueue.h ================================================================== --- src/OFKernelEventObserver_kqueue.h +++ src/OFKernelEventObserver_kqueue.h @@ -22,11 +22,9 @@ @class OFMutableArray OF_GENERIC(ObjectType); @interface OFKernelEventObserver_kqueue: OFKernelEventObserver { int _kernelQueue; - 
OFDataArray *_changeList; - OFMutableArray *_removedArray; } @end OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver_kqueue.m ================================================================== --- src/OFKernelEventObserver_kqueue.m +++ src/OFKernelEventObserver_kqueue.m @@ -60,16 +60,15 @@ if ((flags = fcntl(_kernelQueue, F_GETFD, 0)) != -1) fcntl(_kernelQueue, F_SETFD, flags | FD_CLOEXEC); #endif - _changeList = [[OFDataArray alloc] initWithItemSize: - sizeof(struct kevent)]; EV_SET(&event, _cancelFD[0], EVFILT_READ, EV_ADD, 0, 0, 0); - [_changeList addItem: &event]; - _removedArray = [[OFMutableArray alloc] init]; + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFInitializationFailedException + exceptionWithClass: [self class]]; } @catch (id e) { [self release]; @throw e; } @@ -78,23 +77,17 @@ - (void)dealloc { close(_kernelQueue); - [_changeList release]; - [_removedArray release]; - [super dealloc]; } -- (void)OF_addObjectForReading: (id)object +- (void)addObjectForReading: (id )object { struct kevent event; - if ([_changeList count] >= INT_MAX) - @throw [OFOutOfRangeException exception]; - memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForReading]; event.filter = EVFILT_READ; event.flags = EV_ADD; #ifndef OF_NETBSD @@ -101,20 +94,20 @@ event.udata = object; #else event.udata = (intptr_t)object; #endif - [_changeList addItem: &event]; + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; } -- (void)OF_addObjectForWriting: (id)object +- (void)addObjectForWriting: (id )object { struct kevent event; - if ([_changeList count] >= INT_MAX) - @throw [OFOutOfRangeException exception]; - memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForWriting]; event.filter = EVFILT_WRITE; event.flags = EV_ADD; #ifndef OF_NETBSD @@ -121,33 +114,44 @@ event.udata = object; #else event.udata = (intptr_t)object; #endif - 
[_changeList addItem: &event]; + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; } -- (void)OF_removeObjectForReading: (id)object +- (void)removeObjectForReading: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForReading]; event.filter = EVFILT_READ; event.flags = EV_DELETE; - [_changeList addItem: &event]; + + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; } -- (void)OF_removeObjectForWriting: (id)object +- (void)removeObjectForWriting: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForWriting]; event.filter = EVFILT_WRITE; event.flags = EV_DELETE; - [_changeList addItem: &event]; + + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush(); @@ -156,31 +160,21 @@ int i, events; timeout.tv_sec = (time_t)timeInterval; timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000); - /* - * Make sure to keep the streams retained and thus the file descriptors - * valid until the actual change has been performed. - */ - [self OF_processQueueAndStoreRemovedIn: _removedArray]; - [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); - events = kevent(_kernelQueue, [_changeList items], - (int)[_changeList count], eventList, EVENTLIST_SIZE, + events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? 
&timeout : NULL)); if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; - [_changeList removeAllItems]; - [_removedArray removeAllObjects]; - for (i = 0; i < events; i++) { if (eventList[i].flags & EV_ERROR) @throw [OFObserveFailedException exceptionWithObserver: self errNo: (int)eventList[i].data]; Index: src/OFKernelEventObserver_poll.h ================================================================== --- src/OFKernelEventObserver_poll.h +++ src/OFKernelEventObserver_poll.h @@ -12,20 +12,20 @@ * Public License, either version 2 or 3, which can be found in the file * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. */ -#import "OFKernelEventObserver.h" +#import "OFKernelEventObserver_LockedQueue.h" OF_ASSUME_NONNULL_BEGIN @class OFDataArray; -@interface OFKernelEventObserver_poll: OFKernelEventObserver +@interface OFKernelEventObserver_poll: OFKernelEventObserver_LockedQueue { OFDataArray *_FDs; size_t _maxFD; id __unsafe_unretained *_FDToObject; } @end OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver_poll.m ================================================================== --- src/OFKernelEventObserver_poll.m +++ src/OFKernelEventObserver_poll.m @@ -124,34 +124,38 @@ } } } - (void)OF_addObjectForReading: (id)object + fileDescriptor: (int)fd { [self OF_addObject: object - fileDescriptor: [object fileDescriptorForReading] + fileDescriptor: fd events: POLLIN]; } - (void)OF_addObjectForWriting: (id)object + fileDescriptor: (int)fd { [self OF_addObject: object - fileDescriptor: [object fileDescriptorForWriting] + fileDescriptor: fd events: POLLOUT]; } - (void)OF_removeObjectForReading: (id)object + fileDescriptor: (int)fd { [self OF_removeObject: object - fileDescriptor: [object fileDescriptorForReading] + fileDescriptor: fd events: POLLIN]; } - (void)OF_removeObjectForWriting: (id)object + fileDescriptor: (int)fd { [self OF_removeObject: object - fileDescriptor: [object 
fileDescriptorForWriting] + fileDescriptor: fd events: POLLOUT]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { @@ -158,11 +162,11 @@ void *pool = objc_autoreleasePoolPush(); struct pollfd *FDs; int events; size_t i, nFDs; - [self OF_processQueueAndStoreRemovedIn: nil]; + [self OF_processQueue]; [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); FDs = [_FDs items]; Index: src/OFKernelEventObserver_select.h ================================================================== --- src/OFKernelEventObserver_select.h +++ src/OFKernelEventObserver_select.h @@ -25,17 +25,17 @@ #ifdef HAVE_SYS_SELECT_H # include #endif -#import "OFKernelEventObserver.h" +#import "OFKernelEventObserver_LockedQueue.h" OF_ASSUME_NONNULL_BEGIN -@interface OFKernelEventObserver_select: OFKernelEventObserver +@interface OFKernelEventObserver_select: OFKernelEventObserver_LockedQueue { fd_set _readFDs, _writeFDs; int _maxFD; } @end OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver_select.m ================================================================== --- src/OFKernelEventObserver_select.m +++ src/OFKernelEventObserver_select.m @@ -63,13 +63,12 @@ return self; } - (void)OF_addObjectForReading: (id)object + fileDescriptor: (int)fd { - int fd = [object fileDescriptorForReading]; - if (fd < 0 || fd > INT_MAX - 1) @throw [OFOutOfRangeException exception]; #ifndef OF_WINDOWS if (fd >= FD_SETSIZE) @@ -81,13 +80,12 @@ FD_SET(fd, &_readFDs); } - (void)OF_addObjectForWriting: (id)object + fileDescriptor: (int)fd { - int fd = [object fileDescriptorForWriting]; - if (fd < 0 || fd > INT_MAX - 1) @throw [OFOutOfRangeException exception]; #ifndef OF_WINDOWS if (fd >= FD_SETSIZE) @@ -99,15 +97,14 @@ FD_SET(fd, &_writeFDs); } - (void)OF_removeObjectForReading: (id)object + fileDescriptor: (int)fd { /* TODO: Adjust _maxFD */ - int fd = [object fileDescriptorForReading]; - if (fd < 0) @throw [OFOutOfRangeException exception]; #ifndef OF_WINDOWS if (fd >= FD_SETSIZE) @@ 
-116,15 +113,14 @@ FD_CLR(fd, &_readFDs); } - (void)OF_removeObjectForWriting: (id)object + fileDescriptor: (int)fd { /* TODO: Adjust _maxFD */ - int fd = [object fileDescriptorForWriting]; - if (fd < 0) @throw [OFOutOfRangeException exception]; #ifndef OF_WINDOWS if (fd >= FD_SETSIZE) @@ -142,11 +138,11 @@ fd_set writeFDs; struct timeval timeout; int events; size_t i, count; - [self OF_processQueueAndStoreRemovedIn: nil]; + [self OF_processQueue]; [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); #ifdef FD_COPY