Index: src/OFStreamObserver.h ================================================================== --- src/OFStreamObserver.h +++ src/OFStreamObserver.h @@ -86,10 +86,11 @@ fd_set readfds; fd_set writefds; fd_set exceptfds; int nfds; #endif + int cancelFd[2]; } #ifdef OF_HAVE_PROPERTIES @property (retain) id delegate; #endif @@ -111,15 +112,18 @@ */ - (void)setDelegate: (id )delegate; /** * Adds a stream to observe for reading. - * + * * This is also used to observe a listening socket for incoming connections, * which then triggers a read event for the observed stream. * * It is recommended that the stream you add is set to non-blocking mode. + * + * If there is an -[observe] call blocking, it will be canceled. The reason for + * this is to prevent blocking even though the newly added stream is ready. * * \param stream The stream to observe for reading */ - (void)addStreamToObserveForReading: (OFStream*)stream; @@ -126,23 +130,32 @@ /** * Adds a stream to observe for writing. * * It is recommended that the stream you add is set to non-blocking mode. * + * If there is an -[observe] call blocking, it will be canceled. The reason for + * this is to prevent blocking even though the newly added stream is ready. + * * \param stream The stream to observe for writing */ - (void)addStreamToObserveForWriting: (OFStream*)stream; /** * Removes a stream to observe for reading. * + * If there is an -[observe] call blocking, it will be canceled. The reason for + * this is to prevent the removed stream from still being observed. + * * \param stream The stream to remove from observing for reading */ - (void)removeStreamToObserveForReading: (OFStream*)stream; /** * Removes a stream to observe for writing. + * + * If there is an -[observe] call blocking, it will be canceled. The reason for + * this is to prevent the removed stream from still being observed. 
* * \param stream The stream to remove from observing for writing */ - (void)removeStreamToObserveForWriting: (OFStream*)stream; Index: src/OFStreamObserver.m ================================================================== --- src/OFStreamObserver.m +++ src/OFStreamObserver.m @@ -20,10 +20,12 @@ #include #include +#include + #ifdef OF_HAVE_POLL # include #endif #import "OFStreamObserver.h" @@ -32,10 +34,11 @@ #import "OFDictionary.h" #import "OFStream.h" #import "OFNumber.h" #import "OFAutoreleasePool.h" +#import "OFInitializationFailedException.h" #import "OFOutOfRangeException.h" enum { QUEUE_ADD = 0, QUEUE_REMOVE = 1, @@ -52,10 +55,12 @@ - init { self = [super init]; @try { + struct pollfd p = { 0, POLLIN, 0 }; + readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; queue = [[OFMutableArray alloc] init]; queueInfo = [[OFMutableArray alloc] init]; #ifdef OF_HAVE_POLL @@ -64,10 +69,22 @@ fdToStream = [[OFMutableDictionary alloc] init]; #else FD_ZERO(&readfds); FD_ZERO(&writefds); #endif + + if (pipe(cancelFd)) + @throw [OFInitializationFailedException + newWithClass: isa]; + +#ifdef OF_HAVE_POLL + p.fd = cancelFd[0]; + [fds addItem: &p]; +#else + FD_SET(cancelFd[0], &readfds); + nfds = cancelFd[0] + 1; +#endif } @catch (id e) { [self release]; @throw e; } @@ -196,10 +213,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + write(cancelFd[1], "", 1); + [pool release]; } - (void)addStreamToObserveForWriting: (OFStream*)stream { @@ -208,10 +227,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + + write(cancelFd[1], "", 1); [pool release]; } - (void)removeStreamToObserveForReading: (OFStream*)stream @@ -222,10 +243,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + write(cancelFd[1], "", 1); + [pool release]; } - (void)removeStreamToObserveForWriting: (OFStream*)stream { @@ -234,10 +257,12 @@ @synchronized (queue) { [queue 
addObject: stream]; [queueInfo addObject: qi]; } + + write(cancelFd[1], "", 1); [pool release]; } - (void)_processQueue @@ -355,10 +380,18 @@ return NO; for (i = 0; i < nfds; i++) { OFNumber *num; OFStream *stream; + + if (fds_c[i].fd == cancelFd[0]) { + char buf; + /* Drain only when readable; otherwise read() blocks. */ + if (fds_c[i].revents & POLLIN) + read(cancelFd[0], &buf, 1); + continue; + } if (fds_c[i].revents & POLLIN) { num = [OFNumber numberWithInt: fds_c[i].fd]; stream = [fdToStream objectForKey: num]; [delegate streamDidBecomeReadyForReading: stream]; @@ -393,10 +426,15 @@ # endif if (select(nfds, &readfds_, &writefds_, &exceptfds_, (timeout != -1 ? &tv : NULL)) < 1) return NO; + + if (FD_ISSET(cancelFd[0], &readfds_)) { + char buf; + read(cancelFd[0], &buf, 1); + } for (i = 0; i < count; i++) { int fd = [cArray[i] fileDescriptor]; if (FD_ISSET(fd, &readfds_)) {