Преглед изворни кода

Benchmark both NUMA nodes concurrently via async uri

Fire all node1 prompts into Ollama's queue (async, poll:0), then
immediately fire all node0 prompts. Both nodes drain simultaneously,
roughly halving total benchmark runtime.

uri timeout = large_timeout × 15 (18 000 s) covers the full queue-drain
wait in the worst case (21 queued requests, 2 parallel, 1 200 s each:
max wait ≈ 12 000 s < 18 000 s). async timeout = ×18 (21 600 s), and the
collect loop's retries (730 × 30 s = 21 900 s) give headroom above that.

loop_var: _async_job on the async_status collect tasks prevents Ansible
from overwriting the uri module's item=[model, prompt] field, so the
compute task needs no changes.

Also suppress the loop-variable collision warning on the load tasks by
declaring loop_var explicitly.
Shaun Arman пре 4 дана
родитељ
комит
cb0f438214
1 измењених фајлова са 58 додато и 6 уклоњено
  1. +58 −6
      playbooks/_bench_tier_batch.yml

+ 58 - 6
playbooks/_bench_tier_batch.yml

@@ -9,6 +9,15 @@
 # Mutates host facts (accumulated across batches):
 #   bench_all_results    — list of uri result dicts
 #   all_eligible_models  — list of model names that passed load
+#
+# Concurrency design:
+#   Loads are sequential (disk I/O: node1 then node0).
+#   Benchmarks are concurrent: all node1 prompts are fired into Ollama's
+#   queue (async, poll:0), then all node0 prompts are fired immediately
+#   after. Both nodes drain their queues simultaneously.
+#   OLLAMA_SCHED_SPREAD=false (default) ensures each node processes its
+#   models one at a time in queue order, so scores are not cross-model
+#   contaminated. uri timeout accounts for full queue-drain wait time.
 
 # ── Load models into RAM ──────────────────────────────────────────────────────
 
@@ -27,6 +36,7 @@
     status_code: 200
   loop: "{{ _batch_node1 }}"
   loop_control:
+    loop_var: item
     label: "node1 load: {{ item }}"
   register: _load_node1
   failed_when: false
@@ -46,6 +56,7 @@
     status_code: 200
   loop: "{{ _batch_node0 }}"
   loop_control:
+    loop_var: item
     label: "node0 load: {{ item }}"
   register: _load_node0
   failed_when: false
@@ -57,9 +68,14 @@
     _eligible_node1: "{{ _load_node1.results | selectattr('status', 'equalto', 200) | map(attribute='item') | list }}"
     _eligible_node0: "{{ _load_node0.results | selectattr('status', 'equalto', 200) | map(attribute='item') | list }}"
 
-# ── Benchmark loaded models ───────────────────────────────────────────────────
+# ── Fire benchmark prompts at both nodes concurrently ─────────────────────────
+# uri timeout = large_timeout × 15 covers the full queue-drain wait:
+#   worst case 3 models × 7 prompts = 21 requests, 2 parallel →
+#   last request waits ≤ 10 × large_timeout = 12 000 s < 18 000 s (15×).
+# async timeout must exceed uri timeout; use 18× for headroom.
+# Collect retries × delay must exceed async timeout: 730 × 30 = 21 900 s.
 
-- name: "Benchmark | Run test prompts against node1 models"
+- name: "Benchmark | Fire test prompts at node1 (async)"
   ansible.builtin.uri:
     url: "http://localhost:11434/api/generate"
     method: POST
@@ -70,15 +86,17 @@
       stream: false
     headers:
       Authorization: "Bearer {{ ollama_api_key }}"
-    timeout: "{{ _benchmark_timeout_map[item.0] | default(benchmark_large_timeout) }}"
+    timeout: "{{ (benchmark_large_timeout | int) * 15 }}"
     status_code: 200
   loop: "{{ _eligible_node1 | product(test_prompts.keys() | list) | list }}"
   loop_control:
     label: "{{ item.0 }} / {{ item.1 }}"
-  register: _bench_node1
+  async: "{{ (benchmark_large_timeout | int) * 18 }}"
+  poll: 0
+  register: _bench_node1_jobs
   failed_when: false
 
-- name: "Benchmark | Run test prompts against node0 models"
+- name: "Benchmark | Fire test prompts at node0 (async)"
   ansible.builtin.uri:
     url: "http://localhost:{{ ollama_node0_port }}/api/generate"
     method: POST
@@ -89,12 +107,46 @@
       stream: false
     headers:
       Authorization: "Bearer {{ ollama_api_key }}"
-    timeout: "{{ _benchmark_timeout_map[item.0] | default(benchmark_large_timeout) }}"
+    timeout: "{{ (benchmark_large_timeout | int) * 15 }}"
     status_code: 200
   loop: "{{ _eligible_node0 | product(test_prompts.keys() | list) | list }}"
   loop_control:
     label: "{{ item.0 }} / {{ item.1 }}"
+  async: "{{ (benchmark_large_timeout | int) * 18 }}"
+  poll: 0
+  register: _bench_node0_jobs
+  failed_when: false
+
+# ── Collect results (both queues are draining in parallel during these tasks) ──
+# loop_var: _async_job prevents Ansible from overwriting the `item` field in
+# each collected result. The uri module stores item=[model, prompt] in the
+# async job file; async_status surfaces it directly so the compute task can
+# use result.item[0] / result.item[1] without any structural changes.
+
+- name: "Benchmark | Collect node1 results"
+  ansible.builtin.async_status:
+    jid: "{{ _async_job.ansible_job_id }}"
+  loop: "{{ _bench_node1_jobs.results | default([]) }}"
+  loop_control:
+    loop_var: _async_job
+    label: "{{ _async_job.item[0] | default('?') }} / {{ _async_job.item[1] | default('?') }}"
+  register: _bench_node1
+  until: _bench_node1.finished
+  retries: 730
+  delay: 30
+  failed_when: false
+
+- name: "Benchmark | Collect node0 results"
+  ansible.builtin.async_status:
+    jid: "{{ _async_job.ansible_job_id }}"
+  loop: "{{ _bench_node0_jobs.results | default([]) }}"
+  loop_control:
+    loop_var: _async_job
+    label: "{{ _async_job.item[0] | default('?') }} / {{ _async_job.item[1] | default('?') }}"
   register: _bench_node0
+  until: _bench_node0.finished
+  retries: 730
+  delay: 30
   failed_when: false
 
 # ── Accumulate results into play-scoped facts ─────────────────────────────────