ObjFW  Diff

Differences From Artifact [3eaeff6179]:

To Artifact [bf14750714]:


16
17
18
19
20
21
22


23
24
25
26
27
28
29
30
31
32
33
34
35







36
37
38
39
40
41
42
43
44
45
46
47
48
49


50
51
52
53
54
55
56
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67







+
+













+
+
+
+
+
+
+














+
+








#include "config.h"

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {
		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
64
65
66
67
68
69
70


71
72
73
74
75
76
77
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90







+
+







}

- (void)dealloc
{
	[(id)delegate release];
	[readStreams release];
	[writeStreams release];
	[queue release];
	[queueInfo release];
#ifdef OF_HAVE_POLL
	[fdToStream release];
	[fds release];
#endif

	[super dealloc];
}
173
174
175
176
177
178
179




180
181
182




183
184
185

186
187
188
189
190
191
192




193
194
195




196
197
198

199
200
201
202
203
204
205


206
207

































208





















209
210


211
212
213
214



215
216
217

218
219
220
221



222
223
224


225
226
227
228



229









230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245

246
247
248
249
250
251


252
253
254
255
256
257
258
186
187
188
189
190
191
192
193
194
195
196



197
198
199
200



201



202
203
204
205
206
207
208
209



210
211
212
213



214



215
216
217
218
219
220


221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275


276
277
278



279
280
281
282


283




284
285
286
287


288
289
290



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318

319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334







+
+
+
+
-
-
-
+
+
+
+
-
-
-
+
-
-
-




+
+
+
+
-
-
-
+
+
+
+
-
-
-
+
-
-
-




+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+

-
-
-
+
+
+

-
-
+
-
-
-
-
+
+
+

-
-
+
+

-
-
-
+
+
+

+
+
+
+
+
+
+
+
+















-
+






+
+







	if (!FD_ISSET(fd, other_fdset))
		FD_CLR(fd, &exceptfds);
}
#endif

- (void)addStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
	[readStreams addObject: stream];

#ifdef OF_HAVE_POLL
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[self _addStream: stream
	      withEvents: POLLIN];
#else
	[pool release];
	[self _addStream: stream
	       withFDSet: &readfds];
#endif
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
	[writeStreams addObject: stream];

#ifdef OF_HAVE_POLL
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[self _addStream: stream
	      withEvents: POLLOUT];
#else
	[pool release];
	[self _addStream: stream
	       withFDSet: &writefds];
#endif
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];
	[readStreams removeObjectIdenticalTo: stream];


	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {
		OFStream **queue_c = [queue cArray];
		OFNumber **queueInfo_c = [queueInfo cArray];
		size_t i, count = [queue count];

		for (i = 0; i < count; i++) {
			switch ([queueInfo_c[i] intValue]) {
			case QUEUE_ADD | QUEUE_READ:
				[readStreams addObject: queue_c[i]];
#ifdef OF_HAVE_POLL
				[self _addStream: queue_c[i]
				      withEvents: POLLIN];
#else
				[self _addStream: queue_c[i]
				       withFDSet: &readfds];
#endif
				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[writeStreams addObject: queue_c[i]];
#ifdef OF_HAVE_POLL
				[self _addStream: queue_c[i]
				      withEvents: POLLOUT];
#else
				[self _addStream: queue_c[i]
				       withFDSet: &writefds];
#endif
				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[readStreams removeObjectIdenticalTo:
				    queue_c[i]];
#ifdef OF_HAVE_POLL
	[self _removeStream: stream
		 withEvents: POLLIN];
				[self _removeStream: queue_c[i]
					 withEvents: POLLIN];
#else
	[self _removeStream: stream
		  withFDSet: &readfds
		 otherFDSet: &writefds];
				[self _removeStream: queue_c[i]
					  withFDSet: &readfds
					 otherFDSet: &writefds];
#endif
}

				break;
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams removeObjectIdenticalTo: stream];

			case QUEUE_REMOVE | QUEUE_WRITE:
				[writeStreams removeObjectIdenticalTo:
				    queue_c[i]];
#ifdef OF_HAVE_POLL
	[self _removeStream: stream
		 withEvents: POLLOUT];
				[self _removeStream: queue_c[i]
					 withEvents: POLLOUT];
#else
	[self _removeStream: stream
		  withFDSet: &writefds
		 otherFDSet: &readfds];
				[self _removeStream: queue_c[i]
					  withFDSet: &writefds
					 otherFDSet: &readfds];
#endif
				break;
			default:
				assert(0);
			}
		}

		[queue removeNObjects: count];
		[queueInfo removeNObjects: count];
	}
}

- (void)observe
{
	[self observeWithTimeout: -1];
}

- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	BOOL foundInCache = NO;
	OFStream **cArray;
	size_t i, count;
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c;
	nfds_t nfds;
	size_t nfds;
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
#endif

	[self _processQueue];

	cArray = [readStreams cArray];
	count = [readStreams count];

	for (i = 0; i < count; i++) {
		if (cArray[i]->cache != NULL) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];