@@ -19,10 +19,11 @@
 #include "config.h"
 
 #import "OFKernelEventObserver.h"
 #import "OFKernelEventObserver+Private.h"
 #import "OFArray.h"
+#import "OFDataArray.h"
 #import "OFStream.h"
 #import "OFStream+Private.h"
 #ifndef OF_HAVE_PIPE
 # import "OFStreamSocket.h"
 #endif
@@ -49,10 +50,24 @@
 #import "OFOutOfRangeException.h"
 
 #import "socket.h"
 #import "socket_helpers.h"
 
+/*
+ * Encoding of one queued request: bit 0 selects add (0) or remove (1),
+ * bit 1 selects the read set (0) or the write set (2). Both parts are OR'd
+ * together, e.g. QUEUE_REMOVE | QUEUE_WRITE.
+ */
+enum {
+	QUEUE_ADD = 0,
+	QUEUE_REMOVE = 1,
+	QUEUE_READ = 0,
+	QUEUE_WRITE = 2
+};
+/* Mask for the add/remove bit of a queued request. */
+#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE)
+
 @implementation OFKernelEventObserver
 @synthesize delegate = _delegate;
 
 + (void)initialize
 {
@@ -135,10 +150,18 @@
 #endif
 
 #ifdef OF_HAVE_THREADS
 		_mutex = [[OFMutex alloc] init];
 #endif
+
+		/*
+		 * Parallel arrays: the action at index i of _queueActions
+		 * applies to the object at index i of _queueObjects.
+		 */
+		_queueActions = [[OFDataArray alloc]
+		    initWithItemSize: sizeof(int)];
+		_queueObjects = [[OFMutableArray alloc] init];
 	} @catch (id e) {
 		[self release];
 		@throw e;
 	}
 
@@ -160,32 +183,232 @@
 
 #ifdef OF_HAVE_THREADS
 	[_mutex release];
 #endif
 
+	[_queueActions release];
+	[_queueObjects release];
+
 	[super dealloc];
 }
 
+/*
+ * Appends one request to the pending queue and then calls -[cancel].
+ *
+ * The object is appended first and taken back out if appending the action
+ * fails, so that both queue arrays always hold the same number of entries
+ * (-[OF_processQueue] checks this with OF_ENSURE).
+ */
+- (void)OF_queueAction: (int)action
+	     forObject: (id)object
+{
+#ifdef OF_HAVE_THREADS
+	[_mutex lock];
+	@try {
+#endif
+		[_queueObjects addObject: object];
+
+		@try {
+			[_queueActions addItem: &action];
+		} @catch (id e) {
+			[_queueObjects removeLastObject];
+
+			@throw e;
+		}
+#ifdef OF_HAVE_THREADS
+	} @finally {
+		[_mutex unlock];
+	}
+#endif
+
+	[self cancel];
+}
+
+/*
+ * The four public add/remove methods only queue a request. The request is
+ * applied to _readObjects/_writeObjects, and passed to the matching OF_*
+ * hook, by -[OF_processQueue].
+ */
 - (void)addObjectForReading: (id )object
 {
-	OF_UNRECOGNIZED_SELECTOR
+	[self OF_queueAction: QUEUE_ADD | QUEUE_READ
+		   forObject: object];
 }
 
 - (void)addObjectForWriting: (id )object
 {
-	OF_UNRECOGNIZED_SELECTOR
+	[self OF_queueAction: QUEUE_ADD | QUEUE_WRITE
+		   forObject: object];
 }
 
 - (void)removeObjectForReading: (id )object
 {
-	OF_UNRECOGNIZED_SELECTOR
+	[self OF_queueAction: QUEUE_REMOVE | QUEUE_READ
+		   forObject: object];
 }
 
 - (void)removeObjectForWriting: (id )object
 {
+	[self OF_queueAction: QUEUE_REMOVE | QUEUE_WRITE
+		   forObject: object];
+}
+
+/*
+ * Hooks called by -[OF_processQueue] for each queued request. Here they
+ * consist only of OF_UNRECOGNIZED_SELECTOR; presumably concrete observers
+ * override them (not visible in this part of the file).
+ */
+- (void)OF_addObjectForReading: (id )object
+{
+	OF_UNRECOGNIZED_SELECTOR
+}
+
+- (void)OF_addObjectForWriting: (id )object
+{
+	OF_UNRECOGNIZED_SELECTOR
+}
+
+- (void)OF_removeObjectForReading: (id )object
+{
+	OF_UNRECOGNIZED_SELECTOR
+}
+
+- (void)OF_removeObjectForWriting: (id )object
+{
 	OF_UNRECOGNIZED_SELECTOR
 }
