ObjFW  Check-in [35aab77af3]

Overview
Comment:Make OFStreamObserver thread-safe.

It does not cancel the currently running -[observe] yet when changing
the set of streams to observe.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 35aab77af3b7af7684b677a3a16b39b09274c3cc1ffb66a0824abb4a19bd6b81
User & Date: js on 2011-04-01 16:54:57
Other Links: manifest | tags
Context
2011-04-01
17:10
Call -[releaseObjects] after calling the delegate. check-in: 61cad3ee76 user: js tags: trunk
16:54
Make OFStreamObserver thread-safe. check-in: 35aab77af3 user: js tags: trunk
16:06
Check nfds against OPEN_MAX before calling poll. check-in: 52102a2906 user: js tags: trunk
Changes

Modified src/OFStreamObserver.h from [7555d40ac1] to [abbf9520ca].

73
74
75
76
77
78
79

80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
 *
 * Note: Currently, Win32 can only observe sockets and not files!
 */
@interface OFStreamObserver: OFObject
{
	OFMutableArray *readStreams;
	OFMutableArray *writeStreams;

	id <OFStreamObserverDelegate> delegate;
#ifdef OF_HAVE_POLL
	OFDataArray *fds;
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	fd_set exceptfds;
# ifndef _WIN32
	nfds_t nfds;
# else
	int nfds;
# endif
#endif
}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif








>








<
<
<

<







73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88



89

90
91
92
93
94
95
96
 *
 * Note: Currently, Win32 can only observe sockets and not files!
 */
@interface OFStreamObserver: OFObject
{
	OFMutableArray *readStreams;
	OFMutableArray *writeStreams;
	OFMutableArray *queue, *queueInfo;
	id <OFStreamObserverDelegate> delegate;
#ifdef OF_HAVE_POLL
	OFDataArray *fds;
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	fd_set exceptfds;



	int nfds;

#endif
}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

/**
 * Adds a stream to observe for reading.
 * 
 * This is also used to observe a listening socket for incoming connections,
 * which then triggers a read event for the observed stream.
 *
 * It is recommended that the stream you add it set to non-blocking mode.
 *
 * \param stream The stream to observe for reading
 */
- (void)addStreamToObserveForReading: (OFStream*)stream;

/**
 * Adds a stream to observe for writing.
 *
 * It is recommended that the stream you add it set to non-blocking mode.
 *
 * \param stream The stream to observe for writing
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream;

/**
 * Removes a stream to observe for reading.







|








|







113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136

/**
 * Adds a stream to observe for reading.
 * 
 * This is also used to observe a listening socket for incoming connections,
 * which then triggers a read event for the observed stream.
 *
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * \param stream The stream to observe for reading
 */
- (void)addStreamToObserveForReading: (OFStream*)stream;

/**
 * Adds a stream to observe for writing.
 *
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * \param stream The stream to observe for writing
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream;

/**
 * Removes a stream to observe for reading.

Modified src/OFStreamObserver.m from [3eaeff6179] to [bf14750714].

16
17
18
19
20
21
22


23
24
25
26
27
28
29
30
31
32
33
34
35







36
37
38
39
40
41
42
43
44
45
46
47
48
49


50
51
52
53
54
55
56

#include "config.h"

#define OF_STREAM_OBSERVER_M

#include <string.h>



#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFOutOfRangeException.h"








@implementation OFStreamObserver
/* Convenience constructor: returns a new, autoreleased observer. */
+ observer
{
	id instance = [[self alloc] init];

	return [instance autorelease];
}

