微信扫码
添加专属顾问
我要投稿
多Agent系统中数据读写不同步?揭秘Milvus四档一致性配置如何解决这一痛点。核心内容: 1. 单Agent与多Agent系统读写设计的本质差异 2. Milvus写入流程中数据可见性的时间窗口问题 3. 通过guarantee_timestamp参数实现读写同步的解决方案
实验设计思路
#!/usr/bin/env python3
"""Probe script: try to catch Milvus Bounded-consistency staleness.

Inserts a uniquely-markered row, then immediately searches for it twice —
once with Bounded consistency and once with Strong — while background
"storm" writer threads keep the collection under write load. If the
Bounded search misses the row but the Strong search finds it, the
staleness window is reproduced.
"""
import argparse
import itertools
import random
import threading
import time
import uuid
from contextlib import suppress

from pymilvus import DataType, MilvusClient


def make_vector(seed, dim):
    """Return a deterministic unit-norm vector of length `dim` seeded by `seed`."""
    rng = random.Random(seed)
    vec = [rng.uniform(-1.0, 1.0) for _ in range(dim)]
    # Guard against an (astronomically unlikely) all-zero draw with `or 1.0`.
    norm = sum(x * x for x in vec) ** 0.5 or 1.0
    return [x / norm for x in vec]


def make_records(start_id, count, dim, marker, round_no):
    """Build `count` insertable rows with ids start_id..start_id+count-1."""
    return [
        {
            "id": start_id + i,
            "vector": make_vector(start_id + i, dim),
            "marker": marker,
            "round": round_no,
        }
        for i in range(count)
    ]


def create_collection(client, name, dim):
    """(Re)create collection `name` with an AUTOINDEX/COSINE vector field and load it.

    The collection default consistency is Bounded; per-search levels below
    override it explicitly.
    """
    if client.has_collection(name):
        client.drop_collection(name)
    schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
    schema.add_field("id", DataType.INT64, is_primary=True)
    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field("marker", DataType.VARCHAR, max_length=128)
    schema.add_field("round", DataType.INT64)
    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="vector",
        index_type="AUTOINDEX",
        metric_type="COSINE",
    )
    client.create_collection(
        collection_name=name,
        schema=schema,
        index_params=index_params,
        consistency_level="Bounded",
    )
    client.load_collection(name)


def search_marker(client, name, vector, marker, consistency, timeout):
    """Search for the row whose `marker` matches, at the given consistency level.

    Returns (hit_count, hits) for the single query vector.
    """
    result = client.search(
        collection_name=name,
        data=[vector],
        anns_field="vector",
        search_params={"metric_type": "COSINE"},
        filter=f'marker == "{marker}"',
        limit=1,
        output_fields=["id", "marker", "round"],
        consistency_level=consistency,
        timeout=timeout,
    )
    hits = result[0] if result else []
    return len(hits), hits


