Index: src/OFKernelEventObserver+Private.h ================================================================== --- src/OFKernelEventObserver+Private.h +++ src/OFKernelEventObserver+Private.h @@ -17,9 +17,9 @@ #import "OFKernelEventObserver.h" OF_ASSUME_NONNULL_BEGIN @interface OFKernelEventObserver (OF_PRIVATE_CATEGORY) -- (void)OF_processReadBuffers; +- (bool)OF_processReadBuffers; @end OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -222,24 +222,34 @@ (struct sockaddr*)&_cancelAddr, 8) > 0); # endif #endif } -- (void)OF_processReadBuffers +- (bool)OF_processReadBuffers { id const *objects = [_readObjects objects]; size_t i, count = [_readObjects count]; + bool foundInReadBuffer = false; for (i = 0; i < count; i++) { void *pool = objc_autoreleasePoolPush(); if ([objects[i] isKindOfClass: [OFStream class]] && [objects[i] hasDataInReadBuffer] && - ![objects[i] OF_isWaitingForDelimiter] && - [_delegate respondsToSelector: - @selector(objectIsReadyForReading:)]) - [_delegate objectIsReadyForReading: objects[i]]; + ![objects[i] OF_isWaitingForDelimiter]) { + if ([_delegate respondsToSelector: + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; + + foundInReadBuffer = true; + } objc_autoreleasePoolPop(pool); } + + /* + * As long as we have data in the read buffer for any stream, we don't + * want to block. + */ + return foundInReadBuffer; } @end Index: src/OFKernelEventObserver_LockedQueue.m ================================================================== --- src/OFKernelEventObserver_LockedQueue.m +++ src/OFKernelEventObserver_LockedQueue.m @@ -168,10 +168,12 @@ OF_UNRECOGNIZED_SELECTOR } - (void)OF_processQueue { + void *pool = objc_autoreleasePoolPush(); + #ifdef OF_HAVE_THREADS [_mutex lock]; @try { #endif int *queueActions = [_queueActions items]; @@ -227,7 +229,9 @@ #ifdef OF_HAVE_THREADS } @finally { [_mutex unlock]; } #endif + + objc_autoreleasePoolPop(pool); } @end Index: src/OFKernelEventObserver_epoll.m ================================================================== --- src/OFKernelEventObserver_epoll.m +++ src/OFKernelEventObserver_epoll.m @@ -207,17 +207,15 @@ } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OFNull *nullObject = [OFNull null]; - void *pool = objc_autoreleasePoolPush(); struct epoll_event eventList[EVENTLIST_SIZE]; int i, events; - [self OF_processReadBuffers]; - - objc_autoreleasePoolPop(pool); + if ([self OF_processReadBuffers]) + return; events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? timeInterval * 1000 : -1)); if (events < 0) Index: src/OFKernelEventObserver_kqueue.m ================================================================== --- src/OFKernelEventObserver_kqueue.m +++ src/OFKernelEventObserver_kqueue.m @@ -203,30 +203,30 @@ #endif } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { - void *pool = objc_autoreleasePoolPush(); struct timespec timeout; struct kevent eventList[EVENTLIST_SIZE]; int i, events; + + if ([self OF_processReadBuffers]) + return; timeout.tv_sec = (time_t)timeInterval; timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000); - [self OF_processReadBuffers]; - - objc_autoreleasePoolPop(pool); - events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? &timeout : NULL)); if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; for (i = 0; i < events; i++) { + void *pool; + if (eventList[i].flags & EV_ERROR) @throw [OFObserveFailedException exceptionWithObserver: self errNo: (int)eventList[i].data]; Index: src/OFKernelEventObserver_poll.m ================================================================== --- src/OFKernelEventObserver_poll.m +++ src/OFKernelEventObserver_poll.m @@ -157,19 +157,18 @@ events: POLLOUT]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { - void *pool = objc_autoreleasePoolPush(); struct pollfd *FDs; int events; size_t i, nFDs; [self OF_processQueue]; - [self OF_processReadBuffers]; - objc_autoreleasePoolPop(pool); + if ([self OF_processReadBuffers]) + return; FDs = [_FDs items]; nFDs = [_FDs count]; #ifdef OPEN_MAX @@ -186,10 +185,12 @@ for (i = 0; i < nFDs; i++) { assert(FDs[i].fd <= _maxFD); if (FDs[i].revents & POLLIN) { + void *pool; + if (FDs[i].fd == _cancelFD[0]) { char buffer; #ifdef OF_HAVE_PIPE OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1); @@ -211,11 +212,11 @@ objc_autoreleasePoolPop(pool); } if (FDs[i].revents & POLLOUT) { - pool = objc_autoreleasePoolPush(); + void *pool = objc_autoreleasePoolPush(); if ([_delegate respondsToSelector: @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: _FDToObject[FDs[i].fd]]; Index: src/OFKernelEventObserver_select.m ================================================================== --- src/OFKernelEventObserver_select.m +++ src/OFKernelEventObserver_select.m @@ -128,22 +128,21 @@ FD_CLR(fd, &_writeFDs); } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { - void *pool = objc_autoreleasePoolPush(); id const *objects; fd_set readFDs; fd_set writeFDs; struct timeval timeout; int events; size_t i, count; [self OF_processQueue]; - [self OF_processReadBuffers]; - objc_autoreleasePoolPop(pool); + if ([self OF_processReadBuffers]) + return; #ifdef FD_COPY FD_COPY(&_readFDs, &readFDs); FD_COPY(&_writeFDs, &writeFDs); #else @@ -184,14 +183,12 @@ objects = [_readObjects objects]; count = [_readObjects count]; for (i = 0; i < count; i++) { - int fd; - - pool = objc_autoreleasePoolPush(); - fd = [objects[i] fileDescriptorForReading]; + void *pool = objc_autoreleasePoolPush(); + int fd = [objects[i] fileDescriptorForReading]; if (FD_ISSET(fd, &readFDs) && [_delegate respondsToSelector: @selector(objectIsReadyForReading:)]) [_delegate objectIsReadyForReading: objects[i]]; @@ -200,18 +197,16 @@ objects = [_writeObjects objects]; count = [_writeObjects count]; for (i = 0; i < count; i++) { - int fd; - - pool = objc_autoreleasePoolPush(); - fd = [objects[i] fileDescriptorForWriting]; + void *pool = objc_autoreleasePoolPush(); + int fd = [objects[i] fileDescriptorForWriting]; if (FD_ISSET(fd, &writeFDs) && [_delegate respondsToSelector: @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: objects[i]]; objc_autoreleasePoolPop(pool); } } @end