Skip to content

Commit 77bb6d5

Browse files
committed
feat(async): support async invocation
- kakfa plugins
1 parent b8f7def commit 77bb6d5

File tree

10 files changed

+439
-29
lines changed

10 files changed

+439
-29
lines changed

Changes.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
11
## Changes
22

3+
### - v4.0.1 async invocation API
4+
5+
- rdkafka plugins
6+
7+
### v4.0.0
8+
> https://github.com/pinpoint-apm/pinpoint-php-aop/releases/tag/v4.0.0
9+
* support nikic/php-parser v5.1.0
10+
11+
### - v3.0.8 async invocation API
12+
13+
> https://github.com/pinpoint-apm/pinpoint-php-aop/releases/tag/v3.0.8
14+
15+
* fix a bug in https://github.com/pinpoint-apm/pinpoint-c-agent/issues/672
16+
317
### v0.3.3
418

519
* fix dsn on pdo8

lib/Pinpoint/Plugins/Common/defines.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
define("PP_HTTP_IO", 49);
3737
define("PP_MESSAGE_QUEUE_URI ", 100);
3838
define("PP_KAFKA_TOPIC", 140);
39+
define("PP_KAFKA_PARTITION", 141);
3940
define("PP_PHP_METHOD", "1501");
4041
define("PP_PHP", "1500");
4142
define("PP_MYSQL", "2101");
@@ -46,6 +47,7 @@
4647
define("PP_CASSANDRA", "2600");
4748
define("PP_MONGODB", "2651");
4849
define("PP_KAFKA", "8660");
50+
define("PP_KAFKA_STREAMS", "8662");
4951
define("PP_REDIS", "8200");
5052
define("PP_REDIS_REDISSON", "8203");
5153
define("PP_REDIS_REDISSON_INTERNAL", "8204");
@@ -82,7 +84,7 @@
8284
define("PP_PARENT_HOST", "Ah");
8385
define("PP_NGINX_PROXY", "NP");
8486
define("PP_APACHE_PROXY", "AP");
85-
define("PP_TRANSCATION_ID", "tid");
87+
define("PP_TRANSACTION_ID", "tid");
8688
define("PP_SPAN_ID", "sid");
8789
define("PP_NOT_SAMPLED", "s0");
8890
define("PP_SAMPLED", "s1");
@@ -91,3 +93,10 @@
9193
# https://github.com/pinpoint-apm/pinpoint-c-agent/issues/534
9294
define("PP_ROUTE_KEY", '__pinpoint__route');
9395
define("UT", 'UT');
96+
define("PP_ASYNC_CALL_ID", "asyId");
97+
define("PP_KAFKA_HEADER_ASYNC_CALL_ID", "_asyId_");
98+
define("PP_KAFKA_HEADER_SEQUENCE_ID", "_sequence_id_");
99+
define("PP_KAFKA_HEADER_TRANSACTION_ID", "_tid_");
100+
define("PP_KAFKA_HEADER_SPAN_ID", "_sid_");
101+
define("PP_KAFKA_HEADER_APP_ID", "_appid_");
102+
define("PP_KAFKA_HEADER_APP_NAME", "_app_");
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
/******************************************************************************
4+
* Copyright 2020 NAVER Corp. *
5+
* *
6+
* Licensed under the Apache License, Version 2.0 (the "License"); *
7+
* you may not use this file except in compliance with the License. *
8+
* You may obtain a copy of the License at *
9+
* *
10+
* http://www.apache.org/licenses/LICENSE-2.0 *
11+
* *
12+
* Unless required by applicable law or agreed to in writing, software *
13+
* distributed under the License is distributed on an "AS IS" BASIS, *
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
15+
* See the License for the specific language governing permissions and *
16+
* limitations under the License. *
17+
******************************************************************************/
18+
19+
namespace Pinpoint\Plugins;
20+
21+
class EmptyRequestPlugin extends DefaultRequestPlugin
22+
{
23+
public function __construct()
24+
{
25+
// skip call DefaultRequestPlugin::__construct
26+
require_once __DIR__ . "/SysV2/__init__.php";
27+
}
28+
}

lib/Pinpoint/Plugins/PinpointPerRequestPlugins.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public function __construct()
126126
require_once __DIR__ . "/SysV2/__init__.php";
127127
}
128128

129-
pinpoint_add_clue(PP_TRANSCATION_ID, $this->tid);
129+
pinpoint_add_clue(PP_TRANSACTION_ID, $this->tid);
130130
pinpoint_add_clue(PP_SPAN_ID, $this->sid);
131-
pinpoint_set_context(PP_TRANSCATION_ID, $this->tid);
131+
pinpoint_set_context(PP_TRANSACTION_ID, $this->tid);
132132
pinpoint_set_context(PP_SPAN_ID, (string) $this->sid);
133133
}
134134

