Index: src/OFKernelEventObserver+Private.h ================================================================== --- src/OFKernelEventObserver+Private.h +++ src/OFKernelEventObserver+Private.h @@ -20,7 +20,7 @@ - (void)OF_addObjectForReading: (id)object; - (void)OF_addObjectForWriting: (id)object; - (void)OF_removeObjectForReading: (id)object; - (void)OF_removeObjectForWriting: (id)object; - (void)OF_processQueueAndStoreRemovedIn: (OFMutableArray*)removed; -- (bool)OF_processReadBuffers; +- (void)OF_processReadBuffers; @end Index: src/OFKernelEventObserver.h ================================================================== --- src/OFKernelEventObserver.h +++ src/OFKernelEventObserver.h @@ -200,22 +200,20 @@ /*! * @brief Observes all objects until an event happens on an object or the * timeout is reached. * * @param timeInterval The time to wait for an event, in seconds - * @return A boolean whether events occurred before returning */ -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval; +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval; /*! * @brief Observes all objects until an event happens on an object or the * specified date is reached. * * @param date The until which to observe - * @return A boolean whether events occurred before returning */ -- (bool)observeUntilDate: (OFDate*)date; +- (void)observeUntilDate: (OFDate*)date; /*! * @brief Cancels the currently blocking observe call. * * This is automatically done when a new object is added or removed by another Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -338,18 +338,18 @@ - (void)observe { [self observeForTimeInterval: -1]; } -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OF_UNRECOGNIZED_SELECTOR } -- (bool)observeUntilDate: (OFDate*)date +- (void)observeUntilDate: (OFDate*)date { - return [self observeForTimeInterval: [date timeIntervalSinceNow]]; + [self observeForTimeInterval: [date timeIntervalSinceNow]]; } - (void)cancel { #ifdef OF_HAVE_PIPE @@ -358,37 +358,24 @@ OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr)) > 0); #endif } -- (bool)OF_processReadBuffers +- (void)OF_processReadBuffers { id const *objects = [_readObjects objects]; size_t i, count = [_readObjects count]; - bool foundInReadBuffer = false; for (i = 0; i < count; i++) { void *pool = objc_autoreleasePoolPush(); if ([objects[i] isKindOfClass: [OFStream class]] && [objects[i] hasDataInReadBuffer] && - ![objects[i] OF_isWaitingForDelimiter]) { - if ([_delegate respondsToSelector: - @selector(objectIsReadyForReading:)]) - [_delegate objectIsReadyForReading: objects[i]]; - - foundInReadBuffer = true; - } + ![objects[i] OF_isWaitingForDelimiter] && + [_delegate respondsToSelector: + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; objc_autoreleasePoolPop(pool); } - - /* - * As long as we have data in the read buffer for any stream, we don't - * want to block. - */ - if (foundInReadBuffer) - return true; - - return false; } @end Index: src/OFKernelEventObserver_epoll.m ================================================================== --- src/OFKernelEventObserver_epoll.m +++ src/OFKernelEventObserver_epoll.m @@ -182,23 +182,19 @@ [self OF_removeObject: object fileDescriptor: [object fileDescriptorForWriting] events: EPOLLOUT]; } -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OFNull *nullObject = [OFNull null]; void *pool = objc_autoreleasePoolPush(); struct epoll_event eventList[EVENTLIST_SIZE]; - int i, events, realEvents = 0; + int i, events; [self OF_processQueueAndStoreRemovedIn: nil]; - - if ([self OF_processReadBuffers]) { - objc_autoreleasePoolPop(pool); - return true; - } + [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? timeInterval * 1000 : -1)); @@ -205,13 +201,10 @@ if (events < 0) return [OFObserveFailedException exceptionWithObserver: self errNo: errno]; - if (events == 0) - return false; - for (i = 0; i < events; i++) { if (eventList[i].data.ptr == nullObject) { char buffer; assert(eventList[i].events == EPOLLIN); @@ -226,12 +219,10 @@ if ([_delegate respondsToSelector: @selector(objectIsReadyForReading:)]) [_delegate objectIsReadyForReading: eventList[i].data.ptr]; - realEvents++; - objc_autoreleasePoolPop(pool); } if (eventList[i].events & EPOLLOUT) { pool = objc_autoreleasePoolPush(); @@ -239,19 +230,12 @@ if ([_delegate respondsToSelector: @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: eventList[i].data.ptr]; - realEvents++; - objc_autoreleasePoolPop(pool); } assert((eventList[i].events & ~(EPOLLIN | EPOLLOUT)) == 0); } - - if (realEvents == 0) - return false; - - return true; } @end Index: src/OFKernelEventObserver_kqueue.m ================================================================== --- src/OFKernelEventObserver_kqueue.m +++ src/OFKernelEventObserver_kqueue.m @@ -146,16 +146,16 @@ event.filter = EVFILT_WRITE; event.flags = EV_DELETE; [_changeList addItem: &event]; } -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush(); struct timespec timeout; struct kevent eventList[EVENTLIST_SIZE]; - int i, events, realEvents = 0; + int i, events; timeout.tv_sec = (time_t)timeInterval; timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000); /* @@ -162,31 +162,25 @@ * Make sure to keep the streams retained and thus the file descriptors * valid until the actual change has been performed. */ [self OF_processQueueAndStoreRemovedIn: _removedArray]; - if ([self OF_processReadBuffers]) { - objc_autoreleasePoolPop(pool); - return true; - } + [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); events = kevent(_kernelQueue, [_changeList items], (int)[_changeList count], eventList, EVENTLIST_SIZE, (timeInterval != -1 ? &timeout : NULL)); if (events < 0) - return [OFObserveFailedException exceptionWithObserver: self + @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; [_changeList removeAllItems]; [_removedArray removeAllObjects]; - if (events == 0) - return false; - for (i = 0; i < events; i++) { if (eventList[i].flags & EV_ERROR) @throw [OFObserveFailedException exceptionWithObserver: self errNo: (int)eventList[i].data]; @@ -218,15 +212,8 @@ default: assert(0); } objc_autoreleasePoolPop(pool); - - realEvents++; - } - - if (realEvents == 0) - return false; - - return true; + } } @end Index: src/OFKernelEventObserver_poll.m ================================================================== --- src/OFKernelEventObserver_poll.m +++ src/OFKernelEventObserver_poll.m @@ -150,23 +150,19 @@ [self OF_removeObject: object fileDescriptor: [object fileDescriptorForWriting] events: POLLOUT]; } -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush(); struct pollfd *FDs; int events; - size_t i, nFDs, realEvents = 0; + size_t i, nFDs; [self OF_processQueueAndStoreRemovedIn: nil]; - - if ([self OF_processReadBuffers]) { - objc_autoreleasePoolPop(pool); - return true; - } + [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); FDs = [_FDs items]; nFDs = [_FDs count]; @@ -181,13 +177,10 @@ if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; - if (events == 0) - return false; - for (i = 0; i < nFDs; i++) { if (FDs[i].fd > _maxFD) @throw [OFOutOfRangeException exception]; if (FDs[i].revents & POLLIN) { @@ -206,12 +199,10 @@ @selector(objectIsReadyForReading:)]) [_delegate objectIsReadyForReading: _FDToObject[FDs[i].fd]]; objc_autoreleasePoolPop(pool); - - realEvents++; } if (FDs[i].revents & POLLOUT) { pool = objc_autoreleasePoolPush(); @@ -219,18 +210,11 @@ @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: _FDToObject[FDs[i].fd]]; objc_autoreleasePoolPop(pool); - - realEvents++; } FDs[i].revents = 0; } - - if (realEvents == 0) - return false; - - return true; } @end Index: src/OFKernelEventObserver_select.m ================================================================== --- src/OFKernelEventObserver_select.m +++ src/OFKernelEventObserver_select.m @@ -117,26 +117,22 @@ @throw [OFOutOfRangeException exception]; FD_CLR(fd, &_writeFDs); } -- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval +- (void)observeForTimeInterval: (of_time_interval_t)timeInterval { void *pool = objc_autoreleasePoolPush(); id const *objects; fd_set readFDs; fd_set writeFDs; struct timeval timeout; int events; - size_t i, count, realEvents = 0; + size_t i, count; [self OF_processQueueAndStoreRemovedIn: nil]; - - if ([self OF_processReadBuffers]) { - objc_autoreleasePoolPop(pool); - return true; - } + [self OF_processReadBuffers]; objc_autoreleasePoolPop(pool); #ifdef FD_COPY FD_COPY(&_readFDs, &readFDs); @@ -164,13 +160,10 @@ if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; - if (events == 0) - return false; - if (FD_ISSET(_cancelFD[0], &readFDs)) { char buffer; #ifndef _WIN32 OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1); @@ -186,17 +179,13 @@ for (i = 0; i < count; i++) { int fd = [objects[i] fileDescriptorForReading]; pool = objc_autoreleasePoolPush(); - if (FD_ISSET(fd, &readFDs)) { - if ([_delegate respondsToSelector: - @selector(objectIsReadyForReading:)]) - [_delegate objectIsReadyForReading: objects[i]]; - - realEvents++; - } + if (FD_ISSET(fd, &readFDs) && [_delegate respondsToSelector: + @selector(objectIsReadyForReading:)]) + [_delegate objectIsReadyForReading: objects[i]]; objc_autoreleasePoolPop(pool); } objects = [_writeObjects objects]; @@ -205,22 +194,13 @@ for (i = 0; i < count; i++) { int fd = [objects[i] fileDescriptorForWriting]; pool = objc_autoreleasePoolPush(); - if (FD_ISSET(fd, &writeFDs)) { - if ([_delegate respondsToSelector: - @selector(objectIsReadyForWriting:)]) - [_delegate objectIsReadyForWriting: objects[i]]; - - realEvents++; - } + if (FD_ISSET(fd, &writeFDs) && [_delegate respondsToSelector: + @selector(objectIsReadyForWriting:)]) + [_delegate objectIsReadyForWriting: objects[i]]; objc_autoreleasePoolPop(pool); } - - if (realEvents == 0) - return false; - - return true; } @end Index: tests/OFKernelEventObserverTests.m ================================================================== --- tests/OFKernelEventObserverTests.m +++ tests/OFKernelEventObserverTests.m @@ -16,10 +16,11 @@ #include "config.h" #import "OFKernelEventObserver.h" #import "OFString.h" +#import "OFDate.h" #import "OFTCPSocket.h" #import "OFAutoreleasePool.h" #if defined(HAVE_SYS_SELECT_H) || defined(_WIN32) # import "OFKernelEventObserver_select.h" @@ -34,98 +35,181 @@ # import "OFKernelEventObserver_kqueue.h" #endif #import "TestsAppDelegate.h" +#define EXPECTED_EVENTS 3 + static OFString *module; -static OFKernelEventObserver *observer; -static int events; -static id expectedObject; -static bool readData, expectEOS; -static OFTCPSocket *accepted; - -@interface ObserverDelegate: OFObject -- (void)objectIsReadyForReading: (id)object; + +@interface ObserverTest: OFObject +{ +@public + TestsAppDelegate *_testsAppDelegate; + OFKernelEventObserver *_observer; + OFTCPSocket *_server, *_client, *_accepted; + size_t _events; +} + +- (void)run; @end -@implementation ObserverDelegate +@implementation ObserverTest +- initWithTestsAppDelegate: (TestsAppDelegate*)testsAppDelegate +{ + self = [super init]; + + @try { + uint16_t port; + + _testsAppDelegate = testsAppDelegate; + + _server = [[OFTCPSocket alloc] init]; + port = [_server bindToHost: @"127.0.0.1" + port: 0]; + [_server listen]; + + _client = [[OFTCPSocket alloc] init]; + [_client connectToHost: @"127.0.0.1" + port: port]; + + [_client writeBuffer: "0" + length: 1]; + } @catch (id e) { + [self release]; + @throw e; + } + + return self; +} + +- (void)dealloc +{ + [_server release]; + [_client release]; + [_accepted release]; + + [super dealloc]; +} + +- (void)run +{ + OFDate *deadline; + bool deadlineExceeded = false; + + [_testsAppDelegate outputTesting: @"-[observe] with listening socket" + inModule: module]; + + deadline = [OFDate dateWithTimeIntervalSinceNow: 1]; + while (_events < EXPECTED_EVENTS) { + if ([deadline timeIntervalSinceNow] < 0) { + deadlineExceeded = true; + break; + } + + [_observer observeForTimeInterval: 0.01]; + } + + if (!deadlineExceeded) + [_testsAppDelegate + outputSuccess: @"-[observe] not exceeding deadline" + inModule: module]; + else + [_testsAppDelegate + outputFailure: @"-[observe] not exceeding deadline" + inModule: module]; + + if (_events == EXPECTED_EVENTS) + [_testsAppDelegate + outputSuccess: @"-[observe] handling all events" + inModule: module]; + else + [_testsAppDelegate + outputFailure: @"-[observe] handling all events" + inModule: module]; +} + - (void)objectIsReadyForReading: (id)object { - events++; - - OF_ENSURE(object == expectedObject); - - if ([object isListening]) { - accepted = [[object accept] retain]; - - [accepted writeBuffer: "0" - length: 1]; - } else if (readData) { - char buf; - - if (expectEOS) - OF_ENSURE([object readIntoBuffer: &buf - length: 1] == 0); - else { - OF_ENSURE([object readIntoBuffer: &buf - length: 1] == 1); - OF_ENSURE(buf == '0'); - } + char buf; + + switch (_events++) { + case 0: + if (object == _server) + [_testsAppDelegate + outputSuccess: @"-[observe] with listening socket" + inModule: module]; + else + [_testsAppDelegate + outputFailure: @"-[observe] with listening socket" + inModule: module]; + + _accepted = [[object accept] retain]; + [_observer addObjectForReading: _accepted]; + + [_testsAppDelegate + outputTesting: @"-[observe] with data to read available" + inModule: module]; + + break; + case 1: + if (object == _accepted && + [object readIntoBuffer: &buf + length: 1] == 1 && buf == '0') + [_testsAppDelegate + outputSuccess: @"-[observe] with data to read " + @"available" + inModule: module]; + else + [_testsAppDelegate + outputFailure: @"-[observe] with data to read " + @"available" + inModule: module]; + + [_client close]; + + [_testsAppDelegate + outputTesting: @"-[observe] with closed connection" + inModule: module]; + + break; + case 2: + if (object == _accepted && + [object readIntoBuffer: &buf + length: 1] == 0) + [_testsAppDelegate + outputSuccess: @"-[observe] with closed connection" + inModule: module]; + else + [_testsAppDelegate + outputFailure: @"-[observe] with closed connection" + inModule: module]; + + break; + default: + OF_ENSURE(0); } } @end @implementation TestsAppDelegate (OFKernelEventObserverTests) - (void)kernelEventObserverTestsWithClass: (Class)class { - ObserverDelegate *delegate = - [[[ObserverDelegate alloc] init] autorelease]; - OFTCPSocket *sock1 = [OFTCPSocket socket]; - OFTCPSocket *sock2 = [OFTCPSocket socket]; - uint16_t port; + ObserverTest *test; module = [class className]; - events = 0; - expectedObject = nil; - readData = expectEOS = false; - accepted = nil; - - port = [sock1 bindToHost: @"127.0.0.1" - port: 0]; - [sock1 listen]; + test = [[[ObserverTest alloc] + initWithTestsAppDelegate: self] autorelease]; TEST(@"+[observer]", - (observer = [class observer]) && - R([observer setDelegate: delegate])) + (test->_observer = [OFKernelEventObserver observer])) + [test->_observer setDelegate: test]; TEST(@"-[addObjectForReading:]", - R([observer addObjectForReading: sock1])) - - [sock2 connectToHost: @"127.0.0.1" - port: port]; - TEST(@"-[observe] waiting for connection", - (expectedObject = sock1) && - [observer observeForTimeInterval: 0.01]) - [accepted autorelease]; - - TEST(@"-[observe] waiting for data", - (expectedObject = sock2) && - R([observer addObjectForReading: sock2]) && - [observer observeForTimeInterval: 0.01]) - - TEST(@"-[observe] keeping event until read", - R(readData = true) && [observer observeForTimeInterval: 0.01]) - - TEST(@"-[observe] time out due to no events", - R(readData = false) && ![observer observeForTimeInterval: 0.01]) - - [accepted close]; - TEST(@"-[observe] closed connection", - R(readData = true) && R(expectEOS = true) && - [observer observeForTimeInterval: 0.01]) - - TEST(@"-[observe] correct number of events", events == 4) + R([test->_observer addObjectForReading: test->_server])) + + [test run]; } - (void)kernelEventObserverTests { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];