Skip to content

[BUG]: read after write failed #646

@shuoranliu

Description

@shuoranliu

Describe the bug

python3 test_write_read_consistency.py --path /mnt/curvine/lsr-test/fs-test/

#!/usr/bin/env python3
"""
Test script to verify write-read consistency issue in curvine-fuse.
This reproduces the issue seen in YOLO training where results.csv read fails
after being written due to S3 InvalidRange error.
"""

import os
import sys
import time
import tempfile
import argparse

def test_write_then_read(base_path: str, file_name: str = "test_consistency.csv"):
    """
    Test write-then-read consistency.
    Simulates what YOLO does: write to a file, then immediately read it.
    """
    file_path = os.path.join(base_path, file_name)

    print(f"[TEST] Testing write-read consistency on: {file_path}")

    # Clean up if exists
    if os.path.exists(file_path):
        print(f"[CLEANUP] Removing existing file: {file_path}")
        os.remove(file_path)
        time.sleep(0.5)

    # Test 1: Write new file, then read
    print("\n=== Test 1: Create new file and read immediately ===")
    test_data_1 = "epoch,loss,accuracy\n1,0.5,0.8\n"

    print(f"[WRITE] Writing {len(test_data_1)} bytes...")
    with open(file_path, 'w') as f:
        f.write(test_data_1)
    print("[WRITE] Write completed")

    print("[READ] Reading file immediately after write...")
    try:
        with open(file_path, 'r') as f:
            content = f.read()
        print(f"[READ] Read {len(content)} bytes successfully")
        if content == test_data_1:
            print("[PASS] Content matches!")
        else:
            print(f"[FAIL] Content mismatch!")
            print(f"  Expected: {repr(test_data_1)}")
            print(f"  Got: {repr(content)}")
            return False
    except Exception as e:
        print(f"[FAIL] Read failed: {e}")
        return False

    # Test 2: Append to file, then read (simulates results.csv append mode)
    print("\n=== Test 2: Append to file and read immediately ===")
    test_data_2 = "2,0.4,0.85\n"

    print(f"[APPEND] Appending {len(test_data_2)} bytes...")
    with open(file_path, 'a') as f:
        f.write(test_data_2)
    print("[APPEND] Append completed")

    print("[READ] Reading file immediately after append...")
    try:
        with open(file_path, 'r') as f:
            content = f.read()
        expected = test_data_1 + test_data_2
        print(f"[READ] Read {len(content)} bytes successfully")
        if content == expected:
            print("[PASS] Content matches!")
        else:
            print(f"[FAIL] Content mismatch!")
            print(f"  Expected: {repr(expected)}")
            print(f"  Got: {repr(content)}")
            return False
    except Exception as e:
        print(f"[FAIL] Read failed: {e}")
        return False

    # Test 3: Multiple rapid append-read cycles
    print("\n=== Test 3: Multiple rapid append-read cycles ===")
    for i in range(3, 6):
        append_data = f"{i},{0.5-i*0.05:.2f},{0.8+i*0.02:.2f}\n"
        print(f"[CYCLE {i}] Appending: {append_data.strip()}")

        with open(file_path, 'a') as f:
            f.write(append_data)

        try:
            with open(file_path, 'r') as f:
                content = f.read()
            lines = content.strip().split('\n')
            print(f"[CYCLE {i}] Read OK, {len(lines)} lines total")
        except Exception as e:
            print(f"[CYCLE {i}] Read FAILED: {e}")
            return False

    # Test 4: Check file stat after operations
    print("\n=== Test 4: File stat consistency ===")
    try:
        stat = os.stat(file_path)
        print(f"[STAT] File size: {stat.st_size} bytes")

        with open(file_path, 'r') as f:
            content = f.read()
        actual_len = len(content.encode('utf-8'))

        if stat.st_size == actual_len:
            print(f"[PASS] Stat size matches content length: {actual_len}")
        else:
            print(f"[FAIL] Stat size ({stat.st_size}) != content length ({actual_len})")
            return False
    except Exception as e:
        print(f"[FAIL] Stat check failed: {e}")
        return False

    # Cleanup
    print(f"\n[CLEANUP] Removing test file: {file_path}")
    os.remove(file_path)

    return True


