ObjFW  Check-in [220513a3f5]

Overview
Comment:More OFKernelEventObserver refactoring

This was necessary because select(), poll() and kevent() on FreeBSD and
OS X would sometimes return 0 events, even if there are some, and
sometimes return the correct number of events that were pending, meaning
the number of events returned is unreliable. To make things worse,
whether it returns 0 or the number of events that were pending is
completely non-deterministic on both FreeBSD and OS X (running the same
tests multiple times in a row would make it sometimes work and sometimes
fail).

In order to prevent code from depending on the return value of
-[observeForTimeInterval:] (which would depend on select(), poll() and
kevent() returning the correct number), OFKernelObserver no longer
returns whether there were pending events. It is expected that
-[observe] or -[observeForTimeInterval:] is just called in a loop as
long as events should be handled.

The tests have been changed as well to reflect this. What they do now is
set a deadline and call -[observeForTimeInterval:] with a small timeout
in a loop until the deadline is reached or all events have been handled.

Note: DragonFlyBSD has not been tested, but will most likely behave like
FreeBSD and OS X.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 220513a3f52e8fa4f4e81b24d2c68f83f92b7e8402374146055da9a8dcdb7f17
User & Date: js on 2015-05-09 18:13:19
Other Links: manifest | tags
Context
2015-05-09
21:29
OFZIPArchive: Remove OFFile requirement check-in: aa1bb213e0 user: js tags: trunk
18:13
More OFKernelEventObserver refactoring check-in: 220513a3f5 user: js tags: trunk
2015-05-08
21:13
OFKernelEventObserver_kqueue: More error checking check-in: 92344de237 user: js tags: trunk
Changes

Modified src/OFKernelEventObserver+Private.h from [0257c37a21] to [dfd89ec8e7].

18
19
20
21
22
23
24
25
26

@interface OFKernelEventObserver (OF_PRIVATE_CATEGORY)
- (void)OF_addObjectForReading: (id)object;
- (void)OF_addObjectForWriting: (id)object;
- (void)OF_removeObjectForReading: (id)object;
- (void)OF_removeObjectForWriting: (id)object;
- (void)OF_processQueueAndStoreRemovedIn: (OFMutableArray*)removed;
- (bool)OF_processReadBuffers;
@end







|

18
19
20
21
22
23
24
25
26

@interface OFKernelEventObserver (OF_PRIVATE_CATEGORY)
- (void)OF_addObjectForReading: (id)object;
- (void)OF_addObjectForWriting: (id)object;
- (void)OF_removeObjectForReading: (id)object;
- (void)OF_removeObjectForWriting: (id)object;
- (void)OF_processQueueAndStoreRemovedIn: (OFMutableArray*)removed;
- (void)OF_processReadBuffers;
@end

Modified src/OFKernelEventObserver.h from [7a17ea14b6] to [c48f1a2944].

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
- (void)observe;

/*!
 * @brief Observes all objects until an event happens on an object or the
 *	  timeout is reached.
 *
 * @param timeInterval The time to wait for an event, in seconds
 * @return A boolean whether events occurred before returning
 */
- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval;

/*!
 * @brief Observes all objects until an event happens on an object or the
 *	  specified date is reached.
 *
 * @param date The until which to observe
 * @return A boolean whether events occurred before returning
 */
- (bool)observeUntilDate: (OFDate*)date;

/*!
 * @brief Cancels the currently blocking observe call.
 *
 * This is automatically done when a new object is added or removed by another
 * thread, but in some circumstances, it might be desirable for a thread to
 * manually stop the observe running in another thread.







<

|






<

|







198
199
200
201
202
203
204

205
206
207
208
209
210
211
212

213
214
215
216
217
218
219
220
221
- (void)observe;

/*!
 * @brief Observes all objects until an event happens on an object or the
 *	  timeout is reached.
 *
 * @param timeInterval The time to wait for an event, in seconds

 */
- (void)observeForTimeInterval: (of_time_interval_t)timeInterval;

/*!
 * @brief Observes all objects until an event happens on an object or the
 *	  specified date is reached.
 *
 * @param date The until which to observe

 */
- (void)observeUntilDate: (OFDate*)date;

