
Commit a1b2d77

optionally delete sqs message when node not found (#801)
* optionally delete sqs message when node not found
* remove unused variables
1 parent 40f0251 commit a1b2d77

File tree: 7 files changed (+194, −7)

cmd/node-termination-handler.go (+6, −5)

```diff
@@ -333,6 +333,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
 
 func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
 	defer wg.Done()
+	nodeFound := true
 	nodeName := drainEvent.NodeName
 
 	if nthConfig.UseProviderId {
@@ -348,6 +349,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 	nodeLabels, err := node.GetNodeLabels(nodeName)
 	if err != nil {
 		log.Err(err).Msgf("Unable to fetch node labels for node '%s' ", nodeName)
+		nodeFound = false
 	}
 	drainEvent.NodeLabels = nodeLabels
 	if drainEvent.PreDrainTask != nil {
@@ -376,15 +378,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 
 	if err != nil {
 		interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
-		<-interruptionEventStore.Workers
 	} else {
 		interruptionEventStore.MarkAllAsProcessed(nodeName)
-		if drainEvent.PostDrainTask != nil {
-			runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
-		}
-		<-interruptionEventStore.Workers
 	}
 
+	if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
+		runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
+	}
+	<-interruptionEventStore.Workers
 }
 
 func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
```
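
Read together, these hunks move the post-drain task out of the drain-success branch: it now also runs when the node lookup failed and the new `deleteSqsMsgIfNodeNotFound` option is enabled, so the task can still delete the SQS message for a node that no longer exists. The `<-interruptionEventStore.Workers` receive is likewise hoisted out of both branches, so the worker slot is released exactly once on every path. A minimal, self-contained model of the new gating condition (the helper name is ours for illustration; NTH inlines this check as shown above):

```go
package main

import "fmt"

// shouldRunPostDrainTask models the condition introduced by this commit:
// the post-drain task (which deletes the SQS message in Queue Processor
// mode) runs after a successful drain, or when the node was not found and
// delete-sqs-msg-if-node-not-found is enabled.
func shouldRunPostDrainTask(drainErr error, nodeFound, deleteSqsMsgIfNodeNotFound, hasPostDrainTask bool) bool {
	return (drainErr == nil || (!nodeFound && deleteSqsMsgIfNodeNotFound)) && hasPostDrainTask
}

func main() {
	notFoundErr := fmt.Errorf("node not found")
	// Node missing, flag enabled: the task still runs, so the message is deleted.
	fmt.Println(shouldRunPostDrainTask(notFoundErr, false, true, true)) // true
	// Node missing, flag disabled (the default): the task is skipped and the
	// message stays on the queue until redelivery or expiry.
	fmt.Println(shouldRunPostDrainTask(notFoundErr, false, false, true)) // false
}
```

With the flag at its default of `false`, behavior is unchanged: a failed drain still skips the post-drain task.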

config/helm/aws-node-termination-handler/README.md (+1)

```diff
@@ -156,6 +156,7 @@ The configuration in this table applies to AWS Node Termination Handler in IMDS
 | `enableScheduledEventDraining` | If `true`, drain nodes before the maintenance window starts for an EC2 instance scheduled event. Only used in IMDS mode. | `true` |
 | `enableRebalanceMonitoring` | If `true`, cordon nodes when the rebalance recommendation notice is received. If you'd like to drain the node in addition to cordoning, then also set `enableRebalanceDraining`. Only used in IMDS mode. | `false` |
 | `enableRebalanceDraining` | If `true`, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode. | `false` |
+| `deleteSqsMsgIfNodeNotFound` | If `true`, delete the SQS Message from the SQS Queue if the targeted node is not found. Only used in Queue Processor mode. | `false` |
 
 ### Testing Configuration
 
```
config/helm/aws-node-termination-handler/templates/deployment.yaml (+2)

```diff
@@ -164,6 +164,8 @@ spec:
 {{- end }}
 - name: QUEUE_URL
   value: {{ .Values.queueURL | quote }}
+- name: DELETE_SQS_MSG_IF_NODE_NOT_FOUND
+  value: {{ .Values.deleteSqsMsgIfNodeNotFound | quote }}
 - name: WORKERS
   value: {{ .Values.workers | quote }}
 {{- with .Values.extraEnv }}
```

config/helm/aws-node-termination-handler/values.yaml (+3)

```diff
@@ -277,6 +277,9 @@ enableRebalanceMonitoring: false
 # enableRebalanceDraining If true, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode.
 enableRebalanceDraining: false
 
+# deleteSqsMsgIfNodeNotFound If true, delete the SQS Message from the SQS Queue if the targeted node(s) are not found. Only used in Queue Processor mode.
+deleteSqsMsgIfNodeNotFound: false
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Testing
 # ---------------------------------------------------------------------------------------------------------------------
```

pkg/config/config.go (+6)

```diff
@@ -108,6 +108,7 @@ const (
 	awsEndpointConfigKey                   = "AWS_ENDPOINT"
 	queueURLConfigKey                      = "QUEUE_URL"
 	completeLifecycleActionDelaySecondsKey = "COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS"
+	deleteSqsMsgIfNodeNotFoundKey          = "DELETE_SQS_MSG_IF_NODE_NOT_FOUND"
 )
 
 // Config arguments set via CLI, environment variables, or defaults
@@ -159,6 +160,7 @@ type Config struct {
 	Workers                             int
 	UseProviderId                       bool
 	CompleteLifecycleActionDelaySeconds int
+	DeleteSqsMsgIfNodeNotFound          bool
 }
 
 // ParseCliArgs parses cli arguments and uses environment variables as fallback values
@@ -220,6 +222,7 @@ func ParseCliArgs() (config Config, err error) {
 	flag.IntVar(&config.Workers, "workers", getIntEnv(workersConfigKey, workersDefault), "The amount of parallel event processors.")
 	flag.BoolVar(&config.UseProviderId, "use-provider-id", getBoolEnv(useProviderIdConfigKey, useProviderIdDefault), "If true, fetch node name through Kubernetes node spec ProviderID instead of AWS event PrivateDnsHostname.")
 	flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.")
+	flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.")
 	flag.Parse()
 
 	if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) {
@@ -299,6 +302,7 @@ func (c Config) PrintJsonConfigArgs() {
 		Bool("enable_scheduled_event_draining", c.EnableScheduledEventDraining).
 		Bool("enable_spot_interruption_draining", c.EnableSpotInterruptionDraining).
 		Bool("enable_sqs_termination_draining", c.EnableSQSTerminationDraining).
+		Bool("delete_sqs_msg_if_node_not_found", c.DeleteSqsMsgIfNodeNotFound).
 		Bool("enable_rebalance_monitoring", c.EnableRebalanceMonitoring).
 		Bool("enable_rebalance_draining", c.EnableRebalanceDraining).
 		Int("metadata_tries", c.MetadataTries).
@@ -346,6 +350,7 @@ func (c Config) PrintHumanConfigArgs() {
 		"\tenable-scheduled-event-draining: %t,\n"+
 		"\tenable-spot-interruption-draining: %t,\n"+
 		"\tenable-sqs-termination-draining: %t,\n"+
+		"\tdelete-sqs-msg-if-node-not-found: %t,\n"+
 		"\tenable-rebalance-monitoring: %t,\n"+
 		"\tenable-rebalance-draining: %t,\n"+
 		"\tmetadata-tries: %d,\n"+
@@ -384,6 +389,7 @@ func (c Config) PrintHumanConfigArgs() {
 		c.EnableScheduledEventDraining,
 		c.EnableSpotInterruptionDraining,
 		c.EnableSQSTerminationDraining,
+		c.DeleteSqsMsgIfNodeNotFound,
 		c.EnableRebalanceMonitoring,
 		c.EnableRebalanceDraining,
 		c.MetadataTries,
```
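
The new flag follows the file's existing pattern: the CLI flag's default value is read from an environment variable, here `DELETE_SQS_MSG_IF_NODE_NOT_FOUND`. `getBoolEnv` itself is not part of this diff, so the sketch below pairs the real flag registration with an assumed, minimal equivalent of that env-fallback helper (the helper body is a guess, not NTH's actual code):

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"strconv"
)

// getBoolEnv returns the boolean value of the named environment variable,
// or def when it is unset or unparsable. Assumed behavior; NTH's real
// helper is defined elsewhere in pkg/config.
func getBoolEnv(key string, def bool) bool {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	var deleteSqsMsgIfNodeNotFound bool
	// An explicit CLI flag wins; the env var only supplies the default, so
	// -delete-sqs-msg-if-node-not-found=false overrides the env var.
	flag.BoolVar(&deleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found",
		getBoolEnv("DELETE_SQS_MSG_IF_NODE_NOT_FOUND", false),
		"If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.")
	flag.Parse()
	fmt.Println("delete-sqs-msg-if-node-not-found:", deleteSqsMsgIfNodeNotFound)
}
```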
New e2e test script (+166, new file; path not shown in this view)

```bash
#!/bin/bash
set -euo pipefail

# Available env vars:
#   $TMP_DIR
#   $CLUSTER_NAME
#   $KUBECONFIG
#   $NODE_TERMINATION_HANDLER_DOCKER_REPO
#   $NODE_TERMINATION_HANDLER_DOCKER_TAG
#   $WEBHOOK_DOCKER_REPO
#   $WEBHOOK_DOCKER_TAG
#   $AEMM_URL
#   $AEMM_VERSION

function fail_and_exit {
    echo "❌ Rebalance Recommendation SQS Test failed $CLUSTER_NAME"
    exit "${1:-1}"
}

echo "Starting Rebalance Recommendation SQS Test for Node Termination Handler"
START_TIME=$(date -u +"%Y-%m-%dT%TZ")

SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"

common_helm_args=()

localstack_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-localstack"
    "$SCRIPTPATH/../../config/helm/localstack/"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set defaultRegion="${AWS_REGION}"
    --wait
)

set -x
helm "${localstack_helm_args[@]}"
set +x

sleep 10

localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \
    -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \
    | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }')
echo "🥑 Using localstack pod ${localstack_pod}"

run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "awslocal ec2 run-instances --private-ip-address 192.168.0.4 --region ${AWS_REGION}")
instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
echo "🥑 Created awslocal EC2 instance (${instance_id}) not backed by a node"

CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl)

echo "🥑 Created SQS Queue ${queue_url}"

anth_helm_args=(
    upgrade
    --install
    --namespace kube-system
    "$CLUSTER_NAME-acth"
    "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
    --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
    --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set tolerations[0].operator=Exists
    --set awsAccessKeyID=foo
    --set awsSecretAccessKey=bar
    --set awsRegion="${AWS_REGION}"
    --set awsEndpoint="http://localstack.default"
    --set checkTagBeforeDraining=false
    --set enableSqsTerminationDraining=true
    --set deleteSqsMsgIfNodeNotFound=true
    --set taintNode="true"
    --set "queueURL=${queue_url}"
    --wait
)
[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] &&
    anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
    anth_helm_args+=("${common_helm_args[@]}")

set -x
helm "${anth_helm_args[@]}"
set +x

emtp_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-emtp"
    "$SCRIPTPATH/../../config/helm/webhook-test-proxy/"
    --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO"
    --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG"
    --wait
)
[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] &&
    emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
    emtp_helm_args+=("${common_helm_args[@]}")

set -x
helm "${emtp_helm_args[@]}"
set +x

CHECK_CYCLES=15
CHECK_SLEEP=15

DEPLOYED=0

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
        echo "✅ Verified regular-pod-test pod was scheduled and started!"
        DEPLOYED=1
        break
    fi
    echo "Setup Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

if [[ $DEPLOYED -eq 0 ]]; then
    echo "❌ regular-pod-test pod deployment failed"
    fail_and_exit 2
fi

REBALANCE_EVENT=$(cat <<EOF
{
  "version": "0",
  "id": "5d5555d5-dd55-5555-5555-5555dd55d55d",
  "detail-type": "EC2 Instance Rebalance Recommendation",
  "source": "aws.ec2",
  "account": "123456789012",
  "time": "$(date -u +"%Y-%m-%dT%TZ")",
  "region": "us-east-1",
  "resources": [
    "arn:aws:ec2:us-east-1b:instance/${instance_id}"
  ],
  "detail": {
    "instance-id": "${instance_id}"
  }
}
EOF
)

REBALANCE_EVENT_ONE_LINE=$(echo "${REBALANCE_EVENT}" | tr -d '\n' | sed 's/\"/\\"/g')
SEND_SQS_CMD="awslocal sqs send-message --queue-url ${queue_url} --message-body \"${REBALANCE_EVENT_ONE_LINE}\" --region ${AWS_REGION}"
kubectl exec -i "${localstack_pod}" -- bash -c "${SEND_SQS_CMD}"
echo "✅ Sent Rebalance Recommendation to SQS queue: ${queue_url}"

GET_ATTRS_SQS_CMD="awslocal sqs get-queue-attributes --queue-url ${queue_url} --attribute-names All --region ${AWS_REGION}"

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)') -eq 0 ]]; then
        kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}"
        echo "✅ Verified the message was deleted from the queue after processing!"
        echo "✅ Rebalance Recommendation SQS Test Passed $CLUSTER_NAME! ✅"
        exit 0
    fi

    echo "Assertion Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

echo "❌ message was not removed from the queue after processing"
fail_and_exit 3
```
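
The final assertion checks that visible plus in-flight messages drop to zero, i.e. that NTH deleted the message even though the instance was never backed by a node. The deletion itself happens inside NTH's queue processor, which this diff does not touch; for orientation only, deleting a received message with aws-sdk-go looks roughly like this (hedged sketch; the queue URL and receipt handle are placeholders):

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// deleteMessage removes one received message from an SQS queue. SQS deletes
// by receipt handle (returned by ReceiveMessage), not by message ID.
func deleteMessage(svc *sqs.SQS, queueURL, receiptHandle string) error {
	_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
		QueueUrl:      aws.String(queueURL),
		ReceiptHandle: aws.String(receiptHandle),
	})
	return err
}

func main() {
	svc := sqs.New(session.Must(session.NewSession()))
	// Placeholder values for illustration; in this test the queue lives in
	// localstack and NTH holds the receipt handle internally.
	err := deleteMessage(svc, "http://localstack.default/000000000000/nth-queue", "AQEB-placeholder")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("message deleted")
}
```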

test/k8s-local-cluster-test/run-test (+10, −2)

```diff
@@ -228,10 +228,18 @@ export KUBECONFIG="$TMP_DIR/kubeconfig"
 trap "exit_and_fail" INT TERM ERR
 trap "clean_up" EXIT
 
+cat << EOF >$TMP_DIR/env
+export KUBECONFIG=$TMP_DIR/kubeconfig
+echo "Updated KUBECONFIG=$KUBECONFIG"
+
+export PATH=$TMP_DIR:\$PATH
+echo "Updated PATH=$PATH"
+
+EOF
+
 echo "======================================================================================================"
 echo "To poke around your test manually:"
-echo "export KUBECONFIG=$TMP_DIR/kubeconfig"
-echo "export PATH=$TMP_DIR:\$PATH"
+echo ". $TMP_DIR/env"
 echo "kubectl get pods -A"
 echo "======================================================================================================"
```