Skip to content

Commit 08938d6

Browse files
committed
engine: Stream json api data to a file
1 parent 709c80e commit 08938d6

9 files changed

+145
-23
lines changed

engine/src/server.hpp

+60-5
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121

2222
#pragma once
2323

24+
#include <memory> // for unique_ptr
2425
#include <utility> // for make_pair
2526

26-
#include <cloe/handler.hpp> // for Request, Response
27-
#include <cloe/registrar.hpp> // for Registrar
27+
#include <cloe/handler.hpp> // for Request, Response
28+
#include <cloe/registrar.hpp> // for Registrar
29+
#include <cloe/utility/output_serializer_json.hpp> // for JsonFileSerializer
2830

29-
#include "stack.hpp" // for ServerConf
3031
#include "oak/server.hpp" // for Server, StaticRegistrar, ...
32+
#include "stack.hpp" // for ServerConf
3133

3234
namespace engine {
3335

@@ -67,6 +69,11 @@ class Server {
6769
public:
6870
bool is_listening() { return server_.is_listening(); }
6971

72+
bool is_streaming() { return is_streaming_; }
73+
74+
/**
75+
* Start the web server.
76+
*/
7077
void start() {
7178
assert(!is_listening());
7279

@@ -77,13 +84,31 @@ class Server {
7784
server_.listen();
7885
}
7986

87+
/**
88+
* Open a file for api data streaming. This does not require a running web
89+
* server.
90+
*/
91+
void init_stream(const std::string& filename) {
92+
serializer_ = make_json_file_serializer(cloe::utility::JsonFileType::JSON_GZIP, logger());
93+
serializer_->open_file(filename);
94+
}
95+
96+
/**
97+
* Stop all server-related procedures.
98+
*/
8099
void stop() {
81100
if (is_listening()) {
82101
logger()->info("Stopping server...");
83102
server_.stop();
84103
}
104+
if (serializer_ != nullptr) {
105+
serializer_->close_file();
106+
}
85107
}
86108

109+
/**
110+
* Register a list of all endpoints.
111+
*/
87112
void enroll(cloe::Registrar& r) {
88113
r.register_api_handler(
89114
"/endpoints", cloe::HandlerType::STATIC,
@@ -107,12 +132,32 @@ class Server {
107132
}
108133

109134
/**
110-
* Refresh the server buffer.
135+
* Refresh and/or start streaming api data to a file.
136+
*/
137+
void refresh_buffer_start_stream() {
138+
is_streaming_ = serializer_ != nullptr;
139+
if (is_listening() || is_streaming()) {
140+
buffer_api_registrar_.refresh_buffer();
141+
}
142+
if (is_streaming()) {
143+
// Write static endpoints at the beginning of the file.
144+
write_data_stream(static_api_registrar_.endpoints());
145+
write_data_stream(locked_api_registrar_.endpoints());
146+
write_data_stream(buffer_api_registrar_.endpoints());
147+
}
148+
}
149+
150+
/**
151+
* Refresh and/or write api data to a file.
111152
*/
112153
void refresh_buffer() {
113-
if (config.listen) {
154+
if (is_listening() || is_streaming()) {
114155
buffer_api_registrar_.refresh_buffer();
115156
}
157+
if (is_streaming()) {
158+
write_data_stream(locked_api_registrar_.endpoints());
159+
write_data_stream(buffer_api_registrar_.endpoints());
160+
}
116161
}
117162

118163
/**
@@ -123,12 +168,22 @@ class Server {
123168
protected:
124169
cloe::Logger logger() const { return cloe::logger::get("cloe"); }
125170

171+
private:
172+
void write_data_stream(const std::vector<std::string>& endpoints) const {
173+
auto j = server_.endpoints_to_json(endpoints);
174+
if (!j.empty()) {
175+
serializer_->serialize(j);
176+
}
177+
};
178+
126179
private: // State
127180
oak::Server server_;
128181
oak::StaticRegistrar static_registrar_{&server_, config.static_prefix, nullptr};
129182
oak::StaticRegistrar static_api_registrar_{&server_, config.api_prefix, nullptr};
130183
oak::LockedRegistrar locked_api_registrar_{&server_, config.api_prefix, nullptr};
131184
oak::BufferRegistrar buffer_api_registrar_{&server_, config.api_prefix, nullptr};
185+
bool is_streaming_{false};
186+
std::unique_ptr<cloe::utility::JsonFileSerializer> serializer_;
132187
};
133188

134189
} // namespace engine

engine/src/simulation.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ StateId SimulationMachine::Connect::impl(SimulationContext& ctx) {
707707
}
708708

709709
ctx.progress.init_end();
710-
ctx.server->refresh_buffer();
710+
ctx.server->refresh_buffer_start_stream();
711711
logger()->info("Simulation initialization complete.");
712712
return START;
713713
}
@@ -1217,6 +1217,13 @@ SimulationResult Simulation::run() {
12171217
if (config_.server.listen) {
12181218
ctx.server->start();
12191219
}
1220+
// Stream data to the requested file
1221+
if (r.config.engine.output_file_data_stream) {
1222+
auto filepath = r.get_output_filepath(*r.config.engine.output_file_data_stream);
1223+
if (is_writable(filepath)) {
1224+
ctx.server->init_stream(filepath.native());
1225+
}
1226+
}
12201227

12211228
// Run pre-connect hooks
12221229
ctx.commander->set_enabled(config_.engine.security_enable_hooks);

engine/src/stack.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ struct EngineConf : public Confable {
325325
boost::optional<boost::filesystem::path> output_file_config{"config.json"};
326326
boost::optional<boost::filesystem::path> output_file_result{"result.json"};
327327
boost::optional<boost::filesystem::path> output_file_triggers{"triggers.json"};
328+
boost::optional<boost::filesystem::path> output_file_data_stream;
328329
bool output_clobber_files{true};
329330

330331
/**
@@ -426,6 +427,7 @@ struct EngineConf : public Confable {
426427
{"config", make_schema(&output_file_config, file_proto(), "file to store config in")},
427428
{"result", make_schema(&output_file_result, file_proto(), "file to store simulation result in")},
428429
{"triggers", make_schema(&output_file_triggers, file_proto(), "file to store triggers in")},
430+
{"api_recording", make_schema(&output_file_data_stream, file_proto(), "file to store api data stream")},
429431
}},
430432
}},
431433
{"triggers", Struct{

tests/setup_bats.bash

+14
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,20 @@ cloe_engine() {
7373
)
7474
}
7575

76+
export cloe_tmp_registry="${HOME}/.cache/cloe/tmp-registry"
77+
78+
cloe_engine_with_tmp_registry() {
79+
test $1 == "run"
80+
shift
81+
82+
(
83+
export CLOE_WRITE_OUTPUT=true
84+
export CLOE_TMP_REGISTRY=${cloe_tmp_registry}
85+
export CLOE_OVERRIDE_ENV=(CLOE_TMP_REGISTRY)
86+
cloe_engine run <(echo '{"version":"4","engine":{"registry_path":"${CLOE_TMP_REGISTRY}"}}') "$@"
87+
)
88+
}
89+
7690
cloe_shell() {
7791
local cloe_log_level="${CLOE_LOG_LEVEL-debug}"
7892
local cloe_profile_file="${CLOE_PROFILE_FILE-${CLOE_ROOT}/conantest.py}"

tests/test_engine_json_schema.json

+12
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,18 @@
683683
"files": {
684684
"additionalProperties": false,
685685
"properties": {
686+
"api_recording": {
687+
"description": "file to store api data stream",
688+
"oneOf": [
689+
{
690+
"type": "null"
691+
},
692+
{
693+
"comment": "path should either not exist or be a file",
694+
"type": "string"
695+
}
696+
]
697+
},
686698
"config": {
687699
"description": "file to store config in",
688700
"oneOf": [

tests/test_engine_json_schema_with_vtd.json

+12
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,18 @@
683683
"files": {
684684
"additionalProperties": false,
685685
"properties": {
686+
"api_recording": {
687+
"description": "file to store api data stream",
688+
"oneOf": [
689+
{
690+
"type": "null"
691+
},
692+
{
693+
"comment": "path should either not exist or be a file",
694+
"type": "string"
695+
}
696+
]
697+
},
686698
"config": {
687699
"description": "file to store config in",
688700
"oneOf": [

tests/test_engine_replica_smoketest.bats

+2-17
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,10 @@
66

77
load setup_bats
88

9-
export cloe_tmp_registry="/tmp/cloe-ci-registry"
10-
11-
cloe_engine_with_tmp_registry() {
12-
test $1 == "run"
13-
shift
14-
15-
(
16-
export CLOE_WRITE_OUTPUT=true
17-
export CLOE_TMP_REGISTRY=${cloe_tmp_registry}
18-
export CLOE_OVERRIDE_ENV=(CLOE_TMP_REGISTRY)
19-
cloe_engine run <(echo '{"version":"4","engine":{"registry_path":"${CLOE_TMP_REGISTRY}"}}') "$@"
20-
)
21-
}
22-
23-
249
@test "Expect exact replication: test_engine_replica_smoketest.json" {
2510
# Clean up in case temporary registry already exists.
2611
if [[ -d "$cloe_tmp_registry" ]]; then
27-
rm -rf "$cloe_tmp_registry"
12+
rm -r "$cloe_tmp_registry" || true
2813
fi
2914

3015
# Run our initial smoketest, writing to the temporary registry.
@@ -64,5 +49,5 @@ cloe_engine_with_tmp_registry() {
6449
md5sum -c <(md5sum $cloe_tmp_registry/original/{config,triggers}.json | sed 's/original/replica-2/')
6550

6651
# Remove the test registry.
67-
rm -rf "$cloe_tmp_registry"
52+
rm -r "$cloe_tmp_registry" || true
6853
}

tests/test_vtd.bats

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ teardown() {
1818
echo "Teardown VTD (from BATS)"
1919
teardown_vtd
2020
fi
21+
# Remove the temporary registry.
22+
rm -r "$cloe_tmp_registry" || true
2123
}
2224

2325
@test "Expect check/run success: test_vtd_smoketest.json" {
@@ -28,6 +30,16 @@ teardown() {
2830
cloe_engine run test_vtd_smoketest.json
2931
}
3032

33+
@test "Expect check/run success: test_vtd_api_recording.json" {
34+
if ! test_vtd_plugin_exists; then
35+
skip "required simulator vtd not present"
36+
fi
37+
cloe_engine check test_vtd_api_recording.json
38+
cloe_engine_with_tmp_registry run test_vtd_api_recording.json
39+
# Remove the test registry.
40+
rm -r "$cloe_tmp_registry" || true
41+
}
42+
3143
@test "Expect check/run success: test_vtd_watchdog.json" {
3244
if ! test_vtd_plugin_exists; then
3345
skip "required simulator vtd not present"

tests/test_vtd_api_recording.json

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"version": "4",
3+
"include": [
4+
"config_vtd_smoketest.json",
5+
"bootstrap_vtd.json"
6+
],
7+
"engine": {
8+
"output": {
9+
"files": {
10+
"api_recording": "data.json.gz"
11+
}
12+
}
13+
},
14+
"server": {
15+
"listen": false
16+
},
17+
"triggers": [
18+
{
19+
"event": "time=2",
20+
"action": "succeed"
21+
}
22+
]
23+
}

0 commit comments

Comments
 (0)