/*!
 * @brief Cancels the currently blocking observe call.
 *
 * This is automatically done when a new object is added or removed by another
 * thread, but in some circumstances, it might be desirable for a thread to
 * manually stop the observe running in another thread.

Modified src/OFKernelEventObserver.m from [80d7681218] to [b099b55bf8].

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
}

- (void)observe
{
	[self observeForTimeInterval: -1];
}

- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	OF_UNRECOGNIZED_SELECTOR
}

- (bool)observeUntilDate: (OFDate*)date
{
	return [self observeForTimeInterval: [date timeIntervalSinceNow]];
}

- (void)cancel
{
#ifdef OF_HAVE_PIPE
	OF_ENSURE(write(_cancelFD[1], "", 1) > 0);
#else
	OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr,
	    sizeof(_cancelAddr)) > 0);
#endif
}

- (bool)OF_processReadBuffers
{
	id const *objects = [_readObjects objects];
	size_t i, count = [_readObjects count];
	bool foundInReadBuffer = false;

	for (i = 0; i < count; i++) {
		void *pool = objc_autoreleasePoolPush();

		if ([objects[i] isKindOfClass: [OFStream class]] &&
		    [objects[i] hasDataInReadBuffer] &&
		    ![objects[i] OF_isWaitingForDelimiter]) {
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading: objects[i]];

			foundInReadBuffer = true;
		}

		objc_autoreleasePoolPop(pool);
	}

	/*
	 * As long as we have data in the read buffer for any stream, we don't
	 * want to block.
	 */
	if (foundInReadBuffer)
		return true;

	return false;
}
@end







|




|

|












|



<






|
|
|
|
<
<
<



|
<
<
<
<
<
<
<
<
<

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366

367
368
369
370
371
372
373
374
375
376



377
378
379
380









381
}

- (void)observe
{
	[self observeForTimeInterval: -1];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)observeUntilDate: (OFDate*)date
{
	[self observeForTimeInterval: [date timeIntervalSinceNow]];
}

- (void)cancel
{
#ifdef OF_HAVE_PIPE
	OF_ENSURE(write(_cancelFD[1], "", 1) > 0);
#else
	OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr,
	    sizeof(_cancelAddr)) > 0);
#endif
}

- (void)OF_processReadBuffers
{
	id const *objects = [_readObjects objects];
	size_t i, count = [_readObjects count];


	for (i = 0; i < count; i++) {
		void *pool = objc_autoreleasePoolPush();

		if ([objects[i] isKindOfClass: [OFStream class]] &&
		    [objects[i] hasDataInReadBuffer] &&
		    ![objects[i] OF_isWaitingForDelimiter] &&
		    [_delegate respondsToSelector:
		    @selector(objectIsReadyForReading:)])
			[_delegate objectIsReadyForReading: objects[i]];




		objc_autoreleasePoolPop(pool);
	}
}









@end

Modified src/OFKernelEventObserver_epoll.m from [463d24baf5] to [8f646c7ee8].

180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
- (void)OF_removeObjectForWriting: (id)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: EPOLLOUT];
}

- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	OFNull *nullObject = [OFNull null];
	void *pool = objc_autoreleasePoolPush();
	struct epoll_event eventList[EVENTLIST_SIZE];
	int i, events, realEvents = 0;

	[self OF_processQueueAndStoreRemovedIn: nil];

	if ([self OF_processReadBuffers]) {
		objc_autoreleasePoolPop(pool);
		return true;
	}

	objc_autoreleasePoolPop(pool);

	events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? timeInterval * 1000 : -1));

	if (events < 0)
		return [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	if (events == 0)
		return false;

	for (i = 0; i < events; i++) {
		if (eventList[i].data.ptr == nullObject) {
			char buffer;

			assert(eventList[i].events == EPOLLIN);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		if (eventList[i].events & EPOLLIN) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    eventList[i].data.ptr];

			realEvents++;

			objc_autoreleasePoolPop(pool);
		}

		if (eventList[i].events & EPOLLOUT) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    eventList[i].data.ptr];

			realEvents++;

			objc_autoreleasePoolPop(pool);
		}

		assert((eventList[i].events & ~(EPOLLIN | EPOLLOUT)) == 0);
	}

	if (realEvents == 0)
		return false;

	return true;
}
@end







