ObjFW  Artifact [240d5f78f9]

Artifact 240d5f78f9be59a5485e7e6fbe420d4db4f1a7d021aefec64365c6d3a531d34f:

  • File src/OFKernelEventObserver_kqueue.m — part of check-in [7ae17af9f0] at 2016-03-20 11:57:06 on branch trunk — Never block when the read buffer is non-empty

    This was broken by 88f2f03. The problem only occurred when data in the
    read buffer had not been processed completely: after processing the read
    buffer, it would go on to wait for new data, even though the unprocessed
    remainder of the read buffer still needed to be handled
    first. (user: js, size: 5922) [annotate] [blame] [check-ins using]


/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016
 *   Jonathan Schleifer <js@heap.zone>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>
#include <errno.h>
#include <math.h>

#include <fcntl.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#import "OFKernelEventObserver.h"
#import "OFKernelEventObserver+Private.h"
#import "OFKernelEventObserver_kqueue.h"
#import "OFDataArray.h"
#import "OFArray.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
#endif

#import "OFInitializationFailedException.h"
#import "OFObserveFailedException.h"
#import "OFOutOfRangeException.h"

#define EVENTLIST_SIZE 64

@implementation OFKernelEventObserver_kqueue
/*
 * Creates the kernel queue, requests close-on-exec for it and registers
 * _cancelFD[0] with it for read events.
 *
 * Throws an OFInitializationFailedException if the kernel queue cannot be
 * created or if _cancelFD[0] cannot be registered. Whenever an exception is
 * raised inside the @try block, self is released before it is rethrown.
 */
- init
{
	self = [super init];

	@try {
		struct kevent cancelEvent;

#ifdef HAVE_KQUEUE1
		/* kqueue1() is handed O_CLOEXEC right at creation. */
		_kernelQueue = kqueue1(O_CLOEXEC);
#else
		_kernelQueue = kqueue();
#endif

		if (_kernelQueue == -1)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];

#ifndef HAVE_KQUEUE1
		/*
		 * Without kqueue1(), FD_CLOEXEC has to be set afterwards. This
		 * is best effort only: a failing fcntl() is ignored.
		 */
		{
			int fdFlags = fcntl(_kernelQueue, F_GETFD, 0);

			if (fdFlags != -1)
				fcntl(_kernelQueue, F_SETFD,
				    fdFlags | FD_CLOEXEC);
		}
#endif

		/*
		 * Watch _cancelFD[0] so that a byte arriving on it makes a
		 * pending kevent() call in -[observeForTimeInterval:] return.
		 */
		EV_SET(&cancelEvent, _cancelFD[0], EVFILT_READ, EV_ADD,
		    0, 0, 0);

		if (kevent(_kernelQueue, &cancelEvent, 1, NULL, 0, NULL) != 0)
			@throw [OFInitializationFailedException
			    exceptionWithClass: [self class]];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

/*
 * Closes the kernel queue descriptor, then hands off to the superclass.
 */
- (void)dealloc
{
	/*
	 * -[init] releases self after a failed kqueue()/kqueue1() call, at
	 * which point _kernelQueue holds -1. Do not pass that sentinel to
	 * close().
	 */
	if (_kernelQueue != -1)
		close(_kernelQueue);

	[super dealloc];
}

/*
 * Starts observing the specified object for readiness to read.
 *
 * The object is added to _readObjects and its fileDescriptorForReading is
 * registered with the kernel queue using EVFILT_READ. The object pointer is
 * stored in the event's udata; -[observeForTimeInterval:] casts it back to an
 * object and passes it to the delegate when the event fires.
 *
 * When built with OF_HAVE_THREADS, both steps happen while holding _mutex.
 *
 * Throws an OFObserveFailedException carrying the errno set by kevent() if
 * the registration fails. The object is removed from _readObjects again
 * before the exception is thrown.
 */
- (void)addObjectForReading: (id <OFReadyForReadingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForReading];
	event.filter = EVFILT_READ;
	event.flags = EV_ADD;
#ifndef OF_NETBSD
	event.udata = object;
#else
	/* On NetBSD, udata is an integer type rather than a pointer. */
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_readObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
			/*
			 * Save errno right away: the rollback below may call
			 * into code that overwrites it before the exception
			 * is created.
			 */
			int savedErrno = errno;

			[_readObjects removeObjectIdenticalTo: object];
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: savedErrno];
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Starts observing the specified object for readiness to write.
 *
 * The object is added to _writeObjects and its fileDescriptorForWriting is
 * registered with the kernel queue using EVFILT_WRITE. The object pointer is
 * stored in the event's udata; -[observeForTimeInterval:] casts it back to an
 * object and passes it to the delegate when the event fires.
 *
 * When built with OF_HAVE_THREADS, both steps happen while holding _mutex.
 *
 * Throws an OFObserveFailedException carrying the errno set by kevent() if
 * the registration fails. The object is removed from _writeObjects again
 * before the exception is thrown.
 */
- (void)addObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	struct kevent event;

	memset(&event, 0, sizeof(event));
	event.ident = [object fileDescriptorForWriting];
	event.filter = EVFILT_WRITE;
	event.flags = EV_ADD;
#ifndef OF_NETBSD
	event.udata = object;
#else
	/* On NetBSD, udata is an integer type rather than a pointer. */
	event.udata = (intptr_t)object;
