@@ -46,11 +46,11 @@
 	id _target;
 	SEL _selector;
 	id _context;
 }
 
-- (bool)handleForObject: (id)object;
+- (bool)handleObject: (id)object;
 @end
 
 @interface OFRunLoop_ReadQueueItem: OFRunLoop_QueueItem
 {
 @public
@@ -80,10 +80,22 @@
 	of_stream_async_read_line_block_t _block;
 # endif
 	of_string_encoding_t _encoding;
 }
 @end
+
+/* Queue item for an asynchronous write of _length bytes from _buffer. */
+@interface OFRunLoop_WriteQueueItem: OFRunLoop_QueueItem
+{
+@public
+# ifdef OF_HAVE_BLOCKS
+	of_stream_async_write_block_t _block;
+# endif
+	const void *_buffer;
+	size_t _length, _writtenLength;
+}
+@end
 
 @interface OFRunLoop_AcceptQueueItem: OFRunLoop_QueueItem
 {
 @public
 # ifdef OF_HAVE_BLOCKS
@@ -102,11 +114,11 @@
 	size_t _length;
 }
 @end
 
 @implementation OFRunLoop_QueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	OF_UNRECOGNIZED_SELECTOR
 }
 
 - (void)dealloc
@@ -117,11 +129,11 @@
 	[super dealloc];
 }
 @end
 
 @implementation OFRunLoop_ReadQueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	size_t length;
 	id exception = nil;
 
 	@try {
@@ -157,11 +169,11 @@
 }
 # endif
 @end
 
 @implementation OFRunLoop_ExactReadQueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	size_t length;
 	id exception = nil;
 
 	@try {
@@ -211,11 +223,11 @@
 }
 # endif
 @end
 
 @implementation OFRunLoop_ReadLineQueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	OFString *line;
 	id exception = nil;
 
 	@try {
@@ -251,13 +263,69 @@
 
 	[super dealloc];
 }
 # endif
 @end
+
+@implementation OFRunLoop_WriteQueueItem
+/*
+ * Writes the not yet written rest of _buffer to the object.
+ *
+ * Returns true to stay queued while data is left and no exception occurred.
+ * Otherwise the block or the target's selector is called and false returned.
+ *
+ * Needed by the target / selector API as well, hence not block-only.
+ */
+- (bool)handleObject: (id)object
+{
+	size_t length;
+	id exception = nil;
+
+	@try {
+		length = [object writeBuffer: (char *)_buffer + _writtenLength
+				      length: _length - _writtenLength];
+	} @catch (id e) {
+		length = 0;
+		exception = e;
+	}
+
+	_writtenLength += length;
+
+	/* Partial write without an error: stay queued for the remainder. */
+	if (_writtenLength != _length && exception == nil)
+		return true;
+
+# ifdef OF_HAVE_BLOCKS
+	if (_block != NULL)
+		_block(object, _buffer, _writtenLength, exception);
+	else {
+# endif
+		void (*func)(id, SEL, OFStream *, const void *, size_t, id,
+		    id) = (void (*)(id, SEL, OFStream *, const void *, size_t,
+		    id, id))[_target methodForSelector: _selector];
+
+		func(_target, _selector, object, _buffer, _writtenLength,
+		    _context, exception);
+# ifdef OF_HAVE_BLOCKS
+	}
+# endif
+
+	return false;
+}
+
+# ifdef OF_HAVE_BLOCKS
+- (void)dealloc
+{
+	[_block release];
+
+	[super dealloc];
+}
+# endif
+@end
 
 @implementation OFRunLoop_AcceptQueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	OFTCPSocket *newSocket;
 	id exception = nil;
 
 	@try {
@@ -292,11 +360,11 @@
 }
 # endif
 @end
 
 @implementation OFRunLoop_UDPReceiveQueueItem
-- (bool)handleForObject: (id)object
+- (bool)handleObject: (id)object
 {
 	size_t length;
 	of_udp_socket_address_t address;
 	id exception = nil;
 
@@ -378,10 +446,31 @@
 	queueItem = [[[type alloc] init] autorelease];			\
 	code								\
 	[queue appendObject: queueItem];				\
 									\
 	objc_autoreleasePoolPop(pool);
+# define ADD_WRITE(type, object, code)					\
+	void *pool = objc_autoreleasePoolPush();			\
+	OFRunLoop *runLoop = [self currentRunLoop];			\
+	OFList *queue = [runLoop->_writeQueues objectForKey: object];	\
+	type *queueItem;						\
+									\
+	if (queue == nil) {						\
+		queue = [OFList list];					\
+		[runLoop->_writeQueues setObject: queue			\
+					  forKey: object];		\
+	}								\
+									\
+	if ([queue count] == 0)						\
+		[runLoop->_kernelEventObserver				\
+		    addObjectForWriting: object];			\
+									\
+	queueItem = [[[type alloc] init] autorelease];			\
+	code								\
+	[queue appendObject: queueItem];				\
+									\
+	objc_autoreleasePoolPop(pool);
 
 + (void)of_addAsyncReadForStream: (OFStream *)stream
 			  buffer: (void *)buffer
 			  length: (size_t)length
 			  target: (id)target
@@ -424,10 +513,26 @@
 		queueItem->_selector = selector;
 		queueItem->_context = [context retain];
 		queueItem->_encoding = encoding;
 	})
 }
