ObjFW  Check-in [a2b309b38a]

Overview
Comment:Implement async reading (into buffers and lines).
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: a2b309b38a165ef48f8c5b801f4bdc6347fb0a8e0ba925ff1098c2a90d8e6181
User & Date: js on 2012-09-14 05:20:07
Other Links: manifest | tags
Context
2012-09-14
05:24
Create a pool for -[applicationDidFinishLaunching] check-in: bba061e4b7 user: js tags: trunk
05:20
Implement async reading (into buffers and lines). check-in: a2b309b38a user: js tags: trunk
2012-09-12
17:27
Split -[OFStream fileDescriptor]. check-in: 440e95fd4a user: js tags: trunk
Changes

Modified src/OFRunLoop.h from [00fc6ba68c] to [2b4f887c3c].

11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26



27
28
29

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46









47
48
49
50
51
52
53
54
55
56
57
58
59
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#import "OFObject.h"



@class OFSortedList;
@class OFStreamObserver;
@class OFTimer;

/**
 * \brief A class providing a run loop for the application and its processes.
 */
@interface OFRunLoop: OFObject



{
	/* Timer queue; obtained from the current thread and retained in -init. */
	OFSortedList *timersQueue;
	/* Stream observer created and owned by this run loop. */
	OFStreamObserver *streamObserver;

}

/**
 * \brief Returns the main run loop.
 *
 * \return The main run loop
 */
+ (OFRunLoop*)mainRunLoop;

/**
 * \brief Returns the run loop for the current thread.
 *
 * \return The run loop for the current thread
 */
+ (OFRunLoop*)currentRunLoop;

/*
 * Private: stores the specified run loop as the main run loop. The run loop is
 * retained.
 */