|




|


<
|
<
<
<










<
<
<


















<
<











<
<





|
<
<
<
<
<

180
181
182
183
184
185
186
187
188
189
190
191
192
193
194

195



196
197
198
199
200
201
202
203
204
205



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223


224
225
226
227
228
229
230
231
232
233
234


235
236
237
238
239
240





241
- (void)OF_removeObjectForWriting: (id)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: EPOLLOUT];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	OFNull *nullObject = [OFNull null];
	void *pool = objc_autoreleasePoolPush();
	struct epoll_event eventList[EVENTLIST_SIZE];
	int i, events;

	[self OF_processQueueAndStoreRemovedIn: nil];

	[self OF_processReadBuffers];




	objc_autoreleasePoolPop(pool);

	events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? timeInterval * 1000 : -1));

	if (events < 0)
		return [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];




	for (i = 0; i < events; i++) {
		if (eventList[i].data.ptr == nullObject) {
			char buffer;

			assert(eventList[i].events == EPOLLIN);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		if (eventList[i].events & EPOLLIN) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    eventList[i].data.ptr];



			objc_autoreleasePoolPop(pool);
		}

		if (eventList[i].events & EPOLLOUT) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    eventList[i].data.ptr];



			objc_autoreleasePoolPop(pool);
		}

		assert((eventList[i].events & ~(EPOLLIN | EPOLLOUT)) == 0);
	}
}





@end

Modified src/OFKernelEventObserver_kqueue.m from [dac9f53f6d] to [306f8337d7].

144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_DELETE;
	[_changeList addItem: &event];
}

- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	struct timespec timeout;
	struct kevent eventList[EVENTLIST_SIZE];
	int i, events, realEvents = 0;

	timeout.tv_sec = (time_t)timeInterval;
	timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000);

	/*
	 * Make sure to keep the streams retained and thus the file descriptors
	 * valid until the actual change has been performed.
	 */
	[self OF_processQueueAndStoreRemovedIn: _removedArray];

	if ([self OF_processReadBuffers]) {
		objc_autoreleasePoolPop(pool);
		return true;
	}

	objc_autoreleasePoolPop(pool);

	events = kevent(_kernelQueue, [_changeList items],
	    (int)[_changeList count], eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		return [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	[_changeList removeAllItems];
	[_removedArray removeAllObjects];

	if (events == 0)
		return false;

	for (i = 0; i < events; i++) {
		if (eventList[i].flags & EV_ERROR)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: (int)eventList[i].data];

		if (eventList[i].ident == _cancelFD[0]) {







|




|










|
<
<
<








|





<
<
<







144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167



168
169
170
171
172
173
174
175
176
177
178
179
180
181



182
183
184
185
186
187
188
	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_DELETE;
	[_changeList addItem: &event];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	struct timespec timeout;
	struct kevent eventList[EVENTLIST_SIZE];
	int i, events;

	timeout.tv_sec = (time_t)timeInterval;
	timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000);

	/*
	 * Make sure to keep the streams retained and thus the file descriptors
	 * valid until the actual change has been performed.
	 */
	[self OF_processQueueAndStoreRemovedIn: _removedArray];

	[self OF_processReadBuffers];




	objc_autoreleasePoolPop(pool);

	events = kevent(_kernelQueue, [_changeList items],
	    (int)[_changeList count], eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	[_changeList removeAllItems];
	[_removedArray removeAllObjects];




	for (i = 0; i < events; i++) {
		if (eventList[i].flags & EV_ERROR)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: (int)eventList[i].data];

		if (eventList[i].ident == _cancelFD[0]) {
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
				    (id)eventList[i].udata];
			break;
		default:
			assert(0);
		}

		objc_autoreleasePoolPop(pool);

		realEvents++;
	}

	if (realEvents == 0)
		return false;

	return true;
}
@end







|
<
|
<
<
<
<
<
<

210
211
212
213
214
215
216
217

218






219
				    (id)eventList[i].udata];
			break;
		default:
			assert(0);
		}

		objc_autoreleasePoolPop(pool);
	}

}






@end

Modified src/OFKernelEventObserver_poll.m from [b1989e4bd7] to [6db686d146].

