ObjFW  Check-in [45518ae7b7]

Overview
Comment:Use the locked queue for kqueue and epoll as well

_readObjects must only be changed from the thread running the observer
and not from a thread adding or removing objects to observe. This is
already handled by the locked queue used by poll and select, so the best
way to solve this is to use the locked queue for kqueue and epoll as
well.

Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 45518ae7b78abd64e88a0a884cc344de6d804c16641444bda0d0cd13de2ae906
User & Date: js on 2016-03-20 14:07:47
Other Links: manifest | tags
Context
2016-03-20
14:58
OFKernelEventObserverTests: Properly count fails check-in: b5277f0444 user: js tags: trunk
14:07
Use the locked queue for kqueue and epoll as well check-in: 45518ae7b7 user: js tags: trunk
11:57
Never block when the read buffer is non-empty check-in: 7ae17af9f0 user: js tags: trunk
Changes

Modified src/Makefile from [2bb467052c] to [d6880e07e2].

88
89
90
91
92
93
94
95
96
97
98
99





100
101
102
103
104




105
106
107
108
109
110
111
88
89
90
91
92
93
94





95
96
97
98
99





100
101
102
103
104
105
106
107
108
109
110







-
-
-
-
-
+
+
+
+
+
-
-
-
-
-
+
+
+
+







	     OFFileManager.m		\
	     OFINICategory.m		\
	     OFINIFile.m		\
	     OFSettings.m		\
	     OFZIPArchive.m		\
	     OFZIPArchiveEntry.m
SRCS_PLUGINS = OFPlugin.m
SRCS_SOCKETS = OFHTTPClient.m				\
	       OFHTTPRequest.m				\
	       OFHTTPResponse.m				\
	       OFHTTPServer.m				\
	       OFKernelEventObserver.m			\
SRCS_SOCKETS = OFHTTPClient.m			\
	       OFHTTPRequest.m			\
	       OFHTTPResponse.m			\
	       OFHTTPServer.m			\
	       OFKernelEventObserver.m		\
	       OFKernelEventObserver_LockedQueue.m	\
	       OFStreamSocket.m				\
	       OFTCPSocket.m				\
	       OFUDPSocket.m				\
	       resolver.m				\
	       OFStreamSocket.m			\
	       OFTCPSocket.m			\
	       OFUDPSocket.m			\
	       resolver.m			\
	       socket.m
SRCS_THREADS = OFCondition.m		\
	       OFMutex.m		\
	       OFRecursiveMutex.m	\
	       OFThreadPool.m		\
	       threading.m

Modified src/OFKernelEventObserver+Private.h from [eb360b3361] to [115d733df8].

15
16
17
18
19
20
21





22
23
24
25
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30







+
+
+
+
+




 */

#import "OFKernelEventObserver.h"

OF_ASSUME_NONNULL_BEGIN

@interface OFKernelEventObserver ()
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object;
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object;
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object;
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object;
- (void)OF_processQueue;
- (bool)OF_processReadBuffers;
@end

OF_ASSUME_NONNULL_END

Modified src/OFKernelEventObserver.h from [a975466b61] to [dd46f03e3d].

21
22
23
24
25
26
27

28
29
30
31
32
33
34
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35







+







OF_ASSUME_NONNULL_BEGIN

@class OFMutableArray OF_GENERIC(ObjectType);
@class OFDate;
#ifdef OF_HAVE_THREADS
@class OFMutex;
#endif
@class OFDataArray;

/*!
 * @protocol OFKernelEventObserverDelegate
 *	     OFKernelEventObserver.h ObjFW/OFKernelEventObserver.h
 *
 * @brief A protocol that needs to be implemented by delegates for
 *	  OFKernelEventObserver.
116
117
118
119
120
121
122


123
124
125
126
127
128
129
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132







+
+







#else
	of_socket_t _cancelFD[2];
	struct sockaddr_in _cancelAddr;
#endif
#ifdef OF_HAVE_THREADS
	OFMutex *_mutex;
#endif
	OFDataArray *_queueActions;
	OFMutableArray *_queueObjects;
}

/*!
 * The delegate for the OFKernelEventObserver.
 */
@property OF_NULLABLE_PROPERTY (assign)
    id <OFKernelEventObserverDelegate> delegate;

Modified src/OFKernelEventObserver.m from [1613a72d1b] to [e992bd3e10].

