tech-bid-manage20260423/utils/word_allocation.py
2026-04-23 14:37:19 +08:00

372 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
技术评分驱动的章节字数分配:读取 data/word_allocation_rules.json
结合 VOLUME_PRESETS 的 base/core 与项目 rating_json为每个叶节点生成
min_chars、word_count_spec及可选 max_tokens
"""
from __future__ import annotations
import json
import logging
import os
import re
from typing import Any, Dict, List, Optional, Tuple
import config
logger = logging.getLogger(__name__)
# Kept in sync with modules/generator.VOLUME_PRESETS.
# Per volume level: (base_chars, core_chars, display_label, preset_max_tokens).
# base/core bound the per-section character band used by compute_leaf_allocations.
VOLUME_PRESETS: Dict[str, Tuple[int, int, str, int]] = {
    'concise': (1200, 2500, '精简版', 5000),
    'standard': (2000, 4000, '标准版', 8000),
    'detailed': (3000, 5500, '详细版', 12000),
    'full': (4000, 7000, '充实版', 16000),
}
# Output-token ceiling per model provider; unknown providers fall back to
# 8192 at the lookup site in compute_leaf_allocations.
_PROVIDER_TOKEN_LIMITS = {
    'deepseek': 8192,
    'qwen': 8192,
    'openai': 16384,
    'ollama': 8192,
    'doubao': 8192,
    'kimi': 8192,
}
# Built-in fallback rules, used when data/word_allocation_rules.json is
# missing or cannot be parsed (see load_rules).
DEFAULT_RULES: Dict[str, Any] = {
    'schema_version': 1,
    # Blend weight: 1.0 = allocate purely by rating relevance, 0.0 = uniform.
    'alpha': 0.85,
    # 'target_pages' | 'anchor_base' | anything else -> mean of base/core.
    'budget_mode': 'target_pages',
    # Optional hard per-section char bounds; None = derive from the preset.
    'per_section_floor': None,
    'per_section_cap': None,
    'relevance': {'method': 'keyword_overlap', 'min_rating_weight': 0.01},
    'rating_parse': {},
    'prompt': {'top_k_rating_items': 4, 'intro_line': ''},
    # When True, max_tokens is scaled with the section's char target.
    'max_tokens_scale': False,
}
def rules_path() -> str:
    """Return the location of the word-allocation rules file in the data dir."""
    filename = 'word_allocation_rules.json'
    return os.path.join(config.DATA_DIR, filename)
def load_rules(path: Optional[str] = None) -> Dict[str, Any]:
    """Load the rules JSON; fall back to built-in DEFAULT_RULES on any failure.

    Keys starting with '_' are treated as comments and skipped.  The
    'relevance' and 'prompt' sub-dicts are shallow-merged over the defaults;
    every other key replaces the default value wholesale.
    """
    target = path or rules_path()
    merged = dict(DEFAULT_RULES)
    if not os.path.isfile(target):
        return merged
    try:
        with open(target, encoding='utf-8') as fh:
            loaded = json.load(fh)
        if isinstance(loaded, dict):
            for key, value in loaded.items():
                if key.startswith('_'):
                    continue
                if key in ('relevance', 'prompt') and isinstance(value, dict):
                    merged[key] = {**merged.get(key, {}), **value}
                else:
                    merged[key] = value
    except Exception as exc:
        logger.warning('加载 word_allocation_rules.json 失败,使用内置默认: %s', exc)
    return merged
def _as_float(x: Any, default: float = 0.0) -> float:
if x is None:
return default
if isinstance(x, (int, float)):
return float(x)
if isinstance(x, str):
s = re.sub(r'[^\d.\-]', '', x)
if not s:
return default
try:
return float(s)
except ValueError:
return default
return default
def _item_name(d: Dict[str, Any]) -> str:
for k in ('name', 'title', 'item_name', '评分项', '评分项名称', 'indicator'):
v = d.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return ''
def _item_weight(d: Dict[str, Any]) -> float:
    """Return the first positive numeric weight found in *d*; default 1.0."""
    for key in ('weight', 'score', '分值', 'max_score', '满分', 'points'):
        if key not in d:
            continue
        value = _as_float(d.get(key), 0.0)
        if value > 0:
            return value
    return 1.0
def _collect_rating_dicts(obj: Any, acc: List[Dict[str, Any]]) -> None:
if isinstance(obj, dict):
acc.append(obj)
for v in obj.values():
_collect_rating_dicts(v, acc)
elif isinstance(obj, list):
for v in obj:
_collect_rating_dicts(v, acc)
def parse_rating_json(raw: Optional[str]) -> List[Dict[str, Any]]:
    """
    Parse a rating_json string into a flat list of scoring items.

    Each returned item has the shape
    ``{'name': str, 'weight': float, 'keywords': List[str]}``.
    Empty / non-string / malformed input yields [].  Items are de-duplicated
    by case-insensitive name (first occurrence wins); names shorter than two
    characters are dropped.
    """
    if not isinstance(raw, str) or not raw.strip():
        return []
    try:
        decoded = json.loads(raw.strip())
    except json.JSONDecodeError:
        return []
    found: List[Dict[str, Any]] = []
    _collect_rating_dicts(decoded, found)
    results: List[Dict[str, Any]] = []
    taken: set = set()
    for entry in found:
        name = _item_name(entry)
        if len(name) < 2:
            continue
        lowered = name.lower()
        if lowered in taken:
            continue
        raw_kw = entry.get('keywords') or entry.get('keyword') or entry.get('要点')
        if isinstance(raw_kw, list):
            keywords = [
                str(x).strip()
                for x in raw_kw
                if isinstance(x, (str, int, float)) and str(x).strip()
            ]
        elif isinstance(raw_kw, str) and raw_kw.strip():
            keywords = [raw_kw.strip()]
        else:
            keywords = []
        taken.add(lowered)
        results.append({'name': name, 'weight': _item_weight(entry), 'keywords': keywords})
    return results
def _title_tokens(title: str) -> List[str]:
if not title:
return []
s = re.sub(r'[\s\d..、,,;:/\\()【】\[\]「」]+', ' ', title)
parts = [p for p in s.split() if len(p) >= 2]
toks = list(parts)
for m in re.findall(r'[\u4e00-\u9fff]{2,}', title):
if m not in toks:
toks.append(m)
return toks
def _overlap_score(title: str, item: Dict[str, Any]) -> float:
    """Relevance in [0.0, 1.0] between a section title and one rating item.

    Base score is the fraction of title tokens found inside the item's
    name+keywords blob; boosted to at least 0.85 on mutual name/title
    containment and to at least 0.7 when any keyword (len >= 2) appears
    verbatim in the title.
    """
    toks = _title_tokens(title)
    if not toks:
        return 0.0
    haystack = item['name'] + ''.join(item.get('keywords') or [])
    hits = sum(1 for tok in toks if tok and tok in haystack)
    score = hits / max(len(toks), 1)
    name = item['name']
    if name in title or title in name:
        score = max(score, 0.85)
    for kw in item.get('keywords') or []:
        if isinstance(kw, str) and len(kw) >= 2 and kw in title:
            score = max(score, 0.7)
    return min(1.0, score)
def _raw_utilities(
    leaves: List[Dict[str, Any]],
    items: List[Dict[str, Any]],
    min_w: float,
) -> Tuple[List[float], List[List[Tuple[str, float]]]]:
    """Per-leaf utility u_i = sum_j w_j * c_ij, normalised by the maximum.

    Returns (utilities, top_lists), where top_lists[i] holds up to 12
    (item_name, contribution) pairs in descending contribution order.
    When no item passes the *min_w* weight filter, all items are used;
    when every utility is zero, a uniform vector of 1.0 is returned.
    """
    usable = [it for it in items if it['weight'] >= min_w] or items
    count = len(leaves)
    utilities = [0.0] * count
    tops: List[List[Tuple[str, float]]] = [[] for _ in range(count)]
    for idx, leaf in enumerate(leaves):
        title = leaf.get('section_title') or ''
        pairs: List[Tuple[str, float]] = []
        total = 0.0
        for it in usable:
            contribution = it['weight'] * _overlap_score(title, it)
            if contribution > 0:
                pairs.append((it['name'], contribution))
            total += contribution
        # Stable descending sort keeps ties in item order, like the original.
        pairs.sort(key=lambda p: p[1], reverse=True)
        tops[idx] = pairs[:12]
        utilities[idx] = total
    peak = max(utilities) if utilities else 0.0
    if peak <= 0:
        return [1.0] * count, tops
    return [v / peak for v in utilities], tops
def _clamp_int(x: int, lo: int, hi: int) -> int:
return max(lo, min(hi, x))
def _water_adjust(
targets: List[int],
budget: int,
floor_v: int,
cap_v: int,
priority: List[float],
) -> List[int]:
"""在 [floor_v, cap_v] 内将 targets 整数化并尽量使 sum 接近 budget。"""
n = len(targets)
if n == 0:
return []
if floor_v > cap_v:
floor_v, cap_v = cap_v, floor_v
if n * floor_v > budget:
floor_v = max(1, budget // n)
if n * cap_v < budget:
cap_v = max(floor_v, (budget + n - 1) // n)
cur = [_clamp_int(t, floor_v, cap_v) for t in targets]
s = sum(cur)
delta = budget - s
order = sorted(range(n), key=lambda i: -priority[i])
inv_order = sorted(range(n), key=lambda i: priority[i])
step = 0
max_steps = max(n * 2000, abs(delta) + n)
while delta != 0 and step < max_steps:
step += 1
if delta > 0:
moved = False
for i in order:
if cur[i] < cap_v:
cur[i] += 1
delta -= 1
moved = True
break
if not moved:
break
else:
moved = False
for i in inv_order:
if cur[i] > floor_v:
cur[i] -= 1
delta += 1
moved = True
break
if not moved:
break
return cur
def compute_leaf_allocations(
    volume_key: str,
    leaves: List[Dict[str, Any]],
    rating_raw: Optional[str],
    rules: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[int, Dict[str, Any]]]:
    """
    Compute target_chars, word_count_spec and max_tokens for every leaf node.

    When technical rating items are present, characters are allocated by
    title relevance.  Without rating items, if the rules use the
    target-pages budget mode and a page count is configured, the total
    budget B = target_pages * chars_per_page is split evenly; otherwise
    None is returned and the caller keeps its legacy logic.

    leaves: [{'id': int, 'section_title': str}, ...]
    Returns a mapping section_id -> {'target_chars', 'word_count_spec',
    'max_tokens'}, or {} for empty input, or None as described above.
    """
    rules = rules or load_rules()
    if not leaves:
        return {}
    # Per-section char band (base..core) and token preset for this volume.
    base, core, _, preset_tokens = VOLUME_PRESETS.get(
        volume_key, VOLUME_PRESETS['standard']
    )
    floor_default = int(base * 0.5)
    cap_default = core
    # Explicit rule overrides win over preset-derived bounds.
    floor_v = int(rules['per_section_floor']) if rules.get('per_section_floor') is not None else floor_default
    cap_v = int(rules['per_section_cap']) if rules.get('per_section_cap') is not None else cap_default
    floor_v = min(floor_v, cap_v)
    # alpha blends relevance-driven allocation (1.0) with uniform (0.0).
    alpha = float(rules.get('alpha', 0.85))
    alpha = max(0.0, min(1.0, alpha))
    min_w = float(rules.get('relevance', {}).get('min_rating_weight', 0.01))
    n = len(leaves)
    mode = (rules.get('budget_mode') or 'anchor_mean').strip()
    pages_cfg = int(getattr(config, 'TARGET_PAGES', 0) or 0)
    pce = max(1, int(getattr(config, 'PAGE_CHAR_ESTIMATE', 700) or 700))
    # Total character budget for the whole document.
    if mode == 'target_pages' and pages_cfg > 0:
        budget = int(round(pages_cfg * pce))
    elif mode == 'anchor_base':
        budget = int(round(n * base))
    else:
        budget = int(round(n * (base + core) / 2.0))
    items = parse_rating_json(rating_raw)
    if not items:
        # No rating items: only proceed under target-pages mode (even split);
        # otherwise signal the caller to fall back to its old behavior.
        if not (mode == 'target_pages' and pages_cfg > 0):
            return None
        u = [1.0] * n
        top_lists = [[] for _ in range(n)]
        mid = 0.5 * (base + core)
        raw_float = [float(mid)] * n
    else:
        u, top_lists = _raw_utilities(leaves, items, min_w)
        band = core - base
        # Map normalized utility into the [base, core] band, damped by alpha.
        raw_float = [
            base + band * (alpha * u[i] + (1.0 - alpha) * 0.5) for i in range(n)
        ]
    targets = [int(round(x)) for x in raw_float]
    adjusted = _water_adjust(targets, budget, floor_v, cap_v, u)
    provider = getattr(config, 'MODEL_PROVIDER', 'openai')
    tok_limit = _PROVIDER_TOKEN_LIMITS.get(provider, 8192)
    base_max_tok = min(preset_tokens, tok_limit)
    scale_tokens = bool(rules.get('max_tokens_scale', False))
    prompt_cfg = rules.get('prompt') or {}
    top_k = int(prompt_cfg.get('top_k_rating_items', 4))
    intro = (prompt_cfg.get('intro_line') or '').strip() or (
        '本节须对下列技术评分要点作实质展开(结合工艺、流程、标准与可验证措施,禁止空泛承诺与复述招标文件):'
    )
    out: Dict[int, Dict[str, Any]] = {}
    for i, leaf in enumerate(leaves):
        sid = int(leaf['id'])
        min_chars = max(1, adjusted[i])
        contribs = top_lists[i][:top_k]
        if contribs:
            # NOTE(review): contribs is already sliced to top_k above; the
            # second [:top_k] here is redundant but harmless.
            lines = '\n'.join(f' · {name}' for name, _ in contribs[:top_k])
            spec = (
                f'- 字数硬性要求(必须达到,不达标将续写补足):本节正文不少于 {min_chars}\n'
                f'- {intro}\n{lines}\n'
                f'- 内容须由可检验的技术与管理措施支撑,禁止堆砌套话与重复背景'
            )
        else:
            spec = (
                f'- 字数硬性要求(必须达到,不达标将续写补足):本节正文不少于 {min_chars}\n'
                f'- 须紧扣章节标题与标书目录定位,充分展开可执行方案细节\n'
                f'- 内容须由可检验的技术与管理措施支撑,禁止堆砌套话与重复背景'
            )
        max_tok = base_max_tok
        if scale_tokens and base > 0:
            # Scale tokens with the char target, clamped to [1024, provider limit].
            max_tok = int(min(tok_limit, max(1024, base_max_tok * min_chars / base)))
        out[sid] = {
            'target_chars': min_chars,
            'word_count_spec': spec,
            'max_tokens': max_tok,
        }
    return out
def continuation_threshold(target_chars: int) -> int:
    """Character count at which continuation writing may stop.

    Mirrors generator._get_min_chars: roughly 65% of the per-section target,
    never below 200 (successive rounds accumulate toward the full target).
    """
    threshold = target_chars * 0.65
    if threshold < 200:
        threshold = 200
    return int(threshold)