148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
- (void)OF_removeObjectForWriting: (id)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: POLLOUT];
}

- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	struct pollfd *FDs;
	int events;
	size_t i, nFDs, realEvents = 0;

	[self OF_processQueueAndStoreRemovedIn: nil];

	if ([self OF_processReadBuffers]) {
		objc_autoreleasePoolPop(pool);
		return true;
	}

	objc_autoreleasePoolPop(pool);

	FDs = [_FDs items];
	nFDs = [_FDs count];

#ifdef OPEN_MAX
	if (nFDs > OPEN_MAX)
		@throw [OFOutOfRangeException exception];
#endif

	events = poll(FDs, (nfds_t)nFDs,
	    (int)(timeInterval != -1 ? timeInterval * 1000 : -1));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	if (events == 0)
		return false;

	for (i = 0; i < nFDs; i++) {
		if (FDs[i].fd > _maxFD)
			@throw [OFOutOfRangeException exception];

		if (FDs[i].revents & POLLIN) {
			if (FDs[i].fd == _cancelFD[0]) {
				char buffer;







|




|


<
|
<
<
<


















<
<
<







148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

163



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181



182
183
184
185
186
187
188
- (void)OF_removeObjectForWriting: (id)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: POLLOUT];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	struct pollfd *FDs;
	int events;
	size_t i, nFDs;

	[self OF_processQueueAndStoreRemovedIn: nil];

	[self OF_processReadBuffers];




	objc_autoreleasePoolPop(pool);

	FDs = [_FDs items];
	nFDs = [_FDs count];

#ifdef OPEN_MAX
	if (nFDs > OPEN_MAX)
		@throw [OFOutOfRangeException exception];
#endif

	events = poll(FDs, (nfds_t)nFDs,
	    (int)(timeInterval != -1 ? timeInterval * 1000 : -1));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];




	for (i = 0; i < nFDs; i++) {
		if (FDs[i].fd > _maxFD)
			@throw [OFOutOfRangeException exception];

		if (FDs[i].revents & POLLIN) {
			if (FDs[i].fd == _cancelFD[0]) {
				char buffer;
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    _FDToObject[FDs[i].fd]];

			objc_autoreleasePoolPop(pool);

			realEvents++;
		}

		if (FDs[i].revents & POLLOUT) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    _FDToObject[FDs[i].fd]];

			objc_autoreleasePoolPop(pool);

			realEvents++;
		}

		FDs[i].revents = 0;
	}

	if (realEvents == 0)
		return false;

	return true;
}
@end







<
<











<
<




|
<
<
<
<
<

197
198
199
200
201
202
203


204
205
206
207
208
209
210
211
212
213
214


215
216
217
218
219





220

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    _FDToObject[FDs[i].fd]];

			objc_autoreleasePoolPop(pool);


		}

		if (FDs[i].revents & POLLOUT) {
			pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    _FDToObject[FDs[i].fd]];

			objc_autoreleasePoolPop(pool);


		}

		FDs[i].revents = 0;
	}
}





@end

Modified src/OFKernelEventObserver_select.m from [9c1fba606a] to [331bf675cd].

115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];

	FD_CLR(fd, &_writeFDs);
}

- (bool)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	id const *objects;
	fd_set readFDs;
	fd_set writeFDs;
	struct timeval timeout;
	int events;
	size_t i, count, realEvents = 0;

	[self OF_processQueueAndStoreRemovedIn: nil];

	if ([self OF_processReadBuffers]) {
		objc_autoreleasePoolPop(pool);
		return true;
	}

	objc_autoreleasePoolPop(pool);

#ifdef FD_COPY
	FD_COPY(&_readFDs, &readFDs);
	FD_COPY(&_writeFDs, &writeFDs);
#else







|







|


<
|
<
<
<







115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

133



134
135
136
137
138
139
140

	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];

	FD_CLR(fd, &_writeFDs);
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	void *pool = objc_autoreleasePoolPush();
	id const *objects;
	fd_set readFDs;
	fd_set writeFDs;
	struct timeval timeout;
	int events;
	size_t i, count;

	[self OF_processQueueAndStoreRemovedIn: nil];

	[self OF_processReadBuffers];




	objc_autoreleasePoolPop(pool);

