Files
2026-05-19 17:19:36 +08:00

238 lines
7.1 KiB
Python

import os
import sys
import time
import datetime
import logging
import json
import base64
import shutil
import subprocess
import threading
import numpy as np
try:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
except ImportError as exc:
print(
"[BENCHMARKS|CRITICAL]: Required package 'cryptography' is not installed. "
"Install it with: pip install cryptography",
file=sys.stderr,
)
raise SystemExit(1) from exc
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" # disable TensorFlow warnings
# PEM-encoded public key; _append_secure() loads it and uses it to encrypt
# every audit record before the record is stored in the log.
PUBLIC_KEY_PEM = b"""-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqguy5ezlNj90/+7LeF5l
ufetjhBKSqe+CAknLSA9xJ4Iz8IoFvfjvxSR8zyhsD7zcIcIMlkt7LBbl0IdiXv2
8yLP973j4xbkLindkulQEKeyE1Yf5g0TdbHCsNafs7GCwkR582WlnsV4hditqLLT
jwMKcW3Pkdg5UnuS/alFcXCmHbZJMC7odgBkg+UWTWGueOBhKYil8+6QUW1Ih9t8
oSWc3L16/jzzNkheI44dCBDCqh3YuJXkGTd866OURaovmAfvDYvt1mMWVVKYU6Jq
OhXwzok2//uGZoOpCCO3KGkaXfCfAOjg6rrs1Wd8Be/W3DzkM6nTaaXpHTTu0Slm
XwIDAQAB
-----END PUBLIC KEY-----
"""
# Period between audit flushes. The value is handed to Event.wait() in
# _secure_timer_worker(), so it is interpreted as SECONDS.
# NOTE(review): the previous comment here read "10 minutes", which would be
# 600 rather than 5 -- confirm which value is actually intended.
_SECURE_INTERVAL = 5
_PROGRAM_START = time.time() # reference point for the relative audit timestamps
_GPU_UTIL_THRESHOLD = 1.0 # %; above => running on GPU
def _gpu_util_percent():
    """Return the highest GPU utilisation reported by ``nvidia-smi``, in percent.

    This is a best-effort probe that never raises. The result is 0.0 when
    ``nvidia-smi`` is not on PATH, exits with a non-zero status, prints no
    line that parses as a number, or when anything else goes wrong.
    Otherwise it is the maximum of the parsed per-line values, rounded to
    one decimal place.
    """
    # No NVIDIA tooling available: report 0% and let the caller carry on.
    if shutil.which("nvidia-smi") is None:
        return 0.0

    command = [
        "nvidia-smi",
        "--query-gpu=utilization.gpu",
        "--format=csv,noheader,nounits",
    ]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=5,
        )
        if completed.returncode != 0:
            return 0.0

        readings = []
        for raw in completed.stdout.splitlines():
            token = raw.strip()
            if token:
                try:
                    readings.append(float(token))
                except ValueError:
                    # Line is not a number; ignore it and keep the rest.
                    pass

        if not readings:
            return 0.0
        return round(max(readings), 1)
    except Exception:
        # Deliberate catch-all: the probe must never break the benchmark.
        return 0.0
def _cpu_util_percent(sample_interval=0.1):
    """Estimate overall CPU utilisation, in percent, from two /proc/stat samples.

    The aggregate ``cpu`` line of ``/proc/stat`` is read twice,
    ``sample_interval`` seconds apart. Utilisation is the share of elapsed
    counter ticks that were not spent in the 4th and 5th counters (idle and
    iowait according to proc(5)).

    Args:
        sample_interval: Seconds to sleep between the two samples.

    Returns:
        A float in [0.0, 100.0] rounded to one decimal place. 0.0 is returned
        when ``/proc/stat`` does not exist (non-Linux), cannot be read or
        parsed, or when no ticks elapsed between the samples.
    """
    if not os.path.isfile("/proc/stat"):
        return 0.0

    def _read_idle_total():
        # Return (idle_ticks, total_ticks) from the first line, or None when
        # that line is not the expected aggregate "cpu" line.
        with open("/proc/stat") as f:
            fields = f.readline().split()
        if len(fields) < 5 or fields[0] != "cpu":
            return None
        counters = [int(x) for x in fields[1:]]
        idle = counters[3] + (counters[4] if len(counters) > 4 else 0)
        return idle, sum(counters)

    try:
        # Check for None BEFORE unpacking: unpacking a bare None would raise
        # TypeError instead of taking the intended "unparseable" path.
        first = _read_idle_total()
        if first is None:
            return 0.0
        time.sleep(sample_interval)
        second = _read_idle_total()
        if second is None:
            return 0.0
    except (OSError, ValueError):
        # Unreadable file, non-numeric counter, or an invalid sleep interval.
        return 0.0

    idle_delta = second[0] - first[0]
    total_delta = second[1] - first[1]
    if total_delta <= 0:
        # No ticks elapsed (or counters went backwards): nothing to measure.
        return 0.0
    busy = (1.0 - idle_delta / total_delta) * 100.0
    return round(max(0.0, min(100.0, busy)), 1)
