ObjFW  Check-in [b554732eae]

Overview
Comment:Improve OFStreamObserver.

It also looks at the cache of each stream now and does not block if
there is still data in the cache.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: b554732eaee65ed20141c7e2644c11d493dc942b4f152aceeba62adfeea55bc4
User & Date: js on 2010-09-19 02:52:19
Other Links: manifest | tags
Context
2010-09-19
22:35
Add -[unicodeString] to OFString. check-in: b5f0fc343d user: js tags: trunk
02:52
Improve OFStreamObserver. check-in: b554732eae user: js tags: trunk
02:34
Only handle the first matching object in OFMutableArray operations. check-in: de45535c23 user: js tags: trunk
Changes

Modified src/OFStream.h from [c21dc06749] to [dd21d0d02a].

23
24
25
26
27
28
29

30


31
32
33
34
35
36
37
 * _writeNBytes:fromBuffer: and _atEndOfStream, but nothing else. Those are not
 * defined in the headers, but do the actual work. OFStream uses those and does
 * all the caching and other stuff. If you override these methods without the
 * _ prefix, you *WILL* break caching and get broken results!
 */
@interface OFStream: OFObject
{

	char   *cache, *wBuffer;


	size_t cacheLen, wBufferLen;
	BOOL   useWBuffer;
}

/**
 * Returns a boolean whether the end of the stream has been reached.
 *







>
|
>
>







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 * _writeNBytes:fromBuffer: and _atEndOfStream, but nothing else. Those are not
 * defined in the headers, but do the actual work. OFStream uses those and does
 * all the caching and other stuff. If you override these methods without the
 * _ prefix, you *WILL* break caching and get broken results!
 */
@interface OFStream: OFObject
{
@public
	char   *cache;
@protected
	char   *wBuffer;
	size_t cacheLen, wBufferLen;
	BOOL   useWBuffer;
}

/**
 * Returns a boolean whether the end of the stream has been reached.
 *

Modified src/OFStreamObserver.h from [943d945622] to [572eeb8e96].

22
23
24
25
26
27
28

29
30
31
32
33
34
35
# include <windows.h>
#endif

@class OFStream;
#ifdef OF_HAVE_POLL
@class OFDataArray;
#endif

@class OFMutableDictionary;

/**
 * \brief A protocol that needs to be implemented by delegates for
 *	  OFStreamObserver.
 */
@protocol OFStreamObserverDelegate







>







22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# include <windows.h>
#endif

@class OFStream;
#ifdef OF_HAVE_POLL
@class OFDataArray;
#endif
@class OFMutableArray;
@class OFMutableDictionary;

/**
 * \brief A protocol that needs to be implemented by delegates for
 *	  OFStreamObserver.
 */
@protocol OFStreamObserverDelegate
50
51
52
53
54
55
56


57
58
59

60
61
62
63
64
65
66
67
68
69
70
71
72
@end

/**
 * \brief A class that can observe multiple streams at once.
 */
@interface OFStreamObserver: OFObject
{


	id <OFStreamObserverDelegate> delegate;
#ifdef OF_HAVE_POLL
	OFDataArray *fds;

#else
	fd_set readfds;
	fd_set writefds;
	int nfds;
#endif
	OFMutableDictionary *fdToStream;
}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

/**







>
>



>





<







51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

69
70
71
72
73
74
75
@end

/**
 * \brief A class that can observe multiple streams at once.
 */
@interface OFStreamObserver: OFObject
{
	OFMutableArray *readStreams;
	OFMutableArray *writeStreams;
	id <OFStreamObserverDelegate> delegate;
#ifdef OF_HAVE_POLL
	OFDataArray *fds;
	OFMutableDictionary *fdToStream;
#else
	fd_set readfds;
	fd_set writefds;
	int nfds;
#endif

}

#ifdef OF_HAVE_PROPERTIES
@property (retain) id <OFStreamObserverDelegate> delegate;
#endif