#ifdef FD_COPY
	FD_COPY(&_readFDs, &readFDs);
	FD_COPY(&_writeFDs, &writeFDs);
#else
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
	events = select(_maxFD + 1, &readFDs, &writeFDs, NULL,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	if (events == 0)
		return false;

	if (FD_ISSET(_cancelFD[0], &readFDs)) {
		char buffer;

#ifndef _WIN32
		OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);
#else
		OF_ENSURE(recvfrom(_cancelFD[0], &buffer, 1, 0, NULL,
		    NULL) == 1);
#endif
	}

	objects = [_readObjects objects];
	count = [_readObjects count];

	for (i = 0; i < count; i++) {
		int fd = [objects[i] fileDescriptorForReading];

		pool = objc_autoreleasePoolPush();

		if (FD_ISSET(fd, &readFDs)) {
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading: objects[i]];

			realEvents++;
		}

		objc_autoreleasePoolPop(pool);
	}

	objects = [_writeObjects objects];
	count = [_writeObjects count];

	for (i = 0; i < count; i++) {
		int fd = [objects[i] fileDescriptorForWriting];

		pool = objc_autoreleasePoolPush();

		if (FD_ISSET(fd, &writeFDs)) {
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting: objects[i]];

			realEvents++;
		}

		objc_autoreleasePoolPop(pool);
	}

	if (realEvents == 0)
		return false;

	return true;
}
@end







<
<
<



















|
<
|
|
<
<
<












|
<
|
|
<
<
<



|
<
<
<
<
<

158
159
160
161
162
163
164



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

185
186



187
188
189
190
191
192
193
194
195
196
197
198
199

200
201



202
203
204
205





206
	events = select(_maxFD + 1, &readFDs, &writeFDs, NULL,
	    (timeInterval != -1 ? &timeout : NULL));

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];




	if (FD_ISSET(_cancelFD[0], &readFDs)) {
		char buffer;

#ifndef _WIN32
		OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);
#else
		OF_ENSURE(recvfrom(_cancelFD[0], &buffer, 1, 0, NULL,
		    NULL) == 1);
#endif
	}

	objects = [_readObjects objects];
	count = [_readObjects count];

	for (i = 0; i < count; i++) {
		int fd = [objects[i] fileDescriptorForReading];

		pool = objc_autoreleasePoolPush();

		if (FD_ISSET(fd, &readFDs) && [_delegate respondsToSelector:

		    @selector(objectIsReadyForReading:)])
			[_delegate objectIsReadyForReading: objects[i]];




		objc_autoreleasePoolPop(pool);
	}

	objects = [_writeObjects objects];
	count = [_writeObjects count];

	for (i = 0; i < count; i++) {
		int fd = [objects[i] fileDescriptorForWriting];

		pool = objc_autoreleasePoolPush();

		if (FD_ISSET(fd, &writeFDs) && [_delegate respondsToSelector:

		    @selector(objectIsReadyForWriting:)])
			[_delegate objectIsReadyForWriting: objects[i]];




		objc_autoreleasePoolPop(pool);
	}
}





@end

Modified tests/OFKernelEventObserverTests.m from [c839b85266] to [8dbc611e84].

14
15
16
17
18
19
20

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38


39





40

41
42
43
44
45
46
47
48
49
50










































































51
52
53

54


55







56
57
58

59






60

61




62




63

64



65



66
67



68



69
70


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
 * file.
 */

#include "config.h"

#import "OFKernelEventObserver.h"
#import "OFString.h"

#import "OFTCPSocket.h"
#import "OFAutoreleasePool.h"

#if defined(HAVE_SYS_SELECT_H) || defined(_WIN32)
# import "OFKernelEventObserver_select.h"
#endif
#if defined(HAVE_POLL_H) || defined(__wii__)
# import "OFKernelEventObserver_poll.h"
#endif
#ifdef HAVE_EPOLL
# import "OFKernelEventObserver_epoll.h"
#endif
#ifdef HAVE_KQUEUE
# import "OFKernelEventObserver_kqueue.h"
#endif

#import "TestsAppDelegate.h"



static OFString *module;





