@@ -42,17 +42,6 @@ event_service:init_server()
42
42
int
43
43
event_service:echo_send(const string s)
44
44
{
45
- zmq_msg_t msg_req, msg_data;
46
- int rc = zmq_msg_init_size (&snd_msg, s.size ()+1 );
47
- RET_ON_ERR (rc == 0 , " Failed to init zmq msg for %d bytes" , s.size ()+1 );
48
- memcpy ((void *)zmq_msg_data (&snd_msg), s.str (), s.size ()+1 );
49
- rc = zmq_msg_send (&snd_msg, m_socket, 0 );
50
- RET_ON_ERR (rc != -1 , " Failed to send to %s" , get_config (REQ_REP_END_KEY));
51
- out:
52
- if (rc != 0 ) {
53
- close ();
54
- }
55
- return rc;
56
45
}
57
46
58
47
@@ -101,18 +90,73 @@ event_service::cache_read(vector<zmq_msg_t> &lst)
101
90
102
91
103
92
int
104
- event_service::server_read ( event_req_type_t &req_code, event_service_data_t &data)
93
+ event_service::channel_read ( int &code, vector<string> &data)
105
94
{
106
- // TODO
107
- reurn 0 ;
95
+ int more = 0 , rc;
96
+ size_t more_size = sizeof (more);
97
+
98
+ {
99
+ zmq_msg_t rcv_code;
100
+
101
+ zmq_msg_init (&rcv_code);
102
+ rc = zmq_msg_recv (&rcv_code, m_req_socket, 0 );
103
+
104
+ RET_ON_ERR (rc != -1 , " Failed to receive code" );
105
+
106
+ msg_to_int (rcv_code, code);
107
+ zmq_msg_close (&rcv_code);
108
+ }
109
+
110
+ rc = zmq_getsockopt (m_socket, ZMQ_RCVMORE, &more, &more_size);
111
+ RET_ON_ERR (rc == 0 , " Failed to get sockopt for read channel" );
112
+
113
+
114
+ if (more) {
115
+ zmq_msg_t rcv_data;
116
+
117
+ zmq_msg_init (&rcv_data);
118
+ rc = zmq_msg_recv (&rcv_data, m_req_socket, 0 );
119
+ RET_ON_ERR (rc != -1 , " Failed to receive data" );
120
+
121
+ rc = zmq_getsockopt (m_socket, ZMQ_RCVMORE, &more, &more_size);
122
+ RET_ON_ERR (rc == 0 , " Failed to get sockopt for read channel" );
123
+ RET_ON_ERR (!more, " Expecting more than 2 parts" );
124
+
125
+ msg_to_vec (rcv_data, data);
126
+ zmq_msg_close (&rcv_data);
127
+ }
128
+ out:
129
+ reurn rc;
108
130
}
109
131
110
132
111
133
int
112
- event_service::server_write (int resp_code, event_service_data_t &data)
134
+ event_service::channel_write (int code, vector<string> &data)
113
135
{
114
- // TODO
115
- reurn 0 ;
136
+ zmq_msg_t msg_req, msg_data;
137
+ int flag = 0 ;
138
+
139
+ int rc = int_to_msg (code,msg_req);
140
+ RET_ON_ERR (rc == 0 , " Failed int (%d) to msg" , code);
141
+
142
+ if (!data.empty ()) {
143
+ rc = vec_to_msg (code, msg_data);
144
+ RET_ON_ERR (rc == 0 , " Failed vec (%d) to msg" , data.size ());
145
+ flag = ZMQ_SNDMORE;
146
+ }
147
+
148
+ rc = zmq_msg_send (&msg_req, m_socket, flag);
149
+ RET_ON_ERR (rc == 0 , " Failed to send code" );
150
+
151
+ if (flag != 0 ) {
152
+ rc = zmq_msg_send (&msg_data, m_socket, 0 );
153
+ RET_ON_ERR (rc == 0 , " Failed to send data" );
154
+ }
155
+
156
+ out:
157
+ zmq_msg_close (&msg_req);
158
+ zmq_msg_close (&msg_data);
159
+ return rc;
116
160
}
117
161
118
162
int
0 commit comments