From 90d419bf3169a9219961401d3b2f3b3eb27bccb4 Mon Sep 17 00:00:00 2001
From: Tamas Koczka
Date: Fri, 20 Oct 2023 08:47:51 +0000
Subject: [PATCH] kernelCTF: GHA: move common utils to separate file

---
 kernelctf/.gitignore          |   3 +-
 kernelctf/check-submission.py |  98 +-------------------------------
 kernelctf/utils.py            | 103 ++++++++++++++++++++++++++++++++++
 3 files changed, 106 insertions(+), 98 deletions(-)
 create mode 100644 kernelctf/utils.py

diff --git a/kernelctf/.gitignore b/kernelctf/.gitignore
index 16d3c4db..0b6aefaa 100644
--- a/kernelctf/.gitignore
+++ b/kernelctf/.gitignore
@@ -1 +1,2 @@
-.cache
+.cache/
+__pycache__/
diff --git a/kernelctf/check-submission.py b/kernelctf/check-submission.py
index edeb3383..242fa8c0 100755
--- a/kernelctf/check-submission.py
+++ b/kernelctf/check-submission.py
@@ -1,107 +1,17 @@
 #!/usr/bin/env -S python3 -u
 import os
-import re
-import subprocess
 import sys
 import json
 import jsonschema
-import requests
-import csv
-import io
 import hashlib
-import time
+from utils import *
 
-BASE_DIR = os.path.abspath(os.path.dirname(__file__))
 PUBLIC_CSV_URL = "https://docs.google.com/spreadsheets/d/e/2PACX-1vS1REdTA29OJftst8xN5B5x8iIUcxuK6bXdzF8G1UXCmRtoNsoQ9MbebdRdFnj6qZ0Yd7LwQfvYC2oF/pub?output=csv"
 POC_FOLDER = "pocs/linux/kernelctf/"
 EXPLOIT_DIR = "exploit/"
-CACHE_DIR = f"{BASE_DIR}/.cache"
 MIN_SCHEMA_VERSION = 2
 # DEBUG = "--debug" in sys.argv
 
-errors = []
-warnings = []
-
-def error(msg):
-    global errors
-    msg = msg.replace('\n', '\n ')
-    errors.append(msg)
-    print(f"\n[!] [ERROR] {msg}")
-
-def warning(msg):
-    global warnings
-    msg = msg.replace('\n', '\n ')
-    warnings.append(msg)
-    print(f"\n[!] [WARN] {msg}")
-
-def fail(msg):
-    print("\n[!] [FAIL] " + msg.replace('\n', '\n '))
-    os._exit(1)
-
-def run(cmd):
-    try:
-        result = subprocess.check_output(cmd, shell=True).decode('utf-8').split('\n')
-        return result if result[-1] != "" else result[0:-1]
-    except subprocess.CalledProcessError as e:
-        fail(f"executing '{cmd}' failed with exit code {e.returncode}")
-
-def subdirEntries(files, subdir):
-    return list(set([f[len(subdir):].split('/')[0] for f in files if f.startswith(subdir)]))
-
-def formatList(items, nl=False):
-    return ('\n' if nl else '').join([f"\n - {item}" for item in items])
-
-def printList(title, items):
-    print(f"\n{title}:" + formatList(items))
-
-def errorList(errorMsg, items, warningOnly=False):
-    itemsStr = ", ".join(f"`{x}`" for x in items)
-    errorMsg = errorMsg.replace("<LIST>", itemsStr) if "<LIST>" in errorMsg else f"{errorMsg}: {itemsStr}"
-    if warningOnly:
-        warning(errorMsg)
-    else:
-        error(errorMsg)
-
-def checkOnlyOne(list, errorMsg):
-    if len(list) > 1:
-        errorList(errorMsg, list)
-    return list[0]
-
-def checkList(items, isAllowedFunc, errorMsg, warningOnly=False):
-    disallowedItems = [item for item in items if not isAllowedFunc(item)]
-    if len(disallowedItems) > 0:
-        errorList(errorMsg, disallowedItems, warningOnly)
-    return list(sorted(set(items) - set(disallowedItems)))
-
-def checkAtLeastOne(list, errorMsg):
-    if len(list) == 0:
-        fail(errorMsg)
-
-def checkRegex(text, pattern, errorMsg):
-    m = re.match(pattern, text)
-    if not m:
-        error(f"{errorMsg}. Must match regex `{pattern}`")
-    return m
-
-def fetch(url, cache_name, cache_time=3600):
-    cache_fn = f"{CACHE_DIR}/{cache_name}"
-    if cache_name and os.path.isfile(cache_fn) and (time.time() - os.path.getmtime(cache_fn) < cache_time):
-        with open(cache_fn, "rb") as f: return f.read().decode('utf-8')
-
-    response = requests.get(url)
-    if response.status_code != 200:
-        fail(f"expected 200 OK for request: {url}")
-
-    if cache_name:
-        os.makedirs(CACHE_DIR, exist_ok=True)
-        with open(cache_fn, "wb") as f: f.write(response.content)
-
-    return response.content.decode('utf-8')
-
-def parseCsv(csvContent):
-    columns, *rows = list(csv.reader(io.StringIO(csvContent), strict=True))
-    return [{ columns[i]: row[i] for i in range(len(columns)) } for row in rows]
-
 argv = [arg for arg in sys.argv if not arg.startswith("--")]
 print(f"[-] Argv: {argv}")
@@ -218,12 +128,6 @@ def parseCsv(csvContent):
 if schemaVersion >= 3:
     checkList(flagTargets, lambda t: t in metadata["exploits"].keys(), f"Missing metadata information for exploit(s)")
 
-def ghSet(varName, content):
-    varName = f"GITHUB_{varName}"
-    print(f"[+] Writing {json.dumps(content)} to ${varName}")
-    if varName in os.environ:
-        with open(os.environ[varName], 'at') as f: f.write(content + "\n")
-
 def summary(success, text):
     if warnings:
         text += "\n\n**Warnings:**\n\n" + '\n\n'.join(f" - ⚠️ {x}" for x in warnings)
diff --git a/kernelctf/utils.py b/kernelctf/utils.py
new file mode 100644
index 00000000..225f6e8d
--- /dev/null
+++ b/kernelctf/utils.py
@@ -0,0 +1,103 @@
+import csv
+import io
+import json
+import os
+import subprocess
+import re
+import requests
+import time
+from urllib.parse import urlparse
+
+BASE_DIR = os.path.abspath(os.path.dirname(__file__))
+CACHE_DIR = f"{BASE_DIR}/.cache"
+
+errors = []
+warnings = []
+
+def error(msg):
+    global errors
+    msg = msg.replace('\n', '\n ')
+    errors.append(msg)
+    print(f"\n[!] [ERROR] {msg}")
+
+def warning(msg):
+    global warnings
+    msg = msg.replace('\n', '\n ')
+    warnings.append(msg)
+    print(f"\n[!] [WARN] {msg}")
+
+def fail(msg):
+    print("\n[!] [FAIL] " + msg.replace('\n', '\n '))
+    os._exit(1)
+
+def run(cmd):
+    try:
+        result = subprocess.check_output(cmd, shell=True).decode('utf-8').split('\n')
+        return result if result[-1] != "" else result[0:-1]
+    except subprocess.CalledProcessError as e:
+        fail(f"executing '{cmd}' failed with exit code {e.returncode}")
+
+def subdirEntries(files, subdir):
+    return list(set([f[len(subdir):].split('/')[0] for f in files if f.startswith(subdir)]))
+
+def formatList(items, nl=False):
+    return ('\n' if nl else '').join([f"\n - {item}" for item in items])
+
+def printList(title, items):
+    print(f"\n{title}:" + formatList(items))
+
+def errorList(errorMsg, items, warningOnly=False):
+    itemsStr = ", ".join(f"`{x}`" for x in items)
+    errorMsg = errorMsg.replace("<LIST>", itemsStr) if "<LIST>" in errorMsg else f"{errorMsg}: {itemsStr}"
+    if warningOnly:
+        warning(errorMsg)
+    else:
+        error(errorMsg)
+
+def checkOnlyOne(list, errorMsg):
+    if len(list) > 1:
+        errorList(errorMsg, list)
+    return list[0]
+
+def checkList(items, isAllowedFunc, errorMsg, warningOnly=False):
+    disallowedItems = [item for item in items if not isAllowedFunc(item)]
+    if len(disallowedItems) > 0:
+        errorList(errorMsg, disallowedItems, warningOnly)
+    return list(sorted(set(items) - set(disallowedItems)))
+
+def checkAtLeastOne(list, errorMsg):
+    if len(list) == 0:
+        fail(errorMsg)
+
+def checkRegex(text, pattern, errorMsg):
+    m = re.match(pattern, text)
+    if not m:
+        error(f"{errorMsg}. Must match regex `{pattern}`")
+    return m
+
+def fetch(url, cache_name=None, cache_time=3600):
+    if not cache_name:
+        cache_name = os.path.basename(urlparse(url).path)
+    cache_fn = f"{CACHE_DIR}/{cache_name}"
+    if cache_name and os.path.isfile(cache_fn) and (time.time() - os.path.getmtime(cache_fn) < cache_time):
+        with open(cache_fn, "rb") as f: return f.read().decode('utf-8')
+
+    response = requests.get(url)
+    if response.status_code != 200:
+        fail(f"expected 200 OK for request: {url}")
+
+    if cache_name:
+        os.makedirs(CACHE_DIR, exist_ok=True)
+        with open(cache_fn, "wb") as f: f.write(response.content)
+
+    return response.content.decode('utf-8')
+
+def parseCsv(csvContent):
+    columns, *rows = list(csv.reader(io.StringIO(csvContent), strict=True))
+    return [{ columns[i]: row[i] for i in range(len(columns)) } for row in rows]
+
+def ghSet(varName, content):
+    varName = f"GITHUB_{varName}"
+    print(f"[+] Writing {json.dumps(content)} to ${varName}")
+    if varName in os.environ:
+        with open(os.environ[varName], 'at') as f: f.write(content + "\n")
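
Usage sketch (illustrative only, not part of the patch): after this refactor,
any script under kernelctf/ can reuse the helpers the same way
check-submission.py now does via `from utils import *`. The snippet below
assumes it runs from the kernelctf/ directory and uses a made-up CSV URL;
the real spreadsheet URL stays in check-submission.py as PUBLIC_CSV_URL.

    # Not part of the commit -- a minimal sketch of the moved utils API.
    from utils import fetch, parseCsv, ghSet

    # fetch() returns the response body as text and caches it under
    # kernelctf/.cache/ for an hour; if cache_name is omitted, it is
    # derived from the last path segment of the URL.
    csvContent = fetch("https://example.com/public.csv", cache_name="public.csv")

    # parseCsv() turns the header row into dict keys, one dict per data row.
    rows = parseCsv(csvContent)
    print(f"[-] Loaded {len(rows)} rows")

    # ghSet("OUTPUT", ...) appends to the file named by $GITHUB_OUTPUT when
    # that variable is set, making the value visible to later GHA steps.
    ghSet("OUTPUT", f"row_count={len(rows)}")

Note that `errors` and `warnings` remain module-level lists in utils.py and
are mutated in place, so the star import in check-submission.py keeps sharing
the same list objects and its final summary still sees everything recorded
through error() and warning().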