# Paws/PythonScripts/test_disk.py
#
# 172 lines
# 5.5 KiB
# Python

import argparse
import subprocess
import time
import sys
import os
from datetime import datetime
import signal
# State shared with handle_sigint(): whether a first Ctrl+C has already been
# seen, and the monotonic-clock reading taken when it arrived.
interrupted_once = False
interrupted_time = 0


def handle_sigint(signum, frame):
    """Exit only when Ctrl+C is pressed twice within 5 seconds.

    The first interrupt (or one arriving 5 seconds or more after the previous
    one) is recorded and a hint is printed; a second interrupt inside the
    window terminates the process with exit status 1.

    Args:
        signum: Signal number supplied by the ``signal`` machinery (unused).
        frame: Interrupted stack frame supplied by ``signal`` (unused).

    Raises:
        SystemExit: With code 1 on the second interrupt inside the window.
    """
    global interrupted_once, interrupted_time
    # Use the monotonic clock: the window is an elapsed-time measurement and
    # must not be stretched or shrunk by wall-clock adjustments (NTP, manual
    # clock changes) happening between the two key presses.
    now = time.monotonic()
    if interrupted_once and (now - interrupted_time < 5):
        print("\n[!] Second interrupt received. Exiting now.")
        sys.exit(1)
    interrupted_once = True
    interrupted_time = now
    print("\n[!] Interrupt received. Press Ctrl+C again within 5 seconds to exit.")
def run_cmd(cmd, log_file, check=True, capture_output=True):
    """Run a shell command, append its output to the log, and return stdout.

    Args:
        cmd: Command line, executed through the shell.
        log_file: Writable text stream. Receives a ``$ cmd`` header followed
            by whatever stdout/stderr was captured, then is flushed.
        check: When true, a non-zero exit status prints a warning (plus the
            captured stderr, if any). No exception is raised either way.
        capture_output: When true, stdout/stderr are captured and logged.
            When false they go straight to the terminal and only the header
            line is written to the log.

    Returns:
        The captured stdout with surrounding whitespace removed, or ``""``
        when nothing was captured.
    """
    print(f"Running: {cmd}")
    # errors="replace" keeps a stray non-UTF-8 byte in a tool's output from
    # aborting the whole run with UnicodeDecodeError.
    result = subprocess.run(
        cmd,
        shell=True,
        capture_output=capture_output,
        text=True,
        errors="replace",
    )
    log_file.write(f"\n$ {cmd}\n")
    if result.stdout:
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write(result.stderr)
    # Flush after every command so the log survives an abrupt exit.
    log_file.flush()
    if check and result.returncode != 0:
        print(f"[!] Non-zero exit code ({result.returncode}) for: {cmd}")
        if result.stderr:
            print(result.stderr.strip())
    # result.stdout is None when capture_output is False; never call .strip()
    # on it directly.
    return (result.stdout or "").strip()
def is_nvme(device):
    """Return True when the last path component of *device* contains "nvme"."""
    _, leaf = os.path.split(device)
    return leaf.find("nvme") >= 0
def prompt_overwrite():
    """Block until the user types "yes" to confirm overwriting the disk.

    Prints a warning, then keeps prompting on stdin. The answer is compared
    after stripping whitespace and lower-casing. Returns nothing; there is no
    "decline" path other than interrupting the program.
    """
    print("\nWARNING: Disk already has a partition table!")
    question = "Type 'yes' to overwrite the entire disk: "
    while input(question).strip().lower() != "yes":
        # Every retry after the first uses the corrective wording.
        question = "Incorrect input. Please type 'yes' to proceed: "
def _filename_safe(text):
    """Return *text* with every character other than alphanumerics, "-", "_" and "." replaced by "_"."""
    return "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in text)


def _parse_smartctl_identity(output):
    """Extract ``(model, serial)`` from ``smartctl -i`` text; "unknown" when a field is absent."""
    model = serial = "unknown"
    for line in output.splitlines():
        if any(key in line for key in ("Device Model", "Model Number", "Product")):
            model = line.split(":", 1)[-1].strip().replace(" ", "_")
        elif "Vendor:" in line and model == "unknown":
            # Provisional: any later model/product line overwrites it.
            model = line.split(":", 1)[-1].strip().replace(" ", "_")
        elif "Serial Number" in line or "Serial number" in line:
            serial = line.split(":", 1)[-1].strip()
        elif "Logical Unit id" in line and serial == "unknown":
            # Fallback identifier when no serial line has been seen yet.
            serial = line.split()[-1].strip()
    return model, serial


def get_device_info(device):
    """Build the log file name for *device* from its model and serial number.

    Runs ``sudo smartctl -i <device>`` and parses the model and serial from
    its output.

    Args:
        device: Path of the device to query (e.g. ``/dev/sdX``).

    Returns:
        ``"<model>_<serial>_test_<YYYYmmdd_HHMMSS>.log"`` when both fields
        were found, otherwise ``"drive_test_<YYYYmmdd_HHMMSS>.log"``. Model
        and serial are sanitised so the result is always a single, valid
        file name component.
    """
    print("Fetching device model and serial...")
    try:
        # Argument list instead of a shell string: a device path containing
        # spaces or shell metacharacters is passed through verbatim.
        output = subprocess.run(
            ["sudo", "smartctl", "-i", device],
            capture_output=True,
            text=True,
            errors="replace",
        ).stdout
    except OSError:
        # Without a shell, a missing executable raises instead of returning
        # a non-zero status; fall back to the timestamp-only name as before.
        output = ""

    model, serial = _parse_smartctl_identity(output or "")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # An empty value (e.g. a bare "Serial Number:" line) is as useless as a
    # missing one for naming purposes.
    if model in ("", "unknown") or serial in ("", "unknown"):
        print("Failed to extract model or serial. Using timestamp for filename.")
        return f"drive_test_{timestamp}.log"
    # Identity strings can contain "/" or other characters that would turn
    # the name into a path or make open() fail.
    return f"{_filename_safe(model)}_{_filename_safe(serial)}_test_{timestamp}.log"
