/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012
 *   Jonathan Schleifer
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#import "OFThreadPool.h"
#import "OFArray.h"
#import "OFList.h"
#import "OFThread.h"
#import "OFAutoreleasePool.h"

/*
 * A single unit of work queued on an OFThreadPool.
 *
 * A job is either a target/selector pair or (if blocks are available) a
 * block. In both cases it carries one object that is handed to the callee
 * as its argument.
 */
@interface OFThreadPoolJob: OFObject
{
	id target;
	SEL selector;
	id object;
#ifdef OF_HAVE_BLOCKS
	of_thread_pool_block_t block;
#endif
}

+ jobWithTarget: (id)target
       selector: (SEL)selector
	 object: (id)object;
#ifdef OF_HAVE_BLOCKS
+ jobWithBlock: (of_thread_pool_block_t)block
	object: (id)object;
#endif
- initWithTarget: (id)target
	selector: (SEL)selector
	  object: (id)object;
#ifdef OF_HAVE_BLOCKS
- initWithBlock: (of_thread_pool_block_t)block
	 object: (id)object;
#endif
- (void)perform;
@end

@implementation OFThreadPoolJob
/* Returns an autoreleased job that sends selector to target with object. */
+ jobWithTarget: (id)target
       selector: (SEL)selector
	 object: (id)object
{
	return [[[self alloc] initWithTarget: target
				    selector: selector
				      object: object] autorelease];
}

#ifdef OF_HAVE_BLOCKS
/* Returns an autoreleased job that calls block with object. */
+ jobWithBlock: (of_thread_pool_block_t)block
	object: (id)object
{
	return [[(OFThreadPoolJob*)[self alloc]
	    initWithBlock: block
		   object: object] autorelease];
}
#endif

/* Retains target and object; they are released again in -dealloc. */
- initWithTarget: (id)target_
	selector: (SEL)selector_
	  object: (id)object_
{
	self = [super init];

