Skip to content
This repository was archived by the owner on Dec 18, 2024. It is now read-only.

Subscriptions Fixes 1 #264

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/JsonResponses.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ namespace JsonResponses {
std::string message,
jsoncons::json& jsonResponse);

std::string malFormedRequest(std::string message);
std::string malFormedRequest(std::string message, std::string requestId="UNKNOWN");

void malFormedRequest(std::string message,
jsoncons::json& jsonResponse);
jsoncons::json& jsonResponse, std::string requesId="UNKNOWN");

/** A API call requested a non-existant path */
std::string pathNotFound(std::string request_id,
Expand Down
48 changes: 48 additions & 0 deletions include/VSSRequestJsonSchema.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,54 @@ static const char* SCHEMA_SET=R"(
}
)";


static const char* SCHEMA_SUBSCRIBE=R"(
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Subscribe Request",
"description": "Allows the client to subscribe to time-varying signal notifications on the server.",
"type": "object",
"required": ["action", "path", "requestId"],
"properties": {
"action": {
"enum": [ "subscribe" ],
"description": "The identifier for the subscription request"
},
"path": {
"$ref": "viss#/definitions/path"
},
"filters": {
"$ref": "viss#/definitions/filters"
},
"requestId": {
"$ref": "viss#/definitions/requestId"
}
}
}
)";

static const char* SCHEMA_UNSUBSCRIBE=R"(
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Unsubscribe Request",
"description": "Allows the client to unsubscribe to time-varying signal notifications on the server.",
"type": "object",
"required": ["action", "subscriptionId", "requestId"],
"properties": {
"action": {
"enum": [ "unsubscribe" ],
"description": "The identifier for the unsubscribe request"
},
"subscriptionId": {
"$ref": "viss#/definitions/subscriptionId"
},
"requestId": {
"$ref": "viss#/definitions/requestId"
}
}
}
)";