def show_smart_info(device, log_file):
    """Run ``smartctl -x`` on *device* and record the output in *log_file*.

    ``-d nvme`` is added to the command when is_nvme() says the device is an
    NVMe drive.
    """
    print("Gathering extended SMART info...")
    type_flag = "-d nvme" if is_nvme(device) else ""
    run_cmd(f"sudo smartctl -x {type_flag} {device}", log_file)
def run_smart_test(device, log_file):
    """Start a short SMART self-test, wait 120 seconds, then log the results.

    The start command is run with ``check=False`` so a non-zero exit status
    is not reported; the result query uses the default checking.
    """
    print("Running short SMART self-test (2 minutes)...")
    type_flag = "-d nvme" if is_nvme(device) else ""
    run_cmd(f"sudo smartctl -t short {type_flag} {device}", log_file, check=False)

    # Fixed wait before asking for the self-test log.
    time.sleep(120)

    print("SMART Self-Test Results:")
    run_cmd(f"sudo smartctl -l selftest {type_flag} {device}", log_file)
def check_partition_table(device, log_file):
    """Return True unless ``parted print`` reports that *device* has no partition table.

    The device is considered to have a table whenever the parted output
    contains neither of the two "no table" phrases checked below.
    """
    print("Checking for existing partition table...")
    parted_report = run_cmd(f"sudo parted -s {device} print", log_file, check=False)
    no_table_markers = ("Partition Table: unknown", "unrecognised disk label")
    return not any(marker in parted_report for marker in no_table_markers)
def run_badblocks(device, log_file):
    """Run ``badblocks`` with a 4096-byte block size on *device* and log the output.

    The exit status is not checked (``check=False``).
    """
    print("Running non-destructive read-only badblocks check (this may take time)...")
    badblocks_cmd = f"sudo badblocks -b 4096 -sv {device}"
    run_cmd(badblocks_cmd, log_file=log_file, check=False)
    print("Badblocks check complete.")
def fill_random(device, log_file):
    """Overwrite the start of *device* with data from /dev/urandom using ``dd``.

    Writes 25600 blocks of 1M (the 25 GiB announced in the message). This is
    destructive. The exit status is not checked (``check=False``).
    """
    print("Writing 25 GiB of random data to disk...")
    dd_parts = [
        "sudo dd",
        "if=/dev/urandom",
        f"of={device}",
        "bs=1M",
        "count=25600",
        "status=progress",
    ]
    run_cmd(" ".join(dd_parts), log_file, check=False)
    print("Random data write complete.")
def run_fio(device, rw_mode, log_file, bs="4k", runtime=60):
    """Run one time-based fio job directly against *device* and log the output.

    Args:
        device: Device path handed to fio as ``--filename``.
        rw_mode: Value for fio's ``--rw`` option; also used in the job name.
        log_file: Open log stream passed on to run_cmd().
        bs: Value for ``--bs``.
        runtime: Value for ``--runtime``.
    """
    print(f"Running fio test: {rw_mode}")
    fio_args = [
        "sudo fio",
        f"--name={rw_mode}_test",
        f"--filename={device}",
        "--direct=1",
        f"--rw={rw_mode}",
        f"--bs={bs}",
        "--iodepth=16",
        "--numjobs=1",
        "--time_based",
        f"--runtime={runtime}",
        "--group_reporting",
    ]
    run_cmd(" ".join(fio_args), log_file)
def main():
    """Command-line entry point: run the full test sequence on one device.

    Installs the SIGINT handler, parses the device argument, exits with
    status 1 if the path does not exist, then runs SMART info, the SMART
    short test, badblocks, the random fill and five fio jobs, logging to the
    file named by get_device_info(). The overwrite confirmation is requested
    only when a partition table is detected; the random fill and the fio
    jobs run in either case.
    """
    signal.signal(signal.SIGINT, handle_sigint)

    parser = argparse.ArgumentParser(
        description="Test used drive with SMART, badblocks, and fio."
    )
    parser.add_argument("device", help="Target device (e.g., /dev/sdX)")
    device = parser.parse_args().device

    if not os.path.exists(device):
        print(f"Device {device} not found.")
        sys.exit(1)

    log_filename = get_device_info(device)

    # (rw mode, extra keyword arguments) for each fio job, in execution order.
    fio_plan = (
        ("write", {}),
        ("read", {}),
        ("randread", {}),
        ("randwrite", {}),
        ("randread", {"bs": "512", "runtime": 30}),  # IOPS test
    )

    with open(log_filename, "w") as log_file:
        print(f"Logging to {log_filename}...")
        show_smart_info(device, log_file)
        run_smart_test(device, log_file)
        run_badblocks(device, log_file)

        if check_partition_table(device, log_file):
            prompt_overwrite()
        fill_random(device, log_file)

        for rw_mode, overrides in fio_plan:
            run_fio(device, rw_mode, log_file, **overrides)

    print(f"\nTest completed. Results saved to {log_filename}")


if __name__ == "__main__":
    main()