+ (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop;










/**
 * \brief Adds an OFTimer to the run loop.
 *
 * \param timer The timer to add
 */
- (void)addTimer: (OFTimer*)timer;

/**
 * \brief Starts the run loop.
 */
- (void)run;
@end







>
>


|
|





>
>
>



>

















>
>
>
>
>
>
>
>
>













11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#import "OFObject.h"
#import "OFStream.h"
#import "OFStreamObserver.h"

@class OFSortedList;
@class OFTimer;
@class OFMutableDictionary;

/**
 * \brief A class providing a run loop for the application and its processes.
 */
@interface OFRunLoop: OFObject
/*
 * OFRunLoop.m defines OF_RUNLOOP_M before importing this header, so the
 * delegate conformance is only declared while compiling the implementation.
 */
#ifdef OF_RUNLOOP_M
    <OFStreamObserverDelegate>
#endif
{
	/* Timer queue; obtained from the current thread and retained in -init. */
	OFSortedList *timersQueue;
	/* Stream observer created in -init; the run loop is its delegate. */
	OFStreamObserver *streamObserver;
	/* Maps a stream to the OFList of its pending async read requests. */
	OFMutableDictionary *readQueues;
}

/**
 * \brief Returns the main run loop.
 *
 * \return The main run loop
 */
+ (OFRunLoop*)mainRunLoop;

/**
 * \brief Returns the run loop for the current thread.
 *
 * \return The run loop for the current thread
 */
+ (OFRunLoop*)currentRunLoop;

/*
 * Private: stores the specified run loop as the main run loop. The run loop is
 * retained.
 */
+ (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop;
#ifdef OF_HAVE_BLOCKS
/*
 * Private: queues an async read into the specified buffer for the stream on
 * the current run loop. Called by
 * -[OFStream asyncReadWithBuffer:length:block:].
 */
+ (void)_addAsyncReadForStream: (OFStream*)stream
			buffer: (void*)buffer
			length: (size_t)length
			 block: (of_stream_async_read_block_t)block;
/*
 * Private: queues an async line read with the specified encoding for the
 * stream on the current run loop. Called by
 * -[OFStream asyncReadLineWithEncoding:block:].
 */
+ (void)_addAsyncReadLineForStream: (OFStream*)stream
			  encoding: (of_string_encoding_t)encoding
			     block: (of_stream_async_read_line_block_t)block;
#endif

/**
 * \brief Adds an OFTimer to the run loop.
 *
 * \param timer The timer to add
 */
- (void)addTimer: (OFTimer*)timer;

/**
 * \brief Starts the run loop.
 */
- (void)run;
@end

Modified src/OFRunLoop.m from [e04d2c2152] to [0049670d02].

11
12
13
14
15
16
17
18


19

20
21
22
23

24
25
26
27














































28
29
30
31
32
33
34
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"



#import "OFRunLoop.h"

#import "OFThread.h"
#import "OFSortedList.h"
#import "OFTimer.h"
#import "OFDate.h"

#import "OFStreamObserver.h"

static OFTLSKey *currentRunLoopKey;
static OFRunLoop *mainRunLoop;















































@implementation OFRunLoop
/*
 * Creates the TLS key under which each thread's run loop is stored. Only done
 * for OFRunLoop itself, not when +initialize is invoked for a subclass.
 */
+ (void)initialize
{
	if (self != [OFRunLoop class])
		return;

	currentRunLoopKey = [[OFTLSKey alloc] init];
}








>
>

>




>
|



>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#define OF_RUNLOOP_M

#import "OFRunLoop.h"
#import "OFDictionary.h"
#import "OFThread.h"
#import "OFSortedList.h"
#import "OFTimer.h"
#import "OFDate.h"

#import "macros.h"

static OFTLSKey *currentRunLoopKey;
static OFRunLoop *mainRunLoop;

#ifdef OF_HAVE_BLOCKS
/*
 * Queue entry describing one pending asynchronous read into a buffer.
 */
@interface OFRunLoop_ReadQueueItem: OFObject
{
	/* Caller-supplied destination buffer; never freed by this class. */
	void *buffer;
	/* Maximum number of bytes to read into the buffer. */
	size_t length;
	/* Block invoked with the stream, the buffer and the bytes read. */
	of_stream_async_read_block_t block;
}

@property void *buffer;
@property size_t length;
/* The block is copied by the setter and released in -dealloc. */
@property (copy) of_stream_async_read_block_t block;
@end

/*
 * Queue entry describing one pending asynchronous line read.
 */
@interface OFRunLoop_ReadLineQueueItem: OFObject
{
	/* Block invoked with the stream and the line that was read. */
	of_stream_async_read_line_block_t block;
	/* Encoding passed to -tryReadLineWithEncoding: when reading the line. */
	of_string_encoding_t encoding;
}

/* The block is copied by the setter and released in -dealloc. */
@property (copy) of_stream_async_read_line_block_t block;
@property of_string_encoding_t encoding;
@end

@implementation OFRunLoop_ReadQueueItem
@synthesize buffer;
@synthesize length;
@synthesize block;

- (void)dealloc
{
	/* The copy made by the property setter is owned by this item. */
	[block release];

	[super dealloc];
}
@end

@implementation OFRunLoop_ReadLineQueueItem
@synthesize block;
@synthesize encoding;

- (void)dealloc
{
	/* The copy made by the property setter is owned by this item. */
	[block release];

	[super dealloc];
}
@end
#endif

@implementation OFRunLoop
/*
 * Creates the TLS key under which each thread's run loop is stored. Only done
 * for OFRunLoop itself, not when +initialize is invoked for a subclass.
 */
+ (void)initialize
{
	if (self != [OFRunLoop class])
		return;

	currentRunLoopKey = [[OFTLSKey alloc] init];
}
45
46
47
48
49
50
51
























































52
53
54
55
56
57
58
59

60



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

78
79
80
81
82
83
84
85
86
87
88












































89
90
91
92
93
94
95
}

/*
 * Installs the specified run loop as the main run loop.
 *
 * The new run loop is retained before the previously installed one is
 * released, so calling this more than once does not leak the old run loop and
 * passing the run loop that is already installed is safe.
 */
+ (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop_
{
	OFRunLoop *previous = mainRunLoop;

	mainRunLoop = [mainRunLoop_ retain];
	[previous release];
}

























































/*
 * Initializes a run loop for the calling thread: takes over the thread's
 * timer queue, creates a stream observer and stores the run loop under the
 * current-run-loop TLS key.
 */
- init
{
	self = [super init];

	@try {
		void *pool = objc_autoreleasePoolPush();

		/* Use the timer queue that belongs to the calling thread. */
		timersQueue = [[OFThread currentThread] _timersQueue];
		[timersQueue retain];

		streamObserver = [[OFStreamObserver alloc] init];

		[OFThread setObject: self
			  forTLSKey: currentRunLoopKey];

		objc_autoreleasePoolPop(pool);
	} @catch (id exception) {
		/* Do not leak the half-initialized instance. */
		[self release];
		@throw exception;
	}

	return self;
}

/*
 * Releases the objects retained or created in -init.
 */
- (void)dealloc
{
	[timersQueue release];
	[streamObserver release];


	[super dealloc];
}

/*
 * Adds the timer to the timer queue while holding the queue's lock and then
 * cancels the stream observer.
 */
- (void)addTimer: (OFTimer*)timer
{
	OFSortedList *queue = timersQueue;

	@synchronized (queue) {
		[queue addObject: timer];
	}

	/*
	 * NOTE(review): presumably this interrupts a pending observe call so
	 * that -run notices the new timer - confirm against -run.
	 */
	[streamObserver cancel];
}













































- (void)run
{
	for (;;) {
		void *pool = objc_autoreleasePoolPush();
		OFDate *now = [OFDate date];
		OFTimer *timer;







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>








>

>
>
>

















>











>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
}

/*
 * Installs the specified run loop as the main run loop.
 *
 * The new run loop is retained before the previously installed one is
 * released, so calling this more than once does not leak the old run loop and
 * passing the run loop that is already installed is safe.
 */
+ (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop_
{
	OFRunLoop *previous = mainRunLoop;

	mainRunLoop = [mainRunLoop_ retain];
	[previous release];
}

#ifdef OF_HAVE_BLOCKS
/*
 * Queues an asynchronous buffer read for the stream on the current run loop.
 *
 * The stream's queue is created on first use. The stream is registered with
 * the stream observer whenever its queue is empty before the new request is
 * appended.
 */
+ (void)_addAsyncReadForStream: (OFStream*)stream
			buffer: (void*)buffer
			length: (size_t)length
			 block: (of_stream_async_read_block_t)block
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoop *loop = [self currentRunLoop];
	OFList *pending = [loop->readQueues objectForKey: stream];
	OFRunLoop_ReadQueueItem *request;

	/* First request for this stream: create its queue lazily. */
	if (pending == nil) {
		pending = [OFList list];
		[loop->readQueues setObject: pending
				     forKey: stream];
	}

	/* Nothing pending so far: start observing the stream. */
	if ([pending count] == 0)
		[loop->streamObserver addStreamForReading: stream];

	request = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease];
	[request setBuffer: buffer];
	[request setLength: length];
	[request setBlock: block];

	[pending appendObject: request];

	objc_autoreleasePoolPop(pool);
}

/*
 * Queues an asynchronous line read for the stream on the current run loop.
 *
 * The stream's queue is created on first use. The stream is registered with
 * the stream observer whenever its queue is empty before the new request is
 * appended.
 */
+ (void)_addAsyncReadLineForStream: (OFStream*)stream
			  encoding: (of_string_encoding_t)encoding
			     block: (of_stream_async_read_line_block_t)block
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoop *loop = [self currentRunLoop];
	OFList *pending = [loop->readQueues objectForKey: stream];
	OFRunLoop_ReadLineQueueItem *request;

	/* First request for this stream: create its queue lazily. */
	if (pending == nil) {
		pending = [OFList list];
		[loop->readQueues setObject: pending
				     forKey: stream];
	}

	/* Nothing pending so far: start observing the stream. */
	if ([pending count] == 0)
		[loop->streamObserver addStreamForReading: stream];

	request = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease];
	[request setBlock: block];
	[request setEncoding: encoding];

	[pending appendObject: request];

	objc_autoreleasePoolPop(pool);
}
#endif

