diff --git a/bldsys/cmake/test_helpers.cmake b/bldsys/cmake/test_helpers.cmake index 53f1742219..61956e05d4 100644 --- a/bldsys/cmake/test_helpers.cmake +++ b/bldsys/cmake/test_helpers.cmake @@ -27,7 +27,7 @@ set_property(TARGET vkb_tests PROPERTY FOLDER "tests") function(vkb__register_tests) - set(options) + set(options) set(oneValueArgs NAME) set(multiValueArgs SRC LIBS) @@ -35,7 +35,7 @@ function(vkb__register_tests) return() # testing not enabled endif() - cmake_parse_arguments(TARGET "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + cmake_parse_arguments(TARGET "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) if (TARGET_NAME STREQUAL "") message(FATAL_ERROR "NAME must be defined in vkb__register_tests") diff --git a/components/events/CMakeLists.txt b/components/events/CMakeLists.txt index 19bb358560..87fcf1c6a2 100644 --- a/components/events/CMakeLists.txt +++ b/components/events/CMakeLists.txt @@ -23,6 +23,7 @@ vkb__register_headers( event_types.hpp input_manager.hpp event_bus.hpp + event_pipeline.hpp OUTPUT EVENTS_HEADERS ) @@ -32,6 +33,7 @@ vkb__register_component( SRC src/input_manager.cpp src/event_bus.cpp + src/event_pipeline.cpp HEADERS ${EVENTS_HEADERS} INCLUDE_DIRS @@ -44,6 +46,7 @@ vkb__register_tests( tests/channel.test.cpp tests/event_bus.test.cpp tests/input_manager.test.cpp + tests/event_pipeline.test.cpp LIBS vkb__events ) \ No newline at end of file diff --git a/components/events/include/components/events/event_bus.hpp b/components/events/include/components/events/event_bus.hpp index b0cb6b3570..ef5e66c437 100644 --- a/components/events/include/components/events/event_bus.hpp +++ b/components/events/include/components/events/event_bus.hpp @@ -41,11 +41,11 @@ class EventObserver /** * @brief EventBus acts as a collection of event channels and observers - * + * * An observer is added to the event bus through attach(observer). Once attached, an observer can register event listeners (each(), last()) and request * ChannelSender. Each step of the EventBus calls update() on its observers. Which in turn allows an observer to submit events to the bus. After this, the bus then processes * all event callbacks with a stream of events. - * + * * The combination of these actions will allow for inter component communication without any hard links. This allows samples to create and organize components in anyway they deem fit. * * TODO: Allow an observer to detach from the bus. This should also clear all its callbacks. May need to restructure the internal storage of the bus @@ -58,7 +58,7 @@ class EventBus /** * @brief Attach a new observer - * + * * @param observer the observer to attach * @return EventBus& the event bus */ @@ -69,7 +69,7 @@ class EventBus /** * @brief Attach an event callback for each event in a cycle - * + * * @param cb the callback * @return EventBus& the event bus */ @@ -78,7 +78,7 @@ class EventBus /** * @brief Attach an event callback for the last event in a cycle - * + * * @param cb the callback * @return EventBus& the event bus */ @@ -87,7 +87,7 @@ class EventBus /** * @brief Retrieve a ChannelSender for a given type - * + * * @tparam Type the type of the required sender * @return ChannelSenderPtr the requested sender */ @@ -96,9 +96,24 @@ class EventBus /** * @brief Process a cycle of events - * + * */ - void process(); + virtual void process(); + + protected: + // Allow each and last callbacks to process all events held in channels + inline void flush_callbacks() + { + for (auto &it : m_each_callbacks) + { + it.second->process_each(); + } + + for (auto &it : m_last_callbacks) + { + it.second->process_last(); + } + } private: class ChannelCallbacks @@ -188,6 +203,22 @@ class EventBus std::unordered_map> m_each_callbacks; std::unordered_map> m_last_callbacks; }; + +// Allows for EventObservers to be grouped into a single container +class EventObserverGroup : public EventObserver +{ + public: + EventObserverGroup() = default; + virtual ~EventObserverGroup() = default; + + EventObserverGroup &attach(std::shared_ptr &observer); + EventObserverGroup &remove(std::shared_ptr &observer); + virtual void update() override; + virtual void attach(EventBus &bus) override; + + private: + std::set> m_observers; +}; } // namespace events } // namespace components diff --git a/components/events/include/components/events/event_pipeline.hpp b/components/events/include/components/events/event_pipeline.hpp new file mode 100644 index 0000000000..358d15b509 --- /dev/null +++ b/components/events/include/components/events/event_pipeline.hpp @@ -0,0 +1,112 @@ +/* Copyright (c) 2022, Arm Limited and Contributors + * + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "components/events/event_bus.hpp" + +namespace components +{ +namespace events +{ +class EventPipelineStage +{ + public: + EventPipelineStage() = default; + virtual ~EventPipelineStage() = default; + + virtual const char *name() const = 0; + virtual void emit(EventBus &bus) = 0; +}; + +template +class TypedEventPipelineStage : public EventPipelineStage +{ + public: + TypedEventPipelineStage() + {} + + virtual ~TypedEventPipelineStage() = default; + + virtual const char *name() const override + { + return typeid(Event).name(); + } + + virtual void emit(EventBus &bus) override + { + auto sender = bus.request_sender(); + sender->push(Event{}); + } +}; + +template +class TypedEventPipelineStageWithFunc : public TypedEventPipelineStage +{ + public: + typedef Event (*Func)(); + + TypedEventPipelineStageWithFunc(Func &&func) : + m_func{func} + {} + + virtual ~TypedEventPipelineStageWithFunc() = default; + + virtual void emit(EventBus &bus) override + { + auto sender = bus.request_sender(); + sender->push(m_func()); + } + + private: + Func m_func; +}; + +class EventPipeline : public EventBus +{ + public: + EventPipeline() = default; + virtual ~EventPipeline() = default; + + /** + * @brief Stage is ran once at the start of the pipeline + * + * @param stage the stage to be run + * @return EventPipeline& self + */ + EventPipeline &add_once(std::unique_ptr &&stage); + + /** + * @brief Stage is ran each time process is executed. Stages are ran in order of addition + * + * @param stage the stage to be run + * @return EventPipeline& self + */ + EventPipeline &add_always(std::unique_ptr &&stage); + + virtual void process() override; + + protected: + bool running{false}; + + std::vector> m_once_stages; + std::vector> m_stages; +}; +} // namespace events +} // namespace components \ No newline at end of file diff --git a/components/events/src/event_pipeline.cpp b/components/events/src/event_pipeline.cpp new file mode 100644 index 0000000000..6c570c8428 --- /dev/null +++ b/components/events/src/event_pipeline.cpp @@ -0,0 +1,68 @@ +/* Copyright (c) 2022, Arm Limited and Contributors + * + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "components/events/event_pipeline.hpp" + +#include + +namespace components +{ +namespace events +{ +EventPipeline &EventPipeline::add_once(std::unique_ptr &&stage) +{ + m_once_stages.emplace_back(std::move(stage)); + return *this; +} + +EventPipeline &EventPipeline::add_always(std::unique_ptr &&stage) +{ + m_stages.emplace_back(std::move(stage)); + return *this; +} + +void EventPipeline::process() +{ + if (!running) + { + for (auto &stage : m_once_stages) + { + // TODO: Add Tracy ZoneScoped(stage->name()); + + // flush after each stage to make sure pipeline stages are sequential + stage->emit(*this); + flush_callbacks(); + } + + running = true; + } + + // query observer events + EventBus::process(); + flush_callbacks(); + + for (auto &stage : m_stages) + { + // TODO: Add Tracy ZoneScoped(stage->name()); + + // flush after each stage to make sure pipeline stages are sequential + stage->emit(*this); + flush_callbacks(); + } +} +} // namespace events +} // namespace components \ No newline at end of file diff --git a/components/events/tests/event_bus.test.cpp b/components/events/tests/event_bus.test.cpp index 79e9cece5d..744adb2051 100644 --- a/components/events/tests/event_bus.test.cpp +++ b/components/events/tests/event_bus.test.cpp @@ -86,17 +86,17 @@ class TestEventBus final : public EventBus } }; -class Observer final : public EventObserver +class TestObserver final : public EventObserver { public: - Observer(std::function update = nullptr, std::function attach = nullptr) : + TestObserver(std::function update = nullptr, std::function attach = nullptr) : EventObserver{}, m_update{update}, m_attach{attach} { } - virtual ~Observer() = default; + virtual ~TestObserver() = default; virtual void update() override { @@ -123,7 +123,7 @@ TEST_CASE("register observer", "[events]") { TestEventBus bus{}; - std::shared_ptr observer = std::make_shared(); + std::shared_ptr observer = std::make_shared(); REQUIRE(bus.observer_count() == 0); @@ -136,9 +136,9 @@ TEST_CASE("register multiple observers of the different instances", "[events]") { TestEventBus bus{}; - std::shared_ptr observer_1 = std::make_shared(); - std::shared_ptr observer_2 = std::make_shared(); - std::shared_ptr observer_3 = std::make_shared(); + std::shared_ptr observer_1 = std::make_shared(); + std::shared_ptr observer_2 = std::make_shared(); + std::shared_ptr observer_3 = std::make_shared(); REQUIRE(bus.observer_count() == 0); @@ -276,7 +276,7 @@ TEST_CASE("process observer", "[events]") auto sender = bus.request_sender(); REQUIRE(sender != nullptr); - auto observer = std::make_shared( + auto observer = std::make_shared( [&]() { // update() sender->push(EventType{true, 5}); }, @@ -327,8 +327,8 @@ TEST_CASE("expire an observer before process", "[events]") TestEventBus bus{}; - std::shared_ptr observer_1 = std::make_shared(); - std::shared_ptr observer_2 = std::make_shared(); + std::shared_ptr observer_1 = std::make_shared(); + std::shared_ptr observer_2 = std::make_shared(); bus.attach(observer_1).attach(observer_2); @@ -345,4 +345,4 @@ TEST_CASE("expire an observer before process", "[events]") bus.process(); REQUIRE(bus.observer_count() == 0); -} \ No newline at end of file +} diff --git a/components/events/tests/event_pipeline.test.cpp b/components/events/tests/event_pipeline.test.cpp new file mode 100644 index 0000000000..6ba7a20a28 --- /dev/null +++ b/components/events/tests/event_pipeline.test.cpp @@ -0,0 +1,313 @@ +/* Copyright (c) 2022, Arm Limited and Contributors + * + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include + +using namespace components::events; + +class TestEventBus final : public EventBus +{ + public: + TestEventBus() : + EventBus{} + {} + + virtual ~TestEventBus() = default; + + inline size_t observer_count() const + { + return m_observers.size(); + } + + inline size_t unobserved_each_event_count() const + { + size_t unprocessed_events{0}; + + for (auto &it : m_each_callbacks) + { + unprocessed_events += it.second->queue_size(); + } + + return unprocessed_events; + } + + inline size_t unobserved_last_event_count() const + { + size_t unprocessed_events{0}; + + for (auto &it : m_last_callbacks) + { + unprocessed_events += it.second->queue_size(); + } + + return unprocessed_events; + } + + inline size_t each_callback_count() const + { + size_t callback_count{0}; + + for (auto &it : m_each_callbacks) + { + callback_count += it.second->callback_count(); + } + + return callback_count; + } + + inline size_t last_callback_count() const + { + size_t callback_count{0}; + + for (auto &it : m_last_callbacks) + { + callback_count += it.second->callback_count(); + } + + return callback_count; + } +}; + +class TestObserver final : public EventObserver +{ + public: + TestObserver(std::function update = nullptr, std::function attach = nullptr) : + EventObserver{}, + m_update{update}, + m_attach{attach} + { + } + + virtual ~TestObserver() = default; + + virtual void update() override + { + if (m_update) + { + m_update(); + } + } + + virtual void attach(EventBus &bus) override + { + if (m_attach) + { + m_attach(bus); + } + } + + private: + std::function m_update{nullptr}; + std::function m_attach{nullptr}; +}; + +struct TestEvent +{}; + +// tracks the order of pipeline execution +class TestStage : public TypedEventPipelineStage +{ + public: + TestStage(uint32_t ¤t_value, uint32_t expected_value) : + m_current_value{current_value}, m_expected_value{expected_value} + {} + + virtual ~TestStage() = default; + + virtual void emit(EventBus &bus) override + { + auto sender = bus.request_sender(); + sender->push(TestEvent{}); + + REQUIRE(m_current_value == m_expected_value); + + m_current_value++; + } + + protected: + uint32_t &m_current_value; + uint32_t m_expected_value; +}; + +class TestEventPipeline final : public EventPipeline +{ + public: + TestEventPipeline() = default; + virtual ~TestEventPipeline() = default; + + inline size_t once_count() const + { + return m_once_stages.size(); + } + + inline size_t always_count() const + { + return m_stages.size(); + } + + inline size_t total_stage_count() const + { + return once_count() + always_count(); + } +}; + +TEST_CASE("register once stage", "[events]") +{ + TestEventPipeline pipeline{}; + + REQUIRE(pipeline.once_count() == 0); + + pipeline.add_once(std::make_unique>()); + + REQUIRE(pipeline.once_count() == 1); +} + +TEST_CASE("register then stage", "[events]") +{ + TestEventPipeline pipeline{}; + + REQUIRE(pipeline.always_count() == 0); + + pipeline.add_always(std::make_unique>()); + + REQUIRE(pipeline.always_count() == 1); +} + +TEST_CASE("register multiple stages", "[events]") +{ + TestEventPipeline pipeline{}; + + REQUIRE(pipeline.always_count() == 0); + REQUIRE(pipeline.once_count() == 0); + + pipeline + .add_once(std::make_unique>()) + .add_once(std::make_unique>()) + .add_always(std::make_unique>()) + .add_always(std::make_unique>()) + .add_always(std::make_unique>()); + + REQUIRE(pipeline.always_count() == 3); + REQUIRE(pipeline.once_count() == 2); + REQUIRE(pipeline.total_stage_count() == 5); +} + +TEST_CASE("stages are executed in the correct order", "[events]") +{ + TestEventPipeline pipeline{}; + + uint32_t execution_index{0}; + + pipeline + .add_once(std::make_unique(execution_index, 0)) + .add_once(std::make_unique(execution_index, 1)) + .add_always(std::make_unique(execution_index, 2)) + .add_always(std::make_unique(execution_index, 3)) + .add_always(std::make_unique(execution_index, 4)); + + // assertions inside TestStage + pipeline.process(); + + // on a second pass we expect only the "then" stages to be processed + execution_index = 2; + + pipeline.process(); +} + +TEST_CASE("event observers are executed in the correct order", "[events]") +{ + struct EventOne + { + uint32_t value{4}; + }; + + struct EventTwo + { + uint32_t value{1}; + }; + + struct EventThree + { + uint32_t value{56}; + }; + + TestEventPipeline pipeline{}; + + pipeline + .add_once(std::make_unique>()) + .add_always(std::make_unique>()) + .add_always(std::make_unique>()); + + uint32_t one_execution_count{0}; + uint32_t two_execution_count{0}; + uint32_t three_execution_count{0}; + + pipeline + .each([&](const EventOne &event) { + REQUIRE(event.value == 4); + one_execution_count++; + }) + .each([&](const EventTwo &event) { + REQUIRE(event.value == 1); + two_execution_count++; + }) + .each([&](const EventThree &event) { + REQUIRE(event.value == 56); + three_execution_count++; + }); + + pipeline.process(); + pipeline.process(); + pipeline.process(); + pipeline.process(); + + REQUIRE(one_execution_count == 1); + REQUIRE(two_execution_count == 4); + REQUIRE(three_execution_count == 4); +} + +TEST_CASE("stages with custom fields", "[events]") +{ + struct Update + { + float delta_time{0.0f}; + }; + + TestEventPipeline pipeline{}; + + pipeline + .add_always(std::make_unique>( + []() -> Update { + // in practice we would track delta time here + return Update{0.0167f}; + })); + + pipeline + .each([&](const Update &event) { + REQUIRE(event.delta_time == 0.0167f); + }); + + pipeline.process(); + pipeline.process(); + pipeline.process(); + pipeline.process(); +} \ No newline at end of file