+
++ (void)of_addAsyncWriteForStream: (OFStream *)stream
+			   buffer: (const void *)buffer
+			   length: (size_t)length
+			   target: (id)target
+			 selector: (SEL)selector
+			  context: (id)context
+{
+	ADD_WRITE(OFRunLoop_WriteQueueItem, stream, {
+		queueItem->_target = [target retain];
+		queueItem->_selector = selector;
+		queueItem->_context = [context retain];
+		queueItem->_buffer = buffer;
+		queueItem->_length = length;
+	})
+}
 
 + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream
 			       target: (id)target
 			     selector: (SEL)selector
 			      context: (id)context
@@ -487,10 +592,22 @@
 	ADD_READ(OFRunLoop_ReadLineQueueItem, stream, {
 		queueItem->_block = [block copy];
 		queueItem->_encoding = encoding;
 	})
 }
+
++ (void)of_addAsyncWriteForStream: (OFStream *)stream
+			   buffer: (const void *)buffer
+			   length: (size_t)length
+			    block: (of_stream_async_write_block_t)block
+{
+	ADD_WRITE(OFRunLoop_WriteQueueItem, stream, {
+		queueItem->_block = [block copy];
+		queueItem->_buffer = buffer;
+		queueItem->_length = length;
+	})
+}
 
 + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream
 				block: (of_tcp_socket_async_accept_block_t)block
 {
 	ADD_READ(OFRunLoop_AcceptQueueItem, stream, {
@@ -510,16 +627,30 @@
 		queueItem->_block = [block copy];
 	})
 }
 # endif
 # undef ADD_READ
+# undef ADD_WRITE
 
 + (void)of_cancelAsyncRequestsForObject: (id)object
 {
 	void *pool = objc_autoreleasePoolPush();
 	OFRunLoop *runLoop = [self currentRunLoop];
 	OFList *queue;
+
+	if ((queue = [runLoop->_writeQueues objectForKey: object]) != nil) {
+		assert([queue count] > 0);
+
+		/*
+		 * Clear the queue now, in case this has been called from a
+		 * handler, as otherwise, we'd do the cleanups below twice.
+		 */
+		[queue removeAllObjects];
+
+		[runLoop->_kernelEventObserver removeObjectForWriting: object];
+		[runLoop->_writeQueues removeObjectForKey: object];
+	}
 
 	if ((queue = [runLoop->_readQueues objectForKey: object]) != nil) {
 		assert([queue count] > 0);
 
 		/*
@@ -549,10 +680,11 @@
 #if defined(OF_HAVE_SOCKETS)
 		_kernelEventObserver = [[OFKernelEventObserver alloc] init];
 		[_kernelEventObserver setDelegate: self];
 
 		_readQueues = [[OFMutableDictionary alloc] init];
+		_writeQueues = [[OFMutableDictionary alloc] init];
 #elif defined(OF_HAVE_THREADS)
 		_condition = [[OFCondition alloc] init];
 #endif
 	} @catch (id e) {
 		[self release];
@@ -569,10 +701,11 @@
 	[_timersQueueLock release];
 #endif
 #if defined(OF_HAVE_SOCKETS)
 	[_kernelEventObserver release];
 	[_readQueues release];
+	[_writeQueues release];
 #elif defined(OF_HAVE_THREADS)
 	[_condition release];
 #endif
 
 	[super dealloc];
@@ -633,11 +766,11 @@
 	    [[_readQueues objectForKey: object] retain];
 
 	assert(queue != nil);
 
 	@try {
-		if (![[queue firstObject] handleForObject: object]) {
+		if (![[queue firstObject] handleObject: object]) {
 			of_list_object_t *listObject = [queue firstListObject];
 
 			/*
 			 * The handler might have called -[cancelAsyncRequests]
 			 * so that our queue is now empty, in which case we
@@ -651,10 +784,50 @@
 					    removeObjectForReading: object];
 					[_readQueues
 					    removeObjectForKey: object];
 				}
 			}
+		}
+	} @finally {
+		[queue release];
+	}
+}
+
+/*
+ * Lets the first queued write item of the object handle the event and
+ * removes it from the queue once its -[handleObject:] returns false.
+ */
+- (void)objectIsReadyForWriting: (id)object
+{
+	/*
+	 * Retain the queue so that it doesn't disappear from us because the
+	 * handler called -[cancelAsyncRequests].
+	 */
+	OFList OF_GENERIC(OF_KINDOF(OFRunLoop_WriteQueueItem *)) *queue =
+	    [[_writeQueues objectForKey: object] retain];
+
+	assert(queue != nil);
+
+	@try {
+		if (![[queue firstObject] handleObject: object]) {
+			of_list_object_t *listObject = [queue firstListObject];
+
+			/*
+			 * The handler might have called -[cancelAsyncRequests]
+			 * so that our queue is now empty, in which case we
+			 * should do nothing.
+			 */
+			if (listObject != NULL) {
+				[queue removeListObject: listObject];
+
+				if ([queue count] == 0) {
+					[_kernelEventObserver
+					    removeObjectForWriting: object];
+					[_writeQueues
+					    removeObjectForKey: object];
+				}
+			}
 		}
 	} @finally {
 		[queue release];
 	}
 }