kokkos-execution 0.0.1
Loading...
Searching...
No Matches
test_fork_join.cpp
Go to the documentation of this file.
2PRAGMA_DIAGNOSTIC_PUSH
4#include "exec/fork_join.hpp"
5#include "exec/static_thread_pool.hpp"
6PRAGMA_DIAGNOSTIC_POP
7
10
12
20
32
34
35using namespace Kokkos::utils::callbacks;
36
49
51TEST_F(ForkJoinTest, diamond) {
52 const view_s_t data(Kokkos::view_alloc("data - shared space"));
53
54 experimental::execution::static_thread_pool pool{4};
55 const context_t esc{exec};
56
57 auto sndr =
58 stdexec::schedule(pool.get_scheduler())
59 | stdexec::then(Tests::Utils::Functors::LoadCheckAdd<int, false>{.prev = 0, .value = 4, .data = data.data()})
60 | experimental::execution::fork_join(
61 stdexec::continues_on(esc.get_scheduler())
63 | stdexec::continues_on(pool.get_scheduler()),
64 stdexec::continues_on(pool.get_scheduler()) | THEN_INCREMENT_ATOMIC(data))
65 | stdexec::continues_on(esc.get_scheduler())
66 | stdexec::then(
67 Tests::Utils::Functors::LoadCheckAdd<int, on_device>{.prev = 6, .value = 3, .data = data.data()})
68 | stdexec::continues_on(stdexec::inline_scheduler{})
69 | stdexec::then(Tests::Utils::Functors::LoadCheckAdd<int, false>{.prev = 9, .value = 5, .data = data.data()});
70
71 ASSERT_EQ(data(), 0) << "Eager execution is not allowed.";
72
74
75 ASSERT_THAT(
77 testing::ElementsAre(
81 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "schedule_from"))));
82
83 ASSERT_EQ(data(), 14);
84}
85
91TEST_F(ForkJoinTest, continues_on) {
92 const view_s_t data(Kokkos::view_alloc(exec, "data - shared space"));
93
94 const context_t esc{exec};
95
96 auto sndr =
97 stdexec::just() | stdexec::continues_on(esc.get_scheduler())
98 | experimental::execution::fork_join(
99 stdexec::continues_on(esc.get_scheduler())
100 | stdexec::then(
101 Tests::Utils::Functors::LoadCheckAdd<int, on_device>{.prev = 0, .value = 3, .data = data.data()}));
102
103 static_assert(stdexec::__sender_for<decltype(sndr), experimental::execution::fork_join_t>);
104
105 ASSERT_EQ(data(), 0) << "Eager execution is not allowed.";
106
107 const auto recorded_events = Tests::Utils::record_sync_wait<recorder_listener_t>(
108 std::move(sndr)); // NOLINT(performance-move-const-arg)
109
110 ASSERT_THAT(recorded_events, [&]() {
112 return testing::ElementsAre(
115 MATCHER_FOR_WAIT_EVENT(recorded_events.at(1)));
116 } else {
117 return testing::ElementsAre(
119 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "after dispatch")));
120 }
121 }());
122
123 ASSERT_EQ(data(), 3);
124}
125
131TEST_F(ForkJoinTest, continues_on_bulk) {
132 const view_s_t data(Kokkos::view_alloc(exec, "data - shared space"));
133
134 const context_t esc{exec};
135
136 auto sndr =
137 stdexec::just() | stdexec::continues_on(esc.get_scheduler()) | BULK_SUM_INDICES(2, data)
138 | experimental::execution::fork_join(
139 stdexec::continues_on(esc.get_scheduler())
140 | stdexec::then(
141 Tests::Utils::Functors::LoadCheckAdd<int, on_device>{.prev = 1, .value = 2, .data = data.data()}));
142
143 ASSERT_EQ(data(), 0) << "Eager execution is not allowed.";
144
145 const auto recorded_events = Tests::Utils::record_sync_wait<recorder_listener_t>(std::move(sndr));
146
147 ASSERT_THAT(recorded_events, [&]() {
149 return testing::ElementsAre(
152 MATCHER_FOR_WAIT_EVENT(recorded_events.at(1)),
155 MATCHER_FOR_WAIT_EVENT(recorded_events.at(4)));
156 } else {
157 return testing::ElementsAre(
159 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "after dispatch")),
161 MATCHER_FOR_BEGIN_FENCE(exec, dispatch_label(exec, "after dispatch")));
162 }
163 }());
164
165 ASSERT_EQ(data(), 3);
166}
167
168} // namespace Tests::ExecutionSpaceImpl
constexpr std::string dispatch_label(const Exec &, Label &&label)
Get the dispatch label from Exec and label.
#define MATCHER_FOR_WAIT_EVENT(_record_event_variant_)
#define MATCHER_FOR_BEGIN_PFOR(_exec_, _label_)
#define MATCHER_FOR_RECORD_EVENT(_exec_)
#define MATCHER_FOR_BEGIN_FENCE(_exec_, _label_)
RecorderListener< EventDiscardMatcher< TEST_EXECUTION_SPACE >, BeginFenceEvent, BeginParallelForEvent, Kokkos::Execution::Impl::RecordEvent, Kokkos::Execution::Impl::WaitEvent > recorder_listener_t
#define KOKKOS_EXECUTION_THREADS_THROWS_ON_SYNC_WAIT_ASSERT_AND_SKIP(_sndr_)
Definition context.hpp:69
#define KOKKOS_EXECUTION_STDEXEC_PRAGMA_DIAGNOSTIC_IGNORED
Basic list of ignored diagnostics when including anything from stdexec.
#define THEN_INCREMENT_ATOMIC(_data_)
Same as THEN_INCREMENT, using Tests::Utils::atomic_add. // NOLINTNEXTLINE(cppcoreguidelines-macro-usa...
Definition increment.hpp:39
constexpr check_scheduler_type_t< Tag, Schd > check_scheduler_type
auto record_sync_wait(Sndr &&sndr)
Definition sync_wait.hpp:14
Matcher to filter out events that are just noise for tests.
auto get_scheduler() const noexcept -> ExecutionSpaceImpl::Scheduler< Exec >
Event to be sent to Kokkos::utils::callbacks::dispatch when calling record.
Definition event.hpp:52
Event to be sent to Kokkos::utils::callbacks::dispatch when calling wait.
Definition event.hpp:73
Kokkos::Execution::ExecutionSpaceContext< Exec > context_t
Definition context.hpp:27
Load the value at data and check it is equal to prev. Then, add value to it.
#define BULK_SUM_INDICES(_size_, _data_)
Add a bulk using Tests::Utils::Functors::SumIndices. // NOLINTNEXTLINE(cppcoreguidelines-macro-usage)...