kokkos-execution 0.0.1
Loading...
Searching...
No Matches
completion_signal.hpp
Go to the documentation of this file.
1#ifndef KOKKOS_EXECUTION_IMPL_COMPLETION_SIGNAL_HPP
2#define KOKKOS_EXECUTION_IMPL_COMPLETION_SIGNAL_HPP
3
5
6#if defined(KOKKOS_EXECUTION_ENABLE_DEBUG_LOGGING)
7# include "plog/Log.h"
8#endif
9
11
18
20
28template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
31 bool operator()(const Exec& exec, const Rcvr& rcvr) const noexcept
32 requires(stdexec::__queryable_with<stdexec::env_of_t<Rcvr>, get_exec_t>)
33 {
34 if constexpr (std::same_as<exec_of_t<stdexec::env_of_t<Rcvr>>, Exec>) {
35 const auto& src = exec;
36 const auto& dst = get_exec(stdexec::get_env(rcvr)).get();
37#if defined(KOKKOS_EXECUTION_ENABLE_DEBUG_LOGGING)
38 PLOG_DEBUG << "The synchronization happens if " << Kokkos::Tools::Experimental::device_id(src)
39 << " is not equal to " << Kokkos::Tools::Experimental::device_id(dst) << '.';
40#endif
41 return src != dst;
42 }
43 return true;
44 }
45
47 constexpr bool operator()(const Exec&, const Rcvr&) const noexcept
48 requires(!stdexec::__queryable_with<stdexec::env_of_t<Rcvr>, get_exec_t>)
49 {
50#if defined(KOKKOS_EXECUTION_ENABLE_DEBUG_LOGGING)
51 PLOG_DEBUG << "Synchronization always required.";
52#endif
53 return true;
54 }
55};
56
57template <typename Policy, Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
59
/// Empty tag type; a type derived from it satisfies the `sync_policy` concept.
struct SyncPolicyTag {};
61
73
74template <typename Policy>
75concept sync_policy = std::derived_from<Policy, SyncPolicyTag>;
76
82template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
83struct CompletionSignal<SyncPolicy::InlineFenceExec, Exec, Rcvr> {
84 static constexpr auto label = Impl::dispatch_label<Exec, ": after dispatch">();
85
86 Rcvr rcvr;
87
88 void propagate(const Exec& exec) & noexcept {
89 if (!RequiresSync<Exec, Rcvr>{}(exec, rcvr)) {
90 stdexec::set_value(std::move(rcvr));
91 } else {
92 try {
93 exec.fence(std::string(label));
94 stdexec::set_value(std::move(rcvr));
95 } catch (...) {
96 stdexec::set_error(std::move(rcvr), std::current_exception());
97 }
98 }
99 }
100};
101
102template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
104 using receiver_concept = stdexec::receiver_tag;
105
107
108 void set_value() && noexcept {
109 try {
111 stdexec::set_value(std::move(completion_signal->rcvr));
112 } catch (...) {
113 stdexec::set_error(std::move(completion_signal->rcvr), std::current_exception());
114 }
115 }
116};
117
124template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
125struct CompletionSignal<SyncPolicy::ScheduleWaitEvent, Exec, Rcvr> {
128 using inner_opstate_t = stdexec::connect_result_t<
129 stdexec::schedule_result_t<delegation_scheduler_t>,
131 >;
132
133 Rcvr rcvr;
134 event_storage_t event = std::nullopt;
136
137 void propagate(const Exec& exec) & noexcept {
138 if (!RequiresSync<Exec, Rcvr>{}(exec, rcvr)) {
139 stdexec::set_value(std::move(rcvr));
140 } else {
141 try {
142 event.emplace();
143 record(*event, exec);
144 inner_opstate.emplace_from(
145 stdexec::connect,
146 stdexec::schedule(stdexec::get_delegation_scheduler(stdexec::get_env(rcvr))),
148 stdexec::start(inner_opstate.get());
149 } catch (...) {
150 stdexec::set_error(std::move(rcvr), std::current_exception());
151 }
152 }
153 }
154};
155
157
169
170template <typename Policy>
171concept submitted_policy = std::derived_from<Policy, SubmittedPolicyTag>;
172
173template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
174struct CompletionSignal<SubmittedPolicy::OrderOnExec, Exec, Rcvr> {
175 Rcvr rcvr;
176
177 void propagate(const Exec&) & noexcept {
178 std::move(rcvr).submitted();
179 }
180};
181
188template <Kokkos::ExecutionSpace Exec, stdexec::receiver Rcvr>
189struct CompletionSignal<SubmittedPolicy::DependOnEvent, Exec, Rcvr> {
191
192 Rcvr rcvr;
194
195 void propagate(const Exec& exec) & noexcept {
196 if (RequiresSync<Exec, Rcvr>{}(exec, rcvr)) {
197 auto& event = event_storage.emplace();
198 Impl::record(event, exec);
199 std::move(rcvr).submitted(OptionalConstEventRef{event});
200 } else {
201 std::move(rcvr).submitted(OptionalConstEventRef<Exec>{});
202 }
203 }
204};
205
206} // namespace Kokkos::Execution::Impl
207
208#endif // KOKKOS_EXECUTION_IMPL_COMPLETION_SIGNAL_HPP
Data structure that has an interface similar to std::optional but stores the value with stdexec::__ma...
Constrains EventType to be a valid event type for the execution space type Exec.
Definition event.hpp:26
OptionalRef< const Event< Exec > > OptionalConstEventRef
Optionally stores a reference to a const Impl::Event.
Definition event.hpp:178
std::optional< Event< Exec > > event_storage_t
Definition event.hpp:174
stdexec::__query_result_t< const Env &, stdexec::get_delegation_scheduler_t > delegation_scheduler_of_t
Definition schedulers.hpp:9
void record(Event< Exec > &event, const Exec &exec)
Record event on exec.
Definition event.hpp:140
void wait(const Event< Exec > &... events)
Wait for events to complete.
Definition event.hpp:152
constexpr get_exec_t get_exec
Definition get_exec.hpp:19
consteval std::string_view dispatch_label() noexcept
View the dispatch label as a std::string_view.
stdexec::connect_result_t< stdexec::schedule_result_t< delegation_scheduler_t >, ScheduleWaitEventReceiver< Exec, Rcvr > > inner_opstate_t
Impl::delegation_scheduler_of_t< stdexec::env_of_t< Rcvr > > delegation_scheduler_t
constexpr bool operator()(const Exec &, const Rcvr &) const noexcept
bool operator()(const Exec &exec, const Rcvr &rcvr) const noexcept
CompletionSignal< SyncPolicy::ScheduleWaitEvent, Exec, Rcvr > * completion_signal
Under a sync policy, a terminal completion is propagated: in-flight operations must complete.