def _platform_from_util():
    """Classify the current run as "GPU" or "CPU" from live utilisation samples.

    Returns:
        A ``(platform, gpu_util, cpu_util)`` tuple. ``platform`` is "GPU" when
        the sampled GPU utilisation is strictly above ``_GPU_UTIL_THRESHOLD``
        and "CPU" otherwise. If sampling raises, the fallback is
        ``("CPU", 0.0, 0.0)``.
    """
    try:
        gpu_load = _gpu_util_percent()
        cpu_load = _cpu_util_percent()
        if gpu_load > _GPU_UTIL_THRESHOLD:
            return "GPU", gpu_load, cpu_load
        return "CPU", gpu_load, cpu_load
    except Exception:
        # Never let telemetry sampling take the benchmark down.
        return "CPU", 0.0, 0.0
def _append_secure(logger):
    """Encrypt a utilisation snapshot and append it to the newest log record.

    The snapshot holds the seconds elapsed since ``_PROGRAM_START``, the
    platform label and the GPU/CPU utilisation. It is JSON-encoded, encrypted
    with ``PUBLIC_KEY_PEM`` using OAEP padding (SHA-256 for both the MGF1 mask
    and the hash), base64-encoded, and appended to
    ``logger[-1]["secure_log"]``, which is created if absent.

    Args:
        logger: List-like log whose last element is a dict.

    Returns:
        The plaintext snapshot dict.
    """
    public_key = serialization.load_pem_public_key(PUBLIC_KEY_PEM)
    mask_gen = padding.MGF1(algorithm=hashes.SHA256())
    oaep = padding.OAEP(mgf=mask_gen, algorithm=hashes.SHA256(), label=None)

    platform, gpu_util, cpu_util = _platform_from_util()
    elapsed = round(time.time() - _PROGRAM_START, 3)
    audit = {
        "timestamp": elapsed,
        "platform": platform,
        "gpu_util": gpu_util,
        "cpu_util": cpu_util,
    }

    plaintext = json.dumps(audit).encode()
    ciphertext = public_key.encrypt(plaintext, oaep)
    token = base64.b64encode(ciphertext).decode()

    entries = logger[-1].setdefault("secure_log", [])
    entries.append(token)
    return audit
def _flush_secure(logger):
    """Record one encrypted audit entry and rewrite the log file.

    Does nothing and returns ``None`` when the logger has no filename.
    Otherwise, while holding ``logger._secure_lock``, it appends a fresh
    audit entry and overwrites ``logger.filename`` with the whole log as
    JSON. The plaintext audit values are then written to the module logger.

    Returns:
        The plaintext audit dict, or ``None`` if nothing was written.
    """
    if logger.filename is None:
        return None

    # Append and write under one lock so the file always matches the list.
    with logger._secure_lock:
        audit = _append_secure(logger)
        with open(logger.filename, "w") as handle:
            json.dump(logger, handle)

    message = "util_audit: platform={platform} gpu_util={gpu_util}% cpu_util={cpu_util}%".format(
        **audit
    )
    log.info(message)
    return audit
def _secure_timer_worker(logger):
    """Thread body: flush an audit entry every ``_SECURE_INTERVAL`` seconds.

    Each cycle waits on ``logger._secure_stop`` for up to the interval. The
    loop ends as soon as the event is set; a timeout triggers one flush.
    """
    while True:
        stop_requested = logger._secure_stop.wait(_SECURE_INTERVAL)
        if stop_requested:
            return
        _flush_secure(logger)
def _start_secure_timer(logger):
    """Start the background audit thread for *logger* unless it is not needed.

    Nothing happens when the logger has no filename or when its audit thread
    is already alive. Otherwise the stop event is cleared and a new daemon
    thread named "util-audit-timer" is stored on the logger and started.
    """
    if logger.filename is None:
        return
    current = logger._secure_thread
    if current is not None and current.is_alive():
        return

    logger._secure_stop.clear()
    worker = threading.Thread(
        target=_secure_timer_worker,
        args=(logger,),
        name="util-audit-timer",
        daemon=True,
    )
    logger._secure_thread = worker
    worker.start()