lib/Pinpoint/Plugins/Sys/curl/CurlUtil.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static function getPinpointHeader($url)
3434
'Pinpoint-Papptype:1500',
3535
'Pinpoint-Pappname:' . APPLICATION_NAME,
3636
'Pinpoint-Host:' . static::getHostFromURL($url),
37-
'Pinpoint-Traceid:' . pinpoint_get_context(PP_TRANSCATION_ID),
37+
'Pinpoint-Traceid:' . pinpoint_get_context(PP_TRANSACTION_ID),
3838
'Pinpoint-Pspanid:' . pinpoint_get_context(PP_SPAN_ID),
3939
'Pinpoint-Spanid:' . $nsid
4040
];
@@ -56,7 +56,7 @@ public static function getPPHeader($url)
5656
'Pinpoint-Papptype' => '1500',
5757
'Pinpoint-Pappname' => APPLICATION_NAME,
5858
'Pinpoint-Host' => static::getHostFromURL($url),
59-
'Pinpoint-Traceid' => pinpoint_get_context(PP_TRANSCATION_ID),
59+
'Pinpoint-Traceid' => pinpoint_get_context(PP_TRANSACTION_ID),
6060
'Pinpoint-Pspanid' => pinpoint_get_context(PP_SPAN_ID),
6161
'Pinpoint-Spanid' => $nsid
6262
];
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
/******************************************************************************
4+
* Copyright 2024 NAVER Corp. *
5+
* *
6+
* Licensed under the Apache License, Version 2.0 (the "License"); *
7+
* you may not use this file except in compliance with the License. *
8+
* You may obtain a copy of the License at *
9+
* *
10+
* http://www.apache.org/licenses/LICENSE-2.0 *
11+
* *
12+
* Unless required by applicable law or agreed to in writing, software *
13+
* distributed under the License is distributed on an "AS IS" BASIS, *
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
15+
* See the License for the specific language governing permissions and *
16+
* limitations under the License. *
17+
******************************************************************************/
18+
namespace Pinpoint\Plugins\SysV2\_rdKafka;
19+
20+
if (extension_loaded('rdkafka')) {
21+
require_once __DIR__ . "/rdKafka.php";
22+
if (defined("PP_ENABLE_EXPERIMENTAL_FEATURE")) {
23+
require_once __DIR__ . "/rdKafka_consumer.php";
24+
}
25+
}
26+
27+
// author: eeliu
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
<?php
2+
namespace Pinpoint\Plugins\SysV2\_rdKafka;
3+
4+
/******************************************************************************
5+
* Copyright 2024 NAVER Corp. *
6+
* *
7+
* Licensed under the Apache License, Version 2.0 (the "License"); *
8+
* you may not use this file except in compliance with the License. *
9+
* You may obtain a copy of the License at *
10+
* *
11+
* http://www.apache.org/licenses/LICENSE-2.0 *
12+
* *
13+
* Unless required by applicable law or agreed to in writing, software *
14+
* distributed under the License is distributed on an "AS IS" BASIS, *
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
16+
* See the License for the specific language governing permissions and *
17+
* limitations under the License. *
18+
******************************************************************************/
19+
20+
use function Pinpoint\Plugins\pinpoint_get_context;
21+
use function Pinpoint\Plugins\pinpoint_get_sequence_id;
22+
use function Pinpoint\Plugins\{
23+
pinpoint_join_cut,
24+
pinpoint_start_trace,
25+
pinpoint_add_clue,
26+
pinpoint_add_clues,
27+
pinpoint_set_context,
28+
pinpoint_end_trace
29+
};
30+
use Pinpoint\Common\Logger;
31+
32+
const KAFKA_BROKER_LIST = "__kafka_broker_list__";
33+
34+
$config_set_on_before = function ($name, $value) {
35+
if ($name == 'metadata.broker.list') {
36+
// store value into pinpoint context
37+
pinpoint_set_context(KAFKA_BROKER_LIST, $value);
38+
}
39+
};
40+
41+
$RdKafka_add_brokers_on_before = function ($blocker_list) {
42+
if (!empty($blocker_list)) {
43+
pinpoint_set_context(KAFKA_BROKER_LIST, $blocker_list);
44+
}
45+
};
46+
47+
function get_broker_list_plugins($joinable, $on_before)
48+
{
49+
$on_end = function ($ret) {
50+
};
51+
$on_exception = function ($exp) {
52+
};
53+
return [$joinable, $on_before, $on_end, $on_exception];
54+
}
55+
56+
$producev_on_before = function ($partition, $msgflags, $payload, $key = NULL, $headers = [], $timestamp_ms = NULL, $opaque = NULL) {
57+
Logger::Inst()->debug("call on_before");
58+
pinpoint_start_trace();
59+
pinpoint_add_clue(PP_INTERCEPTOR_NAME, "RdKafka\ProducerTopic::producev");
60+
pinpoint_add_clue(PP_SERVER_TYPE, PP_KAFKA);
61+
$topic_pro = pinpoint_get_this();
62+
$topic = "unknown";
63+
if ($topic_pro instanceof \RdKafka\ProducerTopic) {
64+
$topic = $topic_pro->getName();
65+
}
66+
pinpoint_add_clues(PP_KAFKA_TOPIC, $topic);
67+
pinpoint_add_clues(PP_KAFKA_PARTITION, "$partition");
68+
$broker_list_value = pinpoint_get_context(KAFKA_BROKER_LIST);
69+
if ($broker_list_value) {
70+
pinpoint_add_clue(PP_DESTINATION, $broker_list_value);
71+
}
72+
$async_id = mt_rand();
73+
pinpoint_add_clue(PP_ASYNC_CALL_ID, $async_id);
74+
$sequence_id = pinpoint_get_sequence_id();
75+
76+
$headers[PP_KAFKA_HEADER_ASYNC_CALL_ID] = $async_id;
77+
$headers[PP_KAFKA_HEADER_SEQUENCE_ID] = $sequence_id;
78+
79+
$headers[PP_KAFKA_HEADER_TRANSACTION_ID] = pinpoint_get_context(PP_TRANSACTION_ID);
80+
$headers[PP_KAFKA_HEADER_SPAN_ID] = pinpoint_get_context(PP_SPAN_ID);
81+
$headers[PP_KAFKA_HEADER_APP_ID] = APPLICATION_ID;
82+
$headers[PP_KAFKA_HEADER_APP_NAME] = APPLICATION_NAME;
83+
84+
85+
return [$partition, $msgflags, $payload, $key, $headers, $timestamp_ms, $opaque];
86+
};
87+
88+
$produce_on_before = function ($partition, $msgflags, $payload = NULL, $key = NULL, $headers = NULL, $opaque = NULL) {
89+
Logger::Inst()->debug("call on_before");
90+
pinpoint_start_trace();
91+
pinpoint_add_clue(PP_INTERCEPTOR_NAME, "RdKafka\ProducerTopic::produce");
92+
pinpoint_add_clue(PP_SERVER_TYPE, PP_KAFKA);
93+
$topic_pro = pinpoint_get_this();
94+
$topic = "unknown";
95+
if ($topic_pro instanceof \RdKafka\ProducerTopic) {
96+
$topic = $topic_pro->getName();
97+
}
98+
pinpoint_add_clues(PP_KAFKA_TOPIC, $topic);
99+
pinpoint_add_clues(PP_KAFKA_PARTITION, "$partition");
100+
$broker_list_value = pinpoint_get_context(KAFKA_BROKER_LIST);
101+
if ($broker_list_value) {
102+
pinpoint_add_clue(PP_DESTINATION, $broker_list_value);
103+
}
104+
};
105+
106+
107+
$on_end = function ($ret) {
108+
Logger::Inst()->debug("call on_end");
109+
pinpoint_end_trace();
110+
};
111+
112+
$on_exception = function ($exp) {
113+
Logger::Inst()->debug("call on_exception");
114+
};
115+
116+
// https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-producertopic.producev.html
117+
pinpoint_join_cut(
118+
["RdKafka\ProducerTopic", "producev"],
119+
$producev_on_before,
120+
$on_end,
121+
$on_exception
122+
);
123+
124+
pinpoint_join_cut(
125+
["RdKafka\ProducerTopic", "produce"],
126+
$produce_on_before,
127+
$on_end,
128+
$on_exception
129+
);
130+
131+
$points = [
132+
get_broker_list_plugins(["RdKafka\Conf", "set"], $config_set_on_before), // https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-conf.set.html
133+
get_broker_list_plugins(["RdKafka", "addBrokers"], $RdKafka_add_brokers_on_before) //https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.addbrokers.html
134+
];
135+
136+
foreach ($points as $point) {
137+
pinpoint_join_cut(
138+
$point[0],
139+
$point[1],
140+
$point[2],
141+
$point[3]
142+
);
143+
}
144+
// @author eeliu

0 commit comments

Comments
 (0)