@@ -49,10 +49,11 @@
 	QUEUE_ADD = 0,
 	QUEUE_REMOVE = 1,
 	QUEUE_READ = 0,
 	QUEUE_WRITE = 2
 };
+#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE)
 
 @implementation OFStreamObserver
 + observer
 {
 	return [[[self alloc] init] autorelease];
@@ -123,10 +124,15 @@
 		if (getsockname(cancelFD[0], (struct sockaddr*)&cancelAddr,
 		    &cancelAddrLen))
 			@throw [OFInitializationFailedException
 			    newWithClass: isa];
 #endif
+
+		maxFD = cancelFD[0];
+		FDToStream = [self allocMemoryForNItems: maxFD + 1
+					       withSize: sizeof(OFStream*)];
+		FDToStream[cancelFD[0]] = nil;
 	} @catch (id e) {
 		[self release];
 		@throw e;
 	}
 
@@ -234,10 +240,129 @@
 	    sizeof(cancelAddr)) > 0);
 #endif
 
 	[pool release];
 }
+
+/*
+ * Hooks called by -_processQueue whenever a stream is added to or removed
+ * from the set observed for reading / writing. The implementations in this
+ * class only throw OFNotImplementedException; presumably subclasses are
+ * expected to override them (TODO confirm).
+ */
+- (void)_addStreamToObserveForReading: (OFStream*)stream
+{
+	@throw [OFNotImplementedException newWithClass: isa
+					      selector: _cmd];
+}
+
+- (void)_addStreamToObserveForWriting: (OFStream*)stream
+{
+	@throw [OFNotImplementedException newWithClass: isa
+					      selector: _cmd];
+}
+
+- (void)_removeStreamToObserveForReading: (OFStream*)stream
+{
+	@throw [OFNotImplementedException newWithClass: isa
+					      selector: _cmd];
+}
+
+- (void)_removeStreamToObserveForWriting: (OFStream*)stream
+{
+	@throw [OFNotImplementedException newWithClass: isa
+					      selector: _cmd];
+}
+
+/*
+ * Applies all requests currently stored in queue / queueInfo and then removes
+ * the processed entries from both arrays; everything runs inside
+ * @synchronized (queue).
+ *
+ * For every entry, FDToStream is updated for the stream's file descriptor,
+ * the stream is added to or removed from readStreams / writeStreams, and the
+ * matching _add... / _remove... hook is called.
+ */
+- (void)_processQueue
+{
+	@synchronized (queue) {
+		OFStream **queueCArray = [queue cArray];
+		OFNumber **queueInfoCArray = [queueInfo cArray];
+		size_t i, count = [queue count];
+
+		for (i = 0; i < count; i++) {
+			int action = [queueInfoCArray[i] intValue];
+
+			if ((action & QUEUE_ACTION) == QUEUE_ADD) {
+				int fd = [queueCArray[i] fileDescriptor];
+
+				/* Grow the table so that fd is a valid index. */
+				if (fd > maxFD) {
+					maxFD = fd;
+					FDToStream = [self
+					    resizeMemory: FDToStream
+						toNItems: maxFD + 1
+						withSize: sizeof(OFStream*)];
+				}
+
+				FDToStream[fd] = queueCArray[i];
+			}
+
+			if ((action & QUEUE_ACTION) == QUEUE_REMOVE) {
+				int fd = [queueCArray[i] fileDescriptor];
+
+				/*
+				 * Never write outside the table: a stream that
+				 * was not added before can have an fd larger
+				 * than maxFD.
+				 *
+				 * FIXME: Maybe downsize?
+				 */
+				if (fd >= 0 && fd <= maxFD)
+					FDToStream[fd] = nil;
+			}
+
+			switch (action) {
+			case QUEUE_ADD | QUEUE_READ:
+				[readStreams addObject: queueCArray[i]];
+
+				[self _addStreamToObserveForReading:
+				    queueCArray[i]];
+
+				break;
+			case QUEUE_ADD | QUEUE_WRITE:
+				[writeStreams addObject: queueCArray[i]];
+
+				[self _addStreamToObserveForWriting:
+				    queueCArray[i]];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_READ:
+				[readStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStreamToObserveForReading:
+				    queueCArray[i]];
+
+				break;
+			case QUEUE_REMOVE | QUEUE_WRITE:
+				[writeStreams removeObjectIdenticalTo:
+				    queueCArray[i]];
+
+				[self _removeStreamToObserveForWriting:
+				    queueCArray[i]];
+
+				break;
+			default:
+				assert(0);
+			}
+		}
+
+		[queue removeNObjects: count];
+		[queueInfo removeNObjects: count];
+	}
+}
 
 - (void)observe
 {
 	[self observeWithTimeout: -1];
 }