kokkos-execution 0.0.1
Loading...
Searching...
No Matches
test_parallel_for.cpp
Go to the documentation of this file.
1#include <bit>
2
5
7
18
29
31
32using namespace Kokkos::utils::callbacks;
33
47
// Compile-time checks on the sender type obtained by instantiating SndrAdptr.
//
// Template parameters:
//   SndrAdptr           - template instantiated below as
//                         SndrAdptr<Args..., schd_sndr_t, label_t, functor_t, policy_t>.
//   IsDispatchingSender - value that Kokkos::Execution::Impl::dispatching_sender is expected to
//                         have for that type.
//   Args                - extra leading template arguments passed to SndrAdptr.
//
// Returns true unconditionally, so that a call can be wrapped in a static_assert; the actual
// checks are the static_asserts in the body.
//
// NOTE(review): functor_t is used below, but its definition is not among the visible lines of
// this listing (the listing skips some source lines) - confirm against the original file.
52template <template <typename...> class SndrAdptr, bool IsDispatchingSender, typename... Args>
53consteval bool test_sndr_traits() {
// The child sender is the fixture's schedule sender.
55 using schd_sndr_t = typename ParallelForTest::schedule_sender_t;
56
58 using label_t = std::string;
60 using policy_t = Kokkos::RangePolicy<TEST_EXECUTION_SPACE>;
61 using pfor_sndr_t = SndrAdptr<Args..., schd_sndr_t, label_t, functor_t, policy_t>;
62
// The execution space extracted from the sender is the tested execution space.
65 static_assert(std::same_as<Kokkos::Execution::Impl::exec_of_t<pfor_sndr_t>, TEST_EXECUTION_SPACE>);
66
// Whether the sender models dispatching_sender must match the caller's expectation.
68 static_assert(Kokkos::Execution::Impl::dispatching_sender<pfor_sndr_t> == IsDispatchingSender);
69
// Completion signatures, computed with an empty environment.
71 using completion_signatures_t = stdexec::__completion_signatures_of_t<pfor_sndr_t, stdexec::env<>>;
72
// Expected signatures: set_value_t() and set_error_t(std::exception_ptr).
73 static_assert(stdexec::__mset_eq<
74 stdexec::__mset<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr)>,
75 completion_signatures_t
76 >);
77
// Completion domain for set_value_t.
// NOTE(review): the expected type (second same_as argument) is not visible in this listing.
79 static_assert(std::same_as<
80 stdexec::__completion_domain_of_t<stdexec::set_value_t, pfor_sndr_t, stdexec::env<>>,
82 >);
83
// NOTE(review): both arguments of this same_as are missing from the listing.
85 static_assert(std::same_as<
88 >);
89
// The sender can be connected to the test sink receiver.
91 static_assert(stdexec::sender_to<pfor_sndr_t, Tests::Utils::SinkReceiver>);
92
// Type obtained by transforming the sender with the sink receiver's environment.
// NOTE(review): the head of the expected type (the template that takes the four arguments
// below) is not visible in this listing.
93 static_assert(std::same_as<
94 stdexec::transform_sender_result_t<pfor_sndr_t, stdexec::env_of_t<Tests::Utils::SinkReceiver>>,
97 schd_sndr_t,
98 label_t,
99 functor_t,
100 policy_t
101 >
102 >);
103
// stdexec::__nothrow_connectable must hold for the sender and the sink receiver.
105 static_assert(stdexec::__nothrow_connectable<pfor_sndr_t, Tests::Utils::SinkReceiver>);
106
107 return true;
108}
// Instantiates the checks at compile time with IsDispatchingSender == false.
// NOTE(review): the other template arguments are not visible in this listing.
110static_assert(test_sndr_traits<
112 false,
114>());
115
// Compile-time checks that a parallel_for sender can be taken apart through stdexec's sender
// introspection helpers (tag, data, children).
//
// Returns true unconditionally, so that the call can be wrapped in a static_assert.
//
// NOTE(review): pfor_sndr_t is used below, but its definition is not among the visible lines of
// this listing - confirm against the original file.
117consteval bool test_sndr_decomposition() {
119 using schd_sndr_t = typename ParallelForTest::schedule_sender_t;
120
122 using label_t = std::string;
124 using policy_t = Kokkos::RangePolicy<TEST_EXECUTION_SPACE>;
126
// The sender is tagged with Kokkos::Execution::parallel_for_t.
128 static_assert(stdexec::__sender_for<pfor_sndr_t, Kokkos::Execution::parallel_for_t>);
129
// Type of the data carried by the sender.
// NOTE(review): the expected type (second same_as argument) is not visible in this listing.
130 static_assert(std::same_as<
131 stdexec::__data_of<pfor_sndr_t>,
133 >);
134
// Exactly one child, and that child is the schedule sender.
135 static_assert(stdexec::__nbr_children_of<pfor_sndr_t> == 1);
136 static_assert(std::same_as<stdexec::__child_of<pfor_sndr_t>, schd_sndr_t>);
137
// NOTE(review): the first __applicable argument (the callable being applied) is not visible in
// this listing; the remaining arguments are the sender and a const reference to an empty env.
139 static_assert(stdexec::__applicable<
141 pfor_sndr_t,
142 const stdexec::env<>&
143 >);
144
145 return true;
146}
// Runs the checks above at compile time.
147static_assert(test_sndr_decomposition());
148
// Compile-time check on a closure type: it must be nothrow move constructible.
//
// Template parameter:
//   ViewType - not used by any visible line; presumably part of the definition of closure_t.
//
// Returns true unconditionally, so that a call can be wrapped in a static_assert.
//
// NOTE(review): closure_t is used below, but its definition is not among the visible lines of
// this listing - confirm against the original file.
150template <typename ViewType>
151consteval bool test_closure_traits() {
153 using policy_t = Kokkos::RangePolicy<TEST_EXECUTION_SPACE>;
155
158
159 static_assert(std::is_nothrow_move_constructible_v<closure_t>);
160
161 return true;
162}
// Instantiates the check with std::span<int> as ViewType.
164static_assert(test_closure_traits<std::span<int>>());
165
170 Kokkos::RangePolicy<TEST_EXECUTION_SPACE>,
172>());
173
// Test: run a chain that is given a Kokkos::TeamPolicy and a SumIndices functor, then check the
// value held by `witness`.
175TEST_F(ParallelForTest, team_policy) {
176 constexpr int size = 32;
177
// Choose (num_teams, team_size); their product must be `size` (asserted right after).
178 const auto [num_teams, team_size] = [&]() {
180#if defined(KOKKOS_ENABLE_HPX)
// When the tested execution space is HPX: `size` teams of size 1.
181 if constexpr (std::same_as<TEST_EXECUTION_SPACE, Kokkos::Experimental::HPX>) {
182 return std::make_tuple(size, 1);
183 }
184#endif
// Otherwise: std::bit_floor gives the largest power of two not exceeding
// min(exec.concurrency(), size / 2); the number of teams is size divided by that team size.
185 const int team_size_ = std::bit_floor(static_cast<unsigned short>(std::min(exec.concurrency(), size / 2)));
186 return std::make_tuple(size / team_size_, team_size_);
187 }();
188
189 ASSERT_EQ(team_size * num_teams, size);
190
191 const view_s_t witness(Kokkos::view_alloc(exec, "data - shared space"));
192
193 const context_t esc{exec};
194
// NOTE(review): the line between schedule(...) and the label is missing from this listing;
// presumably it pipes into Kokkos::Execution::parallel_for - confirm against the original file.
195 auto chain = stdexec::schedule(esc.get_scheduler())
197 "hello from pfor",
198 Kokkos::TeamPolicy<TEST_EXECUTION_SPACE>(num_teams, team_size),
199 Tests::Utils::Functors::SumIndices{.data = witness});
200
// Run the chain and block until it completes.
201 stdexec::sync_wait(std::move(chain));
202
// For even `size`, size / 2 * (size - 1) equals 0 + 1 + ... + (size - 1); presumably that is
// what SumIndices accumulates into `witness`.
203 ASSERT_EQ(witness(), size / 2 * (size - 1));
204}
205
// Builds a sender chain that starts on esc's scheduler and whose stages receive different
// argument lists: (label, execution policy, functor), then (execution policy, functor), and -
// only when Exec is Kokkos::DefaultExecutionSpace - (label, work count, functor).
//
// Parameters:
//   size    - upper bound of each RangePolicy<Exec>(0, size), and the work count.
//   witness - stored as the `.data` member of every SumIndices functor.
//   esc     - execution space context providing the scheduler the chain starts on.
//
// Returns a type satisfying stdexec::sender (see the trailing return type).
//
// NOTE(review): the line carrying the function name and the lines naming what each stage is
// piped into are missing from this listing. The symbol index further down this page gives the
// name as closure_object_creation_overloads - confirm against the original file.
206template <typename ViewType, Kokkos::ExecutionSpace Exec>
208 const size_t size,
209 const ViewType& witness,
210 const Kokkos::Execution::ExecutionSpaceContext<Exec>& esc) -> stdexec::sender auto {
211 auto chain = stdexec::schedule(esc.get_scheduler())
213 "passing label, execution policy and functor",
214 Kokkos::RangePolicy<Exec>(0, size),
215 Tests::Utils::Functors::SumIndices{.data = witness})
217 Kokkos::RangePolicy<Exec>(0, size), Tests::Utils::Functors::SumIndices{.data = witness});
218
// The work-count form is only appended for the default execution space.
219 if constexpr (std::same_as<Exec, Kokkos::DefaultExecutionSpace>) {
220 return std::move(chain)
222 "passing label, work count and functor", size, Tests::Utils::Functors::SumIndices{.data = witness})
224 } else {
225 return chain;
226 }
227}
228
231 constexpr size_t size = 10;
232
233 const view_s_t witness(Kokkos::view_alloc(exec, "data - shared space"));
234
235 const auto recorded_events = Tests::Utils::record_sync_wait<recorder_listener_t>(
237
238 unsigned short int ievent = 0;
239
240 ASSERT_GE(recorded_events.size(), 3);
241
243
244 ASSERT_THAT(
245 recorded_events,
246 ElementAt<variant_t>(ievent++, MATCHER_FOR_BEGIN_PFOR(exec, "passing label, execution policy and functor")));
247 ASSERT_THAT(
248 recorded_events,
249 ElementAt<variant_t>(ievent++, MATCHER_FOR_BEGIN_PFOR(exec, Kokkos::Impl::TypeInfo<functor_t>::name())));
250
251 if constexpr (std::same_as<TEST_EXECUTION_SPACE, Kokkos::DefaultExecutionSpace>) {
252 ASSERT_THAT(
253 recorded_events.at(ievent++), MATCHER_FOR_BEGIN_PFOR(exec, "passing label, work count and functor"));
254 ASSERT_THAT(
255 recorded_events.at(ievent++), MATCHER_FOR_BEGIN_PFOR(exec, Kokkos::Impl::TypeInfo<functor_t>::name()));
256 }
257
258 ASSERT_THAT(recorded_events.at(ievent), MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "sync_wait")));
259
260 ASSERT_EQ(witness(), ievent * size / 2 * (size - 1));
261}
262
// Test: a chain with two labelled stages using SumIndices over RangePolicy(0, size) and
// RangePolicy(0, 2 * size), separated by a stdexec::then step.
//
// NOTE(review): several lines are missing from this listing (what each stage is piped into, the
// functor constructed inside then(...), the first ASSERT_THAT argument and two of the expected
// events) - confirm against the original file.
264TEST_F(ParallelForTest, two_parallel_regions) {
265 constexpr size_t size = 10;
266
267 const view_s_t witness(Kokkos::view_alloc(exec, "data - shared space"));
268
269 const context_t esc{exec};
270
271 auto chain = stdexec::schedule(esc.get_scheduler())
// First stage: label prefixed with the execution space type name, range [0, size).
273 std::format("{}: hello from pfor", Kokkos::Impl::TypeInfo<TEST_EXECUTION_SPACE>::name()),
274 Kokkos::RangePolicy<TEST_EXECUTION_SPACE>(0, size),
275 Tests::Utils::Functors::SumIndices{.data = witness})
// Intermediate step: an object initialized with the value expected after the first stage
// (.prev), an increment of 4 (.value) and the raw pointer of `witness` (.data).
276 | stdexec::then(
278 .prev = size / 2 * (size - 1), .value = 4, .data = witness.data()})
// Second stage: range [0, 2 * size).
280 std::format("{}: hello again from pfor", Kokkos::Impl::TypeInfo<TEST_EXECUTION_SPACE>::name()),
281 Kokkos::RangePolicy<TEST_EXECUTION_SPACE>(0, 2 * size),
282 Tests::Utils::Functors::SumIndices{.data = witness});
283
// Expected events include one "begin parallel for" per stage, in order.
284 ASSERT_THAT(
286 testing::ElementsAre(
287 MATCHER_FOR_BEGIN_PFOR(exec, dispatch_label(exec, "hello from pfor")),
289 MATCHER_FOR_BEGIN_PFOR(exec, dispatch_label(exec, "hello again from pfor")),
291
// Expected value = (0 + ... + (size - 1)) + 4 + (0 + ... + (2 * size - 1)).
292 ASSERT_EQ(witness(), size / 2 * (size - 1) + 4 + 2 * size * (2 * size - 1) / 2);
293}
294
// Test: a sender built on top of stdexec::just() is moved onto the execution space scheduler
// with stdexec::starts_on, then run while recording the emitted events.
//
// NOTE(review): the line after just() (what the label, policy and functor are passed to) and the
// `if constexpr` condition choosing between the two expected event lists are missing from this
// listing - confirm against the original file.
296TEST_F(ParallelForTest, starts_on_parallel_region) {
297 constexpr size_t size = 10;
298
299 const view_s_t witness(Kokkos::view_alloc(exec, "data - shared space"));
300
// The sender is built first, without any scheduler.
301 auto sndr = stdexec::just()
303 std::format("{}: hello from pfor", Kokkos::Impl::TypeInfo<TEST_EXECUTION_SPACE>::name()),
304 Kokkos::RangePolicy<TEST_EXECUTION_SPACE>(0, size),
305 Tests::Utils::Functors::SumIndices{.data = witness});
306
// Only now is it bound to the execution space scheduler.
307 const context_t esc{exec};
308 auto starts_on = stdexec::starts_on(esc.get_scheduler(), std::move(sndr));
309
// With the environment of the sync_wait receiver, transforming the starts_on sender must yield
// an instance of stdexec::__seq::__sndr.
316 static_assert(stdexec::__is_instance_of<
317 stdexec::transform_sender_result_t<
318 decltype(starts_on),
319 stdexec::env_of_t<Kokkos::Execution::Impl::SyncWait::Receiver<TEST_EXECUTION_SPACE>>
320 >,
321 stdexec::__seq::__sndr
322 >);
323
324 const auto recorded_events = Tests::Utils::record_sync_wait<recorder_listener_t>(std::move(starts_on));
325
// Both expected event lists begin with the "begin parallel for" event; one ends with a wait
// event (built from the recorded event at index 1), the other with an "after dispatch" fence.
326 ASSERT_THAT(recorded_events, [&]() {
328 return testing::ElementsAre(
329 MATCHER_FOR_BEGIN_PFOR(exec, dispatch_label(exec, "hello from pfor")),
331 MATCHER_FOR_WAIT_EVENT(recorded_events.at(1)));
332 } else {
333 return testing::ElementsAre(
334 MATCHER_FOR_BEGIN_PFOR(exec, dispatch_label(exec, "hello from pfor")),
335 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "after dispatch")));
336 }
337 }());
338
// For even `size`, size / 2 * (size - 1) equals 0 + 1 + ... + (size - 1).
339 ASSERT_EQ(witness(), size / 2 * (size - 1));
340}
341
// Test: an allocator injected with stdexec::write_env at the end of the chain must be readable
// with stdexec::read_env(stdexec::get_allocator) at the start of the chain, i.e. the stages in
// between forward the environment.
//
// NOTE(review): the lines after continues_on(...) naming what "my pfor" and the policy are passed
// to, the functor argument, and the `if constexpr` condition choosing between the two expected
// event lists are missing from this listing - confirm against the original file.
343TEST_F(ParallelForTest, forwarding_env) {
344 constexpr size_t size = 10;
345
346 const view_s_t data(Kokkos::view_alloc(exec, "data - shared space"));
347
// Handed to the tracking allocator by address; checked to be 1 at the end of the test.
348 std::atomic<size_t> count = 0;
349
// Written by the then(...) callback below. Initialized so that the final ASSERT_EQ reads a
// well-defined value (and fails deterministically) should that callback never run.
350 int value = 0;
351
352 const context_t esc{exec};
353
354 stdexec::sender auto sndr =
355 stdexec::read_env(stdexec::get_allocator)
// Store the result of round_trip_allocate(allocator, 42) computed with the allocator read
// from the environment.
356 | stdexec::then([&value](auto allocator) { value = Tests::Utils::round_trip_allocate(allocator, 42); })
357 | stdexec::continues_on(esc.get_scheduler())
360 "my pfor",
361 Kokkos::RangePolicy<TEST_EXECUTION_SPACE>(0, size),
// Provide the allocator that the read_env at the top of the chain is expected to observe.
363 | stdexec::write_env(stdexec::prop{stdexec::get_allocator, Tests::Utils::TrackingAllocator<int>{&count}});
364
// Building the chain must not have run anything yet.
365 ASSERT_EQ(data(), 0) << "Eager execution is not allowed.";
366
367 const auto recorded_events = Tests::Utils::record_sync_wait<recorder_listener_t>(std::move(sndr));
368
// Both expected event lists begin with the "begin parallel for" event labelled "my pfor"; one
// ends with a wait event (built from the recorded event at index 1), the other with an
// "after dispatch" fence.
369 ASSERT_THAT(recorded_events, [&]() {
371 return testing::ElementsAre(
372 MATCHER_FOR_BEGIN_PFOR(exec, "my pfor"),
374 MATCHER_FOR_WAIT_EVENT(recorded_events.at(1)));
375 } else {
376 return testing::ElementsAre(
377 MATCHER_FOR_BEGIN_PFOR(exec, "my pfor"),
378 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "after dispatch")));
379 }
380 }());
381
// For even `size`, size / 2 * (size - 1) equals 0 + 1 + ... + (size - 1).
382 ASSERT_EQ(data(), size / 2 * (size - 1));
383
// The callback ran with the injected allocator: the round trip produced 42 and the counter
// given to the tracking allocator reached 1.
384 ASSERT_EQ(value, 42);
385 ASSERT_EQ(count, 1);
386}
387
388} // namespace Tests::ExecutionSpaceImpl
constexpr std::string dispatch_label(const Exec &, Label &&label)
Get the dispatch label from Exec and label.
#define MATCHER_FOR_WAIT_EVENT(_record_event_variant_)
#define MATCHER_FOR_BEGIN_PFOR(_exec_, _label_)
#define MATCHER_FOR_RECORD_EVENT(_exec_)
#define MATCHER_FOR_BEGIN_FENCE(_exec_, _label_)
RecorderListener< EventDiscardMatcher< TEST_EXECUTION_SPACE >, BeginFenceEvent, BeginParallelForEvent, Kokkos::Execution::Impl::RecordEvent, Kokkos::Execution::Impl::WaitEvent > recorder_listener_t
typename recorder_listener_t::event_variant_t variant_t
Concept for a sender whose completion scheduler is Kokkos::Execution::ExecutionSpaceImpl::Scheduler.
Concept that constrains the type of a sender that dispatches a functor for execution.
std::invoke_result_t< stdexec::get_completion_scheduler_t< Tag >, stdexec::env_of_t< Sndr >, Env... > completion_scheduler_of_t
Retrieve the completion scheduler for a given completion tag.
constexpr parallel_for_t parallel_for
auto ElementAt(const size_t index, ElementMatcher &&matcher)
consteval bool test_sndr_traits()
Definition test_bulk.cpp:49
auto closure_object_creation_overloads(const size_t size, const ViewType &witness, const Kokkos::Execution::ExecutionSpaceContext< Exec > &esc) -> stdexec::sender auto
consteval bool test_sndr_decomposition()
auto record_sync_wait(Sndr &&sndr)
Definition sync_wait.hpp:14
consteval bool check_continues_on_after_just_stopped()
constexpr check_rcvr_env_queryable_with_t< true, Queries... > check_rcvr_env_queryable_with
auto round_trip_allocate(Allocator &allocator, T &&value)
Matcher to filter out events that are just noise for tests.
Execution context using a Kokkos execution space under the hood.
auto get_scheduler() const noexcept -> ExecutionSpaceImpl::Scheduler< Exec >
Scheduler for a Kokkos execution space.
Event to be sent to Kokkos::utils::callbacks::dispatch when calling record.
Definition event.hpp:52
Event to be sent to Kokkos::utils::callbacks::dispatch when calling wait.
Definition event.hpp:73
Custom algorithm for the Kokkos::parallel_for construct.
decltype(std::declval< const context_t >().get_scheduler()) scheduler_t
Definition context.hpp:28
decltype(stdexec::schedule(std::declval< scheduler_t >())) schedule_sender_t
Definition context.hpp:29
Kokkos::Execution::ExecutionSpaceContext< Exec > context_t
Definition context.hpp:27
Load the value at data and check it is equal to prev. Then, add value to it.
A minimal tracking allocator.