Index: src/OFStreamObserver.h ================================================================== --- src/OFStreamObserver.h +++ src/OFStreamObserver.h @@ -25,10 +25,12 @@ #endif @class OFStream; @class OFMutableArray; @class OFMutableDictionary; +@class OFDataArray; +@class OFMutex; /** * \brief A protocol that needs to be implemented by delegates for * OFStreamObserver. */ @@ -78,16 +80,18 @@ { OFMutableArray *readStreams; OFMutableArray *writeStreams; OFStream **FDToStream; size_t maxFD; - OFMutableArray *queue, *queueInfo; + OFMutableArray *queue; + OFDataArray *queueInfo, *queueFDs; id delegate; int cancelFD[2]; #ifdef _WIN32 struct sockaddr_in cancelAddr; #endif + OFMutex *mutex; } #ifdef OF_HAVE_PROPERTIES @property (retain) id delegate; #endif Index: src/OFStreamObserver.m ================================================================== --- src/OFStreamObserver.m +++ src/OFStreamObserver.m @@ -24,14 +24,15 @@ #import "OFStreamObserver.h" #import "OFArray.h" #import "OFDictionary.h" #import "OFStream.h" -#import "OFNumber.h" +#import "OFDataArray.h" #ifdef _WIN32 # import "OFTCPSocket.h" #endif +#import "OFThread.h" #import "OFAutoreleasePool.h" #ifdef HAVE_POLL_H # import "OFStreamObserver_poll.h" #endif @@ -88,11 +89,12 @@ #endif readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; queue = [[OFMutableArray alloc] init]; - queueInfo = [[OFMutableArray alloc] init]; + queueInfo = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; + queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; #ifndef _WIN32 if (pipe(cancelFD)) @throw [OFInitializationFailedException newWithClass: isa]; @@ -129,10 +131,14 @@ maxFD = cancelFD[0]; FDToStream = [self allocMemoryForNItems: maxFD + 1 ofSize: sizeof(OFStream*)]; FDToStream[cancelFD[0]] = nil; + +#ifdef OF_THREADS + mutex = [[OFMutex alloc] init]; +#endif } @catch (id e) { [self release]; @throw e; } @@ -147,10 +153,14 @@ [(id)delegate release]; [readStreams release]; [writeStreams release]; [queue release]; [queueInfo release]; + [queueFDs release]; +#ifdef OF_THREADS + [mutex release]; +#endif [super dealloc]; } - (id )delegate @@ -163,177 +173,186 @@ OF_SETTER(delegate, delegate_, YES, NO) } - (void)addStreamForReading: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ]; + [mutex lock]; + @try { + int qi = QUEUE_ADD | QUEUE_READ; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)addStreamForWriting: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE]; + [mutex lock]; + @try { + int qi = QUEUE_ADD | QUEUE_WRITE; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)removeStreamForReading: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ]; + [mutex lock]; + @try { + int qi = QUEUE_REMOVE | QUEUE_READ; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)removeStreamForWriting: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE]; + [mutex lock]; + @try { + int qi = QUEUE_REMOVE | QUEUE_WRITE; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif +} - [pool release]; +- (void)_addFileDescriptorForReading: (int)fd +{ + @throw [OFNotImplementedException newWithClass: isa + selector: _cmd]; } -- (void)_addStreamForReading: (OFStream*)strea; +- (void)_addFileDescriptorForWriting: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } -- (void)_addStreamForWriting: (OFStream*)stream +- (void)_removeFileDescriptorForReading: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } -- (void)_removeStreamForReading: (OFStream*)stream -{ - @throw [OFNotImplementedException newWithClass: isa - selector: _cmd]; -} - -- (void)_removeStreamForWriting: (OFStream*)stream +- (void)_removeFileDescriptorForWriting: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } - (void)_processQueue { - @synchronized (queue) { + [mutex lock]; + @try { OFStream **queueCArray = [queue cArray]; - OFNumber **queueInfoCArray = [queueInfo cArray]; + int *queueInfoCArray = [queueInfo cArray]; + int *queueFDsCArray = [queueFDs cArray]; size_t i, count = [queue count]; for (i = 0; i < count; i++) { - int action = [queueInfoCArray[i] intValue]; + OFStream *stream = queueCArray[i]; + int action = queueInfoCArray[i]; + int fd = queueFDsCArray[i]; if ((action & QUEUE_ACTION) == QUEUE_ADD) { - int fd = [queueCArray[i] fileDescriptor]; - if (fd > maxFD) { maxFD = fd; FDToStream = [self resizeMemory: FDToStream toNItems: maxFD + 1 ofSize: sizeof(OFStream*)]; } - FDToStream[fd] = queueCArray[i]; + FDToStream[fd] = stream; } if ((action & QUEUE_ACTION) == QUEUE_REMOVE) { - int fd = [queueCArray[i] fileDescriptor]; - /* FIXME: Maybe downsize? */ FDToStream[fd] = nil; } switch (action) { case QUEUE_ADD | QUEUE_READ: - [readStreams addObject: queueCArray[i]]; + [readStreams addObject: stream]; - [self _addStreamForReading: queueCArray[i]]; + [self _addFileDescriptorForReading: fd]; break; case QUEUE_ADD | QUEUE_WRITE: - [writeStreams addObject: queueCArray[i]]; + [writeStreams addObject: stream]; - [self _addStreamForWriting: queueCArray[i]]; + [self _addFileDescriptorForWriting: fd]; break; case QUEUE_REMOVE | QUEUE_READ: - [readStreams removeObjectIdenticalTo: - queueCArray[i]]; + [readStreams removeObjectIdenticalTo: stream]; - [self _removeStreamForReading: queueCArray[i]]; + [self _removeFileDescriptorForReading: fd]; break; case QUEUE_REMOVE | QUEUE_WRITE: - [writeStreams removeObjectIdenticalTo: - queueCArray[i]]; + [writeStreams removeObjectIdenticalTo: stream]; - [self _removeStreamForWriting: queueCArray[i]]; + [self _removeFileDescriptorForWriting: fd]; break; default: assert(0); } } [queue removeNObjects: count]; - [queueInfo removeNObjects: count]; + [queueInfo removeNItems: count]; + [queueFDs removeNItems: count]; + } @finally { + [mutex unlock]; } } - (void)observe { Index: src/OFStreamObserver_poll.m ================================================================== --- src/OFStreamObserver_poll.m +++ src/OFStreamObserver_poll.m @@ -58,16 +58,15 @@ [FDs release]; [super dealloc]; } -- (void)_addStream: (OFStream*)stream - withEvents: (short)events +- (void)_addFileDescriptor: (int)fd + withEvents: (short)events { struct pollfd *FDsCArray = [FDs cArray]; size_t i, count = [FDs count]; - int fd = [stream fileDescriptor]; BOOL found = NO; for (i = 0; i < count; i++) { if (FDsCArray[i].fd == fd) { FDsCArray[i].events |= events; @@ -80,19 +79,18 @@ struct pollfd p = { fd, events | POLLERR, 0 }; [FDs addItem: &p]; } } -- (void)_removeStream: (OFStream*)stream - withEvents: (short)events +- (void)_removeFileDescriptor: (int)fd + withEvents: (short)events { struct pollfd *FDsCArray = [FDs cArray]; size_t i, nFDs = [FDs count]; - int fileDescriptor = [stream fileDescriptor]; for (i = 0; i < nFDs; i++) { - if (FDsCArray[i].fd == fileDescriptor) { + if (FDsCArray[i].fd == fd) { FDsCArray[i].events &= ~events; if ((FDsCArray[i].events & ~POLLERR) == 0) [FDs removeItemAtIndex: i]; @@ -99,32 +97,32 @@ break; } } } -- (void)_addStreamForReading: (OFStream*)stream -{ - [self _addStream: stream - withEvents: POLLIN]; -} - -- (void)_addStreamForWriting: (OFStream*)stream -{ - [self _addStream: stream - withEvents: POLLOUT]; -} - -- (void)_removeStreamForReading: (OFStream*)stream -{ - [self _removeStream: stream - withEvents: POLLIN]; -} - -- (void)_removeStreamForWriting: (OFStream*)stream -{ - [self _removeStream: stream - withEvents: POLLOUT]; +- (void)_addFileDescriptorForReading: (int)fd +{ + [self _addFileDescriptor: fd + withEvents: POLLIN]; +} + +- (void)_addFileDescriptorForWriting: (int)fd +{ + [self _addFileDescriptor: fd + withEvents: POLLOUT]; +} + +- (void)_removeFileDescriptorForReading: (int)fd +{ + [self _removeFileDescriptor: fd + withEvents: POLLIN]; +} + +- (void)_removeFileDescriptorForWriting: (int)fd +{ + [self _removeFileDescriptor: fd + withEvents: POLLOUT]; } - (BOOL)observeWithTimeout: (int)timeout { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; Index: src/OFStreamObserver_select.m ================================================================== --- src/OFStreamObserver_select.m +++ src/OFStreamObserver_select.m @@ -39,40 +39,32 @@ FD_SET(cancelFD[0], &readFDs); return self; } -- (void)_addStreamForReading: (OFStream*)stream +- (void)_addFileDescriptorForReading: (int)fd { - int fd = [stream fileDescriptor]; - FD_SET(fd, &readFDs); FD_SET(fd, &exceptFDs); } -- (void)_addStreamForWriting: (OFStream*)stream +- (void)_addFileDescriptorForWriting: (int)fd { - int fd = [stream fileDescriptor]; - FD_SET(fd, &writeFDs); FD_SET(fd, &exceptFDs); } -- (void)_removeStreamForReading: (OFStream*)stream +- (void)_removeFileDescriptorForReading: (int)fd { - int fd = [stream fileDescriptor]; - FD_CLR(fd, &readFDs); if (!FD_ISSET(fd, &writeFDs)) FD_CLR(fd, &exceptFDs); } -- (void)_removeStreamForWriting: (OFStream*)stream +- (void)_removeFileDescriptorForWriting: (int)fd { - int fd = [stream fileDescriptor]; - FD_CLR(fd, &writeFDs); if (!FD_ISSET(fd, &readFDs)) FD_CLR(fd, &exceptFDs); }