@@ -100,58 +100,59 @@ #ifdef _WIN32 struct sockaddr_in cancelAddr2; socklen_t cancelAddrLen; #endif - readStreams = [[OFMutableArray alloc] init]; - writeStreams = [[OFMutableArray alloc] init]; - queue = [[OFMutableArray alloc] init]; - queueInfo = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; - queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; + _readStreams = [[OFMutableArray alloc] init]; + _writeStreams = [[OFMutableArray alloc] init]; + _queue = [[OFMutableArray alloc] init]; + _queueInfo = [[OFDataArray alloc] + initWithItemSize: sizeof(int)]; + _queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; #ifndef _WIN32 - if (pipe(cancelFD)) + if (pipe(_cancelFD)) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; #else /* Make sure WSAStartup has been called */ [OFTCPSocket class]; - cancelFD[0] = socket(AF_INET, SOCK_DGRAM, 0); - cancelFD[1] = socket(AF_INET, SOCK_DGRAM, 0); + _cancelFD[0] = socket(AF_INET, SOCK_DGRAM, 0); + _cancelFD[1] = socket(AF_INET, SOCK_DGRAM, 0); - if (cancelFD[0] == INVALID_SOCKET || - cancelFD[1] == INVALID_SOCKET) + if (_cancelFD[0] == INVALID_SOCKET || + _cancelFD[1] == INVALID_SOCKET) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; - cancelAddr.sin_family = AF_INET; - cancelAddr.sin_port = 0; - cancelAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - cancelAddr2 = cancelAddr; + _cancelAddr.sin_family = AF_INET; + _cancelAddr.sin_port = 0; + _cancelAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); + cancelAddr2 = _cancelAddr; - if (bind(cancelFD[0], (struct sockaddr*)&cancelAddr, - sizeof(cancelAddr)) || bind(cancelFD[1], + if (bind(_cancelFD[0], (struct sockaddr*)&_cancelAddr, + sizeof(_cancelAddr)) || bind(_cancelFD[1], (struct sockaddr*)&cancelAddr2, sizeof(cancelAddr2))) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; - cancelAddrLen = sizeof(cancelAddr); + cancelAddrLen = sizeof(_cancelAddr); - if (getsockname(cancelFD[0], (struct sockaddr*)&cancelAddr, + if (getsockname(_cancelFD[0], (struct sockaddr*)&_cancelAddr, &cancelAddrLen)) @throw [OFInitializationFailedException exceptionWithClass: [self class]]; #endif - maxFD = cancelFD[0]; - FDToStream = [self allocMemoryWithSize: sizeof(OFStream*) - count: maxFD + 1]; - FDToStream[cancelFD[0]] = nil; + _maxFD = _cancelFD[0]; + _FDToStream = [self allocMemoryWithSize: sizeof(OFStream*) + count: _maxFD + 1]; + _FDToStream[_cancelFD[0]] = nil; #ifdef OF_HAVE_THREADS - mutex = [[OFMutex alloc] init]; + _mutex = [[OFMutex alloc] init]; #endif } @catch (id e) { [self release]; @throw e; } @@ -159,127 +160,117 @@ return self; } - (void)dealloc { - close(cancelFD[0]); - close(cancelFD[1]); - - [readStreams release]; - [writeStreams release]; - [queue release]; - [queueInfo release]; - [queueFDs release]; + close(_cancelFD[0]); + close(_cancelFD[1]); + + [_readStreams release]; + [_writeStreams release]; + [_queue release]; + [_queueInfo release]; + [_queueFDs release]; #ifdef OF_HAVE_THREADS - [mutex release]; + [_mutex release]; #endif [super dealloc]; } - (id )delegate { - return delegate; + return _delegate; } -- (void)setDelegate: (id )delegate_ +- (void)setDelegate: (id )delegate { - delegate = delegate_; + _delegate = delegate; } - (void)addStreamForReading: (OFStream*)stream { #ifdef OF_HAVE_THREADS - [mutex lock]; + [_mutex lock]; #endif @try { int qi = QUEUE_ADD | QUEUE_READ; int fd = [stream fileDescriptorForReading]; - [queue addObject: stream]; - [queueInfo addItem: &qi]; - [queueFDs addItem: &fd]; + [_queue addObject: stream]; + [_queueInfo addItem: &qi]; + [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS - [mutex unlock]; + [_mutex unlock]; #endif } [self cancel]; } - (void)addStreamForWriting: (OFStream*)stream { #ifdef OF_HAVE_THREADS - [mutex lock]; + [_mutex lock]; #endif @try { int qi = QUEUE_ADD | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; - [queue addObject: stream]; - [queueInfo addItem: &qi]; - [queueFDs addItem: &fd]; + [_queue addObject: stream]; + [_queueInfo addItem: &qi]; + [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS - [mutex unlock]; + [_mutex unlock]; #endif } [self cancel]; } - (void)removeStreamForReading: (OFStream*)stream { #ifdef OF_HAVE_THREADS - [mutex lock]; + [_mutex lock]; #endif @try { int qi = QUEUE_REMOVE | QUEUE_READ; int fd = [stream fileDescriptorForReading]; - [queue addObject: stream]; - [queueInfo addItem: &qi]; - [queueFDs addItem: &fd]; + [_queue addObject: stream]; + [_queueInfo addItem: &qi]; + [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS - [mutex unlock]; + [_mutex unlock]; #endif } -#ifndef _WIN32 - OF_ENSURE(write(cancelFD[1], "", 1) > 0); -#else - OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, - sizeof(cancelAddr)) > 0); -#endif + [self cancel]; } - (void)removeStreamForWriting: (OFStream*)stream { #ifdef OF_HAVE_THREADS - [mutex lock]; + [_mutex lock]; #endif @try { int qi = QUEUE_REMOVE | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; - [queue addObject: stream]; - [queueInfo addItem: &qi]; - [queueFDs addItem: &fd]; + [_queue addObject: stream]; + [_queueInfo addItem: &qi]; + [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS - [mutex unlock]; + [_mutex unlock]; #endif } -#ifndef _WIN32 - OF_ENSURE(write(cancelFD[1], "", 1) > 0); -#else - OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, - sizeof(cancelAddr)) > 0); -#endif + [self cancel]; } - (void)OF_addFileDescriptorForReading: (int)fd { [self doesNotRecognizeSelector: _cmd]; @@ -305,76 +296,76 @@ } - (void)OF_processQueue { #ifdef OF_HAVE_THREADS - [mutex lock]; + [_mutex lock]; #endif @try { - OFStream **queueObjects = [queue objects]; - int *queueInfoItems = [queueInfo items]; - int *queueFDsItems = [queueFDs items]; - size_t i, count = [queue count]; + OFStream **queueObjects = [_queue objects]; + int *queueInfoItems = [_queueInfo items]; + int *queueFDsItems = [_queueFDs items]; + size_t i, count = [_queue count]; for (i = 0; i < count; i++) { OFStream *stream = queueObjects[i]; int action = queueInfoItems[i]; int fd = queueFDsItems[i]; if ((action & QUEUE_ACTION) == QUEUE_ADD) { - if (fd > maxFD) { - maxFD = fd; - FDToStream = [self - resizeMemory: FDToStream + if (fd > _maxFD) { + _maxFD = fd; + _FDToStream = [self + resizeMemory: _FDToStream size: sizeof(OFStream*) - count: maxFD + 1]; + count: _maxFD + 1]; } - FDToStream[fd] = stream; + _FDToStream[fd] = stream; } if ((action & QUEUE_ACTION) == QUEUE_REMOVE) { /* FIXME: Maybe downsize? */ - FDToStream[fd] = nil; + _FDToStream[fd] = nil; } switch (action) { case QUEUE_ADD | QUEUE_READ: - [readStreams addObject: stream]; + [_readStreams addObject: stream]; [self OF_addFileDescriptorForReading: fd]; break; case QUEUE_ADD | QUEUE_WRITE: - [writeStreams addObject: stream]; + [_writeStreams addObject: stream]; [self OF_addFileDescriptorForWriting: fd]; break; case QUEUE_REMOVE | QUEUE_READ: - [readStreams removeObjectIdenticalTo: stream]; + [_readStreams removeObjectIdenticalTo: stream]; [self OF_removeFileDescriptorForReading: fd]; break; case QUEUE_REMOVE | QUEUE_WRITE: - [writeStreams removeObjectIdenticalTo: stream]; + [_writeStreams removeObjectIdenticalTo: stream]; [self OF_removeFileDescriptorForWriting: fd]; break; default: assert(0); } } - [queue removeAllObjects]; - [queueInfo removeAllItems]; - [queueFDs removeAllItems]; + [_queue removeAllObjects]; + [_queueInfo removeAllItems]; + [_queueFDs removeAllItems]; } @finally { #ifdef OF_HAVE_THREADS - [mutex unlock]; + [_mutex unlock]; #endif } } - (void)observe @@ -389,32 +380,31 @@ } - (void)cancel { #ifndef _WIN32 - OF_ENSURE(write(cancelFD[1], "", 1) > 0); + OF_ENSURE(write(_cancelFD[1], "", 1) > 0); #else - OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, - sizeof(cancelAddr)) > 0); + OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr, + sizeof(_cancelAddr)) > 0); #endif } - (BOOL)OF_processCache { - OFStream **objects = [readStreams objects]; - size_t i, count = [readStreams count]; + OFStream **objects = [_readStreams objects]; + size_t i, count = [_readStreams count]; BOOL foundInCache = NO; - for (i = 0; i < count; i++) { if ([objects[i] pendingBytes] > 0 && ![objects[i] OF_isWaitingForDelimiter]) { void *pool = objc_autoreleasePoolPush(); - if ([delegate respondsToSelector: + if ([_delegate respondsToSelector: @selector(streamIsReadyForReading:)]) - [delegate streamIsReadyForReading: objects[i]]; + [_delegate streamIsReadyForReading: objects[i]]; foundInCache = YES; objc_autoreleasePoolPop(pool); }