@@ -24,14 +24,15 @@ #import "OFStreamObserver.h" #import "OFArray.h" #import "OFDictionary.h" #import "OFStream.h" -#import "OFNumber.h" +#import "OFDataArray.h" #ifdef _WIN32 # import "OFTCPSocket.h" #endif +#import "OFThread.h" #import "OFAutoreleasePool.h" #ifdef HAVE_POLL_H # import "OFStreamObserver_poll.h" #endif @@ -88,11 +89,12 @@ #endif readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; queue = [[OFMutableArray alloc] init]; - queueInfo = [[OFMutableArray alloc] init]; + queueInfo = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; + queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; #ifndef _WIN32 if (pipe(cancelFD)) @throw [OFInitializationFailedException newWithClass: isa]; @@ -129,10 +131,14 @@ maxFD = cancelFD[0]; FDToStream = [self allocMemoryForNItems: maxFD + 1 ofSize: sizeof(OFStream*)]; FDToStream[cancelFD[0]] = nil; + +#ifdef OF_THREADS + mutex = [[OFMutex alloc] init]; +#endif } @catch (id e) { [self release]; @throw e; } @@ -147,10 +153,14 @@ [(id)delegate release]; [readStreams release]; [writeStreams release]; [queue release]; [queueInfo release]; + [queueFDs release]; +#ifdef OF_THREADS + [mutex release]; +#endif [super dealloc]; } - (id )delegate @@ -163,177 +173,186 @@ OF_SETTER(delegate, delegate_, YES, NO) } - (void)addStreamForReading: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ]; + [mutex lock]; + @try { + int qi = QUEUE_ADD | QUEUE_READ; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)addStreamForWriting: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE]; + [mutex lock]; + @try { + int qi = QUEUE_ADD | QUEUE_WRITE; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)removeStreamForReading: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ]; + [mutex lock]; + @try { + int qi = QUEUE_REMOVE | QUEUE_READ; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif - - [pool release]; } - (void)removeStreamForWriting: (OFStream*)stream { - OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; - OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE]; + [mutex lock]; + @try { + int qi = QUEUE_REMOVE | QUEUE_WRITE; + int fd = [stream fileDescriptor]; - @synchronized (queue) { [queue addObject: stream]; - [queueInfo addObject: qi]; + [queueInfo addItem: &qi]; + [queueFDs addItem: &fd]; + } @finally { + [mutex unlock]; } #ifndef _WIN32 assert(write(cancelFD[1], "", 1) > 0); #else assert(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, sizeof(cancelAddr)) > 0); #endif +} - [pool release]; +- (void)_addFileDescriptorForReading: (int)fd +{ + @throw [OFNotImplementedException newWithClass: isa + selector: _cmd]; } -- (void)_addStreamForReading: (OFStream*)strea; +- (void)_addFileDescriptorForWriting: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } -- (void)_addStreamForWriting: (OFStream*)stream +- (void)_removeFileDescriptorForReading: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } -- (void)_removeStreamForReading: (OFStream*)stream -{ - @throw [OFNotImplementedException newWithClass: isa - selector: _cmd]; -} - -- (void)_removeStreamForWriting: (OFStream*)stream +- (void)_removeFileDescriptorForWriting: (int)fd { @throw [OFNotImplementedException newWithClass: isa selector: _cmd]; } - (void)_processQueue { - @synchronized (queue) { + [mutex lock]; + @try { OFStream **queueCArray = [queue cArray]; - OFNumber **queueInfoCArray = [queueInfo cArray]; + int *queueInfoCArray = [queueInfo cArray]; + int *queueFDsCArray = [queueFDs cArray]; size_t i, count = [queue count]; for (i = 0; i < count; i++) { - int action = [queueInfoCArray[i] intValue]; + OFStream *stream = queueCArray[i]; + int action = queueInfoCArray[i]; + int fd = queueFDsCArray[i]; if ((action & QUEUE_ACTION) == QUEUE_ADD) { - int fd = [queueCArray[i] fileDescriptor]; - if (fd > maxFD) { maxFD = fd; FDToStream = [self resizeMemory: FDToStream toNItems: maxFD + 1 ofSize: sizeof(OFStream*)]; } - FDToStream[fd] = queueCArray[i]; + FDToStream[fd] = stream; } if ((action & QUEUE_ACTION) == QUEUE_REMOVE) { - int fd = [queueCArray[i] fileDescriptor]; - /* FIXME: Maybe downsize? */ FDToStream[fd] = nil; } switch (action) { case QUEUE_ADD | QUEUE_READ: - [readStreams addObject: queueCArray[i]]; + [readStreams addObject: stream]; - [self _addStreamForReading: queueCArray[i]]; + [self _addFileDescriptorForReading: fd]; break; case QUEUE_ADD | QUEUE_WRITE: - [writeStreams addObject: queueCArray[i]]; + [writeStreams addObject: stream]; - [self _addStreamForWriting: queueCArray[i]]; + [self _addFileDescriptorForWriting: fd]; break; case QUEUE_REMOVE | QUEUE_READ: - [readStreams removeObjectIdenticalTo: - queueCArray[i]]; + [readStreams removeObjectIdenticalTo: stream]; - [self _removeStreamForReading: queueCArray[i]]; + [self _removeFileDescriptorForReading: fd]; break; case QUEUE_REMOVE | QUEUE_WRITE: - [writeStreams removeObjectIdenticalTo: - queueCArray[i]]; + [writeStreams removeObjectIdenticalTo: stream]; - [self _removeStreamForWriting: queueCArray[i]]; + [self _removeFileDescriptorForWriting: fd]; break; default: assert(0); } } [queue removeNObjects: count]; - [queueInfo removeNObjects: count]; + [queueInfo removeNItems: count]; + [queueFDs removeNItems: count]; + } @finally { + [mutex unlock]; } } - (void)observe {