Index: src/OFRunLoop.h
==================================================================
--- src/OFRunLoop.h
+++ src/OFRunLoop.h
@@ -53,10 +53,14 @@
+ (void)OF_setMainRunLoop;
#ifdef OF_HAVE_BLOCKS
+ (void)OF_addAsyncReadForStream: (OFStream*)stream
buffer: (void*)buffer
length: (size_t)length
+ block: (of_stream_async_read_block_t)block;
++ (void)OF_addAsyncReadForStream: (OFStream*)stream
+ buffer: (void*)buffer
+ exactLength: (size_t)length
block: (of_stream_async_read_block_t)block;
+ (void)OF_addAsyncReadLineForStream: (OFStream*)stream
encoding: (of_string_encoding_t)encoding
block: (of_stream_async_read_line_block_t)block;
+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)socket
Index: src/OFRunLoop.m
==================================================================
--- src/OFRunLoop.m
+++ src/OFRunLoop.m
@@ -31,63 +31,69 @@
static OFRunLoop *mainRunLoop = nil;
#ifdef OF_HAVE_BLOCKS
@interface OFRunLoop_ReadQueueItem: OFObject
{
+@public
void *buffer;
size_t length;
of_stream_async_read_block_t block;
}
+@end
-@property void *buffer;
-@property size_t length;
-@property (copy) of_stream_async_read_block_t block;
+@interface OFRunLoop_ExactReadQueueItem: OFObject
+{
+@public
+ void *buffer;
+ size_t exactLength, readLength;
+ of_stream_async_read_block_t block;
+}
@end
@interface OFRunLoop_ReadLineQueueItem: OFObject
{
+@public
of_stream_async_read_line_block_t block;
of_string_encoding_t encoding;
}
-
-@property (copy) of_stream_async_read_line_block_t block;
-@property of_string_encoding_t encoding;
@end
@interface OFRunLoop_AcceptQueueItem: OFObject
{
+@public
of_tcpsocket_async_accept_block_t block;
}
-
-@property (copy) of_tcpsocket_async_accept_block_t block;
@end
@implementation OFRunLoop_ReadQueueItem
-@synthesize buffer, length, block;
+- (void)dealloc
+{
+ [block release];
+
+ [super dealloc];
+}
+@end
+@implementation OFRunLoop_ExactReadQueueItem
- (void)dealloc
{
[block release];
[super dealloc];
}
@end
@implementation OFRunLoop_ReadLineQueueItem
-@synthesize block, encoding;
-
- (void)dealloc
{
[block release];
[super dealloc];
}
@end
@implementation OFRunLoop_AcceptQueueItem
-@synthesize block;
-
- (void)dealloc
{
[block release];
[super dealloc];
@@ -113,13 +119,13 @@
objc_autoreleasePoolPop(pool);
}
#ifdef OF_HAVE_BLOCKS
+ (void)OF_addAsyncReadForStream: (OFStream*)stream
- buffer: (void*)buffer
- length: (size_t)length
- block: (of_stream_async_read_block_t)block
+ buffer: (void*)buffer
+ length: (size_t)length
+ block: (of_stream_async_read_block_t)block
{
void *pool = objc_autoreleasePoolPush();
OFRunLoop *runLoop = [self currentRunLoop];
OFList *queue = [runLoop->readQueues objectForKey: stream];
OFRunLoop_ReadQueueItem *queueItem;
@@ -132,13 +138,41 @@
if ([queue count] == 0)
[runLoop->streamObserver addStreamForReading: stream];
queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease];
- [queueItem setBuffer: buffer];
- [queueItem setLength: length];
- [queueItem setBlock: block];
+ queueItem->buffer = buffer;
+ queueItem->length = length;
+ queueItem->block = [block copy];
+ [queue appendObject: queueItem];
+
+ objc_autoreleasePoolPop(pool);
+}
+
++ (void)OF_addAsyncReadForStream: (OFStream*)stream
+ buffer: (void*)buffer
+ exactLength: (size_t)exactLength
+ block: (of_stream_async_read_block_t)block
+{
+ void *pool = objc_autoreleasePoolPush();
+ OFRunLoop *runLoop = [self currentRunLoop];
+ OFList *queue = [runLoop->readQueues objectForKey: stream];
+ OFRunLoop_ExactReadQueueItem *queueItem;
+
+ if (queue == nil) {
+ queue = [OFList list];
+ [runLoop->readQueues setObject: queue
+ forKey: stream];
+ }
+
+ if ([queue count] == 0)
+ [runLoop->streamObserver addStreamForReading: stream];
+
+ queueItem = [[[OFRunLoop_ExactReadQueueItem alloc] init] autorelease];
+ queueItem->buffer = buffer;
+ queueItem->exactLength = exactLength;
+ queueItem->block = [block copy];
[queue appendObject: queueItem];
objc_autoreleasePoolPop(pool);
}
@@ -159,12 +193,12 @@
if ([queue count] == 0)
[runLoop->streamObserver addStreamForReading: stream];
queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease];
- [queueItem setBlock: block];
- [queueItem setEncoding: encoding];
+ queueItem->block = [block copy];
+ queueItem->encoding = encoding;
[queue appendObject: queueItem];
objc_autoreleasePoolPop(pool);
}
@@ -184,11 +218,11 @@
if ([queue count] == 0)
[runLoop->streamObserver addStreamForReading: socket];
queueItem = [[[OFRunLoop_AcceptQueueItem alloc] init] autorelease];
- [queueItem setBlock: block];
+ queueItem->block = [block copy];
[queue appendObject: queueItem];
objc_autoreleasePoolPop(pool);
}
#endif
@@ -240,31 +274,55 @@
listObject = [queue firstListObject];
if ([listObject->object isKindOfClass:
[OFRunLoop_ReadQueueItem class]]) {
OFRunLoop_ReadQueueItem *queueItem = listObject->object;
- void *buffer = [queueItem buffer];
- size_t length = [stream readIntoBuffer: buffer
- length: [queueItem length]];
+ size_t length = [stream readIntoBuffer: queueItem->buffer
+ length: queueItem->length];
- if (![queueItem block](stream, buffer, length)) {
+ if (!queueItem->block(stream, queueItem->buffer, length)) {
[queue removeListObject: listObject];
if ([queue count] == 0) {
[streamObserver removeStreamForReading: stream];
[readQueues removeObjectForKey: stream];
}
+ }
+ } else if ([listObject->object isKindOfClass:
+ [OFRunLoop_ExactReadQueueItem class]]) {
+ OFRunLoop_ExactReadQueueItem *queueItem = listObject->object;
+ size_t length = [stream
+ readIntoBuffer: (char*)queueItem->buffer +
+ queueItem->readLength
+ length: queueItem->exactLength -
+ queueItem->readLength];
+
+ queueItem->readLength += length;
+ if (queueItem->readLength == queueItem->exactLength ||
+ [stream isAtEndOfStream]) {
+ if (queueItem->block(stream, queueItem->buffer,
+ queueItem->readLength))
+ queueItem->readLength = 0;
+ else {
+ [queue removeListObject: listObject];
+
+ if ([queue count] == 0) {
+ [streamObserver
+ removeStreamForReading: stream];
+ [readQueues removeObjectForKey: stream];
+ }
+ }
}
} else if ([listObject->object isKindOfClass:
[OFRunLoop_ReadLineQueueItem class]]) {
OFRunLoop_ReadLineQueueItem *queueItem = listObject->object;
OFString *line;
- line = [stream tryReadLineWithEncoding: [queueItem encoding]];
+ line = [stream tryReadLineWithEncoding: queueItem->encoding];
if (line != nil || [stream isAtEndOfStream]) {
- if (![queueItem block](stream, line)) {
+ if (!queueItem->block(stream, line)) {
[queue removeListObject: listObject];
if ([queue count] == 0) {
[streamObserver
removeStreamForReading: stream];
@@ -275,11 +333,11 @@
} else if ([listObject->object isKindOfClass:
[OFRunLoop_AcceptQueueItem class]]) {
OFRunLoop_AcceptQueueItem *queueItem = listObject->object;
OFTCPSocket *newSocket = [(OFTCPSocket*)stream accept];
- if (![queueItem block]((OFTCPSocket*)stream, newSocket)) {
+ if (!queueItem->block((OFTCPSocket*)stream, newSocket)) {
[queue removeListObject: listObject];
if ([queue count] == 0) {
[streamObserver removeStreamForReading: stream];
[readQueues removeObjectForKey: stream];
Index: src/OFStream.h
==================================================================
--- src/OFStream.h
+++ src/OFStream.h
@@ -76,44 +76,22 @@
/**
* \brief Reads at most size bytes from the stream into a buffer.
*
* On network streams, this might read less than the specified number of bytes.
* If you want to read exactly the specified number of bytes, use
- * -[readIntoBuffer:exactLength:].
+ * -readIntoBuffer:exactLength:. Note that a read can even return 0 bytes -
+ * this does not necessarily mean that the stream ended, so you still need to
+ * check isAtEndOfStream.
*
* \param buffer The buffer into which the data is read
* \param length The length of the data that should be read at most.
* The buffer must be at least this big!
* \return The number of bytes read
*/
- (size_t)readIntoBuffer: (void*)buffer
length: (size_t)size;
-#ifdef OF_HAVE_BLOCKS
-/**
- * \brief Asyncronously reads at most size bytes from the stream into a
- * buffer.
- *
- * On network streams, this might read less than the specified number of bytes.
- * If you want to read exactly the specified number of bytes, use
- * -[readIntoBuffer:exactLength:].
- *
- * \param buffer The buffer into which the data is read.
- * The buffer must not be free'd before the async read completed!
- * \param length The length of the data that should be read at most.
- * The buffer must be at least this big!
- * \param block The block to call when the data has been received.
- * If the block returns YES, it will be called again with the same
- * buffer and maximum length when more data has been received. If
- * you want the next block in the queue to handle the data
- * received next, you need to return NO from the block.
- */
-- (void)asyncReadWithBuffer: (void*)buffer
- length: (size_t)length
- block: (of_stream_async_read_block_t)block;
-#endif
-
/**
* \brief Reads exactly the specified length bytes from the stream into a
* buffer.
*
* Unlike readIntoBuffer:length:, this method does not return when less than the
@@ -123,14 +101,61 @@
* \warning Only call this when you know that specified amount of data is
* available! Otherwise you will get an exception!
*
* \param buffer The buffer into which the data is read
* \param length The length of the data that should be read.
- * The buffer must be exactly this big!
+ * The buffer must be exactly this big!
*/
- (void)readIntoBuffer: (void*)buffer
exactLength: (size_t)length;
+
+#ifdef OF_HAVE_BLOCKS
+/**
+ * \brief Asynchronously reads at most length bytes from the stream into a
+ * buffer.
+ *
+ * On network streams, this might read less than the specified number of bytes.
+ * If you want to read exactly the specified number of bytes, use
+ * asyncReadIntoBuffer:exactLength:block:. Note that a read can even return 0
+ * bytes - this does not necessarily mean that the stream ended, so you still
+ * need to check isAtEndOfStream.
+ *
+ * \param buffer The buffer into which the data is read.
+ * The buffer must not be free'd before the async read completed!
+ * \param length The length of the data that should be read at most.
+ * The buffer must be at least this big!
+ * \param block The block to call when the data has been received.
+ * If the block returns YES, it will be called again with the same
+ * buffer and maximum length when more data has been received. If
+ * you want the next block in the queue to handle the data
+ * received next, you need to return NO from the block.
+ */
+- (void)asyncReadIntoBuffer: (void*)buffer
+ length: (size_t)length
+ block: (of_stream_async_read_block_t)block;
+
+/**
+ * \brief Asynchronously reads exactly the specified length bytes from the
+ * stream into a buffer.
+ *
+ * Unlike asyncReadIntoBuffer:length:block:, this method does not invoke the
+ * block when less than the specified length has been read - instead, it waits
+ * until it got exactly the specified length or the stream has ended.
+ *
+ * \param buffer The buffer into which the data is read
+ * \param length The length of the data that should be read.
+ * The buffer must be exactly this big!
+ * \param block The block to call when the data has been received.
+ * If the block returns YES, it will be called again with the same
+ * buffer and exact length when more data has been received. If
+ * you want the next block in the queue to handle the data
+ * received next, you need to return NO from the block.
+ */
+ - (void)asyncReadIntoBuffer: (void*)buffer
+ exactLength: (size_t)length
+ block: (of_stream_async_read_block_t)block;
+#endif
/**
* \brief Reads a uint8_t from the stream.
*
* \warning Only call this when you know that enough data is available!
Index: src/OFStream.m
==================================================================
--- src/OFStream.m
+++ src/OFStream.m
@@ -132,29 +132,41 @@
return length;
}
}
-- (void)asyncReadWithBuffer: (void*)buffer
+- (void)readIntoBuffer: (void*)buffer
+ exactLength: (size_t)length
+{
+ size_t readLength = 0;
+
+ while (readLength < length)
+ readLength += [self readIntoBuffer: (char*)buffer + readLength
+ length: length - readLength];
+}
+
+#ifdef OF_HAVE_BLOCKS
+- (void)asyncReadIntoBuffer: (void*)buffer
length: (size_t)length
block: (of_stream_async_read_block_t)block
{
[OFRunLoop OF_addAsyncReadForStream: self
buffer: buffer
length: length
block: block];
}
-- (void)readIntoBuffer: (void*)buffer
- exactLength: (size_t)length
-{
- size_t readLength = 0;
-
- while (readLength < length)
- readLength += [self readIntoBuffer: (char*)buffer + readLength
- length: length - readLength];
-}
+- (void)asyncReadIntoBuffer: (void*)buffer
+ exactLength: (size_t)length
+ block: (of_stream_async_read_block_t)block
+{
+ [OFRunLoop OF_addAsyncReadForStream: self
+ buffer: buffer
+ exactLength: length
+ block: block];
+}
+#endif
- (uint8_t)readInt8
{
uint8_t ret;
Index: src/OFStreamObserver.m
==================================================================
--- src/OFStreamObserver.m
+++ src/OFStreamObserver.m
@@ -382,11 +382,10 @@
size_t i, count = [readStreams count];
BOOL foundInCache = NO;
for (i = 0; i < count; i++) {
-
if ([objects[i] pendingBytes] > 0 &&
![objects[i] OF_isWaitingForDelimiter]) {
void *pool = objc_autoreleasePoolPush();
[delegate streamIsReadyForReading: objects[i]];
foundInCache = YES;