/**

Modified src/OFStreamObserver.m from [7ad4630322] to [2babecb315].

15
16
17
18
19
20
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37



38
39


40
41
42
43

44


45
46
47
48
49
50
51


52

53
54
55
56
57
58
59
60
61
62

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"

#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"
#import "OFExceptions.h"

@implementation OFStreamObserver
/*
 * Convenience constructor: allocates and initializes a new observer and
 * returns it autoreleased.
 */
+ streamObserver
{
	id observer = [[self alloc] init];

	return [observer autorelease];
}

/*
 * Prepares the backend-specific state (the pollfd array when poll() is
 * available, two empty fd_sets otherwise) and creates the dictionary that
 * maps file descriptors to their streams.
 */
- init
{
	OFMutableDictionary *streamsByFD;
#ifdef OF_HAVE_POLL
	OFDataArray *pollFDs;
#endif

	self = [super init];

#ifdef OF_HAVE_POLL
	pollFDs = [[OFDataArray alloc] initWithItemSize: sizeof(struct pollfd)];
	fds = pollFDs;
#else
	FD_ZERO(&readfds);
	FD_ZERO(&writefds);
#endif

	streamsByFD = [[OFMutableDictionary alloc] init];
	fdToStream = streamsByFD;

	return self;
}

/*
 * Releases the delegate, the pollfd array (poll() builds only) and the
 * descriptor-to-stream dictionary, then chains up to the superclass.
 */
- (void)dealloc
{
	id ownedDelegate = (id)delegate;

	[ownedDelegate release];
#ifdef OF_HAVE_POLL
	[fds release];
#endif
	[fdToStream release];

	[super dealloc];
}

- (id <OFStreamObserverDelegate>)delegate
{
	return [[(id)delegate retain] autorelease];







>
















>
>
>

|
>
>

|
|

>
|
>
>







>
>

>


<







15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

67
68
69
70
71
72
73

#ifdef OF_HAVE_POLL
# include <poll.h>
#endif

#import "OFStreamObserver.h"
#import "OFDataArray.h"
#import "OFArray.h"
#import "OFDictionary.h"
#import "OFStream.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"
#import "OFExceptions.h"

@implementation OFStreamObserver
/*
 * Convenience constructor: creates a fully initialized observer and returns
 * it autoreleased.
 */
+ streamObserver
{
	id newObserver = [[self alloc] init];

	return [newObserver autorelease];
}

/*
 * Creates the arrays of streams observed for reading and for writing and
 * sets up the backend-specific state: the pollfd array plus the
 * descriptor-to-stream dictionary for poll(), or two empty fd_sets for
 * select().
 *
 * If any of this raises an OFException, the partially initialized object is
 * deallocated and the exception is rethrown.
 */
- init
{
	self = [super init];

	@try {
		readStreams = [[OFMutableArray alloc] init];
		writeStreams = [[OFMutableArray alloc] init];
#ifdef OF_HAVE_POLL
		fds = [[OFDataArray alloc] initWithItemSize:
		    sizeof(struct pollfd)];
		fdToStream = [[OFMutableDictionary alloc] init];
#else
		FD_ZERO(&readfds);
		FD_ZERO(&writefds);
#endif
	} @catch (OFException *e) {
		/*
		 * -dealloc releases whatever has been assigned so far; ivars
		 * that were not reached yet are still nil.
		 */
		[self dealloc];
		@throw e;
	}

	return self;
}

/*
 * Releases the delegate, both stream arrays and, in poll() builds, the
 * descriptor-to-stream dictionary and the pollfd array, then chains up to
 * the superclass.
 */
- (void)dealloc
{
	id ownedDelegate = (id)delegate;

	[ownedDelegate release];
	[readStreams release];
	[writeStreams release];
#ifdef OF_HAVE_POLL
	[fdToStream release];
	[fds release];
#endif

	[super dealloc];
}

- (id <OFStreamObserverDelegate>)delegate
{
	return [[(id)delegate retain] autorelease];
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166


167
168
169
170
171
172
173
174
175
176
177


178
179
180
181
182
183
184
185
186
187
188


189
190
191
192
193
194
195
196
197
198
199


200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216



217
218
219






220


















221
222
223
224
225
226
227
		@throw [OFOutOfRangeException newWithClass: isa];

	FD_SET(fd, fdset);

	if (fd >= nfds)
		nfds = fd + 1;

	[fdToStream setObject: stream
		       forKey: [OFNumber numberWithInt: fd]];

	[pool release];
}

/*
 * Removes a stream's file descriptor from the given fd_set (select()
 * backend).
 *
 * Once the descriptor is in neither the read nor the write set, its entry
 * in fdToStream is removed as well.
 *
 * Throws an OFOutOfRangeException if the descriptor cannot be represented
 * in an fd_set.
 */
- (void)_removeStream: (OFStream*)stream
	    withFDSet: (fd_set*)fdset
{
	int fd = [stream fileDescriptor];

	/*
	 * FD_CLR() and FD_ISSET() are undefined for descriptors outside of
	 * [0, FD_SETSIZE), so negative descriptors have to be rejected too.
	 */
	if (fd < 0 || fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException newWithClass: isa];

	FD_CLR(fd, fdset);

	/* Forget the stream once no set watches its descriptor anymore. */
	if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds)) {
		OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

		[fdToStream removeObjectForKey: [OFNumber numberWithInt: fd]];

		[pool release];
	}
}
#endif

