30 #if __TBB_TASK_PRIORITY
32 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
38 if ( arenas.
size() == 1 )
39 next = &*arenas.
begin();
43 #if __TBB_TASK_PRIORITY
45 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
50 arena_list_type::iterator it = next;
53 if ( ++it == arenas.
end() && arenas.
size() > 1 )
64 market::market (
unsigned workers_soft_limit,
unsigned workers_hard_limit,
size_t stack_size )
65 : my_num_workers_hard_limit(workers_hard_limit)
66 , my_num_workers_soft_limit(workers_soft_limit)
68 , my_global_top_priority(normalized_normal_priority)
69 , my_global_bottom_priority(normalized_normal_priority)
72 , my_stack_size(stack_size)
73 , my_workers_soft_limit_to_report(workers_soft_limit)
75 #if __TBB_TASK_PRIORITY
88 workers_soft_limit = soft_limit-1;
91 if( workers_soft_limit >= workers_hard_limit )
92 workers_soft_limit = workers_hard_limit-1;
93 return workers_soft_limit;
103 if( old_public_count==0 )
109 "skip_soft_limit_warning must be larger than any valid workers_requested" );
111 if( soft_limit_to_report < workers_requested ) {
113 "The request for %u workers is ignored. Further requests for more workers "
114 "will be silently ignored until the limit changes.\n",
115 soft_limit_to_report, workers_requested );
125 "The request for larger stack (%u) cannot be satisfied.\n",
130 if( stack_size == 0 )
144 #if __TBB_TASK_GROUP_CONTEXT
146 "my_workers must be the last data field of the market class");
151 memset( storage, 0,
size );
153 m =
new (storage)
market( workers_soft_limit, workers_hard_limit, stack_size );
159 runtime_warning(
"RML might limit the number of workers to %u while %u is requested.\n"
160 , m->
my_server->default_concurrency(), workers_soft_limit );
166 #if __TBB_COUNT_TASK_NODES
167 if ( my_task_node_count )
168 runtime_warning(
"Leaked %ld task objects\n", (
long)my_task_node_count );
170 this->market::~market();
177 bool do_release =
false;
180 if ( blocking_terminate ) {
181 __TBB_ASSERT( is_public,
"Only an object with a public reference can request the blocking terminate" );
212 return blocking_terminate;
217 int market::update_workers_request() {
220 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
221 if (my_mandatory_num_requested > 0) {
226 #if __TBB_TASK_PRIORITY
254 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255 #if __TBB_TASK_PRIORITY
256 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { \
257 for (int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p) { \
258 priority_level_info& pl = m->my_priority_levels[p]; \
259 arena_list_type& arenas = pl.arenas;
261 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { { \
263 arena_list_type& arenas = m->my_arenas;
265 #define FOR_EACH_PRIORITY_LEVEL_END } }
268 FOR_EACH_PRIORITY_LEVEL_BEGIN
269 for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it)
270 if (it->my_global_concurrency_mode)
271 m->disable_mandatory_concurrency_impl(&*it);
272 FOR_EACH_PRIORITY_LEVEL_END
281 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
283 FOR_EACH_PRIORITY_LEVEL_BEGIN
284 for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
285 if (!it->my_task_stream.empty(
p))
286 m->enable_mandatory_concurrency_impl(&*it);
288 FOR_EACH_PRIORITY_LEVEL_END
290 #undef FOR_EACH_PRIORITY_LEVEL_BEGIN
291 #undef FOR_EACH_PRIORITY_LEVEL_END
294 delta = m->update_workers_request();
298 m->
my_server->adjust_job_count_estimate( delta );
304 return ((
const market&)client).must_join_workers();
324 if (a.my_global_concurrency_mode)
325 disable_mandatory_concurrency_impl(&a);
340 #if __TBB_TASK_PRIORITY
344 priority_level_info &pl = my_priority_levels[
p];
350 if ( it->my_aba_epoch == aba_epoch ) {
366 #if __TBB_TASK_PRIORITY
374 if ( arenas.
empty() )
376 arena_list_type::iterator it = hint;
380 if ( ++it == arenas.
end() )
386 }
while ( it != hint );
392 max_workers =
min(workers_demand, max_workers);
395 for (arena_list_type::iterator it = arenas.
begin(); it != arenas.
end(); ++it) {
402 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
404 __TBB_ASSERT(max_workers == 0 || max_workers == 1, NULL);
405 allotted = a.my_global_concurrency_mode && assigned < max_workers ? 1 : 0;
410 allotted = tmp / workers_demand;
411 carry = tmp % workers_demand;
416 assigned += allotted;
418 __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, NULL );
425 for ( arena_list_type::iterator it = arenas.
begin(); it != arenas.
end(); ++it )
432 #if __TBB_TASK_PRIORITY
433 inline void market::update_global_top_priority ( intptr_t newPriority ) {
435 my_global_top_priority = newPriority;
436 my_priority_levels[newPriority].workers_available =
437 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
441 advance_global_reload_epoch();
444 inline void market::reset_global_priority () {
445 my_global_bottom_priority = normalized_normal_priority;
446 update_global_top_priority(normalized_normal_priority);
454 int p = my_global_top_priority;
462 while ( !a &&
p >= my_global_bottom_priority ) {
463 priority_level_info &pl = my_priority_levels[
p--];
476 intptr_t i = highest_affected_priority;
477 int available = my_priority_levels[i].workers_available;
478 for ( ; i >= my_global_bottom_priority; --i ) {
479 priority_level_info &pl = my_priority_levels[i];
480 pl.workers_available = available;
481 if ( pl.workers_requested ) {
482 available -=
update_allotment( pl.arenas, pl.workers_requested, available );
483 if ( available <= 0 ) {
489 __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
490 for ( --i; i >= my_global_bottom_priority; --i ) {
491 priority_level_info &pl = my_priority_levels[i];
492 pl.workers_available = 0;
493 arena_list_type::iterator it = pl.arenas.begin();
494 for ( ; it != pl.arenas.end(); ++it ) {
495 __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
496 it->my_num_workers_allotted = 0;
502 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
503 void market::enable_mandatory_concurrency_impl ( arena *a ) {
507 a->my_global_concurrency_mode =
true;
508 my_mandatory_num_requested++;
511 void market::enable_mandatory_concurrency ( arena *a ) {
518 enable_mandatory_concurrency_impl(a);
519 delta = update_workers_request();
523 my_server->adjust_job_count_estimate(delta);
526 void market::disable_mandatory_concurrency_impl(arena* a) {
530 a->my_global_concurrency_mode =
false;
531 my_mandatory_num_requested--;
534 void market::mandatory_concurrency_disable ( arena *a ) {
538 if (!a->my_global_concurrency_mode)
543 if (a->has_enqueued_tasks())
547 disable_mandatory_concurrency_impl(a);
549 delta = update_workers_request();
552 my_server->adjust_job_count_estimate(delta);
565 if ( prev_req <= 0 ) {
571 else if ( prev_req < 0 ) {
575 #if !__TBB_TASK_PRIORITY
578 intptr_t
p = a.my_top_priority;
579 priority_level_info &pl = my_priority_levels[
p];
580 pl.workers_requested += delta;
583 if ( a.my_top_priority != normalized_normal_priority ) {
585 update_arena_top_priority( a, normalized_normal_priority );
587 a.my_bottom_priority = normalized_normal_priority;
590 if (my_mandatory_num_requested > 0) {
592 effective_soft_limit = 1;
595 if (
p == my_global_top_priority ) {
596 if ( !pl.workers_requested ) {
597 while ( --
p >= my_global_bottom_priority && !my_priority_levels[
p].workers_requested )
599 if (
p < my_global_bottom_priority )
600 reset_global_priority();
602 update_global_top_priority(
p);
604 my_priority_levels[my_global_top_priority].workers_available = effective_soft_limit;
607 else if (
p > my_global_top_priority ) {
611 update_global_top_priority(
p);
616 else if (
p == my_global_bottom_priority ) {
617 if ( !pl.workers_requested ) {
618 while ( ++
p <= my_global_top_priority && !my_priority_levels[
p].workers_requested )
620 if (
p > my_global_top_priority )
621 reset_global_priority();
623 my_global_bottom_priority =
p;
628 else if (
p < my_global_bottom_priority ) {
629 int prev_bottom = my_global_bottom_priority;
630 my_global_bottom_priority =
p;
634 __TBB_ASSERT( my_global_bottom_priority <
p &&
p < my_global_top_priority, NULL );
655 my_server->adjust_job_count_estimate( delta );
665 for (
int i = 0; i < 2; ++i) {
705 #if __TBB_TASK_GROUP_CONTEXT
708 my_workers[index - 1] =
s;
713 #if __TBB_TASK_PRIORITY
714 void market::update_arena_top_priority (
arena& a, intptr_t new_priority ) {
716 __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
717 priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
718 &new_level = my_priority_levels[new_priority];
720 a.my_top_priority = new_priority;
725 __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
728 bool market::lower_arena_priority (
arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
731 if ( a.my_reload_epoch != old_reload_epoch ) {
736 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
738 intptr_t
p = a.my_top_priority;
739 update_arena_top_priority( a, new_priority );
741 if ( my_global_bottom_priority > new_priority ) {
742 my_global_bottom_priority = new_priority;
744 if (
p == my_global_top_priority && !my_priority_levels[
p].workers_requested ) {
746 for ( --
p;
p>my_global_bottom_priority && !my_priority_levels[
p].workers_requested; --
p )
continue;
747 update_global_top_priority(
p);
752 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
757 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
761 tbb::internal::assert_priority_valid(new_priority);
762 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
764 if ( a.my_top_priority == new_priority ) {
767 else if ( a.my_top_priority > new_priority ) {
768 if ( a.my_bottom_priority > new_priority )
769 a.my_bottom_priority = new_priority;
772 else if ( a.my_num_workers_requested <= 0 ) {
776 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
778 intptr_t
p = a.my_top_priority;
779 intptr_t highest_affected_level =
max(
p, new_priority);
780 update_arena_top_priority( a, new_priority );
782 if ( my_global_top_priority < new_priority ) {
783 update_global_top_priority(new_priority);
785 else if ( my_global_top_priority == new_priority ) {
786 advance_global_reload_epoch();
789 __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
790 __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
791 if (
p == my_global_top_priority && !my_priority_levels[
p].workers_requested ) {
794 for ( --
p; !my_priority_levels[
p].workers_requested; --
p )
continue;
796 update_global_top_priority(
p);
797 highest_affected_level =
p;
800 if (
p == my_global_bottom_priority ) {
803 __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
804 while ( my_global_bottom_priority < my_global_top_priority
805 && !my_priority_levels[my_global_bottom_priority].workers_requested )
806 ++my_global_bottom_priority;
807 __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
808 __TBB_ASSERT( my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
812 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );