Skip to content

Optimize thread co socket #5370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/thread/aio.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
$threads = [];
$atomic = new Swoole\Thread\Atomic();
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $atomic);
$threads[] = new Thread(__FILE__, $i, $atomic);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
7 changes: 5 additions & 2 deletions examples/thread/argv.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

if (empty($args)) {
var_dump($GLOBALS['argv']);
$thread = Thread::exec(__FILE__, 'thread-1', $argc, $argv);
$thread->join();
$n = 2;
while ($n--) {
$thread = new Thread(__FILE__, 'thread-' . $n, $argc, $argv);
$thread->join();
}
} else {
var_dump($args[0], $args[1], $args[2]);
sleep(1);
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/atomic.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
$a1 = new Atomic;
$a2 = new Long;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $a1, $a2);
$threads[] = new Thread(__FILE__, $i, $a1, $a2);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
4 changes: 2 additions & 2 deletions examples/thread/co.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
$list[1] = uniqid();
var_dump(count($list));

$t1 = Swoole\Thread::exec('mt.php', 'thread-1', PHP_OS, $map, $list);
$t2 = Swoole\Thread::exec('mt.php', 'thread-2', PHP_OS, $map, $list);
$t1 = new Swoole\Thread('mt.php', 'thread-1', PHP_OS, $map, $list);
$t2 = new Swoole\Thread('mt.php', 'thread-2', PHP_OS, $map, $list);

//var_dump($t1->id);
//var_dump($t2->id);
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/lock.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
if (empty($args)) {
$lock = new Lock;
$lock->lock();
$thread = Thread::exec(__FILE__, $lock);
$thread = new Thread(__FILE__, $lock);
$lock->lock();
echo "main thread\n";
$thread->join();
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/mt.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
var_dump(count($list));

//if ($args[0] == 'thread-2') {
// $t3 = Swoole\Thread::exec('mt.php', 'thread-3', PHP_OS);
// $t3 = new Swoole\Thread('mt.php', 'thread-3', PHP_OS);
// $t3->join();
//}

Expand Down
2 changes: 1 addition & 1 deletion examples/thread/pipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
if (empty($args)) {
Co\run(function () {
$sockets = swoole_coroutine_socketpair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$thread = Thread::exec(__FILE__, $sockets);
$thread = new Thread(__FILE__, $sockets);
echo $sockets[0]->recv(8192), PHP_EOL;
$thread->join();
});
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/run_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
$threads = [];

for ($i = 0; $i < $c; $i++) {
$threads[] = Swoole\Thread::exec('benchmark.php', 'thread-' . ($i + 1), $map);
$threads[] = new Swoole\Thread('benchmark.php', 'thread-' . ($i + 1), $map);
}

for ($i = 0; $i < $c; $i++) {
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$threads = [];
$queue = new Queue;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $queue);
$threads[] = new Thread(__FILE__, $i, $queue);
}
for ($i = 0; $i < $c; $i++) {
$threads[$i]->join();
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Co\run(function () {
echo "main thread\n";
$sockets = swoole_coroutine_socketpair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$thread = Thread::exec(__FILE__, $sockets);
$thread = new Thread(__FILE__, $sockets);
$parent_pipe = $sockets[1];
// 收到信号之后向子线程发送指令让子线程退出
if (System::waitSignal(SIGTERM)) {
Expand Down
2 changes: 1 addition & 1 deletion examples/thread/thread_pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$threads = [];
$queue = new Queue;
for ($i = 0; $i < $c; $i++) {
$threads[] = Thread::exec(__FILE__, $i, $queue);
$threads[] = new Thread(__FILE__, $i, $queue);
}
while ($n--) {
$queue->push(base64_encode(random_bytes(16)), Queue::NOTIFY_ONE);
Expand Down
6 changes: 4 additions & 2 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ zend_class_entry *swoole_error_ce;
zend_object_handlers swoole_error_handlers;

#ifdef COMPILE_DL_SWOOLE
#ifdef ZTS
ZEND_TSRMLS_CACHE_DEFINE()
#endif
ZEND_GET_MODULE(swoole)
#endif

Expand Down Expand Up @@ -1089,8 +1092,7 @@ PHP_RSHUTDOWN_FUNCTION(swoole) {
if (!zstream) {
return;
}
stream =
(php_stream *) zend_fetch_resource2_ex((zstream), NULL, php_file_le_stream(), php_file_le_pstream());
stream = (php_stream *) zend_fetch_resource2_ex((zstream), NULL, php_file_le_stream(), php_file_le_pstream());
if (!stream) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ extern zend_string **sw_zend_known_strings;
SW_API bool php_swoole_is_enable_coroutine();
SW_API zend_object *php_swoole_create_socket(enum swSocketType type);
SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type);
SW_API zend_object *php_swoole_create_socket_from_fd(int fd, int _domain, int _type, int _protocol);
SW_API bool php_swoole_export_socket(zval *zobject, swoole::coroutine::Socket *_socket);
SW_API zend_object *php_swoole_dup_socket(int fd, enum swSocketType type);
SW_API void php_swoole_init_socket_object(zval *zobject, swoole::coroutine::Socket *socket);
Expand Down
18 changes: 14 additions & 4 deletions ext-src/php_swoole_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,23 @@ bool php_swoole_thread_resource_free(ThreadResourceId resource_id, ThreadResourc
ThreadResource *php_swoole_thread_resource_fetch(ThreadResourceId resource_id);

void php_swoole_thread_start(zend_string *file, zend_string *argv);
zend_string *php_swoole_thread_serialize(zval *zdata);
bool php_swoole_thread_unserialize(zend_string *data, zval *zv);
zend_string *php_swoole_thread_argv_serialize(zval *zdata);
bool php_swoole_thread_argv_unserialize(zend_string *data, zval *zv);
zend_string *php_swoole_serialize(zval *zdata);
bool php_swoole_unserialize(zend_string *data, zval *zv);
void php_swoole_thread_argv_clean(zval *zdata);
void php_swoole_thread_bailout(void);

zval *php_swoole_thread_get_arguments();

#define EMSG_NO_RESOURCE "resource not found"
#define ECODE_NO_RESOURCE -2

#define IS_STREAM_SOCKET 98
#define IS_SERIALIZED_OBJECT 99
enum {
IS_CO_SOCKET = 97,
IS_STREAM_SOCKET = 98,
IS_SERIALIZED_OBJECT = 99,
};

struct ThreadResource {
uint32_t ref_count;
Expand All @@ -66,6 +72,10 @@ struct ArrayItem {
zend_string *str;
zend_long lval;
double dval;
struct {
int fd;
swSocketType type;
} socket;
zend_string *serialized_object;
} value;

Expand Down
3 changes: 0 additions & 3 deletions ext-src/stubs/php_swoole_socket_coro.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,5 @@ public function getpeername(): false|array {}
public function isClosed(): bool {}
/** @param resource $stream */
public static function import($stream) : Socket | false {}
#ifdef SW_THREAD
public function __wakeup(): void {}
#endif
}
}
1 change: 0 additions & 1 deletion ext-src/stubs/php_swoole_thread.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ public function join(): bool {}
public function joinable(): bool {}
public function detach(): bool {}

public static function exec(string $script_file, mixed ...$args): Thread {}
public static function getArguments(): array {}
public static function getId(): int {}
public static function getTsrmInfo(): array {}
Expand Down
2 changes: 1 addition & 1 deletion ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2651,7 +2651,7 @@ static PHP_METHOD(swoole_server, start) {

if (!ZVAL_IS_NULL(&server_object->init_arguments)) {
call_user_function(NULL, NULL, &server_object->init_arguments, &thread_argv, 0, NULL);
thread_argv_serialized = php_swoole_thread_serialize(&thread_argv);
thread_argv_serialized = php_swoole_thread_argv_serialize(&thread_argv);
}

serv->worker_thread_start = [bootstrap, thread_argv_serialized](const WorkerFn &fn) {
Expand Down
48 changes: 10 additions & 38 deletions ext-src/swoole_socket_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ static PHP_METHOD(swoole_socket_coro, getsockname);
static PHP_METHOD(swoole_socket_coro, getpeername);
static PHP_METHOD(swoole_socket_coro, isClosed);
static PHP_METHOD(swoole_socket_coro, import);
#ifdef SW_THREAD
static PHP_METHOD(swoole_socket_coro, __wakeup);
#endif
SW_EXTERN_C_END

// clang-format off
Expand Down Expand Up @@ -132,9 +129,6 @@ static const zend_function_entry swoole_socket_coro_methods[] =
PHP_ME(swoole_socket_coro, getsockname, arginfo_class_Swoole_Coroutine_Socket_getsockname, ZEND_ACC_PUBLIC)
PHP_ME(swoole_socket_coro, isClosed, arginfo_class_Swoole_Coroutine_Socket_isClosed, ZEND_ACC_PUBLIC)
PHP_ME(swoole_socket_coro, import, arginfo_class_Swoole_Coroutine_Socket_import, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
#ifdef SW_THREAD
PHP_ME(swoole_socket_coro, __wakeup, arginfo_class_Swoole_Coroutine_Socket___wakeup, ZEND_ACC_PUBLIC)
#endif
PHP_FE_END
};
// clang-format on
Expand Down Expand Up @@ -720,9 +714,7 @@ static void socket_coro_register_constants(int module_number) {

void php_swoole_socket_coro_minit(int module_number) {
SW_INIT_CLASS_ENTRY(swoole_socket_coro, "Swoole\\Coroutine\\Socket", "Co\\Socket", swoole_socket_coro_methods);
#ifndef SW_THREAD
SW_SET_CLASS_NOT_SERIALIZABLE(swoole_socket_coro);
#endif
SW_SET_CLASS_CLONEABLE(swoole_socket_coro, sw_zend_class_clone_deny);
SW_SET_CLASS_UNSET_PROPERTY_HANDLER(swoole_socket_coro, sw_zend_class_unset_property_deny);
SW_SET_CLASS_CUSTOM_OBJECT(
Expand Down Expand Up @@ -829,12 +821,12 @@ SW_API void php_swoole_socket_set_error_properties(zval *zobject, Socket *socket
php_swoole_socket_set_error_properties(zobject, socket->errCode, socket->errMsg);
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type) {
static zend_object *create_socket_object(Socket *socket) {
zval zobject;
zend_object *object = socket_coro_create_object(swoole_socket_coro_ce);
SocketObject *sock = (SocketObject *) socket_coro_fetch_object(object);

sock->socket = new Socket(fd, type);
sock->socket = socket;
if (UNEXPECTED(sock->socket->get_fd() < 0)) {
php_swoole_sys_error(E_WARNING, "new Socket() failed");
delete sock->socket;
Expand All @@ -848,6 +840,14 @@ SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType t
return object;
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, enum swSocketType type) {
return create_socket_object(new Socket(fd, type));
}

SW_API zend_object *php_swoole_create_socket_from_fd(int fd, int _domain, int _type, int _protocol) {
return create_socket_object(new Socket(fd, _domain, _type, _protocol));
}

SW_API Socket *php_swoole_get_socket(zval *zobject) {
SW_ASSERT(Z_OBJCE_P(zobject) == swoole_socket_coro_ce);
SocketObject *sock = (SocketObject *) socket_coro_fetch_object(Z_OBJ_P(zobject));
Expand Down Expand Up @@ -2217,31 +2217,3 @@ static PHP_METHOD(swoole_socket_coro, import) {

RETURN_OBJ(object);
}

#ifdef SW_THREAD
static PHP_METHOD(swoole_socket_coro, __wakeup) {
zend_long sockfd = zend::object_get_long(ZEND_THIS, ZEND_STRL("fd"));
if (sockfd < 0) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}

zend_long new_sockfd = dup(sockfd);
if (sockfd < 0) {
zend_throw_exception(swoole_exception_ce, EMSG_NO_RESOURCE, ECODE_NO_RESOURCE);
return;
}

SocketObject *sock = (SocketObject *) socket_coro_fetch_object(Z_OBJ_P(ZEND_THIS));

zend_long domain = zend::object_get_long(ZEND_THIS, ZEND_STRL("domain"));
zend_long type = zend::object_get_long(ZEND_THIS, ZEND_STRL("type"));
zend_long protocol = zend::object_get_long(ZEND_THIS, ZEND_STRL("protocol"));

php_swoole_check_reactor();
sock->socket = new Socket((int) new_sockfd, (int) domain, (int) type, (int) protocol);
sock->socket->set_zero_copy(true);
sock->socket->set_buffer_allocator(sw_zend_string_allocator());
zend_update_property_long(swoole_socket_coro_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("fd"), sock->socket->get_fd());
}
#endif
Loading
Loading