/*
 * Initializes a run loop for the calling thread: takes over the thread's
 * timer queue, creates a stream observer with the run loop as its delegate,
 * creates the dictionary of read queues and stores the run loop under the
 * current-run-loop TLS key.
 */
- init
{
	self = [super init];

	@try {
		void *pool = objc_autoreleasePoolPush();

		/* Use the timer queue that belongs to the calling thread. */
		timersQueue = [[OFThread currentThread] _timersQueue];
		[timersQueue retain];

		/* The run loop is told about readable streams as delegate. */
		streamObserver = [[OFStreamObserver alloc] init];
		[streamObserver setDelegate: self];

		/* Pending async reads, keyed by stream. */
		readQueues = [[OFMutableDictionary alloc] init];

		[OFThread setObject: self
			  forTLSKey: currentRunLoopKey];

		objc_autoreleasePoolPop(pool);
	} @catch (id exception) {
		/* Do not leak the half-initialized instance. */
		[self release];
		@throw exception;
	}

	return self;
}

/*
 * Releases the objects retained or created in -init.
 */
- (void)dealloc
{
	[timersQueue release];
	[streamObserver release];
	/* Releases the dictionary of per-stream read queues. */
	[readQueues release];

	[super dealloc];
}

/*
 * Adds the timer to the timer queue while holding the queue's lock and then
 * cancels the stream observer.
 */
