ObjFW  Check-in [a61ab37726]

Overview
Comment:Cancel the currently blocking -[observe] when the stream set is changed.

Not working on win32 yet, it will be ported to it later as it's not that
easy there, since select() can only observe sockets.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: a61ab37726045003bd80ceaf9fd159da4e40a18de98d9123606d8692282506e4
User & Date: js on 2011-04-01 22:14:32
Other Links: manifest | tags
Context
2011-04-02
02:29
Fix small bugs introduced by the last commit. check-in: 321274075d user: js tags: trunk
2011-04-01
22:14
Cancel the currently blocking -[observe] when the stream set is changed. check-in: a61ab37726 user: js tags: trunk
17:10
Call -[releaseObjects] after calling the delegate. check-in: 61cad3ee76 user: js tags: trunk
Changes

Modified src/OFStreamObserver.h from [abbf9520ca] to [7b1f8623a5].

84
85
86
87
88
89
90

91
92
93
94
95
96
97
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	fd_set exceptfds;
	int nfds;
#endif

}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

/**







>







84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	fd_set exceptfds;
	int nfds;
#endif
	int cancelFd[2];
}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

/**
109
110
111
112
113
114
115
116
117
118
119
120



121
122
123
124
125
126
127
128
129
130



131
132
133
134
135
136
137



138
139
140
141
142
143



144
145
146
147
148
149
150
 *
 * \param delegate The delegate for the OFStreamObserver
 */
- (void)setDelegate: (id <OFStreamObserverDelegate>)delegate;

/**
 * Adds a stream to observe for reading.
 * 
 * This is also used to observe a listening socket for incoming connections,
 * which then triggers a read event for the observed stream.
 *
 * It is recommended that the stream you add is set to non-blocking mode.



 *
 * \param stream The stream to observe for reading
 */
- (void)addStreamToObserveForReading: (OFStream*)stream;

/**
 * Adds a stream to observe for writing.
 *
 * It is recommended that the stream you add is set to non-blocking mode.
 *



 * \param stream The stream to observe for writing
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream;

/**
 * Removes a stream to observe for reading.
 *



 * \param stream The stream to remove from observing for reading
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream;

/**
 * Removes a stream to observe for writing.



 *
 * \param stream The stream to remove from observing for writing
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream;

/**
 * Observes all streams and blocks until an event happens on a stream.







|




>
>
>










>
>
>







>
>
>






>
>
>







110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
 *
 * \param delegate The delegate for the OFStreamObserver
 */
- (void)setDelegate: (id <OFStreamObserverDelegate>)delegate;

/**
 * Adds a stream to observe for reading.
 *
 * This is also used to observe a listening socket for incoming connections,
 * which then triggers a read event for the observed stream.
 *
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * If there is an -[observe] call blocking, it will be canceled. The reason for
 * this is to prevent blocking even though the new added stream is ready.
 *
 * \param stream The stream to observe for reading
 */
- (void)addStreamToObserveForReading: (OFStream*)stream;

/**
 * Adds a stream to observe for writing.
 *
 * It is recommended that the stream you add is set to non-blocking mode.
 *
 * If there is an -[observe] call blocking, it will be canceled. The reason for
 * this is to prevent blocking even though the new added stream is ready.
 *
 * \param stream The stream to observe for writing
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream;

/**
 * Removes a stream to observe for reading.
 *
 * If there is an -[observe] call blocking, it will be canceled. The reason for
 * this is to prevent the removed stream from still being observed.
 *
 * \param stream The stream to remove from observing for reading
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream;

/**
 * Removes a stream to observe for writing.
 *
 * If there is an -[observe] call blocking, it will be canceled. The reason for
 * this is to prevent the removed stream from still being observed.
 *
 * \param stream The stream to remove from observing for writing
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream;

/**
 * Observes all streams and blocks until an event happens on a stream.

Modified src/OFStreamObserver.m from [95f2f3be53] to [864995cc48].

18
19
20
21
22
23
24


25
26
27
28
29
30
31
32
33
34
35
36

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56


57
58
59
60
61
62
63
64
65
66
67
68












69
70
71
72
73
74
75

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>



#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"


#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {


		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
#endif












	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}







>
>












>




















>
>












>
>
>
>
>
>
>
>
>
>
>
>







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

#define OF_STREAM_OBSERVER_M

#include <string.h>

#include <assert.h>

#include <unistd.h>

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"

#import "OFInitializationFailedException.h"
#import "OFOutOfRangeException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};

@implementation OFStreamObserver
+ observer
{
	return [[[self alloc] init] autorelease];
}

- init
{
	self = [super init];

	@try {
		struct pollfd p = { 0, POLLIN, 0 };

		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
		queue = [[OFMutableArray alloc] init];
		queueInfo = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
#endif

		if (pipe(cancelFd))
			@throw [OFInitializationFailedException
			    newWithClass: isa];

#ifdef OF_HAVE_POLL
		p.fd = cancelFd[0];
		[fds addItem: &p];
#else
		FD_SET(cancelFd[0], fdset);
		nfds = cancelFd[0] + 1;
#endif
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}
194
195
196
197
198
199
200


201
202
203
204
205
206
207
208
209
210
211
212


213
214
215
216
217
218
219
220
221
222
223
224
225
226


227
228
229
230
231
232
233
234
235
236
237
238


239
240
241
242
243
244
245
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}



	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {







>
>












>
>














>
>












>
>







211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE];

	@synchronized (queue) {
		[queue addObject: stream];
		[queueInfo addObject: qi];
	}

	write(cancelFd[1], "", 1);

	[pool release];
}

- (void)_processQueue
{
	@synchronized (queue) {
353
354
355
356
357
358
359








360
361
362
363
364
365
366

	if (poll(fds_c, (nfds_t)nfds, timeout) < 1)
		return NO;

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;









		if (fds_c[i].revents & POLLIN) {
			num = [OFNumber numberWithInt: fds_c[i].fd];
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
			[pool releaseObjects];
		}







>
>
>
>
>
>
>
>







378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399

	if (poll(fds_c, (nfds_t)nfds, timeout) < 1)
		return NO;

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;

		if (fds_c[i].fd == cancelFd[0]) {
			char buf;

			read(cancelFd[0], &buf, 1);

			continue;
		}

		if (fds_c[i].revents & POLLIN) {
			num = [OFNumber numberWithInt: fds_c[i].fd];
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
			[pool releaseObjects];
		}
391
392
393
394
395
396
397





398
399
400
401
402
403
404
	writefds_ = writefds;
	exceptfds_ = exceptfds;
# endif

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1)
		return NO;






	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		if (FD_ISSET(fd, &readfds_)) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];
			[pool releaseObjects];







>
>
>
>
>







424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
	writefds_ = writefds;
	exceptfds_ = exceptfds;
# endif

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1)
		return NO;

	if (FD_ISSET(cancelFd[0], &readfds_)) {
		char buf;
		read(cancelFd[0], &buf, 1);
	}

	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		if (FD_ISSET(fd, &readfds_)) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];
			[pool releaseObjects];