def _stop_secure_timer(logger):
    """Signal the audit thread to stop and wait briefly for it to finish.

    The stop event is always set. If a thread is registered, it is joined
    for at most ``_SECURE_INTERVAL + 5`` seconds and then the reference is
    dropped, whether or not the join timed out.
    """
    logger._secure_stop.set()
    worker = logger._secure_thread
    if worker is None:
        return
    worker.join(timeout=_SECURE_INTERVAL + 5)
    logger._secure_thread = None
class CustomHandler(logging.StreamHandler):
    """Stream handler that renders every record in the benchmark log format.

    The handler is created without an explicit stream, so it writes to the
    default stream of ``logging.StreamHandler`` (``sys.stderr``).
    """

    # The layout never varies per record, so one formatter is built once and
    # shared instead of constructing a new Formatter on every log call.
    _FORMATTER = logging.Formatter(
        "[BENCHMARKS|%(levelname)s|%(asctime)s]: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    def format(self, record):
        """Return *record* as ``[BENCHMARKS|LEVEL|YYYY-MM-DD HH:MM:SS]: message``.

        The fixed layout is always used; a formatter installed through
        ``setFormatter`` has no effect on this handler.
        """
        return self._FORMATTER.format(record)
# Module-level logger used by every function and class in this file. It
# accepts DEBUG and above and renders records through CustomHandler.
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.addHandler(CustomHandler())
class JsonLogger(list):
    """List of per-run dict records, optionally persisted to a JSON file.

    Each instance appends one new dict (the current run) to the list; all
    logging methods operate on that last element. When a filename is given,
    records already stored in that file are loaded first and a background
    audit timer is started for the instance.

    Attributes are documented here rather than inline:
      filename        -- target JSON path, or None when nothing is persisted.
      _secure_lock    -- lock guarding updates of the current record and
                         writes of the file.
      _secure_stop    -- event used to stop the audit timer thread.
      _secure_thread  -- the audit timer thread, or None.
    """

    def __init__(self, filename=None):
        """Load existing records (if any), open a new record and start auditing.

        Args:
            filename: Path of the JSON log file. If the file exists its
                content is loaded as the initial list; if it is None, nothing
                is ever written to disk.
        """
        self.filename = filename
        self._secure_lock = threading.Lock()
        self._secure_stop = threading.Event()
        self._secure_thread = None
        if filename is not None:
            if os.path.isfile(filename):
                with open(filename, "r") as file:
                    super().__init__(json.load(file))
                log.info("Extending existing logs from {}.".format(filename))
            else:
                log.info("Creating new logs in {}.".format(filename))
                super().__init__()
        else:
            log.warning("Filename was not provided and logs will not be saved.")
            super().__init__()
        # Open the record for this run and stamp it with the start time.
        self.append({})
        now = datetime.datetime.now()
        self.log(datetime=now.strftime("%Y-%m-%d %H:%M:%S"))
        _start_secure_timer(self)

    def log(self, **kwargs):
        """Store ``kwargs`` in the current record and echo each pair to the log."""
        with self._secure_lock:
            self[-1].update(kwargs)
        for k, v in kwargs.items():
            log.info(f"{k}: {v}")

    def average(self, key):
        """Add ``<key>_mean`` and ``<key>_std`` for the samples stored under *key*.

        The standard deviation is the sample standard deviation (``ddof=1``),
        or 0.0 when there is exactly one sample. Both results are stored as
        built-in floats: NumPy may return scalar types such as ``np.float32``
        that ``json.dump`` cannot serialise, which would break later writes.

        Raises:
            KeyError: If *key* is not present in the current record.
        """
        with self._secure_lock:
            record = self[-1]
            samples = record[key]
            mean = float(np.mean(samples))
            record[f"{key}_mean"] = mean
            if len(samples) == 1:
                std = 0.0
            else:
                std = float(np.std(samples, ddof=1))
            record[f"{key}_std"] = std
        log.info("{}_mean: {}".format(key, mean))
        log.info("{}_std: {}".format(key, std))

    def __str__(self):
        """Return the current record as one ``key: value`` line per item."""
        return "\n" + "\n".join(f"{k}: {v}" for k, v in self[-1].items())

    def dump(self):
        """Stop the audit timer and, if a filename is set, write the log as JSON."""
        _stop_secure_timer(self)
        if self.filename is not None:
            with self._secure_lock:
                with open(self.filename, "w") as file:
                    json.dump(self, file)