ObjFW  Check-in [35aab77af3]

Overview
Comment:Make OFStreamObserver thread-safe.

It does not cancel the currently running -[observe] yet when changing
the set of streams to observe.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 35aab77af3b7af7684b677a3a16b39b09274c3cc1ffb66a0824abb4a19bd6b81
User & Date: js on 2011-04-01 16:54:57
Other Links: manifest | tags
Context
2011-04-01
17:10
Call -[releaseObjects] after calling the delegate. check-in: 61cad3ee76 user: js tags: trunk
16:54
Make OFStreamObserver thread-safe. check-in: 35aab77af3 user: js tags: trunk
16:06
Check nfds against OPEN_MAX before calling poll. check-in: 52102a2906 user: js tags: trunk
Changes

Modified src/OFStreamObserver.h from [7555d40ac1] to [abbf9520ca].

73
74
75
76
77
78
79

80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88



89

90
91
92
93
94
95
96







+








-
-
-

-







 *
 * Note: Currently, Win32 can only observe sockets and not files!
 */
@interface OFStreamObserver: OFObject
{
	OFMutableArray *readStreams;
	OFMutableArray *writeStreams;
	OFMutableArray *queue, *queueInfo;
	id <OFStreamObserverDelegate> delegate;
#ifdef OF_HAVE_POLL
	OFDataArray *fds;
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	fd_set exceptfds;
# ifndef _WIN32
	nfds_t nfds;
# else
	int nfds;
# endif
#endif
}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

116
117
118
119
120
121
122
123

124
125
126
127
128
129
130
131
132

133
134
135
136
137
138
139
113
114
115
116
117
118
119

120
121
122
123
124
125
126
127
128

129
130
131
132
133
134
135
136







-
+








-
+








/**
 * Adds a stream to observe for reading.
 * 
 * This is also used to observe a listening socket for incoming connections,
 * which then triggers a read event for the observed stream.
 *
 * It is recommended that the stream you add it set to non-blocking mode.
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * \param stream The stream to observe for reading
 */
- (void)addStreamToObserveForReading: (OFStream*)stream;

/**
 * Adds a stream to observe for writing.
 *
 * It is recommended that the stream you add it set to non-blocking mode.
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * \param stream The stream to observe for writing
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream;

/**
 * Removes a stream to observe for reading.

Modified src/OFStreamObserver.m from [3eaeff6179] to [bf14750714].

16
17
18
19
20
21
22


23
24
25
26
27
28
29
30
31
32
33
34
35







36
37
38
39
40
41
42
43
44
45
46
47
48
49


50
51
52
53
54
55
56
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67







+
+













+
+
+
+
+
+
+














+
+








#include "config.h"

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {
		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
64
65
66
67
68
69
70


71
72
73
74
75
76
77
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90







+
+







}

- (void)dealloc
{
	[(id)delegate release];
	[readStreams release];
	[writeStreams release];
	[queue release];
	[queueInfo release];
#ifdef OF_HAVE_POLL
	[fdToStream release];
	[fds release];
#endif

	[super dealloc];
}
173
174
175
176
177
178
179




180
181
182




183
184
185

186
187
188
189
190
191
192




193
194
195




196
197
198

199
200
201
202
203
204
205


206
207

































208





















209
210


211
212
213
214



215
216
217

218
219
220
221



222
223
224


225
226
227
228



229









230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245

246
247
248
249
250
251


252
253
254
255
256
257
258
186
187
188
189
190
191
192
193
194
195
196



197
198
199
200



201



202
203
204
205
206
207
208
209



210
211
212
213



214



215
216
217
218
219
220


221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275


276
277
278



279
280
281
282


283




284
285
286
287


288
289
290



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318

319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334







+
+
+
+
-
-
-
+
+
+
+
-
-
-
+
-
-
-




+
+
+
+
-
-
-
+
+
+
+
-
-
-
+
-
-
-




+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+

-
-
-
+
+
+

-
-
+
-
-
-
-
+
+
+

-
-
+
+

-
-
-
+
+
+

+
+
+
+
+
+
+
+
+















-
+






+
+







	if (!FD_ISSET(fd, other_fdset))
		FD_CLR(fd, &exceptfds);
}
#endif

- (void)addStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
	[readStreams addObject: stream];

#ifdef OF_HAVE_POLL
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[self _addStream: stream
	      withEvents: POLLIN];
#else
	[pool release];
	[self _addStream: stream
	       withFDSet: &readfds];
#endif
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
	[writeStreams addObject: stream];

#ifdef OF_HAVE_POLL
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[self _addStream: stream
	      withEvents: POLLOUT];
#else
	[pool release];
	[self _addStream: stream
	       withFDSet: &writefds];
#endif
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];
	[readStreams removeObjectIdenticalTo: stream];


	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {
		OFStream **queue_c = [queue cArray];
		OFNumber **queueInfo_c = [queueInfo cArray];
		size_t i, count = [queue count];

		for (i = 0; i < count; i++) {
			switch ([queueInfo_c[i] intValue]) {
			case QUEUE_ADD | QUEUE_READ:
				[readStreams addObject: queue_c[i]];
#ifdef OF_HAVE_POLL
				[self _addStream: queue_c[i]
				      withEvents: POLLIN];
#else
				[self _addStream: queue_c[i]
				       withFDSet: &readfds];
#endif
				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[writeStreams addObject: queue_c[i]];
#ifdef OF_HAVE_POLL
				[self _addStream: queue_c[i]
				      withEvents: POLLOUT];
#else
				[self _addStream: queue_c[i]
				       withFDSet: &writefds];
#endif
				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[readStreams removeObjectIdenticalTo:
				    queue_c[i]];
#ifdef OF_HAVE_POLL
	[self _removeStream: stream
		 withEvents: POLLIN];
				[self _removeStream: queue_c[i]
					 withEvents: POLLIN];
#else
	[self _removeStream: stream
		  withFDSet: &readfds
		 otherFDSet: &writefds];
				[self _removeStream: queue_c[i]
					  withFDSet: &readfds
					 otherFDSet: &writefds];
#endif
}

				break;
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams removeObjectIdenticalTo: stream];

			case QUEUE_REMOVE | QUEUE_WRITE:
				[writeStreams removeObjectIdenticalTo:
				    queue_c[i]];
#ifdef OF_HAVE_POLL
	[self _removeStream: stream
		 withEvents: POLLOUT];
				[self _removeStream: queue_c[i]
					 withEvents: POLLOUT];
#else
	[self _removeStream: stream
		  withFDSet: &writefds
		 otherFDSet: &readfds];
				[self _removeStream: queue_c[i]
					  withFDSet: &writefds
					 otherFDSet: &readfds];
#endif
				break;
			default:
				assert(0);
			}
		}

		[queue removeNObjects: count];
		[queueInfo removeNObjects: count];
	}
}

- (void)observe
{
	[self observeWithTimeout: -1];
}

- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	BOOL foundInCache = NO;
	OFStream **cArray;
	size_t i, count;
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c;
	nfds_t nfds;
	size_t nfds;
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
#endif

	[self _processQueue];

	cArray = [readStreams cArray];
	count = [readStreams count];

	for (i = 0; i < count; i++) {
		if (cArray[i]->cache != NULL) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];