Index: src/OFKernelEventObserver.h ================================================================== --- src/OFKernelEventObserver.h +++ src/OFKernelEventObserver.h @@ -16,11 +16,10 @@ #import "OFObject.h" #import "socket.h" -@class OFStream; @class OFMutableArray; @class OFMutableDictionary; @class OFDataArray; #ifdef OF_HAVE_THREADS @class OFMutex; @@ -34,50 +33,74 @@ @protocol OFKernelEventObserverDelegate #ifdef OF_HAVE_OPTIONAL_PROTOCOLS @optional #endif /*! - * @brief This callback is called when a stream did get ready for reading. - * - * @note When @ref OFStream::tryReadLine or - * @ref OFStream::tryReadTillDelimiter: has been called on the stream, - * this callback will not be called again until new data has been - * received, even though there is still data in the cache. The reason for - * this is to prevent spinning in a loop when there is an incomplete - * string in the cache. Once the string is complete, the callback will be - * called again if there is data in the cache. - * - * @param stream The stream which did become ready for reading - */ -- (void)streamIsReadyForReading: (OFStream*)stream; - -/*! - * @brief This callback is called when a stream did get ready for writing. - * - * @param stream The stream which did become ready for writing - */ -- (void)streamIsReadyForWriting: (OFStream*)stream; - -/*! - * @brief This callback is called when an exception occurred on the stream. - * - * @param stream The stream on which an exception occurred - */ -- (void)streamDidReceiveException: (OFStream*)stream; + * @brief This callback is called when an object did get ready for reading. + * + * @note If the object is a subclass of @ref OFStream and + * @ref OFStream::tryReadLine or @ref OFStream::tryReadTillDelimiter: has + * been called on the stream, this callback will not be called again + * until new data has been received, even though there is still data in + * the cache. 
The reason for this is to prevent spinning in a loop when + * there is an incomplete string in the cache. Once the string has been + * completed, the callback will be called again as long as there is data in + * the cache. + * + * @param object The object which did become ready for reading + */ +- (void)objectIsReadyForReading: (id)object; + +/*! + * @brief This callback is called when an object did get ready for writing. + * + * @param object The object which did become ready for writing + */ +- (void)objectIsReadyForWriting: (id)object; +@end + +/*! + * @brief This protocol is implemented by classes which can be observed for + * readiness for reading by OFKernelEventObserver. + */ +@protocol OFReadyForReadingObserving +/*! + * @brief Returns the file descriptor for reading that should be checked by the + * OFKernelEventObserver. + * + * @return The file descriptor for reading that should be checked by the + * OFKernelEventObserver + */ +- (int)fileDescriptorForReading; +@end + +/*! + * @brief This protocol is implemented by classes which can be observed for + * readiness for writing by OFKernelEventObserver. + */ +@protocol OFReadyForWritingObserving +/*! + * @brief Returns the file descriptor for writing that should be checked by the + * OFKernelEventObserver. + * + * @return The file descriptor for writing that should be checked by the + * OFKernelEventObserver + */ +- (int)fileDescriptorForWriting; @end /*! * @brief A class that can observe multiple kernel events (e.g. streams being * ready to read) at once. * - * @note Currently, Win32 can only observe sockets and not files! + * @note Currently, Win32 can only observe TCP sockets! 
*/ @interface OFKernelEventObserver: OFObject { - OFMutableArray *_readStreams; - OFMutableArray *_writeStreams; - __unsafe_unretained OFStream **_FDToStream; + OFMutableArray *_readObjects; + OFMutableArray *_writeObjects; + __unsafe_unretained id *_FDToObject; size_t _maxFD; OFMutableArray *_queue; OFDataArray *_queueInfo, *_queueFDs; id _delegate; int _cancelFD[2]; @@ -113,87 +136,83 @@ * @param delegate The delegate for the OFKernelEventObserver */ - (void)setDelegate: (id )delegate; /*! - * @brief Adds a stream to observe for reading. - * - * This is also used to observe a listening socket for incoming connections, - * which then triggers a read event for the observed stream. - * - * It is recommended that the stream you add is set to non-blocking mode. - * - * If there is an @ref observe call blocking, it will be canceled. The reason - * for this is to prevent blocking even though the new added stream is ready. - * - * @param stream The stream to observe for reading - */ -- (void)addStreamForReading: (OFStream*)stream; - -/*! - * @brief Adds a stream to observe for writing. - * - * It is recommended that the stream you add is set to non-blocking mode. - * - * If there is an @ref observe call blocking, it will be canceled. The reason - * for this is to prevent blocking even though the new added stream is ready. - * - * @param stream The stream to observe for writing - */ -- (void)addStreamForWriting: (OFStream*)stream; - -/*! - * @brief Removes a stream to observe for reading. - * - * If there is an @ref observe call blocking, it will be canceled. The reason - * for this is to prevent the removed stream from still being observed. - * - * @param stream The stream to remove from observing for reading - */ -- (void)removeStreamForReading: (OFStream*)stream; - -/*! - * @brief Removes a stream to observe for writing. - * - * If there is an @ref observe call blocking, it will be canceled. 
The reason - * for this is to prevent the removed stream from still being observed. - * - * @param stream The stream to remove from observing for writing - */ -- (void)removeStreamForWriting: (OFStream*)stream; - -/*! - * @brief Observes all streams and blocks until an event happens on a stream. + * @brief Adds an object to observe for reading. + * + * This is also used to observe a listening socket for incoming connections, + * which then triggers a read event for the observed object. + * + * If there is an @ref observe call blocking, it will be canceled. The reason + * for this is to prevent blocking even though the newly added object is ready. + * + * @param object The object to observe for reading + */ +- (void)addObjectForReading: (id )object; + +/*! + * @brief Adds an object to observe for writing. + * + * If there is an @ref observe call blocking, it will be canceled. The reason + * for this is to prevent blocking even though the newly added object is ready. + * + * @param object The object to observe for writing + */ +- (void)addObjectForWriting: (id )object; + +/*! + * @brief Removes an object to observe for reading. + * + * If there is an @ref observe call blocking, it will be canceled. The reason + * for this is to prevent the removed object from still being observed. + * + * @param object The object to remove from observing for reading + */ +- (void)removeObjectForReading: (id )object; + +/*! + * @brief Removes an object to observe for writing. + * + * If there is an @ref observe call blocking, it will be canceled. The reason + * for this is to prevent the removed object from still being observed. + * + * @param object The object to remove from observing for writing + */ +- (void)removeObjectForWriting: (id )object; + +/*! + * @brief Observes all objects and blocks until an event happens on an object. */ - (void)observe; /*! 
- * @brief Observes all streams until an event happens on a stream or the + * @brief Observes all objects until an event happens on an object or the * timeout is reached. * * @param timeInterval The time to wait for an event, in seconds * @return A boolean whether events occurred during the timeinterval */ - (bool)observeForTimeInterval: (of_time_interval_t)timeInterval; /*! - * @brief Observes all streams until an event happens on a stream or the - * timeout is reached. + * @brief Observes all objects until an event happens on an object or the + * specified date is reached. * * @param date The until which to observe * @return A boolean whether events occurred until the specified date */ - (bool)observeUntilDate: (OFDate*)date; /*! * @brief Cancels the currently blocking observe call. * - * This is automatically done when a new stream is added or removed by another + * This is automatically done when a new object is added or removed by another * thread, but in some circumstances, it might be desirable for a thread to * manually stop the observe running in another thread. 
*/ - (void)cancel; @end @interface OFObject (OFKernelEventObserverDelegate) @end Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -44,10 +44,11 @@ #if defined(HAVE_SYS_SELECT_H) || defined(_WIN32) # import "OFKernelEventObserver_select.h" #endif #import "OFInitializationFailedException.h" +#import "OFInvalidArgumentException.h" #import "OFOutOfRangeException.h" #import "autorelease.h" #import "macros.h" #import "socket_helpers.h" @@ -102,12 +103,12 @@ # ifndef __wii__ socklen_t cancelAddrLen; # endif #endif - _readStreams = [[OFMutableArray alloc] init]; - _writeStreams = [[OFMutableArray alloc] init]; + _readObjects = [[OFMutableArray alloc] init]; + _writeObjects = [[OFMutableArray alloc] init]; _queue = [[OFMutableArray alloc] init]; _queueInfo = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; _queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)]; @@ -152,13 +153,13 @@ exceptionWithClass: [self class]]; # endif #endif _maxFD = _cancelFD[0]; - _FDToStream = [self allocMemoryWithSize: sizeof(OFStream*) + _FDToObject = [self allocMemoryWithSize: sizeof(id) count: _maxFD + 1]; - _FDToStream[_cancelFD[0]] = nil; + _FDToObject[_cancelFD[0]] = nil; #ifdef OF_HAVE_THREADS _mutex = [[OFMutex alloc] init]; #endif } @catch (id e) { @@ -172,12 +173,12 @@ - (void)dealloc { close(_cancelFD[0]); close(_cancelFD[1]); - [_readStreams release]; - [_writeStreams release]; + [_readObjects release]; + [_writeObjects release]; [_queue release]; [_queueInfo release]; [_queueFDs release]; #ifdef OF_HAVE_THREADS [_mutex release]; @@ -194,20 +195,41 @@ - (void)setDelegate: (id )delegate { _delegate = delegate; } -- (void)addStreamForReading: (OFStream*)stream +- (void)addObjectForReading: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; #endif @try { int qi = QUEUE_ADD | QUEUE_READ; - int fd = [stream fileDescriptorForReading]; + int fd = [object 
fileDescriptorForReading]; + + [_queue addObject: object]; + [_queueInfo addItem: &qi]; + [_queueFDs addItem: &fd]; + } @finally { +#ifdef OF_HAVE_THREADS + [_mutex unlock]; +#endif + } + + [self cancel]; +} + +- (void)addObjectForWriting: (id )object +{ +#ifdef OF_HAVE_THREADS + [_mutex lock]; +#endif + @try { + int qi = QUEUE_ADD | QUEUE_WRITE; + int fd = [object fileDescriptorForWriting]; - [_queue addObject: stream]; + [_queue addObject: object]; [_queueInfo addItem: &qi]; [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS [_mutex unlock]; @@ -215,20 +237,20 @@ } [self cancel]; } -- (void)addStreamForWriting: (OFStream*)stream +- (void)removeObjectForReading: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; #endif @try { - int qi = QUEUE_ADD | QUEUE_WRITE; - int fd = [stream fileDescriptorForWriting]; + int qi = QUEUE_REMOVE | QUEUE_READ; + int fd = [object fileDescriptorForReading]; - [_queue addObject: stream]; + [_queue addObject: object]; [_queueInfo addItem: &qi]; [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS [_mutex unlock]; @@ -236,41 +258,20 @@ } [self cancel]; } -- (void)removeStreamForReading: (OFStream*)stream -{ -#ifdef OF_HAVE_THREADS - [_mutex lock]; -#endif - @try { - int qi = QUEUE_REMOVE | QUEUE_READ; - int fd = [stream fileDescriptorForReading]; - - [_queue addObject: stream]; - [_queueInfo addItem: &qi]; - [_queueFDs addItem: &fd]; - } @finally { -#ifdef OF_HAVE_THREADS - [_mutex unlock]; -#endif - } - - [self cancel]; -} - -- (void)removeStreamForWriting: (OFStream*)stream +- (void)removeObjectForWriting: (id )object { #ifdef OF_HAVE_THREADS [_mutex lock]; #endif @try { int qi = QUEUE_REMOVE | QUEUE_WRITE; - int fd = [stream fileDescriptorForWriting]; + int fd = [object fileDescriptorForWriting]; - [_queue addObject: stream]; + [_queue addObject: object]; [_queueInfo addItem: &qi]; [_queueFDs addItem: &fd]; } @finally { #ifdef OF_HAVE_THREADS [_mutex unlock]; @@ -304,58 +305,58 @@ { #ifdef OF_HAVE_THREADS 
[_mutex lock]; #endif @try { - OFStream **queueObjects = [_queue objects]; + id *queueObjects = [_queue objects]; int *queueInfoItems = [_queueInfo items]; int *queueFDsItems = [_queueFDs items]; size_t i, count = [_queue count]; for (i = 0; i < count; i++) { - OFStream *stream = queueObjects[i]; + id object = queueObjects[i]; int action = queueInfoItems[i]; int fd = queueFDsItems[i]; if ((action & QUEUE_ACTION) == QUEUE_ADD) { if (fd > _maxFD) { _maxFD = fd; - _FDToStream = [self - resizeMemory: _FDToStream - size: sizeof(OFStream*) + _FDToObject = [self + resizeMemory: _FDToObject + size: sizeof(id) count: _maxFD + 1]; } - _FDToStream[fd] = stream; + _FDToObject[fd] = object; } if ((action & QUEUE_ACTION) == QUEUE_REMOVE) { /* FIXME: Maybe downsize? */ - _FDToStream[fd] = nil; + _FDToObject[fd] = nil; } switch (action) { case QUEUE_ADD | QUEUE_READ: - [_readStreams addObject: stream]; + [_readObjects addObject: object]; [self OF_addFileDescriptorForReading: fd]; break; case QUEUE_ADD | QUEUE_WRITE: - [_writeStreams addObject: stream]; + [_writeObjects addObject: object]; [self OF_addFileDescriptorForWriting: fd]; break; case QUEUE_REMOVE | QUEUE_READ: - [_readStreams removeObjectIdenticalTo: stream]; + [_readObjects removeObjectIdenticalTo: object]; [self OF_removeFileDescriptorForReading: fd]; break; case QUEUE_REMOVE | QUEUE_WRITE: - [_writeStreams removeObjectIdenticalTo: stream]; + [_writeObjects removeObjectIdenticalTo: object]; [self OF_removeFileDescriptorForWriting: fd]; break; default: @@ -398,22 +399,23 @@ #endif } - (bool)OF_processCache { - OFStream **objects = [_readStreams objects]; - size_t i, count = [_readStreams count]; + id *objects = [_readObjects objects]; + size_t i, count = [_readObjects count]; bool foundInCache = false; for (i = 0; i < count; i++) { - if ([objects[i] numberOfBytesInReadBuffer] > 0 && + if ([objects[i] isKindOfClass: [OFStream class]] && + [objects[i] numberOfBytesInReadBuffer] > 0 && ![objects[i] 
OF_isWaitingForDelimiter]) { void *pool = objc_autoreleasePoolPush(); if ([_delegate respondsToSelector: - @selector(streamIsReadyForReading:)]) - [_delegate streamIsReadyForReading: objects[i]]; + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; foundInCache = true; objc_autoreleasePoolPop(pool); } Index: src/OFKernelEventObserver_kqueue.m ================================================================== --- src/OFKernelEventObserver_kqueue.m +++ src/OFKernelEventObserver_kqueue.m @@ -150,32 +150,22 @@ realEvents++; pool = objc_autoreleasePoolPush(); - if (eventList[i].flags & EV_ERROR) { - if ([_delegate respondsToSelector: - @selector(streamDidReceiveException:)]) - [_delegate streamDidReceiveException: - _FDToStream[eventList[i].ident]]; - - objc_autoreleasePoolPop(pool); - continue; - } - switch (eventList[i].filter) { case EVFILT_READ: if ([_delegate respondsToSelector: - @selector(streamIsReadyForReading:)]) - [_delegate streamIsReadyForReading: - _FDToStream[eventList[i].ident]]; + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: + _FDToObject[eventList[i].ident]]; break; case EVFILT_WRITE: if ([_delegate respondsToSelector: - @selector(streamIsReadyForWriting:)]) - [_delegate streamIsReadyForWriting: - _FDToStream[eventList[i].ident]]; + @selector(objectIsReadyForWriting:)]) + [_delegate objectIsReadyForWriting: + _FDToObject[eventList[i].ident]]; break; default: assert(0); } Index: src/OFKernelEventObserver_poll.m ================================================================== --- src/OFKernelEventObserver_poll.m +++ src/OFKernelEventObserver_poll.m @@ -80,11 +80,11 @@ break; } } if (!found) { - struct pollfd p = { fd, events | POLLERR, 0 }; + struct pollfd p = { fd, events, 0 }; [_FDs addItem: &p]; } } - (void)OF_removeFileDescriptor: (int)fd @@ -95,11 +95,11 @@ for (i = 0; i < nFDs; i++) { if (FDs[i].fd == fd) { FDs[i].events &= ~events; - if ((FDs[i].events & ~POLLERR) == 0) + if 
(FDs[i].events == 0) [_FDs removeItemAtIndex: i]; break; } } @@ -169,31 +169,22 @@ objc_autoreleasePoolPop(pool); continue; } if ([_delegate respondsToSelector: - @selector(streamIsReadyForReading:)]) - [_delegate streamIsReadyForReading: - _FDToStream[FDs[i].fd]]; + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: + _FDToObject[FDs[i].fd]]; realEvents++; } if (FDs[i].revents & POLLOUT) { if ([_delegate respondsToSelector: - @selector(streamIsReadyForWriting:)]) - [_delegate streamIsReadyForWriting: - _FDToStream[FDs[i].fd]]; - - realEvents++; - } - - if (FDs[i].revents & POLLERR) { - if ([_delegate respondsToSelector: - @selector(streamDidReceiveException:)]) - [_delegate streamDidReceiveException: - _FDToStream[FDs[i].fd]]; + @selector(objectIsReadyForWriting:)]) + [_delegate objectIsReadyForWriting: + _FDToObject[FDs[i].fd]]; realEvents++; } FDs[i].revents = 0; Index: src/OFKernelEventObserver_select.h ================================================================== --- src/OFKernelEventObserver_select.h +++ src/OFKernelEventObserver_select.h @@ -27,8 +27,8 @@ #import "OFKernelEventObserver.h" @interface OFKernelEventObserver_select: OFKernelEventObserver { - fd_set _readFDs, _writeFDs, _exceptFDs; + fd_set _readFDs, _writeFDs; } @end Index: src/OFKernelEventObserver_select.m ================================================================== --- src/OFKernelEventObserver_select.m +++ src/OFKernelEventObserver_select.m @@ -24,11 +24,10 @@ #include #import "OFKernelEventObserver.h" #import "OFKernelEventObserver+Private.h" #import "OFKernelEventObserver_select.h" -#import "OFStream.h" #import "OFArray.h" #import "autorelease.h" #import "macros.h" #import "socket_helpers.h" @@ -47,42 +46,33 @@ } - (void)OF_addFileDescriptorForReading: (int)fd { FD_SET(fd, &_readFDs); - FD_SET(fd, &_exceptFDs); } - (void)OF_addFileDescriptorForWriting: (int)fd { FD_SET(fd, &_writeFDs); - FD_SET(fd, &_exceptFDs); } - 
(void)OF_removeFileDescriptorForReading: (int)fd { FD_CLR(fd, &_readFDs); - - if (!FD_ISSET(fd, &_writeFDs)) - FD_CLR(fd, &_exceptFDs); } - (void)OF_removeFileDescriptorForWriting: (int)fd { FD_CLR(fd, &_writeFDs); - - if (!FD_ISSET(fd, &_readFDs)) - FD_CLR(fd, &_exceptFDs); } - (bool)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush(); - OFStream **objects; + id *objects; fd_set readFDs; fd_set writeFDs; - fd_set exceptFDs; struct timeval timeout; size_t i, count, realEvents = 0; [self OF_processQueue]; @@ -94,15 +84,13 @@ objc_autoreleasePoolPop(pool); #ifdef FD_COPY FD_COPY(&_readFDs, &readFDs); FD_COPY(&_writeFDs, &writeFDs); - FD_COPY(&_exceptFDs, &exceptFDs); #else readFDs = _readFDs; writeFDs = _writeFDs; - exceptFDs = _exceptFDs; #endif /* * We cast to int before assigning to tv_usec in order to avoid a * warning with Apple GCC on PPC. POSIX defines this as suseconds_t, @@ -110,11 +98,11 @@ * satisfy the required range, we just cast to int. */ timeout.tv_sec = (time_t)timeInterval; timeout.tv_usec = (int)lrint((timeInterval - timeout.tv_sec) * 1000); - if (select((int)_maxFD + 1, &readFDs, &writeFDs, &exceptFDs, + if (select((int)_maxFD + 1, &readFDs, &writeFDs, NULL, (timeInterval != -1 ? 
&timeout : NULL)) < 1) return false; if (FD_ISSET(_cancelFD[0], &readFDs)) { char buffer; @@ -124,65 +112,41 @@ OF_ENSURE(recvfrom(_cancelFD[0], &buffer, 1, 0, NULL, NULL) > 0); #endif } - objects = [_readStreams objects]; - count = [_readStreams count]; + objects = [_readObjects objects]; + count = [_readObjects count]; for (i = 0; i < count; i++) { int fd = [objects[i] fileDescriptorForReading]; pool = objc_autoreleasePoolPush(); if (FD_ISSET(fd, &readFDs)) { if ([_delegate respondsToSelector: - @selector(streamIsReadyForReading:)]) - [_delegate streamIsReadyForReading: objects[i]]; - - realEvents++; - } - - if (FD_ISSET(fd, &exceptFDs)) { - if ([_delegate respondsToSelector: - @selector(streamDidReceiveException:)]) - [_delegate streamDidReceiveException: - objects[i]]; - - /* - * Prevent calling it twice in case the FD is in both - * sets. - */ - FD_CLR(fd, &exceptFDs); + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; realEvents++; } objc_autoreleasePoolPop(pool); } - objects = [_writeStreams objects]; - count = [_writeStreams count]; + objects = [_writeObjects objects]; + count = [_writeObjects count]; for (i = 0; i < count; i++) { int fd = [objects[i] fileDescriptorForWriting]; pool = objc_autoreleasePoolPush(); if (FD_ISSET(fd, &writeFDs)) { if ([_delegate respondsToSelector: - @selector(streamIsReadyForWriting:)]) - [_delegate streamIsReadyForWriting: objects[i]]; - - realEvents++; - } - - if (FD_ISSET(fd, &exceptFDs)) { - if ([_delegate respondsToSelector: - @selector(streamDidReceiveException:)]) - [_delegate streamDidReceiveException: - objects[i]]; + @selector(objectIsReadyForWriting:)]) + [_delegate objectIsReadyForWriting: objects[i]]; realEvents++; } objc_autoreleasePoolPop(pool); Index: src/OFRunLoop.m ================================================================== --- src/OFRunLoop.m +++ src/OFRunLoop.m @@ -176,11 +176,11 @@ forKey: stream]; \ } \ \ if ([queue count] == 0) \ 
[runLoop->_kernelEventObserver \ - addStreamForReading: stream]; \ + addObjectForReading: stream]; \ \ queueItem = [[[type alloc] init] autorelease]; \ code \ [queue appendObject: queueItem]; \ \ @@ -288,11 +288,11 @@ OFList *queue; if ((queue = [runLoop->_readQueues objectForKey: stream]) != nil) { assert([queue count] > 0); - [runLoop->_kernelEventObserver removeStreamForReading: stream]; + [runLoop->_kernelEventObserver removeObjectForReading: stream]; [runLoop->_readQueues removeObjectForKey: stream]; } objc_autoreleasePoolPop(pool); } @@ -385,13 +385,13 @@ } #endif } #ifdef OF_HAVE_SOCKETS -- (void)streamIsReadyForReading: (OFStream*)stream +- (void)objectIsReadyForReading: (id)object { - OFList *queue = [_readQueues objectForKey: stream]; + OFList *queue = [_readQueues objectForKey: object]; of_list_object_t *listObject; assert(queue != nil); listObject = [queue firstListObject]; @@ -401,28 +401,28 @@ OFRunLoop_ReadQueueItem *queueItem = listObject->object; size_t length; OFException *exception = nil; @try { - length = [stream readIntoBuffer: queueItem->_buffer + length = [object readIntoBuffer: queueItem->_buffer length: queueItem->_length]; } @catch (OFException *e) { length = 0; exception = e; } # ifdef OF_HAVE_BLOCKS if (queueItem->_block != NULL) { - if (!queueItem->_block(stream, queueItem->_buffer, + if (!queueItem->_block(object, queueItem->_buffer, length, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: stream]; + removeObjectForReading: object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } } else { # endif bool (*func)(id, SEL, OFStream*, void*, size_t, @@ -430,18 +430,18 @@ size_t, OFException*)) [queueItem->_target methodForSelector: queueItem->_selector]; if (!func(queueItem->_target, queueItem->_selector, - stream, queueItem->_buffer, length, exception)) { + object, queueItem->_buffer, length, exception)) { [queue removeListObject: 
listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: stream]; + removeObjectForReading: object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } # ifdef OF_HAVE_BLOCKS } # endif @@ -450,11 +450,11 @@ OFRunLoop_ExactReadQueueItem *queueItem = listObject->object; size_t length; OFException *exception = nil; @try { - length = [stream + length = [object readIntoBuffer: (char*)queueItem->_buffer + queueItem->_readLength length: queueItem->_exactLength - queueItem->_readLength]; } @catch (OFException *e) { @@ -462,26 +462,26 @@ exception = e; } queueItem->_readLength += length; if (queueItem->_readLength == queueItem->_exactLength || - [stream isAtEndOfStream] || exception != nil) { + [object isAtEndOfStream] || exception != nil) { # ifdef OF_HAVE_BLOCKS if (queueItem->_block != NULL) { - if (queueItem->_block(stream, + if (queueItem->_block(object, queueItem->_buffer, queueItem->_readLength, exception)) queueItem->_readLength = 0; else { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: - stream]; + removeObjectForReading: + object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } } else { # endif bool (*func)(id, SEL, OFStream*, void*, @@ -489,23 +489,23 @@ OFStream*, void*, size_t, OFException*)) [queueItem->_target methodForSelector: queueItem->_selector]; if (func(queueItem->_target, - queueItem->_selector, stream, + queueItem->_selector, object, queueItem->_buffer, queueItem->_readLength, exception)) queueItem->_readLength = 0; else { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: - stream]; + removeObjectForReading: + object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } # ifdef OF_HAVE_BLOCKS } # endif @@ -515,31 +515,31 @@ OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; OFString *line; OFException 
*exception = nil; @try { - line = [stream + line = [object tryReadLineWithEncoding: queueItem->_encoding]; } @catch (OFException *e) { line = nil; exception = e; } - if (line != nil || [stream isAtEndOfStream] || + if (line != nil || [object isAtEndOfStream] || exception != nil) { # ifdef OF_HAVE_BLOCKS if (queueItem->_block != NULL) { - if (!queueItem->_block(stream, line, + if (!queueItem->_block(object, line, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: - stream]; + removeObjectForReading: + object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } } else { # endif bool (*func)(id, SEL, OFStream*, OFString*, @@ -547,20 +547,20 @@ OFString*, OFException*)) [queueItem->_target methodForSelector: queueItem->_selector]; if (!func(queueItem->_target, - queueItem->_selector, stream, line, + queueItem->_selector, object, line, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: - stream]; + removeObjectForReading: + object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } # ifdef OF_HAVE_BLOCKS } # endif @@ -570,27 +570,26 @@ OFRunLoop_AcceptQueueItem *queueItem = listObject->object; OFTCPSocket *newSocket; OFException *exception = nil; @try { - newSocket = [(OFTCPSocket*)stream accept]; + newSocket = [object accept]; } @catch (OFException *e) { newSocket = nil; exception = e; } # ifdef OF_HAVE_BLOCKS if (queueItem->_block != NULL) { - if (!queueItem->_block((OFTCPSocket*)stream, - newSocket, exception)) { + if (!queueItem->_block(object, newSocket, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: stream]; + removeObjectForReading: object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } } else { # endif bool (*func)(id, SEL, OFTCPSocket*, 
OFTCPSocket*, @@ -599,18 +598,18 @@ OFException*)) [queueItem->_target methodForSelector: queueItem->_selector]; if (!func(queueItem->_target, queueItem->_selector, - (OFTCPSocket*)stream, newSocket, exception)) { + object, newSocket, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [_kernelEventObserver - removeStreamForReading: stream]; + removeObjectForReading: object]; [_readQueues - removeObjectForKey: stream]; + removeObjectForKey: object]; } } # ifdef OF_HAVE_BLOCKS } # endif @@ -671,11 +670,11 @@ } @finally { [_timersQueueLock unlock]; } #endif - /* Watch for stream events until the next timer is due */ + /* Watch for I/O events until the next timer is due */ if (nextTimer != nil) { of_time_interval_t timeout = [nextTimer timeIntervalSinceNow]; if (timeout > 0) { @@ -690,11 +689,11 @@ [OFThread sleepForTimeInterval: timeout]; #endif } } else { /* - * No more timers: Just watch for streams until we get + * No more timers: Just watch for I/O until we get * an event. If a timer is added by another thread, it * cancels the observe. */ #if defined(OF_HAVE_SOCKETS) [_kernelEventObserver observe]; Index: src/OFStream.h ================================================================== --- src/OFStream.h +++ src/OFStream.h @@ -23,10 +23,11 @@ #include #import "OFObject.h" #import "OFString.h" +#import "OFKernelEventObserver.h" /*! @file */ @class OFStream; @class OFDataArray; @@ -73,11 +74,12 @@ * the methods that do the actual work. OFStream uses those for all other * methods and does all the caching and other stuff for you. If you * override these methods without the lowlevel prefix, you *will* break * caching and get broken results! 
*/ -@interface OFStream: OFObject +@interface OFStream: OFObject { char *_readBuffer, *_writeBuffer; size_t _readBufferLength, _writeBufferLength; bool _writeBufferEnabled, _blocking, _waitingForDelimiter; } @@ -138,10 +140,13 @@ * On network streams, this might read less than the specified number of bytes. * If you want to read exactly the specified number of bytes, use * @ref asyncReadIntoBuffer:exactLength:block:. Note that a read can even * return 0 bytes - this does not necessarily mean that the stream ended, so * you still need to check @ref isAtEndOfStream. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param buffer The buffer into which the data is read. * The buffer must not be free'd before the async read completed! * @param length The length of the data that should be read at most. * The buffer *must* be *at least* this big! @@ -166,10 +171,13 @@ * * Unlike @ref asyncReadIntoBuffer:length:target:selector:, this method does * not call the method when less than the specified length has been read - * instead, it waits until it got exactly the specified length, the stream has * ended or an exception occurred. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param buffer The buffer into which the data is read * @param length The length of the data that should be read. * The buffer *must* be *at least* this big! * @param target The target on which the selector should be called when the @@ -195,10 +203,13 @@ * On network streams, this might read less than the specified number of bytes. * If you want to read exactly the specified number of bytes, use * @ref asyncReadIntoBuffer:exactLength:block:. Note that a read can even * return 0 bytes - this does not necessarily mean that the stream ended, so * you still need to check @ref isAtEndOfStream. 
+ * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param buffer The buffer into which the data is read. * The buffer must not be free'd before the async read completed! * @param length The length of the data that should be read at most. * The buffer *must* be *at least* this big! @@ -218,10 +229,13 @@ * * Unlike @ref asyncReadIntoBuffer:length:block:, this method does not invoke * the block when less than the specified length has been read - instead, it * waits until it got exactly the specified length, the stream has ended or an * exception occurred. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param buffer The buffer into which the data is read * @param length The length of the data that should be read. * The buffer *must* be *at least* this big! * @param block The block to call when the data has been received. @@ -584,10 +598,13 @@ #ifdef OF_HAVE_SOCKETS /*! * @brief Asyncronously reads until a newline, \\0, end of stream or an * exception occurs. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param target The target on which to call the selector when the data has * been received. If the method returns true, it will be called * again when the next line has been received. If you want the * next method in the queue to handle the next line, you need to @@ -600,10 +617,13 @@ selector: (SEL)selector; /*! * @brief Asyncronously reads with the specified encoding until a newline, \\0, * end of stream or an exception occurs. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! 
* * @param encoding The encoding used by the stream * @param target The target on which to call the selector when the data has * been received. If the method returns true, it will be called * again when the next line has been received. If you want the @@ -619,10 +639,13 @@ # ifdef OF_HAVE_BLOCKS /*! * @brief Asyncronously reads until a newline, \\0, end of stream or an * exception occurs. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param block The block to call when the data has been received. * If the block returns true, it will be called again when the next * line has been received. If you want the next block in the queue * to handle the next line, you need to return false from the @@ -631,10 +654,13 @@ - (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block; /*! * @brief Asyncronously reads with the specified encoding until a newline, \\0, * end of stream or an exception occurs. + * + * @note The stream must implement @ref fileDescriptorForReading and return a + * valid file descriptor in order for this to work! * * @param encoding The encoding used by the stream * @param block The block to call when the data has been received. * If the block returns true, it will be called again when the next * line has been received. If you want the next block in the queue