def test_concurrent_write_read(base_path: str):
    """
    Test concurrent write and read operations.
    """
    import threading
    import random

    file_path = os.path.join(base_path, "test_concurrent.txt")
    errors = []

    print(f"\n=== Test 5: Concurrent write-read ===")
    print(f"[TEST] File: {file_path}")

    # Initialize file
    with open(file_path, 'w') as f:
        f.write("initial\n")

    def writer(thread_id, iterations):
        for i in range(iterations):
            try:
                with open(file_path, 'a') as f:
                    f.write(f"writer-{thread_id}-{i}\n")
            except Exception as e:
                errors.append(f"Writer {thread_id} failed at {i}: {e}")
            time.sleep(random.uniform(0.01, 0.05))

    def reader(thread_id, iterations):
        for i in range(iterations):
            try:
                with open(file_path, 'r') as f:
                    content = f.read()
                    _ = len(content)
            except Exception as e:
                errors.append(f"Reader {thread_id} failed at {i}: {e}")
            time.sleep(random.uniform(0.01, 0.05))

    threads = []
    for i in range(2):
        t = threading.Thread(target=writer, args=(i, 10))
        threads.append(t)
    for i in range(3):
        t = threading.Thread(target=reader, args=(i, 15))
        threads.append(t)

    print(f"[START] Starting {len(threads)} threads...")
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    if errors:
        print(f"[FAIL] {len(errors)} errors occurred:")
        for err in errors[:5]:
            print(f"  - {err}")
        return False
    else:
        print("[PASS] Concurrent test completed without errors")

    # Cleanup
    os.remove(file_path)
    return True


def test_large_file(base_path: str, size_mb: float = 1.0):
    """
    Test write-read with larger file.
    """
    file_path = os.path.join(base_path, "test_large.bin")
    size_bytes = int(size_mb * 1024 * 1024)

    print(f"\n=== Test 6: Large file ({size_mb} MB) ===")
    print(f"[TEST] File: {file_path}")

    # Generate test data
    test_data = os.urandom(size_bytes)

    print(f"[WRITE] Writing {size_bytes} bytes...")
    start = time.time()
    with open(file_path, 'wb') as f:
        f.write(test_data)
    write_time = time.time() - start
    print(f"[WRITE] Completed in {write_time:.2f}s ({size_mb/write_time:.2f} MB/s)")

    print("[READ] Reading file immediately...")
    start = time.time()
    try:
        with open(file_path, 'rb') as f:
            content = f.read()
        read_time = time.time() - start
        print(f"[READ] Read {len(content)} bytes in {read_time:.2f}s ({size_mb/read_time:.2f} MB/s)")

        if content == test_data:
            print("[PASS] Content matches!")
        else:
            print(f"[FAIL] Content mismatch! Got {len(content)} bytes, expected {len(test_data)}")
            return False
    except Exception as e:
        print(f"[FAIL] Read failed: {e}")
        return False

    # Cleanup
    os.remove(file_path)
    return True


