Skip to content

Commit

Permalink
kernelCTF: GHA: move common utils to separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
koczkatamas committed Oct 20, 2023
1 parent 80f0262 commit 90d419b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 98 deletions.
3 changes: 2 additions & 1 deletion kernelctf/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.cache
.cache/
__pycache__/
98 changes: 1 addition & 97 deletions kernelctf/check-submission.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,17 @@
#!/usr/bin/env -S python3 -u
import os
import re
import subprocess
import sys
import json
import jsonschema
import requests
import csv
import io
import hashlib
import time
from utils import *

BASE_DIR = os.path.abspath(os.path.dirname(__file__))
PUBLIC_CSV_URL = "https://docs.google.com/spreadsheets/d/e/2PACX-1vS1REdTA29OJftst8xN5B5x8iIUcxuK6bXdzF8G1UXCmRtoNsoQ9MbebdRdFnj6qZ0Yd7LwQfvYC2oF/pub?output=csv"
POC_FOLDER = "pocs/linux/kernelctf/"
EXPLOIT_DIR = "exploit/"
CACHE_DIR = f"{BASE_DIR}/.cache"
MIN_SCHEMA_VERSION = 2
# DEBUG = "--debug" in sys.argv

errors = []
warnings = []

def error(msg):
global errors
msg = msg.replace('\n', '\n ')
errors.append(msg)
print(f"\n[!] [ERROR] {msg}")

def warning(msg):
global warnings
msg = msg.replace('\n', '\n ')
warnings.append(msg)
print(f"\n[!] [WARN] {msg}")

def fail(msg):
print("\n[!] [FAIL] " + msg.replace('\n', '\n '))
os._exit(1)

def run(cmd):
try:
result = subprocess.check_output(cmd, shell=True).decode('utf-8').split('\n')
return result if result[-1] != "" else result[0:-1]
except subprocess.CalledProcessError as e:
fail(f"executing '{cmd}' failed with exit code {e.returncode}")

def subdirEntries(files, subdir):
return list(set([f[len(subdir):].split('/')[0] for f in files if f.startswith(subdir)]))

def formatList(items, nl=False):
return ('\n' if nl else '').join([f"\n - {item}" for item in items])

def printList(title, items):
print(f"\n{title}:" + formatList(items))

def errorList(errorMsg, items, warningOnly=False):
itemsStr = ", ".join(f"`{x}`" for x in items)
errorMsg = errorMsg.replace("<LIST>", itemsStr) if "<LIST>" in errorMsg else f"{errorMsg}: {itemsStr}"
if warningOnly:
warning(errorMsg)
else:
error(errorMsg)

def checkOnlyOne(list, errorMsg):
if len(list) > 1:
errorList(errorMsg, list)
return list[0]

def checkList(items, isAllowedFunc, errorMsg, warningOnly=False):
disallowedItems = [item for item in items if not isAllowedFunc(item)]
if len(disallowedItems) > 0:
errorList(errorMsg, disallowedItems, warningOnly)
return list(sorted(set(items) - set(disallowedItems)))

def checkAtLeastOne(list, errorMsg):
if len(list) == 0:
fail(errorMsg)

def checkRegex(text, pattern, errorMsg):
m = re.match(pattern, text)
if not m:
error(f"{errorMsg}. Must match regex `{pattern}`")
return m

def fetch(url, cache_name, cache_time=3600):
cache_fn = f"{CACHE_DIR}/{cache_name}"
if cache_name and os.path.isfile(cache_fn) and (time.time() - os.path.getmtime(cache_fn) < cache_time):
with open(cache_fn, "rb") as f: return f.read().decode('utf-8')

response = requests.get(url)
if response.status_code != 200:
fail(f"expected 200 OK for request: {url}")

if cache_name:
os.makedirs(CACHE_DIR, exist_ok=True)
with open(cache_fn, "wb") as f: f.write(response.content)

return response.content.decode('utf-8')

def parseCsv(csvContent):
columns, *rows = list(csv.reader(io.StringIO(csvContent), strict=True))
return [{ columns[i]: row[i] for i in range(len(columns)) } for row in rows]

argv = [arg for arg in sys.argv if not arg.startswith("--")]
print(f"[-] Argv: {argv}")

Expand Down Expand Up @@ -218,12 +128,6 @@ def parseCsv(csvContent):
if schemaVersion >= 3:
checkList(flagTargets, lambda t: t in metadata["exploits"].keys(), f"Missing metadata information for exploit(s)")

def ghSet(varName, content):
varName = f"GITHUB_{varName}"
print(f"[+] Writing {json.dumps(content)} to ${varName}")
if varName in os.environ:
with open(os.environ[varName], 'at') as f: f.write(content + "\n")

def summary(success, text):
if warnings:
text += "\n\n**Warnings:**\n\n" + '\n\n'.join(f" - ⚠️ {x}" for x in warnings)
Expand Down
103 changes: 103 additions & 0 deletions kernelctf/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import csv
import io
import json
import os
import subprocess
import re
import requests
import time
from urllib.parse import urlparse

BASE_DIR = os.path.abspath(os.path.dirname(__file__))
CACHE_DIR = f"{BASE_DIR}/.cache"

errors = []
warnings = []

def error(msg):
global errors
msg = msg.replace('\n', '\n ')
errors.append(msg)
print(f"\n[!] [ERROR] {msg}")

def warning(msg):
global warnings
msg = msg.replace('\n', '\n ')
warnings.append(msg)
print(f"\n[!] [WARN] {msg}")

def fail(msg):
print("\n[!] [FAIL] " + msg.replace('\n', '\n '))
os._exit(1)

def run(cmd):
try:
result = subprocess.check_output(cmd, shell=True).decode('utf-8').split('\n')
return result if result[-1] != "" else result[0:-1]
except subprocess.CalledProcessError as e:
fail(f"executing '{cmd}' failed with exit code {e.returncode}")

def subdirEntries(files, subdir):
return list(set([f[len(subdir):].split('/')[0] for f in files if f.startswith(subdir)]))

def formatList(items, nl=False):
return ('\n' if nl else '').join([f"\n - {item}" for item in items])

def printList(title, items):
print(f"\n{title}:" + formatList(items))

def errorList(errorMsg, items, warningOnly=False):
itemsStr = ", ".join(f"`{x}`" for x in items)
errorMsg = errorMsg.replace("<LIST>", itemsStr) if "<LIST>" in errorMsg else f"{errorMsg}: {itemsStr}"
if warningOnly:
warning(errorMsg)
else:
error(errorMsg)

def checkOnlyOne(list, errorMsg):
if len(list) > 1:
errorList(errorMsg, list)
return list[0]

def checkList(items, isAllowedFunc, errorMsg, warningOnly=False):
disallowedItems = [item for item in items if not isAllowedFunc(item)]
if len(disallowedItems) > 0:
errorList(errorMsg, disallowedItems, warningOnly)
return list(sorted(set(items) - set(disallowedItems)))

def checkAtLeastOne(list, errorMsg):
if len(list) == 0:
fail(errorMsg)

def checkRegex(text, pattern, errorMsg):
m = re.match(pattern, text)
if not m:
error(f"{errorMsg}. Must match regex `{pattern}`")
return m

def fetch(url, cache_name=None, cache_time=3600):
if not cache_name:
cache_name = os.path.basename(urlparse(url).path)
cache_fn = f"{CACHE_DIR}/{cache_name}"
if cache_name and os.path.isfile(cache_fn) and (time.time() - os.path.getmtime(cache_fn) < cache_time):
with open(cache_fn, "rb") as f: return f.read().decode('utf-8')

response = requests.get(url)
if response.status_code != 200:
fail(f"expected 200 OK for request: {url}")

if cache_name:
os.makedirs(CACHE_DIR, exist_ok=True)
with open(cache_fn, "wb") as f: f.write(response.content)

return response.content.decode('utf-8')

def parseCsv(csvContent):
columns, *rows = list(csv.reader(io.StringIO(csvContent), strict=True))
return [{ columns[i]: row[i] for i in range(len(columns)) } for row in rows]

def ghSet(varName, content):
varName = f"GITHUB_{varName}"
print(f"[+] Writing {json.dumps(content)} to ${varName}")
if varName in os.environ:
with open(os.environ[varName], 'at') as f: f.write(content + "\n")

0 comments on commit 90d419b

Please sign in to comment.