17
18
19
20
21
22
23

24
25
26
27
28
29
30
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31







+







#define __NO_EXT_QNX

#include "config.h"

#import "OFKernelEventObserver.h"
#import "OFKernelEventObserver+Private.h"
#import "OFArray.h"
#import "OFDataArray.h"
#import "OFStream.h"
#import "OFStream+Private.h"
#ifndef OF_HAVE_PIPE
# import "OFStreamSocket.h"
#endif
#import "OFDate.h"
#ifdef OF_HAVE_THREADS
47
48
49
50
51
52
53








54
55
56
57
58
59
60
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69







+
+
+
+
+
+
+
+







#import "OFInitializationFailedException.h"
#import "OFInvalidArgumentException.h"
#import "OFOutOfRangeException.h"

#import "socket.h"
#import "socket_helpers.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};
#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE)

@implementation OFKernelEventObserver
@synthesize delegate = _delegate;

+ (void)initialize
{
	if (self != [OFKernelEventObserver class])
		return;
133
134
135
136
137
138
139




140
141
142
143
144
145
146
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159







+
+
+
+







			    exceptionWithClass: [self class]];
# endif
#endif

#ifdef OF_HAVE_THREADS
		_mutex = [[OFMutex alloc] init];
#endif

		_queueActions = [[OFDataArray alloc]
		    initWithItemSize: sizeof(int)];
		_queueObjects = [[OFMutableArray alloc] init];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}
158
159
160
161
162
163
164



165
166
167
168
169





170
171








172



173
174





175
176








177



178
179





180










181
182
183
184



















185
186



















































































































187
188
189
190
191
192
193
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190


191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228

229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385







+
+
+





+
+
+
+
+
-
-
+
+
+
+
+
+
+
+

+
+
+


+
+
+
+
+
-
-
+
+
+
+
+
+
+
+

+
+
+


+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+




+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







	[_readObjects release];
	[_writeObjects release];

#ifdef OF_HAVE_THREADS
	[_mutex release];
#endif

	[_queueActions release];
	[_queueObjects release];

	[super dealloc];
}

- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int action = QUEUE_ADD | QUEUE_READ;
	OF_UNRECOGNIZED_SELECTOR
}

		[_queueActions addItem: &action];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int action = QUEUE_ADD | QUEUE_WRITE;
	OF_UNRECOGNIZED_SELECTOR
}

		[_queueActions addItem: &action];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int action = QUEUE_REMOVE | QUEUE_READ;
	OF_UNRECOGNIZED_SELECTOR

		[_queueActions addItem: &action];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int action = QUEUE_REMOVE | QUEUE_WRITE;

		[_queueActions addItem: &action];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_processQueue
{
	void *pool = objc_autoreleasePoolPush();

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int *queueActions = [_queueActions items];
		id const *queueObjects = [_queueObjects objects];
		size_t count = [_queueActions count];

		OF_ENSURE([_queueObjects count] == count);

		for (size_t i = 0; i < count; i++) {
			int action = queueActions[i];
			id object = queueObjects[i];

			switch (action) {
			case QUEUE_ADD | QUEUE_READ:
				[_readObjects addObject: object];

				@try {
					[self OF_addObjectForReading: object];
				} @catch (id e) {
					[_readObjects
					    removeObjectIdenticalTo: object];

					@throw e;
				}

				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[_writeObjects addObject: object];

				@try {
					[self OF_addObjectForWriting: object];
				} @catch (id e) {
					[_writeObjects
					    removeObjectIdenticalTo: object];

					@throw e;
				}

				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[self OF_removeObjectForReading: object];

				[_readObjects removeObjectIdenticalTo: object];

				break;
			case QUEUE_REMOVE | QUEUE_WRITE:
				[self OF_removeObjectForWriting: object];

				[_writeObjects removeObjectIdenticalTo: object];

				break;
			default:
				OF_ENSURE(0);
			}
		}

		[_queueActions removeAllItems];
		[_queueObjects removeAllObjects];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	objc_autoreleasePoolPop(pool);
}

