1
+ <?php
2
+ namespace Pinpoint \Plugins \SysV2 \_rdKafka ;
3
+
4
+ /******************************************************************************
5
+ * Copyright 2024 NAVER Corp. *
6
+ * *
7
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
8
+ * you may not use this file except in compliance with the License. *
9
+ * You may obtain a copy of the License at *
10
+ * *
11
+ * http://www.apache.org/licenses/LICENSE-2.0 *
12
+ * *
13
+ * Unless required by applicable law or agreed to in writing, software *
14
+ * distributed under the License is distributed on an "AS IS" BASIS, *
15
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
16
+ * See the License for the specific language governing permissions and *
17
+ * limitations under the License. *
18
+ ******************************************************************************/
19
+
20
+ use function Pinpoint \Plugins \pinpoint_get_context ;
21
+ use function Pinpoint \Plugins \pinpoint_get_sequence_id ;
22
+ use function Pinpoint \Plugins \{
23
+ pinpoint_join_cut ,
24
+ pinpoint_start_trace ,
25
+ pinpoint_add_clue ,
26
+ pinpoint_add_clues ,
27
+ pinpoint_set_context ,
28
+ pinpoint_end_trace
29
+ };
30
+ use Pinpoint \Common \Logger ;
31
+
32
+ const KAFKA_BROKER_LIST = "__kafka_broker_list__ " ;
33
+
34
+ $ config_set_on_before = function ($ name , $ value ) {
35
+ if ($ name == 'metadata.broker.list ' ) {
36
+ // store value into pinpoint context
37
+ pinpoint_set_context (KAFKA_BROKER_LIST , $ value );
38
+ }
39
+ };
40
+
41
+ $ RdKafka_add_brokers_on_before = function ($ blocker_list ) {
42
+ if (!empty ($ blocker_list )) {
43
+ pinpoint_set_context (KAFKA_BROKER_LIST , $ blocker_list );
44
+ }
45
+ };
46
+
47
+ function get_broker_list_plugins ($ joinable , $ on_before )
48
+ {
49
+ $ on_end = function ($ ret ) {
50
+ };
51
+ $ on_exception = function ($ exp ) {
52
+ };
53
+ return [$ joinable , $ on_before , $ on_end , $ on_exception ];
54
+ }
55
+
56
+ $ producev_on_before = function ($ partition , $ msgflags , $ payload , $ key = NULL , $ headers = [], $ timestamp_ms = NULL , $ opaque = NULL ) {
57
+ Logger::Inst ()->debug ("call on_before " );
58
+ pinpoint_start_trace ();
59
+ pinpoint_add_clue (PP_INTERCEPTOR_NAME , "RdKafka\ProducerTopic::producev " );
60
+ pinpoint_add_clue (PP_SERVER_TYPE , PP_KAFKA );
61
+ $ topic_pro = pinpoint_get_this ();
62
+ $ topic = "unknown " ;
63
+ if ($ topic_pro instanceof \RdKafka \ProducerTopic) {
64
+ $ topic = $ topic_pro ->getName ();
65
+ }
66
+ pinpoint_add_clues (PP_KAFKA_TOPIC , $ topic );
67
+ pinpoint_add_clues (PP_KAFKA_PARTITION , "$ partition " );
68
+ $ broker_list_value = pinpoint_get_context (KAFKA_BROKER_LIST );
69
+ if ($ broker_list_value ) {
70
+ pinpoint_add_clue (PP_DESTINATION , $ broker_list_value );
71
+ }
72
+ $ async_id = mt_rand ();
73
+ pinpoint_add_clue (PP_ASYNC_CALL_ID , $ async_id );
74
+ $ sequence_id = pinpoint_get_sequence_id ();
75
+
76
+ $ headers [PP_KAFKA_HEADER_ASYNC_CALL_ID ] = $ async_id ;
77
+ $ headers [PP_KAFKA_HEADER_SEQUENCE_ID ] = $ sequence_id ;
78
+
79
+ $ headers [PP_KAFKA_HEADER_TRANSACTION_ID ] = pinpoint_get_context (PP_TRANSACTION_ID );
80
+ $ headers [PP_KAFKA_HEADER_SPAN_ID ] = pinpoint_get_context (PP_SPAN_ID );
81
+ $ headers [PP_KAFKA_HEADER_APP_ID ] = APPLICATION_ID ;
82
+ $ headers [PP_KAFKA_HEADER_APP_NAME ] = APPLICATION_NAME ;
83
+
84
+
85
+ return [$ partition , $ msgflags , $ payload , $ key , $ headers , $ timestamp_ms , $ opaque ];
86
+ };
87
+
88
+ $ produce_on_before = function ($ partition , $ msgflags , $ payload = NULL , $ key = NULL , $ headers = NULL , $ opaque = NULL ) {
89
+ Logger::Inst ()->debug ("call on_before " );
90
+ pinpoint_start_trace ();
91
+ pinpoint_add_clue (PP_INTERCEPTOR_NAME , "RdKafka\ProducerTopic::produce " );
92
+ pinpoint_add_clue (PP_SERVER_TYPE , PP_KAFKA );
93
+ $ topic_pro = pinpoint_get_this ();
94
+ $ topic = "unknown " ;
95
+ if ($ topic_pro instanceof \RdKafka \ProducerTopic) {
96
+ $ topic = $ topic_pro ->getName ();
97
+ }
98
+ pinpoint_add_clues (PP_KAFKA_TOPIC , $ topic );
99
+ pinpoint_add_clues (PP_KAFKA_PARTITION , "$ partition " );
100
+ $ broker_list_value = pinpoint_get_context (KAFKA_BROKER_LIST );
101
+ if ($ broker_list_value ) {
102
+ pinpoint_add_clue (PP_DESTINATION , $ broker_list_value );
103
+ }
104
+ };
105
+
106
+
107
+ $ on_end = function ($ ret ) {
108
+ Logger::Inst ()->debug ("call on_end " );
109
+ pinpoint_end_trace ();
110
+ };
111
+
112
+ $ on_exception = function ($ exp ) {
113
+ Logger::Inst ()->debug ("call on_exception " );
114
+ };
115
+
116
+ // https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-producertopic.producev.html
117
+ pinpoint_join_cut (
118
+ ["RdKafka\ProducerTopic " , "producev " ],
119
+ $ producev_on_before ,
120
+ $ on_end ,
121
+ $ on_exception
122
+ );
123
+
124
+ pinpoint_join_cut (
125
+ ["RdKafka\ProducerTopic " , "produce " ],
126
+ $ produce_on_before ,
127
+ $ on_end ,
128
+ $ on_exception
129
+ );
130
+
131
+ $ points = [
132
+ get_broker_list_plugins (["RdKafka\Conf " , "set " ], $ config_set_on_before ), // https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka-conf.set.html
133
+ get_broker_list_plugins (["RdKafka " , "addBrokers " ], $ RdKafka_add_brokers_on_before ) //https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.addbrokers.html
134
+ ];
135
+
136
+ foreach ($ points as $ point ) {
137
+ pinpoint_join_cut (
138
+ $ point [0 ],
139
+ $ point [1 ],
140
+ $ point [2 ],
141
+ $ point [3 ]
142
+ );
143
+ }
144
+ // @author eeliu
0 commit comments