Skip to content

Add code style check and fix commands #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,44 +1,55 @@
name: Tests
on: push
jobs:
code-style:
name: Code style
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.8
cache: 'pip'
- run: pip install -r requirements.txt
- name: Check
run: black . --check

duplicates:
name: No duplicate domains in list
runs-on: ubuntu-latest
continue-on-error: true
steps:
- uses: actions/checkout@v2
name: Python setup
- uses: actions/setup-python@v2
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.8
- name: Install pytest
run: pip install pytest
cache: 'pip'
- run: pip install -r requirements.txt
- name: Check for duplicate lines
run: |
cd scripts
python3 tests.py --type duplicates

regex:
name: No regex domains in list
runs-on: ubuntu-latest
continue-on-error: true
steps:
- uses: actions/checkout@v2
name: Python setup
- uses: actions/setup-python@v2
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.8
- name: Install pytest
run: pip install pytest
cache: 'pip'
- run: pip install -r requirements.txt
- name: Check if list includes regex domains
run: |
cd scripts
python3 tests.py --type regex

unbound:
name: Check Unbound list
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Install Unbound
run: sudo apt-get update && sudo apt-get install -y unbound
- name: Check unbound list
run: unbound-checkconf unbound.conf
run: unbound-checkconf unbound.conf
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ whois: ## Check all domains with whois
@. .venv/bin/activate; cd scripts && python domain-check-api.py

dnscheck: ## Check all domains if they have a response
@. .venv/bin/activate; cd scripts && python dnscheck.py
@. .venv/bin/activate; cd scripts && python dnscheck.py

codestyle-check: ## Check all Python scripts code style
@. .venv/bin/activate; black . --check

codestyle-fix: ## Fix all Python scripts code style
@. .venv/bin/activate; black .
8 changes: 7 additions & 1 deletion convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,13 @@ def run(action: str):
if __name__ == "__main__":

# Read subcommand from command line, with error handling.
action_candidates = ["pihole", "unbound", "adguard", "adguard_important", "categories"]
action_candidates = [
"pihole",
"unbound",
"adguard",
"adguard_important",
"categories",
]
special_candidates = ["all", "duplicates", "json"]
subcommand = None
try:
Expand Down
162 changes: 95 additions & 67 deletions install.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,39 @@ def fetch_url(url):
if not url:
return

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:71.0) Gecko/20100101 Firefox/71.0'}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:71.0) Gecko/20100101 Firefox/71.0"
}

print('[i] Fetching:', url)
print("[i] Fetching:", url)

try:
response = urlopen(Request(url, headers=headers))
except HTTPError as e:
print('[E] HTTP Error:', e.code, 'whilst fetching', url)
print("[E] HTTP Error:", e.code, "whilst fetching", url)
return
except URLError as e:
print('[E] URL Error:', e.reason, 'whilst fetching', url)
print("[E] URL Error:", e.reason, "whilst fetching", url)
return

# Read and decode
response = response.read().decode('UTF-8').replace('\r\n', '\n')
response = response.read().decode("UTF-8").replace("\r\n", "\n")

# If there is data
if response:
# Strip leading and trailing whitespace
response = '\n'.join(x for x in map(str.strip, response.splitlines()))
response = "\n".join(x for x in map(str.strip, response.splitlines()))

# Return the hosts
return response


url_regexps_remote = 'https://raw.githubusercontent.com/nickspaargaren/no-google/master/regex.list'
install_comment = 'github.com/nickspaargaren/no-google'
url_regexps_remote = (
"https://raw.githubusercontent.com/nickspaargaren/no-google/master/regex.list"
)
install_comment = "github.com/nickspaargaren/no-google"

cmd_restart = ['pihole', 'restartdns', 'reload']
cmd_restart = ["pihole", "restartdns", "reload"]

db_exists = False
conn = None
Expand All @@ -61,73 +65,83 @@ def fetch_url(url):

# Check to see whether the default "pihole" docker container is active
try:
docker_id = subprocess.run(['docker', 'ps', '--filter', 'name=pihole', '-q'],
stdout=subprocess.PIPE, universal_newlines=True).stdout.strip()
docker_id = subprocess.run(
["docker", "ps", "--filter", "name=pihole", "-q"],
stdout=subprocess.PIPE,
universal_newlines=True,
).stdout.strip()
# Exception for if docker is not installed
except FileNotFoundError:
pass