def writer_storm(uri, name, dim, stop_event, id_counter, batch_size, sleep_seconds):
    """Background thread body: keep inserting `batch_size`-row batches until stopped.

    Insert errors are deliberately suppressed — the storm is best-effort load,
    not part of the measurement.
    """
    client = MilvusClient(uri=uri)
    while not stop_event.is_set():
        start_id = next(id_counter)
        records = make_records(start_id, batch_size, dim, "storm", -1)
        with suppress(Exception):
            client.insert(collection_name=name, data=records)
        if sleep_seconds > 0:
            time.sleep(sleep_seconds)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--uri", default="http://localhost:19530")
    parser.add_argument("--collection", default="")
    parser.add_argument("--dim", type=int, default=16)
    parser.add_argument("--attempts", type=int, default=200)
    parser.add_argument("--bounded-timeout", type=float, default=2.0)
    parser.add_argument("--strong-timeout", type=float, default=30.0)
    parser.add_argument("--storm-writers", type=int, default=2)
    parser.add_argument("--storm-batch-size", type=int, default=2000)
    parser.add_argument("--storm-sleep", type=float, default=0.0)
    parser.add_argument("--preload", type=int, default=5000)
    parser.add_argument("--keep", action="store_true")
    args = parser.parse_args()

    # Unique collection name per run unless the caller pins one.
    collection = (
        args.collection
        or f"consistency_probe_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    )
    # Separate clients so writer and the two readers do not share a connection.
    writer = MilvusClient(uri=args.uri)
    bounded_reader = MilvusClient(uri=args.uri)
    strong_reader = MilvusClient(uri=args.uri)
    stop_event = threading.Event()
    storm_threads = []
    # Storm ids start far above probe (1..attempts) and preload (1_000_000..) ranges.
    storm_id_counter = itertools.count(10_000_000, args.storm_batch_size)
    print(f"uri={args.uri}")
    print(f"collection={collection}")
    try:
        create_collection(writer, collection, args.dim)
        if args.preload > 0:
            print(f"preload {args.preload} rows")
            writer.insert(
                collection_name=collection,
                data=make_records(1_000_000, args.preload, args.dim, "preload", -2),
            )
            # A Strong search forces the query nodes to catch up past the preload.
            _, _ = search_marker(
                strong_reader,
                collection,
                make_vector(1_000_000, args.dim),
                "preload",
                "Strong",
                args.strong_timeout,
            )
        for _ in range(args.storm_writers):
            thread = threading.Thread(
                target=writer_storm,
                args=(
                    args.uri,
                    collection,
                    args.dim,
                    stop_event,
                    storm_id_counter,
                    args.storm_batch_size,
                    args.storm_sleep,
                ),
                daemon=True,
            )
            thread.start()
            storm_threads.append(thread)
        for attempt in range(args.attempts):
            marker = f"probe_{attempt}_{uuid.uuid4().hex[:12]}"
            record_id = attempt + 1
            vector = make_vector(record_id, args.dim)
            record = {
                "id": record_id,
                "vector": vector,
                "marker": marker,
                "round": attempt,
            }
            insert_start = time.perf_counter()
            writer.insert(collection_name=collection, data=[record])
            insert_ms = (time.perf_counter() - insert_start) * 1000
            bounded_start = time.perf_counter()
            bounded_count, bounded_hits = search_marker(
                bounded_reader, collection, vector, marker,
                "Bounded", args.bounded_timeout,
            )
            bounded_ms = (time.perf_counter() - bounded_start) * 1000
            strong_start = time.perf_counter()
            strong_count, strong_hits = search_marker(
                strong_reader, collection, vector, marker,
                "Strong", args.strong_timeout,
            )
            strong_ms = (time.perf_counter() - strong_start) * 1000
            print(
                f"attempt={attempt:03d} insert={insert_ms:.1f}ms "
                f"bounded={bounded_count}({bounded_ms:.1f}ms) "
                f"strong={strong_count}({strong_ms:.1f}ms)"
            )
            if bounded_count == 0 and strong_count > 0:
                # Bounded lagged behind the insert while Strong saw it: reproduced.
                print("\nREPRODUCED: Bounded missed the just-inserted row, Strong found it.")
                print(f"marker={marker}")
                print(f"strong_hit={strong_hits[0] if strong_hits else None}")
                return
            if bounded_hits and not strong_hits:
                print("Unexpected: Bounded found the row but Strong did not; check service config.")
        print("\nNot reproduced. QueryNode likely consumed the insert before each Bounded search.")
        print("Try increasing --storm-writers/--storm-batch-size/--attempts, or run against a cluster under write load.")
    finally:
        stop_event.set()
        for thread in storm_threads:
            thread.join(timeout=1)
        if args.keep:
            print(f"kept collection={collection}")
        else:
            with suppress(Exception):
                writer.drop_collection(collection)
            print(f"dropped collection={collection}")


if __name__ == "__main__":
    main()
运行命令(替换uri为自身Milvus服务地址):
python probe.py --uri http://localhost:19530 \
  --storm-writers 2 \
  --storm-batch-size 2000 \
  --preload 5000
运行结果(与文档中报错URL对应的服务地址一致):
uri=http://192.168.4.115:19530
collection=consistency_probe_1777278755_71fb2959
preload 5000 rows
attempt=000 insert=47.7ms bounded=0(100.7ms) strong=1(171.7ms)

REPRODUCED: Bounded missed the just-inserted row, Strong found it.
marker=probe_0_96fadc07d29e
strong_hit={'id': 1, 'distance': 1.0, 'entity': {'marker': 'probe_0_96fadc07d29e', 'round': 0, 'id': 1}}
dropped collection=consistency_probe_1777278755_71fb2959
作者介绍
Zilliz黄金写手:尹珉
阅读推荐 官宣:Zilliz Cloud&Milvus发布CLI工具与官方Skill,让AI Agent成为专业VDB运维与开发助手 OpenClaw的记忆系统,用在企业场景还是太粗糙了 Vector Graph RAG 开源!一套向量数据库同时搞定语义检索+RAG多跳
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-05-06
看 AgentRun 如何玩转记忆存储,最佳实践来了!
2026-05-06
RAG 与 MCP:每位 AI 开发人员真正需要了解的知识
2026-04-30
RAG已死?不,是Grep回归了!
2026-04-27
Mem0 深度解析:智能记忆层的架构原理
2026-04-27
Karpathy的LLM Wiki + 3.5 万Star的Graphify:企业级 RAG 缺的真是知识图谱?
2026-04-23
2026 年做搜索就是做 Agent Memory
2026-04-22
专题解读 | 可更新的检索增强知识库发展方向及进展
2026-04-22
AI实践|基于 Spring AI 从0到1构建 AI Agent
2026-02-13
2026-02-06
2026-03-23
2026-02-06
2026-04-06
2026-02-06
2026-02-22
2026-03-18
2026-03-20
2026-02-15
2026-05-06
2026-04-27
2026-04-21
2026-03-17
2026-03-11
2026-02-22
2026-02-15
2026-02-04