@@ -20,10 +20,12 @@ #include #include +#include + #ifdef OF_HAVE_POLL # include #endif #import "OFStreamObserver.h" @@ -32,10 +34,11 @@ #import "OFDictionary.h" #import "OFStream.h" #import "OFNumber.h" #import "OFAutoreleasePool.h" +#import "OFInitializationFailedException.h" #import "OFOutOfRangeException.h" enum { QUEUE_ADD = 0, QUEUE_REMOVE = 1, @@ -52,10 +55,12 @@ - init { self = [super init]; @try { + struct pollfd p = { 0, POLLIN, 0 }; + readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; queue = [[OFMutableArray alloc] init]; queueInfo = [[OFMutableArray alloc] init]; #ifdef OF_HAVE_POLL @@ -64,10 +69,22 @@ fdToStream = [[OFMutableDictionary alloc] init]; #else FD_ZERO(&readfds); FD_ZERO(&writefds); #endif + + if (pipe(cancelFd)) + @throw [OFInitializationFailedException + newWithClass: isa]; + +#ifdef OF_HAVE_POLL + p.fd = cancelFd[0]; + [fds addItem: &p]; +#else + FD_SET(cancelFd[0], fdset); + nfds = cancelFd[0] + 1; +#endif } @catch (id e) { [self release]; @throw e; } @@ -196,10 +213,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + write(cancelFd[1], "", 1); + [pool release]; } - (void)addStreamToObserveForWriting: (OFStream*)stream { @@ -208,10 +227,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + + write(cancelFd[1], "", 1); [pool release]; } - (void)removeStreamToObserveForReading: (OFStream*)stream @@ -222,10 +243,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + write(cancelFd[1], "", 1); + [pool release]; } - (void)removeStreamToObserveForWriting: (OFStream*)stream { @@ -234,10 +257,12 @@ @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } + + write(cancelFd[1], "", 1); [pool release]; } - (void)_processQueue @@ -355,10 +380,18 @@ return NO; for (i = 0; i < nfds; i++) { OFNumber *num; OFStream *stream; + + if (fds_c[i].fd == cancelFd[0]) { + char buf; + + read(cancelFd[0], &buf, 1); + + continue; + } if (fds_c[i].revents & POLLIN) { num = [OFNumber numberWithInt: fds_c[i].fd]; stream = [fdToStream objectForKey: num]; [delegate streamDidBecomeReadyForReading: stream]; @@ -393,10 +426,15 @@ # endif if (select(nfds, &readfds_, &writefds_, &exceptfds_, (timeout != -1 ? &tv : NULL)) < 1) return NO; + + if (FD_ISSET(cancelFd[0], &readfds_)) { + char buf; + read(cancelFd[0], &buf, 1); + } for (i = 0; i < count; i++) { int fd = [cArray[i] fileDescriptor]; if (FD_ISSET(fd, &readfds_)) {