static OFKernelEventObserver *observer;

static int events;
static id expectedObject;
static bool readData, expectEOS;
static OFTCPSocket *accepted;

@interface ObserverDelegate: OFObject
- (void)objectIsReadyForReading: (id)object;
@end

@implementation ObserverDelegate










































































- (void)objectIsReadyForReading: (id)object
{
	events++;




	OF_ENSURE(object == expectedObject);








	if ([object isListening]) {
		accepted = [[object accept] retain];








		[accepted writeBuffer: "0"

			       length: 1];




	} else if (readData) {




		char buf;





		if (expectEOS)



			OF_ENSURE([object readIntoBuffer: &buf
						  length: 1] == 0);



		else {



			OF_ENSURE([object readIntoBuffer: &buf
						  length: 1] == 1);


			OF_ENSURE(buf == '0');
		}
	}
}
@end

@implementation TestsAppDelegate (OFKernelEventObserverTests)
- (void)kernelEventObserverTestsWithClass: (Class)class
{
	ObserverDelegate *delegate =
	    [[[ObserverDelegate alloc] init] autorelease];
	OFTCPSocket *sock1 = [OFTCPSocket socket];
	OFTCPSocket *sock2 = [OFTCPSocket socket];
	uint16_t port;

	module = [class className];
	events = 0;
	expectedObject = nil;
	readData = expectEOS = false;
	accepted = nil;

	port = [sock1 bindToHost: @"127.0.0.1"
			    port: 0];
	[sock1 listen];

	TEST(@"+[observer]",
	    (observer = [class observer]) &&
	    R([observer setDelegate: delegate]))

	TEST(@"-[addObjectForReading:]",
	    R([observer addObjectForReading: sock1]))

	[sock2 connectToHost: @"127.0.0.1"
			port: port];
	TEST(@"-[observe] waiting for connection",
	    (expectedObject = sock1) &&
	    [observer observeForTimeInterval: 0.01])
	[accepted autorelease];

	TEST(@"-[observe] waiting for data",
	    (expectedObject = sock2) &&
	    R([observer addObjectForReading: sock2]) &&
	    [observer observeForTimeInterval: 0.01])

	TEST(@"-[observe] keeping event until read",
	    R(readData = true) && [observer observeForTimeInterval: 0.01])

	TEST(@"-[observe] time out due to no events",
	    R(readData = false) && ![observer observeForTimeInterval: 0.01])

	[accepted close];
	TEST(@"-[observe] closed connection",
	    R(readData = true) && R(expectEOS = true) &&
	    [observer observeForTimeInterval: 0.01])

	TEST(@"-[observe] correct number of events", events == 4)
}

- (void)kernelEventObserverTests
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

#if defined(HAVE_SYS_SELECT_H) || defined(_WIN32)







>


















>
>

>
>
>
>
>
|
>
|
<
<
<
|
|
|


|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


<
>

>
>
|
>
>
>
>
>
>
>

<
|
>

>
>
>
>
>
>
|
>
|
>
>
>
>
|
>
>
>
>
|
>

>
>
>
|
>
>
>
|
|
>
>
>
|
>
>
>
|
<
>
>
|
<







|
<
<
<
<


|
<
<
<
|
<
<
<


|
|


|

<
<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

133
134
135
136
137
138
139
140
141
142
143
144
145

146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186

187
188
189

190
191
192
193
194
195
196
197




198
199
200



201



202
203
204
205
206
207
208
209






210

















211
212
213
214
215
216
217
 * file.
 */

#include "config.h"

#import "OFKernelEventObserver.h"
#import "OFString.h"
#import "OFDate.h"
#import "OFTCPSocket.h"
#import "OFAutoreleasePool.h"

#if defined(HAVE_SYS_SELECT_H) || defined(_WIN32)
# import "OFKernelEventObserver_select.h"
#endif
#if defined(HAVE_POLL_H) || defined(__wii__)
# import "OFKernelEventObserver_poll.h"
#endif
#ifdef HAVE_EPOLL
# import "OFKernelEventObserver_epoll.h"
#endif
#ifdef HAVE_KQUEUE
# import "OFKernelEventObserver_kqueue.h"
#endif

#import "TestsAppDelegate.h"