# If a pihole docker container was found, locate the first mount
if docker_id:
docker_mnt = subprocess.run(['docker', 'inspect', '--format', '{{ (json .Mounts) }}', docker_id],
stdout=subprocess.PIPE, universal_newlines=True).stdout.strip()
docker_mnt = subprocess.run(
["docker", "inspect", "--format", "{{ (json .Mounts) }}", docker_id],
stdout=subprocess.PIPE,
universal_newlines=True,
).stdout.strip()
# Convert output to JSON and iterate through each dict
for json_dict in json.loads(docker_mnt):
# If this mount's destination is /etc/pihole
if json_dict['Destination'] == r'/etc/pihole':
if json_dict["Destination"] == r"/etc/pihole":
# Use the source path as our target
docker_mnt_src = json_dict['Source']
docker_mnt_src = json_dict["Source"]
break

# If we successfully found the mount
if docker_mnt_src:
print('[i] Running in docker installation mode')
print("[i] Running in docker installation mode")
# Prepend restart commands
cmd_restart[0:0] = ['docker', 'exec', '-i', 'pihole']
cmd_restart[0:0] = ["docker", "exec", "-i", "pihole"]
else:
print('[i] Running in physical installation mode ')
print("[i] Running in physical installation mode ")

# Set paths
path_pihole = docker_mnt_src if docker_mnt_src else r'/etc/pihole'
path_legacy_regex = os.path.join(path_pihole, 'regex.list')
path_legacy_mmotti_regex = os.path.join(path_pihole, 'mmotti-regex.list')
path_pihole_db = os.path.join(path_pihole, 'gravity.db')
path_pihole = docker_mnt_src if docker_mnt_src else r"/etc/pihole"
path_legacy_regex = os.path.join(path_pihole, "regex.list")
path_legacy_mmotti_regex = os.path.join(path_pihole, "mmotti-regex.list")
path_pihole_db = os.path.join(path_pihole, "gravity.db")

# Check that pi-hole path exists
if os.path.exists(path_pihole):
print('[i] Pi-hole path exists')
print("[i] Pi-hole path exists")
else:
print(f'[e] {path_pihole} was not found')
print(f"[e] {path_pihole} was not found")
exit(1)

# Check for write access to /etc/pihole
if os.access(path_pihole, os.X_OK | os.W_OK):
print(f'[i] Write access to {path_pihole} verified')
print(f"[i] Write access to {path_pihole} verified")
else:
print(f'[e] Write access is not available for {path_pihole}. Please run as root or other privileged user')
print(
f"[e] Write access is not available for {path_pihole}. Please run as root or other privileged user"
)
exit(1)

# Determine whether we are using DB or not
if os.path.isfile(path_pihole_db) and os.path.getsize(path_pihole_db) > 0:
db_exists = True
print('[i] DB detected')
print("[i] DB detected")
else:
print('[i] Legacy regex.list detected')
print("[i] Legacy regex.list detected")

# Fetch the remote regexps
str_regexps_remote = fetch_url(url_regexps_remote)

# If regexps were fetched, remove any comments and add to set
if str_regexps_remote:
regexps_remote.update(x for x in map(str.strip, str_regexps_remote.splitlines()) if x and x[:1] != '#')
print(f'[i] {len(regexps_remote)} regexps collected from {url_regexps_remote}')
regexps_remote.update(
x for x in map(str.strip, str_regexps_remote.splitlines()) if x and x[:1] != "#"
)
print(f"[i] {len(regexps_remote)} regexps collected from {url_regexps_remote}")
else:
print('[i] No remote regexps were found.')
print("[i] No remote regexps were found.")
exit(1)

if db_exists:
# Create a DB connection
print(f'[i] Connecting to {path_pihole_db}')
print(f"[i] Connecting to {path_pihole_db}")

try:
conn = sqlite3.connect(path_pihole_db)
Expand All @@ -139,93 +153,107 @@ def fetch_url(url):
c = conn.cursor()

# Add / update remote regexps
print('[i] Adding / updating regexps in the DB')

c.executemany('INSERT OR IGNORE INTO domainlist (type, domain, enabled, comment) '
'VALUES (3, ?, 1, ?)',
[(x, install_comment) for x in sorted(regexps_remote)])
c.executemany('UPDATE domainlist '
'SET comment = ? WHERE domain in (?) AND comment != ?',
[(install_comment, x, install_comment) for x in sorted(regexps_remote)])
print("[i] Adding / updating regexps in the DB")

c.executemany(
"INSERT OR IGNORE INTO domainlist (type, domain, enabled, comment) "
"VALUES (3, ?, 1, ?)",
[(x, install_comment) for x in sorted(regexps_remote)],
)
c.executemany(
"UPDATE domainlist " "SET comment = ? WHERE domain in (?) AND comment != ?",
[(install_comment, x, install_comment) for x in sorted(regexps_remote)],
)

conn.commit()

# Fetch all current mmotti regexps in the local db
c.execute('SELECT domain FROM domainlist WHERE type = 3 AND comment = ?', (install_comment,))
c.execute(
"SELECT domain FROM domainlist WHERE type = 3 AND comment = ?",
(install_comment,),
)
regexps_mmotti_local_results = c.fetchall()
regexps_mmotti_local.update([x[0] for x in regexps_mmotti_local_results])

