diff --git a/problems/nvidia/eval_better_bench.py b/problems/nvidia/eval_better_bench.py
index 007781e..fbc148a 100644
--- a/problems/nvidia/eval_better_bench.py
+++ b/problems/nvidia/eval_better_bench.py
@@ -79,7 +79,12 @@ def get_test_cases(file_name: str, seed: Optional[int]) -> list[TestCase]:

     tests = []
     lines = content.splitlines()
-    match = r"\s*([a-zA-Z]+):\s*([a-zA-Z]+|[+-]?[0-9]+)\s*"
+    # Match key: value pairs where value can be:
+    # - a list like [1, 2, 3] (needed for group gemm which has per-group dimensions)
+    # - a tuple like (1, 2, 3)
+    # - an integer
+    # - an alphabetic string
+    match = r"\s*([a-zA-Z_]+)\s*:\s*(\[[^\]]*\]|\([^)]*\)|[a-zA-Z_]+|[+-]?[0-9]+)\s*"
     for line in lines:
         parts = line.split(";")
         case = {}
@@ -93,7 +98,16 @@
             try:
                 val = int(val)
             except ValueError:
-                pass
+                # Try parsing as tuple/list (e.g., [1, 2, 3] for group gemm dimensions)
+                if (val.startswith('(') and val.endswith(')')) or (val.startswith('[') and val.endswith(']')):
+                    try:
+                        inner = val[1:-1].strip()
+                        if inner:
+                            val = tuple(int(x.strip()) for x in inner.split(','))
+                        else:
+                            val = tuple()
+                    except ValueError:
+                        pass

             case[key] = val
         tests.append(TestCase(spec=line, args=case))
diff --git a/problems/nvidia/nvfp4_group_gemm/eval.py b/problems/nvidia/nvfp4_group_gemm/eval.py
deleted file mode 100644
index 2f00f53..0000000
--- a/problems/nvidia/nvfp4_group_gemm/eval.py
+++ /dev/null
@@ -1,431 +0,0 @@
-import base64
-import dataclasses
-import multiprocessing
-import re
-import time
-import os
-import sys
-import math
-from pathlib import Path
-from typing import Any, Optional
-import tempfile
-
-import torch.cuda
-from cutlass.cute.nvgpu.common import OpError
-
-from utils import set_seed, clear_l2_cache
-
-try:
-    from task import TestSpec
-except ImportError:
-    TestSpec = dict
-
-from reference import check_implementation, generate_input
-
-
-class PopcornOutput:
-    def __init__(self, fd: int):
-        self.file = os.fdopen(fd, "w")
-        os.set_inheritable(fd, False)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.file.close()
-
-    def print(self, *args, **kwargs):
-        print(*args, **kwargs, file=self.file, flush=True)
-
-    def log(self, key, value):
-        self.print(f"{key}: {value}")
-
-
-@dataclasses.dataclass
-class TestCase:
-    args: dict
-    spec: str
-
-
-def _combine(a: int, b: int) -> int:
-    # combine two integers into one:
-    # we need this to generate a secret seed based on the test-level seed and
-    # the global secret seed.
-    # the test-level seeds are public knowledge, and typically relatively small numbers,
-    # so we need to make sure they don't provide any useful info for the full seed.
-    # This Cantor construction ensures that if the secret seed is a large number,
-    # then so is the overall seed.
-    return int(a + (a + b) * (a + b + 1) // 2)
-
-
-def get_test_cases(file_name: str, seed: Optional[int]) -> list[TestCase]:
-    try:
-        content = Path(file_name).read_text()
-    except Exception as E:
-        print(f"Could not open test file`{file_name}`: {E}", file=sys.stderr)
-        exit(113)
-
-    tests = []
-    lines = content.splitlines()
-    # Match key: value pairs where value can be:
-    # - a list like [1, 2, 3]
-    # - a tuple like (1, 2, 3)
-    # - an integer
-    # - an alphabetic string
-    match = r"\s*([a-zA-Z_]+)\s*:\s*(\[[^\]]*\]|\([^)]*\)|[a-zA-Z_]+|[+-]?[0-9]+)\s*"
-    for line in lines:
-        if not line.strip():
-            continue
-        parts = line.split(";")
-        case = {}
-        for part in parts:
-            if not part.strip():
-                continue
-            matched = re.fullmatch(match, part)
-            if not matched:
-                print(f"invalid test case: '{line}': '{part}'", file=sys.stderr)
-                exit(113)
-            key = matched[1]
-            val = matched[2]
-            try:
-                val = int(val)
-            except ValueError:
-                # Try parsing as tuple/list
-                if (val.startswith('(') and val.endswith(')')) or (val.startswith('[') and val.endswith(']')):
-                    try:
-                        inner = val[1:-1].strip()
-                        if inner:
-                            val = tuple(int(x.strip()) for x in inner.split(','))
-                        else:
-                            val = tuple()
-                    except ValueError:
-                        pass
-
-            case[key] = val
-        tests.append(TestCase(spec=line, args=case))
-
-    if seed is not None:
-        for test in tests:
-            if "seed" in test.args:
-                test.args["seed"] = _combine(test.args["seed"], seed)
-
-    return tests
-
-
-@dataclasses.dataclass
-class Stats:
-    runs: int
-    mean: float
-    std: float
-    err: float
-    best: float
-    worst: float
-
-
-def calculate_stats(durations: list[int]):
-    """
-    Calculate statistical data from a list of durations.
-    @param durations: A list of durations in nanoseconds.
-    @return: A Stats object containing the number of runs, mean, standard deviation, error, best, and worst durations.
-    """
-    runs = len(durations)
-    total = sum(durations)
-    best = min(durations)
-    worst = max(durations)
-
-    avg = total / runs
-    variance = sum(map(lambda x: (x - avg) ** 2, durations))
-    std = math.sqrt(variance / (runs - 1))
-    err = std / math.sqrt(runs)
-
-    return Stats(
-        runs=runs, mean=avg, std=std, err=err, best=float(best), worst=float(worst)
-    )
-
-
-def _clone_data(data):
-    """
-    Recursively goes through data and clones all tensors.
-    """
-    if isinstance(data, tuple):
-        return tuple(_clone_data(x) for x in data)
-    elif isinstance(data, list):
-        return [_clone_data(x) for x in data]
-    elif isinstance(data, dict):
-        return {k: _clone_data(v) for k, v in data.items()}
-    elif isinstance(data, torch.Tensor):
-        return data.clone()
-    else:
-        return data
-
-
-def _run_single_test(test: TestCase):
-    """
-    Runs a single test case. Do not call directly
-    """
-    from submission import custom_kernel
-
-    data = generate_input(**test.args)
-    torch.cuda.synchronize()
-    try:
-        submission_output = custom_kernel(_clone_data(data))
-
-    except OpError as E:
-        print(f"Encountered {E}", file=sys.stderr)
-        return False, str(E)
-    torch.cuda.synchronize()
-    return check_implementation(data, submission_output)
-
-
-def run_single_test(pool: multiprocessing.Pool, test: TestCase):
-    """
-    Runs a single test in another process.
-    """
-    return pool.apply(_run_single_test, (test,))
-
-
-def run_testing(
-    logger: PopcornOutput, pool: multiprocessing.Pool, tests: list[TestCase]
-):
-    """
-    Executes the actual test case code and checks for correctness.
-    @param logger: A PopcornOutput object used for logging test results.
-    @param tests: A list of TestCase objects representing the test cases to be executed.
-    @return: An integer representing the exit status: 0 if all tests pass, otherwise 112.
-    """
-    passed = True
-    logger.log("test-count", len(tests))
-    for idx, test in enumerate(tests):
-        logger.log(f"test.{idx}.spec", test.spec)
-        good, message = run_single_test(pool, test)
-        if not good:
-            logger.log(f"test.{idx}.status", "fail")
-            logger.log(f"test.{idx}.error", message)
-            passed = False
-        else:
-            logger.log(f"test.{idx}.status", "pass")
-            if message:
-                logger.log(f"test.{idx}.message", message)
-
-    if passed:
-        logger.log("check", "pass")
-        return 0
-    else:
-        logger.log("check", "fail")
-        return 112
-
-
-def _run_single_benchmark(
-    test: TestCase, recheck: bool, max_repeats: int, max_time_ns: float
-) -> Stats | Any:
-    """
-    Runs one benchmark. Do not call directly.
-    """
-    from submission import custom_kernel
-
-    durations = []
-    # generate input data once
-    data = generate_input(**test.args)
-    check_copy = _clone_data(data)
-    # first, one obligatory correctness check
-    try:
-        output = custom_kernel(_clone_data(data))
-    except OpError as E:
-        return f"Encountered {E}"
-    good, message = check_implementation(check_copy, output)
-    if not good:
-        return message
-
-    # now, do multiple timing runs without further correctness testing
-    # there is an upper bound of 100 runs, and a lower bound of 3 runs;
-    # otherwise, we repeat until we either measure at least 10 full seconds,
-    # or the relative error of the mean is below 1%.
-
-    bm_start_time = time.perf_counter_ns()
-    for i in range(max_repeats):
-        if recheck:
-            # ensure we use a different seed for every benchmark
-            if "seed" in test.args:
-                test.args["seed"] += 13
-
-            data = generate_input(**test.args)
-            check_copy = _clone_data(data)
-        torch.cuda.synchronize()
-        start_event = torch.cuda.Event(enable_timing=True)
-        end_event = torch.cuda.Event(enable_timing=True)
-        clear_l2_cache()
-
-        start_event.record()
-        output = custom_kernel(data)
-        end_event.record()
-        torch.cuda.synchronize()
-        duration = start_event.elapsed_time(end_event) * 1e6  # Convert ms to ns
-
-        if recheck:
-            good, message = check_implementation(check_copy, output)
-            if not good:
-                return message
-
-        del output
-        durations.append(duration)
-
-        if i > 1:
-            total_bm_duration = time.perf_counter_ns() - bm_start_time
-            stats = calculate_stats(durations)
-            # stop if either
-            # a) relative error dips below 0.1%
-            # b) we exceed the total time limit for benchmarking the kernel
-            # c) we exceed 2 minutes of total wallclock time.
-            if (
-                stats.err / stats.mean < 0.001
-                or stats.mean * stats.runs > max_time_ns
-                or total_bm_duration > 120e9
-            ):
-                break
-
-    return calculate_stats(durations)
-
-
-def run_single_benchmark(
-    pool: multiprocessing.Pool,
-    test: TestCase,
-    recheck: bool,
-    max_repeats: int,
-    max_time_ns: float,
-):
-    """
-    For a particular test case, check correctness (if applicable) and grab runtime results.
-    @param pool: Process on which the benchmark will be launched.
-    @param test: TestCase object.
-    @param recheck: Flag for whether to explicitly check functional correctness.
-    @param max_repeats: Number of trials to repeat.
-    @param max_time_ns: Timeout time in nanoseconds.
-    @return: A Stats object for this particular benchmark case or an error if the test fails.
-    """
-    return pool.apply(_run_single_benchmark, (test, recheck, max_repeats, max_time_ns))
-
-
-def run_benchmarking(
-    logger: PopcornOutput, pool: multiprocessing.Pool, tests: list[TestCase]
-):
-    """
-    Executes benchmarking code for a CUDA Kernel and logs runtimes.
-    @param logger: A PopcornOutput object used for logging benchmark results.
-    @param pool: Process on which the benchmarks will be launched.
-    @param tests: A list of TestCase objects representing the test cases to be benchmarked.
-    @return: An integer representing the exit status: 0 if all benchmarks pass, otherwise 112.
-    """
-    # warm up
-    run_single_benchmark(pool, tests[0], False, 100, 10e7)
-
-    passed = True
-    logger.log("benchmark-count", len(tests))
-    for idx, test in enumerate(tests):
-        logger.log(f"benchmark.{idx}.spec", test.spec)
-        result = run_single_benchmark(pool, test, False, 100, 10e9)
-        if isinstance(result, Stats):
-            for field in dataclasses.fields(Stats):
-                logger.log(f"benchmark.{idx}.{field.name}", getattr(result, field.name))
-        else:
-            passed = False
-            logger.log(f"benchmark.{idx}.status", "fail")
-            logger.log(f"benchmark.{idx}.error", result)
-
-    if passed:
-        logger.log("check", "pass")
-        return 0
-    else:
-        logger.log("check", "fail")
-        return 112
-
-
-def run_single_profile(test: TestCase) -> str:
-    """
-    Runs a single test case. Do not call directly
-    """
-    from submission import custom_kernel
-    from torch.profiler import profile, record_function, ProfilerActivity
-
-    data = generate_input(**test.args)
-    torch.cuda.synchronize()
-
-    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
-        submission_output = custom_kernel(_clone_data(data))
-        torch.cuda.synchronize()
-    return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20)
-
-
-def run_profiling(logger: PopcornOutput, tests: list[TestCase]):
-    logger.log("benchmark-count", len(tests))
-    for idx, test in enumerate(tests):
-        logger.log(f"benchmark.{idx}.spec", test.spec)
-        report = run_single_profile(test)
-        logger.log(
-            f"benchmark.{idx}.report",
-            base64.b64encode(report.encode("utf-8"), b"+*").decode("utf-8"),
-        )
-    logger.log("check", "pass")
-    return 0
-
-
-def main():
-    fd = os.getenv("POPCORN_FD")
-    if not fd:
-        return 111
-
-    if len(sys.argv) < 3:
-        return 2
-
-    mode = sys.argv[1]
-    seed = os.getenv("POPCORN_SEED")
-    os.unsetenv("POPCORN_SEED")
-    seed = int(seed) if seed else None
-    set_seed(seed or 42)
-
-    # Parse test cases from temp file (text format from kernelbot)
-    tests = get_test_cases(sys.argv[2], seed)
-
-    with PopcornOutput(int(fd)) as logger:
-        import multiprocessing
-
-        mp_context = multiprocessing.get_context("spawn")
-        with mp_context.Pool(1) as pool:
-            if mode == "test":
-                return run_testing(logger, pool, tests)
-            if mode == "benchmark":
-                return run_benchmarking(logger, pool, tests)
-
-            if mode == "leaderboard":
-                # warmup
-                run_single_benchmark(pool, tests[0], False, 100, 1e7)
-                logger.log("benchmark-count", len(tests))
-                passed = True
-                for i in range(len(tests)):
-                    result = run_single_benchmark(pool, tests[i], True, 100, 30e9)
-                    logger.log(f"benchmark.{i}.spec", tests[i].spec)
-                    if isinstance(result, Stats):
-                        for field in dataclasses.fields(Stats):
-                            logger.log(
-                                f"benchmark.{i}.{field.name}",
-                                getattr(result, field.name),
-                            )
-                    else:
-                        passed = False
-                        logger.log(f"benchmark.{i}.status", "fail")
-                        logger.log(
-                            f"benchmark.{i}.error", str(result)
-                        )  # TODO: Make sure result implements __str__?
-                        break
-
-                logger.log("check", "pass" if passed else "fail")
-            elif mode == "profile":
-                run_profiling(logger, tests)
-            else:
-                # TODO: Implement script mode
-                return 2
-
-
-if __name__ == "__main__":
-    print("main")
-    main()
-    print("main end")
\ No newline at end of file
diff --git a/problems/nvidia/nvfp4_group_gemm/task.yml b/problems/nvidia/nvfp4_group_gemm/task.yml
index df25a7c..01bdc2f 100644
--- a/problems/nvidia/nvfp4_group_gemm/task.yml
+++ b/problems/nvidia/nvfp4_group_gemm/task.yml
@@ -5,7 +5,8 @@
 files:
   - {"name": "task.py", "source": "task.py"}
  - {"name": "utils.py", "source": "utils.py"}
  - {"name": "reference.py", "source": "reference.py"}
-  - {"name": "eval.py", "source": "eval.py"}
+  # Use eval_better_bench.py for batched timing (50 kernels per measurement) to reduce variance
+  - {"name": "eval.py", "source": "../eval_better_bench.py"}

 lang: "py"
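
Note on the parsing change in eval_better_bench.py: the snippet below is a minimal, standalone sketch of the key/value parsing that the patch adds to get_test_cases(), showing how a bracketed per-group dimension list in a test spec line ends up as a tuple of ints. The spec line and the parse_spec() helper are illustrative only; actual group-gemm test specs may use different keys.

import re

# Same pattern as the one added in the patch: a value may be an int, a word,
# or a bracketed/parenthesized list of ints.
MATCH = r"\s*([a-zA-Z_]+)\s*:\s*(\[[^\]]*\]|\([^)]*\)|[a-zA-Z_]+|[+-]?[0-9]+)\s*"

def parse_spec(line: str) -> dict:
    case = {}
    for part in line.split(";"):
        if not part.strip():
            continue
        matched = re.fullmatch(MATCH, part)
        if not matched:
            raise ValueError(f"invalid test case part: {part!r}")
        key, val = matched[1], matched[2]
        try:
            val = int(val)
        except ValueError:
            # Mirror the patched parser: bracketed values become tuples of ints.
            if (val.startswith("(") and val.endswith(")")) or (val.startswith("[") and val.endswith("]")):
                try:
                    inner = val[1:-1].strip()
                    val = tuple(int(x.strip()) for x in inner.split(",")) if inner else ()
                except ValueError:
                    pass
        case[key] = val
    return case

# Hypothetical spec line for illustration:
print(parse_spec("g: 3; m: [128, 256, 64]; n: 512; k: 256; seed: 42"))
# -> {'g': 3, 'm': (128, 256, 64), 'n': 512, 'k': 256, 'seed': 42}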