Artifact c1f2526e6a518138a780f3382f8ba98c7ab3e47cf7488393833497628317b51e:
- File
src/OFThreadPool.m
— part of check-in
[13ee56edf3]
at
2014-06-21 21:43:43
on branch trunk
— Move all macros from OFObject.h to macros.h
This means that OFObject.h imports macros.h now, making it unnecessary
to manually import macros.h in almost every file. And while at it, also
import autorelease.h in OFObject.h, so that this doesn't need to be
manually imported in almost every file as well. (user: js, size: 6946) [annotate] [blame] [check-ins using]
/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014
 *   Jonathan Schleifer <js@webkeks.org>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#import "OFThreadPool.h"
#import "OFArray.h"
#import "OFList.h"
#import "OFThread.h"
#import "OFCondition.h"
#import "OFSystemInfo.h"

/*
 * A single unit of work for the thread pool.
 *
 * A job is either a target/selector/object triple or (if blocks are
 * available) a block. -[perform] runs the block if one was set and otherwise
 * sends the selector to the target.
 */
@interface OFThreadPoolJob: OFObject
{
	id _target;
	SEL _selector;
	id _object;
#ifdef OF_HAVE_BLOCKS
	of_thread_pool_block_t _block;
#endif
}

+ (instancetype)jobWithTarget: (id)target
		     selector: (SEL)selector
		       object: (id)object;
#ifdef OF_HAVE_BLOCKS
+ (instancetype)jobWithBlock: (of_thread_pool_block_t)block;
#endif
- initWithTarget: (id)target
	selector: (SEL)selector
	  object: (id)object;
#ifdef OF_HAVE_BLOCKS
- initWithBlock: (of_thread_pool_block_t)block;
#endif
- (void)perform;
@end

@implementation OFThreadPoolJob
/* Returns an autoreleased job that sends selector to target with object. */
+ (instancetype)jobWithTarget: (id)target
		     selector: (SEL)selector
		       object: (id)object
{
	return [[[self alloc] initWithTarget: target
				    selector: selector
				      object: object] autorelease];
}

#ifdef OF_HAVE_BLOCKS
/* Returns an autoreleased job that calls a copy of block. */
+ (instancetype)jobWithBlock: (of_thread_pool_block_t)block
{
	return [[[self alloc] initWithBlock: block] autorelease];
}
#endif

/*
 * Initializes the job with a target/selector/object triple. Target and
 * object are retained until the job is deallocated.
 */
- initWithTarget: (id)target
	selector: (SEL)selector
	  object: (id)object
{
	self = [super init];

	@try {
		_target = [target retain];
		_selector = selector;
		_object = [object retain];
	} @catch (id e) {
		/* Do not leak the half-initialized job. */
		[self release];
		@throw e;
	}

	return self;
}

#ifdef OF_HAVE_BLOCKS
/* Initializes the job with a copy of block. */
- initWithBlock: (of_thread_pool_block_t)block
{
	self = [super init];

	@try {
		_block = [block copy];
	} @catch (id e) {
		/* Do not leak the half-initialized job. */
		[self release];
		@throw e;
	}

	return self;
}
#endif

- (void)dealloc
{
	[_target release];
	[_object release];
#ifdef OF_HAVE_BLOCKS
	[_block release];
#endif

	[super dealloc];
}

/*
 * Runs the job: calls the block if one was set, otherwise sends the stored
 * selector to the stored target with the stored object as argument.
 */
- (void)perform
{
#ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		_block();
	else
#endif
		[_target performSelector: _selector
			      withObject: _object];
}
@end

/*
 * Worker thread of an OFThreadPool.
 *
 * The worker retains the pool's queue and both conditions, but only stores a
 * raw pointer to the pool's _doneCount ivar. The pool itself is not retained.
 */
@interface OFThreadPoolThread: OFThread
{
	OFList *_queue;
	OFCondition *_queueCondition, *_countCondition;
@public
	/* Set by -[OFThreadPool dealloc] to make the worker leave -[main]. */
	volatile bool _terminate;
	/* Points at the owning pool's _doneCount. */
	volatile int *_doneCount;
}