def main():
    """Parse CLI options, run the consistency tests, and exit 0 on success, 1 on failure."""
    cli = argparse.ArgumentParser(description='Test write-read consistency on curvine-fuse')
    cli.add_argument('--path', '-p', default='/mnt/curvine/lsr-test/torch-test',
                     help='Base path to test (default: /mnt/curvine/lsr-test/torch-test)')
    cli.add_argument('--local', '-l', action='store_true',
                     help='Also run on local filesystem for comparison')
    cli.add_argument('--size', '-s', type=float, default=1.0,
                     help='Size in MB for large file test (default: 1.0)')
    opts = cli.parse_args()

    bar = "=" * 60
    outcomes = {}

    # Run the suite against the mounted curvine path first.
    print(bar)
    print(f"Testing on CURVINE: {opts.path}")
    print(bar)

    if not os.path.exists(opts.path):
        print(f"[ERROR] Path does not exist: {opts.path}")
        print("Make sure curvine-fuse is mounted at /mnt/curvine")
        sys.exit(1)

    outcomes['curvine_basic'] = test_write_then_read(opts.path)
    outcomes['curvine_concurrent'] = test_concurrent_write_read(opts.path)
    outcomes['curvine_large'] = test_large_file(opts.path, opts.size)

    # Optionally repeat everything on a throwaway local directory as a baseline.
    if opts.local:
        print("\n" + bar)
        print("Testing on LOCAL filesystem for comparison")
        print(bar)

        with tempfile.TemporaryDirectory() as tmpdir:
            outcomes['local_basic'] = test_write_then_read(tmpdir)
            outcomes['local_concurrent'] = test_concurrent_write_read(tmpdir)
            outcomes['local_large'] = test_large_file(tmpdir, opts.size)

    # Per-test summary table.
    print("\n" + bar)
    print("SUMMARY")
    print(bar)
    for name, passed in outcomes.items():
        status = "PASS" if passed else "FAIL"
        print(f"  {name}: {status}")

    everything_ok = all(outcomes.values())

    print(bar)
    if everything_ok:
        print("All tests PASSED!")
        sys.exit(0)
    else:
        print("Some tests FAILED!")
        sys.exit(1)


if __name__ == '__main__':
    main()
root@curvine-train-test-pod:/mnt/curvine/lsr-test# python3 test_write_read_consistency.py --path /mnt/curvine/lsr-test/fs-test/
============================================================
Testing on CURVINE: /mnt/curvine/lsr-test/fs-test/
============================================================
[TEST] Testing write-read consistency on: /mnt/curvine/lsr-test/fs-test/test_consistency.csv

=== Test 1: Create new file and read immediately ===
[WRITE] Writing 30 bytes...
[WRITE] Write completed
[READ] Reading file immediately after write...
[READ] Read 30 bytes successfully
[PASS] Content matches!

=== Test 2: Append to file and read immediately ===
[APPEND] Appending 11 bytes...
[APPEND] Append completed
[READ] Reading file immediately after append...
[READ] Read 41 bytes successfully
[PASS] Content matches!

=== Test 3: Multiple rapid append-read cycles ===
[CYCLE 3] Appending: 3,0.35,0.86
[CYCLE 3] Read OK, 4 lines total
[CYCLE 4] Appending: 4,0.30,0.88
[CYCLE 4] Read OK, 5 lines total
[CYCLE 5] Appending: 5,0.25,0.90
[CYCLE 5] Read OK, 6 lines total

=== Test 4: File stat consistency ===
[STAT] File size: 77 bytes
[PASS] Stat size matches content length: 77

[CLEANUP] Removing test file: /mnt/curvine/lsr-test/fs-test/test_consistency.csv

=== Test 5: Concurrent write-read ===
[TEST] File: /mnt/curvine/lsr-test/fs-test/test_concurrent.txt
[START] Starting 5 threads...
[FAIL] 6 errors occurred:
  - Writer 1 failed at 6: [Errno 5] Input/output error
  - Writer 0 failed at 6: [Errno 5] Input/output error
  - Writer 0 failed at 8: [Errno 5] Input/output error
  - Writer 1 failed at 8: [Errno 5] Input/output error
  - Reader 0 failed at 6: [Errno 5] Input/output error

=== Test 6: Large file (1.0 MB) ===
[TEST] File: /mnt/curvine/lsr-test/fs-test/test_large.bin
[WRITE] Writing 1048576 bytes...
[WRITE] Completed in 0.18s (5.57 MB/s)
[READ] Reading file immediately...
[READ] Read 1048576 bytes in 0.00s (370.03 MB/s)
[PASS] Content matches!

============================================================
SUMMARY
============================================================
  curvine_basic: PASS
  curvine_concurrent: FAIL
  curvine_large: PASS
============================================================
Some tests FAILED!

To Reproduce
Steps to reproduce the behavior:

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

OS Version (please complete the following information):

Additional context
Add any other context about the problem here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions