Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_streaming_node.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_streaming_H
18 #define __TBB_flow_graph_streaming_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #if __TBB_PREVIEW_STREAMING_NODE
25 
26 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
27 
28 namespace internal {
29 
// Compile-time tag describing the closed range of ports [First, Last].
template <int First, int Last>
struct port_ref_impl {
    // Both endpoints belong to the range, hence the extra one.
    static const int size = ( Last - First ) + 1;
};
35 
36 } // internal
37 
38 // The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
39 // So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
40 template <int N1, int N2 = N1>
43 };
44 
45 namespace internal {
46 
// Number of kernel arguments contributed by one user-supplied argument.
// The generic case: any argument type counts as exactly one.
template <typename Arg>
struct num_arguments {
    static const int value = 1;
};
51 
52 template <int N1, int N2>
53 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
54  static const int value = port_ref_impl<N1,N2>::size;
55 };
56 
57 template <int N1, int N2>
58 struct num_arguments<port_ref_impl<N1,N2>> {
59  static const int value = port_ref_impl<N1,N2>::size;
60 };
61 
// Accepts any number of arguments and does nothing with them. Useful for
// expanding a parameter pack of calls whose results are not needed.
template <typename... Ignored>
void ignore_return_values( Ignored&&... ) {
}
64 
// Combines all arguments with bitwise OR and returns the result by value.
// Bitwise OR does not short-circuit, so every argument expression is
// evaluated; this lets the helper merge the results of an expanded pack of
// calls.
// The return type is decayed: for an lvalue argument T is deduced as a
// reference type, and returning the OR-ed temporary through it would not
// compile.
template <typename T>
typename std::decay<T>::type or_return_values( T&& t ) { return t; }

template <typename T, typename... Rest>
typename std::decay<T>::type or_return_values( T&& t, Rest&&... rest ) {
    return t | or_return_values( std::forward<Rest>(rest)... );
}
71 
72 template<typename JP>
74  typedef size_t type;
76 };
77 
78 template<typename Key>
79 struct key_from_policy< key_matching<Key> > {
80  typedef Key type;
82 };
83 
84 template<typename Key>
85 struct key_from_policy< key_matching<Key&> > {
86  typedef const Key &type;
88 };
89 
90 template<typename Device, typename Key>
92  Device my_device;
94 public:
95  // TODO: investigate why default constructor is required
97  streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
98  Key key() const { return my_key; }
99  const Device& device() const { return my_device; }
100 };
101 
102 // --------- Kernel argument helpers --------- //
103 template <typename T>
106 };
107 
108 template <int N1, int N2>
109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
111 };
112 
113 template <int N1, int N2>
114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
116 };
117 
118 template <typename T>
119 struct is_port_ref {
121 };
122 
123 template <typename ...Args1>
125 
// Recursive step of convert_and_call_impl: consumes the leading argument A1 and
// continues with the remaining ones (Args1...). Arguments that have already been
// processed are carried along at the tail of every call (Args2...). When a
// port_ref range is met, it is replaced by the matching elements of tuple 't'.
126 template <typename A1, typename ...Args1>
127 struct convert_and_call_impl<A1, Args1...> {
128  static const size_t my_delta = 1; // Index 0 contains device
129 
  // Entry point: picks a doit_impl overload by tag dispatch on is_port_ref<A1>::type
  // (presumably std::true_type for port_ref arguments - its definition is not fully visible here).
130  template <typename F, typename Tuple, typename ...Args2>
131  static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
132  convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
133  }
  // A1 is an ordinary argument: append it unchanged to the processed list and go on with Args1.
134  template <typename F, typename Tuple, typename ...Args2>
135  static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
136  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
137  }
  // A1 is a port range [N1, N2]: append tuple element N1 + my_delta to the processed list
  // and repeat with the shortened range produced by port_ref<N1 + 1, N2>().
138  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
139  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
140  convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
141  args2..., std::get<N1 + my_delta>(t));
142  }
  // A1 is a single-port range [N, N]: append tuple element N + my_delta and go on with Args1.
143  template <typename F, typename Tuple, int N, typename ...Args2>
144  static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
145  convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
146  }
147 
  // The range was passed as a pointer to a function returning port_ref_impl:
  // call it to obtain the port_ref_impl object and delegate to the overloads above.
148  template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
149  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
150  doit_impl(x, f, t, fn(), args1..., args2...);
151  }
  // Same as above for a function pointer that yields a single-port range [N, N].
