assembly/amd: bug fixes for PYTHON_REMU (#14347)

* default PYTHON_REMU to 1

* mockgpu

* less size

* normal compile path

* unique

* more

* fix clamp

* Change PYTHON_REMU default to 0 in _try_dlopen_remu
This commit is contained in:
George Hotz 2026-01-27 00:48:22 +08:00 committed by GitHub
commit 204f51e739
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 211 additions and 212 deletions

View file

@ -41,11 +41,9 @@ class _MXCSRContext:
lib.set_mxcsr(self._saved)
from tinygrad.uop.ops import UOp, Ops, KernelInfo, AxisType
from tinygrad.dtype import dtypes
from tinygrad.codegen import get_program
from tinygrad.device import Device, Buffer, BufferSpec
from tinygrad.runtime.autogen import hsa
from tinygrad.helpers import Context, DEBUG, colored, TUPLE_ORDER, getenv
from tinygrad.renderer import ProgramSpec
from tinygrad.helpers import Context, DEBUG, colored
from extra.assembly.amd.decode import decode_inst
from extra.assembly.amd.autogen.rdna3.str_pcode import PCODE
@ -144,6 +142,27 @@ def _val_to_u32(val: UOp) -> UOp:
if val.dtype in (dtypes.uint16, dtypes.int16): return val.cast(dtypes.uint32)
return val.cast(dtypes.uint32)
def _apply_clamp(val: UOp, clmp: int | UOp) -> UOp:
  """Apply the VOP3 clamp modifier to a result value.

  An integer clmp of 0 returns val untouched. Otherwise float32, half and float64 values are limited
  to the [0.0, 1.0] range, and every other dtype passes through unchanged. When clmp is a UOp the
  selection between the clamped and the original value is made with a `where` on `clmp != 0`.
  """
  if isinstance(clmp, int) and clmp == 0: return val
  # Find which clampable float dtype (if any) the value has; non-float results are never clamped.
  float_dt = next((dt for dt in (dtypes.float32, dtypes.half, dtypes.float64) if val.dtype == dt), None)
  if float_dt is None: return val
  lower, upper = UOp.const(float_dt, 0.0), UOp.const(float_dt, 1.0)
  limited = val.maximum(lower).minimum(upper)
  # A UOp clmp is only known at run time, so both outcomes are kept and selected per value.
  return clmp.ne(_c(0)).where(limited, val) if isinstance(clmp, UOp) else limited
# Pcode parser
def _apply_pseudocode_fixes(op_name: str, pcode: str) -> str:
fixes = {
@ -330,7 +349,8 @@ def compile_lane_pcode(op, inst, ctx: '_Ctx', inc_pc_fn, name: str):
return name, UOp.sink(*stores, *inc_pc_fn(), arg=KernelInfo(name=name))
def compile_vop_pcode(op, srcs: dict[str, UOp], lane: UOp, wvgpr_fn, wsgpr_fn, rsgpr_fn, vdst_reg: UOp, exec_mask: UOp,
inc_pc_fn=None, name: str = None, opsel_dst_hi: bool | UOp = False, rvgpr_fn=None, sdst_reg: int | None = None):
inc_pc_fn=None, name: str = None, opsel_dst_hi: bool | UOp = False, rvgpr_fn=None, sdst_reg: int | None = None,
clmp: int | UOp = 0):
"""Compile a VOP instruction using pcode parser. Returns (name, sink) if inc_pc_fn/name provided, else list of store UOps, or None."""
pcode = PCODE.get(op)
if pcode is None: return None
@ -355,6 +375,8 @@ def compile_vop_pcode(op, srcs: dict[str, UOp], lane: UOp, wvgpr_fn, wsgpr_fn, r
val.cast(dtypes.uint32) if val.dtype in (dtypes.uint16, dtypes.int16) else val.cast(dtypes.uint32) & UOp.const(dtypes.uint32, slice_mask)
raw_stores.append(('vgpr_slice', (lo_bit, width, val_bits)))
continue
# Apply clamp modifier for float types
val = _apply_clamp(val, clmp)
if val.dtype in (dtypes.uint64, dtypes.int64, dtypes.float64):
lo, hi = _split64(val)
raw_stores.extend([('vgpr', wvgpr(vdst_reg, lane, lo, exec_mask)), ('vgpr', wvgpr(vdst_reg + _c(1), lane, hi, exec_mask))])
@ -777,13 +799,14 @@ def _compile_vop3(inst: VOP3, ctx: _Ctx, name: str) -> tuple[str, UOp]:
# FMAC instructions need D0 (accumulator) from destination register
if 'FMAC' in op_name: srcs['D0'] = ctx.rvgpr_dyn(vdst_reg, lane)
opsel_dst_hi = bool(opsel & 0b1000) and _is_16bit_op(op_name)
clmp = getattr(inst, 'clmp', 0) or 0
if opsel_dst_hi:
stores = compile_vop_pcode(inst.op, srcs, lane, ctx.wvgpr_dyn, ctx.wsgpr_dyn, ctx.rsgpr_dyn, vdst_reg, exec_mask, opsel_dst_hi=True,
rvgpr_fn=ctx.rvgpr_dyn)
rvgpr_fn=ctx.rvgpr_dyn, clmp=clmp)
if stores is not None:
return name, UOp.sink(*stores, *ctx.inc_pc(), arg=KernelInfo(name=name))
pcode_result = compile_vop_pcode(inst.op, srcs, lane, ctx.wvgpr_dyn, ctx.wsgpr_dyn, ctx.rsgpr_dyn, vdst_reg, exec_mask, ctx.inc_pc, name,
rvgpr_fn=ctx.rvgpr_dyn)
rvgpr_fn=ctx.rvgpr_dyn, clmp=clmp)
assert pcode_result is not None, f"no pcode for VOP3: {inst.op.name}"
return pcode_result
@ -1117,37 +1140,6 @@ _INST_HANDLERS: dict[type, callable] = {
# PROGRAM DECODE AND COMPILATION
# ═══════════════════════════════════════════════════════════════════════════════
# Backend selection: EMU2_BACKEND=clang (default) or llvm
EMU2_BACKEND = getenv("EMU2_BACKEND", "clang")
def _get_backend():
  """Return (renderer instance, compiler instance, program class) for the backend named by EMU2_BACKEND.

  "llvm" selects the CPU LLVM renderer/compiler; any other value selects the clang renderer and JIT
  compiler. Both choices use CPUProgram as the program class. Imports are done lazily inside the
  selected branch.
  """
  if EMU2_BACKEND != "llvm":
    # clang (default)
    from tinygrad.renderer.cstyle import ClangRenderer
    from tinygrad.runtime.support.compiler_cpu import ClangJITCompiler
    from tinygrad.runtime.ops_cpu import CPUProgram
    return ClangRenderer(), ClangJITCompiler(), CPUProgram
  from tinygrad.renderer.llvmir import CPULLVMRenderer
  from tinygrad.runtime.support.compiler_cpu import CPULLVMCompiler
  from tinygrad.runtime.ops_cpu import CPUProgram
  return CPULLVMRenderer(), CPULLVMCompiler(), CPUProgram
_emu_renderer, _emu_compiler, _ProgramClass = _get_backend()
def _elf_symbol_offsets(obj: bytes) -> dict[str, int]:
  """Map each named, defined symbol of an ELF object file to its offset.

  The offset is the owning section's sh_addr plus the symbol's st_value. Returns an empty dict when
  the object has no symbol table or the symbol table's sh_link does not index a section.
  """
  from tinygrad.runtime.support.elf import elf_loader, libc
  _, sections, _ = elf_loader(obj)

  # Locate the first SHT_SYMTAB section.
  symtab = None
  for sec in sections:
    if sec.header.sh_type == libc.SHT_SYMTAB:
      symtab = sec
      break
  if symtab is None: return {}

  # sh_link of the symbol table names the string table holding the symbol names.
  link = symtab.header.sh_link
  strtab = sections[link] if link < len(sections) else None
  if strtab is None: return {}

  n_syms = symtab.header.sh_size // symtab.header.sh_entsize
  sym_array = (libc.Elf64_Sym * n_syms).from_buffer_copy(symtab.content)

  offsets: dict[str, int] = {}
  for sym in sym_array:
    # Skip symbols whose section index is 0 or out of range.
    if not (0 < sym.st_shndx < len(sections)): continue
    # Names are NUL-terminated strings starting at st_name inside the string table.
    blob = strtab.content
    sym_name = blob[sym.st_name:blob.find(b'\x00', sym.st_name)].decode('utf-8')
    if not sym_name: continue
    offsets[sym_name] = sections[sym.st_shndx].header.sh_addr + sym.st_value
  return offsets
@functools.cache
def _get_inst_sink(inst_bytes: bytes) -> tuple[UOp, tuple[int, int, int]]:
"""Build UOp sink for instruction bytes. Returns (sink, (base, mask, size)) with canonical name."""
@ -1171,76 +1163,51 @@ def _get_inst_sink(inst_bytes: bytes) -> tuple[UOp, tuple[int, int, int]]:
canonical_name = f"{_op_name(inst).lower()}_{base.to_bytes(size, 'little').hex()}"
return sink.replace(arg=KernelInfo(name=canonical_name)).rtag(1), (base, mask, size)
_canonical_prg_cache: list[tuple[int, int, int, ProgramSpec]] = [] # [(base, mask, size, prg), ...]
_last_compiled_new: bool = False # set by _get_inst_prg when compiling new instruction
_canonical_runner_cache: list[tuple[int, int, int, object]] = [] # [(base, mask, size, runner), ...]
def _match_canonical(inst_int: int, inst_size: int) -> ProgramSpec | None:
def _match_canonical(inst_int: int, inst_size: int) -> object | None:
"""Check if instruction matches any cached (base, mask, size) pattern."""
for base, mask, size, prg in _canonical_prg_cache:
if inst_size != size: continue # must match instruction size exactly
if (inst_int & mask) == base: return prg
for base, mask, size, runner in _canonical_runner_cache:
if inst_size == size and (inst_int & mask) == base: return runner
return None
@functools.cache
def _get_inst_prg(inst_bytes: bytes) -> ProgramSpec:
"""Compile instruction bytes to ProgramSpec. Cached by instruction bytes, with canonical dedup."""
global _last_compiled_new
# Decode instruction to get size for canonical matching
def _get_runner(inst_bytes: bytes):
"""Build and compile instruction to CompiledRunner. Cached by instruction bytes, with canonical dedup."""
from tinygrad.engine.realize import get_runner
inst = decode_inst(inst_bytes)
inst_size = inst.size()
inst_int = int.from_bytes(inst_bytes[:inst_size], 'little')
# Check canonical cache BEFORE building sink (avoids expensive UOp construction)
if (prg := _match_canonical(inst_int, inst_size)) is not None:
_last_compiled_new = False
return prg
if (runner := _match_canonical(inst_int, inst_size)) is not None: return runner, False
sink, (base, mask, size) = _get_inst_sink(inst_bytes)
with Context(NOOPT=1, IGNORE_OOB=1, TUPLE_ORDER=0):
prg = get_program(sink, _emu_renderer)
_canonical_prg_cache.append((base, mask, size, prg))
_last_compiled_new = True
return prg
runner = get_runner('CPU', sink)
_canonical_runner_cache.append((base, mask, size, runner))
return runner, True
@functools.cache
def decode_program(data: bytes) -> dict[int, tuple[str, object, list[int], object]]:
"""Decode program to {pc: (name, program, globals, holder)}."""
# Collect all instruction programs
inst_info: list[tuple[int, ProgramSpec]] = [] # (pc_bytes, prg)
"""Decode program to {pc: (name, fxn, globals, runner)}."""
result: dict[int, tuple[str, object, list[int], object]] = {}
i = 0
while i < len(data):
inst = decode_inst(data[i:])
if isinstance(inst, SOPP) and inst.op == SOPPOp.S_CODE_END: break
try:
prg = _get_inst_prg(bytes(data[i:i + inst.size() + 4]))
inst_info.append((i, prg)) # PC is now byte offset
runner, is_new = _get_runner(bytes(data[i:i + inst.size() + 4]))
if DEBUG >= 3:
try: inst_str = repr(inst)
except Exception: inst_str = f"<{type(inst).__name__} at PC={i}>"
msg = f"[emu2] PC={i}: {inst_str}"
print(colored(msg, 'green') if _last_compiled_new else msg)
if DEBUG >= 4: print(f"{colored(prg.src, 'BLACK')}")
print(colored(msg, 'green') if is_new else msg)
if DEBUG >= 4: print(f"{colored(runner.p.src, 'BLACK')}")
result[i] = (runner.p.function_name, runner._prg.fxn, runner.p.globals, runner)
except Exception as e:
try: inst_str = repr(inst)
except Exception: inst_str = f"<{type(inst).__name__}>"
raise RuntimeError(f"[emu2] Failed to compile PC={i} {inst_str}: {type(e).__name__}: {e}") from e
i += inst.size()
if not inst_info: return {}
# Batch compile and create function pointers
from tinygrad.runtime.support.elf import jit_loader
seen_funcs: set[str] = set()
combined_src_parts: list[str] = []
for pc, prg in inst_info:
if prg.function_name not in seen_funcs:
seen_funcs.add(prg.function_name)
combined_src_parts.append(prg.src)
obj = _emu_compiler.compile_to_obj("\n".join(combined_src_parts))
sym_offsets = _elf_symbol_offsets(obj)
cpu_prg = _ProgramClass(Device['CPU'], "emu2_batch", jit_loader(obj))
base_addr = ctypes.cast(cpu_prg.fxn, ctypes.c_void_p).value
return {pc: (prg.function_name, ctypes.CFUNCTYPE(None)(base_addr + sym_offsets.get(prg.function_name, 0)), prg.globals, cpu_prg)
for pc, prg in inst_info}
return result
# ═══════════════════════════════════════════════════════════════════════════════
# WAVE STATE

View file

@ -6,7 +6,7 @@ from pathlib import Path
# Set AMD=1 before importing tinygrad
os.environ["AMD"] = "1"
from extra.assembly.amd.emu2 import run_asm as python_run_asm, decode_program, _get_inst_sink, _get_inst_prg
from extra.assembly.amd.emu2 import run_asm as python_run_asm, decode_program
from extra.assembly.amd.decode import decode_inst
from extra.assembly.amd.autogen.rdna3.ins import SOPP, SOPPOp
@ -67,128 +67,55 @@ def benchmark_emulator(name: str, run_fn, kernel: bytes, global_size, local_size
return sum(times) / len(times)
def profile_instructions(kernel: bytes):
"""Profile individual instructions and return sorted by render time."""
from extra.assembly.amd.emu2 import _get_inst_prg, _get_inst_sink, _canonical_prg_cache
from tinygrad.codegen import get_program
from extra.assembly.amd.emu2 import _emu_renderer
"""Profile individual instruction compile times."""
from extra.assembly.amd.emu2 import _get_inst_sink, _get_runner, _canonical_runner_cache
from tinygrad.helpers import Context
# Clear caches to measure fresh
_get_inst_sink.cache_clear()
_get_inst_prg.cache_clear()
_canonical_prg_cache.clear()
decode_program.cache_clear()
_get_runner.cache_clear()
_canonical_runner_cache.clear()
# Collect instruction bytes and names
inst_data = []
results = []
i = 0
while i < len(kernel):
inst = decode_inst(kernel[i:])
if isinstance(inst, SOPP) and inst.op == SOPPOp.S_CODE_END: break
inst_bytes = bytes(kernel[i:i + inst.size() + 4])
try:
inst_str = repr(inst)
except Exception:
inst_str = f"<{type(inst).__name__}>"
inst_data.append((inst_bytes, inst_str, type(inst).__name__))
i += inst.size()
try: inst_str = repr(inst)
except Exception: inst_str = f"<{type(inst).__name__}>"
# Profile each instruction
from extra.assembly.amd.emu2 import _match_canonical
results = []
for inst_bytes, inst_str, inst_type in inst_data:
# Check canonical cache BEFORE building sink (matches real behavior)
inst_size = decode_inst(inst_bytes).size()
inst_int = int.from_bytes(inst_bytes[:inst_size], 'little')
is_cache_hit = _match_canonical(inst_int, inst_size) is not None
if is_cache_hit:
# Skip build and render entirely for cache hits
build_time, render_time, uop_count = 0, 0, 0
else:
# Build sink
build_start = time.perf_counter()
sink, (base, mask, size) = _get_inst_sink(inst_bytes)
build_time = time.perf_counter() - build_start
# Count UOps in sink
uop_count = len(sink.toposort())
# Render
render_start = time.perf_counter()
with Context(NOOPT=1, IGNORE_OOB=1, TUPLE_ORDER=0):
prg = get_program(sink, _emu_renderer)
render_time = time.perf_counter() - render_start
# Update canonical cache
_canonical_prg_cache.append((base, mask, size, prg))
# Time the full compile (sink + render + compile)
start = time.perf_counter()
with Context(CCACHE=0):
runner, is_new = _get_runner(inst_bytes)
compile_time = time.perf_counter() - start
results.append({
'inst_str': inst_str + (' [HIT]' if is_cache_hit else ''),
'inst_type': inst_type,
'uop_count': uop_count,
'build_ms': build_time * 1000,
'render_ms': render_time * 1000,
'inst_str': inst_str + ('' if is_new else ' [CACHED]'),
'compile_ms': compile_time * 1000 if is_new else 0,
})
# Sort by render time descending
return sorted(results, key=lambda x: x['render_ms'], reverse=True)
def benchmark_python_split(kernel: bytes, global_size, local_size, args_ptr, rsrc2: int, iterations: int = 5):
"""Benchmark Python emulator with build/render/compile/execution times separated."""
from extra.assembly.amd.emu2 import _emu_renderer, _emu_compiler, _elf_symbol_offsets
from extra.assembly.amd.emu2 import _get_inst_prg, _get_inst_sink, _canonical_prg_cache
from tinygrad.codegen import get_program
from tinygrad.helpers import Context
from tinygrad.runtime.support.elf import jit_loader
# Clear caches to measure fresh
_get_inst_sink.cache_clear()
_get_inst_prg.cache_clear()
_canonical_prg_cache.clear()
decode_program.cache_clear()
# Collect instruction bytes
inst_bytes_list = []
i = 0
while i < len(kernel):
inst = decode_inst(kernel[i:])
if isinstance(inst, SOPP) and inst.op == SOPPOp.S_CODE_END: break
inst_bytes_list.append(bytes(kernel[i:i + inst.size() + 4]))
i += inst.size()
# Measure build time (UOp sink generation, cached)
build_start = time.perf_counter()
for inst_bytes in inst_bytes_list:
_get_inst_sink(inst_bytes)
build_time = time.perf_counter() - build_start
return sorted(results, key=lambda x: x['compile_ms'], reverse=True)
# Measure render time (uses cached sinks, handles canonical dedup)
render_start = time.perf_counter()
cache_before = len(_canonical_prg_cache)
prgs = [_get_inst_prg(inst_bytes) for inst_bytes in inst_bytes_list]
render_count = len(_canonical_prg_cache) - cache_before # number of unique renders
render_time = time.perf_counter() - render_start
def benchmark_python_split(kernel: bytes, global_size, local_size, args_ptr, rsrc2: int, iterations: int = 5):
"""Benchmark Python emulator with compile and execution times."""
from extra.assembly.amd.emu2 import _get_inst_sink, _get_runner, _canonical_runner_cache
from tinygrad.helpers import Context
_get_inst_sink.cache_clear()
_get_runner.cache_clear()
_canonical_runner_cache.clear()
decode_program.cache_clear()
# Measure compile time (clang/llvm compile C to native)
# Measure compile time (decode_program builds sinks, renders, and compiles)
compile_start = time.perf_counter()
# Deduplicate by function name (same as decode_program does)
seen = set()
unique_srcs = []
for prg in prgs:
if prg.function_name not in seen:
seen.add(prg.function_name)
unique_srcs.append(prg.src)
combined_src = "\n".join(unique_srcs)
obj = _emu_compiler.compile_to_obj(combined_src)
_elf_symbol_offsets(obj)
jit_loader(obj)
with Context(CCACHE=0):
program = decode_program(kernel)
compile_time = time.perf_counter() - compile_start
n_compiled = len(_canonical_runner_cache)
# Execution time (need to populate cache first)
decode_program(kernel)
# Execution time
exec_time = benchmark_emulator("Python", python_run_asm, kernel, global_size, local_size, args_ptr, rsrc2, iterations)
return build_time, render_time, render_count, compile_time, exec_time
return compile_time, exec_time, len(program), n_compiled
def get_tinygrad_kernel(op_name: str) -> tuple[bytes, tuple, tuple, list[int], dict[int, bytes], int] | None:
"""Get a real tinygrad kernel by operation name. Returns (code, global_size, local_size, buf_sizes, buf_data, rsrc2)."""
@ -253,7 +180,6 @@ def main():
parser.add_argument("--iterations", type=int, default=3, help="Number of iterations per benchmark")
parser.add_argument("--profile", type=str, default=None, help="Profile instructions for a specific kernel (e.g. 'sin')")
parser.add_argument("--top", type=int, default=20, help="Number of top instructions to show in profile")
parser.add_argument("--sort-build", action="store_true", help="Sort profile by build time instead of render time")
args = parser.parse_args()
# Profile mode: show individual instruction timing
@ -264,19 +190,16 @@ def main():
return
kernel = kernel_info[0]
print(f"Profiling instructions for '{args.profile}' kernel...")
print("=" * 140)
print("=" * 110)
results = profile_instructions(kernel)
if args.sort_build:
results = sorted(results, key=lambda x: x['build_ms'], reverse=True)
print(f"{'Instruction':<90} {'UOps':>6} {'Build(ms)':>10} {'Render(ms)':>10}")
print("-" * 140)
print(f"{'Instruction':<90} {'Compile(ms)':>12}")
print("-" * 110)
for r in results[:args.top]:
inst = r['inst_str'][:87] + "..." if len(r['inst_str']) > 90 else r['inst_str']
print(f"{inst:<90} {r['uop_count']:>6} {r['build_ms']:>10.3f} {r['render_ms']:>10.3f}")
print("-" * 140)
total_build = sum(r['build_ms'] for r in results)
total_render = sum(r['render_ms'] for r in results)
print(f"{'TOTAL':<90} {'':>6} {total_build:>10.3f} {total_render:>10.3f}")
print(f"{inst:<90} {r['compile_ms']:>12.3f}")
print("-" * 110)
total = sum(r['compile_ms'] for r in results)
print(f"{'TOTAL':<90} {total:>12.3f}")
return
rust_remu = get_rust_remu()
@ -304,39 +227,34 @@ def main():
buffers, args_arr, args_ptr, ranges = setup_buffers(buf_sizes, buf_data)
# Benchmark Python emulator (must be first to measure compile time before cache is populated)
py_build, py_render, render_count, py_compile, py_exec = benchmark_python_split(kernel, global_size, local_size, args_ptr, rsrc2, args.iterations)
py_compile, py_exec, n_insts, n_compiled = benchmark_python_split(kernel, global_size, local_size, args_ptr, rsrc2, args.iterations)
n_insts = count_instructions(kernel) # uses cached decode_program
n_workgroups = global_size[0] * global_size[1] * global_size[2]
n_threads = local_size[0] * local_size[1] * local_size[2]
total_work = n_insts * n_workgroups * n_threads
print(f"{n_insts} insts × {n_workgroups} WGs × {n_threads} threads = {total_work:,} ops")
print(f"{n_insts} insts ({n_compiled} unique) × {n_workgroups} WGs × {n_threads} threads = {total_work:,} ops")
rust_time = benchmark_emulator("Rust", rust_remu.run_asm, kernel, global_size, local_size, args_ptr, rsrc2, args.iterations) if rust_remu else None
if py_build is not None:
if py_compile is not None:
py_exec_rate = total_work / py_exec / 1e6
print(f" Build: {py_build*1000:8.3f} ms")
print(f" Render: {py_render*1000:8.3f} ms ({render_count} unique)")
print(f" Compile: {py_compile*1000:8.3f} ms")
print(f" Compile: {py_compile*1000:8.3f} ms ({n_compiled} unique)")
print(f" Exec: {py_exec*1000:8.3f} ms ({py_exec_rate:7.2f} M ops/s)")
if rust_time:
rust_rate = total_work / rust_time / 1e6
speedup = py_exec / rust_time if py_exec else 0
print(f" Rust: {rust_time*1000:8.3f} ms ({rust_rate:7.2f} M ops/s) [{speedup:.1f}x faster]")
results.append((op_name, n_insts, n_workgroups, py_build, py_render, render_count, py_compile, py_exec, rust_time))
results.append((op_name, n_insts, n_compiled, n_workgroups, py_compile, py_exec, rust_time))
# Summary table
print("\n" + "=" * 140)
print("\n" + "=" * 110)
print("SUMMARY")
print("=" * 140)
print(f"{'Name':<16} {'Insts':<6} {'WGs':<5} {'Build (ms)':<12} {'Render (ms)':<16} {'Compile (ms)':<14} {'Exec (ms)':<12} {'Rust (ms)':<12} {'Speedup':<10}")
print("-" * 140)
print("=" * 110)
print(f"{'Name':<16} {'Insts':<6} {'Unique':<6} {'WGs':<5} {'Compile (ms)':<14} {'Exec (ms)':<12} {'Rust (ms)':<12} {'Speedup':<10}")
print("-" * 110)
for name, n_insts, n_wgs, py_build, py_render, render_count, py_compile, py_exec, rust_time in results:
build_ms = f"{py_build*1000:.3f}" if py_build else "error"
render_ms = f"{py_render*1000:.3f} ({render_count})" if py_render else "error"
for name, n_insts, n_compiled, n_wgs, py_compile, py_exec, rust_time in results:
compile_ms = f"{py_compile*1000:.3f}" if py_compile else "error"
exec_ms = f"{py_exec*1000:.3f}" if py_exec else "error"
if rust_time:
@ -344,7 +262,7 @@ def main():
speedup = f"{py_exec/rust_time:.1f}x" if py_exec else "N/A"
else:
rust_ms, speedup = "N/A", "N/A"
print(f"{name:<16} {n_insts:<6} {n_wgs:<5} {build_ms:<12} {render_ms:<16} {compile_ms:<14} {exec_ms:<12} {rust_ms:<12} {speedup:<10}")
print(f"{name:<16} {n_insts:<6} {n_compiled:<6} {n_wgs:<5} {compile_ms:<14} {exec_ms:<12} {rust_ms:<12} {speedup:<10}")
if __name__ == "__main__":
main()

View file

@ -2811,5 +2811,112 @@ class TestMin3Max3Unsigned(unittest.TestCase):
self.assertEqual(st.vgpr[0][1] & 0xFFFF, 0)
class TestVOP3Clamp(unittest.TestCase):
  """VOP3 clamp modifier (clmp=1) tests.

  With clamp enabled, float outputs are limited to the [0.0, 1.0] range. Operations such as
  clip(0, 1) depend on this: AMD LLVM compiles them to v_max_f32_e64 with clmp=1.

  Regression coverage for the clip(0, 1) bug, where the emulator ignored the clmp field.
  """

  def _lane0_f32(self, program, reg: int) -> float:
    """Run `program` with a single lane and return VGPR `reg` of lane 0 decoded as a float."""
    state = run_program(program, n_lanes=1)
    return i2f(state.vgpr[0][reg])

  def test_v_max_f32_e64_clamp_positive(self):
    """Clamped V_MAX_F32_E64: an input above 1.0 comes out as 1.0."""
    program = [
      v_mov_b32_e32(v[0], 2.5),
      VOP3(VOP3Op.V_MAX_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 1), 1.0, places=5)

  def test_v_max_f32_e64_clamp_negative(self):
    """Clamped V_MAX_F32_E64: an input below 0.0 comes out as 0.0."""
    program = [
      v_mov_b32_e32(v[0], -1.5),
      VOP3(VOP3Op.V_MAX_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 1), 0.0, places=5)

  def test_v_max_f32_e64_clamp_in_range(self):
    """Clamped V_MAX_F32_E64: an input already inside [0, 1] is left alone."""
    program = [
      v_mov_b32_e32(v[0], 0.5),
      VOP3(VOP3Op.V_MAX_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 1), 0.5, places=5)

  def test_v_max_f32_e64_no_clamp(self):
    """Unclamped V_MAX_F32_E64 (clmp=0): an input above 1.0 is left alone."""
    program = [
      v_mov_b32_e32(v[0], 2.5),
      VOP3(VOP3Op.V_MAX_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=0),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 1), 2.5, places=5)

  def test_v_min_f32_e64_clamp_negative(self):
    """Clamped V_MIN_F32_E64: an input below 0.0 comes out as 0.0."""
    program = [
      v_mov_b32_e32(v[0], -2.0),
      VOP3(VOP3Op.V_MIN_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 1), 0.0, places=5)

  def test_v_add_f32_e64_clamp(self):
    """Clamped V_ADD_F32_E64: 0.7 + 0.8 = 1.5 is limited to 1.0."""
    program = [
      v_mov_b32_e32(v[0], 0.7),
      v_mov_b32_e32(v[1], 0.8),
      VOP3(VOP3Op.V_ADD_F32_E64, vdst=v[2], src0=v[0], src1=v[1], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 2), 1.0, places=5)

  def test_v_mul_f32_e64_clamp_underflow(self):
    """Clamped V_MUL_F32_E64: 0.5 * -2.0 = -1.0 is limited to 0.0."""
    program = [
      v_mov_b32_e32(v[0], 0.5),
      v_mov_b32_e32(v[1], -2.0),
      VOP3(VOP3Op.V_MUL_F32_E64, vdst=v[2], src0=v[0], src1=v[1], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 2), 0.0, places=5)

  def test_v_fma_f32_clamp(self):
    """Clamped V_FMA_F32: 2*2+1 = 5 is limited to 1.0."""
    program = [
      v_mov_b32_e32(v[0], 2.0),
      v_mov_b32_e32(v[1], 2.0),
      v_mov_b32_e32(v[2], 1.0),
      VOP3(VOP3Op.V_FMA_F32, vdst=v[3], src0=v[0], src1=v[1], src2=v[2], clmp=1),
    ]
    self.assertAlmostEqual(self._lane0_f32(program, 3), 1.0, places=5)

  def test_v_max_f32_e64_clamp_multilane(self):
    """Clamped V_MAX_F32_E64 across four lanes, each holding a different input."""
    program = [
      # Give every lane its own value derived from the lane id.
      s_mov_b32(s[0], f2i(0.5)),
      v_cvt_f32_i32_e32(v[0], v[255]),  # lane id as float
      v_mov_b32_e32(v[2], s[0]),  # v2 = 0.5
      v_sub_f32_e32(v[0], v[0], v[2]),  # minus 0.5: lane0=-0.5, lane1=0.5, lane2=1.5, lane3=2.5
      VOP3(VOP3Op.V_MAX_F32_E64, vdst=v[1], src0=v[0], src1=v[0], clmp=1),
    ]
    state = run_program(program, n_lanes=4)
    # (lane, expected clamped value, failure message)
    expectations = [
      (0, 0.0, "lane 0: -0.5 should clamp to 0.0"),
      (1, 0.5, "lane 1: 0.5 should pass through"),
      (2, 1.0, "lane 2: 1.5 should clamp to 1.0"),
      (3, 1.0, "lane 3: 2.5 should clamp to 1.0"),
    ]
    for lane, want, note in expectations:
      self.assertAlmostEqual(i2f(state.vgpr[lane][1]), want, places=5, msg=note)
if __name__ == '__main__':
unittest.main()

View file

@ -466,6 +466,13 @@ class TestTinygradKernels(unittest.TestCase):
# This tests the integer multiply-high instructions used in range reduction
self._test_kernel(lambda T: T([859240.0, 1000000.0, 100594688.0]).sin())
def test_clip_zero_one(self):
  """Run clip(0, 1) on seeded uniform samples drawn from [-2, 2); regression for binary_crossentropy failure."""
  import numpy as np
  np.random.seed(0)  # fixed seed keeps the generated input reproducible
  samples = np.random.uniform(-2, 2, (32, 10)).astype(np.float32).tolist()
  self._test_kernel(lambda T: T(samples).clip(0, 1))
def test_mod_int64(self):
"""Test int64 modulo, especially edge cases like 1 % -1."""
from tinygrad import dtypes

View file

@ -338,7 +338,7 @@ class TestHCQ(unittest.TestCase):
et = float(sig_en.timestamp - sig_st.timestamp)
print(f"exec kernel time: {et:.2f} us")
assert 0.1 <= et <= (100000 if MOCKGPU or Device.DEFAULT in {"CPU"} else 100)
assert 0.1 <= et <= (3000000 if MOCKGPU or Device.DEFAULT in {"CPU"} else 100)
def test_speed_copy_bandwidth(self):
if TestHCQ.d0.hw_copy_queue_t is None: self.skipTest("device does not support copy queue")

View file

@ -281,7 +281,7 @@ class TestRandomness(unittest.TestCase):
old_default_float = dtypes.default_float
# low precision can result in inf from randn
dtypes.default_float = default_float
t = Tensor.randn(256, 256)
t = Tensor.randn(64, 64)
mx = t.max().numpy().item()
mn = t.min().numpy().item()
print(f"testing with {default_float=}")
@ -324,11 +324,11 @@ class TestRandomness(unittest.TestCase):
lambda x: np.random.uniform(-1, 1, size=x) * math.sqrt(6 / (x[0] + math.prod(x[1:])))))
def test_kaiming_uniform(self):
for shape in [(32, 128, 3, 3), (80, 44), (3, 55, 35)]:
for shape in [(32, 16, 3, 3), (20, 44), (3, 15, 35)]:
self.assertTrue(equal_distribution(Tensor.kaiming_uniform, lambda x: torch.nn.init.kaiming_uniform_(torch.empty(x)), shape=shape))
def test_kaiming_normal(self):
for shape in [(32, 128, 3, 3), (80, 44), (3, 55, 35)]:
for shape in [(32, 16, 3, 3), (20, 44), (3, 15, 35)]:
self.assertTrue(equal_distribution(Tensor.kaiming_normal, lambda x: torch.nn.init.kaiming_normal_(torch.empty(x)), shape=shape))
def test_multinomial(self):
@ -388,7 +388,7 @@ class TestRandomness(unittest.TestCase):
@unittest.skipIf(Device.DEFAULT == "WEBGPU" and not OSX, "WEBGPU Vulkan can only run kernels with up to 10 buffers")
class TestSample(unittest.TestCase):
def test_sample(self):
X = Tensor.rand(10000, 50).realize()
X = Tensor.rand(1000, 50).realize()
BS = 16
idxs = np.random.randint(0, X.shape[0], size=(BS))
# this uncovered a bug with arg sort order