Review notes (text before the first "Index:" line is ignored by patch(1)).

What this patch does, as far as the hunks show:
  * OFStream.h: makes the `cache` ivar @public so that OFStreamObserver can
    read it directly via `->cache`.
  * OFStreamObserver.{h,m}: tracks observed streams in two arrays
    (readStreams / writeStreams). fdToStream is now only declared, created
    and released when OF_HAVE_POLL is defined. observeWithTimeout: first
    reports every read stream whose cache is non-NULL and returns YES
    without calling poll()/select() if at least one such stream was found.

Corrections made during review, both in -[OFStreamObserver observeWithTimeout:]:
  * The select() loop over writeStreams now tests writefds_. It tested
    readfds_, so streamDidBecomeReadyForWriting: was driven by read
    readiness.
  * The early return taken when a cache hit was found now releases the
    autorelease pool first. Because of the two extra lines, the new-side
    count of that hunk is 43 and the new-side start of the following hunk
    is 275.

NOTE(review): line breaks, blank lines and indentation in this copy were
reconstructed from the hunk line counts. Verify the context against the tree;
`patch -l` may be needed if whitespace differs.
NOTE(review): angle-bracketed text looks stripped from this copy
(`# include` has no operand; `id delegate` and `- (id )delegate` presumably
carried a protocol qualifier). Restore it from the repository before applying.
NOTE(review): the `return NO` paths after poll()/select() also skip
`[pool release]`, and no assignment to `tv` is visible before select() uses
it. Both are context lines here, not lines this patch adds; confirm
separately.

Index: src/OFStream.h
==================================================================
--- src/OFStream.h
+++ src/OFStream.h
@@ -25,11 +25,14 @@
  * all the caching and other stuff. If you override these methods without the
  * _ prefix, you *WILL* break caching and get broken results!
  */
 @interface OFStream: OFObject
 {
-	char *cache, *wBuffer;
+@public
+	char *cache;
+@protected
+	char *wBuffer;
 	size_t cacheLen, wBufferLen;
 	BOOL useWBuffer;
 }
 
 /**

Index: src/OFStreamObserver.h
==================================================================
--- src/OFStreamObserver.h
+++ src/OFStreamObserver.h
@@ -24,10 +24,11 @@
 
 @class OFStream;
 #ifdef OF_HAVE_POLL
 @class OFDataArray;
 #endif
+@class OFMutableArray;
 @class OFMutableDictionary;
 
 /**
  * \brief A protocol that needs to be implemented by delegates for
  * OFStreamObserver.
@@ -52,19 +53,21 @@
 /**
  * \brief A class that can observe multiple streams at once.
  */
 @interface OFStreamObserver: OFObject
 {
+	OFMutableArray *readStreams;
+	OFMutableArray *writeStreams;
 	id delegate;
 #ifdef OF_HAVE_POLL
 	OFDataArray *fds;
+	OFMutableDictionary *fdToStream;
 #else
 	fd_set readfds;
 	fd_set writefds;
 	int nfds;
 #endif
-	OFMutableDictionary *fdToStream;
 }
 
 #ifdef OF_HAVE_PROPERTIES
 @property (retain) id delegate;
 #endif

Index: src/OFStreamObserver.m
==================================================================
--- src/OFStreamObserver.m
+++ src/OFStreamObserver.m
@@ -17,10 +17,11 @@
 # include
 #endif
 
 #import "OFStreamObserver.h"
 #import "OFDataArray.h"
+#import "OFArray.h"
 #import "OFDictionary.h"
 #import "OFStream.h"
 #import "OFNumber.h"
 #import "OFAutoreleasePool.h"
 #import "OFExceptions.h"
@@ -33,28 +34,38 @@
 
 - init
 {
 	self = [super init];
 
+	@try {
+		readStreams = [[OFMutableArray alloc] init];
+		writeStreams = [[OFMutableArray alloc] init];
 #ifdef OF_HAVE_POLL
-	fds = [[OFDataArray alloc] initWithItemSize: sizeof(struct pollfd)];
+		fds = [[OFDataArray alloc] initWithItemSize:
+		    sizeof(struct pollfd)];
+		fdToStream = [[OFMutableDictionary alloc] init];
 #else
-	FD_ZERO(&readfds);
-	FD_ZERO(&writefds);
+		FD_ZERO(&readfds);
+		FD_ZERO(&writefds);
 #endif
-	fdToStream = [[OFMutableDictionary alloc] init];
+	} @catch (OFException *e) {
+		[self dealloc];
+		@throw e;
+	}
 
 	return self;
 }
 
 - (void)dealloc
 {
 	[(id)delegate release];
+	[readStreams release];
+	[writeStreams release];
 #ifdef OF_HAVE_POLL
+	[fdToStream release];
 	[fds release];
 #endif
-	[fdToStream release];
 
 	[super dealloc];
 }
 
 - (id )delegate