+ (instancetype)threadWithThreadPool: (OFThreadPool*)threadPool;
- initWithThreadPool: (OFThreadPool*)threadPool;
@end

@implementation OFThreadPoolThread
/* Returns an autoreleased, not yet started worker for threadPool. */
+ (instancetype)threadWithThreadPool: (OFThreadPool*)threadPool
{
	return [[[self alloc] initWithThreadPool: threadPool] autorelease];
}

/*
 * Initializes the worker with the queue, the conditions and the address of
 * the done counter of threadPool.
 */
- initWithThreadPool: (OFThreadPool*)threadPool
{
	self = [super init];

	@try {
		_queue = [threadPool->_queue retain];
		_queueCondition = [threadPool->_queueCondition retain];
		_countCondition = [threadPool->_countCondition retain];
		_doneCount = &threadPool->_doneCount;
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_queue release];
	[_queueCondition release];
	[_countCondition release];

	[super dealloc];
}

/*
 * Worker loop: repeatedly takes the first job off the queue, performs it and
 * then increments the pool's done counter.
 *
 * _terminate is re-checked after every step that may have taken a while. As
 * soon as it is set, the current autorelease pool is popped and nil is
 * returned. Returning from inside a @try still runs the matching @finally,
 * so the condition that is held at that point gets unlocked.
 */
- (id)main
{
	void *pool;

	if (_terminate)
		return nil;

	pool = objc_autoreleasePoolPush();

	for (;;) {
		OFThreadPoolJob *job;

		[_queueCondition lock];
		@try {
			of_list_object_t *listObject;

			if (_terminate) {
				objc_autoreleasePoolPop(pool);
				return nil;
			}

			listObject = [_queue firstListObject];

			/*
			 * Being woken up does not guarantee that there is a
			 * job for us, so look at the queue again after every
			 * wait.
			 */
			while (listObject == NULL) {
				[_queueCondition wait];

				if (_terminate) {
					objc_autoreleasePoolPop(pool);
					return nil;
				}

				listObject = [_queue firstListObject];
			}

			/*
			 * Keep the job alive in our autorelease pool, as
			 * removing it from the queue drops the queue's
			 * reference.
			 */
			job = [[listObject->object retain] autorelease];
			[_queue removeListObject: listObject];
		} @finally {
			[_queueCondition unlock];
		}

		if (_terminate) {
			objc_autoreleasePoolPop(pool);
			return nil;
		}

		[job perform];

		if (_terminate) {
			objc_autoreleasePoolPop(pool);
			return nil;
		}

		/* Release the job and everything it autoreleased. */
		objc_autoreleasePoolPop(pool);
		pool = objc_autoreleasePoolPush();

		[_countCondition lock];
		@try {
			/*
			 * -[OFThreadPool dealloc] sets _terminate while
			 * holding _countCondition, so *_doneCount is only
			 * written while _terminate is still unset under that
			 * same lock.
			 */
			if (_terminate) {
				objc_autoreleasePoolPop(pool);
				return nil;
			}

			(*_doneCount)++;

			[_countCondition signal];
		} @finally {
			[_countCondition unlock];
		}
	}
}
@end

@implementation OFThreadPool
/* Returns an autoreleased pool created with -[init]. */
+ (instancetype)threadPool
{
	return [[[self alloc] init] autorelease];
}

/* Returns an autoreleased pool with size worker threads. */
+ (instancetype)threadPoolWithSize: (size_t)size
{
	return [[[self alloc] initWithSize: size] autorelease];
}

/* Creates a pool with [OFSystemInfo numberOfCPUs] worker threads. */
- init
{
	return [self initWithSize: [OFSystemInfo numberOfCPUs]];
}

/*
 * Creates the queue, the two conditions and size worker threads, then starts
 * all workers.
 */