- (bool)OF_processReadBuffers
{
	bool foundInReadBuffer = false;

	for (id object in _readObjects) {
		void *pool = objc_autoreleasePoolPush();

		if ([object isKindOfClass: [OFStream class]] &&
		    [object hasDataInReadBuffer] &&
		    ![object OF_isWaitingForDelimiter]) {
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading: object];

			foundInReadBuffer = true;
		}

		objc_autoreleasePoolPop(pool);
	}

	/*
	 * As long as we have data in the read buffer for any stream, we don't
	 * want to block.
	 */
	return foundInReadBuffer;
}

- (void)observe
{
	[self observeForTimeInterval: -1];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
402
403
404
405
406
407
408



























409







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-

	    (struct sockaddr*)&_cancelAddr, sizeof(_cancelAddr)) > 0);
# else
	OF_ENSURE(sendto(_cancelFD[1], "", 1, 0,
	    (struct sockaddr*)&_cancelAddr, 8) > 0);
# endif
#endif
}

- (bool)OF_processReadBuffers
{
	bool foundInReadBuffer = false;

	for (id object in _readObjects) {
		void *pool = objc_autoreleasePoolPush();

		if ([object isKindOfClass: [OFStream class]] &&
		    [object hasDataInReadBuffer] &&
		    ![object OF_isWaitingForDelimiter]) {
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading: object];

			foundInReadBuffer = true;
		}

		objc_autoreleasePoolPop(pool);
	}

	/*
	 * As long as we have data in the read buffer for any stream, we don't
	 * want to block.
	 */
	return foundInReadBuffer;
}
@end

Deleted src/OFKernelEventObserver_LockedQueue.h version [67a5a5872e].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41









































-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016
 *   Jonathan Schleifer <js@heap.zone>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#import "OFKernelEventObserver.h"

OF_ASSUME_NONNULL_BEGIN

@class OFMutableArray OF_GENERIC(ObjectType);
@class OFDataArray;

@interface OFKernelEventObserver_LockedQueue: OFKernelEventObserver
{
	OFDataArray *_queueActions, *_queueFDs;
	OFMutableArray *_queueObjects;
}

- (void)OF_addObjectForReading: (id)object
		fileDescriptor: (int)fd;
- (void)OF_addObjectForWriting: (id)object
		fileDescriptor: (int)fd;
- (void)OF_removeObjectForReading: (id)object
		   fileDescriptor: (int)fd;
- (void)OF_removeObjectForWriting: (id)object
		   fileDescriptor: (int)fd;
- (void)OF_processQueue;
@end

OF_ASSUME_NONNULL_END

Deleted src/OFKernelEventObserver_LockedQueue.m version [1dd69e1924].

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237













































































































































































































































-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016
 *   Jonathan Schleifer <js@heap.zone>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#import "OFKernelEventObserver_LockedQueue.h"
#import "OFArray.h"
#import "OFDataArray.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
#endif

#import "OFInitializationFailedException.h"

enum {
	QUEUE_ADD = 0,
	QUEUE_REMOVE = 1,
	QUEUE_READ = 0,
	QUEUE_WRITE = 2
};
#define QUEUE_ACTION (QUEUE_ADD | QUEUE_REMOVE)

@implementation OFKernelEventObserver_LockedQueue
- init
{
	self = [super init];

	@try {
		_queueActions = [[OFDataArray alloc]
		    initWithItemSize: sizeof(int)];
		_queueFDs = [[OFDataArray alloc] initWithItemSize: sizeof(int)];
		_queueObjects = [[OFMutableArray alloc] init];
	} @catch (id e) {
		@throw [OFInitializationFailedException
		    exceptionWithClass: [self class]];
	}

	return self;
}

- (void)dealloc
{
	[_queueActions release];
	[_queueFDs release];
	[_queueObjects release];

	[super dealloc];
}

- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int qi = QUEUE_ADD | QUEUE_READ;
		int fd = [object fileDescriptorForReading];

		[_queueActions addItem: &qi];
		[_queueFDs addItem: &fd];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int qi = QUEUE_ADD | QUEUE_WRITE;
		int fd = [object fileDescriptorForWriting];

		[_queueActions addItem: &qi];
		[_queueFDs addItem: &fd];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int qi = QUEUE_REMOVE | QUEUE_READ;
		int fd = [object fileDescriptorForReading];

		[_queueActions addItem: &qi];
		[_queueFDs addItem: &fd];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int qi = QUEUE_REMOVE | QUEUE_WRITE;
		int fd = [object fileDescriptorForWriting];

		[_queueActions addItem: &qi];
		[_queueFDs addItem: &fd];
		[_queueObjects addObject: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	[self cancel];
}