@@ -134,13 +145,10 @@
 	FD_SET(fd, fdset);
 
 	if (fd >= nfds)
 		nfds = fd + 1;
 
-	[fdToStream setObject: stream
-		       forKey: [OFNumber numberWithInt: fd]];
-
 	[pool release];
 }
 
 - (void)_removeStream: (OFStream*)stream
 	    withFDSet: (fd_set*)fdset
@@ -149,23 +157,17 @@
 
 	if (fd >= FD_SETSIZE)
 		@throw [OFOutOfRangeException newWithClass: isa];
 
 	FD_CLR(fd, fdset);
-
-	if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds)) {
-		OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
-
-		[fdToStream removeObjectForKey: [OFNumber numberWithInt: fd]];
-
-		[pool release];
-	}
 }
 #endif
 
 - (void)addStreamToObserveForReading: (OFStream*)stream
 {
+	[readStreams addObject: stream];
+
 #ifdef OF_HAVE_POLL
 	[self _addStream: stream
 	      withEvents: POLLIN];
 #else
 	[self _addStream: stream
@@ -173,10 +175,12 @@
 #endif
 }
 
 - (void)addStreamToObserveForWriting: (OFStream*)stream
 {
+	[writeStreams addObject: stream];
+
 #ifdef OF_HAVE_POLL
 	[self _addStream: stream
 	      withEvents: POLLOUT];
 #else
 	[self _addStream: stream
@@ -184,10 +188,12 @@
 #endif
 }
 
 - (void)removeStreamToObserveForReading: (OFStream*)stream
 {
+	[readStreams removeObjectIdenticalTo: stream];
+
 #ifdef OF_HAVE_POLL
 	[self _removeStream: stream
 		 withEvents: POLLIN];
 #else
 	[self _removeStream: stream
@@ -195,10 +201,12 @@
 #endif
 }
 
 - (void)removeStreamToObserveForWriting: (OFStream*)stream
 {
+	[writeStreams removeObjectIdenticalTo: stream];
+
 #ifdef OF_HAVE_POLL
 	[self _removeStream: stream
 		 withEvents: POLLOUT];
 #else
 	[self _removeStream: stream
@@ -212,14 +220,43 @@
 }
 
 - (BOOL)observeWithTimeout: (int)timeout
 {
 	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
+	BOOL foundInCache = NO;
+	OFStream **cArray;
+	size_t i, count;
 #ifdef OF_HAVE_POLL
 	struct pollfd *fds_c = [fds cArray];
-	size_t i, nfds = [fds count];
+	size_t nfds = [fds count];
+#else
+	fd_set readfds_;
+	fd_set writefds_;
+	fd_set exceptfds_;
+	struct timeval tv;
+#endif
+
+	cArray = [readStreams cArray];
+	count = [readStreams count];
+
+	for (i = 0; i < count; i++) {
+		if (cArray[i]->cache != NULL) {
+			[delegate streamDidBecomeReadyForReading: cArray[i]];
+			foundInCache = YES;
+		}
+	}
+
+	/*
+	 * As long as we have data in the cache for any stream, we don't want
+	 * to block.
+	 */
+	if (foundInCache) {
+		[pool release];
+		return YES;
+	}
 
+#ifdef OF_HAVE_POLL
 	if (poll(fds_c, nfds, timeout) < 1)
 		return NO;
 
 	for (i = 0; i < nfds; i++) {
 		OFNumber *num;
@@ -238,37 +275,36 @@
 		}
 
 		fds_c[i].revents = 0;
 	}
 #else
-	fd_set readfds_;
-	fd_set writefds_;
-	fd_set exceptfds_;
-	struct timeval tv;
-	OFEnumerator *enumerator;
-	OFStream *stream;
-
 	readfds_ = readfds;
 	writefds_ = writefds;
 	FD_ZERO(&exceptfds_);
 
 	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
 	    (timeout != -1 ? &tv : NULL)) < 1)
 		return NO;
 
-	enumerator = [[[fdToStream copy] autorelease] objectEnumerator];
+	for (i = 0; i < count; i++) {
+		int fd = [cArray[i] fileDescriptor];
+
+		if (FD_ISSET(fd, &readfds_))
+			[delegate streamDidBecomeReadyForReading: cArray[i]];
+	}
+
+	cArray = [writeStreams cArray];
+	count = [writeStreams count];
 
-	while ((stream = [enumerator nextObject]) != nil) {
-		int fd = [stream fileDescriptor];
+	for (i = 0; i < count; i++) {
+		int fd = [cArray[i] fileDescriptor];
 
-		if (FD_ISSET(fd, &readfds_))
-			[delegate streamDidBecomeReadyForReading: stream];
-
-		if (FD_ISSET(fd, &writefds_))
-			[delegate streamDidBecomeReadyForWriting: stream];
+		if (FD_ISSET(fd, &writefds_))
+			[delegate streamDidBecomeReadyForWriting: cArray[i]];
 	}
 #endif
+	[pool release];
 
 	return YES;
 }
 
 @end