wasm-micro-runtime/tests/regression/ba-issues/run.py
Liu Jia 8949797c84
Improve run.py of regression (#4417)
* Improve run.py of regression
1. Fix script interruption on case failure
2. improve statistics logic
3. enable select specific issue ids
2025-07-01 10:44:53 +08:00

313 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (C) 2019 Intel Corporation. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
import json
import os
import subprocess
import glob
import re
import argparse
from typing import Dict, Optional, List
# Current working directory captured when the module is loaded; the
# run_issue_test_* functions join it with "issues/issue-<id>/" to locate files.
WORK_DIR = os.getcwd()
# Command template for running a wasm file with iwasm. `running_mode` is the
# mode flag chosen in run_issue_test_iwasm (it may be an empty string).
TEST_WASM_COMMAND = (
"./build/build-{runtime}/iwasm {running_options} {running_mode} {file} {argument}"
)
# Command template for compiling `in_file` into `out_file` with the compiler
# binary located under ./build/build-wamrc/.
COMPILE_AOT_COMMAND = "./build/build-wamrc/{compiler} {options} -o {out_file} {in_file}"
# Command template for running an AOT file with iwasm (no running-mode flag).
TEST_AOT_COMMAND = "./build/build-{runtime}/iwasm {running_options} {file} {argument}"
# Failure log, relative to the current directory: main() deletes it at the
# start of a run and dump_error_log() appends to it.
LOG_FILE = "issues_tests.log"
# Template of one failure record. Positional fields, in order: issue id,
# command argument list, exit-code comparison line, stdout comparison line.
LOG_ENTRY = """
=======================================================
Failing issue id: {}.
run with command_lists: {}
{}
{}
=======================================================
"""
# Function to read and parse JSON file
def read_json_file(file_path):
    """Load and return the JSON document stored at *file_path*.

    The file is decoded as UTF-8 (the encoding JSON interchange uses) instead
    of the platform's locale encoding, so the result does not depend on the
    environment the script runs in.

    Returns:
        The parsed JSON value. This is ``None`` only when the document itself
        is the JSON literal ``null``.

    Raises:
        OSError: if the file cannot be opened (e.g. it does not exist).
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        return json.load(file)
def dump_error_log(failing_issue_id, command_lists, exit_code_cmp, stdout_cmp):
    """Append one failure record, rendered with LOG_ENTRY, to LOG_FILE.

    The four arguments fill the template's positional fields in order: the
    failing issue id, the command that was run, the exit-code comparison line
    and the stdout comparison line.
    """
    with open(LOG_FILE, "a") as log:
        record = LOG_ENTRY.format(
            failing_issue_id, command_lists, exit_code_cmp, stdout_cmp
        )
        log.write(record)
def get_issue_ids_should_test(selected_ids: Optional[List[int]] = None):
    """Return the set of issue ids that should be tested.

    A non-empty *selected_ids* is simply converted to a set. Otherwise the
    ids are the numbers extracted from every path matching ``issues/issue-*``
    relative to the current working directory; paths whose name carries no
    number after ``issue-`` are ignored.
    """
    if selected_ids:
        return set(selected_ids)

    candidates = (
        re.search(r"issue-(\d+)", path) for path in glob.glob("issues/issue-*")
    )
    return {int(found.group(1)) for found in candidates if found}
def get_and_check(d, key, default=None, nullable=False):
    """Look up *key* in mapping *d*, falling back to *default*.

    Returns the looked-up value. A ``None`` result is only allowed when
    *nullable* is true; otherwise an ``Exception`` naming the key and the
    mapping is raised.
    """
    value = d.get(key, default)
    if nullable or value is not None:
        return value
    raise Exception(f"Missing {key} in {d}")
def run_and_compare_results(
    issue_id, cmd, description, ret_code, stdout_content
) -> bool:
    """Run *cmd* and report whether its result matches the expectation.

    *cmd* is split on whitespace and executed without a shell; stderr is
    captured but not compared. The run passes when the exit code equals
    *ret_code* and stdout (with trailing newlines removed) either

    * equals *stdout_content* exactly, or
    * contains *stdout_content*, when that expectation is the literal
      "Compile success" or is longer than 30 characters.

    On failure the command and both comparisons are printed and a record is
    appended to the log through dump_error_log().

    Returns:
        True if the run passed, False otherwise.
    """
    print("####################################")
    print(f"test BA issue #{issue_id} `{description}`...")

    argv = cmd.split()
    proc = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="ignore",
    )
    actual_exit_code = proc.returncode
    actual_output = proc.stdout.rstrip("\n")

    exit_code_cmp = f"exit code (actual, expected) : {actual_exit_code, ret_code}"
    stdout_cmp = f"stdout (actual, expected) : {actual_output, stdout_content}"

    # The stdout checks are only evaluated once the exit code matched.
    matched = False
    if actual_exit_code == ret_code:
        if actual_output == stdout_content:
            matched = True
        elif stdout_content == "Compile success" or len(stdout_content) > 30:
            # Substring match is reserved for these two kinds of expectation.
            matched = stdout_content in actual_output

    if matched:
        print("== PASS ==")
        return True

    print(cmd)
    print(exit_code_cmp)
    print(stdout_cmp)
    print(f"== FAILED: {issue_id} ==")
    dump_error_log(issue_id, argv, exit_code_cmp, stdout_cmp)
    return False
def run_issue_test_wamrc(issue_id, compile_options):
    """Run the compile step of one issue and check its result.

    Reads the compiler name, input/output file names, options and the
    expected result from *compile_options*, builds the command from
    COMPILE_AOT_COMMAND and hands it to run_and_compare_results().

    Returns:
        The boolean returned by run_and_compare_results().

    Raises:
        Exception: from get_and_check() when a required key is missing.
        AssertionError: when the input file pattern does not match exactly
            one file in the issue directory.
    """
    compiler, in_file, out_file, options, expected_return = (
        get_and_check(compile_options, key)
        for key in ("compiler", "in file", "out file", "options", "expected return")
    )
    ret_code, stdout_content, description = (
        get_and_check(expected_return, key)
        for key in ("ret code", "stdout content", "description")
    )

    issue_dir = os.path.join(WORK_DIR, f"issues/issue-{issue_id}/")

    # The configured name may be a pattern (*.wasm or *.aot); it has to
    # resolve to exactly one file.
    matches = glob.glob(issue_dir + in_file)
    assert len(matches) == 1

    # Absolute paths of the compiler input and output
    source_path = os.path.join(issue_dir, matches[0])
    target_path = os.path.join(issue_dir, out_file)

    cmd = COMPILE_AOT_COMMAND.format(
        compiler=compiler,
        options=options,
        out_file=target_path,
        in_file=source_path,
    )
    return run_and_compare_results(issue_id, cmd, description, ret_code, stdout_content)
def run_issue_test_iwasm(issue_id, test_case) -> bool:
    """Run the iwasm step of one issue and check its result.

    Reads runtime, mode, file, options, argument and the expected result
    from *test_case*. Mode "aot" uses TEST_AOT_COMMAND; every other mode uses
    TEST_WASM_COMMAND with a mode flag: "classic-interp" becomes "--interp",
    "fast-interp" adds no flag, and any other mode becomes "--<mode>".

    Returns:
        The boolean returned by run_and_compare_results().

    Raises:
        Exception: from get_and_check() when a required key is missing.
        AssertionError: when the file pattern does not match exactly one
            file in the issue directory.
    """
    runtime, mode, target_name, options, argument, expected_return = (
        get_and_check(test_case, key)
        for key in ("runtime", "mode", "file", "options", "argument", "expected return")
    )
    ret_code, stdout_content, description = (
        get_and_check(expected_return, key)
        for key in ("ret code", "stdout content", "description")
    )

    issue_dir = os.path.join(WORK_DIR, f"issues/issue-{issue_id}/")

    # The configured name may be a pattern (*.wasm or *.aot); it has to
    # resolve to exactly one file.
    matches = glob.glob(issue_dir + target_name)
    assert len(matches) == 1

    # Fields shared by both command templates
    common_fields = dict(
        runtime=runtime,
        file=os.path.join(issue_dir, matches[0]),
        running_options=options,
        argument=argument,
    )

    if mode == "aot":
        cmd = TEST_AOT_COMMAND.format(**common_fields)
    else:
        if mode == "classic-interp":
            mode_flag = "--interp"
        elif mode == "fast-interp":
            mode_flag = ""
        else:
            mode_flag = f"--{mode}"
        cmd = TEST_WASM_COMMAND.format(running_mode=mode_flag, **common_fields)

    return run_and_compare_results(issue_id, cmd, description, ret_code, stdout_content)
def _run_single_issue(issue_id, test_case, compile_options) -> bool:
    """Run the optional compile step and then the iwasm step for one issue id.

    Returns True only if every step that had to run passed. When
    *compile_options* asks for "only compile", the iwasm step is skipped.
    """
    if compile_options:
        # Read the flag first so a malformed config fails before anything runs.
        only_compile = compile_options["only compile"]
        if not run_issue_test_wamrc(issue_id, compile_options):
            return False
        if only_compile:
            return True
    return run_issue_test_iwasm(issue_id, test_case)


def _format_issue_ids(issue_ids) -> str:
    """Render ids as "#1 #2 ...", or "no more" when there are none."""
    return " ".join(f"#{x}" for x in issue_ids) if issue_ids else "no more"


def process_and_run_test_cases(
    data: Dict[str, Dict], selected_ids: Optional[List[int]] = None
):
    """Run every non-deprecated test case in *data* and print a summary.

    Args:
        data: parsed configuration; its "test cases" list holds one dict per
            test case with at least the keys "deprecated" and "ids".
        selected_ids: optional issue ids to restrict the run to. When empty
            or None, the ids come from get_issue_ids_should_test().

    An issue id counts as failed if any of its runs failed, and as passed
    otherwise. Ids that appear in several test cases are run for each of
    them; the set of ids to test is never mutated while iterating, so a
    repeated id is not mistaken for one that is missing from the folder.

    Raises:
        Exception: from get_and_check() when a test case lacks a required key.
    """
    issue_ids_should_test = get_issue_ids_should_test(selected_ids)
    covered_ids = set()
    passed_ids = set()
    failed_ids = set()
    json_only_ids = set()

    # Iterate through each test case in the json data
    for test_case in data.get("test cases", []):
        is_deprecated = get_and_check(test_case, "deprecated")
        issue_ids = get_and_check(test_case, "ids", default=[])
        if is_deprecated:
            print(f"test case {issue_ids} are deprecated, continue running next one(s)")
            continue

        compile_options = get_and_check(test_case, "compile_options", nullable=True)
        for issue_id in issue_ids:
            if issue_id not in issue_ids_should_test:
                json_only_ids.add(issue_id)
                continue

            # Remember the id was exercised instead of removing it from the
            # should-test set, so later test cases for the same id still run.
            covered_ids.add(issue_id)
            if _run_single_issue(issue_id, test_case, compile_options):
                passed_ids.add(issue_id)
            else:
                failed_ids.add(issue_id)

    # A single failing run marks the whole issue as failed.
    passed_ids -= failed_ids
    left_ids = issue_ids_should_test - covered_ids

    total = len(passed_ids) + len(failed_ids)
    passed = len(passed_ids)
    failed = len(failed_ids)

    print("####################################")
    print("==== Test results ====")
    print(f" Total: {total}")
    print(f" Passed: {passed}")
    print(f" Failed: {failed}")
    if not selected_ids:
        print(f" Left issues in folder: {_format_issue_ids(left_ids)}")
        print(f" Cases in JSON but not found in folder: {_format_issue_ids(json_only_ids)}")
    else:
        print(f" Issues not found in folder: {_format_issue_ids(left_ids)}")
def _parse_issue_ids(raw: str) -> List[int]:
    """Parse a comma separated list of issue ids such as "1,2,3".

    Blank entries (e.g. from a trailing comma) are ignored.

    Raises:
        ValueError: if an entry is not a plain decimal number, or if no id
            is left after parsing.
    """
    ids = []
    for token in raw.split(","):
        token = token.strip()
        if not token:
            continue
        # isdigit() alone also accepts characters such as superscript digits
        # that int() rejects, so require ASCII as well.
        if not (token.isascii() and token.isdigit()):
            raise ValueError(f"invalid issue id: {token!r}")
        ids.append(int(token))
    if not ids:
        raise ValueError("no issue id given")
    return ids


def main():
    """Command line entry point: parse options, load the config, run the tests.

    An invalid ``--issues`` value is rejected with a usage error rather than
    being dropped, because an empty selection would run every issue.
    """
    parser = argparse.ArgumentParser(description="Run BA issue tests.")
    parser.add_argument(
        "-i",
        "--issues",
        type=str,
        help="Comma separated list of issue ids to run, e.g. 1,2,3. Default: all.",
    )
    args = parser.parse_args()

    selected_ids = None
    if args.issues:
        try:
            selected_ids = _parse_issue_ids(args.issues)
        except ValueError as err:
            # Prints the usage message and exits with status 2.
            parser.error(str(err))

    # Path to the JSON file
    file_path = "running_config.json"

    # Read and parse the JSON file
    data = read_json_file(file_path)

    # An explicit exit instead of `assert`, which is skipped under `python -O`.
    if data is None:
        raise SystemExit("No data to process.")

    # Remove the log file from last run if it exists
    if os.path.exists(LOG_FILE):
        os.remove(LOG_FILE)

    # Process the data
    process_and_run_test_cases(data, selected_ids)


if __name__ == "__main__":
    main()