- (void)OF_addObjectForReading: (id)object
		fileDescriptor: (int)fd
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_addObjectForWriting: (id)object
		fileDescriptor: (int)fd
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_removeObjectForReading: (id)object
		   fileDescriptor: (int)fd
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_removeObjectForWriting: (id)object
		   fileDescriptor: (int)fd
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)OF_processQueue
{
	void *pool = objc_autoreleasePoolPush();

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int *queueActions = [_queueActions items];
		int *queueFDs = [_queueFDs items];
		id const *queueObjects = [_queueObjects objects];
		size_t count = [_queueActions count];

		OF_ENSURE([_queueFDs count] == count);
		OF_ENSURE([_queueObjects count] == count);

		for (size_t i = 0; i < count; i++) {
			int action = queueActions[i];
			int fd = queueFDs[i];
			id object = queueObjects[i];

			switch (action) {
			case QUEUE_ADD | QUEUE_READ:
				[_readObjects addObject: object];

				[self OF_addObjectForReading: object
					      fileDescriptor: fd];

				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[_writeObjects addObject: object];

				[self OF_addObjectForWriting: object
					      fileDescriptor: fd];

				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[self OF_removeObjectForReading: object
						 fileDescriptor: fd];

				[_readObjects removeObjectIdenticalTo: object];

				break;
			case QUEUE_REMOVE | QUEUE_WRITE:
				[self OF_removeObjectForWriting: object
						 fileDescriptor: fd];

				[_writeObjects removeObjectIdenticalTo: object];

				break;
			default:
				OF_ENSURE(0);
			}
		}

		[_queueActions removeAllItems];
		[_queueFDs removeAllItems];
		[_queueObjects removeAllObjects];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif

	objc_autoreleasePoolPop(pool);
}
@end

Modified src/OFKernelEventObserver_epoll.m from [ccbb902e19] to [d65aafd84c].

88
89
90
91
92
93
94
95
96
97
98
99
100
101
102


103
104

105
106
107
108
109



110
111
112
113
114
115


116
117

118
119
120


121
122
123


124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

141
142

143
144

145
146
147
148
149
150





151
152
153
154



155
156
157
158



159
160
161
162
163




164
165
166
167
168




169
170
171
172
173

174
175
176
177

178
179
180
181

182
183
184
185

186
187
188
189

190
191
192
193

194
195
196
197

198
199
200
201

202
203
204
205

206
207
208
209
210
211
212
213


214
215
216
217
218
219
220
88
89
90
91
92
93
94

95






96
97
98

99

100



101
102
103
104





105
106


107



108
109



110
111





112
113
114
115
116

117





118
119

120


121
122





123
124
125
126
127
128



129
130
131
132



133
134
135
136




137
138
139
140
141




142
143
144
145





146




147
148
149
150

151

152
153

154
155
156
157

158

159
160

161
162
163
164

165

166
167

168
169
170
171

172

173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188







-

-
-
-
-
-
-
+
+

-
+
-

-
-
-
+
+
+

-
-
-
-
-
+
+
-
-
+
-
-
-
+
+
-
-
-
+
+
-
-
-
-
-





-

-
-
-
-
-
+

-
+
-
-
+

-
-
-
-
-
+
+
+
+
+

-
-
-
+
+
+

-
-
-
+
+
+

-
-
-
-
+
+
+
+

-
-
-
-
+
+
+
+
-
-
-
-
-
+
-
-
-
-
+



-
+
-


-
+



-
+
-


-
+



-
+
-


-
+



-
+
-







+
+








	[super dealloc];
}

- (void)OF_addObject: (id)object
      fileDescriptor: (int)fd
	      events: (int)addEvents
	objectsArray: (OFMutableArray*)objectsArray
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		struct epoll_event event;
		intptr_t events;
	struct epoll_event event;
	intptr_t events;

		events = (intptr_t)[_FDToEvents
	events = (intptr_t)[_FDToEvents valueForKey: (void*)(intptr_t)fd];
		    valueForKey: (void*)(intptr_t)fd];

		memset(&event, 0, sizeof(event));
		event.events = (int)events | addEvents;
		event.data.ptr = object;
	memset(&event, 0, sizeof(event));
	event.events = (int)events | addEvents;
	event.data.ptr = object;

		[objectsArray addObject: object];

		if (epoll_ctl(_epfd,
		    (events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD),
		    fd, &event) == -1) {
	if (epoll_ctl(_epfd, (events == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD),
	    fd, &event) == -1)
			[objectsArray removeObjectIdenticalTo: object];
			@throw [OFObserveFailedException
		@throw [OFObserveFailedException exceptionWithObserver: self
			    exceptionWithObserver: self
					    errNo: errno];
		}
								 errNo: errno];


		[_FDToEvents setValue: (void*)(events | addEvents)
			       forKey: (void*)(intptr_t)fd];
	[_FDToEvents setValue: (void*)(events | addEvents)
		       forKey: (void*)(intptr_t)fd];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

