Index: src/OFKernelEventObserver.h ================================================================== --- src/OFKernelEventObserver.h +++ src/OFKernelEventObserver.h @@ -20,10 +20,13 @@ OF_ASSUME_NONNULL_BEGIN @class OFMutableArray OF_GENERIC(ObjectType); @class OFDate; +#ifdef OF_HAVE_THREADS +@class OFMutex; +#endif /*! * @protocol OFKernelEventObserverDelegate * OFKernelEventObserver.h ObjFW/OFKernelEventObserver.h * @@ -111,10 +114,13 @@ #ifdef OF_HAVE_PIPE int _cancelFD[2]; #else of_socket_t _cancelFD[2]; struct sockaddr_in _cancelAddr; +#endif +#ifdef OF_HAVE_THREADS + OFMutex *_mutex; #endif } /*! * The delegate for the OFKernelEventObserver. Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -25,10 +25,13 @@ #import "OFStream+Private.h" #ifndef OF_HAVE_PIPE # import "OFStreamSocket.h" #endif #import "OFDate.h" +#ifdef OF_HAVE_THREADS +# import "OFMutex.h" +#endif #ifdef HAVE_KQUEUE # import "OFKernelEventObserver_kqueue.h" #endif #ifdef HAVE_EPOLL @@ -128,10 +131,14 @@ &cancelAddrLen) != 0) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; # endif #endif + +#ifdef OF_HAVE_THREADS + _mutex = [[OFMutex alloc] init]; +#endif } @catch (id e) { [self release]; @throw e; } @@ -148,10 +155,14 @@ of_socket_port_free(_cancelAddr.sin_port, SOCK_DGRAM); #endif [_readObjects release]; [_writeObjects release]; + +#ifdef OF_HAVE_THREADS + [_mutex release]; +#endif [super dealloc]; } - (void)addObjectForReading: (id )object Index: src/OFKernelEventObserver_LockedQueue.h ================================================================== --- src/OFKernelEventObserver_LockedQueue.h +++ src/OFKernelEventObserver_LockedQueue.h @@ -18,21 +18,15 @@ OF_ASSUME_NONNULL_BEGIN @class OFMutableArray OF_GENERIC(ObjectType); @class OFDataArray; -#ifdef OF_HAVE_THREADS -@class OFMutex; -#endif @interface OFKernelEventObserver_LockedQueue: OFKernelEventObserver { OFDataArray *_queueActions, *_queueFDs; OFMutableArray *_queueObjects; -#ifdef OF_HAVE_THREADS - OFMutex *_mutex; -#endif } - (void)OF_addObjectForReading: (id)object fileDescriptor: (int)fd; - (void)OF_addObjectForWriting: (id)object Index: src/OFKernelEventObserver_LockedQueue.m ================================================================== --- src/OFKernelEventObserver_LockedQueue.m +++ src/OFKernelEventObserver_LockedQueue.m @@ -41,14 +41,10 @@ @try { _queueActions = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; _queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; _queueObjects = [[OFMutableArray alloc] init]; - -#ifdef OF_HAVE_THREADS - _mutex = [[OFMutex alloc] init]; -#endif } @catch (id e) { @throw [OFInitializationFailedException exceptionWithClass: [self class]]; } @@ -58,13 +54,10 @@ - (void)dealloc { [_queueActions release]; [_queueFDs release]; [_queueObjects release]; -#ifdef OF_HAVE_THREADS - [_mutex release]; -#endif [super dealloc]; } - (void)addObjectForReading: (id )object Index: src/OFKernelEventObserver_epoll.m ================================================================== --- src/OFKernelEventObserver_epoll.m +++ src/OFKernelEventObserver_epoll.m @@ -25,12 +25,16 @@ #include #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFKernelEventObserver_epoll.h" +#import "OFArray.h" #import "OFMapTable.h" #import "OFNull.h" +#ifdef OF_HAVE_THREADS +# import "OFMutex.h" +#endif #import "OFInitializationFailedException.h" #import "OFObserveFailedException.h" #define EVENTLIST_SIZE 64 @@ -86,104 +90,122 @@ } - (void)OF_addObject: (id)object fileDescriptor: (int)fd events: (int)addEvents -{ - intptr_t events; - - events = (intptr_t)[_FDToEvents valueForKey: (void*)(intptr_t)fd]; - if (events == 0) { - struct epoll_event event; - - memset(&event, 0, sizeof(event)); - event.events = addEvents; - event.data.ptr = object; - - if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &event) == -1) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; - - [_FDToEvents setValue: (void*)(intptr_t)addEvents - forKey: (void*)(intptr_t)fd]; - } else { - struct epoll_event event; + objectsArray: (OFMutableArray*)objectsArray +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + struct epoll_event event; + intptr_t events; + + events = (intptr_t)[_FDToEvents + valueForKey: (void*)(intptr_t)fd]; memset(&event, 0, sizeof(event)); event.events = (int)events | addEvents; event.data.ptr = object; - if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event) == -1) + [objectsArray addObject: object]; + + if (epoll_ctl(_epfd, + (events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD), + fd, &event) == -1) { + [objectsArray removeObjectIdenticalTo: object]; @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; + } [_FDToEvents setValue: (void*)(events | addEvents) forKey: (void*)(intptr_t)fd]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; } +#endif } - (void)OF_removeObject: (id)object fileDescriptor: (int)fd events: (int)removeEvents -{ - intptr_t events; - - events = (intptr_t)[_FDToEvents valueForKey: (void*)(intptr_t)fd]; - events &= ~removeEvents; - - if (events == 0) { - if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) == -1) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; - - [_FDToEvents removeValueForKey: (void*)(intptr_t)fd]; - } else { - struct epoll_event event; - - memset(&event, 0, sizeof(event)); - event.events = (int)events; - event.data.ptr = object; - - if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event) == -1) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; - - [_FDToEvents setValue: (void*)events - forKey: (void*)(intptr_t)fd]; - } + objectsArray: (OFMutableArray*)objectsArray +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + intptr_t events; + + events = (intptr_t)[_FDToEvents + valueForKey: (void*)(intptr_t)fd]; + events &= ~removeEvents; + + if (events == 0) { + if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) == -1) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + + [_FDToEvents removeValueForKey: (void*)(intptr_t)fd]; + } else { + struct epoll_event event; + + memset(&event, 0, sizeof(event)); + event.events = (int)events; + event.data.ptr = object; + + if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event) == -1) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + + [_FDToEvents setValue: (void*)events + forKey: (void*)(intptr_t)fd]; + } + + [objectsArray removeObjectIdenticalTo: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif } - (void)addObjectForReading: (id )object { [self OF_addObject: object fileDescriptor: [object fileDescriptorForReading] - events: EPOLLIN]; + events: EPOLLIN + objectsArray: _readObjects]; } - (void)addObjectForWriting: (id )object { [self OF_addObject: object fileDescriptor: [object fileDescriptorForWriting] - events: EPOLLOUT]; + events: EPOLLOUT + objectsArray: _writeObjects]; } - (void)removeObjectForReading: (id )object { [self OF_removeObject: object fileDescriptor: [object fileDescriptorForReading] - events: EPOLLIN]; + events: EPOLLIN + objectsArray: _readObjects]; } - (void)removeObjectForWriting: (id )object { [self OF_removeObject: object fileDescriptor: [object fileDescriptorForWriting] - events: EPOLLOUT]; + events: EPOLLOUT + objectsArray: _writeObjects]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OFNull *nullObject = [OFNull null]; Index: src/OFKernelEventObserver_kqueue.m ================================================================== --- src/OFKernelEventObserver_kqueue.m +++ src/OFKernelEventObserver_kqueue.m @@ -30,10 +30,13 @@ #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFKernelEventObserver_kqueue.h" #import "OFDataArray.h" #import "OFArray.h" +#ifdef OF_HAVE_THREADS +# import "OFMutex.h" +#endif #import "OFInitializationFailedException.h" #import "OFObserveFailedException.h" #import "OFOutOfRangeException.h" @@ -94,14 +97,27 @@ event.udata = object; #else event.udata = (intptr_t)object; #endif - if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + [_readObjects addObject: object]; + + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) { + [_readObjects removeObjectIdenticalTo: object]; + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + } +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif } - (void)addObjectForWriting: (id )object { struct kevent event; @@ -114,14 +130,27 @@ event.udata = object; #else event.udata = (intptr_t)object; #endif - if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + [_writeObjects addObject: object]; + + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) { + [_writeObjects removeObjectIdenticalTo: object]; + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + } +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif } - (void)removeObjectForReading: (id )object { struct kevent event; @@ -129,14 +158,25 @@ memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForReading]; event.filter = EVFILT_READ; event.flags = EV_DELETE; - if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + + [_readObjects removeObjectIdenticalTo: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif } - (void)removeObjectForWriting: (id )object { struct kevent event; @@ -144,14 +184,25 @@ memset(&event, 0, sizeof(event)); event.ident = [object fileDescriptorForWriting]; event.filter = EVFILT_WRITE; event.flags = EV_DELETE; - if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) - @throw [OFObserveFailedException - exceptionWithObserver: self - errNo: errno]; +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) + @throw [OFObserveFailedException + exceptionWithObserver: self + errNo: errno]; + + [_writeObjects removeObjectIdenticalTo: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush();