@@ -25,10 +25,12 @@ #import "OFArray.h" #import "OFData.h" #import "OFDictionary.h" #ifdef OF_HAVE_SOCKETS # import "OFKernelEventObserver.h" +# import "OFDatagramSocket.h" +# import "OFSequencedPacketSocket.h" # import "OFTCPSocket.h" # import "OFTCPSocket+Private.h" #endif #import "OFThread.h" #ifdef OF_HAVE_THREADS @@ -155,11 +157,11 @@ @interface OFRunLoopAcceptQueueItem: OFRunLoopQueueItem { @public # ifdef OF_HAVE_BLOCKS - of_tcp_socket_async_accept_block_t _block; + id _block; # endif } @end @interface OFRunLoopDatagramReceiveQueueItem: OFRunLoopQueueItem @@ -181,10 +183,31 @@ # endif OFData *_data; of_socket_address_t _receiver; } @end + +@interface OFRunLoopPacketReceiveQueueItem: OFRunLoopQueueItem +{ +@public +# ifdef OF_HAVE_BLOCKS + of_sequenced_packet_socket_async_receive_block_t _block; +# endif + void *_buffer; + size_t _length; +} +@end + +@interface OFRunLoopPacketSendQueueItem: OFRunLoopQueueItem +{ +@public +# ifdef OF_HAVE_BLOCKS + of_sequenced_packet_socket_async_send_data_block_t _block; +# endif + OFData *_data; +} +@end #endif @implementation OFRunLoopState - (instancetype)init { @@ -733,24 +756,32 @@ # endif @implementation OFRunLoopAcceptQueueItem - (bool)handleObject: (id)object { - OFTCPSocket *acceptedSocket; - id exception = nil; + id acceptedSocket, exception = nil; @try { acceptedSocket = [object accept]; } @catch (id e) { acceptedSocket = nil; exception = e; } # ifdef OF_HAVE_BLOCKS - if (_block != NULL) - return _block(object, acceptedSocket, exception); - else { + if (_block != NULL) { + if ([object isKindOfClass: [OFTCPSocket class]]) + return ((of_tcp_socket_async_accept_block_t) + _block)(object, acceptedSocket, exception); + else if ([object isKindOfClass: + [OFSequencedPacketSocket class]]) + return + ((of_sequenced_packet_socket_async_accept_block_t) + _block)(object, acceptedSocket, exception); + else + OF_ENSURE(0); + } else { # endif if (![_delegate respondsToSelector: @selector(socket:didAcceptSocket:exception:)]) return false; @@ -852,10 +883,111 @@ newData = [_delegate socket: object didSendData: _data receiver: &_receiver exception: exception]; + if (newData == nil) + return false; + + oldData = _data; + _data = [newData copy]; + [oldData release]; + + return true; +# ifdef OF_HAVE_BLOCKS + } +# endif +} + +- (void)dealloc +{ + [_data release]; +# ifdef OF_HAVE_BLOCKS + [_block release]; +# endif + + [super dealloc]; +} +@end + +@implementation OFRunLoopPacketReceiveQueueItem +- (bool)handleObject: (id)object +{ + size_t length; + id exception = nil; + + @try { + length = [object receiveIntoBuffer: _buffer + length: _length]; + } @catch (id e) { + length = 0; + exception = e; + } + +# ifdef OF_HAVE_BLOCKS + if (_block != NULL) + return _block(object, _buffer, length, exception); + else { +# endif + if (![_delegate respondsToSelector: @selector( + socket:didReceiveIntoBuffer:length:exception:)]) + return false; + + return [_delegate socket: object + didReceiveIntoBuffer: _buffer + length: length + exception: exception]; +# ifdef OF_HAVE_BLOCKS + } +# endif +} + +# ifdef OF_HAVE_BLOCKS +- (void)dealloc +{ + [_block release]; + + [super dealloc]; +} +# endif +@end + +@implementation OFRunLoopPacketSendQueueItem +- (bool)handleObject: (id)object +{ + id exception = nil; + OFData *newData, *oldData; + + @try { + [object sendBuffer: _data.items + length: _data.count * _data.itemSize]; + } @catch (id e) { + exception = e; + } + +# ifdef OF_HAVE_BLOCKS + if (_block != NULL) { + newData = _block(object, _data, exception); + + if (newData == nil) + return false; + + oldData = _data; + _data = [newData copy]; + [oldData release]; + + return true; + } else { +# endif + if (![_delegate respondsToSelector: + @selector(socket:didSendData:exception:)]) + return false; + + newData = [_delegate socket: object + didSendData: _data + exception: exception]; + if (newData == nil) return false; oldData = _data; _data = [newData copy]; @@ -1063,18 +1195,16 @@ QUEUE_ITEM } # endif -+ (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream - mode: (of_run_loop_mode_t)mode -# ifdef OF_HAVE_BLOCKS - block: (of_tcp_socket_async_accept_block_t)block -# endif - delegate: (id )delegate -{ - NEW_READ(OFRunLoopAcceptQueueItem, stream, mode) ++ (void)of_addAsyncAcceptForSocket: (id)sock + mode: (of_run_loop_mode_t)mode + block: (id)block + delegate: (id)delegate +{ + NEW_READ(OFRunLoopAcceptQueueItem, sock, mode) queueItem->_delegate = [delegate retain]; # ifdef OF_HAVE_BLOCKS queueItem->_block = [block copy]; # endif @@ -1119,10 +1249,51 @@ queueItem->_block = [block copy]; # endif queueItem->_data = [data copy]; queueItem->_receiver = *receiver; + QUEUE_ITEM +} + ++ (void)of_addAsyncReceiveForSequencedPacketSocket: (OFSequencedPacketSocket *) + sock + buffer: (void *)buffer + length: (size_t)length + mode: (of_run_loop_mode_t)mode +# ifdef OF_HAVE_BLOCKS + block: (of_sequenced_packet_socket_async_receive_block_t)block +# endif + delegate: (id )delegate +{ + NEW_READ(OFRunLoopPacketReceiveQueueItem, sock, mode) + + queueItem->_delegate = [delegate retain]; +# ifdef OF_HAVE_BLOCKS + queueItem->_block = [block copy]; +# endif + queueItem->_buffer = buffer; + queueItem->_length = length; + + QUEUE_ITEM +} + ++ (void)of_addAsyncSendForSequencedPacketSocket: (OFSequencedPacketSocket *)sock + data: (OFData *)data + mode: (of_run_loop_mode_t)mode +# ifdef OF_HAVE_BLOCKS + block: (of_sequenced_packet_socket_async_send_data_block_t)block +# endif + delegate: (id )delegate +{ + NEW_WRITE(OFRunLoopPacketSendQueueItem, sock, mode) + + queueItem->_delegate = [delegate retain]; +# ifdef OF_HAVE_BLOCKS + queueItem->_block = [block copy]; +# endif + queueItem->_data = [data copy]; + QUEUE_ITEM } # undef NEW_READ # undef NEW_WRITE # undef QUEUE_ITEM