Index: src/OFFile.m ================================================================== --- src/OFFile.m +++ src/OFFile.m @@ -354,12 +354,12 @@ _atEndOfStream = true; return ret; } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { if (_handle == OF_INVALID_FILE_HANDLE) @throw [OFNotOpenException exceptionWithObject: self]; #if defined(OF_WINDOWS) @@ -404,15 +404,11 @@ requestedLength: length bytesWritten: 0 errNo: errno]; #endif - if ((size_t)bytesWritten != length) - @throw [OFWriteFailedException exceptionWithObject: self - requestedLength: length - bytesWritten: bytesWritten - errNo: 0]; + return (size_t)bytesWritten; } - (of_offset_t)lowlevelSeekToOffset: (of_offset_t)offset whence: (int)whence { Index: src/OFHTTPServer.m ================================================================== --- src/OFHTTPServer.m +++ src/OFHTTPServer.m @@ -253,35 +253,37 @@ isEqual: @"chunked"]; objc_autoreleasePoolPop(pool); } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { + /* TODO: Use non-blocking writes */ + void *pool; if (_socket == nil) @throw [OFNotOpenException exceptionWithObject: self]; if (!_headersSent) [self of_sendHeaders]; - if (!_chunked) { - [_socket writeBuffer: buffer - length: length]; - return; - } + if (!_chunked) + return [_socket writeBuffer: buffer + length: length]; pool = objc_autoreleasePoolPush(); [_socket writeString: [OFString stringWithFormat: @"%zx\r\n", length]]; objc_autoreleasePoolPop(pool); [_socket writeBuffer: buffer length: length]; [_socket writeBuffer: "\r\n" length: 2]; + + return length; } - (void)close { if (_socket == nil) Index: src/OFProcess.m ================================================================== --- src/OFProcess.m +++ src/OFProcess.m @@ -482,12 +482,12 @@ _atEndOfStream = true; return ret; } -- 
(void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { #ifndef OF_WINDOWS ssize_t bytesWritten; if (_writePipe[1] == -1) @@ -522,15 +522,11 @@ bytesWritten: 0 errNo: errNo]; } #endif - if ((size_t)bytesWritten != length) - @throw [OFWriteFailedException exceptionWithObject: self - requestedLength: length - bytesWritten: bytesWritten - errNo: 0]; + return (size_t)bytesWritten; } - (int)fileDescriptorForReading { #ifndef OF_WINDOWS Index: src/OFRunLoop+Private.h ================================================================== --- src/OFRunLoop+Private.h +++ src/OFRunLoop+Private.h @@ -40,10 +40,16 @@ + (void)of_addAsyncReadLineForStream: (OFStream *)stream encoding: (of_string_encoding_t)encoding target: (id)target selector: (SEL)selector context: (nullable id)context; ++ (void)of_addAsyncWriteForStream: (OFStream *)stream + buffer: (const void *)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector + context: (nullable id)context; + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)socket target: (id)target selector: (SEL)selector context: (nullable id)context; + (void)of_addAsyncReceiveForUDPSocket: (OFUDPSocket *)socket @@ -62,10 +68,14 @@ exactLength: (size_t)length block: (of_stream_async_read_block_t)block; + (void)of_addAsyncReadLineForStream: (OFStream *)stream encoding: (of_string_encoding_t)encoding block: (of_stream_async_read_line_block_t)block; ++ (void)of_addAsyncWriteForStream: (OFStream *)stream + buffer: (const void *)buffer + length: (size_t)length + block: (of_stream_async_write_block_t)block; + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)socket block: (of_tcp_socket_async_accept_block_t) block; + (void)of_addAsyncReceiveForUDPSocket: (OFUDPSocket *)socket buffer: (void *)buffer Index: src/OFRunLoop.h ================================================================== --- src/OFRunLoop.h +++ src/OFRunLoop.h @@ -47,11 
+47,11 @@ #ifdef OF_HAVE_THREADS OFMutex *_timersQueueLock; #endif #if defined(OF_HAVE_SOCKETS) OFKernelEventObserver *_kernelEventObserver; - OFMutableDictionary *_readQueues; + OFMutableDictionary *_readQueues, *_writeQueues; #elif defined(OF_HAVE_THREADS) OFCondition *_condition; #endif volatile bool _stop; } Index: src/OFRunLoop.m ================================================================== --- src/OFRunLoop.m +++ src/OFRunLoop.m @@ -46,11 +46,11 @@ id _target; SEL _selector; id _context; } -- (bool)handleForObject: (id)object; +- (bool)handleObject: (id)object; @end @interface OFRunLoop_ReadQueueItem: OFRunLoop_QueueItem { @public @@ -80,10 +80,21 @@ of_stream_async_read_line_block_t _block; # endif of_string_encoding_t _encoding; } @end + +@interface OFRunLoop_WriteQueueItem: OFRunLoop_QueueItem +{ +@public +# ifdef OF_HAVE_BLOCKS + of_stream_async_write_block_t _block; +# endif + const void *_buffer; + size_t _length, _writtenLength; +} +@end @interface OFRunLoop_AcceptQueueItem: OFRunLoop_QueueItem { @public # ifdef OF_HAVE_BLOCKS @@ -102,11 +113,11 @@ size_t _length; } @end @implementation OFRunLoop_QueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { OF_UNRECOGNIZED_SELECTOR } - (void)dealloc @@ -117,11 +128,11 @@ [super dealloc]; } @end @implementation OFRunLoop_ReadQueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { size_t length; id exception = nil; @try { @@ -157,11 +168,11 @@ } # endif @end @implementation OFRunLoop_ExactReadQueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { size_t length; id exception = nil; @try { @@ -211,11 +222,11 @@ } # endif @end @implementation OFRunLoop_ReadLineQueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { OFString *line; id exception = nil; @try { @@ -251,13 +262,60 @@ [super dealloc]; } # endif @end + +@implementation OFRunLoop_WriteQueueItem +- (bool)handleObject: 
(id)object +{ + size_t length; + id exception = nil; + + @try { + length = [object writeBuffer: (const char *)_buffer + _writtenLength + length: _length - _writtenLength]; + } @catch (id e) { + length = 0; + exception = e; + } + + _writtenLength += length; + + if (_writtenLength != _length && exception == nil) + return true; + +# ifdef OF_HAVE_BLOCKS + if (_block != NULL) + _block(object, _buffer, _writtenLength, exception); + else { +# endif + void (*func)(id, SEL, OFStream *, const void *, size_t, id, + id) = (void (*)(id, SEL, OFStream *, const void *, size_t, + id, id))[_target methodForSelector: _selector]; + + func(_target, _selector, object, _buffer, _writtenLength, + _context, exception); +# ifdef OF_HAVE_BLOCKS + } +# endif + + return false; +} + +# ifdef OF_HAVE_BLOCKS +- (void)dealloc +{ + [_block release]; + + [super dealloc]; +} +# endif +@end @implementation OFRunLoop_AcceptQueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { OFTCPSocket *newSocket; id exception = nil; @try { @@ -292,11 +350,11 @@ } # endif @end @implementation OFRunLoop_UDPReceiveQueueItem -- (bool)handleForObject: (id)object +- (bool)handleObject: (id)object { size_t length; of_udp_socket_address_t address; id exception = nil; @@ -378,10 +436,31 @@ queueItem = [[[type alloc] init] autorelease]; \ code \ [queue appendObject: queueItem]; \ \ objc_autoreleasePoolPop(pool); +# define ADD_WRITE(type, object, code) \ + void *pool = objc_autoreleasePoolPush(); \ + OFRunLoop *runLoop = [self currentRunLoop]; \ + OFList *queue = [runLoop->_writeQueues objectForKey: object]; \ + type *queueItem; \ + \ + if (queue == nil) { \ + queue = [OFList list]; \ + [runLoop->_writeQueues setObject: queue \ + forKey: object]; \ + } \ + \ + if ([queue count] == 0) \ + [runLoop->_kernelEventObserver \ + addObjectForWriting: object]; \ + \ + queueItem = [[[type alloc] init] autorelease]; \ + code \ + [queue appendObject: queueItem]; \ + \ + objc_autoreleasePoolPop(pool); + 
(void)of_addAsyncReadForStream: 
(OFStream *)stream buffer: (void *)buffer length: (size_t)length target: (id)target @@ -424,10 +503,26 @@ queueItem->_selector = selector; queueItem->_context = [context retain]; queueItem->_encoding = encoding; }) } + ++ (void)of_addAsyncWriteForStream: (OFStream *)stream + buffer: (const void *)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector + context: (id)context +{ + ADD_WRITE(OFRunLoop_WriteQueueItem, stream, { + queueItem->_target = [target retain]; + queueItem->_selector = selector; + queueItem->_context = [context retain]; + queueItem->_buffer = buffer; + queueItem->_length = length; + }) +} + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream target: (id)target selector: (SEL)selector context: (id)context @@ -487,10 +582,22 @@ ADD_READ(OFRunLoop_ReadLineQueueItem, stream, { queueItem->_block = [block copy]; queueItem->_encoding = encoding; }) } + ++ (void)of_addAsyncWriteForStream: (OFStream *)stream + buffer: (const void *)buffer + length: (size_t)length + block: (of_stream_async_write_block_t)block +{ + ADD_WRITE(OFRunLoop_WriteQueueItem, stream, { + queueItem->_block = [block copy]; + queueItem->_buffer = buffer; + queueItem->_length = length; + }) +} + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream block: (of_tcp_socket_async_accept_block_t)block { ADD_READ(OFRunLoop_AcceptQueueItem, stream, { @@ -510,16 +617,30 @@ queueItem->_block = [block copy]; }) } # endif # undef ADD_READ +# undef ADD_WRITE + (void)of_cancelAsyncRequestsForObject: (id)object { void *pool = objc_autoreleasePoolPush(); OFRunLoop *runLoop = [self currentRunLoop]; OFList *queue; + + if ((queue = [runLoop->_writeQueues objectForKey: object]) != nil) { + assert([queue count] > 0); + + /* + * Clear the queue now, in case this has been called from a + * handler, as otherwise, we'd do the cleanups below twice. 
+ */ + [queue removeAllObjects]; + + [runLoop->_kernelEventObserver removeObjectForWriting: object]; + [runLoop->_writeQueues removeObjectForKey: object]; + } if ((queue = [runLoop->_readQueues objectForKey: object]) != nil) { assert([queue count] > 0); /* @@ -549,10 +670,11 @@ #if defined(OF_HAVE_SOCKETS) _kernelEventObserver = [[OFKernelEventObserver alloc] init]; [_kernelEventObserver setDelegate: self]; _readQueues = [[OFMutableDictionary alloc] init]; + _writeQueues = [[OFMutableDictionary alloc] init]; #elif defined(OF_HAVE_THREADS) _condition = [[OFCondition alloc] init]; #endif } @catch (id e) { [self release]; @@ -569,10 +691,11 @@ [_timersQueueLock release]; #endif #if defined(OF_HAVE_SOCKETS) [_kernelEventObserver release]; [_readQueues release]; + [_writeQueues release]; #elif defined(OF_HAVE_THREADS) [_condition release]; #endif [super dealloc]; @@ -633,11 +756,11 @@ [[_readQueues objectForKey: object] retain]; assert(queue != nil); @try { - if (![[queue firstObject] handleForObject: object]) { + if (![[queue firstObject] handleObject: object]) { of_list_object_t *listObject = [queue firstListObject]; /* * The handler might have called -[cancelAsyncRequests] * so that our queue is now empty, in which case we @@ -651,10 +774,46 @@ removeObjectForReading: object]; [_readQueues removeObjectForKey: object]; } } + } + } @finally { + [queue release]; + } +} + +- (void)objectIsReadyForWriting: (id)object +{ + /* + * Retain the queue so that it doesn't disappear from us because the + * handler called -[cancelAsyncRequests]. + */ + OFList OF_GENERIC(OF_KINDOF(OFRunLoop_WriteQueueItem *)) *queue = + [[_writeQueues objectForKey: object] retain]; + + assert(queue != nil); + + @try { + if (![[queue firstObject] handleObject: object]) { + of_list_object_t *listObject = [queue firstListObject]; + + /* + * The handler might have called -[cancelAsyncRequests] + * so that our queue is now empty, in which case we + * should do nothing. 
+ */ + if (listObject != NULL) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [_kernelEventObserver + removeObjectForWriting: object]; + [_writeQueues + removeObjectForKey: object]; + } + } } } @finally { [queue release]; } } Index: src/OFStdIOStream.m ================================================================== --- src/OFStdIOStream.m +++ src/OFStdIOStream.m @@ -216,12 +216,12 @@ _atEndOfStream = true; return ret; } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { #ifndef OF_MORPHOS if (_fd == -1) @throw [OFNotOpenException exceptionWithObject: self]; @@ -262,15 +262,11 @@ requestedLength: length bytesWritten: 0 errNo: EIO]; #endif - if ((size_t)bytesWritten != length) - @throw [OFWriteFailedException exceptionWithObject: self - requestedLength: length - bytesWritten: bytesWritten - errNo: 0]; + return (size_t)bytesWritten; } #if !defined(OF_WINDOWS) && !defined(OF_MORPHOS) - (int)fileDescriptorForReading { Index: src/OFStdIOStream_Win32Console.m ================================================================== --- src/OFStdIOStream_Win32Console.m +++ src/OFStdIOStream_Win32Console.m @@ -216,12 +216,12 @@ objc_autoreleasePoolPop(pool); return j; } -- (void)lowlevelWriteBuffer: (const void *)buffer_ - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer_ + length: (size_t)length { const char *buffer = buffer_; char16_t *tmp; size_t i = 0, j = 0; @@ -345,7 +345,14 @@ bytesWritten: bytesWritten * 2 errNo: 0]; } @finally { [self freeMemory: tmp]; } + + /* + * We do not count in bytes when writing to the Win32 console. But + * since any incomplete write is an exception here anyway, we can just + * return length. 
+ */ + return length; } @end Index: src/OFStream.h ================================================================== --- src/OFStream.h +++ src/OFStream.h @@ -60,10 +60,22 @@ * success * @return A bool whether the same block should be used for the next read */ typedef bool (^of_stream_async_read_line_block_t)(OFStream *stream, OFString *_Nullable line, id _Nullable exception); + +/*! + * @brief A block which is called when data was written to the stream. + * + * @param stream The stream to which data was written + * @param buffer The buffer which was written to the stream + * @param length The length of the data that has been written + * @param exception An exception which occurred while writing or `nil` on + * success + */ +typedef void (^of_stream_async_write_block_t)(OFStream *stream, + const void *buffer, size_t length, id _Nullable exception); #endif /*! * @class OFStream OFStream.h ObjFW/OFStream.h * @@ -774,13 +786,55 @@ /*! * @brief Writes from a buffer into the stream. * * @param buffer The buffer from which the data is written into the stream * @param length The length of the data that should be written + * @return The number of bytes written. This can only differ from the specified + * length in non-blocking mode. + */ +- (size_t)writeBuffer: (const void *)buffer + length: (size_t)length; + +#ifdef OF_HAVE_SOCKETS +/*! + * @brief Asynchronously writes a buffer into the stream. + * + * @note The stream must implement @ref fileDescriptorForWriting and return a + * valid file descriptor in order for this to work! + * + * @param buffer The buffer from which the data is written into the stream. The + * buffer needs to be valid until the write request is completed! + * @param length The length of the data that should be written + * @param target The target on which the selector should be called when the data + * has been written. + * @param selector The selector to call on the target. 
The signature must be + * `void (OFStream *stream, const void *buffer, size_t length, + * id context, id exception)`. + */ +- (void)asyncWriteBuffer: (const void *)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector + context: (nullable id)context; + +# ifdef OF_HAVE_BLOCKS +/*! + * @brief Asynchronously writes a buffer into the stream. + * + * @note The stream must implement @ref fileDescriptorForWriting and return a + * valid file descriptor in order for this to work! + * + * @param buffer The buffer from which the data is written into the stream. The + * buffer needs to be valid until the write request is completed! + * @param length The length of the data that should be written + * @param block The block to call when the data has been written */ -- (void)writeBuffer: (const void *)buffer - length: (size_t)length; +- (void)asyncWriteBuffer: (const void *)buffer + length: (size_t)length + block: (of_stream_async_write_block_t)block; +# endif +#endif /*! * @brief Writes a uint8_t into the stream. * * @param int8 A uint8_t @@ -1148,13 +1202,14 @@ * @note Override this method with your actual write implementation when * subclassing! * * @param buffer The buffer with the data to write * @param length The length of the data to write + * @return The number of bytes written */ -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length; +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length; /*! * @brief Returns whether the lowlevel is at the end of the stream. * * @warning Do not call this directly! 
Index: src/OFStream.m ================================================================== --- src/OFStream.m +++ src/OFStream.m @@ -48,10 +48,11 @@ #import "OFNotImplementedException.h" #import "OFOutOfMemoryException.h" #import "OFOutOfRangeException.h" #import "OFSetOptionFailedException.h" #import "OFTruncatedDataException.h" +#import "OFWriteFailedException.h" #import "of_asprintf.h" #define MIN_READ_SIZE 512 @@ -94,12 +95,12 @@ length: (size_t)length { OF_UNRECOGNIZED_SELECTOR } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { OF_UNRECOGNIZED_SELECTOR } - copy @@ -1044,23 +1045,62 @@ [self freeMemory: _writeBuffer]; _writeBuffer = NULL; _writeBufferLength = 0; } -- (void)writeBuffer: (const void *)buffer - length: (size_t)length +- (size_t)writeBuffer: (const void *)buffer + length: (size_t)length { - if (!_writeBuffered) - [self lowlevelWriteBuffer: buffer - length: length]; - else { + if (!_writeBuffered) { + size_t bytesWritten = [self lowlevelWriteBuffer: buffer + length: length]; + + if (_blocking && bytesWritten < length) + @throw [OFWriteFailedException + exceptionWithObject: self + requestedLength: length + bytesWritten: bytesWritten + errNo: 0]; + + return bytesWritten; + } else { _writeBuffer = [self resizeMemory: _writeBuffer size: _writeBufferLength + length]; memcpy(_writeBuffer + _writeBufferLength, buffer, length); _writeBufferLength += length; + + return length; } } + +#ifdef OF_HAVE_SOCKETS +- (void)asyncWriteBuffer: (const void *)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector + context: (id)context +{ + [OFRunLoop of_addAsyncWriteForStream: self + buffer: buffer + length: length + target: target + selector: selector + context: context]; +} + +# ifdef OF_HAVE_BLOCKS +- (void)asyncWriteBuffer: (const void *)buffer + length: (size_t)length + block: (of_stream_async_write_block_t)block +{ + [OFRunLoop 
of_addAsyncWriteForStream: self + buffer: buffer + length: length + block: block]; +} +# endif +#endif - (void)writeInt8: (uint8_t)int8 { [self writeBuffer: (char *)&int8 length: 1]; Index: src/OFStreamSocket.m ================================================================== --- src/OFStreamSocket.m +++ src/OFStreamSocket.m @@ -85,12 +85,12 @@ _atEndOfStream = true; return ret; } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { if (_socket == INVALID_SOCKET) @throw [OFNotOpenException exceptionWithObject: self]; #ifndef OF_WINDOWS @@ -117,15 +117,11 @@ requestedLength: length bytesWritten: 0 errNo: of_socket_errno()]; #endif - if ((size_t)bytesWritten != length) - @throw [OFWriteFailedException exceptionWithObject: self - requestedLength: length - bytesWritten: bytesWritten - errNo: 0]; + return (size_t)bytesWritten; } #ifdef OF_WINDOWS - (void)setBlocking: (bool)enable { Index: src/OFTarArchive.m ================================================================== --- src/OFTarArchive.m +++ src/OFTarArchive.m @@ -405,28 +405,32 @@ [_entry release]; [super dealloc]; } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { + size_t bytesWritten; + if (_stream == nil) @throw [OFNotOpenException exceptionWithObject: self]; if ((uint64_t)length > _toWrite) @throw [OFOutOfRangeException exception]; @try { - [_stream writeBuffer: buffer - length: length]; + bytesWritten = [_stream writeBuffer: buffer + length: length]; } @catch (OFWriteFailedException *e) { _toWrite -= [e bytesWritten]; @throw e; } - _toWrite -= length; + _toWrite -= bytesWritten; + + return bytesWritten; } - (bool)lowlevelIsAtEndOfStream { if (_stream == nil) Index: src/OFZIPArchive.m ================================================================== --- src/OFZIPArchive.m +++ src/OFZIPArchive.m @@ 
-843,22 +843,26 @@ [_entry release]; [super dealloc]; } -- (void)lowlevelWriteBuffer: (const void *)buffer - length: (size_t)length +- (size_t)lowlevelWriteBuffer: (const void *)buffer + length: (size_t)length { + size_t bytesWritten; + if ((sizeof(length) >= sizeof(int64_t) && length > INT64_MAX) || INT64_MAX - _bytesWritten < (int64_t)length) @throw [OFOutOfRangeException exception]; - [_stream writeBuffer: buffer - length: length]; + bytesWritten = [_stream writeBuffer: buffer + length: length]; - _bytesWritten += (int64_t)length; - _CRC32 = of_crc32(_CRC32, buffer, length); + _bytesWritten += (int64_t)bytesWritten; + _CRC32 = of_crc32(_CRC32, buffer, bytesWritten); + + return bytesWritten; } - (void)close { if (_stream == nil)