Index: configure.ac
==================================================================
--- configure.ac
+++ configure.ac
@@ -465,14 +465,16 @@
 AC_CHECK_FUNC(localtime_r, [
 	AC_DEFINE(HAVE_LOCALTIME_R, 1, [Whether we have localtime_r])
 ])
 
 AC_CHECK_HEADER(poll.h, [
-	AC_DEFINE(OF_HAVE_POLL, 1, [Whether poll is supported])
+	AC_DEFINE(HAVE_POLL_H, 1, [Whether we have poll.h])
+	AC_SUBST(OFSTREAMPOLLOBSERVER_M, "OFStreamPollObserver.m")
 ])
 AC_CHECK_HEADERS(sys/select.h, [
-	AC_DEFINE(OF_HAVE_SYS_SELECT_H, 1, [Whether we have sys/select.h])
+	AC_DEFINE(HAVE_SYS_SELECT_H, 1, [Whether we have sys/select.h])
+	AC_SUBST(OFSTREAMSELECTOBSERVER_M, "OFStreamSelectObserver.m")
 ])
 
 AC_MSG_CHECKING(for getaddrinfo)
 AC_TRY_COMPILE([
 	#include

Index: extra.mk.in
==================================================================
--- extra.mk.in
+++ extra.mk.in
@@ -17,13 +17,15 @@
 OBJC_PROPERTIES_M = @OBJC_PROPERTIES_M@
 OBJC_SYNC_M = @OBJC_SYNC_M@
 OFHTTPREQUESTTESTS_M = @OFHTTPREQUESTTESTS_M@
 OFPLUGIN_M = @OFPLUGIN_M@
 OFPLUGINTESTS_M = @OFPLUGINTESTS_M@
+OFSTREAMPOLLOBSERVER_M = @OFSTREAMPOLLOBSERVER_M@
+OFSTREAMSELECTOBSERVER_M = @OFSTREAMSELECTOBSERVER_M@
 OFTHREAD_M = @OFTHREAD_M@
 OFTHREADTESTS_M = @OFTHREADTESTS_M@
 PROPERTIESTESTS_M = @PROPERTIESTESTS_M@
 REEXPORT_LIBOBJC = @REEXPORT_LIBOBJC@
 TESTPLUGIN = @TESTPLUGIN@
 TESTS = @TESTS@
 TEST_LAUNCHER = @TEST_LAUNCHER@
 THREADING_H = @THREADING_H@

Index: src/Makefile
==================================================================
--- src/Makefile
+++ src/Makefile
@@ -77,10 +77,12 @@
 	OFDictionary_hashtable.m	\
 	OFMutableArray_adjacent.m	\
 	OFMutableDictionary_hashtable.m	\
 	OFMutableSet_hashtable.m	\
 	OFSet_hashtable.m		\
+	${OFSTREAMPOLLOBSERVER_M}	\
+	${OFSTREAMSELECTOBSERVER_M}	\
 	${ASPRINTF_M}			\
 	${FOUNDATION_COMPAT_M}		\
 	iso_8859_15.m			\
 	windows_1252.m			\
 	${OBJC_PROPERTIES_M}		\

Index: src/OFStreamObserver.h
==================================================================
--- src/OFStreamObserver.h
+++ src/OFStreamObserver.h
@@ -12,14 +12,10 @@
  * Public License, either version 2 or 3, which can be found in the file
  * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
  * file.
  */
 
-#if !defined(OF_HAVE_POLL) && defined(OF_HAVE_SYS_SELECT_H)
-# include
-#endif
-
 #import "OFObject.h"
 
 #ifdef _WIN32
 # ifndef _WIN32_WINNT
 #  define _WIN32_WINNT 0x0501
@@ -27,13 +23,10 @@
 # include
 # include
 #endif
 
 @class OFStream;
-#ifdef OF_HAVE_POLL
-@class OFDataArray;
-#endif
 @class OFMutableArray;
 @class OFMutableDictionary;
 
 /**
  * \brief A protocol that needs to be implemented by delegates for
@@ -78,19 +71,10 @@
 {
 	OFMutableArray *readStreams;
 	OFMutableArray *writeStreams;
 	OFMutableArray *queue, *queueInfo;
 	id delegate;
-#ifdef OF_HAVE_POLL
-	OFDataArray *FDs;
-	OFMutableDictionary *FDToStream;
-#else
-	fd_set readFDs;
-	fd_set writeFDs;
-	fd_set exceptFDs;
-	int nFDs;
-#endif
 	int cancelFD[2];
 #ifdef _WIN32
 	struct sockaddr_in cancelAddr;
 #endif
 }
@@ -174,9 +158,13 @@
  *
  * \param timeout The time to wait for an event, in milliseconds
  * \return A boolean whether events occurred during the timeinterval
  */
 - (BOOL)observeWithTimeout: (int)timeout;
+
+/// \cond internal
+- (BOOL)_processCache;
+/// \endcond
 @end
 
 @interface OFObject (OFStreamObserverDelegate)
 @end

Index: src/OFStreamObserver.m
==================================================================
--- src/OFStreamObserver.m
+++ src/OFStreamObserver.m
@@ -17,40 +17,36 @@
 #include "config.h"
 
 #define OF_STREAM_OBSERVER_M
 #define __NO_EXT_QNX
 
-#include
-
 #include
-
 #include
-#ifdef OF_HAVE_POLL
-# include
-#endif
-
 
 #import "OFStreamObserver.h"
-#import "OFDataArray.h"
 #import "OFArray.h"
 #import "OFDictionary.h"
 #import "OFStream.h"
 #import "OFNumber.h"
 #ifdef _WIN32
 # import "OFTCPSocket.h"
 #endif
 #import "OFAutoreleasePool.h"
+
+#ifdef HAVE_POLL_H
+# import "OFStreamPollObserver.h"
+#endif
+#if defined(HAVE_SYS_SELECT_H) || defined(_WIN32)
+# import "OFStreamSelectObserver.h"
+#endif
 
 #import "OFInitializationFailedException.h"
+#import "OFNotImplementedException.h"
 #import "OFOutOfRangeException.h"
 
 #import "macros.h"
 
-#ifdef _WIN32
-# define close(sock) closesocket(sock)
-#endif
-
 enum {
 	QUEUE_ADD = 0,
 	QUEUE_REMOVE = 1,
 	QUEUE_READ = 0,
 	QUEUE_WRITE = 2
@@ -60,35 +56,42 @@
 + observer
 {
 	return [[[self alloc] init] autorelease];
 }
 
+#if defined(HAVE_POLL_H)
++ alloc
+{
+	if (self == [OFStreamObserver class])
+		return [OFStreamPollObserver alloc];
+
+	return [super alloc];
+}
+#elif defined(HAVE_SYS_SELECT_H) || defined(_WIN32)
++ alloc
+{
+	if (self == [OFStreamObserver class])
+		return [OFStreamSelectObserver alloc];
+
+	return [super alloc];
+}
+#endif
+
 - init
 {
 	self = [super init];
 
 	@try {
 #ifdef _WIN32
 		struct sockaddr_in cancelAddr2;
 		socklen_t cancelAddrLen;
 #endif
-#ifdef OF_HAVE_POLL
-		struct pollfd p = { 0, POLLIN, 0 };
-#endif
 
 		readStreams = [[OFMutableArray alloc] init];
 		writeStreams = [[OFMutableArray alloc] init];
 		queue = [[OFMutableArray alloc] init];
 		queueInfo = [[OFMutableArray alloc] init];
-#ifdef OF_HAVE_POLL
-		FDs = [[OFDataArray alloc] initWithItemSize:
-		    sizeof(struct pollfd)];
-		FDToStream = [[OFMutableDictionary alloc] init];
-#else
-		FD_ZERO(&readFDs);
-		FD_ZERO(&writeFDs);
-#endif
 
 #ifndef _WIN32
 		if (pipe(cancelFD))
 			@throw [OFInitializationFailedException
 			    newWithClass: isa];
@@ -120,18 +123,10 @@
 		if (getsockname(cancelFD[0], (struct sockaddr*)&cancelAddr,
 		    &cancelAddrLen))
 			@throw [OFInitializationFailedException
 			    newWithClass: isa];
 #endif
-
-#ifdef OF_HAVE_POLL
-		p.fd = cancelFD[0];
-		[FDs addItem: &p];
-#else
-		FD_SET(cancelFD[0], &readFDs);
-		nFDs = cancelFD[0] + 1;
-#endif
 	} @catch (id e) {
 		[self release];
 		@throw e;
 	}
 
@@ -146,14 +141,10 @@
 	[(id)delegate release];
 	[readStreams release];
 	[writeStreams release];
 	[queue release];
 	[queueInfo release];
-#ifdef OF_HAVE_POLL
-	[FDToStream release];
-	[FDs release];
-#endif
 
 	[super dealloc];
 }
 
 - (id )delegate
@@ -164,92 +155,10 @@
 - (void)setDelegate: (id )delegate_
 {
 	OF_SETTER(delegate, delegate_, YES, NO)
 }
 
-#ifdef OF_HAVE_POLL
-- (void)_addStream: (OFStream*)stream
-	withEvents: (short)events
-{
-	struct pollfd *FDsCArray = [FDs cArray];
-	size_t i, count = [FDs count];
-	int fileDescriptor = [stream fileDescriptor];
-	BOOL found = NO;
-
-	for (i = 0; i < count; i++) {
-		if (FDsCArray[i].fd == fileDescriptor) {
-			FDsCArray[i].events |= events;
-			found = YES;
-		}
-	}
-
-	if (!found) {
-		OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
-		struct pollfd p = { fileDescriptor, events | POLLERR, 0 };
-		[FDs addItem: &p];
-		[FDToStream setObject: stream
-			       forKey: [OFNumber numberWithInt:
-					   fileDescriptor]];
-		[pool release];
-	}
-}
-
-- (void)_removeStream: (OFStream*)stream
-	   withEvents: (short)events
-{
-	struct pollfd *FDsCArray = [FDs cArray];
-	size_t i, nFDs = [FDs count];
-	int fileDescriptor = [stream fileDescriptor];
-
-	for (i = 0; i < nFDs; i++) {
-		if (FDsCArray[i].fd == fileDescriptor) {
-			OFAutoreleasePool *pool;
-
-			FDsCArray[i].events &= ~events;
-
-			if ((FDsCArray[i].events & ~POLLERR) != 0)
-				return;
-
-			pool = [[OFAutoreleasePool alloc] init];
-
-			[FDs removeItemAtIndex: i];
-			[FDToStream removeObjectForKey:
-			    [OFNumber numberWithInt: fileDescriptor]];
-
-			[pool release];
-		}
-	}
-}
-#else
-- (void)_addStream: (OFStream*)stream
-	 withFDSet: (fd_set*)FDSet
-{
-	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
-	int fileDescriptor = [stream fileDescriptor];
-
-	FD_SET(fileDescriptor, FDSet);
-	FD_SET(fileDescriptor, &exceptFDs);
-
-	if (fileDescriptor >= nFDs)
-		nFDs = fileDescriptor + 1;
-
-	[pool release];
-}
-
-- (void)_removeStream: (OFStream*)stream
-	    withFDSet: (fd_set*)FDSet
-	   otherFDSet: (fd_set*)otherFDSet
-{
-	int fileDescriptor = [stream fileDescriptor];
-
-	FD_CLR(fileDescriptor, FDSet);
-
-	if (!FD_ISSET(fileDescriptor, otherFDSet))
-		FD_CLR(fileDescriptor, &exceptFDs);
-}
-#endif
-
 - (void)addStreamToObserveForReading: (OFStream*)stream
 {
 	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
 	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];
 
@@ -326,230 +235,48 @@
 #endif
 
 	[pool release];
 }
 
-- (void)_processQueue
-{
-	@synchronized (queue) {
-		OFStream **queueCArray = [queue cArray];
-		OFNumber **queueInfoCArray = [queueInfo cArray];
-		size_t i, count = [queue count];
-
-		for (i = 0; i < count; i++) {
-			switch ([queueInfoCArray[i] intValue]) {
-			case QUEUE_ADD | QUEUE_READ:
-				[readStreams addObject: queueCArray[i]];
-#ifdef OF_HAVE_POLL
-				[self _addStream: queueCArray[i]
-				      withEvents: POLLIN];
-#else
-				[self _addStream: queueCArray[i]
-				       withFDSet: &readFDs];
-#endif
-				break;
-			case QUEUE_ADD | QUEUE_WRITE:
-				[writeStreams addObject: queueCArray[i]];
-#ifdef OF_HAVE_POLL
-				[self _addStream: queueCArray[i]
-				      withEvents: POLLOUT];
-#else
-				[self _addStream: queueCArray[i]
-				       withFDSet: &writeFDs];
-#endif
-				break;
-			case QUEUE_REMOVE | QUEUE_READ:
-				[readStreams removeObjectIdenticalTo:
-				    queueCArray[i]];
-#ifdef OF_HAVE_POLL
-				[self _removeStream: queueCArray[i]
-					 withEvents: POLLIN];
-#else
-				[self _removeStream: queueCArray[i]
-					  withFDSet: &readFDs
-					 otherFDSet: &writeFDs];
-#endif
-				break;
-			case QUEUE_REMOVE | QUEUE_WRITE:
-				[writeStreams removeObjectIdenticalTo:
-				    queueCArray[i]];
-#ifdef OF_HAVE_POLL
-				[self _removeStream: queueCArray[i]
-					 withEvents: POLLOUT];
-#else
-				[self _removeStream: queueCArray[i]
-					  withFDSet: &writeFDs
-					 otherFDSet: &readFDs];
-#endif
-				break;
-			default:
-				assert(0);
-			}
-		}
-
-		[queue removeNObjects: count];
-		[queueInfo removeNObjects: count];
-	}
-}
-
 - (void)observe
 {
 	[self observeWithTimeout: -1];
 }
 
 - (BOOL)observeWithTimeout: (int)timeout
 {
-	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
+	@throw [OFNotImplementedException newWithClass: isa
+					      selector: _cmd];
+}
+
+- (BOOL)_processCache
+{
+	OFAutoreleasePool *pool;
+	OFStream **cArray = [readStreams cArray];
+	size_t i, count = [readStreams count];
 	BOOL foundInCache = NO;
-	OFStream **cArray;
-	size_t i, count;
-#ifdef OF_HAVE_POLL
-	struct pollfd *FDsCArray;
-	size_t nFDs;
-#else
-	fd_set readFDs_;
-	fd_set writeFDs_;
-	fd_set exceptFDs_;
-	struct timeval time;
-#endif
-
-	[self _processQueue];
-
-	cArray = [readStreams cArray];
-	count = [readStreams count];
+
+	pool = [[OFAutoreleasePool alloc] init];
 
 	for (i = 0; i < count; i++) {
 		if ([cArray[i] pendingBytes] > 0) {
 			[delegate streamDidBecomeReadyForReading: cArray[i]];
 			foundInCache = YES;
 			[pool releaseObjects];
 		}
 	}
 
+	[pool release];
+
 	/*
 	 * As long as we have data in the cache for any stream, we don't want
 	 * to block.
 	 */
 	if (foundInCache)
 		return YES;
 
-#ifdef OF_HAVE_POLL
-	FDsCArray = [FDs cArray];
-	nFDs = [FDs count];
-
-# ifdef OPEN_MAX
-	if (nFDs > OPEN_MAX)
-		@throw [OFOutOfRangeException newWithClass: isa];
-# endif
-
-	if (poll(FDsCArray, (nfds_t)nFDs, timeout) < 1)
-		return NO;
-
-	for (i = 0; i < nFDs; i++) {
-		OFNumber *num;
-		OFStream *stream;
-
-		if (FDsCArray[i].revents & POLLIN) {
-			if (FDsCArray[i].fd == cancelFD[0]) {
-				char buffer;
-
-				assert(read(cancelFD[0], &buffer, 1) > 0);
-				FDsCArray[i].revents = 0;
-
-				continue;
-			}
-
-			num = [OFNumber numberWithInt: FDsCArray[i].fd];
-			stream = [FDToStream objectForKey: num];
-			[delegate streamDidBecomeReadyForReading: stream];
-			[pool releaseObjects];
-		}
-
-		if (FDsCArray[i].revents & POLLOUT) {
-			num = [OFNumber numberWithInt: FDsCArray[i].fd];
-			stream = [FDToStream objectForKey: num];
-			[delegate streamDidBecomeReadyForReading: stream];
-			[pool releaseObjects];
-		}
-
-		if (FDsCArray[i].revents & POLLERR) {
-			num = [OFNumber numberWithInt: FDsCArray[i].fd];
-			stream = [FDToStream objectForKey: num];
-			[delegate streamDidReceiveException: stream];
-			[pool releaseObjects];
-		}
-
-		FDsCArray[i].revents = 0;
-	}
-#else
-# ifdef FD_COPY
-	FD_COPY(&readFDs, &readFDs_);
-	FD_COPY(&writeFDs, &writeFDs_);
-	FD_COPY(&exceptFDs, &exceptFDs_);
-# else
-	readFDs_ = readFDs;
-	writeFDs_ = writeFDs;
-	exceptFDs_ = exceptFDs;
-# endif
-
-	time.tv_sec = timeout / 1000;
-	time.tv_usec = (timeout % 1000) * 1000;
-
-	if (select(nFDs, &readFDs_, &writeFDs_, &exceptFDs_,
-	    (timeout != -1 ? &time : NULL)) < 1)
-		return NO;
-
-	if (FD_ISSET(cancelFD[0], &readFDs_)) {
-		char buffer;
-#ifndef _WIN32
-		assert(read(cancelFD[0], &buffer, 1) > 0);
-#else
-		assert(recvfrom(cancelFD[0], &buffer, 1, 0, NULL, NULL) > 0);
-#endif
-	}
-
-	for (i = 0; i < count; i++) {
-		int fileDescriptor = [cArray[i] fileDescriptor];
-
-		if (FD_ISSET(fileDescriptor, &readFDs_)) {
-			[delegate streamDidBecomeReadyForReading: cArray[i]];
-			[pool releaseObjects];
-		}
-
-		if (FD_ISSET(fileDescriptor, &exceptFDs_)) {
-			[delegate streamDidReceiveException: cArray[i]];
-			[pool releaseObjects];
-
-			/*
-			 * Prevent calling it twice in case the FD is in both
-			 * sets.
-			 */
-			FD_CLR(fileDescriptor, &exceptFDs_);
-		}
-	}
-
-	cArray = [writeStreams cArray];
-	count = [writeStreams count];
-
-	for (i = 0; i < count; i++) {
-		int fileDescriptor = [cArray[i] fileDescriptor];
-
-		if (FD_ISSET(fileDescriptor, &writeFDs_)) {
-			[delegate streamDidBecomeReadyForWriting: cArray[i]];
-			[pool releaseObjects];
-		}
-
-		if (FD_ISSET(fileDescriptor, &exceptFDs_)) {
-			[delegate streamDidReceiveException: cArray[i]];
-			[pool releaseObjects];
-		}
-	}
-#endif
-
-	[pool release];
-
-	return YES;
+	return NO;
 }
 @end
 
 @implementation OFObject (OFStreamObserverDelegate)
 - (void)streamDidBecomeReadyForReading: (OFStream*)stream

ADDED src/OFStreamPollObserver.h
Index: src/OFStreamPollObserver.h
==================================================================
--- src/OFStreamPollObserver.h
+++ src/OFStreamPollObserver.h
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2008, 2009, 2010, 2011
+ *   Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+#import "OFStreamObserver.h"
+
+@class OFDataArray;
+
+@interface OFStreamPollObserver: OFStreamObserver
+{
+	OFDataArray *FDs;
+	OFMutableDictionary *FDToStream;
+}
+@end

ADDED src/OFStreamPollObserver.m
Index: src/OFStreamPollObserver.m
==================================================================
--- src/OFStreamPollObserver.m
+++ src/OFStreamPollObserver.m
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2008, 2009, 2010, 2011
+ *   Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+#include "config.h"
+
+#define __NO_EXT_QNX
+
+#include <assert.h>
+#include <unistd.h>
+#include <poll.h>
+
+#import "OFStreamPollObserver.h"
+#import "OFStream.h"
+#import "OFArray.h"
+#import "OFDictionary.h"
+#import "OFDataArray.h"
+#import "OFNumber.h"
+#import "OFAutoreleasePool.h"
+
+#import "OFOutOfRangeException.h"
+
+enum {
+	QUEUE_ADD = 0,
+	QUEUE_REMOVE = 1,
+	QUEUE_READ = 0,
+	QUEUE_WRITE = 2
+};
+
+@implementation OFStreamPollObserver
+
+- init
+{
+	self = [super init];
+
+	@try {
+		struct pollfd p = { 0, POLLIN, 0 };
+
+		FDs = [[OFDataArray alloc] initWithItemSize:
+		    sizeof(struct pollfd)];
+		FDToStream = [[OFMutableDictionary alloc] init];
+
+		p.fd = cancelFD[0];
+		[FDs addItem: &p];
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+
+- (void)dealloc
+{
+	[FDToStream release];
+	[FDs release];
+
+	[super dealloc];
+}
+
+
+/*
+ * Adds the specified events to the pollfd entry of the stream's file
+ * descriptor, creating the entry (with POLLERR) if none exists yet.
+ */
+- (void)_addStream: (OFStream*)stream
+	withEvents: (short)events
+{
+	struct pollfd *FDsCArray = [FDs cArray];
+	size_t i, count = [FDs count];
+	int fileDescriptor = [stream fileDescriptor];
+	BOOL found = NO;
+
+	for (i = 0; i < count; i++) {
+		if (FDsCArray[i].fd == fileDescriptor) {
+			FDsCArray[i].events |= events;
+			found = YES;
+		}
+	}
+
+	if (!found) {
+		OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
+		struct pollfd p = { fileDescriptor, events | POLLERR, 0 };
+		[FDs addItem: &p];
+		[FDToStream setObject: stream
+			       forKey: [OFNumber numberWithInt:
+					   fileDescriptor]];
+		[pool release];
+	}
+}
+
+/*
+ * Clears the specified events from the pollfd entry of the stream's file
+ * descriptor and drops the entry once only POLLERR is left.
+ */
+- (void)_removeStream: (OFStream*)stream
+	   withEvents: (short)events
+{
+	struct pollfd *FDsCArray = [FDs cArray];
+	size_t i, nFDs = [FDs count];
+	int fileDescriptor = [stream fileDescriptor];
+
+	for (i = 0; i < nFDs; i++) {
+		if (FDsCArray[i].fd == fileDescriptor) {
+			OFAutoreleasePool *pool;
+
+			FDsCArray[i].events &= ~events;
+
+			if ((FDsCArray[i].events & ~POLLERR) != 0)
+				return;
+
+			pool = [[OFAutoreleasePool alloc] init];
+
+			[FDs removeItemAtIndex: i];
+			[FDToStream removeObjectForKey:
+			    [OFNumber numberWithInt: fileDescriptor]];
+
+			[pool release];
+
+			/*
+			 * The entry is gone and the array was modified, so
+			 * FDsCArray and nFDs are stale. _addStream stores
+			 * every file descriptor only once, so we are done.
+			 */
+			return;
+		}
+	}
+}
+
+/*
+ * Applies all queued add / remove requests to the stream arrays and to the
+ * pollfd array, then empties the queue. Runs synchronized on the queue.
+ */
+- (void)_processQueue
+{
+	@synchronized (queue) {
+		OFStream **queueCArray = [queue cArray];
+		OFNumber **queueInfoCArray = [queueInfo cArray];
+		size_t i, count = [queue count];
+
+		for (i = 0; i < count; i++) {
+			switch ([queueInfoCArray[i] intValue]) {
+			case QUEUE_ADD | QUEUE_READ:
+				[readStreams addObject: queueCArray[i]];
+
+				[self _addStream: queueCArray[i]
+				      withEvents: POLLIN];
+
+				break;
+			case QUEUE_ADD | QUEUE_WRITE:
+				[writeStreams addObject: queueCArray[i]];
+
+				[self _addStream: queueCArray[i]
+				      withEvents: POLLOUT];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_READ:
+				[readStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStream: queueCArray[i]
+					 withEvents: POLLIN];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_WRITE:
+				[writeStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStream: queueCArray[i]
+					 withEvents: POLLOUT];
+
+				break;
+			default:
+				assert(0);
+			}
+		}
+
+		[queue removeNObjects: count];
+		[queueInfo removeNObjects: count];
+	}
+}
+
+/*
+ * Processes the queue, then reports streams with cached data. Only if there
+ * are none, poll() is called and the delegate is notified for every file
+ * descriptor with events. Returns NO if poll() returned less than 1.
+ */
+- (BOOL)observeWithTimeout: (int)timeout
+{
+	OFAutoreleasePool *pool;
+	struct pollfd *FDsCArray;
+	size_t i, nFDs;
+
+	[self _processQueue];
+
+	if ([self _processCache])
+		return YES;
+
+	FDsCArray = [FDs cArray];
+	nFDs = [FDs count];
+
+#ifdef OPEN_MAX
+	if (nFDs > OPEN_MAX)
+		@throw [OFOutOfRangeException newWithClass: isa];
+#endif
+
+	if (poll(FDsCArray, (nfds_t)nFDs, timeout) < 1)
+		return NO;
+
+	/*
+	 * Create the pool only now, so that none of the early returns above
+	 * can leak it.
+	 */
+	pool = [[OFAutoreleasePool alloc] init];
+
+	for (i = 0; i < nFDs; i++) {
+		OFNumber *num;
+		OFStream *stream;
+
+		if (FDsCArray[i].revents & POLLIN) {
+			if (FDsCArray[i].fd == cancelFD[0]) {
+				char buffer;
+				ssize_t bytesRead;
+
+				/*
+				 * Keep the read out of assert(): it has to
+				 * drain the descriptor even with NDEBUG.
+				 */
+				bytesRead = read(cancelFD[0], &buffer, 1);
+				assert(bytesRead > 0);
+				(void)bytesRead;
+				FDsCArray[i].revents = 0;
+
+				continue;
+			}
+
+			num = [OFNumber numberWithInt: FDsCArray[i].fd];
+			stream = [FDToStream objectForKey: num];
+			[delegate streamDidBecomeReadyForReading: stream];
+			[pool releaseObjects];
+		}
+
+		if (FDsCArray[i].revents & POLLOUT) {
+			num = [OFNumber numberWithInt: FDsCArray[i].fd];
+			stream = [FDToStream objectForKey: num];
+			[delegate streamDidBecomeReadyForWriting: stream];
+			[pool releaseObjects];
+		}
+
+		if (FDsCArray[i].revents & POLLERR) {
+			num = [OFNumber numberWithInt: FDsCArray[i].fd];
+			stream = [FDToStream objectForKey: num];
+			[delegate streamDidReceiveException: stream];
+			[pool releaseObjects];
+		}
+
+		FDsCArray[i].revents = 0;
+	}
+
+	[pool release];
+
+	return YES;
+}
+@end

ADDED src/OFStreamSelectObserver.h
Index: src/OFStreamSelectObserver.h
==================================================================
--- src/OFStreamSelectObserver.h
+++ src/OFStreamSelectObserver.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2008, 2009, 2010, 2011
+ *   Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+/*
+ * configure defines HAVE_SYS_SELECT_H in config.h, which therefore has to be
+ * included before this header.
+ */
+#ifdef HAVE_SYS_SELECT_H
+# include <sys/select.h>
+#endif
+
+#import "OFStreamObserver.h"
+
+@interface OFStreamSelectObserver: OFStreamObserver
+{
+	fd_set readFDs;
+	fd_set writeFDs;
+	fd_set exceptFDs;
+	int nFDs;
+}
+@end

ADDED src/OFStreamSelectObserver.m
Index: src/OFStreamSelectObserver.m
==================================================================
--- src/OFStreamSelectObserver.m
+++ src/OFStreamSelectObserver.m
@@ -0,0 +1,255 @@
+/*
+ * Copyright (c) 2008, 2009, 2010, 2011
+ *   Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+#include "config.h"
+
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+
+#define __NO_EXT_QNX
+#import "OFStreamSelectObserver.h"
+#import "OFStream.h"
+#import "OFArray.h"
+#import "OFNumber.h"
+#import "OFAutoreleasePool.h"
+
+#ifdef _WIN32
+# define close(sock) closesocket(sock)
+#endif
+
+enum {
+	QUEUE_ADD = 0,
+	QUEUE_REMOVE = 1,
+	QUEUE_READ = 0,
+	QUEUE_WRITE = 2
+};
+
+@implementation OFStreamSelectObserver
+- init
+{
+	self = [super init];
+
+	@try {
+		FD_ZERO(&readFDs);
+		FD_ZERO(&writeFDs);
+		FD_ZERO(&exceptFDs);
+
+		FD_SET(cancelFD[0], &readFDs);
+		nFDs = cancelFD[0] + 1;
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+
+/*
+ * Adds the stream's file descriptor to the specified set and to exceptFDs and
+ * raises nFDs if the file descriptor is not below it yet.
+ */
+- (void)_addStream: (OFStream*)stream
+	 withFDSet: (fd_set*)FDSet
+{
+	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
+	int fileDescriptor = [stream fileDescriptor];
+
+	FD_SET(fileDescriptor, FDSet);
+	FD_SET(fileDescriptor, &exceptFDs);
+
+	if (fileDescriptor >= nFDs)
+		nFDs = fileDescriptor + 1;
+
+	[pool release];
+}
+
+/*
+ * Removes the stream's file descriptor from the specified set and, unless it
+ * is still in the other set, from exceptFDs as well.
+ */
+- (void)_removeStream: (OFStream*)stream
+	    withFDSet: (fd_set*)FDSet
+	   otherFDSet: (fd_set*)otherFDSet
+{
+	int fileDescriptor = [stream fileDescriptor];
+
+	FD_CLR(fileDescriptor, FDSet);
+
+	if (!FD_ISSET(fileDescriptor, otherFDSet))
+		FD_CLR(fileDescriptor, &exceptFDs);
+}
+
+/*
+ * Applies all queued add / remove requests to the stream arrays and to the
+ * fd_sets, then empties the queue. Runs synchronized on the queue.
+ */
+- (void)_processQueue
+{
+	@synchronized (queue) {
+		OFStream **queueCArray = [queue cArray];
+		OFNumber **queueInfoCArray = [queueInfo cArray];
+		size_t i, count = [queue count];
+
+		for (i = 0; i < count; i++) {
+			switch ([queueInfoCArray[i] intValue]) {
+			case QUEUE_ADD | QUEUE_READ:
+				[readStreams addObject: queueCArray[i]];
+
+				[self _addStream: queueCArray[i]
+				       withFDSet: &readFDs];
+
+				break;
+			case QUEUE_ADD | QUEUE_WRITE:
+				[writeStreams addObject: queueCArray[i]];
+
+				[self _addStream: queueCArray[i]
+				       withFDSet: &writeFDs];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_READ:
+				[readStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStream: queueCArray[i]
+					  withFDSet: &readFDs
+					 otherFDSet: &writeFDs];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_WRITE:
+				[writeStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStream: queueCArray[i]
+					  withFDSet: &writeFDs
+					 otherFDSet: &readFDs];
+
+				break;
+			default:
+				assert(0);
+			}
+		}
+
+		[queue removeNObjects: count];
+		[queueInfo removeNObjects: count];
+	}
+}
+
+/*
+ * Processes the queue, then reports streams with cached data. Only if there
+ * are none, select() is called and the delegate is notified for every stream
+ * whose file descriptor is set. Returns NO if select() returned less than 1.
+ */
+- (BOOL)observeWithTimeout: (int)timeout
+{
+	OFAutoreleasePool *pool;
+	OFStream **cArray;
+	fd_set readFDs_;
+	fd_set writeFDs_;
+	fd_set exceptFDs_;
+	struct timeval time;
+	size_t i, count;
+
+	[self _processQueue];
+
+	if ([self _processCache])
+		return YES;
+
+# ifdef FD_COPY
+	FD_COPY(&readFDs, &readFDs_);
+	FD_COPY(&writeFDs, &writeFDs_);
+	FD_COPY(&exceptFDs, &exceptFDs_);
+# else
+	readFDs_ = readFDs;
+	writeFDs_ = writeFDs;
+	exceptFDs_ = exceptFDs;
+# endif
+
+	time.tv_sec = timeout / 1000;
+	time.tv_usec = (timeout % 1000) * 1000;
+
+	if (select(nFDs, &readFDs_, &writeFDs_, &exceptFDs_,
+	    (timeout != -1 ? &time : NULL)) < 1)
+		return NO;
+
+	/*
+	 * Create the pool only now, so that none of the early returns above
+	 * can leak it.
+	 */
+	pool = [[OFAutoreleasePool alloc] init];
+
+	if (FD_ISSET(cancelFD[0], &readFDs_)) {
+		char buffer;
+		int received;
+
+		/*
+		 * Keep the calls out of assert(): they have to drain the
+		 * cancel descriptor even if NDEBUG is defined.
+		 */
+#ifndef _WIN32
+		received = (int)read(cancelFD[0], &buffer, 1);
+#else
+		received = recvfrom(cancelFD[0], &buffer, 1, 0, NULL, NULL);
+#endif
+		assert(received > 0);
+		(void)received;
+	}
+
+	cArray = [readStreams cArray];
+	count = [readStreams count];
+
+	for (i = 0; i < count; i++) {
+		int fileDescriptor = [cArray[i] fileDescriptor];
+
+		if (FD_ISSET(fileDescriptor, &readFDs_)) {
+			[delegate streamDidBecomeReadyForReading: cArray[i]];
+			[pool releaseObjects];
+		}
+
+		if (FD_ISSET(fileDescriptor, &exceptFDs_)) {
+			[delegate streamDidReceiveException: cArray[i]];
+			[pool releaseObjects];
+
+			/*
+			 * Prevent calling it twice in case the FD is in both
+			 * sets.
+			 */
+			FD_CLR(fileDescriptor, &exceptFDs_);
+		}
+	}
+
+	cArray = [writeStreams cArray];
+	count = [writeStreams count];
+
+	for (i = 0; i < count; i++) {
+		int fileDescriptor = [cArray[i] fileDescriptor];
+
+		if (FD_ISSET(fileDescriptor, &writeFDs_)) {
+			[delegate streamDidBecomeReadyForWriting: cArray[i]];
+			[pool releaseObjects];
+		}
+
+		if (FD_ISSET(fileDescriptor, &exceptFDs_)) {
+			[delegate streamDidReceiveException: cArray[i]];
+			[pool releaseObjects];
+		}
+	}
+
+	[pool release];
+
+	return YES;
+}
+@end