/*
 * Registers a stream with the observer for read readiness, using POLLIN
 * with the poll() backend or the read fd_set with the select() backend.
 */
- (void)addStreamToObserveForReading: (OFStream*)stream
{
#ifdef OF_HAVE_POLL
	[self _addStream: stream withEvents: POLLIN];
#else
	[self _addStream: stream withFDSet: &readfds];
#endif
}

/*
 * Registers a stream with the observer for write readiness, using POLLOUT
 * with the poll() backend or the write fd_set with the select() backend.
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream
{
#ifdef OF_HAVE_POLL
	[self _addStream: stream withEvents: POLLOUT];
#else
	[self _addStream: stream withFDSet: &writefds];
#endif
}

/*
 * Unregisters a stream from read-readiness observation, clearing POLLIN
 * with the poll() backend or the read fd_set entry with select().
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream
{
#ifdef OF_HAVE_POLL
	[self _removeStream: stream withEvents: POLLIN];
#else
	[self _removeStream: stream withFDSet: &readfds];
#endif
}

/*
 * Unregisters a stream from write-readiness observation, clearing POLLOUT
 * with the poll() backend or the write fd_set entry with select().
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
#ifdef OF_HAVE_POLL
	[self _removeStream: stream withEvents: POLLOUT];
#else
	[self _removeStream: stream withFDSet: &writefds];
#endif
}

/*
 * Observes the registered streams without a timeout: forwards to
 * -observeWithTimeout: with -1 and discards its result.
 */
- (void)observe
{
	const int noTimeout = -1;

	(void)[self observeWithTimeout: noTimeout];
}

/*
 * Waits until one of the registered streams becomes ready or the timeout
 * expires, and notifies the delegate about every ready stream.
 *
 * timeout is passed to poll() unchanged; -1 disables the timeout.
 *
 * Returns NO if the wait reported no ready descriptor (this includes
 * timeouts and errors), YES otherwise.
 */
- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c = [fds cArray];
	size_t i, nfds = [fds count];

	if (poll(fds_c, nfds, timeout) < 1) {
		/* Do not leave the pool behind on the early exit. */
		[pool release];
		return NO;
	}

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;








<
<
<












<
<
<
<
<
<
<
<





>
>











>
>











>
>











>
>

















>
>
>


|
>
>
>
>
>
>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







143
144
145
146
147
148
149



150
151
152
153
154
155
156
157
158
159
160
161








