ObjFW  Diff

Differences From Artifact [95f2f3be53]:

To Artifact [864995cc48]:


18
19
20
21
22
23
24


25
26
27
28
29
30
31
32
33
34
35
36

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56


57
58
59
60
61
62
63
64
65
66
67
68












69
70
71
72
73
74
75

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>



#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"


#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {


		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
#endif












	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}







>
>












>




















>
>












>
>
>
>
>
>
>
>
>
>
>
>







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>

#include <unistd.h>

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFInitializationFailedException.h"
#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {
		struct pollfd p = { 0, POLLIN, 0 };

		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
#endif

		if (pipe(cancelFd))
			@throw [OFInitializationFailedException
			    newWithClass: isa];

#ifdef OF_HAVE_POLL
		p.fd = cancelFd[0];
		[fds addItem: &p];
#else
		FD_SET(cancelFd[0], fdset);
		nfds = cancelFd[0] + 1;
#endif
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}
194
195
196
197
198
199
200


201
202
203
204
205
206
207
208
209
210
211
212


213
214
215
216
217
218
219
220
221
222
223
224
225
226


227
228
229
230
231
232
233
234
235
236
237
238


239
240
241
242
243
244
245
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {







>
>












>
>














>
>












>
>







211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {
353
354
355
356
357
358
359








360
361
362
363
364
365
366

	if (poll(fds_c, (nfds_t)nfds, timeout) < 1)
		return NO;

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;









		if (fds_c[i].revents & POLLIN) {
			num = [OFNumber numberWithInt: fds_c[i].fd];
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
			[pool releaseObjects];
		}







>
>
>
>
>
>
>
>







378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399

	if (poll(fds_c, (nfds_t)nfds, timeout) < 1)
		return NO;

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;

		if (fds_c[i].fd == cancelFd[0]) {
			char buf;

			read(cancelFd[0], &buf, 1);

			continue;
		}

		if (fds_c[i].revents & POLLIN) {
			num = [OFNumber numberWithInt: fds_c[i].fd];
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
			[pool releaseObjects];
		}
391
392
393
394
395
396
397





398
399
400
401
402
403
404
	writefds_ = writefds;
	exceptfds_ = exceptfds;
# endif

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1)
		return NO;






	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		if (FD_ISSET(fd, &readfds_)) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];
			[pool releaseObjects];







>
>
>
>
>







424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
	writefds_ = writefds;
	exceptfds_ = exceptfds;
# endif

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1)
		return NO;

	if (FD_ISSET(cancelFd[0], &readfds_)) {
		char buf;
		read(cancelFd[0], &buf, 1);
	}

	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		if (FD_ISSET(fd, &readfds_)) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];
			[pool releaseObjects];