/* * Copyright (c) 2008-2021 Jonathan Schleifer <js@nil.im> * * All rights reserved. * * This file is part of ObjFW. It may be distributed under the terms of the * Q Public License 1.0, which can be found in the file LICENSE.QPL included in * the packaging of this file. * * Alternatively, it may be distributed under the terms of the GNU General * Public License, either version 2 or 3, which can be found in the file * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. */ #include "config.h" #define OF_THREAD_POOL_M #import "OFThreadPool.h" #import "OFArray.h" #import "OFList.h" #import "OFThread.h" #import "OFCondition.h" #import "OFSystemInfo.h" OF_DIRECT_MEMBERS @interface OFThreadPoolJob: OFObject { id _target; SEL _selector; id _object; #ifdef OF_HAVE_BLOCKS OFThreadPoolBlock _block; #endif } - (instancetype)initWithTarget: (id)target selector: (SEL)selector object: (id)object; #ifdef OF_HAVE_BLOCKS - (instancetype)initWithBlock: (OFThreadPoolBlock)block; #endif - (void)perform; @end @implementation OFThreadPoolJob - (instancetype)initWithTarget: (id)target selector: (SEL)selector object: (id)object { self = [super init]; @try { _target = [target retain]; _selector = selector; _object = [object retain]; } @catch (id e) { [self release]; @throw e; } return self; } #ifdef OF_HAVE_BLOCKS - (instancetype)initWithBlock: (OFThreadPoolBlock)block { self = [super init]; @try { _block = [block copy]; } @catch (id e) { [self release]; @throw e; } return self; } #endif - (void)dealloc { [_target release]; [_object release]; #ifdef OF_HAVE_BLOCKS [_block release]; #endif [super dealloc]; } - (void)perform { #ifdef OF_HAVE_BLOCKS if (_block != NULL) _block(); else #endif [_target performSelector: _selector withObject: _object]; } @end OF_DIRECT_MEMBERS @interface OFThreadPoolThread: OFThread { OFList *_queue; OFCondition *_queueCondition, *_countCondition; @public volatile bool _terminate; volatile int *_doneCount; } + (instancetype)threadWithThreadPool: (OFThreadPool *)threadPool; - (instancetype)initWithThreadPool: (OFThreadPool *)threadPool; @end @implementation OFThreadPoolThread + (instancetype)threadWithThreadPool: (OFThreadPool *)threadPool { return [[(OFThreadPoolThread *)[self alloc] initWithThreadPool: threadPool] autorelease]; } - (instancetype)initWithThreadPool: (OFThreadPool *)threadPool { self = [super init]; @try { _queue = [threadPool->_queue retain]; _queueCondition = [threadPool->_queueCondition retain]; _countCondition = [threadPool->_countCondition retain]; _doneCount = &threadPool->_doneCount; } @catch (id e) { [self release]; @throw e; } return self; } - (void)dealloc { [_queue release]; [_queueCondition release]; [_countCondition release]; [super dealloc]; } - (id)main { void *pool; if (_terminate) return nil; pool = objc_autoreleasePoolPush(); for (;;) { OFThreadPoolJob *job; [_queueCondition lock]; @try { OFListItem listItem; if (_terminate) { objc_autoreleasePoolPop(pool); return nil; } listItem = _queue.firstListItem; while (listItem == NULL) { [_queueCondition wait]; if (_terminate) { objc_autoreleasePoolPop(pool); return nil; } listItem = _queue.firstListItem; } job = [[OFListItemObject(listItem) retain] autorelease]; [_queue removeListItem: listItem]; } @finally { [_queueCondition unlock]; } if (_terminate) { objc_autoreleasePoolPop(pool); return nil; } [job perform]; if (_terminate) { objc_autoreleasePoolPop(pool); return nil; } objc_autoreleasePoolPop(pool); pool = objc_autoreleasePoolPush(); [_countCondition lock]; @try { if (_terminate) { objc_autoreleasePoolPop(pool); return nil; } (*_doneCount)++; [_countCondition signal]; } @finally { [_countCondition unlock]; } } } @end @implementation OFThreadPool + (instancetype)threadPool { return [[[self alloc] init] autorelease]; } + (instancetype)threadPoolWithSize: (size_t)size { return [[[self alloc] initWithSize: size] autorelease]; } - (instancetype)init { return [self initWithSize: [OFSystemInfo numberOfCPUs]]; } - (instancetype)initWithSize: (size_t)size { self = [super init]; @try { _size = size; _threads = [[OFMutableArray alloc] init]; _queue = [[OFList alloc] init]; _queueCondition = [[OFCondition alloc] init]; _countCondition = [[OFCondition alloc] init]; for (size_t i = 0; i < size; i++) { void *pool = objc_autoreleasePoolPush(); OFThreadPoolThread *thread = [OFThreadPoolThread threadWithThreadPool: self]; [_threads addObject: thread]; objc_autoreleasePoolPop(pool); } /* * We need to start the threads in a separate loop to make sure * _threads is not modified anymore to prevent a race condition. */ for (size_t i = 0; i < size; i++) [[_threads objectAtIndex: i] start]; } @catch (id e) { [self release]; @throw e; } return self; } - (void)dealloc { [_queueCondition lock]; @try { [_countCondition lock]; @try { for (OFThreadPoolThread *thread in _threads) thread->_terminate = true; } @finally { [_countCondition unlock]; } [_queueCondition broadcast]; } @finally { [_queueCondition unlock]; } [_threads release]; [_queue release]; [_queueCondition release]; [_countCondition release]; [super dealloc]; } - (void)of_dispatchJob: (OFThreadPoolJob *)job OF_DIRECT { [_countCondition lock]; _count++; [_countCondition unlock]; [_queueCondition lock]; @try { [_queue appendObject: job]; [_queueCondition signal]; } @finally { [_queueCondition unlock]; } } - (void)waitUntilDone { for (;;) { [_countCondition lock]; @try { if (_doneCount == _count) return; [_countCondition wait]; } @finally { [_countCondition unlock]; } } } - (void)dispatchWithTarget: (id)target selector: (SEL)selector object: (id)object { OFThreadPoolJob *job = [[OFThreadPoolJob alloc] initWithTarget: target selector: selector object: object]; @try { [self of_dispatchJob: job]; } @finally { [job release]; } } #ifdef OF_HAVE_BLOCKS - (void)dispatchWithBlock: (OFThreadPoolBlock)block { OFThreadPoolJob *job = [[OFThreadPoolJob alloc] initWithBlock: block]; @try { [self of_dispatchJob: job]; } @finally { [job release]; } } #endif - (size_t)size { return _size; } @end