- (void)OF_removeObject: (id)object
	 fileDescriptor: (int)fd
		 events: (int)removeEvents
	   objectsArray: (OFMutableArray*)objectsArray
{
#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		intptr_t events;
	intptr_t events;

		events = (intptr_t)[_FDToEvents
	events = (intptr_t)[_FDToEvents valueForKey: (void*)(intptr_t)fd];
		    valueForKey: (void*)(intptr_t)fd];
		events &= ~removeEvents;
	events &= ~removeEvents;

		if (events == 0) {
			if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
				@throw [OFObserveFailedException
				    exceptionWithObserver: self
						    errNo: errno];
	if (events == 0) {
		if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

			[_FDToEvents removeValueForKey: (void*)(intptr_t)fd];
		} else {
			struct epoll_event event;
		[_FDToEvents removeValueForKey: (void*)(intptr_t)fd];
	} else {
		struct epoll_event event;

			memset(&event, 0, sizeof(event));
			event.events = (int)events;
			event.data.ptr = object;
		memset(&event, 0, sizeof(event));
		event.events = (int)events;
		event.data.ptr = object;

			if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event) == -1)
				@throw [OFObserveFailedException
				    exceptionWithObserver: self
						    errNo: errno];
		if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &event) == -1)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

			[_FDToEvents setValue: (void*)events
				       forKey: (void*)(intptr_t)fd];
		}

		[_FDToEvents setValue: (void*)events
			       forKey: (void*)(intptr_t)fd];
	}
}
		[objectsArray removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}

#endif
}

- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	[self OF_addObject: object
	    fileDescriptor: [object fileDescriptorForReading]
		    events: EPOLLIN
		    events: EPOLLIN];
	      objectsArray: _readObjects];
}

- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	[self OF_addObject: object
	    fileDescriptor: [object fileDescriptorForWriting]
		    events: EPOLLOUT
		    events: EPOLLOUT];
	      objectsArray: _writeObjects];
}

- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForReading]
		       events: EPOLLIN
		       events: EPOLLIN];
		 objectsArray: _readObjects];
}

- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	[self OF_removeObject: object
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: EPOLLOUT
		       events: EPOLLOUT];
		 objectsArray: _writeObjects];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	OFNull *nullObject = [OFNull null];
	struct epoll_event eventList[EVENTLIST_SIZE];
	int events;

	[self OF_processQueue];

	if ([self OF_processReadBuffers])
		return;

	events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE,
	    (timeInterval != -1 ? timeInterval * 1000 : -1));

229
230
231
232
233
234
235
236

237
238
239
240
241
242
243
244
245
246
247

248
249
250
251
252
253
254
255
256
257
258
259
260
197
198
199
200
201
202
203

204
205
206
207
208
209
210
211
212
213
214

215
216
217
218
219
220
221
222
223
224
225
226
227
228







-
+










-
+













			assert(eventList[i].events == EPOLLIN);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		if (eventList[i].events & EPOLLIN) {
			pool = objc_autoreleasePoolPush();
			void *pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    eventList[i].data.ptr];

			objc_autoreleasePoolPop(pool);
		}

		if (eventList[i].events & EPOLLOUT) {
			pool = objc_autoreleasePoolPush();
			void *pool = objc_autoreleasePoolPush();

			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    eventList[i].data.ptr];

			objc_autoreleasePoolPop(pool);
		}

		assert((eventList[i].events & ~(EPOLLIN | EPOLLOUT)) == 0);
	}
}
@end

Modified src/OFKernelEventObserver_kqueue.m from [240d5f78f9] to [1f47484a4e].

81
82
83
84
85
86
87
88

89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

