Index: src/OFEpollKernelEventObserver.m ================================================================== --- src/OFEpollKernelEventObserver.m +++ src/OFEpollKernelEventObserver.m @@ -27,16 +27,11 @@ #include #import "OFEpollKernelEventObserver.h" #import "OFArray.h" -#import "OFKernelEventObserver+Private.h" -#import "OFKernelEventObserver.h" #import "OFMapTable.h" -#ifdef OF_HAVE_THREADS -# import "OFMutex.h" -#endif #import "OFNull.h" #import "OFInitializationFailedException.h" #import "OFObserveFailedException.h" @@ -156,32 +151,32 @@ [_FDToEvents setObject: (void *)events forKey: (void *)((intptr_t)fd + 1)]; } } -- (void)of_addObjectForReading: (id )object +- (void)addObjectForReading: (id )object { [self of_addObject: object fileDescriptor: object.fileDescriptorForReading events: EPOLLIN]; } -- (void)of_addObjectForWriting: (id )object +- (void)addObjectForWriting: (id )object { [self of_addObject: object fileDescriptor: object.fileDescriptorForWriting events: EPOLLOUT]; } -- (void)of_removeObjectForReading: (id )object +- (void)removeObjectForReading: (id )object { [self of_removeObject: object fileDescriptor: object.fileDescriptorForReading events: EPOLLIN]; } -- (void)of_removeObjectForWriting: (id )object +- (void)removeObjectForWriting: (id )object { [self of_removeObject: object fileDescriptor: object.fileDescriptorForWriting events: EPOLLOUT]; } @@ -190,12 +185,10 @@ { OFNull *nullObject = [OFNull null]; struct epoll_event eventList[EVENTLIST_SIZE]; int events; - [self of_processQueue]; - if ([self of_processReadBuffers]) return; events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? 
timeInterval * 1000 : -1)); DELETED src/OFKernelEventObserver+Private.h Index: src/OFKernelEventObserver+Private.h ================================================================== --- src/OFKernelEventObserver+Private.h +++ src/OFKernelEventObserver+Private.h @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, - * 2018, 2019, 2020 - * Jonathan Schleifer - * - * All rights reserved. - * - * This file is part of ObjFW. It may be distributed under the terms of the - * Q Public License 1.0, which can be found in the file LICENSE.QPL included in - * the packaging of this file. - * - * Alternatively, it may be distributed under the terms of the GNU General - * Public License, either version 2 or 3, which can be found in the file - * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this - * file. - */ - -#import "OFKernelEventObserver.h" - -OF_ASSUME_NONNULL_BEGIN - -@interface OFKernelEventObserver () -- (void)of_addObjectForReading: (id )object; -- (void)of_addObjectForWriting: (id )object; -- (void)of_removeObjectForReading: (id )object; -- (void)of_removeObjectForWriting: (id )object; -- (void)of_processQueue; -- (bool)of_processReadBuffers; -@end - -OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver.h ================================================================== --- src/OFKernelEventObserver.h +++ src/OFKernelEventObserver.h @@ -134,16 +134,10 @@ struct sockaddr_in _cancelAddr; #endif #ifdef OF_AMIGAOS ULONG _execSignalMask; #endif -@private - OFMutableData *_queueActions; - OFMutableArray *_queueObjects; -#ifdef OF_HAVE_THREADS - OFMutex *_mutex; -#endif OF_RESERVE_IVARS(4) } /*! * @brief The delegate for the OFKernelEventObserver. @@ -232,14 +226,20 @@ - (void)observeUntilDate: (OFDate *)date; /*! * @brief Cancels the currently blocking observe call. 
* - * This is automatically done when a new object is added or removed by another - * thread, but in some circumstances, it might be desirable for a thread to - * manually stop the observe running in another thread. + * This is the only method that can and should be called from a thread other + * than the one using the observer. */ - (void)cancel; + +/*! + * @brief This method should be called by subclasses in @ref observeUntilDate: + * as the first thing to handle all sockets that currently have data in + * the read buffer. + */ +- (bool)of_processReadBuffers; @end #endif OF_ASSUME_NONNULL_END Index: src/OFKernelEventObserver.m ================================================================== --- src/OFKernelEventObserver.m +++ src/OFKernelEventObserver.m @@ -20,17 +20,13 @@ #include "config.h" #include #import "OFKernelEventObserver.h" -#import "OFKernelEventObserver+Private.h" #import "OFArray.h" #import "OFData.h" #import "OFDate.h" -#ifdef OF_HAVE_THREADS -# import "OFMutex.h" -#endif #import "OFStream.h" #import "OFStream+Private.h" #ifndef OF_HAVE_PIPE # import "OFStreamSocket.h" #endif @@ -168,18 +164,10 @@ @throw [OFInitializationFailedException exceptionWithClass: self.class]; } # endif #endif - -#ifdef OF_HAVE_THREADS - _mutex = [[OFMutex alloc] init]; -#endif - - _queueActions = [[OFMutableData alloc] - initWithItemSize: sizeof(int)]; - _queueObjects = [[OFMutableArray alloc] init]; } @catch (id e) { [self release]; @throw e; } @@ -199,187 +187,31 @@ #endif [_readObjects release]; [_writeObjects release]; -#ifdef OF_HAVE_THREADS - [_mutex release]; -#endif - - [_queueActions release]; - [_queueObjects release]; - [super dealloc]; } - (void)addObjectForReading: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; - @try { -#endif - int action = QUEUE_ADD | QUEUE_READ; - - [_queueActions addItem: &action]; - [_queueObjects addObject: object]; -#ifdef OF_HAVE_THREADS - } @finally { - [_mutex unlock]; - } -#endif - - [self cancel]; + 
OF_UNRECOGNIZED_SELECTOR } - (void)addObjectForWriting: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; - @try { -#endif - int action = QUEUE_ADD | QUEUE_WRITE; - - [_queueActions addItem: &action]; - [_queueObjects addObject: object]; -#ifdef OF_HAVE_THREADS - } @finally { - [_mutex unlock]; - } -#endif - - [self cancel]; + OF_UNRECOGNIZED_SELECTOR } - (void)removeObjectForReading: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; - @try { -#endif - int action = QUEUE_REMOVE | QUEUE_READ; - - [_queueActions addItem: &action]; - [_queueObjects addObject: object]; -#ifdef OF_HAVE_THREADS - } @finally { - [_mutex unlock]; - } -#endif - - [self cancel]; + OF_UNRECOGNIZED_SELECTOR } - (void)removeObjectForWriting: (id )object { -#ifdef OF_HAVE_THREADS - [_mutex lock]; - @try { -#endif - int action = QUEUE_REMOVE | QUEUE_WRITE; - - [_queueActions addItem: &action]; - [_queueObjects addObject: object]; -#ifdef OF_HAVE_THREADS - } @finally { - [_mutex unlock]; - } -#endif - - [self cancel]; -} - -- (void)of_addObjectForReading: (id )object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)of_addObjectForWriting: (id )object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)of_removeObjectForReading: (id )object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)of_removeObjectForWriting: (id )object -{ - OF_UNRECOGNIZED_SELECTOR -} - -- (void)of_processQueue -{ - void *pool = objc_autoreleasePoolPush(); - -#ifdef OF_HAVE_THREADS - [_mutex lock]; - @try { -#endif - const int *queueActions = _queueActions.items; - id const *queueObjects = _queueObjects.objects; - size_t count = _queueActions.count; - - OF_ENSURE(_queueObjects.count == count); - - for (size_t i = 0; i < count; i++) { - int action = queueActions[i]; - id object = queueObjects[i]; - - switch (action) { - case QUEUE_ADD | QUEUE_READ: - [_readObjects addObject: object]; - - @try { - [self of_addObjectForReading: object]; - } @catch (id e) { - [_readObjects - removeObjectIdenticalTo: object]; - - @throw e; - } - - 
break; - case QUEUE_ADD | QUEUE_WRITE: - [_writeObjects addObject: object]; - - @try { - [self of_addObjectForWriting: object]; - } @catch (id e) { - [_writeObjects - removeObjectIdenticalTo: object]; - - @throw e; - } - - break; - case QUEUE_REMOVE | QUEUE_READ: - [self of_removeObjectForReading: object]; - - [_readObjects removeObjectIdenticalTo: object]; - - break; - case QUEUE_REMOVE | QUEUE_WRITE: - [self of_removeObjectForWriting: object]; - - [_writeObjects removeObjectIdenticalTo: object]; - - break; - default: - OF_ENSURE(0); - } - } - - [_queueActions removeAllItems]; - [_queueObjects removeAllObjects]; -#ifdef OF_HAVE_THREADS - } @finally { - [_mutex unlock]; - } -#endif - - objc_autoreleasePoolPop(pool); + OF_UNRECOGNIZED_SELECTOR } - (bool)of_processReadBuffers { bool foundInReadBuffer = false; Index: src/OFKqueueKernelEventObserver.m ================================================================== --- src/OFKqueueKernelEventObserver.m +++ src/OFKqueueKernelEventObserver.m @@ -29,15 +29,10 @@ #include #include #import "OFKqueueKernelEventObserver.h" #import "OFArray.h" -#import "OFKernelEventObserver.h" -#import "OFKernelEventObserver+Private.h" -#ifdef OF_HAVE_THREADS -# import "OFMutex.h" -#endif #import "OFInitializationFailedException.h" #import "OFObserveFailedException.h" #import "OFOutOfRangeException.h" @@ -84,11 +79,11 @@ close(_kernelQueue); [super dealloc]; } -- (void)of_addObjectForReading: (id )object +- (void)addObjectForReading: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = object.fileDescriptorForReading; @@ -103,11 +98,11 @@ if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; } -- (void)of_addObjectForWriting: (id )object +- (void)addObjectForWriting: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = object.fileDescriptorForWriting; @@ -122,11 +117,11 @@ if (kevent(_kernelQueue, 
&event, 1, NULL, 0, NULL) != 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; } -- (void)of_removeObjectForReading: (id )object +- (void)removeObjectForReading: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = object.fileDescriptorForReading; @@ -136,11 +131,11 @@ if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; } -- (void)of_removeObjectForWriting: (id )object +- (void)removeObjectForWriting: (id )object { struct kevent event; memset(&event, 0, sizeof(event)); event.ident = object.fileDescriptorForWriting; @@ -156,12 +151,10 @@ { struct timespec timeout; struct kevent eventList[EVENTLIST_SIZE]; int events; - [self of_processQueue]; - if ([self of_processReadBuffers]) return; timeout.tv_sec = (time_t)timeInterval; timeout.tv_nsec = (timeInterval - timeout.tv_sec) * 1000000000; Index: src/OFPollKernelEventObserver.m ================================================================== --- src/OFPollKernelEventObserver.m +++ src/OFPollKernelEventObserver.m @@ -26,12 +26,10 @@ # include #endif #import "OFPollKernelEventObserver.h" #import "OFData.h" -#import "OFKernelEventObserver+Private.h" -#import "OFKernelEventObserver.h" #import "OFObserveFailedException.h" #import "OFOutOfRangeException.h" #import "socket_helpers.h" @@ -124,32 +122,32 @@ break; } } } -- (void)of_addObjectForReading: (id )object +- (void)addObjectForReading: (id )object { [self of_addObject: object fileDescriptor: object.fileDescriptorForReading events: POLLIN]; } -- (void)of_addObjectForWriting: (id )object +- (void)addObjectForWriting: (id )object { [self of_addObject: object fileDescriptor: object.fileDescriptorForWriting events: POLLOUT]; } -- (void)of_removeObjectForReading: (id )object +- (void)removeObjectForReading: (id )object { [self of_removeObject: object fileDescriptor: object.fileDescriptorForReading events: POLLIN]; } -- 
(void)of_removeObjectForWriting: (id )object +- (void)removeObjectForWriting: (id )object { [self of_removeObject: object fileDescriptor: object.fileDescriptorForWriting events: POLLOUT]; } @@ -158,12 +156,10 @@ { struct pollfd *FDs; int events; size_t nFDs; - [self of_processQueue]; - if ([self of_processReadBuffers]) return; FDs = _FDs.mutableItems; nFDs = _FDs.count; Index: src/OFSelectKernelEventObserver.m ================================================================== --- src/OFSelectKernelEventObserver.m +++ src/OFSelectKernelEventObserver.m @@ -31,12 +31,10 @@ #include #import "OFSelectKernelEventObserver.h" #import "OFArray.h" -#import "OFKernelEventObserver+Private.h" -#import "OFKernelEventObserver.h" #import "OFInitializationFailedException.h" #import "OFObserveFailedException.h" #import "OFOutOfRangeException.h" @@ -71,11 +69,11 @@ #endif return self; } -- (void)of_addObjectForReading: (id )object +- (void)addObjectForReading: (id )object { int fd = object.fileDescriptorForReading; if (fd < 0 || fd > INT_MAX - 1) @throw [OFOutOfRangeException exception]; @@ -89,11 +87,11 @@ _maxFD = fd; FD_SET((of_socket_t)fd, &_readFDs); } -- (void)of_addObjectForWriting: (id )object +- (void)addObjectForWriting: (id )object { int fd = object.fileDescriptorForWriting; if (fd < 0 || fd > INT_MAX - 1) @throw [OFOutOfRangeException exception]; @@ -107,11 +105,11 @@ _maxFD = fd; FD_SET((of_socket_t)fd, &_writeFDs); } -- (void)of_removeObjectForReading: (id )object +- (void)removeObjectForReading: (id )object { /* TODO: Adjust _maxFD */ int fd = object.fileDescriptorForReading; @@ -124,11 +122,11 @@ #endif FD_CLR((of_socket_t)fd, &_readFDs); } -- (void)of_removeObjectForWriting: (id )object +- (void)removeObjectForWriting: (id )object { /* TODO: Adjust _maxFD */ int fd = object.fileDescriptorForWriting; @@ -153,12 +151,10 @@ #ifdef OF_AMIGAOS ULONG execSignalMask, cancelSignal; #endif size_t count; - [self of_processQueue]; - if ([self of_processReadBuffers]) return; 
#ifdef FD_COPY FD_COPY(&_readFDs, &readFDs); Index: src/OFTCPSocket.m ================================================================== --- src/OFTCPSocket.m +++ src/OFTCPSocket.m @@ -222,18 +222,40 @@ - (void)of_socketDidConnect: (OFTCPSocket *)sock exception: (id)exception { if (exception != nil) { + /* + * self might be retained only by the pending async requests, + * which we're about to cancel. + */ + [[self retain] autorelease]; + + [sock cancelAsyncRequests]; [sock of_closeSocket]; if (_socketAddressesIndex >= _socketAddresses.count) { _exception = [exception retain]; [self didConnect]; - } else - [self tryNextAddressWithRunLoopMode: - [OFRunLoop currentRunLoop].currentMode]; + } else { + /* + * We must not call it before returning, as otherwise + * the new socket would be removed from the queue upon + * return. + */ + OFRunLoop *runLoop = [OFRunLoop currentRunLoop]; + SEL selector = + @selector(tryNextAddressWithRunLoopMode:); + OFTimer *timer = [OFTimer + timerWithTimeInterval: 0 + target: self + selector: selector + object: runLoop.currentMode + repeats: false]; + [runLoop addTimer: timer + forMode: runLoop.currentMode]; + } return; } if (_SOCKS5Host != nil)