- (void)addTimer: (OFTimer*)timer
{
	OFSortedList *queue = timersQueue;

	@synchronized (queue) {
		[queue addObject: timer];
	}

	/*
	 * NOTE(review): presumably this interrupts a pending observe call so
	 * that -run notices the new timer - confirm against -run.
	 */
	[streamObserver cancel];
}

#ifdef OF_HAVE_BLOCKS
/*
 * Stream observer delegate callback: serves the request at the head of the
 * stream's read queue.
 *
 * For a buffer request, data is read into the request's buffer and the block
 * is called with the number of bytes read. For a line request, the block is
 * only called once a complete line is available or the stream is at its end
 * (in which case the line passed is nil).
 *
 * If the block returns NO, the request is removed from the queue. Once the
 * queue is empty, the stream is removed from the observer and its queue is
 * removed from readQueues, so that the run loop no longer keeps the stream
 * (the dictionary key) and the empty list alive.
 */
- (void)streamIsReadyForReading: (OFStream*)stream
{
	OFList *queue = [readQueues objectForKey: stream];
	of_list_object_t *listObject;
	BOOL keepRequest = YES;

	OF_ENSURE(queue != nil);

	listObject = [queue firstListObject];

	/* A stream is only observed while it has at least one request. */
	OF_ENSURE(listObject != NULL);

	if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadQueueItem class]]) {
		OFRunLoop_ReadQueueItem *queueItem = listObject->object;
		void *buffer = [queueItem buffer];
		size_t length = [stream readIntoBuffer: buffer
						length: [queueItem length]];

		keepRequest = [queueItem block](stream, buffer, length);
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadLineQueueItem class]]) {
		OFRunLoop_ReadLineQueueItem *queueItem = listObject->object;
		OFString *line;

		line = [stream tryReadLineWithEncoding: [queueItem encoding]];

		/* No complete line yet: keep waiting without calling back. */
		if (line != nil || [stream isAtEndOfStream])
			keepRequest = [queueItem block](stream, line);
	} else
		OF_ENSURE(0);

	if (keepRequest)
		return;

	[queue removeListObject: listObject];

	if ([queue count] == 0) {
		[streamObserver removeStreamForReading: stream];

		/*
		 * Drop the empty queue. This must be the last use of queue, as
		 * the dictionary may hold the only reference to it.
		 */
		[readQueues removeObjectForKey: stream];
	}
}
#endif

