Skip to content

disk_check: Check & mount RO as RW using tmpfs #1569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions scripts/disk_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
What:
There have been cases, where disk turns RO due to kernel bug.
In RO state, system blocks new remote user login via TACACS.
This utility is to check & make transient recovery as needed.

How:
check for RW permission. If RO, create writable overlay using tmpfs.

By default "/etc" & "/home" are checked and if in RO state, make them RW
using overlay on top of tmpfs.

Making /etc/ & /home as writable lets successful new remote user login.

If in RO state or in RW state with the help of tmpfs overlay,
syslog ERR messages are written, to help raise alerts.

Monit may be used to invoke it periodically, to help scan & fix and
report via syslog.

"""

import argparse
import os
import sys
import syslog
import subprocess

TEST_LOG_FN = None
UPPER_DIR = "/run/mount/upper"
WORK_DIR = "/run/mount/work"
MOUNTS_FILE = "/proc/mounts"

def log_err(m):
print("Err: {}".format(m))
syslog.syslog(syslog.LOG_ERR, m)
if TEST_LOG_FN:
TEST_LOG_FN(syslog.LOG_ERR, m)


def log_info(m):
print("Info: {}".format(m))
syslog.syslog(syslog.LOG_INFO, m)


def log_debug(m):
print("debug: {}".format(m))
syslog.syslog(syslog.LOG_DEBUG, m)


def test_rw(dirs):
for d in dirs:
rw = os.access(d, os.W_OK)
if not rw:
log_err("{} dir is not RW".format(d))
return False
else:
log_debug("{} dir is RW".format(d))
return True


def run_cmd(cmd):
proc = subprocess.run(cmd, shell=True, text=True, capture_output=True)
ret = proc.returncode
if ret:
log_err("failed: ret={} cmd={}".format(ret, cmd))
else:
log_info("ret={} cmd: {}".format(ret, cmd))

if proc.stdout:
log_info("stdout: {}".format(str(proc.stdout)))
if proc.stderr:
log_info("stderr: {}".format(str(proc.stderr)))
return ret


def get_dname(path_name):
return path_name.replace('/', ' ').strip().split()[-1]


def do_mnt(dirs):
if os.path.exists(UPPER_DIR):
log_err("Already mounted")
return 1

for i in (UPPER_DIR, WORK_DIR):
ret = run_cmd("mkdir {}".format(i))
if ret:
break

for d in dirs:
if not ret:
ret = run_cmd("mount -t overlay overlay_{} -o lowerdir={},"
"upperdir={},workdir={} {}".format(
get_dname(d), d, UPPER_DIR, WORK_DIR, d))

if ret:
log_err("Failed to mount {} as RW".format(dirs))
else:
log_info("{} are mounted as RW".format(dirs))
return ret


def is_mounted(dirs):
if not os.path.exists(UPPER_DIR):
return False

onames = set()
for d in dirs:
onames.add("overlay_{}".format(get_dname(d)))

with open(MOUNTS_FILE, "r") as s:
for ln in s.readlines():
n = ln.strip().split()[0]
if n in onames:
log_debug("Mount exists for {}".format(n))
return True
return False


def do_check(skip_mount, dirs):
ret = 0
if not test_rw(dirs):
if not skip_mount:
ret = do_mnt(dirs)

# Check if mounted
if (not ret) and is_mounted(dirs):
log_err("READ-ONLY: Mounted {} to make RW".format(dirs))

return ret


def main():
parser=argparse.ArgumentParser(
description="check disk for RW and mount etc & home as RW")
parser.add_argument('-s', "--skip-mount", action='store_true', default=False,
help="Skip mounting /etc & /home as RW")
parser.add_argument('-d', "--dirs", default="/etc,/home",
help="dirs to mount")
args = parser.parse_args()

ret = do_check(args.skip_mount, args.dirs.split(","))
return ret


if __name__ == "__main__":
sys.exit(main())
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
'scripts/db_migrator.py',
'scripts/decode-syseeprom',
'scripts/dropcheck',
'scripts/disk_check.py',
'scripts/dropconfig',
'scripts/dropstat',
'scripts/dump_nat_entries.py',
Expand Down
138 changes: 138 additions & 0 deletions tests/disk_check_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import sys
import syslog
from unittest.mock import patch
import pytest

sys.path.append("scripts")
import disk_check

disk_check.MOUNTS_FILE = "/tmp/proc_mounts"

test_data = {
"0": {
"args": ["", "-d", "/tmp"],
"err": ""
},
"1": {
"args": ["", "-d", "/tmpx", "-s"],
"err": "/tmpx dir is not RW"
},
"2": {
"args": ["", "-d", "/tmpx"],
"mounts": "overlay_tmpx blahblah",
"err": "/tmpx dir is not RW",
"cmds": ['mkdir /run/mount/upper', 'mkdir /run/mount/work',
'mount -t overlay overlay_tmpx -o lowerdir=/tmpx,upperdir=/run/mount/upper,workdir=/run/mount/work /tmpx']
},
"3": {
"args": ["", "-d", "/tmpx"],
"cmds": ['mkdir /run/mount/upper'],
"proc": [ {"ret": -1, "stdout": "blah", "stderr": "blah blah"} ],
"expect_ret": -1
},
"4": {
"args": ["", "-d", "/tmpx"],
"upperdir": "/tmp",
"err": "/tmpx dir is not RW|Already mounted",
"expect_ret": 1
},
"5": {
"args": ["", "-d", "/tmp"],
"upperdir": "/tmp",
"mounts": "overlay_tmp blahblah",
"err": "READ-ONLY: Mounted ['/tmp'] to make RW"
},
"6": {
"args": ["", "-d", "/tmp"],
"upperdir": "/tmp"
}
}

err_data = ""
cmds = []
current_tc = None

def mount_file(d):
with open(disk_check.MOUNTS_FILE, "w") as s:
s.write(d)


def report_err_msg(lvl, m):
global err_data
if lvl == syslog.LOG_ERR:
if err_data:
err_data += "|"
err_data += m

disk_check.TEST_LOG_FN = report_err_msg

class proc:
returncode = 0
stdout = None
stderr = None

def __init__(self, proc_upd = None):
if proc_upd:
self.returncode = proc_upd.get("ret", 0)
self.stdout = proc_upd.get("stdout", None)
self.stderr = proc_upd.get("stderr", None)


def mock_subproc_run(cmd, shell, text, capture_output):
global cmds

upd = (current_tc["proc"][len(cmds)]
if len(current_tc.get("proc", [])) > len(cmds) else None)
cmds.append(cmd)

return proc(upd)


def init_tc(tc):
global err_data, cmds, current_tc

err_data = ""
cmds = []
mount_file(tc.get("mounts", ""))
current_tc = tc


def swap_upper(tc):
tmp_u = tc["upperdir"]
tc["upperdir"] = disk_check.UPPER_DIR
disk_check.UPPER_DIR = tmp_u


class TestDiskCheck(object):
def setup(self):
pass


@patch("disk_check.subprocess.run")
def test_readonly(self, mock_proc):
global err_data, cmds

mock_proc.side_effect = mock_subproc_run
for i, tc in test_data.items():
print("-----------Start tc {}---------".format(i))
init_tc(tc)

with patch('sys.argv', tc["args"]):
if "upperdir" in tc:
swap_upper(tc)

ret = disk_check.main()

if "upperdir" in tc:
# restore
swap_upper(tc)

print("ret = {}".format(ret))
print("err_data={}".format(err_data))
print("cmds: {}".format(cmds))

assert ret == tc.get("expect_ret", 0)
if "err" in tc:
assert err_data == tc["err"]
assert cmds == tc.get("cmds", [])
print("-----------End tc {}-----------".format(i))