162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
		@throw [OFOutOfRangeException newWithClass: isa];

	FD_SET(fd, fdset);

	if (fd >= nfds)
		nfds = fd + 1;




	[pool release];
}

/*
 * Removes a stream's file descriptor from the given fd_set (select()
 * backend).
 *
 * Throws an OFOutOfRangeException if the descriptor cannot be represented
 * in an fd_set.
 */
- (void)_removeStream: (OFStream*)stream
	    withFDSet: (fd_set*)fdset
{
	int fd = [stream fileDescriptor];

	/*
	 * FD_CLR() is undefined for descriptors outside of [0, FD_SETSIZE),
	 * so negative descriptors have to be rejected too.
	 */
	if (fd < 0 || fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException newWithClass: isa];

	FD_CLR(fd, fdset);
}
#endif

/*
 * Starts observing a stream for reading. The stream is appended to
 * readStreams and its descriptor is registered with the backend in use
 * (POLLIN for poll(), the read fd_set for select()).
 */
- (void)addStreamToObserveForReading: (OFStream*)stream
{
	[readStreams addObject: stream];

#ifdef OF_HAVE_POLL
	[self _addStream: stream withEvents: POLLIN];
#else
	[self _addStream: stream withFDSet: &readfds];
#endif
}

/*
 * Starts observing a stream for writing. The stream is appended to
 * writeStreams and its descriptor is registered with the backend in use
 * (POLLOUT for poll(), the write fd_set for select()).
 */
- (void)addStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams addObject: stream];

#ifdef OF_HAVE_POLL
	[self _addStream: stream withEvents: POLLOUT];
#else
	[self _addStream: stream withFDSet: &writefds];
#endif
}

/*
 * Stops observing a stream for reading. The stream is removed from
 * readStreams (matched by identity) and its descriptor is unregistered from
 * the backend in use (POLLIN for poll(), the read fd_set for select()).
 */
- (void)removeStreamToObserveForReading: (OFStream*)stream
{
	[readStreams removeObjectIdenticalTo: stream];

#ifdef OF_HAVE_POLL
	[self _removeStream: stream withEvents: POLLIN];
#else
	[self _removeStream: stream withFDSet: &readfds];
#endif
}

/*
 * Stops observing a stream for writing. The stream is removed from
 * writeStreams (matched by identity) and its descriptor is unregistered from
 * the backend in use (POLLOUT for poll(), the write fd_set for select()).
 */
- (void)removeStreamToObserveForWriting: (OFStream*)stream
{
	[writeStreams removeObjectIdenticalTo: stream];

#ifdef OF_HAVE_POLL
	[self _removeStream: stream withEvents: POLLOUT];
#else
	[self _removeStream: stream withFDSet: &writefds];
#endif
}

/*
 * Observes the registered streams without a timeout: forwards to
 * -observeWithTimeout: with -1 and discards its result.
 */
- (void)observe
{
	const int noTimeout = -1;

	(void)[self observeWithTimeout: noTimeout];
}

/*
 * Notifies the delegate about every stream that is ready.
 *
 * Streams observed for reading whose cache is non-NULL are reported first;
 * if there is at least one, the method returns YES right away instead of
 * waiting. Otherwise it waits in poll() or select().
 *
 * timeout is passed to poll() unchanged; -1 disables the timeout.
 *
 * Returns NO if the wait reported no ready descriptor (this includes
 * timeouts and errors), YES otherwise.
 */
- (BOOL)observeWithTimeout: (int)timeout
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
	BOOL foundInCache = NO;
	OFStream **cArray;
	size_t i, count;