	@try {
		target = [target_ retain];
		selector = selector_;
		object = [object_ retain];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

#ifdef OF_HAVE_BLOCKS
/* Copies block and retains object; both are released again in -dealloc. */
- initWithBlock: (of_thread_pool_block_t)block_
	 object: (id)object_
{
	self = [super init];

	@try {
		block = [block_ copy];
		object = [object_ retain];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}
#endif

- (void)dealloc
{
	[target release];
	[object release];
#ifdef OF_HAVE_BLOCKS
	[block release];
#endif

	[super dealloc];
}

/*
 * Executes the job: calls the block with object if a block was set,
 * otherwise sends selector to target with object as the argument.
 */
- (void)perform
{
#ifdef OF_HAVE_BLOCKS
	if (block != NULL)
		block(object);
	else
#endif
		/*
		 * The receiver has to be the target the job was created with;
		 * object is only the argument.
		 */
		[target performSelector: selector
			     withObject: object];
}
@end

/*
 * A worker thread of an OFThreadPool.
 *
 * It shares the pool's job queue and both of its conditions, and holds a
 * pointer to the pool's doneCount so it can report finished jobs.
 */
@interface OFThreadPoolThread: OFThread
{
	OFList *queue;
	OFCondition *queueCondition, *countCondition;
@public
	/* Set to YES by -[OFThreadPool dealloc] to make -main return. */
	volatile BOOL terminate;
	/* Points at the doneCount ivar of the owning pool. */
	volatile int *doneCount;
}

+ threadWithThreadPool: (OFThreadPool*)threadPool;
- initWithThreadPool: (OFThreadPool*)threadPool;
@end

@implementation OFThreadPoolThread
/* Returns an autoreleased worker thread bound to threadPool. */
+ threadWithThreadPool: (OFThreadPool*)threadPool
{
	return [[[self alloc] initWithThreadPool: threadPool] autorelease];
}

/*
 * Retains the pool's queue and conditions. The pool itself is not retained;
 * only the address of its doneCount ivar is stored.
 */
- initWithThreadPool: (OFThreadPool*)threadPool
{
	self = [super init];

	@try {
		queue = [threadPool->queue retain];
		queueCondition = [threadPool->queueCondition retain];
		countCondition = [threadPool->countCondition retain];
		doneCount = &threadPool->doneCount;
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[queue release];
	[queueCondition release];
	[countCondition release];

	[super dealloc];
}

/*
 * Worker loop: takes the first job off the queue (waiting on queueCondition
 * while the queue is empty), performs it, then increments *doneCount and
 * signals countCondition. The terminate flag is checked before and after
 * every step; once it is set, the autorelease pool is released and nil is
 * returned.
 */
- (id)main
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

	if (terminate) {
		[pool release];
		return nil;
	}

	for (;;) {
		OFThreadPoolJob *job;

		[queueCondition lock];
		@try {
			of_list_object_t *listObject;

			if (terminate) {
				[pool release];
				return nil;
			}

			listObject = [queue firstListObject];

			/*
			 * Re-check the queue after every wakeup; a wakeup
			 * does not imply that a job is available.
			 */
			while (listObject == NULL) {
				[queueCondition wait];

				if (terminate) {
					[pool release];
					return nil;
				}

				listObject = [queue firstListObject];
			}

			/*
			 * Keep the job alive after it has been removed from
			 * the queue.
			 */
			job = [[listObject->object retain] autorelease];
			[queue removeListObject: listObject];
		} @finally {
			[queueCondition unlock];
		}

		if (terminate) {
			[pool release];
			return nil;
		}

		[job perform];

		if (terminate) {
			[pool release];
			return nil;
		}

		[pool releaseObjects];

		[countCondition lock];
		@try {
			/*
			 * terminate is set while countCondition is locked, so
			 * checking it here guards the access to *doneCount.
			 */
			if (terminate) {
				[pool release];
				return nil;
			}

			(*doneCount)++;

			[countCondition signal];
		} @finally {
			[countCondition unlock];
		}
	}
}
@end

@implementation OFThreadPool
/* Returns an autoreleased pool created with -init. */
+ threadPool
{
	return [[[self alloc] init] autorelease];
}

/* Returns an autoreleased pool with size worker threads. */
+ threadPoolWithSize: (size_t)size
{
	return [[[self alloc] initWithSize: size] autorelease];
}

/* Creates a pool with of_num_cpus worker threads. */
- init
{
	return [self initWithSize: of_num_cpus];
}

/*
 * Creates the job queue and both conditions, then creates and starts size_
 * worker threads.
 */
- initWithSize: (size_t)size_
{
	self = [super init];

	@try {
		OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
		size_t i;

		size = size_;
		threads = [[OFMutableArray alloc] init];
		queue = [[OFList alloc] init];
		queueCondition = [[OFCondition alloc] init];
		countCondition = [[OFCondition alloc] init];

		for (i = 0; i < size; i++) {
			OFThreadPoolThread *thread =
			    [OFThreadPoolThread threadWithThreadPool: self];

			[threads addObject: thread];

			[pool releaseObjects];
		}

		/*
		 * We need to start the threads in a separate loop to make sure
		 * threads is not modified anymore to prevent a race condition.
		 */
		for (i = 0; i < size; i++) {
			OFThreadPoolThread *thread = [threads objectAtIndex: i];

			[thread start];
		}

		[pool release];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

/*
 * Sets the terminate flag on every worker thread while both conditions are
 * locked, then broadcasts queueCondition so that threads waiting for a job
 * wake up and see the flag. The threads are not joined here.
 */
- (void)dealloc
{
	OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

	[queueCondition lock];
	@try {
		[countCondition lock];
		@try {
			OFEnumerator *enumerator = [threads objectEnumerator];
			OFThreadPoolThread *thread;

			while ((thread = [enumerator nextObject]) != nil)
				thread->terminate = YES;
		} @finally {
			[countCondition unlock];
		}

		[queueCondition broadcast];
	} @finally {
		[queueCondition unlock];
	}

	[pool release];

	[threads release];
	[queue release];
	[queueCondition release];
	[countCondition release];

	[super dealloc];
}

/*
 * Increments the count of dispatched jobs, appends job to the queue and
 * signals queueCondition.
 */
- (void)_dispatchJob: (OFThreadPoolJob*)job
{
	of_atomic_inc_int(&count);

	[queueCondition lock];
	@try {
		[queue appendObject: job];
		[queueCondition signal];
	} @finally {
		[queueCondition unlock];
	}
}

/*
 * Blocks until doneCount equals count, i.e. until as many jobs have been
 * reported done as have been dispatched.
 */
- (void)waitUntilFinished
{
	for (;;) {
		[countCondition lock];
		@try {
			if (doneCount == count)
				return;

			[countCondition wait];
		} @finally {
			[countCondition unlock];
		}
	}
}

/* Queues a job that sends selector to target with object as argument. */
- (void)dispatchWithTarget: (id)target
		  selector: (SEL)selector
		    object: (id)object
{
	[self _dispatchJob: [OFThreadPoolJob jobWithTarget: target
						  selector: selector
						    object: object]];
}

#ifdef OF_HAVE_BLOCKS
/* Queues a job that calls block with nil as its argument. */
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
{
	[self _dispatchJob: [OFThreadPoolJob jobWithBlock: block
						   object: nil]];
}

/* Queues a job that calls block with object as its argument. */
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
		   object: (id)object
{
	[self _dispatchJob: [OFThreadPoolJob jobWithBlock: block
						   object: object]];
}
#endif

/* Returns the number of worker threads the pool was created with. */
- (size_t)size
{
	return size;
}
@end