- initWithSize: (size_t)size
{
	self = [super init];

	@try {
		size_t i;

		_size = size;
		_threads = [[OFMutableArray alloc] init];
		_queue = [[OFList alloc] init];
		_queueCondition = [[OFCondition alloc] init];
		_countCondition = [[OFCondition alloc] init];

		for (i = 0; i < size; i++) {
			void *pool = objc_autoreleasePoolPush();

			OFThreadPoolThread *thread =
			    [OFThreadPoolThread threadWithThreadPool: self];

			[_threads addObject: thread];

			objc_autoreleasePoolPop(pool);
		}

		/*
		 * We need to start the threads in a separate loop to make sure
		 * _threads is not modified anymore once the first thread runs,
		 * to prevent a race condition.
		 */
		for (i = 0; i < size; i++)
			[[_threads objectAtIndex: i] start];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

/*
 * Tells all workers to terminate and releases the pool's resources.
 *
 * _terminate is set while holding both conditions (queue first, then count)
 * and the queue condition is broadcast afterwards so that workers waiting
 * for a job wake up and notice the flag. The workers are not waited for
 * here.
 */
- (void)dealloc
{
	void *pool = objc_autoreleasePoolPush();

	[_queueCondition lock];
	@try {
		[_countCondition lock];
		@try {
			OFEnumerator *enumerator = [_threads objectEnumerator];
			OFThreadPoolThread *thread;

			while ((thread = [enumerator nextObject]) != nil)
				thread->_terminate = true;
		} @finally {
			[_countCondition unlock];
		}

		[_queueCondition broadcast];
	} @finally {
		[_queueCondition unlock];
	}

	objc_autoreleasePoolPop(pool);

	[_threads release];
	[_queue release];
	[_queueCondition release];
	[_countCondition release];

	[super dealloc];
}

/*
 * Counts the job as outstanding, appends it to the queue and wakes up one
 * worker.
 *
 * _count is incremented before the job becomes visible to the workers, as
 * -[waitUntilDone] compares _doneCount with _count for equality. If
 * appending the job throws, the increment is rolled back before the
 * exception is rethrown: otherwise _count would stay ahead of what the
 * workers can ever complete and every later -[waitUntilDone] would block
 * forever.
 */
- (void)OF_dispatchJob: (OFThreadPoolJob*)job
{
	/* volatile as it is written in @try and read in @catch. */
	volatile bool enqueued = false;

	[_countCondition lock];
	_count++;
	[_countCondition unlock];

	[_queueCondition lock];
	@try {
		[_queue appendObject: job];
		enqueued = true;

		[_queueCondition signal];
	} @catch (id e) {
		/*
		 * Only undo the count if the job never made it into the
		 * queue. If just the signal failed, the job is queued and
		 * will still be counted as done by a worker.
		 *
		 * The lock order (queue, then count) is the same as in
		 * -[dealloc].
		 */
		if (!enqueued) {
			[_countCondition lock];
			@try {
				_count--;

				/* Let a waiter re-evaluate its condition. */
				[_countCondition signal];
			} @finally {
				[_countCondition unlock];
			}
		}

		@throw e;
	} @finally {
		[_queueCondition unlock];
	}
}

/*
 * Blocks until the number of finished jobs equals the number of dispatched
 * jobs. Returning from inside the @try still unlocks the condition via the
 * @finally.
 */
- (void)waitUntilDone
{
	for (;;) {
		[_countCondition lock];
		@try {
			if (_doneCount == _count)
				return;

			[_countCondition wait];
		} @finally {
			[_countCondition unlock];
		}
	}
}

/* Dispatches a job that sends selector to target with object. */
- (void)dispatchWithTarget: (id)target
		  selector: (SEL)selector
		    object: (id)object
{
	[self OF_dispatchJob: [OFThreadPoolJob jobWithTarget: target
						    selector: selector
						      object: object]];
}

#ifdef OF_HAVE_BLOCKS
/* Dispatches a job that calls a copy of block. */
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
{
	[self OF_dispatchJob: [OFThreadPoolJob jobWithBlock: block]];
}
#endif

/* Returns the number of worker threads the pool was created with. */
- (size_t)size
{
	return _size;
}
@end