Skip to content

Event Pipelines #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bldsys/cmake/test_helpers.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ set_property(TARGET vkb_tests PROPERTY FOLDER "tests")


function(vkb__register_tests)
set(options)
set(options)
set(oneValueArgs NAME)
set(multiValueArgs SRC LIBS)

if(NOT ((CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME AND BUILD_TESTING) OR VKB_BUILD_TESTS))
return() # testing not enabled
endif()

cmake_parse_arguments(TARGET "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
cmake_parse_arguments(TARGET "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})

if (TARGET_NAME STREQUAL "")
message(FATAL_ERROR "NAME must be defined in vkb__register_tests")
Expand Down
3 changes: 3 additions & 0 deletions components/events/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ vkb__register_headers(
event_types.hpp
input_manager.hpp
event_bus.hpp
event_pipeline.hpp
OUTPUT
EVENTS_HEADERS
)
Expand All @@ -32,6 +33,7 @@ vkb__register_component(
SRC
src/input_manager.cpp
src/event_bus.cpp
src/event_pipeline.cpp
HEADERS
${EVENTS_HEADERS}
INCLUDE_DIRS
Expand All @@ -44,6 +46,7 @@ vkb__register_tests(
tests/channel.test.cpp
tests/event_bus.test.cpp
tests/input_manager.test.cpp
tests/event_pipeline.test.cpp
LIBS
vkb__events
)
47 changes: 39 additions & 8 deletions components/events/include/components/events/event_bus.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ class EventObserver

/**
* @brief EventBus acts as a collection of event channels and observers
*
*
* An observer is added to the event bus through attach(observer). Once attached, an observer can register event listeners (each<EventType>(), last<EventType>()) and request
* ChannelSender<EventTypes>. Each step of the EventBus calls update() on its observers. Which in turn allows an observer to submit events to the bus. After this, the bus then processes
* all event callbacks with a stream of events.
*
*
* The combination of these actions will allow for inter component communication without any hard links. This allows samples to create and organize components in anyway they deem fit.
*
* TODO: Allow an observer to detach from the bus. This should also clear all its callbacks. May need to restructure the internal storage of the bus
Expand All @@ -58,7 +58,7 @@ class EventBus

/**
* @brief Attach a new observer
*
*
* @param observer the observer to attach
* @return EventBus& the event bus
*/
Expand All @@ -69,7 +69,7 @@ class EventBus

/**
* @brief Attach an event callback for each event in a cycle
*
*
* @param cb the callback
* @return EventBus& the event bus
*/
Expand All @@ -78,7 +78,7 @@ class EventBus

/**
* @brief Attach an event callback for the last event in a cycle
*
*
* @param cb the callback
* @return EventBus& the event bus
*/
Expand All @@ -87,7 +87,7 @@ class EventBus

/**
* @brief Retrieve a ChannelSender for a given type
*
*
* @tparam Type the type of the required sender
* @return ChannelSenderPtr<Type> the requested sender
*/
Expand All @@ -96,9 +96,24 @@ class EventBus

/**
* @brief Process a cycle of events
*
*
*/
void process();
virtual void process();

protected:
// Allow each and last callbacks to process all events held in channels
inline void flush_callbacks()
{
for (auto &it : m_each_callbacks)
{
it.second->process_each();
}

for (auto &it : m_last_callbacks)
{
it.second->process_last();
}
}

private:
class ChannelCallbacks
Expand Down Expand Up @@ -188,6 +203,22 @@ class EventBus
std::unordered_map<std::type_index, std::unique_ptr<ChannelCallbacks>> m_each_callbacks;
std::unordered_map<std::type_index, std::unique_ptr<ChannelCallbacks>> m_last_callbacks;
};

// Allows for EventObservers to be grouped into a single container
class EventObserverGroup : public EventObserver
{
public:
EventObserverGroup() = default;
virtual ~EventObserverGroup() = default;

EventObserverGroup &attach(std::shared_ptr<EventObserver> &observer);
EventObserverGroup &remove(std::shared_ptr<EventObserver> &observer);
virtual void update() override;
virtual void attach(EventBus &bus) override;

private:
std::set<std::shared_ptr<EventObserver>> m_observers;
};
} // namespace events
} // namespace components

Expand Down
112 changes: 112 additions & 0 deletions components/events/include/components/events/event_pipeline.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* Copyright (c) 2022, Arm Limited and Contributors
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <functional>

#include "components/events/event_bus.hpp"

