/*
 * NOTE(review): everything after this comment is a unified-diff fragment
 * ("@@" hunk headers, "+" added and "-" removed lines) whose line breaks
 * have been collapsed. It is kept verbatim. Judging by the @implementation
 * line it patches the OFKernelEventObserver implementation.
 *
 * What the added ("+") code does:
 *
 *  - Imports OFDataArray.h and defines an anonymous enum: QUEUE_ADD = 0,
 *    QUEUE_REMOVE = 1, QUEUE_READ = 0, QUEUE_WRITE = 2. A queued action is
 *    one ADD/REMOVE value ORed with one READ/WRITE value, which gives four
 *    distinct codes (0 to 3).
 *
 *  - In the init @try block, creates _queueActions (an OFDataArray with
 *    item size sizeof(int)) and _queueObjects (an OFMutableArray). dealloc
 *    releases both before [super dealloc].
 *
 *  - addObjectForReading:, addObjectForWriting:, removeObjectForReading:
 *    and removeObjectForWriting: no longer consist of
 *    OF_UNRECOGNIZED_SELECTOR. Each appends its action code to
 *    _queueActions and the object to _queueObjects, then calls -cancel.
 *    When OF_HAVE_THREADS is defined, the two appends happen with _mutex
 *    locked and the unlock sits in @finally.
 *
 *  - OF_addObjectForReading:, OF_addObjectForWriting:,
 *    OF_removeObjectForReading: and OF_removeObjectForWriting: are new
 *    methods whose body is OF_UNRECOGNIZED_SELECTOR. Presumably subclasses
 *    are expected to override them; confirm against the subclasses.
 *
 *  - OF_processQueue pushes an autorelease pool, takes _mutex (when
 *    OF_HAVE_THREADS is defined), OF_ENSUREs that both queues have the same
 *    count, and walks them in parallel:
 *      ADD | READ / ADD | WRITE: the object is first added to _readObjects
 *        or _writeObjects, then the matching OF_add... method is called. If
 *        that call throws, the object is removed again with
 *        removeObjectIdenticalTo: and the exception is rethrown.
 *      REMOVE | READ / REMOVE | WRITE: the matching OF_remove... method is
 *        called first, then the object is removed from _readObjects or
 *        _writeObjects with removeObjectIdenticalTo:.
 *      Any other value hits OF_ENSURE(0).
 *    After the loop both queues are emptied, the mutex is released in
 *    @finally and the pool is popped.
 *    NOTE(review): when an exception is rethrown from inside the loop, the
 *    two removeAll... calls and objc_autoreleasePoolPop(pool) are not
 *    reached, so every entry (including those already applied) stays
 *    queued. Confirm that this is intended.
 *
 *  - observe, observeForTimeInterval:, observeUntilDate: and cancel are
 *    removed from their old position and re-added after
 *    OF_processReadBuffers. observe forwards to observeForTimeInterval:
 *    with -1, observeUntilDate: forwards [date timeIntervalSinceNow], and
 *    observeForTimeInterval: itself is OF_UNRECOGNIZED_SELECTOR.
 *
 *  - cancel sends one byte (the terminating NUL of "") to _cancelFD[1],
 *    using write() when OF_HAVE_PIPE is defined and sendto() to _cancelAddr
 *    otherwise, and OF_ENSUREs that the call returned more than 0.
 *    Presumably this wakes up a blocked observe call; the receiving side is
 *    not visible in this fragment.
 *    NOTE(review): the only textual change inside cancel is the inner guard,
 *    which goes from "# ifndef __wii__" to "# ifndef OF_WII", while the
 *    freePort declaration shown as context near the top is still guarded by
 *    "#ifdef __wii__". Confirm that OF_WII is defined wherever __wii__ is.
 */
