feat: integrate basin force estimator (pre-force) for 7x12 sensor

This commit is contained in:
lenn
2026-05-11 17:04:47 +08:00
parent 83832139a8
commit 0833694e1b
29 changed files with 33875 additions and 7 deletions

View File

@@ -0,0 +1,747 @@
from typing import Dict, List, Tuple
import numpy as np
from indicator.base import Indicator
import math
class EMA:
"""指数移动平均"""
@staticmethod
def calc(data: np.ndarray, period: int) -> np.ndarray:
alpha = 2.0 / (period + 1)
out = np.empty_like(data)
out[0] = data[0]
for i in range(1, len(data)):
out[i] = alpha * data[i] + (1 - alpha) * out[i - 1]
return out
class SMA:
"""简单移动平均"""
@staticmethod
def calc(data: np.ndarray, period: int) -> np.ndarray:
out = np.full_like(data, np.nan)
if len(data) < period:
return out
cumsum = np.cumsum(data)
out[period - 1:] = (cumsum[period - 1:] - np.concatenate([[0], cumsum[:-period]])) / period
return out
def true_range(high: np.ndarray, low: np.ndarray, close: np.ndarray) -> np.ndarray:
"""True Range"""
tr = np.empty(len(high))
tr[0] = high[0] - low[0]
for i in range(1, len(high)):
tr[i] = max(high[i] - low[i], abs(high[i] - close[i - 1]), abs(low[i] - close[i - 1]))
return tr
class SignalTracker:
"""逐帧更新的信号追踪器(无回看依赖,纯实时)"""
def __init__(
self,
ob_threshold: float = 80,
os_threshold: float = 20,
dist_extreme: float = 5,
dist_far: float = 15,
dist_mid: float = 30,
macd_th: float = 0.3,
rsi_ob: float = 70,
rsi_os: float = 30,
di_gap_bull: float = 15,
di_gap_bear: float = -15,
aroon_ob: float = 80,
aroon_os: float = 20,
williams_ob: float = -20,
williams_os: float = -80,
bias_ob: float = 5,
bias_os: float = -5,
):
self.ob_threshold = ob_threshold
self.os_threshold = os_threshold
self.dist_extreme = dist_extreme
self.dist_far = dist_far
self.dist_mid = dist_mid
self.macd_th = macd_th
self.rsi_ob = rsi_ob
self.rsi_os = rsi_os
self.di_gap_bull = di_gap_bull
self.di_gap_bear = di_gap_bear
self.aroon_ob = aroon_ob
self.aroon_os = aroon_os
self.williams_ob = williams_ob
self.williams_os = williams_os
self.bias_ob = bias_ob
self.bias_os = bias_os
# 状态缓存
self._intp_buy_sma: float = 0.0
self._intp_sell_sma: float = 0.0
self._intp_sma_count: int = 0
self._max_qtb_score: float = 0.0
self._bull_count: int = 0
self._bear_count: int = 0
self._last_bull_idx: int = -100
self._last_bear_idx: int = -100
self._frame_idx: int = 0
# 历史缓冲区保留最近200帧用于EMA/SMA/DI计算
self._buf_size: int = 200
self._bbi_buf: List[float] = []
self._amplitude_buf: List[float] = []
self._close_buf: List[float] = []
self._high_buf: List[float] = []
self._low_buf: List[float] = []
self._wr_buf: List[float] = []
def reset(self):
"""重置所有状态"""
self._intp_buy_sma = 0.0
self._intp_sell_sma = 0.0
self._intp_sma_count = 0
self._max_qtb_score = 0.0
self._bull_count = 0
self._bear_count = 0
self._last_bull_idx = -100
self._last_bear_idx = -100
self._frame_idx = 0
self._bbi_buf.clear()
self._amplitude_buf.clear()
self._close_buf.clear()
self._high_buf.clear()
self._low_buf.clear()
self._wr_buf.clear()
def _append_buf(self, buf: List[float], val: float):
buf.append(val)
if len(buf) > self._buf_size:
buf.pop(0)
def _ema_val(self, buf: List[float], period: int) -> float:
"""返回缓冲区中最后一个EMA值"""
if len(buf) < period:
return buf[-1] if buf else 0.0
arr = np.array(buf, dtype=float)
ema = EMA.calc(arr, period)
return float(ema[-1])
def _sma_val(self, buf: List[float], period: int) -> float:
if len(buf) < period:
return np.nan
arr = np.array(buf, dtype=float)
sma = SMA.calc(arr, period)
return float(sma[-1])
def get_signals(
self,
amplitude: float,
bbi: float,
cci: float,
close: float,
dmk: float,
high: float,
k: float,
low: float,
qtb_score: float,
result: float,
wr: float,
percent: float,
v0: float,
boll_upper: float,
boll_lower: float,
macd_val: float,
pmacd: float,
bias: float,
) -> Tuple[str, str, str, str]:
"""
逐帧更新,返回 (market_status, signal_type, intensity, detail)
不依赖历史数组,所有指标值由外部实时计算后传入。
"""
# ─── 更新缓冲区 ───
self._append_buf(self._bbi_buf, bbi)
self._append_buf(self._amplitude_buf, amplitude)
self._append_buf(self._close_buf, close)
self._append_buf(self._high_buf, high)
self._append_buf(self._low_buf, low)
self._append_buf(self._wr_buf, wr)
# ─── 指标因子计算(使用传入值 + 缓冲区衍生值)───
k_macd = self._k_macd_factor(k, macd_val, pmacd)
k_di = self._k_di_factor(dmk, amplitude, close)
k_aroon = self._k_aroon_factor(high, low, close)
k_williams = self._k_williams_factor(wr)
k_result = self._k_result_factor(result)
k_bias = self._k_bias_factor(bias)
k_trix = self._k_trix_factor(close)
k_ema = self._k_ema_factor(close)
k_amplitude = self._k_amplitude_factor(amplitude)
# ─── 累计 ───
k_bull = k_macd + k_di + k_aroon + k_williams + k_result + k_bias + k_trix + k_ema + k_amplitude
k_bear = k_macd + k_di + k_aroon + k_williams + k_result + k_bias + k_trix + k_ema + k_amplitude
if result > self.ob_threshold and k_bull > 0 and result >= 90:
k_bull *= 1.5
# ─── 指数平滑 ───
if self._intp_sma_count == 0:
self._intp_buy_sma = k_bull
self._intp_sell_sma = k_bear
self._intp_sma_count = 1
else:
self._intp_sma_count += 1
period = max(3, min(5, self._intp_sma_count))
alpha = 2.0 / (period + 1)
self._intp_buy_sma = alpha * k_bull + (1 - alpha) * self._intp_buy_sma
self._intp_sell_sma = alpha * k_bear + (1 - alpha) * self._intp_sell_sma
# ─── 趋势确认与背离检测 ───
is_bull_div, is_bear_div = self._detect_divergence(close, result)
if is_bull_div:
self._intp_buy_sma *= 1.3
k_bull *= 1.3
if is_bear_div:
self._intp_sell_sma *= 1.3
k_bear *= 1.3
is_bull, is_bear = self._detect_trend(close)
if is_bull:
self._intp_buy_sma *= 1.15
k_bull *= 1.15
if is_bear:
self._intp_sell_sma *= 1.15
k_bear *= 1.15
wmacd = self._wmacd()
if wmacd > 0:
k_bull += wmacd * 0.5
elif wmacd < 0:
k_bear += abs(wmacd) * 0.5
if qtb_score > self._max_qtb_score * 0.7:
pass
elif qtb_score < 3 and qtb_score > 1:
k_bull *= 0.85
k_bear *= 0.85
# ─── 信号分类 ───
max_k = max(abs(k_bull), abs(k_bear))
if max_k > 0:
buy_score = ((k_bull + max_k) / (2 * max_k)) * 100
sell_score = ((k_bear + max_k) / (2 * max_k)) * 100
else:
buy_score = sell_score = 50
bull_t = buy_score > 70 and result < 90
bear_t = sell_score > 70 and result > 10
k_cci = cci / 300
boll_mid = self._boll_mid()
is_bull_t = (
bull_t
and k_bull > k_cci
and close < boll_mid
and (
(result > 65 and result < 85 and k_bull > 0 and k_bear > 0 and (k_bear - k_bull < 1.5 or result > 75))
or (result < 35 and result > 15 and k_bear > 0 and k_bull > 0 and (k_bull - k_bear < 1.5 or result < 25))
)
)
is_bear_t = (
bear_t
and k_bear > k_cci
and close > boll_mid
and (
(result > 65 and result < 85 and k_bull > 0 and k_bear > 0 and (k_bear - k_bull < 1.5 or result > 75))
or (result < 35 and result > 15 and k_bear > 0 and k_bull > 0 and (k_bull - k_bear < 1.5 or result < 25))
)
)
if is_bull_t:
self._bull_count += 1
self._bear_count = 0
self._last_bull_idx = self._frame_idx
elif is_bear_t:
self._bear_count += 1
self._bull_count = 0
self._last_bear_idx = self._frame_idx
else:
self._bull_count = max(self._bull_count - 1, 0)
self._bear_count = max(self._bear_count - 1, 0)
# ─── 信号输出 ───
sig_strength = 1.0 + max(0, (self._bull_count - 3) * 0.1) + max(0, (self._bear_count - 3) * 0.1)
strength = (
"极强" if sig_strength >= 1.7
else "" if sig_strength >= 1.4
else "" if sig_strength >= 1.1
else ""
)
if buy_score >= sell_score:
signal = f"{strength}"
else:
signal = f"{strength}"
# ─── 市场状态 ───
if abs(buy_score - sell_score) < 10:
status = "中性震荡"
elif buy_score > sell_score:
if self._bull_count >= 3:
status = "强势上涨"
elif result > self.ob_threshold:
status = "高位企稳"
else:
status = "温和上涨"
else:
if self._bear_count >= 3:
status = "强势下跌"
elif result < self.os_threshold:
status = "低位企稳"
else:
status = "温和下跌"
# ─── 距离 ───
boll_len = boll_upper - boll_lower
if boll_len > 0:
dist_ratio = (close - boll_lower) / boll_len * 100
else:
dist_ratio = 50
if dist_ratio < 50 - self.dist_extreme:
dist = "极端超卖"
elif dist_ratio < 50 - self.dist_far:
dist = "远离"
elif dist_ratio < 50 - self.dist_mid:
dist = "偏离"
elif dist_ratio < 50 + self.dist_mid:
dist = "接近"
elif dist_ratio < 50 + self.dist_far:
dist = "靠近"
elif dist_ratio < 50 + self.dist_extreme:
dist = "远超"
else:
dist = "极端超买"
# ─── 强度 ───
max_score = max(buy_score, sell_score)
intensity = (
"超强" if max_score >= 90
else "" if max_score >= 80
else "" if max_score >= 65
else "" if max_score >= 55
else "极弱"
)
# ─── 详情 ───
bias_val = self._ema_bias()
bbp_val = self._boll_pct_b(close, boll_upper, boll_lower)
detail = (
f"前量:{percent:.1f} 数量:{int(amplitude):03d} 百分比:{bbp_val:.1f} "
f"正:{k_bull:.1f} 负:{k_bear:.1f} 连买:{self._bull_count} 连卖:{self._bear_count} "
f"误差:{bias_val:.1f}"
)
self._frame_idx += 1
return status, signal, intensity, detail
# ═══════════════════════════════════════════════════
# 指标因子(全部基于实时数据,无回看窗口)
# ═══════════════════════════════════════════════════
def _k_macd_factor(self, k: float, macd_val: float, pmacd: float) -> float:
is_ob = k > self.ob_threshold
is_os = k < self.os_threshold
k_macd = 0.0
if is_ob or is_os:
if macd_val < 0:
if is_os and k < 30 and pmacd > 0 and pmacd <= 10 and macd_val > 1.5:
k_macd = 3.5
elif is_ob and k > 80 and macd_val > 3 and abs(pmacd) < 5:
k_macd = -3.5
elif pmacd < 0 and macd_val < 0:
if abs(pmacd) > 15 and macd_val >= -0.5:
k_macd = 3.5
elif 5 < abs(pmacd) < 15 and macd_val > 0.7:
k_macd = 3.5
elif pmacd > 0 and macd_val > 0:
if pmacd >= 15 and macd_val < 0.5:
k_macd = -3.5
elif 5 < pmacd < 15 and macd_val < -0.7:
k_macd = -3.5
elif macd_val >= 3:
k_macd = macd_val * 1.5
elif macd_val <= -3:
k_macd = macd_val * 1.5
return k_macd
def _k_di_factor(self, dmk: float, amplitude: float, close: float) -> float:
if len(self._close_buf) < 20:
return 0.0
k_close = np.array(self._close_buf, dtype=float)
period = 14
if len(k_close) < period + 1:
return 0.0
dx_list = []
for i in range(1, min(period + 1, len(k_close))):
diff = k_close[-i] - k_close[-i - 1]
dx_list.append(diff)
if not dx_list:
return 0.0
last_diff = dx_list[0]
adx_val = abs(dmk) * 0.5
k_di = 0.0
if adx_val < 20:
return 0.0
if dmk > 0 and len(dx_list) > 5:
gains = [d for d in dx_list[:5] if d > 0]
if len(gains) >= 3 and last_diff > 0:
k_di = min(adx_val / 5, 6.0)
elif dmk < 0 and len(dx_list) > 5:
losses = [d for d in dx_list[:5] if d < 0]
if len(losses) >= 3 and last_diff < 0:
k_di = -min(adx_val / 5, 6.0)
return k_di
def _k_aroon_factor(self, high: float, low: float, close: float) -> float:
period = 14
if len(self._high_buf) < period:
return 0.0
highs = self._high_buf[-period:]
lows = self._low_buf[-period:]
highest = max(highs)
lowest = min(lows)
rng = highest - lowest
if rng == 0:
return 0.0
k_aroon = 0.0
pct = (close - lowest) / rng * 100
if pct >= self.aroon_ob:
if close >= highest * 0.995:
k_aroon = -2.0
else:
k_aroon = 2.0
elif pct <= self.aroon_os:
if close <= lowest * 1.005:
k_aroon = 2.0
else:
k_aroon = -2.0
else:
k_aroon = (pct - 50) / 50 * 1.2
return k_aroon
def _k_williams_factor(self, wr: float) -> float:
k_wr = 0.0
if wr > self.williams_ob:
if wr > -10:
k_wr = -1.2
else:
k_wr = -0.8
elif wr < self.williams_os:
if wr < -90:
k_wr = 1.2
else:
k_wr = 0.8
return k_wr
def _k_result_factor(self, result: float) -> float:
k_result = 0.0
is_ob = result > self.ob_threshold
is_os = result < self.os_threshold
if is_os and result < 10:
k_result = 1.5
elif is_ob and result > 90:
k_result = -1.5
return k_result
def _k_bias_factor(self, bias: float) -> float:
k_bias = 0.0
if bias < self.bias_os and bias < -3:
k_bias = min(abs(bias) / 5, 2.5)
elif bias > self.bias_ob and bias > 3:
k_bias = -min(abs(bias) / 5, 2.5)
return k_bias
def _k_trix_factor(self, close: float) -> float:
ema3 = self._ema_val(self._close_buf, 3)
ema9 = self._ema_val(self._close_buf, 9)
if ema9 == 0:
return 0.0
trix = (ema3 - ema9) / ema9 * 100
k_trix = 0.0
if trix > 0:
k_trix = min(trix / 2, 3.0)
elif trix < 0:
k_trix = max(trix / 2, -3.0)
return k_trix
def _k_ema_factor(self, close: float) -> float:
ema9 = self._ema_val(self._close_buf, 9)
if ema9 == 0:
return 0.0
bias = (close - ema9) / ema9 * 100
k_ema = 0.0
if bias < -2:
k_ema = min(abs(bias) * 0.3, 2.0)
elif bias > 2:
k_ema = -min(abs(bias) * 0.3, 2.0)
return k_ema
def _k_amplitude_factor(self, amplitude: float) -> float:
if len(self._amplitude_buf) < 10:
return 0.0
buf = self._amplitude_buf[-10:]
mean = sum(buf) / len(buf)
if mean == 0:
return 0.0
ratio = (amplitude - mean) / mean
k_amp = 0.0
if ratio > 0.5:
k_amp = min(ratio * 1.5, 3.0)
elif ratio < -0.3:
k_amp = max(ratio * 1.5, -3.0)
return k_amp
def _detect_divergence(self, close: float, result: float) -> Tuple[bool, bool]:
"""简化背离检测(基于累计计数)"""
is_bull_div = False
is_bear_div = False
if len(self._close_buf) > 30:
c30 = self._close_buf[-30]
if close > c30 * 1.05 and result < 50:
is_bull_div = True
elif close < c30 * 0.95 and result > 50:
is_bear_div = True
return is_bull_div, is_bear_div
def _detect_trend(self, close: float) -> Tuple[bool, bool]:
"""简化趋势确认"""
is_bull = False
is_bear = False
if len(self._close_buf) > 20:
c20 = self._close_buf[-20]
if close > c20 * 1.05:
is_bull = True
elif close < c20 * 0.95:
is_bear = True
return is_bull, is_bear
def _wmacd(self) -> float:
"""简化加权MACD基于缓冲区"""
if len(self._close_buf) < 12:
return 0.0
ema12 = self._ema_val(self._close_buf, 12)
ema26 = self._ema_val(self._close_buf, 26)
return ema12 - ema26
def _boll_mid(self) -> float:
if len(self._close_buf) < 20:
return self._close_buf[-1] if self._close_buf else 0.0
return sum(self._close_buf[-20:]) / 20
def _ema_bias(self) -> float:
if not self._close_buf:
return 0.0
ema5 = self._ema_val(self._close_buf, 5)
ema10 = self._ema_val(self._close_buf, 10)
if ema10 == 0:
return 0.0
return (ema5 - ema10) / ema10 * 1000
def _boll_pct_b(self, close: float, upper: float, lower: float) -> float:
boll_len = upper - lower
if boll_len == 0:
return 50.0
return (close - lower) / boll_len * 100
def get_signals(indicator: Indicator) -> Tuple[str, str, str, str]:
"""
基于技术指标生成交易信号(逐帧调用版)
参数:
indicator: Indicator 对象,包含所有实时指标值
返回:
tuple: (market_status, signal_type, intensity, detail)
"""
# ─── 指标提取 ───
bbi = indicator.BBI
amplitude = indicator.AMPLITUDE
cci = indicator.CCI
close = indicator.CLOSE
dmk = indicator.DMK
high = indicator.HIGH
k = indicator.K
low = indicator.LOW
macd_val = indicator.MACD
ob = indicator.OB
os_ = indicator.OS
ovs = indicator.OVS
ovc = indicator.OVC
result = indicator.RESULT
wr = indicator.WR
percent = indicator.PERCENT
v0 = indicator.V0
boll_upper = indicator.BOLL_UP
boll_lower = indicator.BOLL_LO
bias = indicator.BIAS
# ─── 指标阈值 ───
ob_threshold = ob if ob > 0 else 80
os_threshold = os_ if os_ > 0 else 20
dist_extreme = ovc if ovc > 0 else 5
dist_far = ovs if ovs > 0 else 15
dist_mid = 30
macd_th = 0.3
rsi_ob = ovc if ovc > 0 else 70
rsi_os = ovs if ovs > 0 else 30
di_gap_bull = ovc if ovc > 0 else 15
di_gap_bear = -di_gap_bull
aroon_ob = ob_threshold
aroon_os = os_threshold
williams_ob = -20
williams_os = -80
bias_ob = 5
bias_os = -5
# ─── 指标因子 ───
pmacd = getattr(indicator, 'PMACD', macd_val)
k_macd = _k_macd_factor(ob_threshold, os_threshold, k, macd_val, pmacd)
k_di = _k_di_factor(dmk, amplitude, close)
k_aroon = _k_aroon_factor(aroon_ob, aroon_os, high, low, close)
k_williams = _k_williams_factor(williams_ob, williams_os, wr)
k_result = _k_result_factor(ob_threshold, os_threshold, result)
k_bias = _k_bias_factor(bias_ob, bias_os, bias)
# ─── K值累计 ───
k_bull = k_macd + k_di + k_aroon + k_williams + k_result + k_bias
k_bear = k_macd + k_di + k_aroon + k_williams + k_result + k_bias
if result > ob_threshold and k_bull > 0 and result >= 90:
k_bull *= 1.5
# ─── 强度计算 ───
max_k = max(abs(k_bull), abs(k_bear))
if max_k > 0:
buy_score = ((k_bull + max_k) / (2 * max_k)) * 100
sell_score = ((k_bear + max_k) / (2 * max_k)) * 100
else:
buy_score = sell_score = 50
# ─── 信号判断 ───
bull_t = buy_score > 70 and result < 90
bear_t = sell_score > 70 and result > 10
k_cci = cci / 300
boll_mid = indicator.BOLL_MID if hasattr(indicator, 'BOLL_MID') else (boll_upper + boll_lower) / 2
boll_len = boll_upper - boll_lower
is_bull_t = (
bull_t
and k_bull > k_cci
and close < boll_mid
and (
(result > 65 and result < 85 and k_bull > 0 and k_bear > 0 and (k_bear - k_bull < 1.5 or result > 75))
or (result < 35 and result > 15 and k_bear > 0 and k_bull > 0 and (k_bull - k_bear < 1.5 or result < 25))
)
)
is_bear_t = (
bear_t
and k_bear > k_cci
and close > boll_mid
and (
(result > 65 and result < 85 and k_bull > 0 and k_bear > 0 and (k_bear - k_bull < 1.5 or result > 75))
or (result < 35 and result > 15 and k_bear > 0 and k_bull > 0 and (k_bull - k_bear < 1.5 or result < 25))
)
)
# ─── 信号强度 ───
sig_strength = 1.0
if is_bull_t:
sig_strength += 0.3 + max(0, (result - 70) / 30)
elif is_bear_t:
sig_strength += 0.3 + max(0, (30 - result) / 30)
strength = (
"极强" if sig_strength >= 1.7
else "" if sig_strength >= 1.4
else "" if sig_strength >= 1.1
else ""
)
# ─── 信号类型 ───
if is_bull_t:
signal = f"{strength}"
elif is_bear_t:
signal = f"{strength}"
else:
signal = "观望"
# ─── 市场状态 ───
if abs(buy_score - sell_score) < 10:
status = "中性震荡"
elif buy_score > sell_score:
if result > ob_threshold:
status = "高位企稳"
else:
status = "温和上涨"
else:
if result < os_threshold:
status = "低位企稳"
else:
status = "温和下跌"
# ─── 距离 ───
if boll_len > 0:
dist_ratio = (close - boll_lower) / boll_len * 100
else:
dist_ratio = 50
if dist_ratio < 50 - dist_extreme:
dist = "极端超卖"
elif dist_ratio < 50 - dist_far:
dist = "远离"
elif dist_ratio < 50 - dist_mid:
dist = "偏离"
elif dist_ratio < 50 + dist_mid:
dist = "接近"
elif dist_ratio < 50 + dist_far:
dist = "靠近"
elif dist_ratio < 50 + dist_extreme:
dist = "远超"
else:
dist = "极端超买"
# ─── 强度标签 ───
max_score = max(buy_score, sell_score)
intensity = (
"超强" if max_score >= 90
else "" if max_score >= 80
else "" if max_score >= 65
else "" if max_score >= 55
else "极弱"
)
# ─── 详情 ───
ema5 = indicator.EMA5 if hasattr(indicator, 'EMA5') else close
ema10 = indicator.EMA10 if hasattr(indicator, 'EMA10') else close
ema20 = indicator.EMA20 if hasattr(indicator, 'EMA20') else close
bias_val = (ema5 - ema10) / ema10 * 1000 if ema10 != 0 else 0
bbp_val = (close - boll_lower) / boll_len * 100 if boll_len > 0 else 50
detail = (
f"前量:{percent:.1f} 数量:{int(amplitude):03d} 百分比:{bbp_val:.1f} "
f"正:{k_bull:.1f} 负:{k_bear:.1f}"
)
return status, signal, intensity, detail