152  template <typename F, typename Tuple, int N, typename ...Args2>
153  static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
154  doit_impl(x, f, t, fn(), args1..., args2...);
155  }
156 };
157 
158 template <>
160  template <typename F, typename Tuple, typename ...Args2>
161  static void doit(F& f, Tuple&, Args2&... args2) {
162  f(args2...);
163  }
164 };
165 // ------------------------------------------- //
166 
167 template<typename JP, typename StreamFactory, typename... Ports>
169  // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
170  template <typename T>
171  struct async_msg_type {
172  typedef typename StreamFactory::template async_msg_type<T> type;
173  };
174 
179 
180  // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
182 };
183 
184 // Default empty implementation
185 template<typename StreamFactory, typename KernelInputTuple, typename = void>
187  typedef typename StreamFactory::device_type device_type;
188  typedef typename StreamFactory::kernel_type kernel_type;
189  typedef KernelInputTuple kernel_input_tuple;
190 protected:
191  template <typename ...Args>
192  void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
193  factory.send_kernel( device, kernel, args... );
194  }
195 };
196 
197 // Implementation for StreamFactory supporting range
198 template<typename StreamFactory, typename KernelInputTuple>
199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
200  typedef typename StreamFactory::device_type device_type;
201  typedef typename StreamFactory::kernel_type kernel_type;
202  typedef KernelInputTuple kernel_input_tuple;
203 
204  typedef typename StreamFactory::range_type range_type;
205 
206  // Container for randge. It can contain either port references or real range.
207  struct range_wrapper {
208  virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
209  virtual range_wrapper *clone() const = 0;
210  virtual ~range_wrapper() {}
211  };
212 
213  struct range_value : public range_wrapper {
214  range_value( const range_type& value ) : my_value(value) {}
215 
216  range_value( range_type&& value ) : my_value(std::move(value)) {}
217 
218  range_type get_range( const kernel_input_tuple & ) const __TBB_override {
219  return my_value;
220  }
221 
222  range_wrapper *clone() const __TBB_override {
223  return new range_value(my_value);
224  }
225  private:
226  range_type my_value;
227  };
228 
229  template <int N>
230  struct range_mapper : public range_wrapper {
232 
233  range_type get_range( const kernel_input_tuple &ip ) const __TBB_override {
234  // "+1" since get<0>(ip) is StreamFactory::device.
235  return get<N + 1>(ip).data(false);
236  }
237 
238  range_wrapper *clone() const __TBB_override {
239  return new range_mapper<N>;
240  }
241  };
242 
243 protected:
244  template <typename ...Args>
245  void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
246  __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
247  factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
248  }
249 
250 public:
251  kernel_executor_helper() : my_range_wrapper(NULL) {}
252 
253  kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
254 
255  kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
256  // Set moving holder mappers to NULL to prevent double deallocation
257  executor.my_range_wrapper = NULL;
258  }
259 
261  if (my_range_wrapper) delete my_range_wrapper;
262  }
263 
264  void set_range(const range_type& work_size) {
265  my_range_wrapper = new range_value(work_size);
266  }
267 
268  void set_range(range_type&& work_size) {
269  my_range_wrapper = new range_value(std::move(work_size));
270  }
271 
272  template <int N>
274  my_range_wrapper = new range_mapper<N>;
275  }
276 
277  template <int N>
279  my_range_wrapper = new range_mapper<N>;
280  }
281 
282 private:
283  range_wrapper* my_range_wrapper;
284 };
285 
286 } // internal
287 
288 /*
289 /---------------------------------------- streaming_node ------------------------------------\
290 | |
291 | /--------------\ /----------------------\ /-----------\ /----------------------\ |
292 | | | | (device_with_key) O---O | | | |
293 | | | | | | | | | |
294 O---O indexer_node O---O device_selector_node O---O join_node O---O kernel_node O---O
295 | | | | (multifunction_node) | | | | (multifunction_node) | |
296 O---O | | O---O | | O---O
297 | \--------------/ \----------------------/ \-----------/ \----------------------/ |
298 | |
299 \--------------------------------------------------------------------------------------------/
300 */
301 template<typename... Args>
303 
304 template<typename... Ports, typename JP, typename StreamFactory>
305 class streaming_node< tuple<Ports...>, JP, StreamFactory >
306  : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
307  typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
308  , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
309 {
310  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
311  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
313 protected:
314  typedef typename StreamFactory::device_type device_type;
315  typedef typename StreamFactory::kernel_type kernel_type;
316 private:
318  typedef composite_node<input_tuple, output_tuple> base_type;
319  static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
320  static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
321 
324 
325  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
326  typedef typename indexer_node_type::output_type indexer_node_output_type;
327  typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
328  typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
329  typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
330 
331  template <int... S>
332  typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
333  return std::tie( internal::input_port<S>( my_indexer_node )... );
334  }
335 
336  template <int... S>
337  typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
338  return std::tie( internal::output_port<S>( my_kernel_node )... );
339  }
340 
341  typename base_type::input_ports_type get_input_ports() {
342  return get_input_ports( input_sequence() );
343  }
344 
345  typename base_type::output_ports_type get_output_ports() {
346  return get_output_ports( output_sequence() );
347  }
348 
349  template <int N>
351  make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
352  return 0;
353  }
354 
355  template <int... S>
357  make_edge( my_indexer_node, my_device_selector_node );
358  make_edge( my_device_selector_node, my_join_node );
359  internal::ignore_return_values( make_Nth_edge<S + 1>()... );
360  make_edge( my_join_node, my_kernel_node );
361  }
362 
363  void make_edges() {
364  make_edges( input_sequence() );
365  }
366 
367  class device_selector_base {
368  public:
369  virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
370  virtual device_selector_base *clone( streaming_node &n ) const = 0;
372  };
373 
374  template <typename UserFunctor>
375  class device_selector : public device_selector_base, tbb::internal::no_assign {
376  public:
377  device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
378  : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
379  , my_user_functor( uf ), my_node(n), my_factory( f )
380  {
381  my_port_epoches.fill( 0 );
382  }
383 
384  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
385  (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
387  || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
388  }
389 
390  device_selector_base *clone( streaming_node &n ) const __TBB_override {
391  return new device_selector( my_user_functor, n, my_factory );
392  }
393  private:
394  typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
395  typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
396 
397  template <int... S>
398  static dispatch_funcs_type create_dispatch_funcs( internal::sequence<S...> ) {
399  dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
400  return dispatch;
401  }
402 
403  template <typename T>
404  key_type get_key( std::false_type, const T &, size_t &epoch ) {
406  return epoch++;
407  }
408 
409  template <typename T>
410  key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
412  return key_from_message<key_type>( t );
413  }
414 
415  template <int N>
416  void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
417  typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
418  elem_type e = internal::cast_to<elem_type>( v );
419  device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
420  my_factory.send_data( device, e );
421  get<N + 1>( op ).try_put( e );
422  }
423 
424  template< typename DevicePort >
425  device_type get_device( key_type key, DevicePort& dp ) {
426  typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
427  if ( it == my_devices.end() ) {
428  device_type d = my_user_functor( my_factory );
429  std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
430  bool res = dp.try_put( device_with_key_type( d, key ) );
431  __TBB_ASSERT_EX( res, NULL );
432  my_node.notify_new_device( d );
433  }
434  epoch_desc &e = it->second;
435  device_type d = e.my_device;
436  if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
437  return d;
438  }
439 
440  struct epoch_desc {
441  epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
442  device_type my_device;
444  };
445 
447  std::array<size_t, NUM_INPUTS> my_port_epoches;
448  dispatch_funcs_type my_dispatch_funcs;
449  UserFunctor my_user_functor;
451  StreamFactory &my_factory;
452  };
453 
454  class device_selector_body {
455  public:
456  device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
457 
458  void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
459  (*my_device_selector)(v, op);
460  }
461  private:
462  device_selector_base *my_device_selector;
463  };
464 
465  // TODO: investigate why copy-construction is disallowed
466  class args_storage_base : tbb::internal::no_copy {
467  public:
468  typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
469 
470  virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
471  virtual void send( device_type d ) = 0;
472  virtual args_storage_base *clone() const = 0;
473  virtual ~args_storage_base () {}
474 
475  protected:
476  args_storage_base( const kernel_type& kernel, StreamFactory &f )
477  : my_kernel( kernel ), my_factory( f )
478  {}
479 
480  args_storage_base( const args_storage_base &k )
481  : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
482  {}
483 
484  const kernel_type my_kernel;
485  StreamFactory &my_factory;
486  };
487 
488  template <typename... Args>
489  class args_storage : public args_storage_base {
491 
492  // ---------- Update events helpers ---------- //
493  template <int N>
494  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
495  const auto& t = get<N + 1>( ip );
496  auto &port = get<N>( op );
497  return port.try_put( t );
498  }
499 
500  template <int... S>
501  bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op, internal::sequence<S...> ) const {
502  return internal::or_return_values( do_try_put<S>( ip, op )... );
503  }
504 
505  // ------------------------------------------- //
506  class run_kernel_func : tbb::internal::no_assign {
507  public:
508  run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
509  : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
510 
511  // It is impossible to use Args... because a function pointer cannot be implicitly converted to a function reference.
512  // Allow the compiler to deduce types for function pointers automatically.
513  template <typename... FnArgs>
514  void operator()( FnArgs&... args ) {
515  internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
516  }
517  private:
518  struct kernel_func : tbb::internal::no_copy {
519  kernel_input_tuple &my_ip;
521  const args_storage& my_storage;
522  device_type my_device;
523 
524  kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
525  : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
526  {}
527 
528  template <typename... FnArgs>
529  void operator()( FnArgs&... args ) {
530  my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
531  }
532  } my_kernel_func;
533  };
534 
535  template<typename FinalizeFn>
536  class run_finalize_func : tbb::internal::no_assign {
537  public:
538  run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
539  : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
540 
541  // It is immpossible to use Args... because a function pointer cannot be casted to a function reference implicitly.
542  // Allow the compiler to deduce types for function pointers automatically.
543  template <typename... FnArgs>
544  void operator()( FnArgs&... args ) {
545  internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
546  }
547  private:
548  kernel_input_tuple &my_ip;
549 
550  struct finalize_func : tbb::internal::no_assign {
551  StreamFactory &my_factory;
552  device_type my_device;
553  FinalizeFn my_fn;
554 
555  finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
556  : my_factory(factory), my_device(device), my_fn(fn) {}
557 
558  template <typename... FnArgs>
559  void operator()( FnArgs&... args ) {
560  my_factory.finalize( my_device, my_fn, args... );
561  }
562  } my_finalize_func;
563  };
564 
565  template<typename FinalizeFn>
566  static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
567  return run_finalize_func<FinalizeFn>( ip, factory, fn );
568  }
569 
570  class send_func : tbb::internal::no_assign {
571  public:
572  send_func( StreamFactory &factory, device_type d )
573  : my_factory(factory), my_device( d ) {}
574 
575  template <typename... FnArgs>
576  void operator()( FnArgs&... args ) {
577  my_factory.send_data( my_device, args... );
578  }
579  private:
580  StreamFactory &my_factory;
581  device_type my_device;
582  };
583 
584  public:
585  args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
586  : args_storage_base( kernel, f )
587  , my_args_pack( std::forward<Args>(args)... )
588  {}
589 
590  args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
591 
592  args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
593 
594  void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) __TBB_override {
595  // Make const qualified args_pack (from non-const)
596  const args_pack_type& const_args_pack = my_args_pack;
597  // factory.enqure_kernel() gets
598  // - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
599  // - arguments (from my_args_pack) by const-reference via const_args_pack
600  tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
601 
602  if (! do_try_put( ip, op, input_sequence() ) ) {
603  graph& g = n.my_graph;
604  // No one message was passed to successors so set a callback to extend the graph lifetime until the kernel completion.
605  g.increment_wait_count();
606 
607  // factory.finalize() gets
608  // - 'ip' tuple elements by reference, so 'ip' might be changed
609  // - arguments (from my_args_pack) by const-reference via const_args_pack
610  tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
611  g.decrement_wait_count();
612  }), const_args_pack );
613  }
614  }
615 
616  void send( device_type d ) __TBB_override {
617  // factory.send() gets arguments by reference and updates these arguments with dependencies
618  // (it gets but usually ignores port_ref-s)
619  tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
620  }
621 
622  args_storage_base *clone() const __TBB_override {
623  // Create new args_storage with copying constructor.
624  return new args_storage<Args...>( *this );
625  }
626 
627  private:
630  };
631 
632  // Body for kernel_multifunction_node.
633  class kernel_body : tbb::internal::no_assign {
634  public:
635  kernel_body( const streaming_node &node ) : my_node( node ) {}
636 
637  void operator()( kernel_input_tuple ip, typename args_storage_base::output_ports_type &op ) {
638  __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
639  // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
640  my_node.my_args_storage->enqueue( ip, op, my_node );
641  }
642  private:
644  };
645 
647  struct wrap_to_async {
648  typedef T type; // Keep port_ref as it is
649  };
650 
651  template <typename T>
652  struct wrap_to_async<T, std::false_type> {
653  typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
654  };
655 
656  template <typename... Args>
657  args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
658  // In this variadic template convert all simple types 'T' into 'async_msg_type<T>'
659  return new args_storage<Args...>(storage, std::forward<Args>(args)...);
660  }
661 
662  void notify_new_device( device_type d ) {
663  my_args_storage->send( d );
664  }
665 
666  template <typename ...Args>
667  void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
668  this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
669  }
670 
671 public:
// Constructs the composite node in graph 'g' for the given kernel.
//  - 'd' is the user device selector functor; it is wrapped in a heap-allocated device_selector.
//  - 'f' is the stream factory; it is captured by reference by the selector and the argument storage.
// After the members are created, the external ports are published and the internal edges are made.
672  template <typename DeviceSelector>
673  streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
674  : base_type( g )
675  , my_indexer_node( g )
  // '*this' is handed out before construction finishes; device_selector only stores the reference.