@@ -19,10 +19,11 @@ #include "config.h" #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFArray.h" +#import "OFDataArray.h" #import "OFStream.h" #import "OFStream+Private.h" #ifndef OF_HAVE_PIPE # import "OFStreamSocket.h" #endif @@ -53,10 +54,17 @@ #ifdef __wii__ /* FIXME: Add a port registry for Wii */ static uint16_t freePort = 65535; #endif + +enum { + QUEUE_ADD = 0, + QUEUE_REMOVE = 1, + QUEUE_READ = 0, + QUEUE_WRITE = 2 +}; @implementation OFKernelEventObserver + (void)initialize { if (self != [OFKernelEventObserver class]) @@ -138,10 +146,14 @@ #endif #ifdef OF_HAVE_THREADS _mutex = [[OFMutex alloc] init]; #endif + + _queueActions = [[OFDataArray alloc] + initWithItemSize: sizeof(int)]; + _queueObjects = [[OFMutableArray alloc] init]; } @catch (id e) { [self release]; @throw e; } @@ -159,10 +171,13 @@ #ifdef OF_HAVE_THREADS [_mutex release]; #endif + [_queueActions release]; + [_queueObjects release]; + [super dealloc]; } - (id )delegate { @@ -174,56 +189,175 @@ _delegate = delegate; } - (void)addObjectForReading: (id )object { - OF_UNRECOGNIZED_SELECTOR +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int action = QUEUE_ADD | QUEUE_READ; + + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; } - (void)addObjectForWriting: (id )object { - OF_UNRECOGNIZED_SELECTOR +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int action = QUEUE_ADD | QUEUE_WRITE; + + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; } - (void)removeObjectForReading: (id )object { - OF_UNRECOGNIZED_SELECTOR +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int action = QUEUE_REMOVE | QUEUE_READ; + + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { 
+ [_mutex unlock]; + } +#endif + + [self cancel]; } - (void)removeObjectForWriting: (id )object { - OF_UNRECOGNIZED_SELECTOR -} - -- (void)observe -{ - [self observeForTimeInterval: -1]; -} - -- (void)observeForTimeInterval: (of_time_interval_t)timeInterval -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)observeUntilDate: (OFDate*)date -{ - [self observeForTimeInterval: [date timeIntervalSinceNow]]; -} - -- (void)cancel -{ -#ifdef OF_HAVE_PIPE - OF_ENSURE(write(_cancelFD[1], "", 1) > 0); -#else -# ifndef __wii__ - OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, - (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr)) > 0); -# else - OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, - (struct sockaddr*)&_cancelAddr, 8) > 0); -# endif -#endif +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int action = QUEUE_REMOVE | QUEUE_WRITE; + + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)OF_addObjectForReading: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_addObjectForWriting: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForReading: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForWriting: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_processQueue +{ + void *pool = objc_autoreleasePoolPush(); + +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int *queueActions = [_queueActions items]; + id const *queueObjects = [_queueObjects objects]; + size_t i, count = [_queueActions count]; + + OF_ENSURE([_queueObjects count] == count); + + for (i = 0; i < count; i++) { + int action = queueActions[i]; + id object = queueObjects[i]; + + switch (action) { + case QUEUE_ADD | QUEUE_READ: + [_readObjects addObject: object]; + + @try { + [self OF_addObjectForReading: object]; + } @catch (id e) { + [_readObjects + removeObjectIdenticalTo: object]; + + @throw e; + } + + break; + case QUEUE_ADD 
| QUEUE_WRITE: + [_writeObjects addObject: object]; + + @try { + [self OF_addObjectForWriting: object]; + } @catch (id e) { + [_writeObjects + removeObjectIdenticalTo: object]; + + @throw e; + } + + break; + case QUEUE_REMOVE | QUEUE_READ: + [self OF_removeObjectForReading: object]; + + [_readObjects removeObjectIdenticalTo: object]; + + break; + case QUEUE_REMOVE | QUEUE_WRITE: + [self OF_removeObjectForWriting: object]; + + [_writeObjects removeObjectIdenticalTo: object]; + + break; + default: + OF_ENSURE(0); + } + } + + [_queueActions removeAllItems]; + [_queueObjects removeAllObjects]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + objc_autoreleasePoolPop(pool); } - (bool)OF_processReadBuffers { id const *objects = [_readObjects objects]; @@ -250,6 +384,36 @@ * As long as we have data in the read buffer for any stream, we don't * want to block. */ return foundInReadBuffer; } + +- (void)observe +{ + [self observeForTimeInterval: -1]; +} + +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)observeUntilDate: (OFDate*)date +{ + [self observeForTimeInterval: [date timeIntervalSinceNow]]; +} + +- (void)cancel +{ +#ifdef OF_HAVE_PIPE + OF_ENSURE(write(_cancelFD[1], "", 1) > 0); +#else +# ifndef OF_WII + OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, + (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr)) > 0); +# else + OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, + (struct sockaddr*)&_cancelAddr, 8) > 0); +# endif +#endif +} @end