Intel(R) Threading Building Blocks Doxygen Documentation, version 4.2.3
flow_graph.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
31 #include "tbb_exception.h"
34 #include "tbb_profiling.h"
35 #include "task_arena.h"
36 
37 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
38  #if __INTEL_COMPILER
39  // Disable the warning "routine is both inline and noinline"
40  #pragma warning (push)
41  #pragma warning( disable: 2196 )
42  #endif
43  #define __TBB_NOINLINE_SYM __attribute__((noinline))
44 #else
45  #define __TBB_NOINLINE_SYM
46 #endif
47 
48 #if __TBB_PREVIEW_ASYNC_MSG
49 #include <vector> // std::vector in internal::async_storage
50 #include <memory> // std::shared_ptr in async_msg
51 #endif
52 
53 #if __TBB_PREVIEW_STREAMING_NODE
54 // For streaming_node
55 #include <array> // std::array
56 #include <unordered_map> // std::unordered_map
57 #include <type_traits> // std::decay, std::true_type, std::false_type
58 #endif // __TBB_PREVIEW_STREAMING_NODE
59 
60 #if TBB_DEPRECATED_FLOW_ENQUEUE
61 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
62 #else
63 #define FLOW_SPAWN(a) tbb::task::spawn((a))
64 #endif
65 
66 // use the VC10 or gcc version of tuple if it is available.
67 #if __TBB_CPP11_TUPLE_PRESENT
68  #include <tuple>
69 namespace tbb {
70  namespace flow {
71  using std::tuple;
72  using std::tuple_size;
73  using std::tuple_element;
74  using std::get;
75  }
76 }
77 #else
78  #include "compat/tuple"
79 #endif
80 
81 #include <list>
82 #include <queue>
83 
94 namespace tbb {
95 namespace flow {
96 
98 enum concurrency { unlimited = 0, serial = 1 };
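// The concurrency constants above are the usual way to throttle a functional
// node's body: 'serial' allows one invocation at a time, 'unlimited' places no
// limit. A minimal usage sketch, assuming the public tbb::flow API exported by
// this header:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         tbb::flow::function_node<int, int> square(g, tbb::flow::serial,      // one body at a time
//             [](int v) { return v * v; });
//         tbb::flow::function_node<int> print(g, tbb::flow::unlimited,         // bodies may run concurrently
//             [](int v) { std::printf("%d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(square, print);
//         for (int i = 0; i < 10; ++i) square.try_put(i);
//         g.wait_for_all();
//         return 0;
//     }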
99 
100 namespace interface11 {
103 struct null_type {};
104 
106 class continue_msg {};
107 
109 template< typename T > class sender;
110 template< typename T > class receiver;
113 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
115 template< typename R, typename B > class run_and_put_task;
117 namespace internal {
118 
119 template<typename T, typename M> class successor_cache;
120 template<typename T, typename M> class broadcast_cache;
121 template<typename T, typename M> class round_robin_cache;
122 template<typename T, typename M> class predecessor_cache;
123 template<typename T, typename M> class reservable_predecessor_cache;
125 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
126 namespace order {
127 struct following;
128 struct preceding;
129 }
130 template<typename Order, typename... Args> struct node_set;
131 #endif
133 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
134 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
135 // C == receiver< ... > or sender< ... >, depending.
136 template<typename C>
137 class edge_container {
139 public:
140  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
141 
142  void add_edge(C &s) {
143  built_edges.push_back(&s);
144  }
146  void delete_edge(C &s) {
147  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
148  if (*i == &s) {
149  (void)built_edges.erase(i);
150  return; // only remove one predecessor per request
151  }
152  }
153  }
155  void copy_edges(edge_list_type &v) {
156  v = built_edges;
157  }
158 
159  size_t edge_count() {
160  return (size_t)(built_edges.size());
161  }
163  void clear() {
164  built_edges.clear();
165  }
166 
167  // These methods remove the node from all predecessors/successors listed in the edge
168  // container.
169  template< typename S > void sender_extract(S &s);
170  template< typename R > void receiver_extract(R &r);
172 private:
173  edge_list_type built_edges;
174 }; // class edge_container
175 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
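// edge_container's delete_edge() removes only the first matching pointer per
// call (see the early return above). A standalone sketch of that "erase first
// match" idiom using std::list; illustrative only, not TBB code:
//
//     #include <list>
//     #include <cassert>
//
//     int main() {
//         int a = 0, b = 1;
//         std::list<int*> edges;
//         edges.push_back(&a);
//         edges.push_back(&b);
//         edges.push_back(&a);                          // duplicate registration
//         for (std::list<int*>::iterator i = edges.begin(); i != edges.end(); ++i) {
//             if (*i == &a) { edges.erase(i); break; }  // remove one entry only
//         }
//         assert(edges.size() == 2);                    // one copy of &a remains
//         return 0;
//     }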
177 } // namespace internal
179 } // namespace interfaceX
180 } // namespace flow
181 } // namespace tbb
186 namespace tbb {
187 namespace flow {
188 namespace interface11 {
190 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
191 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
192  // if no RHS task, don't change left.
193  if (right == NULL) return left;
194  // right != NULL
195  if (left == NULL) return right;
196  if (left == SUCCESSFULLY_ENQUEUED) return right;
197  // left contains a task
198  if (right != SUCCESSFULLY_ENQUEUED) {
199  // both are valid tasks
200  internal::spawn_in_graph_arena(g, *left);
201  return right;
202  }
203  return left;
204 }
206 #if __TBB_PREVIEW_ASYNC_MSG
208 template < typename T > class async_msg;
210 namespace internal {
212 template < typename T > class async_storage;
214 template< typename T, typename = void >
215 struct async_helpers {
216  typedef async_msg<T> async_type;
217  typedef T filtered_type;
218 
219  static const bool is_async_type = false;
221  static const void* to_void_ptr(const T& t) {
222  return static_cast<const void*>(&t);
223  }
224 
225  static void* to_void_ptr(T& t) {
226  return static_cast<void*>(&t);
227  }
229  static const T& from_void_ptr(const void* p) {
230  return *static_cast<const T*>(p);
231  }
233  static T& from_void_ptr(void* p) {
234  return *static_cast<T*>(p);
235  }
236 
237  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
238  if (is_async) {
239  // This (T) is NOT async and incoming 'A<X> t' IS async
240  // Get data from async_msg
241  const async_msg<T>& msg = async_helpers< async_msg<T> >::from_void_ptr(p);
242  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
243  // finalize() must be called after subscribe() because set() can be called in finalize()
244  // and 'this_recv' client must be subscribed by this moment
245  msg.finalize();
246  return new_task;
247  }
248  else {
249  // Incoming 't' is NOT async
250  return this_recv->try_put_task(from_void_ptr(p));
251  }
252  }
253 };
255 template< typename T >
256 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
257  typedef T async_type;
258  typedef typename T::async_msg_data_type filtered_type;
260  static const bool is_async_type = true;
262  // Receiver-classes use const interfaces
263  static const void* to_void_ptr(const T& t) {
264  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
265  }
266 
267  static void* to_void_ptr(T& t) {
268  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
269  }
270 
271  // Sender-classes use non-const interfaces
272  static const T& from_void_ptr(const void* p) {
273  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
274  }
275 
276  static T& from_void_ptr(void* p) {
277  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
278  }
280  // Used in receiver<T> class
281  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
282  if (is_async) {
283  // Both are async
284  return this_recv->try_put_task(from_void_ptr(p));
285  }
286  else {
287  // This (T) is async and incoming 'X t' is NOT async
288  // Create async_msg for X
289  const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
290  const T msg(t);
291  return this_recv->try_put_task(msg);
292  }
293  }
294 };
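// async_helpers<T> is the compile-time switch between the plain and async
// paths: for an ordinary T it reports is_async_type == false, while for any
// type derived from async_msg<X> it reports true and recovers X as
// filtered_type. A sketch of the trait behaviour, assuming the preview macro
// __TBB_PREVIEW_ASYNC_MSG is enabled and the full header has been included:
//
//     #include "tbb/flow_graph.h"
//     #include <type_traits>
//
//     namespace fgi = tbb::flow::interface11::internal;
//     static_assert(!fgi::async_helpers<int>::is_async_type,
//                   "plain types take the non-async path");
//     static_assert(fgi::async_helpers< tbb::flow::interface11::async_msg<int> >::is_async_type,
//                   "async_msg<int> takes the async path");
//     static_assert(std::is_same< fgi::async_helpers< tbb::flow::interface11::async_msg<int> >::filtered_type,
//                                 int >::value,
//                   "the payload type is recovered as filtered_type");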
298 class untyped_sender {
299  template< typename, typename > friend class internal::predecessor_cache;
300  template< typename, typename > friend class internal::reservable_predecessor_cache;
301 public:
304 
305  virtual ~untyped_sender() {}
307  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
308 
309  // TODO: Prevent untyped successor registration
310 
312  virtual bool register_successor( successor_type &r ) = 0;
315  virtual bool remove_successor( successor_type &r ) = 0;
318  virtual bool try_release( ) { return false; }
321  virtual bool try_consume( ) { return false; }
323 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
324  typedef internal::edge_container<successor_type> built_successors_type;
326  typedef built_successors_type::edge_list_type successor_list_type;
327  virtual built_successors_type &built_successors() = 0;
328  virtual void internal_add_built_successor( successor_type & ) = 0;
329  virtual void internal_delete_built_successor( successor_type & ) = 0;
330  virtual void copy_successors( successor_list_type &) = 0;
331  virtual size_t successor_count() = 0;
332 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
333 protected:
335  template< typename X >
336  bool try_get( X &t ) {
338  }
341  template< typename X >
342  bool try_reserve( X &t ) {
344  }
346  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
347  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
348 };
350 class untyped_receiver {
351  template< typename, typename > friend class run_and_put_task;
353  template< typename, typename > friend class internal::broadcast_cache;
354  template< typename, typename > friend class internal::round_robin_cache;
355  template< typename, typename > friend class internal::successor_cache;
357 #if __TBB_PREVIEW_OPENCL_NODE
358  template< typename, typename > friend class proxy_dependency_receiver;
359 #endif /* __TBB_PREVIEW_OPENCL_NODE */
360 public:
365  virtual ~untyped_receiver() {}
368  template<typename X>
369  bool try_put(const X& t) {
370  task *res = try_put_task(t);
371  if (!res) return false;
372  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
373  return true;
374  }
376  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
377 
378  // TODO: Prevent untyped predecessor registration
381  virtual bool register_predecessor( predecessor_type & ) { return false; }
384  virtual bool remove_predecessor( predecessor_type & ) { return false; }
386 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
387  typedef internal::edge_container<predecessor_type> built_predecessors_type;
388  typedef built_predecessors_type::edge_list_type predecessor_list_type;
389  virtual built_predecessors_type &built_predecessors() = 0;
390  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
391  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
392  virtual void copy_predecessors( predecessor_list_type & ) = 0;
393  virtual size_t predecessor_count() = 0;
394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
395 protected:
396  template<typename X>
397  task *try_put_task(const X& t) {
399  }
401  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
403  virtual graph& graph_reference() const = 0;
405  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
408  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
410  virtual bool is_continue_receiver() { return false; }
411 };
413 } // namespace internal
416 template< typename T >
417 class sender : protected internal::untyped_sender {
418 public:
420  __TBB_DEPRECATED typedef T output_type;
425  virtual bool try_get( T & ) { return false; }
428  virtual bool try_reserve( T & ) { return false; }
430 protected:
431  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
432  // Both async OR both are NOT async
435  }
436  // Else: this (T) is async OR incoming 't' is async
437  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
438  return false;
439  }
441  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
442  // Both async OR both are NOT async
445  }
446  // Else: this (T) is async OR incoming 't' is async
447  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
448  return false;
449  }
450 }; // class sender<T>
453 template< typename T >
454 class receiver : protected internal::untyped_receiver {
455  template< typename > friend class internal::async_storage;
456  template< typename, typename > friend struct internal::async_helpers;
457 public:
459  __TBB_DEPRECATED typedef T input_type;
465  return internal::untyped_receiver::try_put(t);
466  }
469  return internal::untyped_receiver::try_put(t);
470  }
472 protected:
473  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
475  }
478  virtual task *try_put_task(const T& t) = 0;
480 }; // class receiver<T>
482 #else // __TBB_PREVIEW_ASYNC_MSG
483 
485 template< typename T >
486 class sender {
487 public:
489  __TBB_DEPRECATED typedef T output_type;
492  __TBB_DEPRECATED typedef receiver<T> successor_type;
494  virtual ~sender() {}
496  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
497 
499  __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
500 
502  __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
505  virtual bool try_get( T & ) { return false; }
506 
508  virtual bool try_reserve( T & ) { return false; }
511  virtual bool try_release( ) { return false; }
514  virtual bool try_consume( ) { return false; }
516 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
517  __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
519  __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
520  __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
521  __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
522  __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
523  __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
524  __TBB_DEPRECATED virtual size_t successor_count() = 0;
525 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
526 }; // class sender<T>
527 
529 template< typename T >
530 class receiver {
531 public:
533  __TBB_DEPRECATED typedef T input_type;
534 
536  __TBB_DEPRECATED typedef sender<T> predecessor_type;
539  virtual ~receiver() {}
542  bool try_put( const T& t ) {
543  task *res = try_put_task(t);
544  if (!res) return false;
545  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
546  return true;
547  }
550 protected:
551  template< typename R, typename B > friend class run_and_put_task;
552  template< typename X, typename Y > friend class internal::broadcast_cache;
553  template< typename X, typename Y > friend class internal::round_robin_cache;
554  virtual task *try_put_task(const T& t) = 0;
555  virtual graph& graph_reference() const = 0;
556 public:
557  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
558 
560  __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
563  __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
564 
565 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
566  __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
567  __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
568  __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
569  __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
570  __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
571  __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
572  __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
573 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
574 
575 protected:
577  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
579  template<typename TT, typename M> friend class internal::successor_cache;
580  virtual bool is_continue_receiver() { return false; }
582 #if __TBB_PREVIEW_OPENCL_NODE
583  template< typename, typename > friend class proxy_dependency_receiver;
584 #endif /* __TBB_PREVIEW_OPENCL_NODE */
585 }; // class receiver<T>
587 #endif // __TBB_PREVIEW_ASYNC_MSG
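// sender<T> and receiver<T> define the message-passing protocol every node
// implements; user code normally only sees receiver<T>::try_put(), which
// returns false when a node rejects the message. A sketch, assuming the
// standard queueing/rejecting policies:
//
//     #include "tbb/flow_graph.h"
//
//     int main() {
//         tbb::flow::graph g;
//         // A serial, rejecting node refuses new input while its body is busy.
//         tbb::flow::function_node<int, tbb::flow::continue_msg, tbb::flow::rejecting>
//             busy(g, tbb::flow::serial, [](int) { return tbb::flow::continue_msg(); });
//         bool first  = busy.try_put(1);   // accepted
//         bool second = busy.try_put(2);   // may be rejected while the first body runs
//         (void)first; (void)second;
//         g.wait_for_all();
//         return 0;
//     }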
588 
591 class continue_receiver : public receiver< continue_msg > {
592 public:
599 
601  explicit continue_receiver(
602  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
603  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
604  my_current_count = 0;
605  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
606  }
607 
609  continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
610  my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
611  my_current_count = 0;
612  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
613  }
614 
616  bool register_predecessor( predecessor_type & ) __TBB_override {
617  spin_mutex::scoped_lock l(my_mutex);
618  ++my_predecessor_count;
619  return true;
620  }
621 
626  bool remove_predecessor( predecessor_type & ) __TBB_override {
627  spin_mutex::scoped_lock l(my_mutex);
628  --my_predecessor_count;
629  return true;
630  }
632 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
633  __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
634  __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
635  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
637  __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
639  my_built_predecessors.add_edge( s );
640  }
642  __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
644  my_built_predecessors.delete_edge(s);
645  }
647  __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
649  my_built_predecessors.copy_edges(v);
650  }
652  __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
654  return my_built_predecessors.edge_count();
655  }
656 
657 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
659 protected:
660  template< typename R, typename B > friend class run_and_put_task;
661  template<typename X, typename Y> friend class internal::broadcast_cache;
662  template<typename X, typename Y> friend class internal::round_robin_cache;
663  // execute body is supposed to be too small to create a task for.
664  task *try_put_task( const input_type & ) __TBB_override {
665  {
667  if ( ++my_current_count < my_predecessor_count )
669  else
670  my_current_count = 0;
671  }
672  task * res = execute();
673  return res? res : SUCCESSFULLY_ENQUEUED;
674  }
675 
676 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
677  // continue_receiver must contain its own built_predecessors because it does
678  // not have a node_cache.
679  built_predecessors_type my_built_predecessors;
680 #endif
686  // the friend declaration in the base class did not eliminate the "protected class"
687  // error in gcc 4.1.2
688  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
691  my_current_count = 0;
692  if (f & rf_clear_edges) {
693 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
694  my_built_predecessors.clear();
695 #endif
696  my_predecessor_count = my_initial_predecessor_count;
697  }
698  }
703  virtual task * execute() = 0;
704  template<typename TT, typename M> friend class internal::successor_cache;
705  bool is_continue_receiver() __TBB_override { return true; }
707 }; // class continue_receiver
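// A continue_receiver only triggers after it has received one continue_msg per
// registered predecessor. continue_node (defined later in this header) inherits
// that counting behaviour; a sketch using its explicit predecessor count:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         // Two predecessors are declared up front, so the body runs after two puts.
//         tbb::flow::continue_node<tbb::flow::continue_msg> barrier(g, 2,
//             [](const tbb::flow::continue_msg&) {
//                 std::puts("both inputs arrived");
//                 return tbb::flow::continue_msg();
//             });
//         barrier.try_put(tbb::flow::continue_msg());   // 1 of 2: body not run yet
//         barrier.try_put(tbb::flow::continue_msg());   // 2 of 2: body runs
//         g.wait_for_all();
//         return 0;
//     }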
709 } // interfaceX
711 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
712  template <typename K, typename T>
713  K key_from_message( const T &t ) {
714  return t.key();
715  }
716 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
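// key_from_message is the hook used by key-matching join_node ports when
// message-based key matching is enabled: the default shown above calls
// t.key(), and it can be overloaded for types that expose their key some
// other way. A sketch, assuming the key-matching preview is enabled and that
// join_node / key_matching come from the same header:
//
//     struct order_msg {
//         int id;
//         double payload;
//         int key() const { return id; }   // picked up by the default key_from_message
//     };
//
//     // tbb::flow::join_node< tbb::flow::tuple<order_msg, order_msg>,
//     //                       tbb::flow::key_matching<int> > j(g);
//     // Ports 0 and 1 pair up messages whose key() values match.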
717 
721 } // flow
722 } // tbb
726 
727 namespace tbb {
728 namespace flow {
729 namespace interface11 {
734 #if __TBB_PREVIEW_ASYNC_MSG
736 #endif
738 
739 template <typename C, typename N>
740 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
741 {
742  if (begin) current_node = my_graph->my_nodes;
743  //else it is an end iterator by default
744 }
745 
746 template <typename C, typename N>
747 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
748  __TBB_ASSERT(current_node, "graph_iterator at end");
749  return *operator->();
750 }
751 
752 template <typename C, typename N>
753 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
754  return current_node;
755 }
757 template <typename C, typename N>
758 void graph_iterator<C,N>::internal_forward() {
759  if (current_node) current_node = current_node->next;
760 }
762 } // namespace interfaceX
763 
764 namespace interface10 {
766 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
768  own_context = true;
769  cancelled = false;
770  caught_exception = false;
771  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
775  my_is_active = true;
776 }
778 inline graph::graph(task_group_context& use_this_context) :
779  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
781  own_context = false;
785  my_is_active = true;
786 }
787 
788 inline graph::~graph() {
789  wait_for_all();
791  tbb::task::destroy(*my_root_task);
792  if (own_context) delete my_context;
793  delete my_task_arena;
794 }
795 
796 inline void graph::reserve_wait() {
800  }
801 }
802 
803 inline void graph::release_wait() {
804  if (my_root_task) {
807  }
808 }
811  n->next = NULL;
812  {
815  if (my_nodes_last) my_nodes_last->next = n;
816  my_nodes_last = n;
817  if (!my_nodes) my_nodes = n;
818  }
819 }
820 
822  {
824  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
825  if (n->prev) n->prev->next = n->next;
826  if (n->next) n->next->prev = n->prev;
827  if (my_nodes_last == n) my_nodes_last = n->prev;
828  if (my_nodes == n) my_nodes = n->next;
829  }
830  n->prev = n->next = NULL;
831 }
832 
834  // reset context
836 
837  if(my_context) my_context->reset();
838  cancelled = false;
839  caught_exception = false;
840  // reset all the nodes comprising the graph
841  for(iterator ii = begin(); ii != end(); ++ii) {
842  tbb::flow::interface11::graph_node *my_p = &(*ii);
843  my_p->reset_node(f);
844  }
845  // Reattach the arena. Might be useful to run the graph in a particular task_arena
846  // while not limiting graph lifetime to a single task_arena::execute() call.
847  prepare_task_arena( /*reinit=*/true );
849  // now spawn the tasks necessary to start the graph
850  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
852  }
853  my_reset_task_list.clear();
854 }
855 
856 inline graph::iterator graph::begin() { return iterator(this, true); }
858 inline graph::iterator graph::end() { return iterator(this, false); }
860 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
861 
862 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
863 
864 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
865 
866 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
868 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
869 inline void graph::set_name(const char *name) {
870  tbb::internal::fgt_graph_desc(this, name);
871 }
872 #endif
873 
874 } // namespace interface10
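// Typical graph lifecycle: construct a graph, feed its nodes, call
// wait_for_all(), and optionally reset() to reuse the same topology. A minimal
// sketch:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         tbb::flow::function_node<int> sink(g, tbb::flow::unlimited,
//             [](int v) { std::printf("got %d\n", v); return tbb::flow::continue_msg(); });
//         sink.try_put(42);
//         g.wait_for_all();   // blocks until all tasks spawned by the graph finish
//         g.reset();          // restores nodes to their initial state for reuse
//         sink.try_put(43);
//         g.wait_for_all();
//         return 0;
//     }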
876 namespace interface11 {
878 inline graph_node::graph_node(graph& g) : my_graph(g) {
879  my_graph.register_node(this);
880 }
881 
883  my_graph.remove_node(this);
884 }
885 
888 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
889 using internal::node_set;
890 #endif
891 
893 template < typename Output >
894 class source_node : public graph_node, public sender< Output > {
895 public:
897  typedef Output output_type;
898 
901 
902  //Source node has no input type
904 
905 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
906  typedef typename sender<output_type>::built_successors_type built_successors_type;
907  typedef typename sender<output_type>::successor_list_type successor_list_type;
908 #endif
909 
911  template< typename Body >
912  __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
913  : graph_node(g), my_active(is_active), init_my_active(is_active),
914  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
915  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
916  my_reserved(false), my_has_cached_item(false)
917  {
919  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
920  static_cast<sender<output_type> *>(this), this->my_body );
921  }
922 
923 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
924  template <typename Body, typename... Successors>
925  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
926  : source_node(successors.graph_reference(), body, is_active) {
927  make_edges(*this, successors);
928  }
929 #endif
930 
932  __TBB_NOINLINE_SYM source_node( const source_node& src ) :
933  graph_node(src.my_graph), sender<Output>(),
934  my_active(src.init_my_active),
935  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
936  my_reserved(false), my_has_cached_item(false)
937  {
938  my_successors.set_owner(this);
939  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
940  static_cast<sender<output_type> *>(this), this->my_body );
941  }
942 
944  ~source_node() { delete my_body; delete my_init_body; }
945 
946 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
947  void set_name( const char *name ) __TBB_override {
949  }
950 #endif
953  bool register_successor( successor_type &r ) __TBB_override {
954  spin_mutex::scoped_lock lock(my_mutex);
956  if ( my_active )
957  spawn_put();
958  return true;
959  }
962  bool remove_successor( successor_type &r ) __TBB_override {
963  spin_mutex::scoped_lock lock(my_mutex);
965  return true;
966  }
967 
968 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
969 
970  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
972  void internal_add_built_successor( successor_type &r) __TBB_override {
974  my_successors.internal_add_built_successor(r);
975  }
976 
977  void internal_delete_built_successor( successor_type &r) __TBB_override {
979  my_successors.internal_delete_built_successor(r);
980  }
981 
982  size_t successor_count() __TBB_override {
983  spin_mutex::scoped_lock lock(my_mutex);
984  return my_successors.successor_count();
985  }
986 
987  void copy_successors(successor_list_type &v) __TBB_override {
988  spin_mutex::scoped_lock l(my_mutex);
989  my_successors.copy_successors(v);
990  }
991 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
992 
994  bool try_get( output_type &v ) __TBB_override {
995  spin_mutex::scoped_lock lock(my_mutex);
996  if ( my_reserved )
997  return false;
998 
999  if ( my_has_cached_item ) {
1000  v = my_cached_item;
1001  my_has_cached_item = false;
1002  return true;
1003  }
1004  // we've been asked to provide an item, but we have none. enqueue a task to
1005  // provide one.
1006  spawn_put();
1007  return false;
1008  }
1009 
1011  bool try_reserve( output_type &v ) __TBB_override {
1012  spin_mutex::scoped_lock lock(my_mutex);
1013  if ( my_reserved ) {
1014  return false;
1015  }
1016 
1017  if ( my_has_cached_item ) {
1018  v = my_cached_item;
1019  my_reserved = true;
1020  return true;
1021  } else {
1022  return false;
1023  }
1024  }
1025 
1030  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1031  my_reserved = false;
1032  if(!my_successors.empty())
1033  spawn_put();
1034  return true;
1035  }
1036 
1039  spin_mutex::scoped_lock lock(my_mutex);
1040  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1041  my_reserved = false;
1042  my_has_cached_item = false;
1043  if ( !my_successors.empty() ) {
1044  spawn_put();
1045  }
1046  return true;
1047  }
1050  void activate() {
1052  my_active = true;
1054  spawn_put();
1055  }
1057  template<typename Body>
1059  internal::source_body<output_type> &body_ref = *this->my_body;
1060  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1061  }
1063 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1064  void extract( ) __TBB_override {
1065  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1066  my_active = init_my_active;
1067  my_reserved = false;
1068  if(my_has_cached_item) my_has_cached_item = false;
1069  }
1070 #endif
1072 protected:
1075  void reset_node( reset_flags f) __TBB_override {
1076  my_active = init_my_active;
1077  my_reserved =false;
1078  if(my_has_cached_item) {
1079  my_has_cached_item = false;
1080  }
1082  if(f & rf_reset_bodies) {
1084  delete my_body;
1085  my_body = tmp;
1086  }
1087  if(my_active)
1088  internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
1089  }
1091 private:
1100  output_type my_cached_item;
1101 
1102  // used by apply_body_bypass, can invoke body of node.
1103  bool try_reserve_apply_body(output_type &v) {
1104  spin_mutex::scoped_lock lock(my_mutex);
1105  if ( my_reserved ) {
1106  return false;
1107  }
1108  if ( !my_has_cached_item ) {
1109  tbb::internal::fgt_begin_body( my_body );
1110  bool r = (*my_body)(my_cached_item);
1111  tbb::internal::fgt_end_body( my_body );
1112  if (r) {
1113  my_has_cached_item = true;
1114  }
1115  }
1116  if ( my_has_cached_item ) {
1117  v = my_cached_item;
1118  my_reserved = true;
1119  return true;
1120  } else {
1121  return false;
1122  }
1123  }
1124 
1125  // when resetting, and if the source_node was created with my_active == true, then
1126  // when we reset the node we must store a task to run the node, and spawn it only
1127  // after the reset is complete and is_active() is again true. This is why we don't
1128  // test for is_active() here.
1130  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1132  }
1133 
1135  void spawn_put( ) {
1136  if(internal::is_graph_active(this->my_graph)) {
1137  internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1138  }
1139  }
1140 
1141  friend class internal::source_task_bypass< source_node< output_type > >;
1144  output_type v;
1145  if ( !try_reserve_apply_body(v) )
1146  return NULL;
1147 
1148  task *last_task = my_successors.try_put_task(v);
1149  if ( last_task )
1150  try_consume();
1151  else
1152  try_release();
1153  return last_task;
1154  }
1155 }; // class source_node
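// source_node produces messages by repeatedly invoking its body until the body
// returns false; when constructed inactive it starts producing only after
// activate(). A minimal sketch emitting 0..9:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         int counter = 0;
//         tbb::flow::source_node<int> src(g,
//             [&counter](int& out) -> bool {
//                 if (counter >= 10) return false;   // no more items
//                 out = counter++;
//                 return true;
//             },
//             false);                                // is_active = false: start paused
//         tbb::flow::function_node<int> sink(g, tbb::flow::unlimited,
//             [](int v) { std::printf("%d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(src, sink);
//         src.activate();                            // begin producing
//         g.wait_for_all();
//         return 0;
//     }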
1156 
1158 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1159 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1160 public:
1161  typedef Input input_type;
1162  typedef Output output_type;
1168 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1169  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1170  typedef typename fOutput_type::successor_list_type successor_list_type;
1171 #endif
1172  using input_impl_type::my_predecessors;
1173 
1175  // input_queue_type is allocated here, but destroyed in the function_input_base.
1176  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1177  // be done in one place. This would be an interface-breaking change.
1178  template< typename Body >
1181  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
1182 #else
1184 #endif
1185  : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1186  fOutput_type(g) {
1187  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1188  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1189  }
1190 
1191 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1192  template <typename Body>
1193  function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1194  : function_node(g, concurrency, body, Policy(), priority) {}
1195 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1196 
1197 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1198  template <typename Body, typename... Args>
1199  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1201  : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1202  make_edges_in_order(nodes, *this);
1203  }
1204 
1205 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1206  template <typename Body, typename... Args>
1207  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1208  : function_node(nodes, concurrency, body, Policy(), priority) {}
1209 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1210 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1214  graph_node(src.my_graph),
1215  input_impl_type(src),
1216  fOutput_type(src.my_graph) {
1217  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1218  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1219  }
1220 
1221 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1222  void set_name( const char *name ) __TBB_override {
1224  }
1225 #endif
1226 
1227 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1228  void extract( ) __TBB_override {
1229  my_predecessors.built_predecessors().receiver_extract(*this);
1230  successors().built_successors().sender_extract(*this);
1231  }
1232 #endif
1233 
1234 protected:
1235  template< typename R, typename B > friend class run_and_put_task;
1236  template<typename X, typename Y> friend class internal::broadcast_cache;
1237  template<typename X, typename Y> friend class internal::round_robin_cache;
1238  using input_impl_type::try_put_task;
1239 
1240  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1241 
1243  input_impl_type::reset_function_input(f);
1244  // TODO: use clear() instead.
1245  if(f & rf_clear_edges) {
1246  successors().clear();
1247  my_predecessors.clear();
1248  }
1249  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1250  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1251  }
1252 
1253 }; // class function_node
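// function_node applies its body to each incoming message and broadcasts the
// result to its successors; the concurrency argument throttles how many bodies
// may run at once. A small three-stage pipeline sketch:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         tbb::flow::function_node<int, int> square(g, tbb::flow::unlimited,
//             [](int v) { return v * v; });
//         tbb::flow::function_node<int, int> negate(g, 4,        // at most 4 concurrent bodies
//             [](int v) { return -v; });
//         tbb::flow::function_node<int> print(g, tbb::flow::serial,
//             [](int v) { std::printf("%d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(square, negate);
//         tbb::flow::make_edge(negate, print);
//         for (int i = 1; i <= 5; ++i) square.try_put(i);
//         g.wait_for_all();
//         return 0;
//     }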
1256 // Output is a tuple of output types.
1257 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1258 class multifunction_node :
1259  public graph_node,
1260  public internal::multifunction_input
1261  <
1262  Input,
1263  typename internal::wrap_tuple_elements<
1264  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1265  internal::multifunction_output, // wrap this around each element
1266  Output // the tuple providing the types
1267  >::type,
1268  Policy,
1269  Allocator
1270  > {
1271 protected:
1272  static const int N = tbb::flow::tuple_size<Output>::value;
1273 public:
1274  typedef Input input_type;
1279 private:
1281  using input_impl_type::my_predecessors;
1282 public:
1283  template<typename Body>
1285  graph &g, size_t concurrency,
1288 #else
1290 #endif
1291  ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1292  tbb::internal::fgt_multioutput_node_with_body<N>(
1293  CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1294  &this->my_graph, static_cast<receiver<input_type> *>(this),
1295  this->output_ports(), this->my_body
1296  );
1297  }
1298 
1299 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1300  template <typename Body>
1301  __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1302  : multifunction_node(g, concurrency, body, Policy(), priority) {}
1303 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1304 
1305 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1306  template <typename Body, typename... Args>
1307  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1309  : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1310  make_edges_in_order(nodes, *this);
1311  }
1313 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1314  template <typename Body, typename... Args>
1315  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1316  : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1317 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1318 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1319 
1320  __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
1321  graph_node(other.my_graph), base_type(other) {
1322  tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1323  &this->my_graph, static_cast<receiver<input_type> *>(this),
1324  this->output_ports(), this->my_body );
1325  }
1326 
1327 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1328  void set_name( const char *name ) __TBB_override {
1330  }
1331 #endif
1332 
1333 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1334  void extract( ) __TBB_override {
1335  my_predecessors.built_predecessors().receiver_extract(*this);
1336  base_type::extract();
1337  }
1338 #endif
1339  // all the guts are in multifunction_input...
1340 protected:
1341  void reset_node(reset_flags f) __TBB_override { base_type::reset(f); }
1342 }; // multifunction_node
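// multifunction_node lets one body write to several typed output ports through
// its output_ports_type argument. A sketch that routes evens and odds to
// different ports:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > splitter_t;
//
//     int main() {
//         tbb::flow::graph g;
//         splitter_t splitter(g, tbb::flow::unlimited,
//             [](const int& v, splitter_t::output_ports_type& ports) {
//                 if (v % 2 == 0) tbb::flow::get<0>(ports).try_put(v);   // evens -> port 0
//                 else            tbb::flow::get<1>(ports).try_put(v);   // odds  -> port 1
//             });
//         tbb::flow::function_node<int> evens(g, tbb::flow::serial,
//             [](int v) { std::printf("even %d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::function_node<int> odds(g, tbb::flow::serial,
//             [](int v) { std::printf("odd %d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(tbb::flow::output_port<0>(splitter), evens);
//         tbb::flow::make_edge(tbb::flow::output_port<1>(splitter), odds);
//         for (int i = 0; i < 6; ++i) splitter.try_put(i);
//         g.wait_for_all();
//         return 0;
//     }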
1343 
1345 // successors. The node has unlimited concurrency, so it does not reject inputs.
1346 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1347 class split_node : public graph_node, public receiver<TupleType> {
1348  static const int N = tbb::flow::tuple_size<TupleType>::value;
1349  typedef receiver<TupleType> base_type;
1350 public:
1351  typedef TupleType input_type;
1352  typedef Allocator allocator_type;
1353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1354  typedef typename base_type::predecessor_type predecessor_type;
1355  typedef typename base_type::predecessor_list_type predecessor_list_type;
1356  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1357  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1358 #endif
1359 
1360  typedef typename internal::wrap_tuple_elements<
1361  N, // #elements in tuple
1362  internal::multifunction_output, // wrap this around each element
1363  TupleType // the tuple providing the types
1365 
1367  : graph_node(g),
1368  my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
1369  {
1370  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1371  static_cast<receiver<input_type> *>(this), this->output_ports());
1372  }
1373 
1374 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1375  template <typename... Args>
1376  __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1377  make_edges_in_order(nodes, *this);
1378  }
1379 #endif
1380 
1381  __TBB_NOINLINE_SYM split_node(const split_node& other)
1382  : graph_node(other.my_graph), base_type(other),
1383  my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1384  {
1385  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1386  static_cast<receiver<input_type> *>(this), this->output_ports());
1387  }
1388 
1389 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1390  void set_name( const char *name ) __TBB_override {
1392  }
1393 #endif
1394 
1395  output_ports_type &output_ports() { return my_output_ports; }
1396 
1397 protected:
1398  task *try_put_task(const TupleType& t) __TBB_override {
1399  // Sending split messages in parallel is not justified, as overheads would prevail.
1400  // Also, we do not have successors here, so we simply report the returned task as successful.
1401  return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1402  }
1404  if (f & rf_clear_edges)
1405  internal::clear_element<N>::clear_this(my_output_ports);
1406 
1407  __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1408  }
1411  return my_graph;
1412  }
1413 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1414 private:
1415  void extract() __TBB_override {}
1418  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1419 
1421  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1423  size_t predecessor_count() __TBB_override { return 0; }
1424 
1425  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1426 
1427  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors; }
1430  built_predecessors_type my_predecessors;
1431 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1432 
1433 private:
1434  output_ports_type my_output_ports;
1435 };
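// split_node unpacks each incoming tuple and forwards element i to output
// port i; it has no user body and unlimited concurrency. A minimal sketch:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         tbb::flow::split_node< tbb::flow::tuple<int, float> > s(g);
//         tbb::flow::function_node<int> ints(g, tbb::flow::serial,
//             [](int v) { std::printf("int %d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::function_node<float> floats(g, tbb::flow::serial,
//             [](float v) { std::printf("float %f\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(tbb::flow::output_port<0>(s), ints);
//         tbb::flow::make_edge(tbb::flow::output_port<1>(s), floats);
//         s.try_put(tbb::flow::tuple<int, float>(1, 2.5f));
//         g.wait_for_all();
//         return 0;
//     }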
1436 
1438 template <typename Output, typename Policy = internal::Policy<void> >
1439 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1440  public internal::function_output<Output> {
1441 public:
1443  typedef Output output_type;
1448 
1450  template <typename Body >
1452  graph &g,
1454  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1455 #else
1457 #endif
1458  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1459  fOutput_type(g) {
1460  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1461 
1462  static_cast<receiver<input_type> *>(this),
1463  static_cast<sender<output_type> *>(this), this->my_body );
1464  }
1465 
1466 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1467  template <typename Body>
1468  continue_node( graph& g, Body body, node_priority_t priority )
1469  : continue_node(g, body, Policy(), priority) {}
1470 #endif
1471 
1472 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1473  template <typename Body, typename... Args>
1474  continue_node( const node_set<Args...>& nodes, Body body,
1476  : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1477  make_edges_in_order(nodes, *this);
1478  }
1479 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1480  template <typename Body, typename... Args>
1481  continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1482  : continue_node(nodes, body, Policy(), priority) {}
1483 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1484 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1485 
1487  template <typename Body >
1489  graph &g, int number_of_predecessors,
1492 #else
1494 #endif
1495  ) : graph_node(g)
1496  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1497  fOutput_type(g) {
1498  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1499  static_cast<receiver<input_type> *>(this),
1500  static_cast<sender<output_type> *>(this), this->my_body );
1501  }
1502 
1503 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1504  template <typename Body>
1505  continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1506  : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1507 #endif
1509 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1510  template <typename Body, typename... Args>
1511  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1513  : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1514  make_edges_in_order(nodes, *this);
1515  }
1517 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1518  template <typename Body, typename... Args>
1519  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1520  Body body, node_priority_t priority )
1521  : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1522 #endif
1523 #endif
1524 
1527  graph_node(src.my_graph), input_impl_type(src),
1528  internal::function_output<Output>(src.my_graph) {
1529  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1530  static_cast<receiver<input_type> *>(this),
1531  static_cast<sender<output_type> *>(this), this->my_body );
1532  }
1533 
1534 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1535  void set_name( const char *name ) __TBB_override {
1536  tbb::internal::fgt_node_desc( this, name );
1537  }
1538 #endif
1539 
1540 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1541  void extract() __TBB_override {
1542  input_impl_type::my_built_predecessors.receiver_extract(*this);
1543  successors().built_successors().sender_extract(*this);
1544  }
1545 #endif
1547 protected:
1548  template< typename R, typename B > friend class run_and_put_task;
1549  template<typename X, typename Y> friend class internal::broadcast_cache;
1550  template<typename X, typename Y> friend class internal::round_robin_cache;
1551  using input_impl_type::try_put_task;
1552  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1553 
1554  void reset_node(reset_flags f) __TBB_override {
1555  input_impl_type::reset_receiver(f);
1556  if(f & rf_clear_edges)successors().clear();
1557  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1558  }
1559 }; // continue_node
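// continue_node expresses pure dependencies: its body runs once it has
// received a continue_msg from every predecessor, then it broadcasts its own
// output. A sketch of a small diamond-shaped dependency graph:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         typedef tbb::flow::continue_node<tbb::flow::continue_msg> step_t;
//         step_t top   (g, [](const tbb::flow::continue_msg&) { std::puts("top");    return tbb::flow::continue_msg(); });
//         step_t left  (g, [](const tbb::flow::continue_msg&) { std::puts("left");   return tbb::flow::continue_msg(); });
//         step_t right (g, [](const tbb::flow::continue_msg&) { std::puts("right");  return tbb::flow::continue_msg(); });
//         step_t bottom(g, [](const tbb::flow::continue_msg&) { std::puts("bottom"); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(top, left);
//         tbb::flow::make_edge(top, right);
//         tbb::flow::make_edge(left, bottom);    // bottom waits for both
//         tbb::flow::make_edge(right, bottom);   // left and right
//         top.try_put(tbb::flow::continue_msg());
//         g.wait_for_all();
//         return 0;
//     }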
1560 
1562 template <typename T>
1563 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1564 public:
1565  typedef T input_type;
1566  typedef T output_type;
1569 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1570  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1571  typedef typename sender<output_type>::successor_list_type successor_list_type;
1572 #endif
1573 private:
1575 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1576  internal::edge_container<predecessor_type> my_built_predecessors;
1577  spin_mutex pred_mutex; // serialize accesses on edge_container
1578 #endif
1579 public:
1582  my_successors.set_owner( this );
1583  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1584  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1585  }
1587 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1588  template <typename... Args>
1589  broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1590  make_edges_in_order(nodes, *this);
1591  }
1592 #endif
1593 
1594  // Copy constructor
1595  __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
1596  graph_node(src.my_graph), receiver<T>(), sender<T>()
1597  {
1598  my_successors.set_owner( this );
1599  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1600  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1601  }
1602 
1603 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1604  void set_name( const char *name ) __TBB_override {
1605  tbb::internal::fgt_node_desc( this, name );
1606  }
1607 #endif
1608 
1610  bool register_successor( successor_type &r ) __TBB_override {
1611  my_successors.register_successor( r );
1612  return true;
1613  }
1616  bool remove_successor( successor_type &r ) __TBB_override {
1617  my_successors.remove_successor( r );
1618  return true;
1619  }
1621 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1622  typedef typename sender<T>::built_successors_type built_successors_type;
1624  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1626  void internal_add_built_successor(successor_type &r) __TBB_override {
1627  my_successors.internal_add_built_successor(r);
1628  }
1629 
1630  void internal_delete_built_successor(successor_type &r) __TBB_override {
1631  my_successors.internal_delete_built_successor(r);
1632  }
1633 
1634  size_t successor_count() __TBB_override {
1635  return my_successors.successor_count();
1636  }
1637 
1638  void copy_successors(successor_list_type &v) __TBB_override {
1639  my_successors.copy_successors(v);
1640  }
1641 
1642  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1643 
1644  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1645 
1646  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1648  my_built_predecessors.add_edge(p);
1649  }
1650 
1651  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1652  spin_mutex::scoped_lock l(pred_mutex);
1653  my_built_predecessors.delete_edge(p);
1654  }
1656  size_t predecessor_count() __TBB_override {
1658  return my_built_predecessors.edge_count();
1659  }
1661  void copy_predecessors(predecessor_list_type &v) __TBB_override {
1663  my_built_predecessors.copy_edges(v);
1664  }
1666  void extract() __TBB_override {
1667  my_built_predecessors.receiver_extract(*this);
1668  my_successors.built_successors().sender_extract(*this);
1669  }
1670 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1672 protected:
1673  template< typename R, typename B > friend class run_and_put_task;
1674  template<typename X, typename Y> friend class internal::broadcast_cache;
1675  template<typename X, typename Y> friend class internal::round_robin_cache;
1678  task *new_task = my_successors.try_put_task(t);
1679  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1680  return new_task;
1681  }
1684  return my_graph;
1685  }
1686 
1688 
1690  if (f&rf_clear_edges) {
1691  my_successors.clear();
1692 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1693  my_built_predecessors.clear();
1694 #endif
1695  }
1696  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1697  }
1698 }; // broadcast_node
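// broadcast_node forwards every message it receives to all current successors
// and buffers nothing. A minimal sketch with two listeners:
//
//     #include "tbb/flow_graph.h"
//     #include <cstdio>
//
//     int main() {
//         tbb::flow::graph g;
//         tbb::flow::broadcast_node<int> bcast(g);
//         tbb::flow::function_node<int> a(g, tbb::flow::serial,
//             [](int v) { std::printf("a saw %d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::function_node<int> b(g, tbb::flow::serial,
//             [](int v) { std::printf("b saw %d\n", v); return tbb::flow::continue_msg(); });
//         tbb::flow::make_edge(bcast, a);
//         tbb::flow::make_edge(bcast, b);
//         bcast.try_put(7);   // both a and b receive 7
//         g.wait_for_all();
//         return 0;
//     }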
1701 template <typename T, typename A=cache_aligned_allocator<T> >
1702 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1703 public:
1704  typedef T input_type;
1705  typedef T output_type;
1709 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1710  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1711  typedef typename sender<output_type>::successor_list_type successor_list_type;
1712 #endif
1713 protected:
1714  typedef size_t size_type;
1717 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1718  internal::edge_container<predecessor_type> my_built_predecessors;
1719 #endif
1720 
1722 
1723  enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1724 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1725  , add_blt_succ, del_blt_succ,
1726  add_blt_pred, del_blt_pred,
1727  blt_succ_cnt, blt_pred_cnt,
1728  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1729 #endif
1730  };
1732  // implements the aggregator_operation concept
1733  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1734  public:
1735  char type;
1736 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1737  task * ltask;
1738  union {
1739  input_type *elem;
1740  successor_type *r;
1741  predecessor_type *p;
1742  size_t cnt_val;
1743  successor_list_type *svec;
1744  predecessor_list_type *pvec;
1745  };
1746 #else
1747  T *elem;
1749  successor_type *r;
1750 #endif
1751  buffer_operation(const T& e, op_type t) : type(char(t))
1753 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1754  , ltask(NULL), elem(const_cast<T*>(&e))
1755 #else
1756  , elem(const_cast<T*>(&e)) , ltask(NULL)
1757 #endif
1758  {}
1759  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1760  };
1763  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1764  friend class internal::aggregating_functor<class_type, buffer_operation>;
1765  internal::aggregator< handler_type, buffer_operation> my_aggregator;
1767  virtual void handle_operations(buffer_operation *op_list) {
1768  handle_operations_impl(op_list, this);
1769  }
1770 
1771  template<typename derived_type>
1772  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1773  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1774 
1775  buffer_operation *tmp = NULL;
1776  bool try_forwarding = false;
1777  while (op_list) {
1778  tmp = op_list;
1779  op_list = op_list->next;
1780  switch (tmp->type) {
1781  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1782  case rem_succ: internal_rem_succ(tmp); break;
1783  case req_item: internal_pop(tmp); break;
1784  case res_item: internal_reserve(tmp); break;
1785  case rel_res: internal_release(tmp); try_forwarding = true; break;
1786  case con_res: internal_consume(tmp); try_forwarding = true; break;
1787  case put_item: try_forwarding = internal_push(tmp); break;
1788  case try_fwd_task: internal_forward_task(tmp); break;
1789 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1790  // edge recording
1791  case add_blt_succ: internal_add_built_succ(tmp); break;
1792  case del_blt_succ: internal_del_built_succ(tmp); break;
1793  case add_blt_pred: internal_add_built_pred(tmp); break;
1794  case del_blt_pred: internal_del_built_pred(tmp); break;
1795  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1796  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1797  case blt_succ_cpy: internal_copy_succs(tmp); break;
1798  case blt_pred_cpy: internal_copy_preds(tmp); break;
1799 #endif
1800  }
1801  }
1802 
1803  derived->order();
1805  if (try_forwarding && !forwarder_busy) {
1806  if(internal::is_graph_active(this->my_graph)) {
1807  forwarder_busy = true;
1808  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1811  // tmp should point to the last item handled by the aggregator. This is the operation
1812  // the handling thread enqueued. So modifying that record will be okay.
1813  // workaround for icc bug
1814  tbb::task *z = tmp->ltask;
1815  graph &g = this->my_graph;
1816  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1817  }
1818  }
1819  } // handle_operations
1822  return op_data.ltask;
1823  }
1825  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1826  task *ft = grab_forwarding_task(op_data);
1827  if(ft) {
1829  return true;
1830  }
1831  return false;
1832  }
1833 
1835  virtual task *forward_task() {
1836  buffer_operation op_data(try_fwd_task);
1837  task *last_task = NULL;
1838  do {
1839  op_data.status = internal::WAIT;
1840  op_data.ltask = NULL;
1841  my_aggregator.execute(&op_data);
1842 
1843  // workaround for icc bug
1844  tbb::task *xtask = op_data.ltask;
1845  graph& g = this->my_graph;
1846  last_task = combine_tasks(g, last_task, xtask);
1847  } while (op_data.status ==internal::SUCCEEDED);
1848  return last_task;
1849  }
1850 
1853  my_successors.register_successor(*(op->r));
1855  }
1856 
1858  virtual void internal_rem_succ(buffer_operation *op) {
1859  my_successors.remove_successor(*(op->r));
1861  }
1862 
1863 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1864  typedef typename sender<T>::built_successors_type built_successors_type;
1865 
1866  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1868  virtual void internal_add_built_succ(buffer_operation *op) {
1869  my_successors.internal_add_built_successor(*(op->r));
1871  }
1873  virtual void internal_del_built_succ(buffer_operation *op) {
1874  my_successors.internal_delete_built_successor(*(op->r));
1876  }
1878  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1880  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1882  virtual void internal_add_built_pred(buffer_operation *op) {
1883  my_built_predecessors.add_edge(*(op->p));
1885  }
1887  virtual void internal_del_built_pred(buffer_operation *op) {
1888  my_built_predecessors.delete_edge(*(op->p));
1890  }
1891 
1892  virtual void internal_succ_cnt(buffer_operation *op) {
1893  op->cnt_val = my_successors.successor_count();
1895  }
1896 
1897  virtual void internal_pred_cnt(buffer_operation *op) {
1898  op->cnt_val = my_built_predecessors.edge_count();
1900  }
1901 
1902  virtual void internal_copy_succs(buffer_operation *op) {
1903  my_successors.copy_successors(*(op->svec));
1905  }
1906 
1907  virtual void internal_copy_preds(buffer_operation *op) {
1908  my_built_predecessors.copy_edges(*(op->pvec));
1910  }
1911 
1912 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1913 
1914 private:
1915  void order() {}
1916 
1917  bool is_item_valid() {
1918  return this->my_item_valid(this->my_tail - 1);
1919  }
1920 
1921  void try_put_and_add_task(task*& last_task) {
1922  task *new_task = my_successors.try_put_task(this->back());
1923  if (new_task) {
1924  // workaround for icc bug
1925  graph& g = this->my_graph;
1926  last_task = combine_tasks(g, last_task, new_task);
1927  this->destroy_back();
1928  }
1929  }
1931 protected:
1933  virtual void internal_forward_task(buffer_operation *op) {
1934  internal_forward_task_impl(op, this);
1935  }
1937  template<typename derived_type>
1938  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1939  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1940 
1941  if (this->my_reserved || !derived->is_item_valid()) {
1943  this->forwarder_busy = false;
1944  return;
1945  }
1946  // Try forwarding, giving each successor a chance
1947  task * last_task = NULL;
1948  size_type counter = my_successors.size();
1949  for (; counter > 0 && derived->is_item_valid(); --counter)
1950  derived->try_put_and_add_task(last_task);
1952  op->ltask = last_task; // return task
1953  if (last_task && !counter) {
1955  }
1956  else {
1958  forwarder_busy = false;
1959  }
1960  }
1961 
1962  virtual bool internal_push(buffer_operation *op) {
1963  this->push_back(*(op->elem));
1965  return true;
1966  }
1967 
1968  virtual void internal_pop(buffer_operation *op) {
1969  if(this->pop_back(*(op->elem))) {
1971  }
1972  else {
1974  }
1975  }
1976 
1977  virtual void internal_reserve(buffer_operation *op) {
1978  if(this->reserve_front(*(op->elem))) {
1980  }
1981  else {
1983  }
1984  }
1985 
1986  virtual void internal_consume(buffer_operation *op) {
1987  this->consume_front();
1989  }
1991  virtual void internal_release(buffer_operation *op) {
1992  this->release_front();
1994  }
1995 
1996 public:
1998  __TBB_NOINLINE_SYM explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
1999  forwarder_busy(false) {
2000  my_successors.set_owner(this);
2001  my_aggregator.initialize_handler(handler_type(this));
2002  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2003  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2004  }
2005 
2006 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2007  template <typename... Args>
2008  buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2009  make_edges_in_order(nodes, *this);
2010  }
2011 #endif
2012 
2014  __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
2015  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
2016  forwarder_busy = false;
2017  my_successors.set_owner(this);
2018  my_aggregator.initialize_handler(handler_type(this));
2019  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2020  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2021  }
2022 
2023 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2024  void set_name( const char *name ) __TBB_override {
2025  tbb::internal::fgt_node_desc( this, name );
2026  }
2027 #endif
2028 
2029  //
2030  // message sender implementation
2031  //
2032 
2034 
2035  bool register_successor( successor_type &r ) __TBB_override {
2036  buffer_operation op_data(reg_succ);
2037  op_data.r = &r;
2038  my_aggregator.execute(&op_data);
2039  (void)enqueue_forwarding_task(op_data);
2040  return true;
2041  }
2042 
2043 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2044  void internal_add_built_successor( successor_type &r) __TBB_override {
2045  buffer_operation op_data(add_blt_succ);
2046  op_data.r = &r;
2047  my_aggregator.execute(&op_data);
2048  }
2049 
2050  void internal_delete_built_successor( successor_type &r) __TBB_override {
2051  buffer_operation op_data(del_blt_succ);
2052  op_data.r = &r;
2053  my_aggregator.execute(&op_data);
2054  }
2055 
2056  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2057  buffer_operation op_data(add_blt_pred);
2058  op_data.p = &p;
2059  my_aggregator.execute(&op_data);
2060  }
2061 
2062  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2063  buffer_operation op_data(del_blt_pred);
2064  op_data.p = &p;
2065  my_aggregator.execute(&op_data);
2066  }
2067 
2068  size_t predecessor_count() __TBB_override {
2069  buffer_operation op_data(blt_pred_cnt);
2070  my_aggregator.execute(&op_data);
2071  return op_data.cnt_val;
2072  }
2073 
2074  size_t successor_count() __TBB_override {
2075  buffer_operation op_data(blt_succ_cnt);
2076  my_aggregator.execute(&op_data);
2077  return op_data.cnt_val;
2078  }
2079 
2080  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2081  buffer_operation op_data(blt_pred_cpy);
2082  op_data.pvec = &v;
2083  my_aggregator.execute(&op_data);
2084  }
2085 
2086  void copy_successors( successor_list_type &v ) __TBB_override {
2087  buffer_operation op_data(blt_succ_cpy);
2088  op_data.svec = &v;
2089  my_aggregator.execute(&op_data);
2090  }
2091 
2092 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2093 
2095 
2097  bool remove_successor( successor_type &r ) __TBB_override {
2098  r.remove_predecessor(*this);
2099  buffer_operation op_data(rem_succ);
2100  op_data.r = &r;
2101  my_aggregator.execute(&op_data);
2102  // even though this operation does not cause a forward, if we are the handler, and
2103  // a forward is scheduled, we may be the first to reach this point after the aggregator,
2104  // and so should check for the task.
2105  (void)enqueue_forwarding_task(op_data);
2106  return true;
2107  }
2108 
2110 
2112  bool try_get( T &v ) __TBB_override {
2113  buffer_operation op_data(req_item);
2114  op_data.elem = &v;
2115  my_aggregator.execute(&op_data);
2116  (void)enqueue_forwarding_task(op_data);
2117  return (op_data.status==internal::SUCCEEDED);
2118  }
2119 
2121 
2123  bool try_reserve( T &v ) __TBB_override {
2124  buffer_operation op_data(res_item);
2125  op_data.elem = &v;
2126  my_aggregator.execute(&op_data);
2127  (void)enqueue_forwarding_task(op_data);
2128  return (op_data.status==internal::SUCCEEDED);
2129  }
2130 
2132 
2133  bool try_release() __TBB_override {
2134  buffer_operation op_data(rel_res);
2135  my_aggregator.execute(&op_data);
2136  (void)enqueue_forwarding_task(op_data);
2137  return true;
2138  }
2139 
2141 
2142  bool try_consume() __TBB_override {
2143  buffer_operation op_data(con_res);
2144  my_aggregator.execute(&op_data);
2145  (void)enqueue_forwarding_task(op_data);
2146  return true;
2147  }
2148 
2149 protected:
2150 
2151  template< typename R, typename B > friend class run_and_put_task;
2152  template<typename X, typename Y> friend class internal::broadcast_cache;
2153  template<typename X, typename Y> friend class internal::round_robin_cache;
2155  task *try_put_task(const T &t) __TBB_override {
2156  buffer_operation op_data(t, put_item);
2157  my_aggregator.execute(&op_data);
2158  task *ft = grab_forwarding_task(op_data);
2159  // sequencer_nodes can return failure (if an item has been previously inserted)
2160  // We have to spawn the returned task if our own operation fails.
2161 
2162  if(ft && op_data.status ==internal::FAILED) {
2163  // we haven't succeeded queueing the item, but for some reason the
2164  // call returned a task (if another request resulted in a successful
2165  // forward this could happen.) Queue the task and reset the pointer.
2166  FLOW_SPAWN(*ft); ft = NULL;
2167  }
2168  else if(!ft && op_data.status ==internal::SUCCEEDED) {
2169  ft = SUCCESSFULLY_ENQUEUED;
2170  }
2171  return ft;
2172  }
2173 
2174  graph& graph_reference() const __TBB_override {
2175  return my_graph;
2176  }
2177 
2179 
2180 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2181 public:
2182  void extract() __TBB_override {
2183  my_built_predecessors.receiver_extract(*this);
2184  my_successors.built_successors().sender_extract(*this);
2185  }
2186 #endif
2187 
2188 protected:
2190  void reset_node( reset_flags f ) __TBB_override {
2191  // TODO: just clear structures
2192  if (f&rf_clear_edges) {
2193  my_successors.clear();
2194 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2195  my_built_predecessors.clear();
2196 #endif
2197  }
2198  forwarder_busy = false;
2199  }
2200 }; // buffer_node
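// --- Illustrative usage sketch (not part of flow_graph.h) ---
// A minimal example of how a buffer_node is commonly wired up: items put into
// the buffer are forwarded, in arbitrary order, to a downstream function_node.
// The lambda body and the loop bound are hypothetical; only the public
// tbb::flow API declared in this header is assumed.
#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::buffer_node<int> buffer(g);
    tbb::flow::function_node<int, int> consumer(g, tbb::flow::unlimited,
        [](int v) { std::cout << "got " << v << "\n"; return v; });
    tbb::flow::make_edge(buffer, consumer);        // buffer forwards to consumer
    for (int i = 0; i < 10; ++i) buffer.try_put(i);
    g.wait_for_all();
    return 0;
}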
2201 
2203 template <typename T, typename A=cache_aligned_allocator<T> >
2204 class queue_node : public buffer_node<T, A> {
2205 protected:
2210 
2211 private:
2212  template<typename, typename> friend class buffer_node;
2213 
2214  bool is_item_valid() {
2215  return this->my_item_valid(this->my_head);
2216  }
2217 
2218  void try_put_and_add_task(task*& last_task) {
2219  task *new_task = this->my_successors.try_put_task(this->front());
2220  if (new_task) {
2221  // workaround for icc bug
2222  graph& graph_ref = this->graph_reference();
2223  last_task = combine_tasks(graph_ref, last_task, new_task);
2224  this->destroy_front();
2225  }
2226  }
2227 
2228 protected:
2229  void internal_forward_task(queue_operation *op) __TBB_override {
2230  this->internal_forward_task_impl(op, this);
2231  }
2232 
2233  void internal_pop(queue_operation *op) __TBB_override {
2234  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2236  }
2237  else {
2238  this->pop_front(*(op->elem));
2240  }
2241  }
2242  void internal_reserve(queue_operation *op) __TBB_override {
2243  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2245  }
2246  else {
2247  this->reserve_front(*(op->elem));
2249  }
2250  }
2251  void internal_consume(queue_operation *op) __TBB_override {
2252  this->consume_front();
2254  }
2255 
2256 public:
2257  typedef T input_type;
2258  typedef T output_type;
2261 
2263  __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
2264  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2265  static_cast<receiver<input_type> *>(this),
2266  static_cast<sender<output_type> *>(this) );
2267  }
2268 
2269 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2270  template <typename... Args>
2271  queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2272  make_edges_in_order(nodes, *this);
2273  }
2274 #endif
2275 
2277  __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
2278  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2279  static_cast<receiver<input_type> *>(this),
2280  static_cast<sender<output_type> *>(this) );
2281  }
2282 
2283 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2284  void set_name( const char *name ) __TBB_override {
2285  tbb::internal::fgt_node_desc( this, name );
2286  }
2287 #endif
2288 
2289 protected:
2290  void reset_node( reset_flags f ) __TBB_override {
2291  base_type::reset_node(f);
2292  }
2293 }; // queue_node
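// --- Illustrative usage sketch (not part of flow_graph.h) ---
// queue_node differs from buffer_node only in ordering: with no successor
// attached, items stay buffered and try_get() returns them in FIFO order.
// A small sketch under that assumption:
#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::queue_node<int> q(g);
    q.try_put(1);
    q.try_put(2);
    q.try_put(3);
    int v = 0;
    while (q.try_get(v)) std::cout << v << "\n";   // prints 1 2 3
    g.wait_for_all();
    return 0;
}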
2294 
2296 template< typename T, typename A=cache_aligned_allocator<T> >
2297 class sequencer_node : public queue_node<T, A> {
2299  // my_sequencer should be a benign function and must be callable
2300  // from a parallel context. Does this mean it needn't be reset?
2301 public:
2302  typedef T input_type;
2303  typedef T output_type;
2306 
2308  template< typename Sequencer >
2309  __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2310  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2311  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2312  static_cast<receiver<input_type> *>(this),
2313  static_cast<sender<output_type> *>(this) );
2314  }
2315 
2316 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2317  template <typename Sequencer, typename... Args>
2318  sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2319  : sequencer_node(nodes.graph_reference(), s) {
2320  make_edges_in_order(nodes, *this);
2321  }
2322 #endif
2323 
2325  __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2326  my_sequencer( src.my_sequencer->clone() ) {
2327  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2328  static_cast<receiver<input_type> *>(this),
2329  static_cast<sender<output_type> *>(this) );
2330  }
2331 
2333  ~sequencer_node() { delete my_sequencer; }
2334 
2335 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2336  void set_name( const char *name ) __TBB_override {
2337  tbb::internal::fgt_node_desc( this, name );
2338  }
2339 #endif
2340 
2341 protected:
2344 
2345 private:
2346  bool internal_push(sequencer_operation *op) __TBB_override {
2347  size_type tag = (*my_sequencer)(*(op->elem));
2348 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2349  if (tag < this->my_head) {
2350  // have already emitted a message with this tag
2352  return false;
2353  }
2354 #endif
2355  // cannot modify this->my_tail now; the buffer would be inconsistent.
2356  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2357 
2358  if (this->size(new_tail) > this->capacity()) {
2359  this->grow_my_array(this->size(new_tail));
2360  }
2361  this->my_tail = new_tail;
2362 
2363  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2364  __TBB_store_with_release(op->status, res);
2365  return res ==internal::SUCCEEDED;
2366  }
2367 }; // sequencer_node
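// --- Illustrative usage sketch (not part of flow_graph.h) ---
// The user-supplied Sequencer maps each message to its sequence number, and
// messages are then emitted in increasing sequence order regardless of the
// order in which they were put. The Msg struct and the lambdas are hypothetical.
#include "tbb/flow_graph.h"
#include <iostream>

struct Msg { size_t seq; int payload; };

int main() {
    tbb::flow::graph g;
    tbb::flow::sequencer_node<Msg> seq(g, [](const Msg& m) { return m.seq; });
    tbb::flow::function_node<Msg, int> sink(g, tbb::flow::serial,
        [](const Msg& m) { std::cout << m.payload << "\n"; return 0; });
    tbb::flow::make_edge(seq, sink);
    Msg msgs[] = { {2, 30}, {0, 10}, {1, 20} };    // deliberately out of order
    for (const Msg& m : msgs) seq.try_put(m);      // sink sees 10, 20, 30
    g.wait_for_all();
    return 0;
}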
2368 
2370 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2371 class priority_queue_node : public buffer_node<T, A> {
2372 public:
2373  typedef T input_type;
2374  typedef T output_type;
2379 
2381  __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2382  : buffer_node<T, A>(g), compare(comp), mark(0) {
2383  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2384  static_cast<receiver<input_type> *>(this),
2385  static_cast<sender<output_type> *>(this) );
2386  }
2387 
2388 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2389  template <typename... Args>
2390  priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2391  : priority_queue_node(nodes.graph_reference(), comp) {
2392  make_edges_in_order(nodes, *this);
2393  }
2394 #endif
2395 
2397  __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2398  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2399  static_cast<receiver<input_type> *>(this),
2400  static_cast<sender<output_type> *>(this) );
2401  }
2402 
2403 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2404  void set_name( const char *name ) __TBB_override {
2405  tbb::internal::fgt_node_desc( this, name );
2406  }
2407 #endif
2408 
2409 protected:
2410 
2411  void reset_node( reset_flags f ) __TBB_override {
2412  mark = 0;
2413  base_type::reset_node(f);
2414  }
2415 
2419 
2421  void internal_forward_task(prio_operation *op) __TBB_override {
2422  this->internal_forward_task_impl(op, this);
2423  }
2424 
2425  void handle_operations(prio_operation *op_list) __TBB_override {
2426  this->handle_operations_impl(op_list, this);
2427  }
2428 
2429  bool internal_push(prio_operation *op) __TBB_override {
2430  prio_push(*(op->elem));
2432  return true;
2433  }
2434 
2435  void internal_pop(prio_operation *op) __TBB_override {
2436  // if empty or already reserved, don't pop
2437  if ( this->my_reserved == true || this->my_tail == 0 ) {
2439  return;
2440  }
2441 
2442  *(op->elem) = prio();
2444  prio_pop();
2445 
2446  }
2447 
2448  // pops the highest-priority item, saves copy
2449  void internal_reserve(prio_operation *op) __TBB_override {
2450  if (this->my_reserved == true || this->my_tail == 0) {
2452  return;
2453  }
2454  this->my_reserved = true;
2455  *(op->elem) = prio();
2456  reserved_item = *(op->elem);
2458  prio_pop();
2459  }
2460 
2461  void internal_consume(prio_operation *op) __TBB_override {
2463  this->my_reserved = false;
2464  reserved_item = input_type();
2465  }
2466 
2467  void internal_release(prio_operation *op) __TBB_override {
2469  prio_push(reserved_item);
2470  this->my_reserved = false;
2471  reserved_item = input_type();
2472  }
2473 
2474 private:
2475  template<typename, typename> friend class buffer_node;
2476 
2477  void order() {
2478  if (mark < this->my_tail) heapify();
2479  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2480  }
2481 
2482  bool is_item_valid() {
2483  return this->my_tail > 0;
2484  }
2485 
2486  void try_put_and_add_task(task*& last_task) {
2487  task * new_task = this->my_successors.try_put_task(this->prio());
2488  if (new_task) {
2489  // workaround for icc bug
2490  graph& graph_ref = this->graph_reference();
2491  last_task = combine_tasks(graph_ref, last_task, new_task);
2492  prio_pop();
2493  }
2494  }
2495 
2496 private:
2497  Compare compare;
2498  size_type mark;
2499 
2500  input_type reserved_item;
2501 
2502  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2503  bool prio_use_tail() {
2504  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2505  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2506  }
2507 
2508  // prio_push: checks that the item will fit, expand array if necessary, put at end
2509  void prio_push(const T &src) {
2510  if ( this->my_tail >= this->my_array_size )
2511  this->grow_my_array( this->my_tail + 1 );
2512  (void) this->place_item(this->my_tail, src);
2513  ++(this->my_tail);
2514  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2515  }
2516 
2517  // prio_pop: deletes highest priority item from the array, and if it is item
2518  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2519  // and mark. Assumes the array has already been tested for emptiness; no failure.
2520  void prio_pop() {
2521  if (prio_use_tail()) {
2522  // there are newly pushed elements; last one higher than top
2523  // copy the data
2524  this->destroy_item(this->my_tail-1);
2525  --(this->my_tail);
2526  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2527  return;
2528  }
2529  this->destroy_item(0);
2530  if(this->my_tail > 1) {
2531  // push the last element down heap
2532  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2533  this->move_item(0,this->my_tail - 1);
2534  }
2535  --(this->my_tail);
2536  if(mark > this->my_tail) --mark;
2537  if (this->my_tail > 1) // don't reheap for heap of size 1
2538  reheap();
2539  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2540  }
2541 
2542  const T& prio() {
2543  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2544  }
2545 
2546  // turn array into heap
2547  void heapify() {
2548  if(this->my_tail == 0) {
2549  mark = 0;
2550  return;
2551  }
2552  if (!mark) mark = 1;
2553  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2554  size_type cur_pos = mark;
2555  input_type to_place;
2556  this->fetch_item(mark,to_place);
2557  do { // push to_place up the heap
2558  size_type parent = (cur_pos-1)>>1;
2559  if (!compare(this->get_my_item(parent), to_place))
2560  break;
2561  this->move_item(cur_pos, parent);
2562  cur_pos = parent;
2563  } while( cur_pos );
2564  (void) this->place_item(cur_pos, to_place);
2565  }
2566  }
2567 
2568  // otherwise heapified array with new root element; rearrange to heap
2569  void reheap() {
2570  size_type cur_pos=0, child=1;
2571  while (child < mark) {
2572  size_type target = child;
2573  if (child+1<mark &&
2574  compare(this->get_my_item(child),
2575  this->get_my_item(child+1)))
2576  ++target;
2577  // target now has the higher priority child
2578  if (compare(this->get_my_item(target),
2579  this->get_my_item(cur_pos)))
2580  break;
2581  // swap
2582  this->swap_items(cur_pos, target);
2583  cur_pos = target;
2584  child = (cur_pos<<1)+1;
2585  }
2586  }
2587 }; // priority_queue_node
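// --- Illustrative usage sketch (not part of flow_graph.h) ---
// With the default Compare (std::less<T>) the largest item has the highest
// priority. With no successor attached the items stay buffered, so try_get()
// is assumed to return them largest-first.
#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::priority_queue_node<int> pq(g);
    pq.try_put(5);
    pq.try_put(42);
    pq.try_put(7);
    int v = 0;
    while (pq.try_get(v)) std::cout << v << "\n";  // expected: 42 7 5
    g.wait_for_all();
    return 0;
}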
2588 
2589 } // interfaceX
2590 
2591 namespace interface11 {
2592 
2594 
2597 template< typename T, typename DecrementType=continue_msg >
2598 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2599 public:
2600  typedef T input_type;
2601  typedef T output_type;
2604 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2605  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2606  typedef typename sender<output_type>::built_successors_type built_successors_type;
2607  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2608  typedef typename sender<output_type>::successor_list_type successor_list_type;
2609 #endif
2610  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2611 
2612 private:
2614  size_t my_count; //number of successful puts
2615  size_t my_tries; //number of active put attempts
2619  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2620 
2622 
2623  // Let decrementer call decrement_counter()
2624  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
2625 
2626  bool check_conditions() { // always called under lock
2627  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2628  }
2629 
2630  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2631  task *forward_task() {
2632  input_type v;
2633  task *rval = NULL;
2634  bool reserved = false;
2635  {
2636  spin_mutex::scoped_lock lock(my_mutex);
2637  if ( check_conditions() )
2638  ++my_tries;
2639  else
2640  return NULL;
2641  }
2642 
2643  //SUCCESS
2644  // if we can reserve and can put, we consume the reservation
2645  // we increment the count and decrement the tries
2646  if ( (my_predecessors.try_reserve(v)) == true ){
2647  reserved=true;
2648  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2649  {
2650  spin_mutex::scoped_lock lock(my_mutex);
2651  ++my_count;
2652  --my_tries;
2653  my_predecessors.try_consume();
2654  if ( check_conditions() ) {
2655  if ( internal::is_graph_active(this->my_graph) ) {
2656  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2657  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2658  FLOW_SPAWN(*rtask);
2659  }
2660  }
2661  }
2662  return rval;
2663  }
2664  }
2665  //FAILURE
2666  //if we can't reserve, we decrement the tries
2667  //if we can reserve but can't put, we decrement the tries and release the reservation
2668  {
2669  spin_mutex::scoped_lock lock(my_mutex);
2670  --my_tries;
2671  if (reserved) my_predecessors.try_release();
2672  if ( check_conditions() ) {
2673  if ( internal::is_graph_active(this->my_graph) ) {
2674  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2675  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2676  __TBB_ASSERT(!rval, "Have two tasks to handle");
2677  return rtask;
2678  }
2679  }
2680  return rval;
2681  }
2682  }
2683 
2684  void forward() {
2685  __TBB_ASSERT(false, "Should never be called");
2686  return;
2687  }
2688 
2689  task* decrement_counter( long long delta ) {
2690  {
2691  spin_mutex::scoped_lock lock(my_mutex);
2692  if( delta > 0 && size_t(delta) > my_count )
2693  my_count = 0;
2694  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
2695  my_count = my_threshold;
2696  else
2697  my_count -= size_t(delta); // absolute value of delta is sufficiently small
2698  }
2699  return forward_task();
2700  }
2701 
2702  void initialize() {
2703  my_predecessors.set_owner(this);
2704  my_successors.set_owner(this);
2705  decrement.set_owner(this);
2706  tbb::internal::fgt_node(
2707  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2708  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2709  static_cast<sender<output_type> *>(this)
2710  );
2711  }
2712 public:
2715 
2716 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
2718  "Deprecated interface of the limiter node can be used only in conjunction "
2719  "with continue_msg as the type of DecrementType template parameter." );
2720 #endif // Check for incompatible interface
2721 
2723  limiter_node( graph &g,
2724  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
2725  : graph_node(g), my_threshold(threshold), my_count(0),
2726  __TBB_DEPRECATED_LIMITER_ARG4(
2727  my_tries(0), decrement(),
2728  init_decrement_predecessors(num_decrement_predecessors),
2729  decrement(num_decrement_predecessors)) {
2730  initialize();
2731  }
2732 
2733 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2734  template <typename... Args>
2735  limiter_node(const node_set<Args...>& nodes, size_t threshold)
2736  : limiter_node(nodes.graph_reference(), threshold) {
2737  make_edges_in_order(nodes, *this);
2738  }
2739 #endif
2740 
2742  limiter_node( const limiter_node& src ) :
2743  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2744  my_threshold(src.my_threshold), my_count(0),
2745  __TBB_DEPRECATED_LIMITER_ARG4(
2746  my_tries(0), decrement(),
2747  init_decrement_predecessors(src.init_decrement_predecessors),
2748  decrement(src.init_decrement_predecessors)) {
2749  initialize();
2750  }
2751 
2752 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2753  void set_name( const char *name ) __TBB_override {
2754  tbb::internal::fgt_node_desc( this, name );
2755  }
2756 #endif
2757 
2759  bool register_successor( successor_type &r ) __TBB_override {
2760  spin_mutex::scoped_lock lock(my_mutex);
2761  bool was_empty = my_successors.empty();
2762  my_successors.register_successor(r);
2763  //spawn a forward task if this is the only successor
2764  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2765  if ( internal::is_graph_active(this->my_graph) ) {
2766  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2767  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2768  FLOW_SPAWN(*task);
2769  }
2770  }
2771  return true;
2772  }
2773 
2775 
2776  bool remove_successor( successor_type &r ) __TBB_override {
2777  r.remove_predecessor(*this);
2778  my_successors.remove_successor(r);
2779  return true;
2780  }
2781 
2782 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2783  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2784  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2785 
2786  void internal_add_built_successor(successor_type &src) __TBB_override {
2787  my_successors.internal_add_built_successor(src);
2788  }
2789 
2790  void internal_delete_built_successor(successor_type &src) __TBB_override {
2791  my_successors.internal_delete_built_successor(src);
2792  }
2793 
2794  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2795 
2796  void copy_successors(successor_list_type &v) __TBB_override {
2797  my_successors.copy_successors(v);
2798  }
2799 
2800  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2801  my_predecessors.internal_add_built_predecessor(src);
2802  }
2803 
2804  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2805  my_predecessors.internal_delete_built_predecessor(src);
2806  }
2807 
2808  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2809 
2810  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2811  my_predecessors.copy_predecessors(v);
2812  }
2813 
2814  void extract() __TBB_override {
2815  my_count = 0;
2816  my_successors.built_successors().sender_extract(*this);
2817  my_predecessors.built_predecessors().receiver_extract(*this);
2818  decrement.built_predecessors().receiver_extract(decrement);
2819  }
2820 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2821 
2823  bool register_predecessor( predecessor_type &src ) __TBB_override {
2824  spin_mutex::scoped_lock lock(my_mutex);
2825  my_predecessors.add( src );
2826  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
2827  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2828  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2829  FLOW_SPAWN(*task);
2830  }
2831  return true;
2832  }
2833 
2835  bool remove_predecessor( predecessor_type &src ) __TBB_override {
2836  my_predecessors.remove( src );
2837  return true;
2838  }
2839 
2840 protected:
2841 
2842  template< typename R, typename B > friend class run_and_put_task;
2843  template<typename X, typename Y> friend class internal::broadcast_cache;
2844  template<typename X, typename Y> friend class internal::round_robin_cache;
2846  task * try_put_task( const T &t ) __TBB_override {
2847  {
2848  spin_mutex::scoped_lock lock(my_mutex);
2849  if ( my_count + my_tries >= my_threshold )
2850  return NULL;
2851  else
2852  ++my_tries;
2853  }
2854 
2855  task * rtask = my_successors.try_put_task(t);
2856 
2857  if ( !rtask ) { // try_put_task failed.
2858  spin_mutex::scoped_lock lock(my_mutex);
2859  --my_tries;
2860  if (check_conditions() && internal::is_graph_active(this->my_graph)) {
2861  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2862  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2863  }
2864  }
2865  else {
2866  spin_mutex::scoped_lock lock(my_mutex);
2867  ++my_count;
2868  --my_tries;
2869  }
2870  return rtask;
2871  }
2872 
2873  graph& graph_reference() const __TBB_override { return my_graph; }
2874 
2875  void reset_receiver(reset_flags /*f*/) __TBB_override {
2876  __TBB_ASSERT(false,NULL); // should never be called
2877  }
2878 
2879  void reset_node( reset_flags f ) __TBB_override {
2880  my_count = 0;
2881  if(f & rf_clear_edges) {
2882  my_predecessors.clear();
2883  my_successors.clear();
2884  }
2885  else
2886  {
2887  my_predecessors.reset( );
2888  }
2889  decrement.reset_receiver(f);
2890  }
2891 }; // limiter_node
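// --- Illustrative usage sketch (not part of flow_graph.h) ---
// A limiter_node that allows at most two items in flight. The worker's
// continue_msg output is fed back into the limiter's public decrement port so
// that each completed item frees a slot. The worker body is hypothetical.
#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::buffer_node<int> input(g);
    tbb::flow::limiter_node<int> limiter(g, 2);    // threshold = 2
    tbb::flow::function_node<int, tbb::flow::continue_msg> worker(g, tbb::flow::unlimited,
        [](int /*v*/) { /* process the item */ return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(input, limiter);
    tbb::flow::make_edge(limiter, worker);
    tbb::flow::make_edge(worker, limiter.decrement);   // completion frees a slot
    for (int i = 0; i < 100; ++i) input.try_put(i);
    g.wait_for_all();
    return 0;
}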
2892 
2894 
2898 using internal::input_port;
2899 using internal::tag_value;
2900 
2901 template<typename OutputTuple, typename JP=queueing> class join_node;
2902 
2903 template<typename OutputTuple>
2904 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2905 private:
2908 public:
2909  typedef OutputTuple output_type;
2911  __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2912  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2913  this->input_ports(), static_cast< sender< output_type > *>(this) );
2914  }
2915 
2916 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2917  template <typename... Args>
2918  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2919  make_edges_in_order(nodes, *this);
2920  }
2921 #endif
2922 
2923  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2924  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2925  this->input_ports(), static_cast< sender< output_type > *>(this) );
2926  }
2927 
2928 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2929  void set_name( const char *name ) __TBB_override {
2930  tbb::internal::fgt_node_desc( this, name );
2931  }
2932 #endif
2933 
2934 };
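// --- Illustrative usage sketch (not part of flow_graph.h) ---
// A reserving join_node consumes one item from every input port only when all
// ports can reserve, so its predecessors must support reservation (buffer
// nodes do). A small two-port sketch:
#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::buffer_node<int>    ints(g);
    tbb::flow::buffer_node<double> doubles(g);
    tbb::flow::join_node< tbb::flow::tuple<int, double>, tbb::flow::reserving > join(g);
    tbb::flow::make_edge(ints,    tbb::flow::input_port<0>(join));
    tbb::flow::make_edge(doubles, tbb::flow::input_port<1>(join));
    ints.try_put(1);
    doubles.try_put(2.5);
    g.wait_for_all();
    tbb::flow::tuple<int, double> result;
    if (join.try_get(result)) {
        // tbb::flow::get<0>(result) == 1, tbb::flow::get<1>(result) == 2.5
    }
    return 0;
}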
2935 
2936 template<typename OutputTuple>
2937 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2938 private:
2941 public:
2942  typedef OutputTuple output_type;
2944  __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2945  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2946  this->input_ports(), static_cast< sender< output_type > *>(this) );
2947  }
2948 
2949 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2950  template <typename... Args>
2951  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2952  make_edges_in_order(nodes, *this);
2953  }
2954 #endif
2955 
2956  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2957  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2958  this->input_ports(), static_cast< sender< output_type > *>(this) );
2959  }
2960 
2961 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2962  void set_name( const char *name ) __TBB_override {
2963  tbb::internal::fgt_node_desc( this, name );
2964  }
2965 #endif
2966 
2967 };
2968 
2969 // template for key_matching join_node
2970 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2971 template<typename OutputTuple, typename K, typename KHash>
2972 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2973  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2974 private:
2977 public:
2978  typedef OutputTuple output_type;
2980 
2981 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2982  join_node(graph &g) : unfolded_type(g) {}
2983 
2984 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2985  template <typename... Args>
2986  join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
2987  : join_node(nodes.graph_reference()) {
2988  make_edges_in_order(nodes, *this);
2989  }
2990 #endif
2991 
2992 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2993 
2994  template<typename __TBB_B0, typename __TBB_B1>
2995  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2996  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2997  this->input_ports(), static_cast< sender< output_type > *>(this) );
2998  }
2999  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3000  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3001  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3002  this->input_ports(), static_cast< sender< output_type > *>(this) );
3003  }
3004  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3005  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3006  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3007  this->input_ports(), static_cast< sender< output_type > *>(this) );
3008  }
3009  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3010  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3011  unfolded_type(g, b0, b1, b2, b3, b4) {
3012  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3013  this->input_ports(), static_cast< sender< output_type > *>(this) );
3014  }
3015 #if __TBB_VARIADIC_MAX >= 6
3016  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3017  typename __TBB_B5>
3018  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3019  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3020  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3021  this->input_ports(), static_cast< sender< output_type > *>(this) );
3022  }
3023 #endif
3024 #if __TBB_VARIADIC_MAX >= 7
3025  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3026  typename __TBB_B5, typename __TBB_B6>
3027  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3028  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3029  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3030  this->input_ports(), static_cast< sender< output_type > *>(this) );
3031  }
3032 #endif
3033 #if __TBB_VARIADIC_MAX >= 8
3034  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3035  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3036  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3037  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3038  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3039  this->input_ports(), static_cast< sender< output_type > *>(this) );
3040  }
3041 #endif
3042 #if __TBB_VARIADIC_MAX >= 9
3043  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3044  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3045  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3046  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3047  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3048  this->input_ports(), static_cast< sender< output_type > *>(this) );
3049  }
3050 #endif
3051 #if __TBB_VARIADIC_MAX >= 10
3052  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3053  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3054  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3055  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3056  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3057  this->input_ports(), static_cast< sender< output_type > *>(this) );
3058  }
3059 #endif
3060 
3061 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3062  template <typename... Args, typename... Bodies>
3063  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3064  : join_node(nodes.graph_reference(), bodies...) {
3065  make_edges_in_order(nodes, *this);
3066  }
3067 #endif
3068 
3069  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3070  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3071  this->input_ports(), static_cast< sender< output_type > *>(this) );
3072  }
3073 
3074 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3075  void set_name( const char *name ) __TBB_override {
3076  tbb::internal::fgt_node_desc( this, name );
3077  }
3078 #endif
3079 
3080 };
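// --- Illustrative usage sketch (not part of flow_graph.h) ---
// A key_matching join pairs messages across ports by a user-provided key.
// Each constructor argument after the graph is a body that extracts the key
// from a message on the corresponding port. Record and the bodies are
// hypothetical.
#include "tbb/flow_graph.h"
#include <string>

struct Record { int key; std::string text; };

int main() {
    tbb::flow::graph g;
    tbb::flow::join_node< tbb::flow::tuple<Record, Record>,
                          tbb::flow::key_matching<int> >
        join(g,
             [](const Record& r) { return r.key; },
             [](const Record& r) { return r.key; });
    tbb::flow::function_node< tbb::flow::tuple<Record, Record> > sink(g, tbb::flow::serial,
        [](const tbb::flow::tuple<Record, Record>&) { return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(join, sink);
    tbb::flow::input_port<0>(join).try_put(Record{7, "left"});
    tbb::flow::input_port<1>(join).try_put(Record{7, "right"});  // key 7 matches
    g.wait_for_all();
    return 0;
}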
3081 
3082 // indexer node
3084 
3085 // TODO: Implement interface with variadic template or tuple
3086 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3087  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3088  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3089 
3090 //indexer node specializations
3091 template<typename T0>
3092 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3093 private:
3094  static const int N = 1;
3095 public:
3096  typedef tuple<T0> InputTuple;
3099  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3100  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3101  this->input_ports(), static_cast< sender< output_type > *>(this) );
3102  }
3103 
3104 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3105  template <typename... Args>
3106  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3107  make_edges_in_order(nodes, *this);
3108  }
3109 #endif
3110 
3111  // Copy constructor
3112  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3113  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3114  this->input_ports(), static_cast< sender< output_type > *>(this) );
3115  }
3116 
3117 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3118  void set_name( const char *name ) __TBB_override {
3119  tbb::internal::fgt_node_desc( this, name );
3120  }
3121 #endif
3122 };
3123 
3124 template<typename T0, typename T1>
3125 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3126 private:
3127  static const int N = 2;
3128 public:
3129  typedef tuple<T0, T1> InputTuple;
3132  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3133  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3134  this->input_ports(), static_cast< sender< output_type > *>(this) );
3135  }
3136 
3137 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3138  template <typename... Args>
3139  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3140  make_edges_in_order(nodes, *this);
3141  }
3142 #endif
3143 
3144  // Copy constructor
3145  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3146  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3147  this->input_ports(), static_cast< sender< output_type > *>(this) );
3148  }
3149 
3150 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3151  void set_name( const char *name ) __TBB_override {
3152  tbb::internal::fgt_node_desc( this, name );
3153  }
3154 #endif
3155 };
3156 
3157 template<typename T0, typename T1, typename T2>
3158 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3159 private:
3160  static const int N = 3;
3161 public:
3162  typedef tuple<T0, T1, T2> InputTuple;
3165  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3166  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3167  this->input_ports(), static_cast< sender< output_type > *>(this) );
3168  }
3169 
3170 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3171  template <typename... Args>
3172  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3173  make_edges_in_order(nodes, *this);
3174  }
3175 #endif
3176 
3177  // Copy constructor
3178  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3179  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3180  this->input_ports(), static_cast< sender< output_type > *>(this) );
3181  }
3182 
3183 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3184  void set_name( const char *name ) __TBB_override {
3185  tbb::internal::fgt_node_desc( this, name );
3186  }
3187 #endif
3188 };
3189 
3190 template<typename T0, typename T1, typename T2, typename T3>
3191 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3192 private:
3193  static const int N = 4;
3194 public:
3195  typedef tuple<T0, T1, T2, T3> InputTuple;
3198  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3199  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3200  this->input_ports(), static_cast< sender< output_type > *>(this) );
3201  }
3202 
3203 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3204  template <typename... Args>
3205  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3206  make_edges_in_order(nodes, *this);
3207  }
3208 #endif
3209 
3210  // Copy constructor
3211  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3212  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3213  this->input_ports(), static_cast< sender< output_type > *>(this) );
3214  }
3215 
3216 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3217  void set_name( const char *name ) __TBB_override {
3218  tbb::internal::fgt_node_desc( this, name );
3219  }
3220 #endif
3221 };
3222 
3223 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3224 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3225 private:
3226  static const int N = 5;
3227 public:
3228  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3231  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3232  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3233  this->input_ports(), static_cast< sender< output_type > *>(this) );
3234  }
3235 
3236 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3237  template <typename... Args>
3238  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3239  make_edges_in_order(nodes, *this);
3240  }
3241 #endif
3242 
3243  // Copy constructor
3244  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3245  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3246  this->input_ports(), static_cast< sender< output_type > *>(this) );
3247  }
3248 
3249 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3250  void set_name( const char *name ) __TBB_override {
3251  tbb::internal::fgt_node_desc( this, name );
3252  }
3253 #endif
3254 };
3255 
3256 #if __TBB_VARIADIC_MAX >= 6
3257 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3258 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3259 private:
3260  static const int N = 6;
3261 public:
3262  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3265  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3266  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3267  this->input_ports(), static_cast< sender< output_type > *>(this) );
3268  }
3269 
3270 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3271  template <typename... Args>
3272  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3273  make_edges_in_order(nodes, *this);
3274  }
3275 #endif
3276 
3277  // Copy constructor
3278  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3279  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3280  this->input_ports(), static_cast< sender< output_type > *>(this) );
3281  }
3282 
3283 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3284  void set_name( const char *name ) __TBB_override {
3285  tbb::internal::fgt_node_desc( this, name );
3286  }
3287 #endif
3288 };
3289 #endif //variadic max 6
3290 
3291 #if __TBB_VARIADIC_MAX >= 7
3292 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3293  typename T6>
3294 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3295 private:
3296  static const int N = 7;
3297 public:
3298  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3301  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3302  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3303  this->input_ports(), static_cast< sender< output_type > *>(this) );
3304  }
3305 
3306 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3307  template <typename... Args>
3308  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3309  make_edges_in_order(nodes, *this);
3310  }
3311 #endif
3312 
3313  // Copy constructor
3314  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3315  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3316  this->input_ports(), static_cast< sender< output_type > *>(this) );
3317  }
3318 
3319 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3320  void set_name( const char *name ) __TBB_override {
3321  tbb::internal::fgt_node_desc( this, name );
3322  }
3323 #endif
3324 };
3325 #endif //variadic max 7
3326 
3327 #if __TBB_VARIADIC_MAX >= 8
3328 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3329  typename T6, typename T7>
3330 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3331 private:
3332  static const int N = 8;
3333 public:
3334  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3337  indexer_node(graph& g) : unfolded_type(g) {
3338  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3339  this->input_ports(), static_cast< sender< output_type > *>(this) );
3340  }
3341 
3342 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3343  template <typename... Args>
3344  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3345  make_edges_in_order(nodes, *this);
3346  }
3347 #endif
3348 
3349  // Copy constructor
3350  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3351  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3352  this->input_ports(), static_cast< sender< output_type > *>(this) );
3353  }
3354 
3355 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3356  void set_name( const char *name ) __TBB_override {
3357  tbb::internal::fgt_node_desc( this, name );
3358  }
3359 #endif
3360 };
3361 #endif //variadic max 8
3362 
3363 #if __TBB_VARIADIC_MAX >= 9
3364 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3365  typename T6, typename T7, typename T8>
3366 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3367 private:
3368  static const int N = 9;
3369 public:
3370  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3373  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3374  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3375  this->input_ports(), static_cast< sender< output_type > *>(this) );
3376  }
3377 
3378 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3379  template <typename... Args>
3380  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3381  make_edges_in_order(nodes, *this);
3382  }
3383 #endif
3384 
3385  // Copy constructor
3386  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3387  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3388  this->input_ports(), static_cast< sender< output_type > *>(this) );
3389  }
3390 
3391 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3392  void set_name( const char *name ) __TBB_override {
3393  tbb::internal::fgt_node_desc( this, name );
3394  }
3395 #endif
3396 };
3397 #endif //variadic max 9
3398 
3399 #if __TBB_VARIADIC_MAX >= 10
3400 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3401  typename T6, typename T7, typename T8, typename T9>
3402 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3403 private:
3404  static const int N = 10;
3405 public:
3406  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3409  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3410  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3411  this->input_ports(), static_cast< sender< output_type > *>(this) );
3412  }
3413 
3414 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3415  template <typename... Args>
3416  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3417  make_edges_in_order(nodes, *this);
3418  }
3419 #endif
3420 
3421  // Copy constructor
3422  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3423  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3424  this->input_ports(), static_cast< sender< output_type > *>(this) );
3425  }
3426 
3427 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3428  void set_name( const char *name ) __TBB_override {
3429  tbb::internal::fgt_node_desc( this, name );
3430  }
3431 #endif
3432 };
3433 #endif //variadic max 10
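// --- Illustrative usage sketch (not part of flow_graph.h) ---
// An indexer_node merges differently typed streams into one stream of tagged
// messages; the tag records which input port a value arrived on and
// cast_to<T>() retrieves the value. The printing lambda is hypothetical.
#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::indexer_node<int, float> indexer_t;
    indexer_t indexer(g);
    tbb::flow::function_node<indexer_t::output_type> sink(g, tbb::flow::serial,
        [](const indexer_t::output_type& msg) {
            if (msg.tag() == 0)
                std::cout << "int: "   << tbb::flow::cast_to<int>(msg)   << "\n";
            else
                std::cout << "float: " << tbb::flow::cast_to<float>(msg) << "\n";
            return tbb::flow::continue_msg();
        });
    tbb::flow::make_edge(indexer, sink);
    tbb::flow::input_port<0>(indexer).try_put(3);
    tbb::flow::input_port<1>(indexer).try_put(1.5f);
    g.wait_for_all();
    return 0;
}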
3434 
3435 #if __TBB_PREVIEW_ASYNC_MSG
3437 #else
3438 template< typename T >
3439 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3440 #endif
3441 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3442  s.internal_add_built_predecessor(p);
3443  p.internal_add_built_successor(s);
3444 #endif
3445  p.register_successor( s );
3446  tbb::internal::fgt_make_edge( &p, &s );
3447 }
3448 
3450 template< typename T >
3451 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3452  internal_make_edge( p, s );
3453 }
3454 
3455 #if __TBB_PREVIEW_ASYNC_MSG
3456 template< typename TS, typename TR,
3459 inline void make_edge( TS &p, TR &s ) {
3460  internal_make_edge( p, s );
3461 }
3462 
3463 template< typename T >
3465  internal_make_edge( p, s );
3466 }
3467 
3468 template< typename T >
3470  internal_make_edge( p, s );
3471 }
3472 
3473 #endif // __TBB_PREVIEW_ASYNC_MSG
3474 
3475 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3476 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3477 template< typename T, typename V,
3478  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3479 inline void make_edge( T& output, V& input) {
3480  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3481 }
3482 
3483 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3484 template< typename T, typename R,
3485  typename = typename T::output_ports_type >
3486 inline void make_edge( T& output, receiver<R>& input) {
3487  make_edge(get<0>(output.output_ports()), input);
3488 }
3489 
3490 //Makes an edge from a sender to port 0 of a multi-input successor.
3491 template< typename S, typename V,
3492  typename = typename V::input_ports_type >
3493 inline void make_edge( sender<S>& output, V& input) {
3494  make_edge(output, get<0>(input.input_ports()));
3495 }
3496 #endif
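// --- Illustrative usage sketch (not part of flow_graph.h) ---
// make_edge connects any sender<T> to any receiver<T>; the additional
// overloads above let a multi-output or multi-input node stand in for port 0.
// A minimal single-type pipeline, with a hypothetical doubling body:
#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> src(g);
    tbb::flow::function_node<int, int> doubler(g, tbb::flow::unlimited,
        [](int v) { return 2 * v; });
    tbb::flow::queue_node<int> out(g);
    tbb::flow::make_edge(src, doubler);   // sender<int> -> receiver<int>
    tbb::flow::make_edge(doubler, out);
    src.try_put(21);
    g.wait_for_all();
    int v = 0;
    out.try_get(v);                       // v == 42
    return 0;
}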
3497 
3498 #if __TBB_PREVIEW_ASYNC_MSG
3500 #else
3501 template< typename T >
3502 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3503 #endif
3504  p.remove_successor( s );
3505 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3506  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3507  p.internal_delete_built_successor(s);
3508  s.internal_delete_built_predecessor(p);
3509 #endif
3511 }
3512 
3514 template< typename T >
3515 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3516  internal_remove_edge( p, s );
3517 }
3518 
3519 #if __TBB_PREVIEW_ASYNC_MSG
3520 template< typename TS, typename TR,
3523 inline void remove_edge( TS &p, TR &s ) {
3524  internal_remove_edge( p, s );
3525 }
3526 
3527 template< typename T >
3529  internal_remove_edge( p, s );
3530 }
3531 
3532 template< typename T >
3534  internal_remove_edge( p, s );
3535 }
3536 #endif // __TBB_PREVIEW_ASYNC_MSG
3537 
3538 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3539 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3540 template< typename T, typename V,
3541  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3542 inline void remove_edge( T& output, V& input) {
3543  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3544 }
3545 
3546 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3547 template< typename T, typename R,
3548  typename = typename T::output_ports_type >
3549 inline void remove_edge( T& output, receiver<R>& input) {
3550  remove_edge(get<0>(output.output_ports()), input);
3551 }
3552 //Removes an edge between a sender and port 0 of a multi-input successor.
3553 template< typename S, typename V,
3554  typename = typename V::input_ports_type >
3555 inline void remove_edge( sender<S>& output, V& input) {
3556  remove_edge(output, get<0>(input.input_ports()));
3557 }
3558 #endif
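// --- Illustrative usage sketch (not part of flow_graph.h) ---
// remove_edge is the inverse of make_edge: after the call the sender no longer
// forwards to that receiver. In this sketch the second put is assumed to be
// dropped because the broadcast_node has no remaining successors.
#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> src(g);
    tbb::flow::queue_node<int> q(g);
    tbb::flow::make_edge(src, q);
    src.try_put(1);                  // delivered to q
    tbb::flow::remove_edge(src, q);
    src.try_put(2);                  // no successors: the message is dropped
    g.wait_for_all();
    return 0;
}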
3559 
3560 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3561 template<typename C >
3562 template< typename S >
3563 void internal::edge_container<C>::sender_extract( S &s ) {
3564  edge_list_type e = built_edges;
3565  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3566  remove_edge(s, **i);
3567  }
3568 }
3569 
3570 template<typename C >
3571 template< typename R >
3572 void internal::edge_container<C>::receiver_extract( R &r ) {
3573  edge_list_type e = built_edges;
3574  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3575  remove_edge(**i, r);
3576  }
3577 }
3578 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3579 
3581 template< typename Body, typename Node >
3582 Body copy_body( Node &n ) {
3583  return n.template copy_function_object<Body>();
3584 }
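// Illustrative sketch: copy_body is typically used with a named body type whose state is inspected
// after the graph has run; the functor below is an example choice, not part of this header:
//
//   struct counting_doubler {
//       int calls;
//       counting_doubler() : calls(0) {}
//       int operator()( int v ) { ++calls; return 2 * v; }
//   };
//
//   tbb::flow::graph g;
//   tbb::flow::function_node<int, int> n( g, tbb::flow::serial, counting_doubler() );
//   n.try_put( 21 );
//   g.wait_for_all();
//   counting_doubler b = copy_body<counting_doubler>( n );   // b.calls reflects the node's copy of the body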
3585 
3586 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3587 
3588 //composite_node
3589 template< typename InputTuple, typename OutputTuple > class composite_node;
3590 
3591 template< typename... InputTypes, typename... OutputTypes>
3592 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3593 
3594 public:
3595  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3596  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3597 
3598 private:
3599  std::unique_ptr<input_ports_type> my_input_ports;
3600  std::unique_ptr<output_ports_type> my_output_ports;
3601 
3602  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3603  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3604 
3605 protected:
3607 
3608 public:
3609 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3610  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3611  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3613  }
3614 #else
3616  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3617  }
3618 #endif
3619 
3620  template<typename T1, typename T2>
3621  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3622  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3623  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3624  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3625  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3626 
3629  }
3630 
3631  template< typename... NodeTypes >
3632  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3633 
3634  template< typename... NodeTypes >
3635  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3636 
3637 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3638  void set_name( const char *name ) __TBB_override {
3640  }
3641 #endif
3642 
3644  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3645  return *my_input_ports;
3646  }
3647 
3648  output_ports_type& output_ports() {
3649  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3650  return *my_output_ports;
3651  }
3652 
3653 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3654  void extract() __TBB_override {
3655  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3656  }
3657 #endif
3658 }; // class composite_node
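// Illustrative sketch of wrapping two inner nodes in a composite_node; the class and member names
// below are example choices, not part of this header:
//
//   class add_then_square : public tbb::flow::composite_node< tbb::flow::tuple<int>, tbb::flow::tuple<int> > {
//       typedef tbb::flow::composite_node< tbb::flow::tuple<int>, tbb::flow::tuple<int> > base_type;
//       tbb::flow::function_node<int, int> my_add;
//       tbb::flow::function_node<int, int> my_square;
//   public:
//       add_then_square( tbb::flow::graph &g )
//           : base_type( g ),
//             my_add   ( g, tbb::flow::unlimited, []( int v ) { return v + 1; } ),
//             my_square( g, tbb::flow::unlimited, []( int v ) { return v * v; } )
//       {
//           tbb::flow::make_edge( my_add, my_square );
//           base_type::set_external_ports( base_type::input_ports_type( my_add ),
//                                          base_type::output_ports_type( my_square ) );
//       }
//   };
//
//   Edges can then be made to and from the composite as a whole, e.g. make_edge( producer, c )
//   for an add_then_square c( g ).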
3659 
3660 //composite_node with only input ports
3661 template< typename... InputTypes>
3662 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3663 public:
3664  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3665 
3666 private:
3667  std::unique_ptr<input_ports_type> my_input_ports;
3668  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3669 
3670 protected:
3672 
3673 public:
3674 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3675  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3676  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3678  }
3679 #else
3681  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3682  }
3683 #endif
3684 
3685  template<typename T>
3686  void set_external_ports(T&& input_ports_tuple) {
3687  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3688 
3689  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3690 
3691  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3692  }
3693 
3694  template< typename... NodeTypes >
3695  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3696 
3697  template< typename... NodeTypes >
3698  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3699 
3700 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3701  void set_name( const char *name ) __TBB_override {
3703  }
3704 #endif
3705 
3707  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3708  return *my_input_ports;
3709  }
3710 
3711 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3712  void extract() __TBB_override {
3713  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3714  }
3715 #endif
3716 
3717 }; // class composite_node
3718 
3719 //composite_node with only output ports
3720 template<typename... OutputTypes>
3721 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3722 public:
3723  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3724 
3725 private:
3726  std::unique_ptr<output_ports_type> my_output_ports;
3727  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3728 
3729 protected:
3731 
3732 public:
3733 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3734  __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3735  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3737  }
3738 #else
3740  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3741  }
3742 #endif
3743 
3744  template<typename T>
3745  void set_external_ports(T&& output_ports_tuple) {
3746  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3747 
3748  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3749 
3750  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3751  }
3752 
3753  template<typename... NodeTypes >
3754  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3755 
3756  template<typename... NodeTypes >
3757  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3758 
3759 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3760  void set_name( const char *name ) __TBB_override {
3762  }
3763 #endif
3764 
3765  output_ports_type& output_ports() {
3766  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3767  return *my_output_ports;
3768  }
3769 
3770 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3771  void extract() __TBB_override {
3772  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3773  }
3774 #endif
3775 
3776 }; // class composite_node
3777 
3778 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3779 
3780 namespace internal {
3781 
3782 template<typename Gateway>
3784 public:
3785  typedef Gateway gateway_type;
3786 
3787  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3788  void set_gateway(gateway_type *gateway) {
3789  my_gateway = gateway;
3790  }
3791 
3792 protected:
3793  gateway_type *my_gateway;
3794 };
3795 
3796 template<typename Input, typename Ports, typename Gateway, typename Body>
3797 class async_body: public async_body_base<Gateway> {
3798 public:
3800  typedef Gateway gateway_type;
3801 
3802  async_body(const Body &body, gateway_type *gateway)
3803  : base_type(gateway), my_body(body) { }
3804 
3805  void operator()( const Input &v, Ports & ) {
3806  my_body(v, *this->my_gateway);
3807  }
3808 
3809  Body get_body() { return my_body; }
3810 
3811 private:
3812  Body my_body;
3813 };
3814 
3815 } // namespace internal
3816 
3817 } // namespace interfaceX
3818 namespace interface11 {
3819 
3821 template < typename Input, typename Output,
3822  typename Policy = queueing_lightweight,
3823  typename Allocator=cache_aligned_allocator<Input> >
3824 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3827 
3828 public:
3829  typedef Input input_type;
3830  typedef Output output_type;
3837 
3838 private:
3841  output_port_type *port;
3842  // TODO: pass the value by copy, since we do not want to block the asynchronous thread.
3843  const Output *value;
3844  bool result;
3845  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3846  void operator()() {
3847  result = port->try_put(*value);
3848  }
3849  };
3850 
3851  class receiver_gateway_impl: public receiver_gateway<Output> {
3852  public:
3853  receiver_gateway_impl(async_node* node): my_node(node) {}
3855  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3856  my_node->my_graph.reserve_wait();
3857  }
3858 
3860  my_node->my_graph.release_wait();
3861  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3862  }
3863 
3865  bool try_put(const Output &i) __TBB_override {
3866  return my_node->try_put_impl(i);
3867  }
3868 
3869  private:
3871  } my_gateway;
3872 
3873  //A substitute for 'this' used during member construction, to avoid compiler warnings
3874  async_node* self() { return this; }
3875 
3877  bool try_put_impl(const Output &i) {
3878  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3879  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
3881  task_list tasks;
3882  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
3883  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
3884  "Return status is inconsistent with the method operation." );
3885 
3886  while( !tasks.empty() ) {
3887  internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
3888  }
3889  tbb::internal::fgt_async_try_put_end(this, &port_0);
3890  return is_at_least_one_put_successful;
3891  }
3892 
3893 public:
3894  template<typename Body>
3896  graph &g, size_t concurrency,
3898  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
3899 #else
3901 #endif
3902  ) : base_type(
3903  g, concurrency,
3904  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3905  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3906  tbb::internal::fgt_multioutput_node_with_body<1>(
3907  CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3908  &this->my_graph, static_cast<receiver<input_type> *>(this),
3909  this->output_ports(), this->my_body
3910  );
3911  }
3912 
3913 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
3914  template <typename Body, typename... Args>
3915  __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
3916  : async_node(g, concurrency, body, Policy(), priority) {}
3917 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3918 
3919 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3920  template <typename Body, typename... Args>
3921  __TBB_NOINLINE_SYM async_node(
3922  const node_set<Args...>& nodes, size_t concurrency, Body body,
3924  ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
3925  make_edges_in_order(nodes, *this);
3926  }
3927 
3928 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3929  template <typename Body, typename... Args>
3930  __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
3931  : async_node(nodes, concurrency, body, Policy(), priority) {}
3932 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3933 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3934 
3935  __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3936  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3937  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3938 
3939  tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3940  &this->my_graph, static_cast<receiver<input_type> *>(this),
3941  this->output_ports(), this->my_body );
3942  }
3943 
3944  gateway_type& gateway() {
3945  return my_gateway;
3946  }
3947 
3948 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3949  void set_name( const char *name ) __TBB_override {
3951  }
3952 #endif
3953 
3954  // Define sender< Output >
3955 
3957  bool register_successor( successor_type &r ) __TBB_override {
3958  return internal::output_port<0>(*this).register_successor(r);
3959  }
3960 
3962  bool remove_successor( successor_type &r ) __TBB_override {
3963  return internal::output_port<0>(*this).remove_successor(r);
3964  }
3965 
3966  template<typename Body>
3970  mfn_body_type &body_ref = *this->my_body;
3971  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3972  return ab.get_body();
3973  }
3974 
3975 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3976  typedef typename internal::edge_container<successor_type> built_successors_type;
3978  typedef typename built_successors_type::edge_list_type successor_list_type;
3979  built_successors_type &built_successors() __TBB_override {
3980  return internal::output_port<0>(*this).built_successors();
3981  }
3982 
3983  void internal_add_built_successor( successor_type &r ) __TBB_override {
3984  internal::output_port<0>(*this).internal_add_built_successor(r);
3985  }
3986 
3987  void internal_delete_built_successor( successor_type &r ) __TBB_override {
3988  internal::output_port<0>(*this).internal_delete_built_successor(r);
3989  }
3990 
3991  void copy_successors( successor_list_type &l ) __TBB_override {
3992  internal::output_port<0>(*this).copy_successors(l);
3993  }
3994 
3995  size_t successor_count() __TBB_override {
3996  return internal::output_port<0>(*this).successor_count();
3997  }
3998 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3999 
4000 protected:
4001 
4003  base_type::reset_node(f);
4004  }
4005 };
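// Illustrative sketch of async_node: the body hands work to an external activity, which later
// answers through the node's gateway; std::thread and the names below are example choices
// (include <thread>), not part of this header:
//
//   typedef tbb::flow::async_node<int, int> async_t;
//   tbb::flow::graph g;
//   async_t a( g, tbb::flow::unlimited,
//              []( const int &input, async_t::gateway_type &gw ) {
//                  gw.reserve_wait();                 // keep the graph alive while the external work runs
//                  async_t::gateway_type *gwp = &gw;  // the gateway outlives this body invocation
//                  std::thread( [gwp, input]() {
//                      gwp->try_put( input * 2 );     // submit the result back into the graph
//                      gwp->release_wait();
//                  } ).detach();
//              } );
//   tbb::flow::function_node<int, int> sink( g, tbb::flow::unlimited, []( int r ) { return r; } );
//   tbb::flow::make_edge( a, sink );
//   a.try_put( 1 );
//   g.wait_for_all();                                 // returns only after release_wait() is called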
4006 
4007 #if __TBB_PREVIEW_STREAMING_NODE
4009 #endif // __TBB_PREVIEW_STREAMING_NODE
4010 
4012 
4013 template< typename T >
4014 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4015 public:
4016  typedef T input_type;
4017  typedef T output_type;
4020 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4021  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4022  typedef typename sender<output_type>::built_successors_type built_successors_type;
4023  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4024  typedef typename sender<output_type>::successor_list_type successor_list_type;
4025 #endif
4026 
4027  __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4028  my_successors.set_owner( this );
4029  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4030  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4031  }
4032 
4033 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4034  template <typename... Args>
4035  overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4036  make_edges_in_order(nodes, *this);
4037  }
4038 #endif
4039 
4041  __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
4042  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4043  {
4044  my_successors.set_owner( this );
4045  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4046  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4047  }
4048 
4050 
4051 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4052  void set_name( const char *name ) __TBB_override {
4053  tbb::internal::fgt_node_desc( this, name );
4054  }
4055 #endif
4056 
4057  bool register_successor( successor_type &s ) __TBB_override {
4058  spin_mutex::scoped_lock l( my_mutex );
4059  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4060  // We have a valid value that must be forwarded immediately.
4061  bool ret = s.try_put( my_buffer );
4062  if ( ret ) {
4063  // We add the successor that accepted our put
4064  my_successors.register_successor( s );
4065  } else {
4066  // If reservation is involved, a race can occur between the moment of reservation and this register_successor call,
4067  // because a failed reserve does not mean the successor cannot accept a message immediately.
4068  // Retrying inline could loop forever: the reserving node tries to put the edge into pull state,
4069  // while overwrite_node keeps switching it back to push state, so we break the loop by creating a task instead.
4070  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4071  register_predecessor_task( *this, s );
4072  internal::spawn_in_graph_arena( my_graph, *rtask );
4073  }
4074  } else {
4075  // No valid value yet, just add as successor
4076  my_successors.register_successor( s );
4077  }
4078  return true;
4079  }
4080 
4081  bool remove_successor( successor_type &s ) __TBB_override {
4082  spin_mutex::scoped_lock l( my_mutex );
4083  my_successors.remove_successor(s);
4084  return true;
4085  }
4086 
4087 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4088  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4089  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4090 
4091  void internal_add_built_successor( successor_type &s) __TBB_override {
4092  spin_mutex::scoped_lock l( my_mutex );
4093  my_successors.internal_add_built_successor(s);
4094  }
4095 
4096  void internal_delete_built_successor( successor_type &s) __TBB_override {
4097  spin_mutex::scoped_lock l( my_mutex );
4098  my_successors.internal_delete_built_successor(s);
4099  }
4100 
4101  size_t successor_count() __TBB_override {
4102  spin_mutex::scoped_lock l( my_mutex );
4103  return my_successors.successor_count();
4104  }
4105 
4106  void copy_successors(successor_list_type &v) __TBB_override {
4107  spin_mutex::scoped_lock l( my_mutex );
4108  my_successors.copy_successors(v);
4109  }
4110 
4111  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4112  spin_mutex::scoped_lock l( my_mutex );
4113  my_built_predecessors.add_edge(p);
4114  }
4115 
4116  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4117  spin_mutex::scoped_lock l( my_mutex );
4118  my_built_predecessors.delete_edge(p);
4119  }
4120 
4121  size_t predecessor_count() __TBB_override {
4122  spin_mutex::scoped_lock l( my_mutex );
4123  return my_built_predecessors.edge_count();
4124  }
4125 
4126  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4127  spin_mutex::scoped_lock l( my_mutex );
4128  my_built_predecessors.copy_edges(v);
4129  }
4130 
4131  void extract() __TBB_override {
4132  my_buffer_is_valid = false;
4133  built_successors().sender_extract(*this);
4134  built_predecessors().receiver_extract(*this);
4135  }
4136 
4137 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4138 
4139  bool try_get( input_type &v ) __TBB_override {
4140  spin_mutex::scoped_lock l( my_mutex );
4141  if ( my_buffer_is_valid ) {
4142  v = my_buffer;
4143  return true;
4144  }
4145  return false;
4146  }
4147 
4150  return try_get(v);
4151  }
4152 
4154  bool try_release() __TBB_override { return true; }
4155 
4157  bool try_consume() __TBB_override { return true; }
4158 
4159  bool is_valid() {
4160  spin_mutex::scoped_lock l( my_mutex );
4161  return my_buffer_is_valid;
4162  }
4163 
4164  void clear() {
4165  spin_mutex::scoped_lock l( my_mutex );
4166  my_buffer_is_valid = false;
4167  }
4168 
4169 protected:
4170 
4171  template< typename R, typename B > friend class run_and_put_task;
4172  template<typename X, typename Y> friend class internal::broadcast_cache;
4173  template<typename X, typename Y> friend class internal::round_robin_cache;
4174  task * try_put_task( const input_type &v ) __TBB_override {
4175  spin_mutex::scoped_lock l( my_mutex );
4176  return try_put_task_impl(v);
4177  }
4178 
4179  task * try_put_task_impl(const input_type &v) {
4180  my_buffer = v;
4181  my_buffer_is_valid = true;
4182  task * rtask = my_successors.try_put_task(v);
4183  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4184  return rtask;
4185  }
4186 
4188  return my_graph;
4189  }
4190 
4193 
4194  register_predecessor_task(predecessor_type& owner, successor_type& succ) :
4195  o(owner), s(succ) {};
4196 
4198  if (!s.register_predecessor(o)) {
4199  o.register_successor(s);
4200  }
4201  return NULL;
4202  }
4203 
4204  predecessor_type& o;
4205  successor_type& s;
4206  };
4207 
4210 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4211  internal::edge_container<predecessor_type> my_built_predecessors;
4212 #endif
4213  input_type my_buffer;
4216 
4218  my_buffer_is_valid = false;
4219  if (f&rf_clear_edges) {
4220  my_successors.clear();
4221  }
4222  }
4223 }; // overwrite_node
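// Illustrative sketch of overwrite_node semantics (the names are example choices):
//
//   tbb::flow::graph g;
//   tbb::flow::overwrite_node<int> latest( g );
//   latest.try_put( 1 );
//   latest.try_put( 2 );             // overwrites the buffered 1 and broadcasts 2 to any successors
//   int v = 0;
//   bool ok = latest.try_get( v );   // ok == true, v == 2; the buffer stays valid after a get
//   latest.clear();                  // invalidates the buffer until the next try_put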
4224 
4225 template< typename T >
4226 class write_once_node : public overwrite_node<T> {
4227 public:
4228  typedef T input_type;
4229  typedef T output_type;
4233 
4235  __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
4236  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4237  static_cast<receiver<input_type> *>(this),
4238  static_cast<sender<output_type> *>(this) );
4239  }
4240 
4241 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4242  template <typename... Args>
4243  write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4244  make_edges_in_order(nodes, *this);
4245  }
4246 #endif
4247 
4249  __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
4250  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4251  static_cast<receiver<input_type> *>(this),
4252  static_cast<sender<output_type> *>(this) );
4253  }
4254 
4255 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4256  void set_name( const char *name ) __TBB_override {
4257  tbb::internal::fgt_node_desc( this, name );
4258  }
4259 #endif
4260 
4261 protected:
4262  template< typename R, typename B > friend class run_and_put_task;
4263  template<typename X, typename Y> friend class internal::broadcast_cache;
4264  template<typename X, typename Y> friend class internal::round_robin_cache;
4266  spin_mutex::scoped_lock l( this->my_mutex );
4267  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4268  }
4269 };
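// Illustrative sketch of write_once_node semantics (the names are example choices):
//
//   tbb::flow::graph g;
//   tbb::flow::write_once_node<int> once( g );
//   once.try_put( 1 );               // accepted: the buffer becomes valid
//   once.try_put( 2 );               // rejected: try_put_task above returns NULL while the buffer is valid
//   int v = 0;
//   once.try_get( v );               // v == 1
//   once.clear();                    // only clear() (inherited from overwrite_node) allows a new value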
4270 
4271 } // interfaceX
4272 
4277 
4278  using interface11::graph;
4281 
4300  using namespace interface11::internal::graph_policy_namespace;
4301  using interface11::join_node;
4303  using interface11::copy_body;
4304  using interface11::make_edge;
4307 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4309 #endif
4311 #if __TBB_PREVIEW_ASYNC_MSG
4312  using interface11::async_msg;
4313 #endif
4314 #if __TBB_PREVIEW_STREAMING_NODE
4315  using interface11::port_ref;
4317 #endif // __TBB_PREVIEW_STREAMING_NODE
4318 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4320  using internal::no_priority;
4321 #endif
4322 
4323 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4324  using interface11::internal::follows;
4325  using interface11::internal::precedes;
4326  using interface11::internal::make_node_set;
4327  using interface11::internal::make_edges;
4328 #endif
4329 
4330 } // flow
4331 } // tbb
4332 
4333 // Include deduction guides for node classes
4335 
4336 #undef __TBB_PFG_RESET_ARG
4337 #undef __TBB_COMMA
4338 
4340 #undef __TBB_flow_graph_H_include_area
4341 
4342 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4343  #undef __TBB_NOINLINE_SYM
4344 #endif
4345 
4346 #endif // __TBB_flow_graph_H
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3211
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3406
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3010
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2461
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:425
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1689
void prepare_task_arena(bool reinit=false)
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1366
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2305
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:2907
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:1968
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3018
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1568
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1135
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1595
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3027
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3300
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:2723
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3145
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:464
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4265
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:1767
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4179
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
task & pop_front()
Pop the front task from the list.
Definition: task.h:1098
static void fgt_node(void *, string_index, void *, void *)
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2923
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4231
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3371
static tbb::task *const SUCCESSFULLY_ENQUEUED
Base class for user-defined tasks.
Definition: task.h:604
pointer operator->() const
Dereference.
Definition: flow_graph.h:753
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3131
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3336
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:45
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4019
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:2995
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
void const char const char int ITT_FORMAT __itt_group_sync s
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1403
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
A cache of successors that are broadcast to.
Definition: flow_graph.h:120
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:760
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1320
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:3805
Forwards messages of type T to all successors.
Definition: flow_graph.h:1563
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:1986
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4139
static void fgt_graph(void *)
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:113
base_type::size_type size_type
Definition: flow_graph.h:2207
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
implements a function node that supports Input -> (set of outputs)
Definition: flow_graph.h:1258
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed...
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:864
internal::broadcast_cache< input_type > my_successors
Definition: flow_graph.h:1574
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3582
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:2823
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3299
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2416
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3833
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:3836
tbb::flow::interface11::graph_node * my_nodes
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3036
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1446
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3054
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:716
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:595
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:3859
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3099
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3130
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1038
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1687
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1077
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3865
bool is_graph_active(tbb::flow::interface10::graph &g)
void set_ref_count(int count)
Set reference count.
Definition: task.h:750
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2309
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3097
static void fgt_graph_desc(void *, const char *)
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2486
static void fgt_end_body(void *)
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:303
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:3825
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
item_buffer with reservable front-end. NOTE: if reserving, do not
Definition: flow_graph.h:249
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:3845
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:2776
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:1715
A generic null type.
Definition: flow_graph.h:103
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial...
Definition: flow_graph.h:98
Forwards messages in priority order.
Definition: flow_graph.h:2371
Implements methods for both executable and function nodes that puts Output to its successors...
Definition: flow_graph.h:854
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
const V & cast_to(T const &t)
Definition: flow_graph.h:715
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3196
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
Definition: flow_graph.h:1610
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2259
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2618
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2133
int decrement_ref_count()
Atomically decrement reference count and returns its new value.
Definition: task.h:777
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:810
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:122
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3178
K key_from_message(const T &t)
Definition: flow_graph.h:713
A lock that occupies a single byte.
Definition: spin_mutex.h:39
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2346
An abstract cache of successors.
Definition: flow_graph.h:119
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:652
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:1977
Enables one or the other code branches.
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3372
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1439
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1179
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1444
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:803
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:110
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3045
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:2742
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1159
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2277
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4149
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:1677
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1096
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2123
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2251
Detects whether two given types are the same.
Implements methods for a function node that takes a type Input as input and sends.
Definition: flow_graph.h:421
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:441
Base class for receivers of completion messages.
Definition: flow_graph.h:591
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:1921
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1447
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3436
static void fgt_release_wait(void *)
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1165
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3132
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:3935
receiver< TupleType > base_type
Definition: flow_graph.h:1349
bool gather_successful_try_puts(const X &t, task_list &tasks)
Definition: flow_graph.h:511
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: flow_graph.h:1508
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3451
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:3835
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1451
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2189
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:203
static void fgt_remove_edge(void *, void *)
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:1765
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2218
Forwards messages in sequence order.
Definition: flow_graph.h:2297
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2242
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:962
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1011
static void fgt_composite(void *, void *, void *)
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:121
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2873
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2208
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4187
Forwards messages in arbitrary order.
Definition: flow_graph.h:1702
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:705
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:664
Breaks an infinite loop between the node reservation and register_successor call. ...
Definition: flow_graph.h:4192
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1341
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:321
tbb::flow::interface11::graph_node * my_nodes_last
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:833
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:362
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1552
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:3832
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1240
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1488
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
static void fgt_async_commit(void *, void *)
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2298
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:609
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:598
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2097
A task that calls a node&#39;s forward_task function.
Definition: flow_graph.h:271
static void fgt_async_try_put_end(void *, void *)
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it...
Definition: flow_graph.h:1143
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
buffer_node< T, A >::buffer_operation sequencer_operation
Definition: flow_graph.h:2343
static void fgt_async_try_put_begin(void *, void *)
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:690
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:733
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:381
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:384
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1683
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3515
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:796
buffer_node< T, A >::item_type item_type
Definition: flow_graph.h:2417
Implements async node.
Definition: flow_graph.h:3824
task * try_put_task(const T &t) __TBB_override
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2381
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4057
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3264
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2035
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3957
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4249
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4235
void activate_graph(tbb::flow::interface10::graph &g)
Forward declaration section.
Definition: flow_graph.h:109
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3005
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2260
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:894
buffer_node< T, A >::buffer_operation prio_operation
Definition: flow_graph.h:2418
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:191
task * decrement_counter(long long delta)
Definition: flow_graph.h:2689
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1278
task that does nothing. Useful for synchronization.
Definition: task.h:1031
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3230
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4194
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:753
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:3895
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2425
Used to form groups of tasks.
Definition: task.h:347
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2377
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2467
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1164
__TBB_NOINLINE_SYM broadcast_node(graph &g)
Definition: flow_graph.h:1581
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:638
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3163
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1707
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3000
virtual task * execute()=0
Should be overridden by derived classes.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2178
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:428
reference operator*() const
Dereference.
Definition: flow_graph.h:747
const_iterator cend() const
end const iterator
Definition: flow_graph.h:866
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:1835
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1242
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3386
A task that calls a node&#39;s apply_body_bypass function with no input.
Definition: flow_graph.h:321
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1095
Base class for tasks generated by graph nodes.
tbb::task_group_context * my_context
static void fgt_async_reserve(void *, void *)
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls...
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3408
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2421
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2325
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2603
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4018
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2435
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1103
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn&#39;t take anything from src; default won&#39;t work.
Definition: flow_graph.h:4041
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1276
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4081
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2342
unsigned int node_priority_t
iterator begin()
start iterator
Definition: flow_graph.h:856
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4217
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3197
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1364
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1445
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3877
A list of children.
Definition: task.h:1063
An empty class used for messages that mean "I&#39;m done".
Definition: flow_graph.h:106
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1097
static void fgt_begin_body(void *)
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:958
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2233
__TBB_NOINLINE_SYM source_node(const source_node &src)
Copy constructor.
Definition: flow_graph.h:932
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
Definition: flow_graph.h:1616
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2956
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1028
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:88
#define __TBB_override
Definition: tbb_stddef.h:240
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:369
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3422
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2411
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1567
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
Definition: flow_graph.h:2155
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1409
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1706
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3278
iterator end()
end iterator
Definition: flow_graph.h:858
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3098
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
async_body_base< Gateway > base_type
Definition: flow_graph.h:3799
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3164
receiver< input_type > receiver_type
Definition: flow_graph.h:3831
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:42
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:3802
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:1962
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3499
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1166
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:900
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:2759
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2449
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3263
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
static void fgt_multioutput_node_desc(const NodeType *, const char *)
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:821
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:468
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4174
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3409
void const char const char int ITT_FORMAT __itt_group_sync p
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:953
output_ports_type & output_ports()
Definition: flow_graph.h:1395
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:342
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4215
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:2714
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3335
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2229
#define CODEPTR()
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2879
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2875
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:616
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:2940
STL namespace.
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:626
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2602
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:221
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3962
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4027
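
overwrite_node keeps only the most recent item and serves it repeatedly; try_get reads the buffered value without consuming it. A brief sketch (illustrative, public API only):

#include "tbb/flow_graph.h"
#include <cassert>

int main() {
    tbb::flow::graph g;
    tbb::flow::overwrite_node<int> last(g);
    last.try_put(1);
    last.try_put(2);               // overwrites the buffered value
    int v = 0;
    bool ok = last.try_get(v);     // succeeds, value stays buffered
    assert(ok && v == 2);
    ok = last.try_get(v);          // still succeeds: the item is not consumed
    assert(ok && v == 2);
    g.wait_for_all();
    return 0;
}
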
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:318
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2112
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3407
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2174
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:1998
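
buffer_node is an unbounded buffer whose items are handed out in no particular order. A short usage sketch (illustrative, public API only):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::buffer_node<int> buf(g);
    for (int i = 0; i < 3; ++i) buf.try_put(i);
    int v;
    while (buf.try_get(v))          // retrieval order is unspecified
        std::cout << v << "\n";
    g.wait_for_all();
    return 0;
}
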
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
Definition: flow_graph.h:1280
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:1852
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:2835
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:431
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1381
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2378
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
static void fgt_make_edge(void *, void *)
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1821
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2142
split_node: accepts a tuple as input and forwards each element of the tuple to its corresponding successor.
Definition: flow_graph.h:1347
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1398
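
To make the split_node description above concrete, a sketch wiring its output ports through output_port<N>; node names are illustrative and only the public tbb::flow API is assumed:

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::tuple<int, double> pair_t;
    tbb::flow::split_node<pair_t> splitter(g);
    tbb::flow::function_node<int, tbb::flow::continue_msg> ints(
        g, tbb::flow::unlimited,
        [](int i) { std::cout << "int: " << i << "\n"; return tbb::flow::continue_msg(); });
    tbb::flow::function_node<double, tbb::flow::continue_msg> doubles(
        g, tbb::flow::unlimited,
        [](double d) { std::cout << "double: " << d << "\n"; return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(tbb::flow::output_port<0>(splitter), ints);     // element 0 -> port 0
    tbb::flow::make_edge(tbb::flow::output_port<1>(splitter), doubles);  // element 1 -> port 1
    splitter.try_put(pair_t(42, 3.14));
    g.wait_for_all();
    return 0;
}
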
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4154
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:336
static void fgt_node_desc(const NodeType *, const char *)
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4002
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4209
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4232
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:2846
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2429
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1526
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3112
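
An indexer_node tags each incoming message with the index of the port it arrived on; a successor inspects the tag and extracts the payload with cast_to. An illustrative sketch (names are made up for the example):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::indexer_node<int, float> indexer_t;
    indexer_t idx(g);
    tbb::flow::function_node<indexer_t::output_type, tbb::flow::continue_msg> printer(
        g, tbb::flow::serial,
        [](const indexer_t::output_type &msg) {
            if (msg.tag() == 0)
                std::cout << "int:   " << tbb::flow::cast_to<int>(msg) << "\n";
            else
                std::cout << "float: " << tbb::flow::cast_to<float>(msg) << "\n";
            return tbb::flow::continue_msg();
        });
    tbb::flow::make_edge(idx, printer);
    tbb::flow::input_port<0>(idx).try_put(7);
    tbb::flow::input_port<1>(idx).try_put(2.5f);
    g.wait_for_all();
    return 0;
}
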
The base of all graph nodes.
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1510
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:3840
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:3826
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4197
void deactivate_graph(tbb::flow::interface10::graph &g)
static void fgt_reserve_wait(void *)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3229
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:601
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2304
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:3854
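
This is the hook async_node's gateway uses so that wait_for_all() does not return while a result is still being produced outside the graph. A sketch of the usual pattern; the detached std::thread merely stands in for whatever external mechanism delivers the result:

#include "tbb/flow_graph.h"
#include <thread>
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::async_node<int, int> async_t;
    async_t external(
        g, tbb::flow::unlimited,
        [](const int &input, async_t::gateway_type &gw) {
            gw.reserve_wait();                      // keep the graph alive
            std::thread([input, &gw] {
                gw.try_put(input * 10);             // result arrives from outside TBB
                gw.release_wait();                  // balance the reserve_wait above
            }).detach();
        });
    tbb::flow::function_node<int, tbb::flow::continue_msg> sink(
        g, tbb::flow::serial,
        [](int v) { std::cout << "got " << v << "\n"; return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(external, sink);
    external.try_put(1);
    g.wait_for_all();                               // waits for the external thread's put
    return 0;
}
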
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:1991
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1167
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4157
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3314
#define __TBB_DEPRECATED
Definition: tbb_config.h:639
virtual void finalize() const
Definition: flow_graph.h:147
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:473
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2290
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2263
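
A short sketch of the FIFO behaviour (illustrative, public API only):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::queue_node<int> q(g);
    for (int i = 0; i < 3; ++i) q.try_put(i);
    int v;
    while (q.try_get(v))            // items come back in the order they were put
        std::cout << v << "\n";     // prints 0, 1, 2
    g.wait_for_all();
    return 0;
}
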
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:3834
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2616
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3244
static const node_priority_t no_priority
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:193
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1277
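
multifunction_input backs multifunction_node, whose body receives the input value plus a tuple of output ports and may put to any subset of them. An illustrative sketch (names are made up for the example):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > mf_t;
    mf_t route(g, tbb::flow::unlimited,
        [](const int &v, mf_t::output_ports_type &ports) {
            if (v % 2 == 0)
                tbb::flow::get<0>(ports).try_put(v);   // evens to port 0
            else
                tbb::flow::get<1>(ports).try_put(v);   // odds to port 1
        });
    tbb::flow::queue_node<int> evens(g), odds(g);
    tbb::flow::make_edge(tbb::flow::output_port<0>(route), evens);
    tbb::flow::make_edge(tbb::flow::output_port<1>(route), odds);
    for (int i = 0; i < 6; ++i) route.try_put(i);
    g.wait_for_all();
    int v;
    while (evens.try_get(v)) std::cout << "even " << v << "\n";
    while (odds.try_get(v))  std::cout << "odd  " << v << "\n";
    return 0;
}
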
A cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:123
Forwards messages in FIFO order.
Definition: flow_graph.h:2204
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1163
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2397
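
priority_queue_node forwards the item that compares largest under its Compare parameter (std::less by default). A small sketch:

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    tbb::flow::priority_queue_node<int> pq(g);   // std::less<int> by default
    pq.try_put(2);
    pq.try_put(9);
    pq.try_put(4);
    int v;
    while (pq.try_get(v))           // largest first: prints 9, 4, 2
        std::cout << v << "\n";
    g.wait_for_all();
    return 0;
}
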
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2014
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:29
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:766
The graph class.
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:994
~graph()
Destroys the graph.
Definition: flow_graph.h:788
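
A minimal end-to-end sketch: construct a graph, wire two nodes, feed it, and wait for completion before the graph is destroyed (node names are illustrative):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;                               // isolated task_group_context
    tbb::flow::function_node<int, int> square(
        g, tbb::flow::unlimited, [](int v) { return v * v; });
    tbb::flow::function_node<int, tbb::flow::continue_msg> print(
        g, tbb::flow::serial,
        [](int v) { std::cout << v << "\n"; return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(square, print);
    for (int i = 1; i <= 3; ++i) square.try_put(i);
    g.wait_for_all();                                 // finish all work before ~graph()
    return 0;
}
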
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:719
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:2976
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:897
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:1763
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
Templated function to refer to the input ports of a join node.
Definition: flow_graph.h:1996
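
A sketch showing input_port<N> feeding a queueing join_node directly (illustrative; public API only):

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::join_node< tbb::flow::tuple<int, double>,
                                  tbb::flow::queueing > join_t;
    join_t join(g);
    tbb::flow::function_node<join_t::output_type, tbb::flow::continue_msg> sink(
        g, tbb::flow::serial,
        [](const join_t::output_type &t) {
            std::cout << tbb::flow::get<0>(t) << " / " << tbb::flow::get<1>(t) << "\n";
            return tbb::flow::continue_msg();
        });
    tbb::flow::make_edge(join, sink);
    tbb::flow::input_port<0>(join).try_put(1);      // port 0 receives the int
    tbb::flow::input_port<1>(join).try_put(0.5);    // port 1 receives the double
    g.wait_for_all();
    return 0;
}
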
