@@ -1,8 +1,8 @@ /* - * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 - * Jonathan Schleifer + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 + * Jonathan Schleifer * * All rights reserved. * * This file is part of ObjFW. It may be distributed under the terms of the * Q Public License 1.0, which can be found in the file LICENSE.QPL included in @@ -16,26 +16,23 @@ #define __NO_EXT_QNX #include "config.h" -#include - #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFArray.h" -#import "OFDictionary.h" +#import "OFDataArray.h" #import "OFStream.h" #import "OFStream+Private.h" -#import "OFDataArray.h" #ifndef OF_HAVE_PIPE # import "OFStreamSocket.h" #endif +#import "OFDate.h" #ifdef OF_HAVE_THREADS # import "OFMutex.h" #endif -#import "OFDate.h" #ifdef HAVE_KQUEUE # import "OFKernelEventObserver_kqueue.h" #endif #ifdef HAVE_EPOLL @@ -53,17 +50,21 @@ #import "OFOutOfRangeException.h" #import "socket.h" #import "socket_helpers.h" +#ifdef __wii__ +/* FIXME: Add a port registry for Wii */ +static uint16_t freePort = 65535; +#endif + enum { QUEUE_ADD = 0, QUEUE_REMOVE = 1, QUEUE_READ = 0, QUEUE_WRITE = 2 }; -#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE) @implementation OFKernelEventObserver + (void)initialize { if (self != [OFKernelEventObserver class]) @@ -106,13 +107,10 @@ socklen_t cancelAddrLen; #endif _readObjects = [[OFMutableArray alloc] init]; _writeObjects = [[OFMutableArray alloc] init]; - _queue = [[OFMutableArray alloc] init]; - _queueActions = [[OFDataArray alloc] - initWithItemSize: sizeof(int)]; #ifdef OF_HAVE_PIPE if (pipe(_cancelFD)) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; @@ -126,12 +124,13 @@ _cancelAddr.sin_family = AF_INET; _cancelAddr.sin_port = 0; _cancelAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); # ifdef __wii__ + _cancelAddr.sin_len = 8; /* The Wii does not accept port 0 as "choose any free port" */ - _cancelAddr.sin_port = 65535; + _cancelAddr.sin_port = freePort--; # endif if (bind(_cancelFD[0], (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr))) @throw [OFInitializationFailedException @@ -147,10 +146,14 @@ #endif #ifdef OF_HAVE_THREADS _mutex = [[OFMutex alloc] init]; #endif + + _queueActions = [[OFDataArray alloc] + initWithItemSize: sizeof(int)]; + _queueObjects = [[OFMutableArray alloc] init]; } @catch (id e) { [self release]; @throw e; } @@ -163,15 +166,17 @@ if (_cancelFD[1] != _cancelFD[0]) close(_cancelFD[1]); [_readObjects release]; [_writeObjects release]; - [_queue release]; - [_queueActions release]; + #ifdef OF_HAVE_THREADS [_mutex release]; #endif + + [_queueActions release]; + [_queueObjects release]; [super dealloc]; } - (id )delegate @@ -186,155 +191,202 @@ - (void)addObjectForReading: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; -#endif @try { - int qi = QUEUE_ADD | QUEUE_READ; +#endif + int action = QUEUE_ADD | QUEUE_READ; - [_queue addObject: object]; - [_queueActions addItem: &qi]; + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS } @finally { -#ifdef OF_HAVE_THREADS [_mutex unlock]; -#endif } +#endif [self cancel]; } - (void)addObjectForWriting: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; -#endif @try { - int qi = QUEUE_ADD | QUEUE_WRITE; +#endif + int action = QUEUE_ADD | QUEUE_WRITE; - [_queue addObject: object]; - [_queueActions addItem: &qi]; + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS } @finally { -#ifdef OF_HAVE_THREADS [_mutex unlock]; -#endif } +#endif [self cancel]; } - (void)removeObjectForReading: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; -#endif @try { - int qi = QUEUE_REMOVE | QUEUE_READ; +#endif + int action = QUEUE_REMOVE | QUEUE_READ; - [_queue addObject: object]; - [_queueActions addItem: &qi]; + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS } @finally { -#ifdef OF_HAVE_THREADS [_mutex unlock]; -#endif } +#endif [self cancel]; } - (void)removeObjectForWriting: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; -#endif - @try { - int qi = QUEUE_REMOVE | QUEUE_WRITE; - - [_queue addObject: object]; - [_queueActions addItem: &qi]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; -} - -- (void)OF_addObjectForReading: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_addObjectForWriting: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_removeObjectForReading: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_removeObjectForWriting: (id)object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)OF_processQueueAndStoreRemovedIn: (OFMutableArray*)removed -{ -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - id const *queueObjects = [_queue objects]; - int *queueActionItems = [_queueActions items]; - size_t i, count = [_queue count]; - - for (i = 0; i < count; i++) { - id object = queueObjects[i]; - int action = queueActionItems[i]; + @try { +#endif + int action = QUEUE_REMOVE | QUEUE_WRITE; + + [_queueActions addItem: &action]; + [_queueObjects addObject: object]; +#ifdef OF_HAVE_THREADS + } @finally { + [_mutex unlock]; + } +#endif + + [self cancel]; +} + +- (void)OF_addObjectForReading: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_addObjectForWriting: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForReading: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_removeObjectForWriting: (id )object +{ + OF_UNRECOGNIZED_SELECTOR +} + +- (void)OF_processQueue +{ + void *pool = objc_autoreleasePoolPush(); + +#ifdef OF_HAVE_THREADS + [_mutex lock]; + @try { +#endif + int *queueActions = [_queueActions items]; + id const *queueObjects = [_queueObjects objects]; + size_t i, count = [_queueActions count]; + + OF_ENSURE([_queueObjects count] == count); + + for (i = 0; i < count; i++) { + int action = queueActions[i]; + id object = queueObjects[i]; switch (action) { case QUEUE_ADD | QUEUE_READ: [_readObjects addObject: object]; - [self OF_addObjectForReading: object]; + @try { + [self OF_addObjectForReading: object]; + } @catch (id e) { + [_readObjects + removeObjectIdenticalTo: object]; + + @throw e; + } break; case QUEUE_ADD | QUEUE_WRITE: [_writeObjects addObject: object]; - [self OF_addObjectForWriting: object]; + @try { + [self OF_addObjectForWriting: object]; + } @catch (id e) { + [_writeObjects + removeObjectIdenticalTo: object]; + + @throw e; + } break; case QUEUE_REMOVE | QUEUE_READ: [self OF_removeObjectForReading: object]; - [removed addObject: object]; [_readObjects removeObjectIdenticalTo: object]; break; case QUEUE_REMOVE | QUEUE_WRITE: [self OF_removeObjectForWriting: object]; - [removed addObject: object]; [_writeObjects removeObjectIdenticalTo: object]; break; default: - assert(0); + OF_ENSURE(0); } } - [_queue removeAllObjects]; [_queueActions removeAllItems]; - } @finally { + [_queueObjects removeAllObjects]; #ifdef OF_HAVE_THREADS + } @finally { [_mutex unlock]; + } #endif + + objc_autoreleasePoolPop(pool); +} + +- (bool)OF_processReadBuffers +{ + id const *objects = [_readObjects objects]; + size_t i, count = [_readObjects count]; + bool foundInReadBuffer = false; + + for (i = 0; i < count; i++) { + void *pool = objc_autoreleasePoolPush(); + + if ([objects[i] isKindOfClass: [OFStream class]] && + [objects[i] hasDataInReadBuffer] && + ![objects[i] OF_isWaitingForDelimiter]) { + if ([_delegate respondsToSelector: + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; + + foundInReadBuffer = true; + } + + objc_autoreleasePoolPop(pool); } + + /* + * As long as we have data in the read buffer for any stream, we don't + * want to block. + */ + return foundInReadBuffer; } - (void)observe { [self observeForTimeInterval: -1]; @@ -353,29 +405,15 @@ - (void)cancel { #ifdef OF_HAVE_PIPE OF_ENSURE(write(_cancelFD[1], "", 1) > 0); #else - OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr, - sizeof(_cancelAddr)) > 0); +# ifndef OF_WII + OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, + (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr)) > 0); +# else + OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, + (struct sockaddr*)&_cancelAddr, 8) > 0); +# endif #endif } - -- (void)OF_processReadBuffers -{ - id const *objects = [_readObjects objects]; - size_t i, count = [_readObjects count]; - - for (i = 0; i < count; i++) { - void *pool = objc_autoreleasePoolPush(); - - if ([objects[i] isKindOfClass: [OFStream class]] && - [objects[i] hasDataInReadBuffer] && - ![objects[i] OF_isWaitingForDelimiter] && - [_delegate respondsToSelector: - @selector(objectIsReadyForReading:)]) - [_delegate objectIsReadyForReading: objects[i]]; - - objc_autoreleasePoolPop(pool); - } -} @end