109
110

111
112
113


114
115
116
117

118
119
120
121

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141

142
143

144
145
146


147
148
149
150

151
152
153
154

155
156
157
158
159
160
161
162
163
164
165
166
167
168


169
170
171


172
173
174
175
176

177
178
179
180

181
182
183
184
185
186
187
188
189
190
191
192
193
194


195
196

197
198
199
200
201
202
203
204
205
206
207
208
209
210


211
212
213
214
215
216
217
81
82
83
84
85
86
87

88
89
90
91
92
93
94
95
96
97
98
99
100
101







102


103



104
105




106




107
108
109
110
111
112
113
114
115
116
117
118
119
120







121


122



123
124




125




126
127
128
129
130
131
132
133
134






135
136



137
138





139




140
141
142
143
144
145
146
147
148






149
150


151







152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167







-
+













-
-
-
-
-
-
-
+
-
-
+
-
-
-
+
+
-
-
-
-
+
-
-
-
-
+













-
-
-
-
-
-
-
+
-
-
+
-
-
-
+
+
-
-
-
-
+
-
-
-
-
+








-
-
-
-
-
-
+
+
-
-
-
+
+
-
-
-
-
-
+
-
-
-
-
+








-
-
-
-
-
-
+
+
-
-
+
-
-
-
-
-
-
-







+
+







- (void)dealloc
{
	close(_kernelQueue);

	[super dealloc];
}

- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_ADD;
#ifndef OF_NETBSD
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_readObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			[_readObjects removeObjectIdenticalTo: object];
			@throw [OFObserveFailedException
		@throw [OFObserveFailedException exceptionWithObserver: self
			    exceptionWithObserver: self
					    errNo: errno];
		}
								 errNo: errno];
}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}

#endif
}

- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_ADD;
#ifndef OF_NETBSD
	event.udata = object;
#else
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_writeObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			[_writeObjects removeObjectIdenticalTo: object];
			@throw [OFObserveFailedException
		@throw [OFObserveFailedException exceptionWithObserver: self
			    exceptionWithObserver: self
					    errNo: errno];
		}
								 errNo: errno];
}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}

#endif
}

- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_DELETE;

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFObserveFailedException
	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
			    exceptionWithObserver: self
					    errNo: errno];

								 errNo: errno];
}
		[_readObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}

#endif
}

- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_DELETE;

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
			@throw [OFObserveFailedException
	if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
			    exceptionWithObserver: self
					    errNo: errno];
								 errNo: errno];

		[_writeObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	struct timespec timeout;
	struct kevent eventList[EVENTLIST_SIZE];
	int events;

	[self OF_processQueue];

	if ([self OF_processReadBuffers])
		return;

	timeout.tv_sec = (time_t)timeInterval;
	timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000);

Modified src/OFKernelEventObserver_poll.h from [4f7085394a] to [560881d237].

10
11
12
13
14
15
16
17

18
19
20
21
22
23

24
25
26
27
28
29
30
31
10
11
12
13
14
15
16

17
18
19
20
21
22

23
24
25
26
27
28
29
30
31







-
+





-
+








 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#import "OFKernelEventObserver_LockedQueue.h"
#import "OFKernelEventObserver.h"

OF_ASSUME_NONNULL_BEGIN

@class OFDataArray;

@interface OFKernelEventObserver_poll: OFKernelEventObserver_LockedQueue
@interface OFKernelEventObserver_poll: OFKernelEventObserver
{
	OFDataArray *_FDs;
	size_t _maxFD;
	id __unsafe_unretained *_FDToObject;
}
@end

OF_ASSUME_NONNULL_END

Modified src/OFKernelEventObserver_poll.m from [365c8917fd] to [faa844312a].

121
122
123
124
125
126
127
128

129
130
131
132

133
134
135
136

137
138
139
140

141
142
143
144

145
146
147
148

149
150
151
152

153
154
155
156

157
158
159
160
161
162
163
121
122
123
124
125
126
127

128

129
130

131
132
133
134

135

136
137

138
139
140
141

142

143
144

145
146
147
148

149

150
151

152
153
154
155
156
157
158
159







-
+
-


-
+



-
+
-


-
+



-
+
-


-
+



-
+
-


-
+







			}

			break;
		}
	}
}