#ifdef OF_HAVE_POLL
	struct pollfd *fds_c = [fds cArray];
	size_t nfds = [fds count];
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
#endif

	cArray = [readStreams cArray];
	count = [readStreams count];

	for (i = 0; i < count; i++) {
		if (cArray[i]->cache != NULL) {
			[delegate streamDidBecomeReadyForReading: cArray[i]];
			foundInCache = YES;
		}
	}

	/*
	 * As long as we have data in the cache for any stream, we don't want
	 * to block.
	 */
	if (foundInCache) {
		/* Do not leave the pool behind on the early exit. */
		[pool release];
		return YES;
	}

#ifdef OF_HAVE_POLL
	if (poll(fds_c, nfds, timeout) < 1) {
		[pool release];
		return NO;
	}

	for (i = 0; i < nfds; i++) {
		OFNumber *num;
		OFStream *stream;

236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265







266
267
268
269

270
271
272
273
274
275
276
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
		}

		fds_c[i].revents = 0;
	}
#else
	fd_set readfds_;
	fd_set writefds_;
	fd_set exceptfds_;
	struct timeval tv;
	OFEnumerator *enumerator;
	OFStream *stream;

	/*
	 * select() overwrites the sets it is given, so it gets copies and the
	 * registered sets stay intact.
	 */
	readfds_ = readfds;
	writefds_ = writefds;
	FD_ZERO(&exceptfds_);

	/*
	 * The same timeout value is handed to poll() in the other branch, so
	 * it is in milliseconds and has to be converted for select().
	 */
	if (timeout != -1) {
		tv.tv_sec = timeout / 1000;
		tv.tv_usec = (timeout % 1000) * 1000;
	}

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1) {
		/* Do not leave the pool behind on the early exit. */
		[pool release];
		return NO;
	}

	/*
	 * NOTE(review): a copy of the dictionary is enumerated, presumably so
	 * delegate callbacks may add or remove streams - confirm.
	 */
	enumerator = [[[fdToStream copy] autorelease] objectEnumerator];

	while ((stream = [enumerator nextObject]) != nil) {
		int fd = [stream fileDescriptor];

		if (FD_ISSET(fd, &readfds_))
			[delegate streamDidBecomeReadyForReading: stream];

		if (FD_ISSET(fd, &writefds_))
			[delegate streamDidBecomeReadyForWriting: stream];
	}
#endif

	[pool release];

	return YES;
}
@end

@implementation OFObject (OFStreamObserverDelegate)







<
<
<
<
<
<
<








<
|
<
|


|
|
>
>
>
>
>
>
>
|
|


>







271
272
273
274
275
276
277







278
279
280
281
282
283
284
285

286

287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
			stream = [fdToStream objectForKey: num];
			[delegate streamDidBecomeReadyForReading: stream];
		}

		fds_c[i].revents = 0;
	}
#else







	/*
	 * select() overwrites the sets it is given, so it gets copies and the
	 * registered sets stay intact.
	 */
	readfds_ = readfds;
	writefds_ = writefds;
	FD_ZERO(&exceptfds_);

	/*
	 * The same timeout value is handed to poll() in the other branch, so
	 * it is in milliseconds and has to be converted for select().
	 */
	if (timeout != -1) {
		tv.tv_sec = timeout / 1000;
		tv.tv_usec = (timeout % 1000) * 1000;
	}

	if (select(nfds, &readfds_, &writefds_, &exceptfds_,
	    (timeout != -1 ? &tv : NULL)) < 1) {
		/* Do not leave the pool behind on the early exit. */
		[pool release];
		return NO;
	}

	/* cArray and count still refer to readStreams at this point. */
	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		if (FD_ISSET(fd, &readfds_))
			[delegate streamDidBecomeReadyForReading: cArray[i]];
	}

	cArray = [writeStreams cArray];
	count = [writeStreams count];

	for (i = 0; i < count; i++) {
		int fd = [cArray[i] fileDescriptor];

		/* Write readiness is reported in the write set. */
		if (FD_ISSET(fd, &writefds_))
			[delegate streamDidBecomeReadyForWriting: cArray[i]];
	}
#endif

	[pool release];

	return YES;
}
@end

@implementation OFObject (OFStreamObserverDelegate)