Skip to content

Commit f857737

Browse files
committed
Support passing streams between threads
1 parent e30f405 commit f857737

File tree

6 files changed

+251
-103
lines changed

6 files changed

+251
-103
lines changed

ext-src/php_swoole_thread.h

Lines changed: 4 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ zval *php_swoole_thread_get_arguments();
3939
#define EMSG_NO_RESOURCE "resource not found"
4040
#define ECODE_NO_RESOURCE -2
4141

42+
#define IS_STREAM_SOCKET 98
4243
#define IS_SERIALIZED_OBJECT 99
4344

4445
struct ThreadResource {
@@ -74,71 +75,9 @@ struct ArrayItem {
7475
store(zvalue);
7576
}
7677

77-
void store(zval *zvalue) {
78-
type = Z_TYPE_P(zvalue);
79-
switch (type) {
80-
case IS_LONG:
81-
value.lval = zval_get_long(zvalue);
82-
break;
83-
case IS_DOUBLE:
84-
value.dval = zval_get_double(zvalue);
85-
break;
86-
case IS_STRING: {
87-
value.str = zend_string_init(Z_STRVAL_P(zvalue), Z_STRLEN_P(zvalue), 1);
88-
break;
89-
case IS_TRUE:
90-
case IS_FALSE:
91-
case IS_NULL:
92-
break;
93-
}
94-
default: {
95-
auto _serialized_object = php_swoole_thread_serialize(zvalue);
96-
if (!_serialized_object) {
97-
type = IS_UNDEF;
98-
break;
99-
} else {
100-
type = IS_SERIALIZED_OBJECT;
101-
value.serialized_object = _serialized_object;
102-
}
103-
break;
104-
}
105-
}
106-
}
107-
108-
void fetch(zval *return_value) {
109-
switch (type) {
110-
case IS_LONG:
111-
RETVAL_LONG(value.lval);
112-
break;
113-
case IS_DOUBLE:
114-
RETVAL_LONG(value.dval);
115-
break;
116-
case IS_TRUE:
117-
RETVAL_TRUE;
118-
break;
119-
case IS_FALSE:
120-
RETVAL_FALSE;
121-
break;
122-
case IS_STRING:
123-
RETVAL_NEW_STR(zend_string_init(ZSTR_VAL(value.str), ZSTR_LEN(value.str), 0));
124-
break;
125-
case IS_SERIALIZED_OBJECT:
126-
php_swoole_thread_unserialize(value.serialized_object, return_value);
127-
break;
128-
default:
129-
break;
130-
}
131-
}
132-
133-
void release() {
134-
if (type == IS_STRING) {
135-
zend_string_release(value.str);
136-
value.str = nullptr;
137-
} else if (type == IS_SERIALIZED_OBJECT) {
138-
zend_string_release(value.serialized_object);
139-
value.serialized_object = nullptr;
140-
}
141-
}
78+
void store(zval *zvalue);
79+
void fetch(zval *return_value);
80+
void release();
14281

14382
~ArrayItem() {
14483
if (value.str) {

ext-src/stubs/php_swoole_thread.stub.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
namespace Swoole {
33
class Thread {
44
public int $id;
5-
private function __construct() {}
5+
public function __construct(string $script_file, mixed ...$args) {}
66

77
public function join(): bool {}
88
public function joinable(): bool {}

ext-src/stubs/php_swoole_thread_arginfo.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 57a2a703c0e0a37729ab2e2df280fbb24e78404f */
2+
* Stub hash: 54c9d0c2a88bb65cab67d14129aa56806c31bb00 */
33

4-
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Swoole_Thread___construct, 0, 0, 0)
4+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Swoole_Thread___construct, 0, 0, 1)
5+
ZEND_ARG_TYPE_INFO(0, script_file, IS_STRING, 0)
6+
ZEND_ARG_VARIADIC_TYPE_INFO(0, args, IS_MIXED, 0)
57
ZEND_END_ARG_INFO()
68

79
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Swoole_Thread_join, 0, 0, _IS_BOOL, 0)

ext-src/swoole_thread.cc

Lines changed: 133 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct ThreadObject {
5050
};
5151

5252
static void php_swoole_thread_join(zend_object *object);
53+
static void php_swoole_thread_create(INTERNAL_FUNCTION_PARAMETERS, zval *zobject);
5354

5455
static thread_local zval thread_argv;
5556
static zend_long thread_resource_id = 0;
@@ -147,7 +148,9 @@ void php_swoole_thread_minit(int module_number) {
147148
swoole_thread_ce, ZEND_STRL("HARDWARE_CONCURRENCY"), std::thread::hardware_concurrency());
148149
}
149150

150-
static PHP_METHOD(swoole_thread, __construct) {}
151+
static PHP_METHOD(swoole_thread, __construct) {
152+
php_swoole_thread_create(INTERNAL_FUNCTION_PARAM_PASSTHRU, ZEND_THIS);
153+
}
151154

152155
static PHP_METHOD(swoole_thread, join) {
153156
ThreadObject *to = php_swoole_thread_fetch_object(Z_OBJ_P(ZEND_THIS));
@@ -252,6 +255,42 @@ void php_swoole_thread_rshutdown() {
252255
}
253256
}
254257

258+
static void php_swoole_thread_create(INTERNAL_FUNCTION_PARAMETERS, zval *zobject) {
259+
char *script_file;
260+
size_t l_script_file;
261+
zval *args;
262+
int argc;
263+
264+
ZEND_PARSE_PARAMETERS_START(1, -1)
265+
Z_PARAM_STRING(script_file, l_script_file)
266+
Z_PARAM_VARIADIC('+', args, argc)
267+
ZEND_PARSE_PARAMETERS_END();
268+
269+
if (l_script_file < 1) {
270+
zend_throw_exception(swoole_exception_ce, "exec file name is empty", SW_ERROR_INVALID_PARAMS);
271+
return;
272+
}
273+
274+
ThreadObject *to = php_swoole_thread_fetch_object(Z_OBJ_P(zobject));
275+
zend_string *file = zend_string_init(script_file, l_script_file, 1);
276+
277+
zval zargv;
278+
array_init(&zargv);
279+
for (int i = 0; i < argc; i++) {
280+
zend::array_add(&zargv, &args[i]);
281+
}
282+
zend_string *argv = php_swoole_thread_serialize(&zargv);
283+
zval_dtor(&zargv);
284+
285+
if (!argv) {
286+
zend_string_release(file);
287+
return;
288+
}
289+
290+
to->thread = new std::thread([file, argv]() { php_swoole_thread_start(file, argv); });
291+
zend_update_property_long(swoole_thread_ce, SW_Z8_OBJ_P(zobject), ZEND_STRL("id"), to->thread->native_handle());
292+
}
293+
255294
void php_swoole_thread_start(zend_string *file, zend_string *argv_serialized) {
256295
ts_resource(0);
257296
TSRMLS_CACHE_UPDATE();
@@ -311,41 +350,8 @@ void php_swoole_thread_start(zend_string *file, zend_string *argv_serialized) {
311350
}
312351

313352
static PHP_METHOD(swoole_thread, exec) {
314-
char *script_file;
315-
size_t l_script_file;
316-
zval *args;
317-
int argc;
318-
319-
ZEND_PARSE_PARAMETERS_START(1, -1)
320-
Z_PARAM_STRING(script_file, l_script_file)
321-
Z_PARAM_VARIADIC('+', args, argc)
322-
ZEND_PARSE_PARAMETERS_END();
323-
324-
if (l_script_file < 1) {
325-
php_swoole_fatal_error(E_WARNING, "exec file name is empty");
326-
RETURN_FALSE;
327-
}
328-
329353
object_init_ex(return_value, swoole_thread_ce);
330-
ThreadObject *to = php_swoole_thread_fetch_object(Z_OBJ_P(return_value));
331-
zend_string *file = zend_string_init(script_file, l_script_file, 1);
332-
333-
zval zargv;
334-
array_init(&zargv);
335-
for (int i = 0; i < argc; i++) {
336-
zend::array_add(&zargv, &args[i]);
337-
}
338-
zend_string *argv = php_swoole_thread_serialize(&zargv);
339-
zval_dtor(&zargv);
340-
341-
if (!argv) {
342-
zend_string_release(file);
343-
return;
344-
}
345-
346-
to->thread = new std::thread([file, argv]() { php_swoole_thread_start(file, argv); });
347-
zend_update_property_long(
348-
swoole_thread_ce, SW_Z8_OBJ_P(return_value), ZEND_STRL("id"), to->thread->native_handle());
354+
php_swoole_thread_create(INTERNAL_FUNCTION_PARAM_PASSTHRU, return_value);
349355
}
350356

351357
static PHP_METHOD(swoole_thread, getTsrmInfo) {
@@ -355,4 +361,96 @@ static PHP_METHOD(swoole_thread, getTsrmInfo) {
355361
add_assoc_string(return_value, "api_name", tsrm_api_name());
356362
}
357363

364+
void ArrayItem::store(zval *zvalue) {
365+
type = Z_TYPE_P(zvalue);
366+
switch (type) {
367+
case IS_LONG:
368+
value.lval = zval_get_long(zvalue);
369+
break;
370+
case IS_DOUBLE:
371+
value.dval = zval_get_double(zvalue);
372+
break;
373+
case IS_STRING: {
374+
value.str = zend_string_init(Z_STRVAL_P(zvalue), Z_STRLEN_P(zvalue), 1);
375+
break;
376+
}
377+
case IS_TRUE:
378+
case IS_FALSE:
379+
case IS_NULL:
380+
break;
381+
case IS_RESOURCE: {
382+
php_stream *stream;
383+
int sock_fd;
384+
int cast_flags = PHP_STREAM_AS_FD_FOR_SELECT | PHP_STREAM_CAST_INTERNAL;
385+
if ((php_stream_from_zval_no_verify(stream, zvalue))) {
386+
if (php_stream_cast(stream, cast_flags, (void **) &sock_fd, 1) == SUCCESS && sock_fd >= 0) {
387+
value.lval = dup(sock_fd);
388+
if (value.lval != -1) {
389+
type = IS_STREAM_SOCKET;
390+
break;
391+
}
392+
}
393+
}
394+
}
395+
/* no break */
396+
default: {
397+
auto _serialized_object = php_swoole_thread_serialize(zvalue);
398+
if (!_serialized_object) {
399+
type = IS_UNDEF;
400+
break;
401+
} else {
402+
type = IS_SERIALIZED_OBJECT;
403+
value.serialized_object = _serialized_object;
404+
}
405+
break;
406+
}
407+
}
408+
}
409+
410+
void ArrayItem::fetch(zval *return_value) {
411+
switch (type) {
412+
case IS_LONG:
413+
RETVAL_LONG(value.lval);
414+
break;
415+
case IS_DOUBLE:
416+
RETVAL_LONG(value.dval);
417+
break;
418+
case IS_TRUE:
419+
RETVAL_TRUE;
420+
break;
421+
case IS_FALSE:
422+
RETVAL_FALSE;
423+
break;
424+
case IS_STRING:
425+
RETVAL_NEW_STR(zend_string_init(ZSTR_VAL(value.str), ZSTR_LEN(value.str), 0));
426+
break;
427+
case IS_STREAM_SOCKET: {
428+
std::string path = "php://fd/" + std::to_string(value.lval);
429+
php_stream *stream = php_stream_open_wrapper_ex(path.c_str(), "", 0, NULL, NULL);
430+
if (stream) {
431+
php_stream_to_zval(stream, return_value);
432+
}
433+
break;
434+
}
435+
case IS_SERIALIZED_OBJECT:
436+
php_swoole_thread_unserialize(value.serialized_object, return_value);
437+
break;
438+
default:
439+
break;
440+
}
441+
}
442+
443+
void ArrayItem::release() {
444+
if (type == IS_STRING) {
445+
zend_string_release(value.str);
446+
value.str = nullptr;
447+
} else if (type == IS_STREAM_SOCKET) {
448+
::close(value.lval);
449+
value.lval = -1;
450+
} else if (type == IS_SERIALIZED_OBJECT) {
451+
zend_string_release(value.serialized_object);
452+
value.serialized_object = nullptr;
453+
}
454+
}
455+
358456
#endif

tests/swoole_thread/co-stream.phpt

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
--TEST--
2+
swoole_thread: co stream
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/../include/skipif.inc';
6+
skip_if_nts();
7+
?>
8+
--FILE--
9+
<?php
10+
require __DIR__ . '/../include/bootstrap.php';
11+
12+
use Swoole\Runtime;
13+
use Swoole\Thread;
14+
use Swoole\Thread\Queue;
15+
16+
$tm = new \SwooleTest\ThreadManager();
17+
$tm->initFreePorts();
18+
19+
$tm->parentFunc = function () use ($tm) {
20+
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
21+
Co\run(function () use ($tm) {
22+
$queue = new Queue();
23+
$fp = stream_socket_server('tcp://127.0.0.1:' . $tm->getFreePort(), $errno, $errstr);
24+
$queue->push($fp);
25+
$thread = new Thread(__FILE__, $queue);
26+
var_dump('main thread');
27+
$thread->join();
28+
});
29+
};
30+
31+
$tm->childFunc = function ($queue) use ($tm) {
32+
var_dump('child thread');
33+
$fp = $queue->pop();
34+
Co\run(function () use ($fp, $tm) {
35+
var_dump('child thread, co 0');
36+
Co\go(function () use ($tm) {
37+
var_dump('child thread, co 1');
38+
$client = stream_socket_client('tcp://127.0.0.1:' . $tm->getFreePort(), $errno, $errstr);
39+
Assert::notEmpty($client);
40+
$data = fread($client, 8192);
41+
Assert::eq($data, "hello world\n");
42+
fclose($client);
43+
});
44+
$conn = stream_socket_accept($fp, -1);
45+
fwrite($conn, "hello world\n");
46+
fclose($conn);
47+
fclose($fp);
48+
});
49+
};
50+
51+
$tm->run();
52+
?>
53+
--EXPECT--
54+
string(11) "main thread"
55+
string(12) "child thread"
56+
string(18) "child thread, co 0"
57+
string(18) "child thread, co 1"

0 commit comments

Comments
 (0)