@@ -86,10 +86,21 @@ # ifdef OF_HAVE_BLOCKS of_tcp_socket_async_accept_block_t _block; # endif } @end + +@interface OFRunLoop_UDPReceiveQueueItem: OFRunLoop_QueueItem +{ +@public +# ifdef OF_HAVE_BLOCKS + of_udp_socket_async_receive_block_t _block; +# endif + void *_buffer; + size_t _length; +} +@end @implementation OFRunLoop_QueueItem - (void)dealloc { [_target release]; @@ -139,10 +150,21 @@ [super dealloc]; } # endif @end + +@implementation OFRunLoop_UDPReceiveQueueItem +# ifdef OF_HAVE_BLOCKS +- (void)dealloc +{ + [_block release]; + + [super dealloc]; +} +# endif +@end #endif @implementation OFRunLoop + (OFRunLoop*)mainRunLoop { @@ -162,25 +184,25 @@ { mainRunLoop = [runLoop retain]; } #ifdef OF_HAVE_SOCKETS -# define ADD(type, code) \ +# define ADD_READ(type, object, code) \ void *pool = objc_autoreleasePoolPush(); \ OFRunLoop *runLoop = [self currentRunLoop]; \ - OFList *queue = [runLoop->_readQueues objectForKey: stream]; \ + OFList *queue = [runLoop->_readQueues objectForKey: object]; \ type *queueItem; \ \ if (queue == nil) { \ queue = [OFList list]; \ [runLoop->_readQueues setObject: queue \ - forKey: stream]; \ + forKey: object]; \ } \ \ if ([queue count] == 0) \ [runLoop->_kernelEventObserver \ - addObjectForReading: stream]; \ + addObjectForReading: object]; \ \ queueItem = [[[type alloc] init] autorelease]; \ code \ [queue appendObject: queueItem]; \ \ @@ -190,11 +212,11 @@ buffer: (void*)buffer length: (size_t)length target: (id)target selector: (SEL)selector { - ADD(OFRunLoop_ReadQueueItem, { + ADD_READ(OFRunLoop_ReadQueueItem, stream, { queueItem->_target = [target retain]; queueItem->_selector = selector; queueItem->_buffer = buffer; queueItem->_length = length; }) @@ -204,11 +226,11 @@ buffer: (void*)buffer exactLength: (size_t)exactLength target: (id)target selector: (SEL)selector { - ADD(OFRunLoop_ExactReadQueueItem, { + ADD_READ(OFRunLoop_ExactReadQueueItem, stream, { queueItem->_target = [target retain]; queueItem->_selector = selector; queueItem->_buffer = buffer; queueItem->_exactLength = exactLength; }) @@ -217,11 +239,11 @@ + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding target: (id)target selector: (SEL)selector { - ADD(OFRunLoop_ReadLineQueueItem, { + ADD_READ(OFRunLoop_ReadLineQueueItem, stream, { queueItem->_target = [target retain]; queueItem->_selector = selector; queueItem->_encoding = encoding; }) } @@ -228,11 +250,25 @@ + (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream target: (id)target selector: (SEL)selector { - ADD(OFRunLoop_AcceptQueueItem, { + ADD_READ(OFRunLoop_AcceptQueueItem, stream, { + queueItem->_target = [target retain]; + queueItem->_selector = selector; + }) +} + ++ (void)OF_addAsyncReceiveForUDPSocket: (OFUDPSocket*)socket + buffer: (void*)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector +{ + ADD_READ(OFRunLoop_UDPReceiveQueueItem, socket, { + queueItem->_buffer = buffer; + queueItem->_length = length; queueItem->_target = [target retain]; queueItem->_selector = selector; }) } @@ -240,11 +276,11 @@ + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block { - ADD(OFRunLoop_ReadQueueItem, { + ADD_READ(OFRunLoop_ReadQueueItem, stream, { queueItem->_block = [block copy]; queueItem->_buffer = buffer; queueItem->_length = length; }) } @@ -252,11 +288,11 @@ + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer exactLength: (size_t)exactLength block: (of_stream_async_read_block_t)block { - ADD(OFRunLoop_ExactReadQueueItem, { + ADD_READ(OFRunLoop_ExactReadQueueItem, stream, { queueItem->_block = [block copy]; queueItem->_buffer = buffer; queueItem->_exactLength = exactLength; }) } @@ -263,37 +299,50 @@ + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding block: (of_stream_async_read_line_block_t)block { - ADD(OFRunLoop_ReadLineQueueItem, { + ADD_READ(OFRunLoop_ReadLineQueueItem, stream, { queueItem->_block = [block copy]; queueItem->_encoding = encoding; }) } + (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream block: (of_tcp_socket_async_accept_block_t)block { - ADD(OFRunLoop_AcceptQueueItem, { + ADD_READ(OFRunLoop_AcceptQueueItem, stream, { + queueItem->_block = [block copy]; + }) +} + ++ (void)OF_addAsyncReceiveForUDPSocket: (OFUDPSocket*)socket + buffer: (void*)buffer + length: (size_t)length + block: (of_udp_socket_async_receive_block_t) + block +{ + ADD_READ(OFRunLoop_UDPReceiveQueueItem, socket, { + queueItem->_buffer = buffer; + queueItem->_length = length; queueItem->_block = [block copy]; }) } # endif -# undef ADD +# undef ADD_READ -+ (void)OF_cancelAsyncRequestsForStream: (OFStream*)stream ++ (void)OF_cancelAsyncRequestsForObject: (id)object { void *pool = objc_autoreleasePoolPush(); OFRunLoop *runLoop = [self currentRunLoop]; OFList *queue; - if ((queue = [runLoop->_readQueues objectForKey: stream]) != nil) { + if ((queue = [runLoop->_readQueues objectForKey: object]) != nil) { assert([queue count] > 0); - [runLoop->_kernelEventObserver removeObjectForReading: stream]; - [runLoop->_readQueues removeObjectForKey: stream]; + [runLoop->_kernelEventObserver removeObjectForReading: object]; + [runLoop->_readQueues removeObjectForKey: object]; } objc_autoreleasePoolPop(pool); } #endif @@ -601,10 +650,63 @@ if (!func(queueItem->_target, queueItem->_selector, object, newSocket, exception)) { [queue removeListObject: listObject]; + if ([queue count] == 0) { + [_kernelEventObserver + removeObjectForReading: object]; + [_readQueues + removeObjectForKey: object]; + } + } +# ifdef OF_HAVE_BLOCKS + } +# endif + } else if ([listObject->object isKindOfClass: + [OFRunLoop_UDPReceiveQueueItem class]]) { + OFRunLoop_UDPReceiveQueueItem *queueItem = listObject->object; + size_t length; + of_udp_socket_address_t address; + OFException *exception = nil; + + @try { + length = [object receiveIntoBuffer: queueItem->_buffer + length: queueItem->_length + sender: &address]; + } @catch (OFException *e) { + length = 0; + exception = e; + } + +# ifdef OF_HAVE_BLOCKS + if (queueItem->_block != NULL) { + if (!queueItem->_block(object, queueItem->_buffer, + length, address, exception)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [_kernelEventObserver + removeObjectForReading: object]; + [_readQueues + removeObjectForKey: object]; + } + } + } else { +# endif + bool (*func)(id, SEL, OFUDPSocket*, void*, size_t, + of_udp_socket_address_t address, OFException*) = + (bool(*)(id, SEL, OFUDPSocket*, void*, size_t, + of_udp_socket_address_t, OFException*)) + [queueItem->_target methodForSelector: + queueItem->_selector]; + + if (!func(queueItem->_target, queueItem->_selector, + object, queueItem->_buffer, length, address, + exception)) { + [queue removeListObject: listObject]; + if ([queue count] == 0) { [_kernelEventObserver removeObjectForReading: object]; [_readQueues removeObjectForKey: object];