# Remove any local entries that do not exist in the remote list
# (will only work for previous installs where we've set the comment field)
print('[i] Identifying obsolete regexps')
print("[i] Identifying obsolete regexps")
regexps_remove = regexps_mmotti_local.difference(regexps_remote)

if regexps_remove:
print('[i] Removing obsolete regexps')
c.executemany('DELETE FROM domainlist WHERE type = 3 AND domain in (?)', [(x,) for x in regexps_remove])
print("[i] Removing obsolete regexps")
c.executemany(
"DELETE FROM domainlist WHERE type = 3 AND domain in (?)",
[(x,) for x in regexps_remove],
)
conn.commit()

# Delete mmotti-regex.list as if we've migrated to the db, it's no longer needed
if os.path.exists(path_legacy_mmotti_regex):
os.remove(path_legacy_mmotti_regex)

print('[i] Restarting Pi-hole')
print("[i] Restarting Pi-hole")
subprocess.run(cmd_restart, stdout=subprocess.DEVNULL)

# Prepare final result
print('[i] Done - Please see your installed regexps below\n')
print("[i] Done - Please see your installed regexps below\n")

c.execute('Select domain FROM domainlist WHERE type = 3')
c.execute("Select domain FROM domainlist WHERE type = 3")
final_results = c.fetchall()
regexps_local.update(x[0] for x in final_results)

print(*sorted(regexps_local), sep='\n')
print(*sorted(regexps_local), sep="\n")

conn.close()

else:
# If regex.list exists and is not empty
# Read it and add to a set
if os.path.isfile(path_legacy_regex) and os.path.getsize(path_legacy_regex) > 0:
print('[i] Collecting existing entries from regex.list')
with open(path_legacy_regex, 'r') as fRead:
regexps_local.update(x for x in map(str.strip, fRead) if x and x[:1] != '#')
print("[i] Collecting existing entries from regex.list")
with open(path_legacy_regex, "r") as fRead:
regexps_local.update(x for x in map(str.strip, fRead) if x and x[:1] != "#")

# If the local regexp set is not empty
if regexps_local:
print(f'[i] {len(regexps_local)} existing regexps identified')
print(f"[i] {len(regexps_local)} existing regexps identified")
# If we have a record of a previous legacy install
if os.path.isfile(path_legacy_mmotti_regex) and os.path.getsize(path_legacy_mmotti_regex) > 0:
print('[i] Existing mmotti-regex install identified')
if (
os.path.isfile(path_legacy_mmotti_regex)
and os.path.getsize(path_legacy_mmotti_regex) > 0
):
print("[i] Existing mmotti-regex install identified")
# Read the previously installed regexps to a set
with open(path_legacy_mmotti_regex, 'r') as fOpen:
regexps_legacy_mmotti.update(x for x in map(str.strip, fOpen) if x and x[:1] != '#')
with open(path_legacy_mmotti_regex, "r") as fOpen:
regexps_legacy_mmotti.update(
x for x in map(str.strip, fOpen) if x and x[:1] != "#"
)

if regexps_legacy_mmotti:
print('[i] Removing previously installed regexps')
print("[i] Removing previously installed regexps")
regexps_local.difference_update(regexps_legacy_mmotti)

# Add remote regexps to local regexps
print(f'[i] Syncing with {url_regexps_remote}')
print(f"[i] Syncing with {url_regexps_remote}")
regexps_local.update(regexps_remote)

# Output to regex.list
print(f'[i] Outputting {len(regexps_local)} regexps to {path_legacy_regex}')
with open(path_legacy_regex, 'w') as fWrite:
print(f"[i] Outputting {len(regexps_local)} regexps to {path_legacy_regex}")
with open(path_legacy_regex, "w") as fWrite:
for line in sorted(regexps_local):
fWrite.write(f'{line}\n')
fWrite.write(f"{line}\n")

# Output mmotti remote regexps to mmotti-regex.list
# for future install / uninstall
with open(path_legacy_mmotti_regex, 'w') as fWrite:
with open(path_legacy_mmotti_regex, "w") as fWrite:
for line in sorted(regexps_remote):
fWrite.write(f'{line}\n')
fWrite.write(f"{line}\n")

print('[i] Restarting Pi-hole')
print("[i] Restarting Pi-hole")
subprocess.run(cmd_restart, stdout=subprocess.DEVNULL)

# Prepare final result
print('[i] Done - Please see your installed regexps below\n')
with open(path_legacy_regex, 'r') as fOpen:
print("[i] Done - Please see your installed regexps below\n")
with open(path_legacy_regex, "r") as fOpen:
for line in fOpen:
print(line, end='')
print(line, end="")
Loading
Loading