16
16
17
17
package fr .acinq .eclair .payment .relay
18
18
19
- import akka .actor .ActorRef
20
19
import akka .actor .typed .Behavior
21
20
import akka .actor .typed .eventstream .EventStream
22
21
import akka .actor .typed .scaladsl .adapter .TypedActorRefOps
23
22
import akka .actor .typed .scaladsl .{ActorContext , Behaviors }
23
+ import akka .actor .{ActorRef , typed }
24
24
import fr .acinq .bitcoin .scalacompat .ByteVector32
25
25
import fr .acinq .bitcoin .scalacompat .Crypto .PublicKey
26
26
import fr .acinq .eclair .channel ._
@@ -43,7 +43,7 @@ object ChannelRelay {
43
43
44
44
// @formatter:off
45
45
sealed trait Command
46
- private case object DoRelay extends Command
46
+ private case class WrappedConfidence ( confidence : Double ) extends Command
47
47
private case class WrappedForwardFailure (failure : Register .ForwardFailure [CMD_ADD_HTLC ]) extends Command
48
48
private case class WrappedAddResponse (res : CommandResponse [CMD_ADD_HTLC ]) extends Command
49
49
// @formatter:on
@@ -54,15 +54,20 @@ object ChannelRelay {
54
54
case class RelaySuccess (selectedChannelId : ByteVector32 , cmdAdd : CMD_ADD_HTLC ) extends RelayResult
55
55
// @formatter:on
56
56
57
- def apply (nodeParams : NodeParams , register : ActorRef , channels : Map [ByteVector32 , Relayer .OutgoingChannel ], originNode : PublicKey , relayId : UUID , r : IncomingPaymentPacket .ChannelRelayPacket ): Behavior [Command ] =
57
+ def apply (nodeParams : NodeParams ,
58
+ register : ActorRef ,
59
+ channels : Map [ByteVector32 , Relayer .OutgoingChannel ],
60
+ originNode: PublicKey ,
61
+ relayId : UUID ,
62
+ r : IncomingPaymentPacket .ChannelRelayPacket ): Behavior [Command ] =
58
63
Behaviors .setup { context =>
59
64
Behaviors .withMdc(Logs .mdc(
60
65
category_opt = Some (Logs .LogCategory .PAYMENT ),
61
66
parentPaymentId_opt = Some (relayId), // for a channel relay, parent payment id = relay id
62
67
paymentHash_opt = Some (r.add.paymentHash),
63
68
nodeAlias_opt = Some (nodeParams.alias))) {
64
69
val upstream = Upstream .Hot .Channel (r.add.removeUnknownTlvs(), TimestampMilli .now(), originNode)
65
- context.self ! DoRelay
70
+ context.self ! WrappedConfidence ( 0.5 )
66
71
new ChannelRelay (nodeParams, register, channels, r, upstream, context).relay(Seq .empty)
67
72
}
68
73
}
@@ -118,60 +123,63 @@ class ChannelRelay private(nodeParams: NodeParams,
118
123
119
124
def relay (previousFailures : Seq [PreviouslyTried ]): Behavior [Command ] = {
120
125
Behaviors .receiveMessagePartial {
121
- case DoRelay =>
126
+ case WrappedConfidence (confidence) =>
122
127
if (previousFailures.isEmpty) {
123
128
context.log.info(" relaying htlc #{} from channelId={} to requestedShortChannelId={} nextNode={}" , r.add.id, r.add.channelId, r.payload.outgoingChannelId, nextNodeId_opt.getOrElse(" " ))
124
129
}
125
130
context.log.debug(" attempting relay previousAttempts={}" , previousFailures.size)
126
- handleRelay(previousFailures) match {
131
+ handleRelay(previousFailures, confidence ) match {
127
132
case RelayFailure (cmdFail) =>
128
133
Metrics .recordPaymentRelayFailed(Tags .FailureType (cmdFail), Tags .RelayType .Channel )
129
134
context.log.info(" rejecting htlc reason={}" , cmdFail.reason)
130
- safeSendAndStop(r.add.channelId, cmdFail)
135
+ safeSendAndStop(r.add.channelId, cmdFail, confidence, None )
131
136
case RelaySuccess (selectedChannelId, cmdAdd) =>
132
137
context.log.info(" forwarding htlc #{} from channelId={} to channelId={}" , r.add.id, r.add.channelId, selectedChannelId)
133
138
register ! Register .Forward (forwardFailureAdapter, selectedChannelId, cmdAdd)
134
- waitForAddResponse(selectedChannelId, previousFailures)
139
+ waitForAddResponse(selectedChannelId, previousFailures, confidence )
135
140
}
136
141
}
137
142
}
138
143
139
- def waitForAddResponse (selectedChannelId : ByteVector32 , previousFailures : Seq [PreviouslyTried ]): Behavior [Command ] =
144
+ def waitForAddResponse (selectedChannelId : ByteVector32 , previousFailures : Seq [PreviouslyTried ], confidence : Double ): Behavior [Command ] =
140
145
Behaviors .receiveMessagePartial {
141
146
case WrappedForwardFailure (Register .ForwardFailure (Register .Forward (_, channelId, _))) =>
142
147
context.log.warn(s " couldn't resolve downstream channel $channelId, failing htlc # ${upstream.add.id}" )
143
148
val cmdFail = CMD_FAIL_HTLC (upstream.add.id, Right (UnknownNextPeer ()), commit = true )
144
149
Metrics .recordPaymentRelayFailed(Tags .FailureType (cmdFail), Tags .RelayType .Channel )
145
- safeSendAndStop(upstream.add.channelId, cmdFail)
150
+ safeSendAndStop(upstream.add.channelId, cmdFail, confidence, Some (channelId) )
146
151
147
152
case WrappedAddResponse (addFailed : RES_ADD_FAILED [_]) =>
148
153
context.log.info(" attempt failed with reason={}" , addFailed.t.getClass.getSimpleName)
149
- context.self ! DoRelay
154
+ context.self ! WrappedConfidence (confidence)
150
155
relay(previousFailures :+ PreviouslyTried (selectedChannelId, addFailed))
151
156
152
- case WrappedAddResponse (_ : RES_SUCCESS [_]) =>
157
+ case WrappedAddResponse (r : RES_SUCCESS [_]) =>
153
158
context.log.debug(" sent htlc to the downstream channel" )
154
- waitForAddSettled()
159
+ waitForAddSettled(confidence, r.channelId )
155
160
}
156
161
157
- def waitForAddSettled (): Behavior [Command ] =
162
+ def waitForAddSettled (confidence : Double , channelId : ByteVector32 ): Behavior [Command ] =
158
163
Behaviors .receiveMessagePartial {
159
164
case WrappedAddResponse (RES_ADD_SETTLED (_, htlc, fulfill : HtlcResult .Fulfill )) =>
160
165
context.log.debug(" relaying fulfill to upstream" )
166
+ Metrics .relaySettleFulfill(confidence)
161
167
val cmd = CMD_FULFILL_HTLC (upstream.add.id, fulfill.paymentPreimage, commit = true )
162
168
context.system.eventStream ! EventStream .Publish (ChannelPaymentRelayed (upstream.amountIn, htlc.amountMsat, htlc.paymentHash, upstream.add.channelId, htlc.channelId, upstream.receivedAt, TimestampMilli .now()))
163
169
recordRelayDuration(isSuccess = true )
164
- safeSendAndStop(upstream.add.channelId, cmd)
170
+ safeSendAndStop(upstream.add.channelId, cmd, confidence, Some (channelId) )
165
171
166
172
case WrappedAddResponse (RES_ADD_SETTLED (_, _, fail : HtlcResult .Fail )) =>
167
173
context.log.debug(" relaying fail to upstream" )
174
+ Metrics .relaySettleFail(confidence)
168
175
Metrics .recordPaymentRelayFailed(Tags .FailureType .Remote , Tags .RelayType .Channel )
169
176
val cmd = translateRelayFailure(upstream.add.id, fail)
170
177
recordRelayDuration(isSuccess = false )
171
- safeSendAndStop(upstream.add.channelId, cmd)
178
+ safeSendAndStop(upstream.add.channelId, cmd, confidence, Some (channelId) )
172
179
}
173
180
174
- def safeSendAndStop (channelId : ByteVector32 , cmd : channel.HtlcSettlementCommand ): Behavior [Command ] = {
181
+ def safeSendAndStop (channelId : ByteVector32 , cmd : channel.HtlcSettlementCommand , confidence : Double , outgoingChannel_opt : Option [ByteVector32 ]): Behavior [Command ] = {
182
+ context.log.info(" cmd={}, startedAt={}, endedAt={}, confidence={}, originNode={}, outgoingChannel={}" , cmd.getClass.getSimpleName, upstream.receivedAt, TimestampMilli .now(), confidence, upstream.receivedFrom, outgoingChannel_opt)
175
183
val toSend = cmd match {
176
184
case _ : CMD_FULFILL_HTLC => cmd
177
185
case _ : CMD_FAIL_HTLC | _ : CMD_FAIL_MALFORMED_HTLC => r.payload match {
@@ -202,9 +210,9 @@ class ChannelRelay private(nodeParams: NodeParams,
202
210
* - a CMD_FAIL_HTLC to be sent back upstream
203
211
* - a CMD_ADD_HTLC to propagate downstream
204
212
*/
205
- def handleRelay (previousFailures : Seq [PreviouslyTried ]): RelayResult = {
213
+ def handleRelay (previousFailures : Seq [PreviouslyTried ], confidence : Double ): RelayResult = {
206
214
val alreadyTried = previousFailures.map(_.channelId)
207
- selectPreferredChannel(alreadyTried) match {
215
+ selectPreferredChannel(alreadyTried, confidence ) match {
208
216
case None if previousFailures.nonEmpty =>
209
217
// no more channels to try
210
218
val error = previousFailures
@@ -215,7 +223,7 @@ class ChannelRelay private(nodeParams: NodeParams,
215
223
.failure
216
224
RelayFailure (CMD_FAIL_HTLC (r.add.id, Right (translateLocalError(error.t, error.channelUpdate)), commit = true ))
217
225
case outgoingChannel_opt =>
218
- relayOrFail(outgoingChannel_opt)
226
+ relayOrFail(outgoingChannel_opt, confidence )
219
227
}
220
228
}
221
229
@@ -234,7 +242,7 @@ class ChannelRelay private(nodeParams: NodeParams,
234
242
*
235
243
* If no suitable channel is found we default to the originally requested channel.
236
244
*/
237
- def selectPreferredChannel (alreadyTried : Seq [ByteVector32 ]): Option [OutgoingChannel ] = {
245
+ def selectPreferredChannel (alreadyTried : Seq [ByteVector32 ], confidence : Double ): Option [OutgoingChannel ] = {
238
246
val requestedShortChannelId = r.payload.outgoingChannelId
239
247
context.log.debug(" selecting next channel with requestedShortChannelId={}" , requestedShortChannelId)
240
248
// we filter out channels that we have already tried
@@ -243,7 +251,7 @@ class ChannelRelay private(nodeParams: NodeParams,
243
251
candidateChannels
244
252
.values
245
253
.map { channel =>
246
- val relayResult = relayOrFail(Some (channel))
254
+ val relayResult = relayOrFail(Some (channel), confidence )
247
255
context.log.debug(s " candidate channel: channelId= ${channel.channelId} availableForSend={} capacity={} channelUpdate={} result={} " ,
248
256
channel.commitments.availableBalanceForSend,
249
257
channel.commitments.latest.capacity,
@@ -291,7 +299,7 @@ class ChannelRelay private(nodeParams: NodeParams,
291
299
* channel, because some parameters don't match with our settings for that channel. In that case we directly fail the
292
300
* htlc.
293
301
*/
294
- def relayOrFail (outgoingChannel_opt : Option [OutgoingChannelParams ]): RelayResult = {
302
+ def relayOrFail (outgoingChannel_opt : Option [OutgoingChannelParams ], confidence : Double ): RelayResult = {
295
303
outgoingChannel_opt match {
296
304
case None =>
297
305
RelayFailure (CMD_FAIL_HTLC (r.add.id, Right (UnknownNextPeer ()), commit = true ))
@@ -312,7 +320,7 @@ class ChannelRelay private(nodeParams: NodeParams,
312
320
case payload : IntermediatePayload .ChannelRelay .Blinded => Some (payload.nextBlinding)
313
321
case _ : IntermediatePayload .ChannelRelay .Standard => None
314
322
}
315
- RelaySuccess (c.channelId, CMD_ADD_HTLC (addResponseAdapter.toClassic, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, origin, commit = true ))
323
+ RelaySuccess (c.channelId, CMD_ADD_HTLC (addResponseAdapter.toClassic, r.amountToForward, r.add.paymentHash, r.outgoingCltv, r.nextPacket, nextBlindingKey_opt, confidence, origin, commit = true ))
316
324
}
317
325
}
318
326
0 commit comments