- (void)OF_addObjectForReading: (id)object
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
		fileDescriptor: (int)fd
{
	[self OF_addObject: object
	    fileDescriptor: fd
	    fileDescriptor: [object fileDescriptorForReading]
		    events: POLLIN];
}

- (void)OF_addObjectForWriting: (id)object
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
		fileDescriptor: (int)fd
{
	[self OF_addObject: object
	    fileDescriptor: fd
	    fileDescriptor: [object fileDescriptorForWriting]
		    events: POLLOUT];
}

- (void)OF_removeObjectForReading: (id)object
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
		   fileDescriptor: (int)fd
{
	[self OF_removeObject: object
	       fileDescriptor: fd
	       fileDescriptor: [object fileDescriptorForReading]
		       events: POLLIN];
}

- (void)OF_removeObjectForWriting: (id)object
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
		   fileDescriptor: (int)fd
{
	[self OF_removeObject: object
	       fileDescriptor: fd
	       fileDescriptor: [object fileDescriptorForWriting]
		       events: POLLOUT];
}

- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	struct pollfd *FDs;
	int events;

Modified src/OFKernelEventObserver_select.h from [3ab62cb3ec] to [b013269393].

23
24
25
26
27
28
29
30

31
32
33
34

35
36
37
38
39
40
41
23
24
25
26
27
28
29

30
31
32
33

34
35
36
37
38
39
40
41







-
+



-
+







# define __STDC_CONSTANT_MACROS
#endif

#ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
#endif

#import "OFKernelEventObserver_LockedQueue.h"
#import "OFKernelEventObserver.h"

OF_ASSUME_NONNULL_BEGIN

@interface OFKernelEventObserver_select: OFKernelEventObserver_LockedQueue
@interface OFKernelEventObserver_select: OFKernelEventObserver
{
	fd_set _readFDs, _writeFDs;
	int _maxFD;
}
@end

OF_ASSUME_NONNULL_END

Modified src/OFKernelEventObserver_select.m from [c7ee699c53] to [e66acd4ec0].

60
61
62
63
64
65
66
67
68
69




70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86




87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

102
103
104


105
106
107
108
109
110
111
112
113
114
115
116
117

118
119
120


121
122
123
124
125
126
127
60
61
62
63
64
65
66



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

103

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

120

121
122
123
124
125
126
127
128
129
130
131







-
-
-
+
+
+
+














-
-
-
+
+
+
+














-
+
-


+
+












-
+
-


+
+







		@throw [OFOutOfRangeException exception];

	_maxFD = (int)_cancelFD[0];

	return self;
}

- (void)OF_addObjectForReading: (id)object
		fileDescriptor: (int)fd
{
- (void)OF_addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	int fd = [object fileDescriptorForReading];

	if (fd < 0 || fd > INT_MAX - 1)
		@throw [OFOutOfRangeException exception];

#ifndef OF_WINDOWS
	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];
#endif

	if (fd > _maxFD)
		_maxFD = fd;

	FD_SET(fd, &_readFDs);
}

- (void)OF_addObjectForWriting: (id)object
		fileDescriptor: (int)fd
{
- (void)OF_addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	int fd = [object fileDescriptorForWriting];

	if (fd < 0 || fd > INT_MAX - 1)
		@throw [OFOutOfRangeException exception];

#ifndef OF_WINDOWS
	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];
#endif

	if (fd > _maxFD)
		_maxFD = fd;

	FD_SET(fd, &_writeFDs);
}

- (void)OF_removeObjectForReading: (id)object
- (void)OF_removeObjectForReading: (id <OFReadyForReadingObserving>)object
		   fileDescriptor: (int)fd
{
	/* TODO: Adjust _maxFD */

	int fd = [object fileDescriptorForReading];

	if (fd < 0)
		@throw [OFOutOfRangeException exception];

#ifndef OF_WINDOWS
	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];
#endif

	FD_CLR(fd, &_readFDs);
}

- (void)OF_removeObjectForWriting: (id)object
- (void)OF_removeObjectForWriting: (id <OFReadyForWritingObserving>)object
		   fileDescriptor: (int)fd
{
	/* TODO: Adjust _maxFD */

	int fd = [object fileDescriptorForWriting];

	if (fd < 0)
		@throw [OFOutOfRangeException exception];

#ifndef OF_WINDOWS
	if (fd >= FD_SETSIZE)
		@throw [OFOutOfRangeException exception];