+
+/*
+ * Applies one queued request: updates _readObjects or _writeObjects and
+ * calls the matching hook. If the hook of an add request throws, the object
+ * is taken out of the set again before the exception is rethrown.
+ */
+- (void)OF_applyAction: (int)action
+	      toObject: (id)object
+{
+	switch (action) {
+	case QUEUE_ADD | QUEUE_READ:
+		[_readObjects addObject: object];
+
+		@try {
+			[self OF_addObjectForReading: object];
+		} @catch (id e) {
+			[_readObjects removeObjectIdenticalTo: object];
+
+			@throw e;
+		}
+
+		break;
+	case QUEUE_ADD | QUEUE_WRITE:
+		[_writeObjects addObject: object];
+
+		@try {
+			[self OF_addObjectForWriting: object];
+		} @catch (id e) {
+			[_writeObjects removeObjectIdenticalTo: object];
+
+			@throw e;
+		}
+
+		break;
+	case QUEUE_REMOVE | QUEUE_READ:
+		[self OF_removeObjectForReading: object];
+
+		[_readObjects removeObjectIdenticalTo: object];
+
+		break;
+	case QUEUE_REMOVE | QUEUE_WRITE:
+		[self OF_removeObjectForWriting: object];
+
+		[_writeObjects removeObjectIdenticalTo: object];
+
+		break;
+	default:
+		OF_ENSURE(0);
+	}
+}
+
+/*
+ * Applies all queued requests in order, under the mutex when threads are
+ * available, and removes them from the queue.
+ *
+ * An entry counts as consumed as soon as it is picked up, and the consumed
+ * prefix is removed in @finally. So if a hook throws, neither the entries
+ * already applied nor the failing one are replayed by the next call, while
+ * entries behind the failing one stay queued. The exception still
+ * propagates to the caller.
+ */
+- (void)OF_processQueue
+{
+	void *pool = objc_autoreleasePoolPush();
+
+#ifdef OF_HAVE_THREADS
+	[_mutex lock];
+	@try {
+#endif
+		int *queueActions = [_queueActions items];
+		id const *queueObjects = [_queueObjects objects];
+		size_t count = [_queueActions count];
+		size_t consumed = 0;
+
+		OF_ENSURE([_queueObjects count] == count);
+
+		@try {
+			while (consumed < count) {
+				size_t i = consumed++;
+
+				[self OF_applyAction: queueActions[i]
+					    toObject: queueObjects[i]];
+			}
+		} @finally {
+			of_range_t consumedRange = of_range(0, consumed);
+
+			[_queueActions removeItemsInRange: consumedRange];
+			[_queueObjects removeObjectsInRange: consumedRange];
+		}
+#ifdef OF_HAVE_THREADS
+	} @finally {
+		[_mutex unlock];
+	}
+#endif
+
+	objc_autoreleasePoolPop(pool);
+}
+
+/*
+ * Walks _readObjects and, for every OFStream that has data in its read
+ * buffer and is not waiting for a delimiter, calls the delegate's
+ * objectIsReadyForReading: (if the delegate implements it).
+ *
+ * Returns true if at least one such stream was found.
+ */
+- (bool)OF_processReadBuffers
+{
+	bool foundInReadBuffer = false;
+
+	for (id object in _readObjects) {
+		void *pool = objc_autoreleasePoolPush();
+
+		if ([object isKindOfClass: [OFStream class]] &&
+		    [object hasDataInReadBuffer] &&
+		    ![object OF_isWaitingForDelimiter]) {
+			if ([_delegate respondsToSelector:
+			    @selector(objectIsReadyForReading:)])
+				[_delegate objectIsReadyForReading: object];
+
+			foundInReadBuffer = true;
+		}
+
+		objc_autoreleasePoolPop(pool);
+	}
+
+	/*
+	 * As long as we have data in the read buffer for any stream, we don't
+	 * want to block.
+	 */
+	return foundInReadBuffer;
+}
 
 - (void)observe
 {
 	[self observeForTimeInterval: -1];
 }
@@ -212,33 +435,6 @@
 	OF_ENSURE(sendto(_cancelFD[1], "", 1, 0,
 	    (struct sockaddr*)&_cancelAddr, 8) > 0);
 # endif
 #endif
 }
-
-- (bool)OF_processReadBuffers
-{
-	bool foundInReadBuffer = false;
-
-	for (id object in _readObjects) {
-		void *pool = objc_autoreleasePoolPush();
-
-		if ([object isKindOfClass: [OFStream class]] &&
-		    [object hasDataInReadBuffer] &&
-		    ![object OF_isWaitingForDelimiter]) {
-			if ([_delegate respondsToSelector:
-			    @selector(objectIsReadyForReading:)])
-				[_delegate objectIsReadyForReading: object];
-
-			foundInReadBuffer = true;
-		}
-
-		objc_autoreleasePoolPop(pool);
-	}
-
-	/*
-	 * As long as we have data in the read buffer for any stream, we don't
-	 * want to block.
-	 */
-	return foundInReadBuffer;
-}
 @end