- init
{
	self = [super init];

	@try {
		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];


#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);







>
>













>
>
>
>
>
>
>














>
>







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

#include "config.h"

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFOutOfRangeException.h"

/*
 * Flags describing a queued change to the set of observed streams.
 *
 * One action flag is ORed with one direction flag; the combined value is what
 * gets wrapped in an OFNumber and appended to queueInfo.
 */
enum {
	/* Action (bit 0) */
	QUEUE_ADD    = 0x0,
	QUEUE_REMOVE = 0x1,
	/* Direction (bit 1) */
	QUEUE_READ   = 0x0,
	QUEUE_WRITE  = 0x2
};

@implementation OFStreamObserver
/* Convenience constructor: returns a new, autoreleased observer. */
+ observer
{
	id instance = [[self alloc] init];

	return [instance autorelease];
}

- init
{
	self = [super init];

	@try {
		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
64
65
66
67
68
69
70


71
72
73
74
75
76
77
}

/*
 * Releases the delegate and the collections created in -[init], then hands
 * over to the superclass.
 */
- (void)dealloc
{
	/*
	 * The delegate property is declared (retain), so it is released here.
	 * The cast to id is presumably needed because the protocol type does
	 * not declare -release.
	 */
	[(id)delegate release];
	[readStreams release];
	[writeStreams release];


#ifdef OF_HAVE_POLL
	/* These two ivars only exist when poll() is used. */
	[fdToStream release];
	[fds release];
#endif

	[super dealloc];
}







>
>







75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
}

/*
 * Releases the delegate and the collections created in -[init], including
 * the queue of pending changes, then hands over to the superclass.
 */
- (void)dealloc
{
	/*
	 * The delegate property is declared (retain), so it is released here.
	 * The cast to id is presumably needed because the protocol type does
	 * not declare -release.
	 */
	[(id)delegate release];
	[readStreams release];
	[writeStreams release];
	/* Pending add/remove requests and their flags. */
	[queue release];
	[queueInfo release];
#ifdef OF_HAVE_POLL
	/* These two ivars only exist when poll() is used. */
	[fdToStream release];
	[fds release];
#endif

	[super dealloc];
}
173
174
175
176
177
178
179




180

181
182
183
184
185
186
187
188
189
190
191
192




193

194
195
196
197
198
199
200
201
202
203
204
205


206



207




























208





















209
210
211
212
213
214
215
216
217
218
219

220
221
222
223
224
225
226
227
228
229









230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251


252
253
254
255
256
257
258
	if (!FD_ISSET(fd, other_fdset))
		FD_CLR(fd, &exceptfds);
}
#endif

/*
 * Starts observing the stream for reading.
 *
 * The stream is appended to readStreams and then registered through
 * -[_addStream:withEvents:] when poll() is available, or through
 * -[_addStream:withFDSet:] otherwise.
 */
- (void)addStreamToObserveForReading: (OFStream*)stream
{
	[readStreams addObject: stream];
#if defined(OF_HAVE_POLL)
	[self _addStream: stream withEvents: POLLIN];
#else
	[self _addStream: stream withFDSet: &readfds];
#endif
}

/*
 * Starts observing the stream for writing.
 *
 * The stream is appended to writeStreams and then registered through
 * -[_addStream:withEvents:] when poll() is available, or through
 * -[_addStream:withFDSet:] otherwise.
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams addObject: stream];
#if defined(OF_HAVE_POLL)
	[self _addStream: stream withEvents: POLLOUT];
#else
	[self _addStream: stream withFDSet: &writefds];
#endif
}

/*
 * Stops observing the stream for reading.
 *
 * The stream is removed from readStreams by identity and then unregistered
 * through -[_removeStream:withEvents:] when poll() is available, or through
 * -[_removeStream:withFDSet:otherFDSet:] otherwise.
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	[readStreams removeObjectIdenticalTo: stream];
#if defined(OF_HAVE_POLL)
	[self _removeStream: stream withEvents: POLLIN];
#else
	[self _removeStream: stream withFDSet: &readfds otherFDSet: &writefds];
#endif
}

/*
 * Stops observing the stream for writing.
 *
 * The stream is removed from writeStreams by identity and then unregistered
 * through -[_removeStream:withEvents:] when poll() is available, or through
 * -[_removeStream:withFDSet:otherFDSet:] otherwise.
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams removeObjectIdenticalTo: stream];
#if defined(OF_HAVE_POLL)
	[self _removeStream: stream withEvents: POLLOUT];
#else
	[self _removeStream: stream withFDSet: &writefds otherFDSet: &readfds];
#endif
}

/*
 * Observes the streams by calling -[observeWithTimeout:] with a timeout of
 * -1 and discarding its result.
 */
- (void)observe
{
	/*
	 * NOTE(review): -1 presumably means "wait indefinitely", as it does
	 * for poll() - confirm in -[observeWithTimeout:].
	 */
	const int timeout = -1;

	(void)[self observeWithTimeout: timeout];
}

- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	BOOL foundInCache = NO;
	OFStream **cArray;
	size_t i, count;
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c;
	nfds_t nfds;
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
#endif



	cArray = [readStreams cArray];
	count = [readStreams count];

	for (i = 0; i < count; i++) {
		if (cArray[i]->cache != NULL) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];







>
>
>
>
|
>
|
|
<
<
|
<
<
<




>
>
>
>
|
>
|
|
<
<
|
<
<
<




>
>
|
>
>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
|

|
|
|

<
|
<
<
>
|
|

|
|

|
|
|

>
>
>
>
>
>
>
>
>















|






>
>







186
187
188
189
190
191
192
193
194
195
196
197
198
199
200


201



202
203
204
205
206
207
208
209
210
211
212
213


214



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282

283


284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
	if (!FD_ISSET(fd, other_fdset))
		FD_CLR(fd, &exceptfds);
}
#endif

/*
 * Schedules the stream to be added to the streams observed for reading.
 *
 * Nothing is registered here: the request is only appended to the queue while
 * holding @synchronized (queue). -[_processQueue] applies it later.
 */