namespace components
{
namespace events
{
class EventPipelineStage
{
public:
EventPipelineStage() = default;
virtual ~EventPipelineStage() = default;

virtual const char *name() const = 0;
virtual void emit(EventBus &bus) = 0;
};

template <typename Event>
class TypedEventPipelineStage : public EventPipelineStage
{
public:
TypedEventPipelineStage()
{}

virtual ~TypedEventPipelineStage() = default;

virtual const char *name() const override
{
return typeid(Event).name();
}

virtual void emit(EventBus &bus) override
{
auto sender = bus.request_sender<Event>();
sender->push(Event{});
}
};

template <typename Event>
class TypedEventPipelineStageWithFunc : public TypedEventPipelineStage<Event>
{
public:
typedef Event (*Func)();

TypedEventPipelineStageWithFunc(Func &&func) :
m_func{func}
{}

virtual ~TypedEventPipelineStageWithFunc() = default;

virtual void emit(EventBus &bus) override
{
auto sender = bus.request_sender<Event>();
sender->push(m_func());
}

private:
Func m_func;
};

class EventPipeline : public EventBus
{
public:
EventPipeline() = default;
virtual ~EventPipeline() = default;

/**
* @brief Stage is ran once at the start of the pipeline
*
* @param stage the stage to be run
* @return EventPipeline& self
*/
EventPipeline &add_once(std::unique_ptr<EventPipelineStage> &&stage);

/**
* @brief Stage is ran each time process is executed. Stages are ran in order of addition
*
* @param stage the stage to be run
* @return EventPipeline& self
*/
EventPipeline &add_always(std::unique_ptr<EventPipelineStage> &&stage);

virtual void process() override;

protected:
bool running{false};

std::vector<std::unique_ptr<EventPipelineStage>> m_once_stages;
std::vector<std::unique_ptr<EventPipelineStage>> m_stages;
};
} // namespace events
} // namespace components
68 changes: 68 additions & 0 deletions components/events/src/event_pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/* Copyright (c) 2022, Arm Limited and Contributors
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 the "License";
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "components/events/event_pipeline.hpp"

#include <cassert>

namespace components
{
namespace events
{
EventPipeline &EventPipeline::add_once(std::unique_ptr<EventPipelineStage> &&stage)
{
m_once_stages.emplace_back(std::move(stage));
return *this;
}

EventPipeline &EventPipeline::add_always(std::unique_ptr<EventPipelineStage> &&stage)
{
m_stages.emplace_back(std::move(stage));
return *this;
}

void EventPipeline::process()
{
if (!running)
{
for (auto &stage : m_once_stages)
{
// TODO: Add Tracy ZoneScoped(stage->name());

// flush after each stage to make sure pipeline stages are sequential
stage->emit(*this);
flush_callbacks();
}

running = true;
}

// query observer events
EventBus::process();
flush_callbacks();

for (auto &stage : m_stages)
{
// TODO: Add Tracy ZoneScoped(stage->name());

// flush after each stage to make sure pipeline stages are sequential
stage->emit(*this);
flush_callbacks();
}
}
} // namespace events
} // namespace components
22 changes: 11 additions & 11 deletions components/events/tests/event_bus.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ class TestEventBus final : public EventBus
}
};

class Observer final : public EventObserver
class TestObserver final : public EventObserver
{
public:
Observer(std::function<void()> update = nullptr, std::function<void(EventBus &)> attach = nullptr) :
TestObserver(std::function<void()> update = nullptr, std::function<void(EventBus &)> attach = nullptr) :
EventObserver{},
m_update{update},
m_attach{attach}
{
}

virtual ~Observer() = default;
virtual ~TestObserver() = default;

virtual void update() override
{
Expand All @@ -123,7 +123,7 @@ TEST_CASE("register observer", "[events]")
{
TestEventBus bus{};

std::shared_ptr<EventObserver> observer = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer = std::make_shared<TestObserver>();

REQUIRE(bus.observer_count() == 0);

Expand All @@ -136,9 +136,9 @@ TEST_CASE("register multiple observers of the different instances", "[events]")
{
TestEventBus bus{};

std::shared_ptr<EventObserver> observer_1 = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer_2 = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer_3 = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer_1 = std::make_shared<TestObserver>();
std::shared_ptr<EventObserver> observer_2 = std::make_shared<TestObserver>();
std::shared_ptr<EventObserver> observer_3 = std::make_shared<TestObserver>();

REQUIRE(bus.observer_count() == 0);

Expand Down Expand Up @@ -276,7 +276,7 @@ TEST_CASE("process observer", "[events]")
auto sender = bus.request_sender<EventType>();
REQUIRE(sender != nullptr);

auto observer = std::make_shared<Observer>(
auto observer = std::make_shared<TestObserver>(
[&]() { // update()
sender->push(EventType{true, 5});
},
Expand Down Expand Up @@ -327,8 +327,8 @@ TEST_CASE("expire an observer before process", "[events]")

TestEventBus bus{};

std::shared_ptr<EventObserver> observer_1 = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer_2 = std::make_shared<Observer>();
std::shared_ptr<EventObserver> observer_1 = std::make_shared<TestObserver>();
std::shared_ptr<EventObserver> observer_2 = std::make_shared<TestObserver>();

bus.attach(observer_1).attach(observer_2);

Expand All @@ -345,4 +345,4 @@ TEST_CASE("expire an observer before process", "[events]")
bus.process();

REQUIRE(bus.observer_count() == 0);
}
}
Loading