static const char* SCHEMA_UPDATE_VSS_TREE=R"(
{
"$schema": "http://json-schema.org/draft-04/schema#",
Expand Down
5 changes: 5 additions & 0 deletions include/VSSRequestValidator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class VSSRequestValidator {

void validateGet(jsoncons::json &request);
void validateSet(jsoncons::json &request);
void validateSubscribe(jsoncons::json &request);
void validateUnsubscribe(jsoncons::json &request);

void validateUpdateMetadata(jsoncons::json &request);
void validateUpdateVSSTree(jsoncons::json &request);

Expand All @@ -45,6 +48,8 @@ class VSSRequestValidator {
class MessageValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> getValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> setValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> subscribeValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> unsubscribeValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> updateMetadataValidator;
std::unique_ptr<VSSRequestValidator::MessageValidator> updateVSSTreeValidator;

Expand Down
5 changes: 3 additions & 2 deletions include/VssCommandProcessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ class VssCommandProcessor : public IVssCommandProcessor {
std::string processUpdateVSSTree(kuksa::kuksaChannel& channel, jsoncons::json &request);

public:
std::string processSubscribe(kuksa::kuksaChannel& channel, const std::string& request_id, const std::string& path);
std::string processUnsubscribe(const std::string & request_id, uint32_t subscribeID);
std::string processGetMetaData(jsoncons::json &request);
std::string processAuthorize(kuksa::kuksaChannel& channel, const std::string & request_id,
const std::string & token);
std::string processGet2(kuksa::kuksaChannel &channel, jsoncons::json &request);
std::string processSet2(kuksa::kuksaChannel &channel, jsoncons::json &request);
std::string processSubscribe(kuksa::kuksaChannel& channel, jsoncons::json &request);
std::string processUnsubscribe(kuksa::kuksaChannel &channel, jsoncons::json &request);

VssCommandProcessor(std::shared_ptr<ILogger> loggerUtil,
std::shared_ptr<IVssDatabase> database,
std::shared_ptr<IAuthenticator> vdator,
Expand Down
17 changes: 9 additions & 8 deletions src/JsonResponses.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace JsonResponses {
jsonResponse["action"] = action;
jsonResponse["requestId"] = request_id;
jsoncons::json error;
error["number"] = 400;
error["number"] = "400";
error["reason"] = "Bad Request";
error["message"] = message;
jsonResponse["error"] = error;
Expand All @@ -41,18 +41,19 @@ namespace JsonResponses {
return ss.str();
}

void malFormedRequest(std::string message, jsoncons::json& jsonResponse) {
void malFormedRequest(std::string message, jsoncons::json& jsonResponse, std::string requestId) {
jsoncons::json error;

error["number"] = 400;
error["number"] = "400";
error["reason"] = "Bad Request";
error["message"] = message;
jsonResponse["error"] = error;
jsonResponse["requestId"] = requestId;
jsonResponse["ts"] = getTimeStamp();
}
std::string malFormedRequest(std::string message) {
std::string malFormedRequest(std::string message, std::string requestId) {
jsoncons::json answer;
malFormedRequest(message, answer);
malFormedRequest(message, answer, requestId);

std::stringstream ss;
ss << pretty_print(answer);
Expand All @@ -67,7 +68,7 @@ namespace JsonResponses {
jsonResponse["action"] = action;
jsonResponse["requestId"] = request_id;
jsoncons::json error;
error["number"] = 404;
error["number"] = "404";
error["reason"] = "Path not found";
error["message"] = "I can not find " + path + " in my db";
jsonResponse["error"] = error;
Expand All @@ -91,7 +92,7 @@ namespace JsonResponses {
jsoncons::json error;
jsonResponse["action"] = action;
jsonResponse["requestId"] = request_id;
error["number"] = 403;
error["number"] = "403";
error["reason"] = "Forbidden";
error["message"] = message;
jsonResponse["error"] = error;
Expand All @@ -115,7 +116,7 @@ namespace JsonResponses {
jsonResponse["action"] = action;
jsonResponse["requestId"] = request_id;
jsoncons::json error;
error["number"] = 400;
error["number"] = "400";
error["reason"] = "Value passed is out of bounds";
error["message"] = message;
jsonResponse["error"] = error;
Expand Down
2 changes: 1 addition & 1 deletion src/SubscriptionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ int SubscriptionHandler::updateByUUID(const string &signalUUID, const json &data
for (auto subID : handle->second) {
std::lock_guard<std::mutex> lock(subMutex);
tuple<SubscriptionId, ConnectionId, json> newSub;
logger->Log(LogLevel::VERBOSE, "SubscriptionHandler::updateByUUID: new value set at path " + std::to_string(subID.first) + ss.str());
logger->Log(LogLevel::VERBOSE, "SubscriptionHandler::updateByUUID: new value set at path " + std::to_string(subID.first) + ": " + ss.str());
newSub = std::make_tuple(subID.first, subID.second, data);
buffer.push(newSub);
c.notify_one();
Expand Down
11 changes: 11 additions & 0 deletions src/VSSRequestValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ VSSRequestValidator::VSSRequestValidator(std::shared_ptr<ILogger> loggerUtil) :

this->getValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_GET);
this->setValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_SET);
this->subscribeValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_SUBSCRIBE);
this->unsubscribeValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_UNSUBSCRIBE);

this->updateMetadataValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_UPDATE_METADATA);
this->updateVSSTreeValidator = std::make_unique<MessageValidator>( VSS_JSON::SCHEMA_UPDATE_VSS_TREE);
}
Expand All @@ -80,6 +83,14 @@ void VSSRequestValidator::validateSet(jsoncons::json& request) {
setValidator->validate(request);
}

void VSSRequestValidator::validateSubscribe(jsoncons::json& request) {
subscribeValidator->validate(request);
}

void VSSRequestValidator::validateUnsubscribe(jsoncons::json& request) {
unsubscribeValidator->validate(request);
}

void VSSRequestValidator::validateUpdateMetadata(jsoncons::json& request) {
updateMetadataValidator->validate(request);
}
Expand Down
125 changes: 12 additions & 113 deletions src/VssCommandProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,94 +56,6 @@ VssCommandProcessor::~VssCommandProcessor() {
}



string VssCommandProcessor::processSubscribe(kuksa::kuksaChannel &channel,
const string & request_id,
const string & path) {
logger->Log(LogLevel::VERBOSE, string("VssCommandProcessor::processSubscribe: Client wants to subscribe ")+path);

uint32_t subId = -1;
try {
subId = subHandler->subscribe(channel, database, path);
} catch (noPathFoundonTree &noPathFound) {
logger->Log(LogLevel::ERROR, string(noPathFound.what()));
return JsonResponses::pathNotFound(request_id, "subscribe", path);
} catch (noPermissionException &nopermission) {
logger->Log(LogLevel::ERROR, string(nopermission.what()));
return JsonResponses::noAccess(request_id, "subscribe", nopermission.what());
} catch (genException &outofboundExp) {
logger->Log(LogLevel::ERROR, string(outofboundExp.what()));
return JsonResponses::valueOutOfBounds(request_id, "subscribe",
outofboundExp.what());
} catch (std::exception &e) {
logger->Log(LogLevel::ERROR, "Unhandled error: " + string(e.what()));
return JsonResponses::malFormedRequest(request_id, "get", string("Unhandled error: ") + e.what());
}

if (subId > 0) {
jsoncons::json answer;
answer["action"] = "subscribe";
answer["requestId"] = request_id;
answer["subscriptionId"] = subId;
answer["ts"] = JsonResponses::getTimeStamp();

std::stringstream ss;
ss << pretty_print(answer);
return ss.str();

} else {
jsoncons::json root;
jsoncons::json error;

root["action"] = "subscribe";
root["requestId"] = request_id;
error["number"] = 400;
error["reason"] = "Bad Request";
error["message"] = "Unknown";

root["error"] = error;
root["ts"] = JsonResponses::getTimeStamp();

std::stringstream ss;

ss << pretty_print(root);
return ss.str();
}
}

string VssCommandProcessor::processUnsubscribe(const string & request_id,
uint32_t subscribeID) {
int res = subHandler->unsubscribe(subscribeID);
if (res == 0) {
jsoncons::json answer;
answer["action"] = "unsubscribe";
answer["requestId"] = request_id;
answer["subscriptionId"] = subscribeID;
answer["ts"] = JsonResponses::getTimeStamp();

std::stringstream ss;
ss << pretty_print(answer);
return ss.str();

} else {
jsoncons::json root;
jsoncons::json error;

root["action"] = "unsubscribe";
root["requestId"] = request_id;
error["number"] = 400;
error["reason"] = "Unknown error";
error["message"] = "Error while unsubscribing";

root["error"] = error;
root["ts"] = JsonResponses::getTimeStamp();

std::stringstream ss;
ss << pretty_print(root);
return ss.str();
}
}

string VssCommandProcessor::processUpdateVSSTree(kuksa::kuksaChannel& channel, jsoncons::json &request){
logger->Log(LogLevel::VERBOSE, "VssCommandProcessor::processUpdateVSSTree");

Expand Down Expand Up @@ -172,7 +84,7 @@ string VssCommandProcessor::processUpdateVSSTree(kuksa::kuksaChannel& channel, j
logger->Log(LogLevel::ERROR, string(e.what()));
jsoncons::json error;

error["number"] = 401;
error["number"] = "401";
error["reason"] = "Unknown error";
error["message"] = e.what();

Expand Down Expand Up @@ -217,7 +129,7 @@ string VssCommandProcessor::processGetMetaData(jsoncons::json &request) {
result["ts"] = JsonResponses::getTimeStamp();
if (0 == st.size()){
jsoncons::json error;
error["number"] = 404;
error["number"] = "404";
error["reason"] = "Path not found";
error["message"] = "In database no metadata found for path " + path.getVSSPath();
result["error"] = error;
Expand Down Expand Up @@ -263,7 +175,7 @@ string VssCommandProcessor::processUpdateMetaData(kuksa::kuksaChannel& channel,
logger->Log(LogLevel::ERROR, string(e.what()));
jsoncons::json error;

error["number"] = 401;
error["number"] = "401";
error["reason"] = "Unknown error";
error["message"] = e.what();

Expand Down Expand Up @@ -294,7 +206,7 @@ string VssCommandProcessor::processAuthorize(kuksa::kuksaChannel &channel,
jsoncons::json error;
result["action"] = "authorize";
result["requestId"] = request_id;
error["number"] = 401;
error["number"] = "401";
error["reason"] = "Invalid Token";
error["message"] = "Check the JWT token passed";

Expand Down Expand Up @@ -347,28 +259,17 @@ string VssCommandProcessor::processQuery(const string &req_json,
+ token + " with request id " + request_id);

response = processAuthorize(channel, request_id, token);
} else if (action == "unsubscribe") {
//string request_id = root["requestId"].as<int>();
string request_id = root["requestId"].as<string>();
uint32_t subscribeID = root["subscriptionId"].as<int>();
logger->Log(LogLevel::VERBOSE, "VssCommandProcessor::processQuery: unsubscribe query for sub ID = "
+ to_string(subscribeID) + " with request id " + request_id);

response = processUnsubscribe(request_id, subscribeID);
}
else if (action == "unsubscribe") {
response = processUnsubscribe(channel, root);
}
else if (action == "subscribe") {
response = processSubscribe(channel, root);
} else if (action == "updateVSSTree") {
response = processUpdateVSSTree(channel,root);
} else {
string path = root["path"].as<string>();
string request_id = root["requestId"].as<string>();
if (action == "subscribe") {
logger->Log(LogLevel::VERBOSE, "VssCommandProcessor::processQuery: subscribe query for "
+ path + " with request id " + request_id);
response =
processSubscribe(channel, request_id, path);
} else {
logger->Log(LogLevel::WARNING, "VssCommandProcessor::processQuery: Unknown action " + action);
return JsonResponses::malFormedRequest("Unknown action requested");
}
logger->Log(LogLevel::WARNING, "VssCommandProcessor::processQuery: Unknown action " + action);
return JsonResponses::malFormedRequest("Unknown action requested", root.get_value_or<std::string>("requestId", "UNKNOWN"));
}
} catch (jsoncons::ser_error &e) {
logger->Log(LogLevel::WARNING, "JSON parse error");
Expand All @@ -380,8 +281,6 @@ string VssCommandProcessor::processQuery(const string &req_json,
logger->Log(LogLevel::WARNING, "JSON not an object error");
return JsonResponses::malFormedRequest(e.what());
}


return response;
}

Expand Down
2 changes: 1 addition & 1 deletion src/VssCommandSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ std::string VssCommandProcessor::processSet2(kuksa::kuksaChannel &channel,
root["action"] = "set";
root.insert_or_assign("requestId", request["requestId"]);

error["number"] = 401;
error["number"] = "401";
error["reason"] = "Unknown error";
error["message"] = e.what();

Expand Down
Loading