- (void)addStreamToObserveForReading: (OFStream*)stream
{
	const int flags = QUEUE_ADD | QUEUE_READ;
	OFAutoreleasePool *localPool = [[OFAutoreleasePool alloc] init];
	OFNumber *request = [OFNumber numberWithInt: flags];

	/* queue and queueInfo are filled in lockstep, index by index. */
	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: request];
	}

	[localPool release];
}

/*
 * Schedules the stream to be added to the streams observed for writing.
 *
 * Nothing is registered here: the request is only appended to the queue while
 * holding @synchronized (queue). -[_processQueue] applies it later.
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	const int flags = QUEUE_ADD | QUEUE_WRITE;
	OFAutoreleasePool *localPool = [[OFAutoreleasePool alloc] init];
	OFNumber *request = [OFNumber numberWithInt: flags];

	/* queue and queueInfo are filled in lockstep, index by index. */
	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: request];
	}

	[localPool release];
}

/*
 * Schedules the stream to be removed from the streams observed for reading.
 *
 * Nothing is unregistered here: the request is only appended to the queue
 * while holding @synchronized (queue). -[_processQueue] applies it later.
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	const int flags = QUEUE_REMOVE | QUEUE_READ;
	OFAutoreleasePool *localPool = [[OFAutoreleasePool alloc] init];
	OFNumber *request = [OFNumber numberWithInt: flags];

	/* queue and queueInfo are filled in lockstep, index by index. */
	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: request];
	}

	[localPool release];
}

/*
 * Schedules the stream to be removed from the streams observed for writing.
 *
 * Nothing is unregistered here: the request is only appended to the queue
 * while holding @synchronized (queue). -[_processQueue] applies it later.
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	const int flags = QUEUE_REMOVE | QUEUE_WRITE;
	OFAutoreleasePool *localPool = [[OFAutoreleasePool alloc] init];
	OFNumber *request = [OFNumber numberWithInt: flags];

	/* queue and queueInfo are filled in lockstep, index by index. */
	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: request];
	}

	[localPool release];
}

/*
 * Applies every queued add/remove request to the observed streams and then
 * empties the queue.
 *
 * The whole pass runs under @synchronized (queue), the same lock the public
 * add/remove methods take when they append requests. Each entry in queue is
 * paired with the flags stored at the same index in queueInfo.
 */
- (void)_processQueue
{
	@synchronized (queue) {
		OFStream **streams = [queue cArray];
		OFNumber **requests = [queueInfo cArray];
		size_t idx, pending = [queue count];

		for (idx = 0; idx < pending; idx++) {
			OFStream *stream = streams[idx];

			switch ([requests[idx] intValue]) {
			case QUEUE_ADD | QUEUE_READ:
				[readStreams addObject: stream];
#if defined(OF_HAVE_POLL)
				[self _addStream: stream
				      withEvents: POLLIN];
#else
				[self _addStream: stream
				       withFDSet: &readfds];
#endif
				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[writeStreams addObject: stream];
#if defined(OF_HAVE_POLL)
				[self _addStream: stream
				      withEvents: POLLOUT];
#else
				[self _addStream: stream
				       withFDSet: &writefds];
#endif
				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[readStreams removeObjectIdenticalTo: stream];
#if defined(OF_HAVE_POLL)
				[self _removeStream: stream
					 withEvents: POLLIN];
#else
				[self _removeStream: stream
					  withFDSet: &readfds
					 otherFDSet: &writefds];
#endif
				break;
			case QUEUE_REMOVE | QUEUE_WRITE:
				[writeStreams removeObjectIdenticalTo: stream];
#if defined(OF_HAVE_POLL)
				[self _removeStream: stream
					 withEvents: POLLOUT];
#else
				[self _removeStream: stream
					  withFDSet: &writefds
					 otherFDSet: &readfds];
#endif
				break;
			default:
				/* Only the four combinations above are queued. */
				assert(0);
			}
		}

		/* Drop exactly the entries that were just handled. */
		[queue removeNObjects: pending];
		[queueInfo removeNObjects: pending];
	}
}

/*
 * Observes the streams by calling -[observeWithTimeout:] with a timeout of
 * -1 and discarding its result.
 */
- (void)observe
{
	/*
	 * NOTE(review): -1 presumably means "wait indefinitely", as it does
	 * for poll() - confirm in -[observeWithTimeout:].
	 */
	const int timeout = -1;

	(void)[self observeWithTimeout: timeout];
}

- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	BOOL foundInCache = NO;
	OFStream **cArray;
	size_t i, count;
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c;
	size_t nfds;
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
#endif

	[self _processQueue];

	cArray = [readStreams cArray];
	count = [readStreams count];

	for (i = 0; i < count; i++) {
		if (cArray[i]->cache != NULL) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];