- (void)run
{
	for (;;) {
		void *pool = objc_autoreleasePoolPush();
		OFDate *now = [OFDate date];
		OFTimer *timer;

Modified src/OFStream.h from [fb448f8379] to [d3c9a265b1].

22
23
24
25
26
27
28

29





30
31
32
33
34
35
36
#endif

#include <stdarg.h>

#import "OFObject.h"
#import "OFString.h"


@class OFDataArray;






/**
 * \brief A base class for different types of streams.
 *
 * \warning Even though the OFCopying protocol is implemented, it does
 *	    <i>not</i> return an independent copy of the stream but instead
 *	    retains it.  This is so that the stream can be used as a key for a







>

>
>
>
>
>







22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#endif

#include <stdarg.h>

#import "OFObject.h"
#import "OFString.h"

@class OFStream;
@class OFDataArray;

#ifdef OF_HAVE_BLOCKS
/**
 * \brief A block which is called when data has been read asynchronously.
 *
 * It is called with the stream, the buffer and the number of bytes read.
 * Returning YES keeps the request queued so that the block is called again;
 * returning NO lets the next request in the queue handle the next data.
 */
typedef BOOL (^of_stream_async_read_block_t)(OFStream*, void*, size_t);
/**
 * \brief A block which is called when a line has been read asynchronously.
 *
 * It is called with the stream and the line that has been read. Returning YES
 * keeps the request queued so that the block is called again for the next
 * line; returning NO lets the next request in the queue handle the next line.
 */
typedef BOOL (^of_stream_async_read_line_block_t)(OFStream*, OFString*);
#endif

/**
 * \brief A base class for different types of streams.
 *
 * \warning Even though the OFCopying protocol is implemented, it does
 *	    <i>not</i> return an independent copy of the stream but instead
 *	    retains it.  This is so that the stream can be used as a key for a
77
78
79
80
81
82
83
























84
85
86
87
88
89
90
 * \param buffer The buffer into which the data is read
 * \param length The length of the data that should be read at most.
 *		 The buffer <i>must</i> be at least this big!
 * \return The number of bytes read
 */
- (size_t)readIntoBuffer: (void*)buffer
		  length: (size_t)size;

























/**
 * \brief Reads exactly the specified length bytes from the stream into a
 *	  buffer.
 *
 * Unlike readIntoBuffer:length:, this method does not return when less than the
 * specified length has been read - instead, it waits until it got exactly the







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
 * \param buffer The buffer into which the data is read
 * \param length The length of the data that should be read at most.
 *		 The buffer <i>must</i> be at least this big!
 * \return The number of bytes read
 */
- (size_t)readIntoBuffer: (void*)buffer
		  length: (size_t)size;

#ifdef OF_HAVE_BLOCKS
/**
 * \brief Asynchronously reads <i>at most</i> size bytes from the stream into a
 *	  buffer.
 *
 * On network streams, this might read less than the specified number of bytes.
 * If you want to read exactly the specified number of bytes, use
 * -[readIntoBuffer:exactLength:], which does not return until exactly that
 * many bytes have been read.
 *
 * The read is queued on the run loop of the current thread.
 *
 * \param buffer The buffer into which the data is read.
 *		 The buffer must not be freed before the async read has
 *		 completed!
 * \param length The length of the data that should be read at most.
 *		 The buffer <i>must</i> be at least this big!
 * \param block The block to call when the data has been received. It is
 *		called with the stream, the buffer and the number of bytes
 *		read.
 *		If the block returns YES, it will be called again with the same
 *		buffer and maximum length when more data has been received. If
 *		you want the next block in the queue to handle the data
 *		received next, you need to return NO from the block.
 */
- (void)asyncReadWithBuffer: (void*)buffer
		     length: (size_t)length
		      block: (of_stream_async_read_block_t)block;
#endif

/**
 * \brief Reads exactly the specified length bytes from the stream into a
 *	  buffer.
 *
 * Unlike readIntoBuffer:length:, this method does not return when less than the
 * specified length has been read - instead, it waits until it got exactly the
441
442
443
444
445
446
447

























448
449
450
451
452
453
454
 *	  stream occurs.
 *
 * \param encoding The encoding used by the stream
 * \return The line that was read, autoreleased, or nil if the end of the
 *	   stream has been reached.
 */
- (OFString*)readLineWithEncoding: (of_string_encoding_t)encoding;


























/**
 * \brief Tries to read a line from the stream (see readLine) and returns nil if
 *	  no complete line has been received yet.
 *
 * \return The line that was read, autoreleased, or nil if the line is not
 *	   complete yet







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
 *	  stream occurs.
 *
 * \param encoding The encoding used by the stream
 * \return The line that was read, autoreleased, or nil if the end of the
 *	   stream has been reached.
 */
- (OFString*)readLineWithEncoding: (of_string_encoding_t)encoding;

#ifdef OF_HAVE_BLOCKS
/**
 * \brief Asynchronously reads until a newline, \\0 or end of stream occurs.
 *
 * The line is read using the UTF-8 encoding.
 *
 * \param block The block to call when the data has been received. It is
 *		called with the stream and the line that has been read.
 *		If the block returns YES, it will be called again when the next
 *		line has been received. If you want the next block in the queue
 *		to handle the next line, you need to return NO from the block.
 */
- (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block;

/**
 * \brief Asynchronously reads with the specified encoding until a newline, \\0
 *	  or end of stream occurs.
 *
 * \param encoding The encoding used by the stream
 * \param block The block to call when the data has been received. It is
 *		called with the stream and the line that has been read.
 *		If the block returns YES, it will be called again when the next
 *		line has been received. If you want the next block in the queue
 *		to handle the next line, you need to return NO from the block.
 */
- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding
			    block: (of_stream_async_read_line_block_t)block;
#endif

/**
 * \brief Tries to read a line from the stream (see readLine) and returns nil if
 *	  no complete line has been received yet.
 *
 * \return The line that was read, autoreleased, or nil if the line is not
 *	   complete yet

Modified src/OFStream.m from [6992f1ccb5] to [f58d32520c].

30
31
32
33
34
35
36

37
38
39
40
41
42
43
#ifndef _WIN32
# include <signal.h>
#endif

#import "OFStream.h"
#import "OFString.h"
#import "OFDataArray.h"


#import "OFInvalidArgumentException.h"
#import "OFInvalidFormatException.h"
#import "OFNotImplementedException.h"
#import "OFSetOptionFailedException.h"

#import "macros.h"







>







30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#ifndef _WIN32
# include <signal.h>
#endif

#import "OFStream.h"
#import "OFString.h"
#import "OFDataArray.h"
#import "OFRunLoop.h"

#import "OFInvalidArgumentException.h"
#import "OFInvalidFormatException.h"
#import "OFNotImplementedException.h"
#import "OFSetOptionFailedException.h"

#import "macros.h"
128
129
130
131
132
133
134










135
136
137
138
139
140
141
		[self freeMemory: cache];
		cache = tmp;
		cacheLength -= length;

		return length;
	}
}











- (void)readIntoBuffer: (void*)buffer
	   exactLength: (size_t)length
{
	size_t readLength = 0;

	while (readLength < length)







>
>
>
>
>
>
>
>
>
>







129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
		[self freeMemory: cache];
		cache = tmp;
		cacheLength -= length;

		return length;
	}
}

#ifdef OF_HAVE_BLOCKS
/*
 * Queues an asynchronous read into the buffer on the current run loop.
 *
 * Only compiled when blocks are available: the declaration in OFStream.h, the
 * of_stream_async_read_block_t type and the OFRunLoop method called here all
 * only exist if OF_HAVE_BLOCKS is defined.
 */
- (void)asyncReadWithBuffer: (void*)buffer
		     length: (size_t)length
		      block: (of_stream_async_read_block_t)block
{
	[OFRunLoop _addAsyncReadForStream: self
				   buffer: buffer
				   length: length
				    block: block];
}
#endif

- (void)readIntoBuffer: (void*)buffer
	   exactLength: (size_t)length
{
	size_t readLength = 0;

	while (readLength < length)
668
669
670
671
672
673
674
















675
676
677
678
679
680
681

	while ((line = [self tryReadLineWithEncoding: encoding]) == nil)
		if ([self isAtEndOfStream])
			return nil;

	return line;
}

















/*
 * Tries to read a line using the UTF-8 encoding; see
 * -tryReadLineWithEncoding:.
 */
- (OFString*)tryReadLine
{
	OFString *line =
	    [self tryReadLineWithEncoding: OF_STRING_ENCODING_UTF_8];

	return line;
}

- (OFString*)tryReadTillDelimiter: (OFString*)delimiter







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708

	while ((line = [self tryReadLineWithEncoding: encoding]) == nil)
		if ([self isAtEndOfStream])
			return nil;

	return line;
}

#ifdef OF_HAVE_BLOCKS
/*
 * Queues an asynchronous line read using the UTF-8 encoding; see
 * -asyncReadLineWithEncoding:block:.
 */
- (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block
{
	/*
	 * No "return" here: returning an expression from a method returning
	 * void is not valid C, even if the expression itself is void.
	 */
	[self asyncReadLineWithEncoding: OF_STRING_ENCODING_UTF_8
				  block: block];
}

/*
 * Queues an asynchronous line read with the specified encoding by handing the
 * request to OFRunLoop, which queues it on the current run loop.
 */
- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding
			    block: (of_stream_async_read_line_block_t)block
{
	[OFRunLoop _addAsyncReadLineForStream: self
				     encoding: encoding
					block: block];
}
#endif

/*
 * Tries to read a line using the UTF-8 encoding; see
 * -tryReadLineWithEncoding:.
 */
- (OFString*)tryReadLine
{
	OFString *line =
	    [self tryReadLineWithEncoding: OF_STRING_ENCODING_UTF_8];

	return line;
}

- (OFString*)tryReadTillDelimiter: (OFString*)delimiter