#endif

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		[_writeObjects addObject: object];

		if (kevent(_kernelQueue, &event, 1, NULL, 0, NULL) != 0) {
			/*
			 * Save errno right away: the rollback below may call
			 * into code that overwrites it before the exception
			 * is created.
			 */
			int savedErrno = errno;

			[_writeObjects removeObjectIdenticalTo: object];
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: savedErrno];
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Stops observing the specified object for readiness to read.
 *
 * The EVFILT_READ registration for the object's fileDescriptorForReading is
 * deleted from the kernel queue first; only if that succeeds is the object
 * removed from _readObjects. When built with OF_HAVE_THREADS, both steps
 * happen while holding _mutex.
 *
 * Throws an OFObserveFailedException carrying the errno set by kevent() if
 * the deletion fails. The object is left in _readObjects in that case.
 */
- (void)removeObjectForReading: (id <OFReadyForReadingObserving>)object
{
	/* All members not named here are zero-initialized. */
	struct kevent deletion = {
		.ident = [object fileDescriptorForReading],
		.filter = EVFILT_READ,
		.flags = EV_DELETE
	};

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int status = kevent(_kernelQueue, &deletion, 1, NULL, 0, NULL);

		if (status != 0)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

		[_readObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Stops observing the specified object for readiness to write.
 *
 * The EVFILT_WRITE registration for the object's fileDescriptorForWriting is
 * deleted from the kernel queue first; only if that succeeds is the object
 * removed from _writeObjects. When built with OF_HAVE_THREADS, both steps
 * happen while holding _mutex.
 *
 * Throws an OFObserveFailedException carrying the errno set by kevent() if
 * the deletion fails. The object is left in _writeObjects in that case.
 */
- (void)removeObjectForWriting: (id <OFReadyForWritingObserving>)object
{
	/* All members not named here are zero-initialized. */
	struct kevent deletion = {
		.ident = [object fileDescriptorForWriting],
		.filter = EVFILT_WRITE,
		.flags = EV_DELETE
	};

#ifdef OF_HAVE_THREADS
	[_mutex lock];
	@try {
#endif
		int status = kevent(_kernelQueue, &deletion, 1, NULL, 0, NULL);

		if (status != 0)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: errno];

		[_writeObjects removeObjectIdenticalTo: object];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_mutex unlock];
	}
#endif
}

/*
 * Waits for events on the kernel queue and dispatches them to the delegate.
 *
 * If -[OF_processReadBuffers] returns true, the method returns right away
 * without asking the kernel for events. Otherwise it fetches up to
 * EVENTLIST_SIZE events with a single kevent() call and, for each of them,
 * calls -[objectIsReadyForReading:] or -[objectIsReadyForWriting:] on the
 * delegate (if the delegate implements it) with the object that was stored in
 * the event's udata when it was registered.
 *
 * An event on _cancelFD[0] is not forwarded to the delegate; a single byte is
 * read from the descriptor instead.
 *
 * timeInterval is the timeout in seconds. The value -1 makes kevent() wait
 * without a timeout.
 *
 * Throws an OFObserveFailedException if kevent() fails or if a returned event
 * has EV_ERROR set.
 */
- (void)observeForTimeInterval: (of_time_interval_t)timeInterval
{
	struct timespec timeout, *timeoutPtr = NULL;
	struct kevent eventList[EVENTLIST_SIZE];
	int events;

	if ([self OF_processReadBuffers])
		return;

	if (timeInterval != -1) {
		timeout.tv_sec = (time_t)timeInterval;
		timeout.tv_nsec = lrint(
		    (timeInterval - timeout.tv_sec) * 1000000000);

		/*
		 * lrint() may round the fractional part up to a full second
		 * (e.g. for 0.9999999999). A tv_nsec of 1000000000 is not a
		 * valid timespec, so carry it over into tv_sec.
		 */
		if (timeout.tv_nsec >= 1000000000) {
			timeout.tv_sec++;
			timeout.tv_nsec -= 1000000000;
		}

		timeoutPtr = &timeout;
	}

	events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE,
	    timeoutPtr);

	if (events < 0)
		@throw [OFObserveFailedException exceptionWithObserver: self
								 errNo: errno];

	for (int i = 0; i < events; i++) {
		void *pool;

		/* With EV_ERROR set, the data field holds the error code. */
		if (eventList[i].flags & EV_ERROR)
			@throw [OFObserveFailedException
			    exceptionWithObserver: self
					    errNo: (int)eventList[i].data];

		if (eventList[i].ident == _cancelFD[0]) {
			char buffer;

			/* Consume the byte that made kevent() return. */
			assert(eventList[i].filter == EVFILT_READ);
			OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1);

			continue;
		}

		/* One pool per event for whatever the delegate autoreleases. */
		pool = objc_autoreleasePoolPush();

		switch (eventList[i].filter) {
		case EVFILT_READ:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForReading:)])
				[_delegate objectIsReadyForReading:
				    (id)eventList[i].udata];
			break;
		case EVFILT_WRITE:
			if ([_delegate respondsToSelector:
			    @selector(objectIsReadyForWriting:)])
				[_delegate objectIsReadyForWriting:
				    (id)eventList[i].udata];
			break;
		default:
			/* Only read and write filters are ever registered. */
			assert(0);
		}

		objc_autoreleasePoolPop(pool);
	}
}
@end