kokkos-execution 0.0.1
Loading...
Searching...
No Matches
operation_state.hpp
Go to the documentation of this file.
1#ifndef KOKKOS_EXECUTION_EXECUTION_SPACE_OPERATION_STATE_HPP
2#define KOKKOS_EXECUTION_EXECUTION_SPACE_OPERATION_STATE_HPP
3
5
12
14
// Concept satisfied by a type `Clsr` when, for an object `clsr` of type `const Clsr&`:
//  - `Clsr::execution_space` names a type;
//  - `clsr.execute()` is well-formed and its type is exactly `void`;
//  - `clsr.get_policy()` is well-formed and its type (cv/ref removed) satisfies `Kokkos::ExecutionPolicy`;
//  - `clsr.get_policy().space()` has (cv/ref removed) the type `Clsr::execution_space`.
15template <typename Clsr>
16concept Closure = requires(const Clsr& clsr) {
17 typename Clsr::execution_space;
18
19 { clsr.execute() } -> std::same_as<void>;
20
21 { clsr.get_policy() };
22 requires Kokkos::ExecutionPolicy<std::remove_cvref_t<decltype(clsr.get_policy())>>;
23
// The policy's space type must agree with the advertised `execution_space`.
24 requires std::same_as<std::remove_cvref_t<decltype(clsr.get_policy().space())>, typename Clsr::execution_space>;
25};
26
35template <stdexec::receiver Rcvr, typename OpState>
// True when `Rcvr` is an instance of `ScheduleFromReceiver` or of `SyncWaitReceiver`.
// The `requires` clauses of the `operator()` overloads that follow select on this flag.
37 static constexpr bool successor_handles_sync = stdexec::__is_instance_of<Rcvr, ScheduleFromReceiver>
38 || stdexec::__is_instance_of<Rcvr, SyncWaitReceiver>;
39
// Overload selected when `successor_handles_sync` is true.
// Ignores its argument and always returns false.
41 constexpr bool operator()(const OpState&) const noexcept requires(successor_handles_sync)
42 {
43 return false;
44 }
45
// Overload selected when `successor_handles_sync` is false and the receiver's environment
// can be queried with `get_exec_t`.
// Returns the result of comparing the two `.get()` values with `!=` when both query-result
// types match; returns true otherwise.
50 bool operator()(const OpState& opstate) const noexcept
51 requires(!successor_handles_sync && stdexec::__queryable_with<stdexec::env_of_t<Rcvr>, get_exec_t>)
52 {
// Compile-time check: receiver-side query result type (cv/ref removed) vs. the
// `OpState`-side query result type.
53 if constexpr (
54 std::same_as<
55 std::remove_cvref_t<stdexec::__query_result_t<stdexec::env_of_t<Rcvr>, get_exec_t>>,
56 stdexec::__query_result_t<OpState, get_exec_t>
57 >) {
// Same type on both sides: true only when the two `.get()` results compare unequal.
58 return opstate.query(get_exec).get() != get_exec(stdexec::get_env(opstate.rcvr)).get();
59 }
// Reached when the types differ.
60 return true;
61 }
62
// Fallback overload: selected when `successor_handles_sync` is false and the receiver's
// environment cannot be queried with `get_exec_t`. Always returns true.
64 constexpr bool operator()(const OpState&) const noexcept
65 requires(!successor_handles_sync && !stdexec::__queryable_with<stdexec::env_of_t<Rcvr>, get_exec_t>)
66 {
67 return true;
68 }
69};
70
72template <typename Rcvr, typename Exec>
75 && stdexec::__queryable_with<stdexec::env_of_t<Rcvr>, stdexec::get_delegation_scheduler_t>;
76
77template <
78 stdexec::receiver Rcvr,
79 Kokkos::ExecutionSpace Exec,
81>
83
84template <stdexec::receiver Rcvr, Kokkos::ExecutionSpace Exec>
86 using receiver_concept = stdexec::receiver_t;
88
91
// Value completion: waits on `event`, destroys `opstate->storage`, then completes
// `opstate->rcvr` with `set_value`. Any exception caught on the way is forwarded to
// `opstate->rcvr` through `set_error` after destroying the storage.
// NOTE(review): `opstate` is a member of this receiver and is read again after
// `storage.__destroy()`. If this receiver object lives inside the operation state held by
// that storage, the read happens after the receiver's lifetime has ended; copying the
// pointer to a local first would avoid that. Confirm against the full struct definition.
92 void set_value() && noexcept {
93 try {
94 event.wait();
95 opstate->storage.__destroy();
96 stdexec::set_value(std::move(opstate->rcvr));
97 } catch (...) {
// Same teardown as the success path, but the receiver is completed with the exception.
98 opstate->storage.__destroy();
99 stdexec::set_error(std::move(opstate->rcvr), std::current_exception());
100 }
101 }
102};
103
// Specialization for a `false` third template argument: completion is not handed to
// another scheduler; the execution space is fenced and the receiver completed in place.
// NOTE(review): the primary template and the meaning of the third argument are not
// visible in this listing; presumably it is the delegation-capability check. Confirm.
104template <stdexec::receiver Rcvr, Kokkos::ExecutionSpace Exec>
105struct MayDelegateCompletionWithEvent<Rcvr, Exec, false> {
// Label built from `Exec` and the suffix ": after dispatch"; passed to `fence` below.
106 static constexpr auto label = Impl::dispatch_label<Exec, ": after dispatch">();
107
// Receiver that `delegate` completes.
108 Rcvr rcvr;
109
// Fences the execution space obtained from `opstate->query(get_exec).get()` using
// `label`, then completes `rcvr` with `set_value`.
// NOTE(review): listing line 112 is absent here; the closing brace at 114 suggests the
// fence sits inside a condition that is not visible. Confirm in the original header.
110 template <typename OpState>
111 void delegate(OpState* const opstate) noexcept {
113 opstate->query(get_exec).get().fence(std::string(label));
114 }
115 stdexec::set_value(std::move(this->rcvr));
116 }
117};
118
// Specialization for a `true` third template argument: completion may be handed to the
// delegation scheduler found in the receiver's environment.
// NOTE(review): listing lines 121, 126 and 134 are absent here (they presumably declare
// `receiver_t`, the second `connect_result_t` argument and the branch condition). Confirm.
119template <stdexec::receiver Rcvr, Kokkos::ExecutionSpace Exec>
120struct MayDelegateCompletionWithEvent<Rcvr, Exec, true> {
// Operation state type obtained by connecting the delegation scheduler's schedule sender.
122 using opstate_t = stdexec::connect_result_t<
123 stdexec::schedule_result_t<
124 stdexec::__query_result_t<stdexec::env_of_t<Rcvr>, stdexec::get_delegation_scheduler_t>
125 >,
127 >;
128
// Receiver completed either directly or by the delegated operation.
129 Rcvr rcvr;
// Manually managed storage for the delegated operation state; constructed in `delegate`.
130 stdexec::__manual_lifetime<opstate_t> storage{};
131
// In the visible first branch: constructs `storage` by connecting
// `schedule(get_delegation_scheduler(get_env(rcvr)))` to a `receiver_t` holding `opstate`
// and an event built from `opstate->query(get_exec).get()`, then starts it.
// In the `else` branch: completes `rcvr` with `set_value` immediately.
132 template <typename OpState>
133 void delegate(OpState* const opstate) noexcept {
135 storage.__construct_from(
136 stdexec::connect,
137 stdexec::schedule(stdexec::get_delegation_scheduler(stdexec::get_env(this->rcvr))),
138 receiver_t{.opstate = opstate, .event = typename receiver_t::event_t{opstate->query(get_exec).get()}});
139 stdexec::start(storage.__get());
140 } else {
141 stdexec::set_value(std::move(this->rcvr));
142 }
143 }
144};
145
146template <stdexec::receiver Rcvr, Closure Clsr, Closure... Clsrs>
147requires(std::same_as<typename Clsr::execution_space, typename Clsrs::execution_space> && ...)
149 : public stdexec::__immovable
150 , public MayDelegateCompletionWithEvent<Rcvr, typename Clsr::execution_space> {
151 using execution_space = typename Clsr::execution_space;
152 using receiver_t = Rcvr;
153 using closures_t = stdexec::__tuple<Clsr, Clsrs...>;
154
156
158
// Constructor: moves `rcvr_` into the `may_delegate_completion_with_event_t` base and
// moves all closures into the `clsrs` tuple.
// It is `noexcept` when the base is nothrow-constructible from `Rcvr` and every closure
// type is nothrow-move-constructible.
159 constexpr explicit OpStateBase(Rcvr rcvr_, Clsr clsr_, Clsrs... clsrs_) noexcept(
160 std::is_nothrow_constructible_v<may_delegate_completion_with_event_t, Rcvr>
161 && std::is_nothrow_move_constructible_v<Clsr> && (std::is_nothrow_move_constructible_v<Clsrs> && ...))
162 : may_delegate_completion_with_event_t{std::move(rcvr_)}
163 , clsrs(std::move(clsr_), std::move(clsrs_)...) {
164 }
165
// Value path: calls `execute()` on every closure stored in `clsrs`, then hands completion
// to the base class' `delegate`.
// If anything in the `try` block throws, the exception is forwarded through the
// `set_error` overload and `delegate` is not called.
166 void propagate_completion_signal(stdexec::set_value_t) noexcept {
167 try {
// Comma fold: one `execute()` call per closure in the tuple.
168 stdexec::__apply([](auto&... clsr) { (clsr.execute(), ...); }, clsrs);
169 } catch (...) {
170 this->propagate_completion_signal(stdexec::set_error, std::current_exception());
171 return;
172 }
173
174 may_delegate_completion_with_event_t::delegate(this);
175 }
176
// Error path: completes the stored receiver with `set_error`, perfectly forwarding `error`.
177 template <typename Error>
178 void propagate_completion_signal(stdexec::set_error_t, Error&& error) noexcept {
179 stdexec::set_error(std::move(this->rcvr), std::forward<Error>(error));
180 }
181
// Stopped path: completes the stored receiver with `set_stopped`.
182 void propagate_completion_signal(stdexec::set_stopped_t) noexcept {
183 stdexec::set_stopped(std::move(this->rcvr));
184 }
185
// `get_exec` query: returns an `ExecutionSpaceRef<execution_space>` built from the
// `space()` of the policy of the first closure in `clsrs`.
187 [[nodiscard]]
188 constexpr auto query(get_exec_t) const noexcept -> ExecutionSpaceRef<execution_space> {
189 return ExecutionSpaceRef<execution_space>{stdexec::__get<0>(clsrs).get_policy().space()};
190 }
191
193};
194
195template <typename ParentOp>
197 using receiver_concept = stdexec::receiver_t;
198
199 ParentOp* parent_op;
200
// Forwards the value completion to `parent_op->propagate_completion_signal(set_value)`.
201 void set_value() && noexcept {
202 parent_op->propagate_completion_signal(stdexec::set_value);
203 }
204
// Forwards the error completion, with `error` perfectly forwarded, to
// `parent_op->propagate_completion_signal(set_error, ...)`.
205 template <typename Error>
206 void set_error(Error&& error) && noexcept {
207 parent_op->propagate_completion_signal(stdexec::set_error, std::forward<Error>(error));
208 }
209
// Forwards the stopped completion to `parent_op->propagate_completion_signal(set_stopped)`.
210 void set_stopped() && noexcept {
211 parent_op->propagate_completion_signal(stdexec::set_stopped);
212 }
213
215 typename ParentOp::execution_space,
216 parent_op->query(get_exec).get(),
217 typename ParentOp::receiver_t,
219};
220
// Operation state that derives from `OpStateBase<Rcvr, Clsrs...>` and additionally owns the
// operation state obtained by connecting `Sndr` to an `OpStateReceiver<base_t>`.
// NOTE(review): listing lines 222, 230, 233, 236 and 241 are absent here (a constraint, the
// names of the two nothrow flags, the `inner_opstate` member and the end of the
// constructor signature, presumably). Confirm in the original header.
221template <stdexec::sender Sndr, stdexec::receiver Rcvr, Closure... Clsrs>
223struct OpState : public OpStateBase<Rcvr, Clsrs...> {
224 using operation_state_concept = stdexec::operation_state_t;
225
226 using base_t = OpStateBase<Rcvr, Clsrs...>;
227
// Type produced by connecting the upstream sender to the receiver that points at the base.
228 using inner_opstate_t = stdexec::connect_result_t<Sndr, OpStateReceiver<base_t>>;
229
// Initializer of a flag: whether `base_t` is nothrow-constructible from `Rcvr&&, Clsrs&&...`.
231 std::is_nothrow_constructible_v<base_t, Rcvr&&, Clsrs&&...>;
232
// Initializer of a flag: whether connecting `Sndr&&` to `OpStateReceiver<base_t>` is nothrow.
234 stdexec::__nothrow_connectable<Sndr&&, OpStateReceiver<base_t>>;
235
237
// Constructor: initializes the base from the moved receiver and closures, then connects
// the forwarded sender to an `OpStateReceiver<base_t>` constructed from `this`.
238 constexpr explicit OpState(
239 Sndr&& sndr, // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
240 Rcvr rcvr_,
242 : base_t(std::move(rcvr_), std::move(clsrs_)...)
243 , inner_opstate(stdexec::connect(std::forward<Sndr>(sndr), OpStateReceiver<base_t>{this})) {
244 }
245
// Starts the inner operation state.
246 void start() & noexcept {
247 stdexec::start(inner_opstate);
248 }
249};
250
251template <typename Sndr, typename Rcvr, typename... Clsrs>
253 using type = OpState<Sndr, Rcvr, Clsrs...>;
254
// Builds and returns a `type` from the forwarded sender, the moved receiver and the moved
// closures. It is `noexcept` exactly when `type` is nothrow-constructible from
// `Sndr&&, Rcvr&&, Clsrs&&...`.
255 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
256 constexpr auto operator()(Sndr&& sndr, Rcvr rcvr, Clsrs... clsrs) const
257 noexcept(std::is_nothrow_constructible_v<type, Sndr&&, Rcvr&&, Clsrs&&...>) -> type {
258 return type(std::forward<Sndr>(sndr), std::move(rcvr), std::move(clsrs)...);
259 }
260};
261
// Specialization for senders satisfying `Impl::dispatching_sender`: the sender is first
// transformed, and the call is then forwarded to `make_opstate_fn_t` with the transformed
// sender's child and closure.
// NOTE(review): listing lines 267-268, 271, 280-285 and 288 are absent here (presumably the
// `make_opstate_fn_t` / `type` aliases, one template argument and the `noexcept` / return
// clause of `operator()`). Confirm in the original header.
262template <Impl::dispatching_sender Sndr, typename Rcvr, typename... Clsrs>
263struct MakeOpStateFn<Sndr, Rcvr, Clsrs...> {
// Child sender type of `Sndr`.
264 using child_of_sndr_t = stdexec::__child_of<Sndr>;
// `closure_t` of the type obtained by transforming `Sndr` with the receiver's environment.
265 using clsr_of_sndr_t = typename stdexec::transform_sender_result_t<Sndr, stdexec::env_of_t<Rcvr>>::closure_t;
266
269
// Whether transforming `Sndr&&` with the receiver's environment is nothrow.
270 static constexpr bool sndr_has_nothrow_transform_sender = stdexec::__detail::__has_nothrow_transform_sender<
272 stdexec::set_value_t,
273 Sndr&&,
274 stdexec::env_of_t<Rcvr>
275 >;
276
// Whether invoking `make_opstate_fn_t` with the child sender, the receiver, the sender's
// closure and the remaining closures is nothrow.
277 static constexpr bool is_nothrow_make_opstate =
278 std::is_nothrow_invocable_v<make_opstate_fn_t, child_of_sndr_t&&, Rcvr&&, clsr_of_sndr_t&&, Clsrs&&...>;
279
// Transforms the sender with the receiver's environment, then invokes `make_opstate_fn_t`
// with the transformed object's `sndr`, the forwarded receiver, its `clsr`, and then the
// caller-provided closures.
286 // NOLINTNEXTLINE(cppcoreguidelines-rvalue-reference-param-not-moved)
287 constexpr auto operator()(Sndr&& sndr, Rcvr&& rcvr, Clsrs... clsrs) const
289 auto trnsfrmd_sndr = stdexec::transform_sender(std::forward<Sndr>(sndr), stdexec::get_env(rcvr));
290 return make_opstate_fn_t{}(
291 stdexec::__forward_like<Sndr>(trnsfrmd_sndr.sndr),
292 std::forward<Rcvr>(rcvr),
293 std::move(trnsfrmd_sndr.clsr),
294 std::move(clsrs)...);
295 }
296};
297
// Alias for the nested `type` of `MakeOpStateFn<Sndr, Rcvr, Clsrs...>`.
298template <typename Sndr, typename Rcvr, typename... Clsrs>
299using opstate_t = typename MakeOpStateFn<Sndr, Rcvr, Clsrs...>::type;
300
301} // namespace Kokkos::Execution::ExecutionSpaceImpl
302
303#endif // KOKKOS_EXECUTION_EXECUTION_SPACE_OPERATION_STATE_HPP
The execution space supports events and the receiver is queryable for a delegation scheduler.
Concept that constrains the type of a sender that dispatches a functor for execution.
#define KOKKOS_EXECUTION_FORWARDING_GET_ENV(_type_, _obj_)
Retrieve the environment of _obj_ (with forwarding).
Definition env.hpp:7
#define KOKKOS_EXECUTION_UPSERT_EXEC(_exec_type_, _exec_, _rcvr_type_, _rcvr_)
Definition get_exec.hpp:66
typename MakeOpStateFn< Sndr, Rcvr, Clsrs... >::type opstate_t
consteval std::string_view dispatch_label() noexcept
View the dispatch label as a std::string_view.
Wrap a Kokkos execution space to make it cheap to copy/move in new environments.
Definition get_exec.hpp:31
typename stdexec::transform_sender_result_t< Sndr, stdexec::env_of_t< Rcvr > >::closure_t clsr_of_sndr_t
constexpr auto operator()(Sndr &&sndr, Rcvr &&rcvr, Clsrs... clsrs) const noexcept(sndr_has_nothrow_transform_sender &&is_nothrow_make_opstate) -> type
MakeOpStateFn< child_of_sndr_t, Rcvr, clsr_of_sndr_t, Clsrs... > make_opstate_fn_t
constexpr auto operator()(Sndr &&sndr, Rcvr rcvr, Clsrs... clsrs) const noexcept(std::is_nothrow_constructible_v< type, Sndr &&, Rcvr &&, Clsrs &&... >) -> type
stdexec::connect_result_t< stdexec::schedule_result_t< stdexec::__query_result_t< stdexec::env_of_t< Rcvr >, stdexec::get_delegation_scheduler_t > >, receiver_t > opstate_t
constexpr OpStateBase(Rcvr rcvr_, Clsr clsr_, Clsrs... clsrs_) noexcept(std::is_nothrow_constructible_v< may_delegate_completion_with_event_t, Rcvr > &&std::is_nothrow_move_constructible_v< Clsr > &&(std::is_nothrow_move_constructible_v< Clsrs > &&...))
void propagate_completion_signal(stdexec::set_error_t, Error &&error) noexcept
MayDelegateCompletionWithEvent< Rcvr, execution_space > may_delegate_completion_with_event_t
constexpr auto query(get_exec_t) const noexcept -> ExecutionSpaceRef< execution_space >
void propagate_completion_signal(stdexec::set_value_t) noexcept
void propagate_completion_signal(stdexec::set_stopped_t) noexcept
stdexec::connect_result_t< Sndr, OpStateReceiver< base_t > > inner_opstate_t
constexpr OpState(Sndr &&sndr, Rcvr rcvr_, Clsrs... clsrs_) noexcept(opstate_base_is_nothrow_constructible &&inner_opstate_is_nothrow_constructible)
Synchronization at the boundary of the work enqueued on an execution space.
bool operator()(const OpState &opstate) const noexcept
constexpr bool operator()(const OpState &) const noexcept
The synchronization will be handled by the successor.
constexpr bool operator()(const OpState &) const noexcept
As a fallback, synchronization is always required.
MayDelegateCompletionWithEvent< Rcvr, Exec > * opstate
An event that can be recorded on an execution space instance.
Definition event.hpp:34