#define EXPECTED_EVENTS 3

static OFString *module;

@interface ObserverTest: OFObject
{
@public
	TestsAppDelegate *_testsAppDelegate;
	OFKernelEventObserver *_observer;
	OFTCPSocket *_server, *_client, *_accepted;
	size_t _events;



}

- (void)run;
@end

@implementation ObserverTest
- initWithTestsAppDelegate: (TestsAppDelegate*)testsAppDelegate
{
	self = [super init];

	@try {
		uint16_t port;

		_testsAppDelegate = testsAppDelegate;

		_server = [[OFTCPSocket alloc] init];
		port = [_server bindToHost: @"127.0.0.1"
				      port: 0];
		[_server listen];

		_client = [[OFTCPSocket alloc] init];
		[_client connectToHost: @"127.0.0.1"
				  port: port];

		[_client writeBuffer: "0"
			      length: 1];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_server release];
	[_client release];
	[_accepted release];

	[super dealloc];
}

- (void)run
{
	OFDate *deadline;
	bool deadlineExceeded = false;

	[_testsAppDelegate outputTesting: @"-[observe] with listening socket"
				inModule: module];

	deadline = [OFDate dateWithTimeIntervalSinceNow: 1];
	while (_events < EXPECTED_EVENTS) {
		if ([deadline timeIntervalSinceNow] < 0) {
			deadlineExceeded = true;
			break;
		}

		[_observer observeForTimeInterval: 0.01];
	}

	if (!deadlineExceeded)
		[_testsAppDelegate
		    outputSuccess: @"-[observe] not exceeding deadline"
			 inModule: module];
	else
		[_testsAppDelegate
		    outputFailure: @"-[observe] not exceeding deadline"
			 inModule: module];

	if (_events == EXPECTED_EVENTS)
		[_testsAppDelegate
		    outputSuccess: @"-[observe] handling all events"
			 inModule: module];
	else
		[_testsAppDelegate
		    outputFailure: @"-[observe] handling all events"
			 inModule: module];
}

- (void)objectIsReadyForReading: (id)object
{

	char buf;

	switch (_events++) {
	case 0:
		if (object == _server)
			[_testsAppDelegate
			    outputSuccess: @"-[observe] with listening socket"
				 inModule: module];
		else
			[_testsAppDelegate
			    outputFailure: @"-[observe] with listening socket"
				 inModule: module];


		_accepted = [[object accept] retain];
		[_observer addObjectForReading: _accepted];

		[_testsAppDelegate
		    outputTesting: @"-[observe] with data to read available"
			 inModule: module];

		break;
	case 1:
		if (object == _accepted &&
		    [object readIntoBuffer: &buf
				    length: 1] == 1 && buf == '0')
			[_testsAppDelegate
			    outputSuccess: @"-[observe] with data to read "
					   @"available"
				 inModule: module];
		else
			[_testsAppDelegate
			    outputFailure: @"-[observe] with data to read "
					   @"available"
				 inModule: module];

		[_client close];

		[_testsAppDelegate
		    outputTesting: @"-[observe] with closed connection"
			 inModule: module];

		break;
	case 2:
		if (object == _accepted &&
		    [object readIntoBuffer: &buf
				    length: 1] == 0)
			[_testsAppDelegate
			    outputSuccess: @"-[observe] with closed connection"
				 inModule: module];
		else
			[_testsAppDelegate
			    outputFailure: @"-[observe] with closed connection"
				 inModule: module];


		break;
	default:
		OF_ENSURE(0);

	}
}
@end

@implementation TestsAppDelegate (OFKernelEventObserverTests)
- (void)kernelEventObserverTestsWithClass: (Class)class
{
	ObserverTest *test;





	module = [class className];
	test = [[[ObserverTest alloc]



	    initWithTestsAppDelegate: self] autorelease];




	TEST(@"+[observer]",
	    (test->_observer = [OFKernelEventObserver observer]))
	[test->_observer setDelegate: test];

	TEST(@"-[addObjectForReading:]",
	    R([test->_observer addObjectForReading: test->_server]))







	[test run];

















}

- (void)kernelEventObserverTests
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

#if defined(HAVE_SYS_SELECT_H) || defined(_WIN32)