676  , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
677  , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
678  , my_join_node( g )
  // kernel_body likewise only stores the reference to this node.
679  , my_kernel_node( g, serial, kernel_body( *this ) )
680  // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
681  , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
682  {
683  base_type::set_external_ports( get_input_ports(), get_output_ports() );
684  make_edges();
685  }
686 
688  : base_type( node.my_graph )
689  , my_indexer_node( node.my_indexer_node )
690  , my_device_selector( node.my_device_selector->clone( *this ) )
691  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
692  , my_join_node( node.my_join_node )
693  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
694  , my_args_storage( node.my_args_storage->clone() )
695  {
696  base_type::set_external_ports( get_input_ports(), get_output_ports() );
697  make_edges();
698  }
699 
701  : base_type( node.my_graph )
702  , my_indexer_node( std::move( node.my_indexer_node ) )
703  , my_device_selector( node.my_device_selector->clone(*this) )
704  , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
705  , my_join_node( std::move( node.my_join_node ) )
706  , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
707  , my_args_storage( node.my_args_storage )
708  {
709  base_type::set_external_ports( get_input_ports(), get_output_ports() );
710  make_edges();
711  // Set moving node mappers to NULL to prevent double deallocation.
712  node.my_args_storage = NULL;
713  }
714 
716  if ( my_args_storage ) delete my_args_storage;
717  if ( my_device_selector ) delete my_device_selector;
718  }
719 
720  template <typename... Args>
721  void set_args( Args&&... args ) {
722  // Copy the base class of args_storage and create new storage for "Args...".
723  args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
724  delete my_args_storage;
725  my_args_storage = new_args_storage;
726  }
727 
728 protected:
729  void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
730 
731 private:
732  indexer_node_type my_indexer_node;
733  device_selector_base *my_device_selector;
734  device_selector_node my_device_selector_node;
735  join_node<kernel_input_tuple, JP> my_join_node;
736  kernel_multifunction_node my_kernel_node;
737 
738  args_storage_base *my_args_storage;
739 };
740 
741 #endif // __TBB_PREVIEW_STREAMING_NODE
742 #endif // __TBB_flow_graph_streaming_H
internal::streaming_device_with_key< device_type, key_type > device_with_key_type
static run_finalize_func< FinalizeFn > make_run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:319
tuple< typename async_msg_type< Ports >::type... > input_tuple
bool_constant< true > true_type
Definition: tbb_stddef.h:489
run_finalize_func(kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn)
is_port_ref_impl< typename tbb::internal::strip< T >::type >::type type
run_kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage)
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op, internal::sequence< S... >) const
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
internal::port_ref_impl< N1, N2 > port_ref()
StreamFactory::template async_msg_type< T > type
void enqueue(kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n) __TBB_override
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
internal::make_sequence< NUM_OUTPUTS >::type output_sequence
internal::make_sequence< NUM_INPUTS >::type input_sequence
multifunction_node< indexer_node_output_type, kernel_input_tuple > device_selector_node
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
K key_from_message(const T &t)
Definition: flow_graph.h:713
internal::streaming_node_traits< JP, StreamFactory, Ports... >::kernel_input_tuple kernel_input_tuple
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
Definition: tbb_stddef.h:167
StreamFactory::template async_msg_type< typename tbb::internal::strip< T >::type > type
Detects whether two given types are the same.
internal::streaming_node_traits< JP, StreamFactory, Ports... >::indexer_node_type indexer_node_type
task * do_try_put(const T &v, void *p)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d __itt_event ITT_FORMAT __itt_group_mark d void const wchar_t const wchar_t int ITT_FORMAT __itt_group_sync __itt_group_fsync x void const wchar_t int const wchar_t int int ITT_FORMAT __itt_group_sync __itt_group_fsync x void ITT_FORMAT __itt_group_sync __itt_group_fsync p void ITT_FORMAT __itt_group_sync __itt_group_fsync p void size_t ITT_FORMAT lu no args __itt_obj_prop_t __itt_obj_state_t ITT_FORMAT d const char ITT_FORMAT s __itt_frame ITT_FORMAT p 
const char const char ITT_FORMAT s __itt_counter ITT_FORMAT p __itt_counter unsigned long long ITT_FORMAT lu const wchar_t ITT_FORMAT S __itt_mark_type const wchar_t ITT_FORMAT S __itt_mark_type const char ITT_FORMAT s __itt_mark_type ITT_FORMAT d __itt_caller ITT_FORMAT p __itt_caller ITT_FORMAT p no args const __itt_domain __itt_clock_domain unsigned long long __itt_id ITT_FORMAT lu const __itt_domain __itt_clock_domain unsigned long long __itt_id __itt_id void * fn
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle * key
std::unordered_map< typename std::decay< key_type >::type, epoch_desc > my_devices
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3451
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op) __TBB_override
void send_and_put_impl(size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
void reset_node(reset_flags=rf_reset_protocol) __TBB_override
void enqueue_kernel(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
args_storage(const kernel_type &kernel, StreamFactory &f, Args &&... args)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::input_tuple input_tuple
tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy< JP >::type >, typename async_msg_type< Ports >::type... > kernel_input_tuple
static void doit_impl(std::false_type, F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
static void doit(F &f, Tuple &, Args2 &... args2)
internal::streaming_node_traits< JP, StreamFactory, Ports... >::output_tuple output_tuple
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >, Args1 &... args1, Args2 &... args2)
static dispatch_funcs_type create_dispatch_funcs(internal::sequence< S... >)
field of type K being used for matching.
args_storage_base * make_args_storage(const args_storage_base &storage, Args &&... args) const
bool_constant< false > false_type
Definition: tbb_stddef.h:490
base_type::input_ports_type get_input_ports(internal::sequence< S... >)
void enqueue_kernel_impl(kernel_input_tuple &, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:330
void enqueue_kernel_impl(kernel_input_tuple &ip, StreamFactory &factory, device_type device, const kernel_type &kernel, Args &... args) const
bool do_try_put(const kernel_input_tuple &ip, output_ports_type &op) const
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
#define __TBB_override
Definition: tbb_stddef.h:240
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void ignore_return_values(Args &&...)
void operator()(const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op)
kernel_func(kernel_input_tuple &ip, const streaming_node &node, const args_storage &storage, device_type device)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N, N >(*fn)(), Args1 &... args1, Args2 &... args2)
static void doit_impl(std::true_type x, F &f, Tuple &t, port_ref_impl< N1, N2 >(*fn)(), Args1 &... args1, Args2 &... args2)
base_type::output_ports_type get_output_ports(internal::sequence< S... >)
STL namespace.
composite_node< input_tuple, output_tuple > base_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
static void doit(F &f, Tuple &t, A1 &a1, Args1 &... args1, Args2 &... args2)
multifunction_node< kernel_input_tuple, output_tuple > kernel_multifunction_node
device_selector(UserFunctor uf, streaming_node &n, StreamFactory &f)
static void doit_impl(std::true_type, F &f, Tuple &t, port_ref_impl< N, N >, Args1 &... args1, Args2 &... args2)
void operator()(kernel_input_tuple ip, typename args_storage_base::output_ports_type &op)
device_selector_base * clone(streaming_node &n) const __TBB_override
streaming_node(graph &g, const kernel_type &kernel, DeviceSelector d, StreamFactory &f)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void * data
The graph class.
indexer_node< typename async_msg_type< Ports >::type... > indexer_node_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.