@@ -18,10 +18,12 @@ #define OF_STREAM_OBSERVER_M #include +#include + #ifdef OF_HAVE_POLL # include #endif #import "OFStreamObserver.h" @@ -31,10 +33,17 @@ #import "OFStream.h" #import "OFNumber.h" #import "OFAutoreleasePool.h" #import "OFOutOfRangeException.h" + +enum { + QUEUE_ADD = 0, + QUEUE_REMOVE = 1, + QUEUE_READ = 0, + QUEUE_WRITE = 2 +}; @implementation OFStreamObserver + observer { return [[[self alloc] init] autorelease]; @@ -45,10 +54,12 @@ self = [super init]; @try { readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; + queue = [[OFMutableArray alloc] init]; + queueInfo = [[OFMutableArray alloc] init]; #ifdef OF_HAVE_POLL fds = [[OFDataArray alloc] initWithItemSize: sizeof(struct pollfd)]; fdToStream = [[OFMutableDictionary alloc] init]; #else @@ -66,10 +77,12 @@ - (void)dealloc { [(id)delegate release]; [readStreams release]; [writeStreams release]; + [queue release]; + [queueInfo release]; #ifdef OF_HAVE_POLL [fdToStream release]; [fds release]; #endif @@ -175,60 +188,121 @@ } #endif - (void)addStreamToObserveForReading: (OFStream*)stream { - [readStreams addObject: stream]; - -#ifdef OF_HAVE_POLL - [self _addStream: stream - withEvents: POLLIN]; -#else - [self _addStream: stream - withFDSet: &readfds]; -#endif + OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ]; + + @synchronized (queue) { + [queue addObject: stream]; + [queueInfo addObject: qi]; + } + + [pool release]; } - (void)addStreamToObserveForWriting: (OFStream*)stream { - [writeStreams addObject: stream]; - -#ifdef OF_HAVE_POLL - [self _addStream: stream - withEvents: POLLOUT]; -#else - [self _addStream: stream - withFDSet: &writefds]; -#endif + OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE]; + + @synchronized (queue) { + [queue addObject: stream]; + [queueInfo addObject: qi]; + } + + [pool release]; } - (void)removeStreamToObserveForReading: (OFStream*)stream { - [readStreams removeObjectIdenticalTo: stream]; - -#ifdef OF_HAVE_POLL - [self _removeStream: stream - withEvents: POLLIN]; -#else - [self _removeStream: stream - withFDSet: &readfds - otherFDSet: &writefds]; -#endif + OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ]; + + @synchronized (queue) { + [queue addObject: stream]; + [queueInfo addObject: qi]; + } + + [pool release]; } - (void)removeStreamToObserveForWriting: (OFStream*)stream { - [writeStreams removeObjectIdenticalTo: stream]; + OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE]; + + @synchronized (queue) { + [queue addObject: stream]; + [queueInfo addObject: qi]; + } + + [pool release]; +} + +- (void)_processQueue +{ + @synchronized (queue) { + OFStream **queue_c = [queue cArray]; + OFNumber **queueInfo_c = [queueInfo cArray]; + size_t i, count = [queue count]; + for (i = 0; i < count; i++) { + switch ([queueInfo_c[i] intValue]) { + case QUEUE_ADD | QUEUE_READ: + [readStreams addObject: queue_c[i]]; +#ifdef OF_HAVE_POLL + [self _addStream: queue_c[i] + withEvents: POLLIN]; +#else + [self _addStream: queue_c[i] + withFDSet: &readfds]; +#endif + break; + case QUEUE_ADD | QUEUE_WRITE: + [writeStreams addObject: queue_c[i]]; +#ifdef OF_HAVE_POLL + [self _addStream: queue_c[i] + withEvents: POLLOUT]; +#else + [self _addStream: queue_c[i] + withFDSet: &writefds]; +#endif + break; + case QUEUE_REMOVE | QUEUE_READ: + [readStreams removeObjectIdenticalTo: + queue_c[i]]; +#ifdef OF_HAVE_POLL + [self _removeStream: queue_c[i] + withEvents: POLLIN]; +#else + [self _removeStream: queue_c[i] + withFDSet: &readfds + otherFDSet: &writefds]; +#endif + break; + case QUEUE_REMOVE | QUEUE_WRITE: + [writeStreams removeObjectIdenticalTo: + queue_c[i]]; #ifdef OF_HAVE_POLL - [self _removeStream: stream - withEvents: POLLOUT]; + [self _removeStream: queue_c[i] + withEvents: POLLOUT]; #else - [self _removeStream: stream - withFDSet: &writefds - otherFDSet: &readfds]; + [self _removeStream: queue_c[i] + withFDSet: &writefds + otherFDSet: &readfds]; #endif + break; + default: + assert(0); + } + } + + [queue removeNObjects: count]; + [queueInfo removeNObjects: count]; + } } - (void)observe { [self observeWithTimeout: -1]; @@ -240,17 +314,19 @@ BOOL foundInCache = NO; OFStream **cArray; size_t i, count; #ifdef OF_HAVE_POLL struct pollfd *fds_c; - nfds_t nfds; + size_t nfds; #else fd_set readfds_; fd_set writefds_; fd_set exceptfds_; struct timeval tv; #endif + + [self _processQueue]; cArray = [readStreams cArray]; count = [readStreams count]; for (i = 0; i < count; i++) {