Chad, EventHub, TradingEntities, Part3
26.02.07 EventHub, TradingEntities
https://ask.chadgpt.ru/share/b7d47f07-c985-48df-bbbc-70622e419623
Отличный сценарий для BackgroundService + Singleton Bus: по сути это in-process event bus/mediator между конвейерными стадиями.
Ниже — практичные рекомендации, чтобы это работало предсказуемо в ASP.NET Core и не “стреляло в ногу” по потокам, порядку, остановке и памяти.
1) Разделите “команды” и “события”
У вас есть два типа сообщений:
- События (fan-out):
QuoteReceived,TradeExecuted— их могут слушать несколько сервисов (например, InMemoryDB и EFDB одновременно). - Команды (point-to-point):
ExecuteOrder— обычно должен обработать один исполнитель.
Если оставить всё как “топики” — тоже можно, но договоритесь:
quotes— событие (много подписчиков допустимо)orders.execute— команда (ожидается один подписчик)trades— событиеpositions.updated— событие
Тогда вы избежите случайных “двойных исполнений”.
2) Самое важное: не делайте тяжёлую работу внутри обработчика Bus
Если у вас Publish вызывает подписчиков напрямую, то:
- синхронный publish будет блокировать генератора котировок,
- асинхронный publish начнёт выполнять обработчики параллельно и может породить лавину задач,
- ошибки/зависания обработчика будут влиять на остальных (в зависимости от реализации).
Практический паттерн для BackgroundService:
- подписчик минимально делает
channel.Writer.TryWrite(...)/WriteAsync(...)в свой входной канал, - а вся тяжёлая работа делается в
ExecuteAsyncсервиса, читая из канала.
То есть Bus становится “маршрутизатором”, а BackgroundService — “воркером”.
3) Сделайте входные очереди bounded + стратегия backpressure
Для котировок особенно важно: поток может быть выше, чем скорость стратегии/EF.
Рекомендация:
Quotes -> Strategy: bounded канал, при переполнении либоDropOldest(если вам важны свежие котировки), либо блокировать (если важна каждая).Orders -> Executor: bounded, обычно без дропа (каждый ордер важен).Trades -> Positions: bounded, без дропа.Events -> EF: bounded, можно блокировать или буферизовать, но лучше не терять.
В ASP.NET Core это обычно System.Threading.Channels с BoundedChannelOptions.
4) Про порядок сообщений
События по разным топикам/подписчикам почти неизбежно придут “в разное время”.
Если порядок критичен:
- внутри одного потока (например, QuotesService) добавляйте
Sequence/Timestamp, - Strategy/Executor/Positions учитывают sequence и могут отбрасывать “устаревшее”.
Особенно если вы решите сделать обработчики параллельными.
5) Обработка ошибок: локально и глобально
У вас уже есть onTopicFault — это хорошо как “предохранитель”.
Но ещё нужно:
- чтобы каждый
BackgroundServiceлогировал и решал: продолжать или останавливать pipeline. - для EF-сохранения — retry/backoff (например, 5 попыток с задержкой), и отдельный dead-letter (хотя бы лог + метрика).
Важно: если EF упал, не обязательно останавливать Strategy/Executor. Часто лучше продолжить торговать, а сохранение догонит позже (или деградировать).
6) DI и Scoped зависимости в Singleton Bus
Bus — singleton, а DbContext — scoped. Значит:
- не инжектите
DbContextв Bus/подписки напрямую (подписка живёт долго), - в
BackgroundServiceEntityFrameworkDataBaseиспользуйтеIServiceScopeFactoryи создавайте scope на обработку батча/сообщения.
Иначе получите утечки/ошибки lifetime.
7) Остановка приложения (graceful shutdown)
Нужно, чтобы при остановке:
- Quotes перестал генерить,
- Strategy/Executor/Positions дочитали свои каналы (или остановились по токену),
- InMemory/EF успели сбросить очереди (по возможности).
Практика:
StopAsync: завершать writer’ы каналов (TryComplete) и ждать drain с таймаутом.ExecuteAsync:await foreachпоReadAllAsync(stoppingToken)+ try/catch.
8) Дублирование доставки (at-least-once) и идемпотентность
В in-process bus обычно “at-most-once”, но при ошибках/перезапусках сервисов вы всё равно столкнётесь с:
- повторной генерацией котировок,
- повторной обработкой трейда при ретраях EF.
Поэтому полезно:
- иметь
IdуOrder/Trade, - в EF хранить уникальные ключи и делать idempotent insert/update.
9) Не смешивайте “данные для визуализации” и “источник истины”
Ваш InMemoryDataBase — отлично как read-model/кэш для UI.
Но решите заранее:
- является ли EF “истиной” и InMemory — производным,
- или InMemory — первичен, а EF — реплика.
Это влияет на то, что делать при падении EF/задержках.
Если хотите, я могу предложить конкретный скелет кода под ваш сценарий: типы сообщений (Quote, Order, Trade), имена топиков, и шаблон каждого BackgroundService с входным Channel<T> + подпиской на Bus. Скажите, котировки должны идти “каждая” или достаточно последней (drop-oldest/latest)? И нужна ли параллельная обработка ордеров исполнителем (N-воркеров) или строго последовательно?
----------
Уточнение.
Мы создаем полный цикл котировка->Strategy->order->executor->trade->position->(open_position, fix_profit, open_profit)->расчет_equity-> sharp-коефициент.
То есть мы сами создаем все текущие сущности.
В этом проекте мы не работаем с реальными рыночными данными.
Все что нам необходимо (рыночные сущности, excutors,quotes) мы генерируем сами.
Наша задача проверить как работает вся цепочка без реального рынка.
- online trade entities в InMemoryDataBase для текущей визуализации без истории
- offline trade entities в EntityFrameworkMemoryDataBase для текущей визуализации offline с историией - персистентное хранилище.
Пока цель только тестирование всего полного цикла торговли.
Когда этот этап будет работать правильно мы подключим реальные котировки и рыночные executors
Примерная схема БД
Tickers
Strategies - каждая стратегия имеет Ticker, Position, Orders, Trades
Orders
Trades
Position (CurrentPosition, RealizedProfit, UnrealizedProfit)
-------
Примерно так
Ниже — рабочий “скелет” под ваш полный цикл Quote → Strategy → Order → Executor → Trade → Position → PnL → Equity → Sharpe в виде нескольких BackgroundService + простая in-process BusLibrary на Channel<T>. Он заточен под ваш текущий режим: всё генерим сами, без внешнего рынка/брокера, с online (InMemory) для живой визуализации и offline (EF persistent) для истории/оффлайн просмотра.
Дам минимально-целостный набор файлов/типов; дальше вы сможете наращивать доменную модель.
1) Контракты сообщений (Bus messages)
public interface IBusMessage
{
DateTimeOffset Ts { get; }
}
public sealed record Quote(
string Ticker,
decimal Bid,
decimal Ask,
DateTimeOffset Ts
) : IBusMessage
{
public decimal Mid => (Bid + Ask) / 2m;
}
public enum OrderSide { Buy, Sell }
public sealed record Order(
Guid OrderId,
string StrategyId,
string Ticker,
OrderSide Side,
decimal Quantity,
decimal? LimitPrice,
DateTimeOffset Ts
) : IBusMessage;
public sealed record Trade(
Guid TradeId,
Guid OrderId,
string StrategyId,
string Ticker,
OrderSide Side,
decimal Quantity,
decimal Price,
DateTimeOffset Ts
) : IBusMessage;
public sealed record PositionSnapshot(
string StrategyId,
string Ticker,
decimal NetQuantity,
decimal AvgPrice,
decimal RealizedPnL,
decimal UnrealizedPnL,
decimal LastPrice,
DateTimeOffset Ts
) : IBusMessage;
public sealed record EquityPoint(
string StrategyId,
decimal Equity,
DateTimeOffset Ts
) : IBusMessage;
public sealed record SharpePoint(
string StrategyId,
double Sharpe,
DateTimeOffset Ts
) : IBusMessage;2) “BusLibrary”: топики на Channel<T>
Идея: Publish<T> пушит во все подписки на тип T. Подписчик получает ChannelReader<T>.
Важно: для котировок можно включить режим “последнее значение” (drop old), но ниже — базовый вариант. Потом добавим оптимизацию.
using System.Collections.Concurrent;
using System.Threading.Channels;
public interface IBus
{
ValueTask Publish<T>(T msg, CancellationToken ct = default);
ChannelReader<T> Subscribe<T>(string name, int capacity = 1024, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait);
}
public sealed class InProcessBus : IBus
{
private readonly ConcurrentDictionary<Type, ConcurrentBag<object>> _subscribers = new();
public ChannelReader<T> Subscribe<T>(string name, int capacity = 1024, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
{
var ch = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
{
SingleReader = true,
SingleWriter = false,
FullMode = fullMode
});
var bag = _subscribers.GetOrAdd(typeof(T), _ => new ConcurrentBag<object>());
bag.Add(ch);
return ch.Reader;
}
public async ValueTask Publish<T>(T msg, CancellationToken ct = default)
{
if (!_subscribers.TryGetValue(typeof(T), out var bag)) return;
foreach (var obj in bag)
{
if (obj is Channel<T> ch)
{
// Wait-режим: backpressure. Для котировок часто лучше DropOldest.
await ch.Writer.WriteAsync(msg, ct);
}
}
}
// Дополнительно для graceful shutdown можно сделать CompleteAll()
}3) Online / Offline хранилища (проекции)
3.1 InMemoryDataBase для “живой” визуализации без истории
Храним только текущие состояния/последние значения.
using System.Collections.Concurrent;
public sealed class InMemoryDataBase
{
public ConcurrentDictionary<string, Quote> LastQuotes { get; } = new();
public ConcurrentDictionary<(string StrategyId, string Ticker), PositionSnapshot> Positions { get; } = new();
public ConcurrentDictionary<string, EquityPoint> LastEquity { get; } = new();
public ConcurrentDictionary<string, SharpePoint> LastSharpe { get; } = new();
}3.2 EF “персистентное” хранилище для истории
Вы сказали EntityFrameworkMemoryDataBase, но вам нужна персистентность — значит обычно SQLite (файл) или Postgres. Для теста можно SQLite.
Скелет сущностей (упрощённо):
using Microsoft.EntityFrameworkCore;
public sealed class TradingDbContext : DbContext
{
public TradingDbContext(DbContextOptions<TradingDbContext> options) : base(options) { }
public DbSet<QuoteEntity> Quotes => Set<QuoteEntity>();
public DbSet<OrderEntity> Orders => Set<OrderEntity>();
public DbSet<TradeEntity> Trades => Set<TradeEntity>();
public DbSet<PositionPointEntity> PositionPoints => Set<PositionPointEntity>();
public DbSet<EquityEntity> Equities => Set<EquityEntity>();
public DbSet<SharpeEntity> Sharpes => Set<SharpeEntity>();
protected override void OnModelCreating(ModelBuilder b)
{
b.Entity<OrderEntity>().HasKey(x => x.OrderId);
b.Entity<TradeEntity>().HasKey(x => x.TradeId);
b.Entity<TradeEntity>()
.HasIndex(x => new { x.OrderId, x.TradeId })
.IsUnique();
// индексы по StrategyId/Ticker/Ts для быстрых графиков
b.Entity<EquityEntity>().HasIndex(x => new { x.StrategyId, x.Ts });
b.Entity<SharpeEntity>().HasIndex(x => new { x.StrategyId, x.Ts });
b.Entity<PositionPointEntity>().HasIndex(x => new { x.StrategyId, x.Ticker, x.Ts });
}
}
public sealed class QuoteEntity
{
public long Id { get; set; }
public string Ticker { get; set; } = default!;
public decimal Bid { get; set; }
public decimal Ask { get; set; }
public DateTimeOffset Ts { get; set; }
}
public sealed class OrderEntity
{
public Guid OrderId { get; set; }
public string StrategyId { get; set; } = default!;
public string Ticker { get; set; } = default!;
public OrderSide Side { get; set; }
public decimal Quantity { get; set; }
public decimal? LimitPrice { get; set; }
public DateTimeOffset Ts { get; set; }
}
public sealed class TradeEntity
{
public Guid TradeId { get; set; }
public Guid OrderId { get; set; }
public string StrategyId { get; set; } = default!;
public string Ticker { get; set; } = default!;
public OrderSide Side { get; set; }
public decimal Quantity { get; set; }
public decimal Price { get; set; }
public DateTimeOffset Ts { get; set; }
}
public sealed class PositionPointEntity
{
public long Id { get; set; }
public string StrategyId { get; set; } = default!;
public string Ticker { get; set; } = default!;
public decimal NetQuantity { get; set; }
public decimal AvgPrice { get; set; }
public decimal RealizedPnL { get; set; }
public decimal UnrealizedPnL { get; set; }
public decimal LastPrice { get; set; }
public DateTimeOffset Ts { get; set; }
}
public sealed class EquityEntity
{
public long Id { get; set; }
public string StrategyId { get; set; } = default!;
public decimal Equity { get; set; }
public DateTimeOffset Ts { get; set; }
}
public sealed class SharpeEntity
{
public long Id { get; set; }
public string StrategyId { get; set; } = default!;
public double Sharpe { get; set; }
public DateTimeOffset Ts { get; set; }
}4) BackgroundService: генератор котировок (QuoteGeneratorService)
Генерация “рынка”: рандом-волк (упрощённо). Публикуем Quote, а online db обновляем последним.
using Microsoft.Extensions.Hosting;
public sealed class QuoteGeneratorService : BackgroundService
{
private readonly IBus _bus;
private readonly InMemoryDataBase _mem;
private readonly string[] _tickers;
private readonly Random _rng = new(1);
private readonly Dictionary<string, decimal> _mid = new();
public QuoteGeneratorService(IBus bus, InMemoryDataBase mem, IEnumerable<string> tickers)
{
_bus = bus;
_mem = mem;
_tickers = tickers.ToArray();
foreach (var t in _tickers) _mid[t] = 100m;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var now = DateTimeOffset.UtcNow;
foreach (var t in _tickers)
{
var step = (decimal)(_rng.NextDouble() - 0.5) * 0.2m; // шум
_mid[t] = Math.Max(1m, _mid[t] + step);
var spread = 0.02m;
var q = new Quote(t, _mid[t] - spread / 2m, _mid[t] + spread / 2m, now);
_mem.LastQuotes[t] = q;
await _bus.Publish(q, stoppingToken);
}
await Task.Delay(50, stoppingToken); // 20 Hz на тикер (пример)
}
}
}5) StrategyService: Quote → Order
Стратегия читает quotes по подписке, принимает решение, генерит Order.
Для примера: простая mean-reversion/threshold.
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class StrategyService : BackgroundService
{
private readonly IBus _bus;
private readonly ChannelReader<Quote> _quotes;
private readonly string _strategyId;
private readonly string _ticker;
private decimal _lastMid;
private bool _hasLast;
public StrategyService(IBus bus, string strategyId, string ticker)
{
_bus = bus;
_strategyId = strategyId;
_ticker = ticker;
_quotes = bus.Subscribe<Quote>($"strategy:{strategyId}", capacity: 2048, fullMode: BoundedChannelFullMode.DropOldest);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var q in _quotes.ReadAllAsync(stoppingToken))
{
if (q.Ticker != _ticker) continue;
var mid = q.Mid;
if (!_hasLast)
{
_lastMid = mid;
_hasLast = true;
continue;
}
var delta = mid - _lastMid;
_lastMid = mid;
// Игрушечное правило: если цена упала сильно — покупаем, выросла — продаём
const decimal threshold = 0.05m;
if (delta <= -threshold)
{
var order = new Order(
OrderId: Guid.NewGuid(),
StrategyId: _strategyId,
Ticker: _ticker,
Side: OrderSide.Buy,
Quantity: 1m,
LimitPrice: null,
Ts: q.Ts
);
await _bus.Publish(order, stoppingToken);
}
else if (delta >= threshold)
{
var order = new Order(
Guid.NewGuid(), _strategyId, _ticker, OrderSide.Sell, 1m, null, q.Ts
);
await _bus.Publish(order, stoppingToken);
}
}
}
}6) ExecutorService: Order → Trade
Executor — это ваш “симулятор биржи”: исполняет по Mid/Ask/Bid из последней котировки.
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class ExecutorService : BackgroundService
{
private readonly IBus _bus;
private readonly InMemoryDataBase _mem;
private readonly ChannelReader<Order> _orders;
public ExecutorService(IBus bus, InMemoryDataBase mem)
{
_bus = bus;
_mem = mem;
_orders = bus.Subscribe<Order>("executor", capacity: 4096, fullMode: BoundedChannelFullMode.Wait);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var o in _orders.ReadAllAsync(stoppingToken))
{
if (!_mem.LastQuotes.TryGetValue(o.Ticker, out var q))
continue; // или ждать появления котировки
var price = o.Side == OrderSide.Buy ? q.Ask : q.Bid;
var trade = new Trade(
TradeId: Guid.NewGuid(),
OrderId: o.OrderId,
StrategyId: o.StrategyId,
Ticker: o.Ticker,
Side: o.Side,
Quantity: o.Quantity,
Price: price,
Ts: DateTimeOffset.UtcNow
);
await _bus.Publish(trade, stoppingToken);
}
}
}7) PositionService: Trade → PositionSnapshot (+ PnL)
Сводим сделки в позицию (net qty, avg price), считаем realized/unrealized по last price.
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class PositionService : BackgroundService
{
private readonly IBus _bus;
private readonly InMemoryDataBase _mem;
private readonly ChannelReader<Trade> _trades;
private readonly ChannelReader<Quote> _quotes;
private sealed class State
{
public decimal NetQty;
public decimal AvgPrice; // по открытой части
public decimal RealizedPnL;
public decimal LastPrice;
}
private readonly Dictionary<(string StrategyId, string Ticker), State> _state = new();
public PositionService(IBus bus, InMemoryDataBase mem)
{
_bus = bus;
_mem = mem;
_trades = bus.Subscribe<Trade>("positions:trades", capacity: 4096);
_quotes = bus.Subscribe<Quote>("positions:quotes", capacity: 4096, fullMode: BoundedChannelFullMode.DropOldest);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Два входа: trades и quotes. Проще всего — объединить через отдельные задачи.
var t1 = Task.Run(() => ConsumeTrades(stoppingToken), stoppingToken);
var t2 = Task.Run(() => ConsumeQuotes(stoppingToken), stoppingToken);
await Task.WhenAll(t1, t2);
}
private async Task ConsumeQuotes(CancellationToken ct)
{
await foreach (var q in _quotes.ReadAllAsync(ct))
{
foreach (var key in _state.Keys.Where(k => k.Ticker == q.Ticker).ToArray())
{
_state[key].LastPrice = q.Mid;
PublishSnapshot(key, q.Ts, ct);
}
}
}
private async Task ConsumeTrades(CancellationToken ct)
{
await foreach (var tr in _trades.ReadAllAsync(ct))
{
var key = (tr.StrategyId, tr.Ticker);
if (!_state.TryGetValue(key, out var s))
{
s = new State { LastPrice = tr.Price };
_state[key] = s;
}
var signedQty = tr.Side == OrderSide.Buy ? tr.Quantity : -tr.Quantity;
// Реализация: если меняем знак/уменьшаем позицию — часть PnL реализуется
if (s.NetQty == 0 || Math.Sign(s.NetQty) == Math.Sign(signedQty))
{
// наращиваем в ту же сторону
var newQty = s.NetQty + signedQty;
var totalCost = s.AvgPrice * Math.Abs(s.NetQty) + tr.Price * Math.Abs(signedQty);
s.AvgPrice = Math.Abs(newQty) == 0 ? 0 : totalCost / Math.Abs(newQty);
s.NetQty = newQty;
}
else
{
// закрываем часть/всё
var closingQty = Math.Min(Math.Abs(s.NetQty), Math.Abs(signedQty));
var pnlPerUnit = (tr.Price - s.AvgPrice) * Math.Sign(s.NetQty); // знак позиции
s.RealizedPnL += pnlPerUnit * closingQty;
s.NetQty += signedQty;
if (s.NetQty == 0)
{
s.AvgPrice = 0;
}
else if (Math.Sign(s.NetQty) == Math.Sign(signedQty))
{
// переворот: оставшаяся часть открыта по цене сделки
s.AvgPrice = tr.Price;
}
}
s.LastPrice = tr.Price;
await PublishSnapshotAsync(key, tr.Ts, ct);
}
}
private void PublishSnapshot((string StrategyId, string Ticker) key, DateTimeOffset ts, CancellationToken ct)
=> PublishSnapshotAsync(key, ts, ct).GetAwaiter().GetResult();
private async Task PublishSnapshotAsync((string StrategyId, string Ticker) key, DateTimeOffset ts, CancellationToken ct)
{
var s = _state[key];
var unreal = s.NetQty == 0 ? 0m : (s.LastPrice - s.AvgPrice) * s.NetQty;
var snap = new PositionSnapshot(
StrategyId: key.StrategyId,
Ticker: key.Ticker,
NetQuantity: s.NetQty,
AvgPrice: s.AvgPrice,
RealizedPnL: s.RealizedPnL,
UnrealizedPnL: unreal,
LastPrice: s.LastPrice,
Ts: ts
);
_mem.Positions[(key.StrategyId, key.Ticker)] = snap;
await _bus.Publish(snap, ct);
}
}8) EquityService: PositionSnapshot → EquityPoint
У вас может быть несколько тикеров на стратегию — суммируйте PnL + base capital.
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class EquityService : BackgroundService
{
private readonly IBus _bus;
private readonly InMemoryDataBase _mem;
private readonly ChannelReader<PositionSnapshot> _pos;
private readonly decimal _initialCapital;
private readonly Dictionary<string, decimal> _lastEquity = new();
private readonly Dictionary<string, Dictionary<string, PositionSnapshot>> _byStrat = new();
public EquityService(IBus bus, InMemoryDataBase mem, decimal initialCapital = 10_000m)
{
_bus = bus;
_mem = mem;
_initialCapital = initialCapital;
_pos = bus.Subscribe<PositionSnapshot>("equity", capacity: 4096);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var p in _pos.ReadAllAsync(stoppingToken))
{
if (!_byStrat.TryGetValue(p.StrategyId, out var dict))
{
dict = new Dictionary<string, PositionSnapshot>();
_byStrat[p.StrategyId] = dict;
}
dict[p.Ticker] = p;
var totalPnL = dict.Values.Sum(x => x.RealizedPnL + x.UnrealizedPnL);
var equity = _initialCapital + totalPnL;
var point = new EquityPoint(p.StrategyId, equity, p.Ts);
_mem.LastEquity[p.StrategyId] = point;
await _bus.Publish(point, stoppingToken);
}
}
}9) SharpeService: EquityPoint → SharpePoint
Для теста: считаем по доходностям за окно (например, 200 точек). Это “поточный” Sharpe без годовых коэффициентов.
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class SharpeService : BackgroundService
{
private readonly IBus _bus;
private readonly InMemoryDataBase _mem;
private readonly ChannelReader<EquityPoint> _eq;
private readonly int _window;
private readonly Dictionary<string, Queue<double>> _rets = new();
private readonly Dictionary<string, decimal> _lastEq = new();
public SharpeService(IBus bus, InMemoryDataBase mem, int window = 200)
{
_bus = bus;
_mem = mem;
_window = window;
_eq = bus.Subscribe<EquityPoint>("sharpe", capacity: 4096);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var e in _eq.ReadAllAsync(stoppingToken))
{
if (_lastEq.TryGetValue(e.StrategyId, out var prev) && prev > 0)
{
var r = (double)((e.Equity - prev) / prev);
if (!_rets.TryGetValue(e.StrategyId, out var q))
{
q = new Queue<double>(_window + 1);
_rets[e.StrategyId] = q;
}
q.Enqueue(r);
while (q.Count > _window) q.Dequeue();
var arr = q.ToArray();
var mean = arr.Average();
var var = arr.Length > 1 ? arr.Select(x => (x - mean) * (x - mean)).Sum() / (arr.Length - 1) : 0.0;
var std = Math.Sqrt(var);
var sharpe = std == 0 ? 0.0 : mean / std;
var sp = new SharpePoint(e.StrategyId, sharpe, e.Ts);
_mem.LastSharpe[e.StrategyId] = sp;
await _bus.Publish(sp, stoppingToken);
}
_lastEq[e.StrategyId] = e.Equity;
}
}
}10) Offline persistence: подписчики-проекции в EF (Quotes/Orders/Trades/Positions/Equity/Sharpe)
Лучше делать отдельные writer-сервисы (или один батчер), чтобы не тормозить торговый контур. Скелет батчера (один сервис на всё):
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;
public sealed class EfPersistenceService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ChannelReader<Quote> _quotes;
private readonly ChannelReader<Order> _orders;
private readonly ChannelReader<Trade> _trades;
private readonly ChannelReader<PositionSnapshot> _pos;
private readonly ChannelReader<EquityPoint> _eq;
private readonly ChannelReader<SharpePoint> _sh;
public EfPersistenceService(IBus bus, IServiceScopeFactory scopeFactory)
{
_scopeFactory = scopeFactory;
_quotes = bus.Subscribe<Quote>("ef:quotes", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
_orders = bus.Subscribe<Order>("ef:orders", capacity: 10_000);
_trades = bus.Subscribe<Trade>("ef:trades", capacity: 10_000);
_pos = bus.Subscribe<PositionSnapshot>("ef:pos", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
_eq = bus.Subscribe<EquityPoint>("ef:eq", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
_sh = bus.Subscribe<SharpePoint>("ef:sh", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var batchQuotes = new List<QuoteEntity>(200);
var batchPos = new List<PositionPointEntity>(200);
var batchEq = new List<EquityEntity>(200);
var batchSh = new List<SharpeEntity>(200);
var batchOrders = new List<OrderEntity>(200);
var batchTrades = new List<TradeEntity>(200);
while (!stoppingToken.IsCancellationRequested)
{
// Неблокирующая выборка "сколько успели" + flush по таймеру
Drain(_quotes, batchQuotes, q => new QuoteEntity { Ticker = q.Ticker, Bid = q.Bid, Ask = q.Ask, Ts = q.Ts }, 200);
Drain(_pos, batchPos, p => new PositionPointEntity
{
StrategyId = p.StrategyId, Ticker = p.Ticker,
NetQuantity = p.NetQuantity, AvgPrice = p.AvgPrice,
RealizedPnL = p.RealizedPnL, UnrealizedPnL = p.UnrealizedPnL,
LastPrice = p.LastPrice, Ts = p.Ts
}, 200);
Drain(_eq, batchEq, e => new EquityEntity { StrategyId = e.StrategyId, Equity = e.Equity, Ts = e.Ts }, 200);
Drain(_sh, batchSh, s => new SharpeEntity { StrategyId = s.StrategyId, Sharpe = s.Sharpe, Ts = s.Ts }, 200);
Drain(_orders, batchOrders, o => new OrderEntity
{
OrderId = o.OrderId, StrategyId = o.StrategyId, Ticker = o.Ticker,
Side = o.Side, Quantity = o.Quantity, LimitPrice = o.LimitPrice, Ts = o.Ts
}, 200);
Drain(_trades, batchTrades, t => new TradeEntity
{
TradeId = t.TradeId, OrderId = t.OrderId, StrategyId = t.StrategyId, Ticker = t.Ticker,
Side = t.Side, Quantity = t.Quantity, Price = t.Price, Ts = t.Ts
}, 200);
if (batchQuotes.Count + batchPos.Count + batchEq.Count + batchSh.Count + batchOrders.Count + batchTrades.Count > 0)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TradingDbContext>();
db.AddRange(batchQuotes); batchQuotes.Clear();
db.AddRange(batchPos); batchPos.Clear();
db.AddRange(batchEq); batchEq.Clear();
db.AddRange(batchSh); batchSh.Clear();
db.AddRange(batchOrders); batchOrders.Clear();
db.AddRange(batchTrades); batchTrades.Clear();
await db.SaveChangesAsync(stoppingToken);
}
await Task.Delay(200, stoppingToken);
}
}
private static void Drain<TMsg, TEntity>(
ChannelReader<TMsg> reader,
List<TEntity> batch,
Func<TMsg, TEntity> map,
int max)
{
while (batch.Count < max && reader.TryRead(out var msg))
batch.Add(map(msg));
}
}11) Регистрация в DI (Program.cs)
Примерно так (SQLite файл как persistent). Если вы реально хотите EF InMemory provider — замените на UseInMemoryDatabase, но это не персистентно.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddSingleton<IBus, InProcessBus>();
services.AddSingleton<InMemoryDataBase>();
services.AddDbContext<TradingDbContext>(opt =>
opt.UseSqlite("Data Source=trading.db"));
// миграции/ensure created (для прототипа)
services.AddHostedService<DbInitService>();
// Market sim
services.AddSingleton<IEnumerable<string>>(_ => new[] { "AAPL", "MSFT" });
services.AddHostedService<QuoteGeneratorService>();
// Strategy(ии)
services.AddHostedService(sp => new StrategyService(
sp.GetRequiredService<IBus>(), strategyId: "S1", ticker: "AAPL"));
services.AddHostedService<ExecutorService>();
services.AddHostedService<PositionService>();
services.AddHostedService(sp => new EquityService(sp.GetRequiredService<IBus>(), sp.GetRequiredService<InMemoryDataBase>(), 10_000m));
services.AddHostedService<SharpeService>();
// Offline persistence projection
services.AddHostedService<EfPersistenceService>();
})
.Build();
await host.RunAsync();
public sealed class DbInitService : IHostedService
{
private readonly IServiceScopeFactory _scopeFactory;
public DbInitService(IServiceScopeFactory scopeFactory) => _scopeFactory = scopeFactory;
public async Task StartAsync(CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<TradingDbContext>();
await db.Database.EnsureCreatedAsync(cancellationToken);
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}12) Важные настройки именно для вашего кейса (быстро и по делу)
Для “полного цикла” симуляции почти всегда нужно развести режимы очередей:
Quoteи производные “визуализационные” (PositionSnapshot,EquityPoint,SharpePoint) —DropOldest(нужна последняя картинка, а не каждая точка).OrderиTrade—Wait(нельзя терять).
Сейчас в примере это учтено частично.
Уточняющие вопросы (чтобы подогнать скелет под вашу схему БД и требования)
- Одна стратегия = один тикер, или стратегия может торговать несколько тикеров?
Orders/Tradesвам нужны в online InMemory тоже (для UI “лента”), или достаточно только позиции/эквити?- Исполнение: всегда market по
Bid/Ask, или хотите поддержатьLimitPriceи частичное исполнение/задержку? - Для offline истории: хотите хранить все quotes, или достаточно downsample (например, 1/sec), чтобы база не разрасталась?
Ответьте на эти 4 пункта — и я подстрою:
- модель
Strategies/Tickersближе к вашей таблице, - routing (топики) и ключи,
- и сделаю более “правильный”
Position(в т.ч. short/flip/fees).
2. InMemoryDataBase24 - все торговые сущности хранятся 24 часа
3. Поддержка MarketOrder, LimitOrder. Поддержка частичного исполнения. В Order должно быть поля Начальное кол-во контрактов и Rest (остаток контрактов).
4. Offline Quotes History Будет отдельный специальный проект и отдельная база данных для истории котировок для тестирования. Там будет совсем другая База Данных с подневным хранением нескольких торгуемых Тикеров в формате csv cо сжатием и с хранением в БазеДанных как Image . Но это уже совсем другой проект, поэтому в данном проекте мы это не делаем. Это уже не торговля, а сопутствующие проекты, которых на самом деле тоже много. Но пока мы их не трогаем.
Ок, тогда схема и “скелет” доменной модели можно зафиксировать так, чтобы:
- 1 стратегия = 1 тикер (жёсткая связь)
- все торговые сущности (orders/trades/positions) живут в InMemory “окне” 24 часа
- есть Market/Limit, partial fills, у
OrderестьInitialQtyиRestQty - offline quotes/history — вообще не трогаем в этом проекте
Ниже даю компактную, но “правильную” БД-схему (под EF Core), и как это ложится на вашу структуру.
Рекомендуемая схема БД (операционная, 24 часа)
tickers
id(PK, GUID/INT)symbol(unique, string)price_step(decimal) — опциональноqty_step(decimal) — опциональноcreated_utc
Индекс: UX_tickers_symbol
strategies
id(PK)name(string)ticker_id(FK →tickers.id) NOT NULLstatus(enum: Running/Stopped/Archived)created_utc
Ограничение: если строго “1 стратегия = 1 тикер” и тикер не может быть у двух стратегий, тогда:
UX_strategies_ticker_id(unique)
Если же допустимо несколько стратегий на один тикер — убираете unique.
orders
id(PK)strategy_id(FK →strategies.id)ticker_id(FK →tickers.id) — можно держать для быстрых выборок, но тогда следить за консистентностью (обычно ок)client_order_id(string, unique per strategy) — удобно для идемпотентностиside(enum: Buy/Sell)type(enum: Market/Limit)time_in_force(enum: GTC/IOC/FOK) — можно позжеlimit_price(decimal, nullable) — для Market = nullinitial_qty(decimal/int) — “Начальное кол-во контрактов”rest_qty(decimal/int) — “остаток”filled_qty(computed или поле) — можно хранить и поддерживать:filled = initial - restavg_fill_price(decimal, nullable) — средняя цена исполнения (по трейдам)status(enum: New/PartiallyFilled/Filled/Canceled/Rejected/Expired)created_utcupdated_utc
Индексы:
(strategy_id, created_utc desc)(ticker_id, created_utc desc)UX_orders_strategy_clientOrderId(если используете)
trades (fills)
Каждый partial fill — отдельная запись.
id(PK)order_id(FK →orders.id)strategy_id(FK →strategies.id) — денормализация для скоростиticker_id(FK →tickers.id)side(Buy/Sell) — обычно дублируют с order.side, но полезноqty(decimal/int)price(decimal)fee(decimal, default 0) — опциональноexecuted_utc
Индексы:
(order_id, executed_utc)(strategy_id, executed_utc desc)
positions
Позиция по стратегии (так как 1 стратегия = 1 тикер, позиция фактически 1:1 со strategy).
strategy_id(PK, FK →strategies.id)ticker_id(FK →tickers.id)net_qty(decimal/int) — текущая позиция (long > 0, short < 0)avg_entry_price(decimal, nullable) — средняя цена входа по открытой позицииrealized_pnl(decimal) — реализованная прибыльunrealized_pnl(decimal) — нереализованная прибыль (зависит от last price)last_price(decimal, nullable) — последняя цена для расчёта unrealizedupdated_utc
Индекс: (ticker_id) (если нужно быстро по тикеру)
> Важно: unrealized_pnl лучше либо не хранить, либо хранить как “кэш” и пересчитывать при обновлении last_price. Для InMemory/24h можно хранить.
Связи (в терминах “как вы написали”)
Ticker 1 — * Strategies(илиTicker 1 — 1 Strategyесли unique по ticker_id)Strategy 1 — 1 PositionStrategy 1 — * OrdersOrder 1 — * TradesStrategy 1 — * Trades(денормализовано, удобно)- В каждом
Order/TradeестьTickerId(можно и без него, но с ним проще фильтровать и чистить по тикеру)
24 часа в InMemoryDataBase24: как реализовать правильно
Есть два подхода:
Вариант A (самый простой): скользящее окно через периодическую очистку
- Все сущности пишутся в обычные таблицы (InMemory provider EF или SQLite in-memory, или даже реальная БД).
- Раз в N минут фоновой задачей удаляем всё, что старше 24 часов:
trades.executed_utc < now-24horders.created_utc < now-24h(но осторожно, если order ещё “живой”)positionsобычно не удаляем (позиция — текущее состояние), но если вы хотите “полностью 24h” — тогда позицию тоже пересоздавать/обнулять по правилам.
Для консистентности обычно:
tradesчистим по executed_utcordersчистим по updated_utc (если ордер давно завершён)positionsне чистим автоматически, потому что это не “история”, а состояние.
Вариант B: двухслойная модель
- “Состояние” (
positions, активныеorders) хранится как есть (не вычищается по TTL). - “Лента/история 24h” (
trades, завершенныеorders) — TTL 24h.
С учётом ваших целей (симуляция + UI + последняя активность) обычно Вариант B удобнее.
Частичное исполнение: что обязательно в доменной логике
Для каждого fill (trade) вы делаете:
order.rest_qty -= trade.qtyorder.status:- если
rest_qty == initial_qtyи был reject/cancel —Canceled/Rejected - если
0 < rest_qty < initial_qty—PartiallyFilled - если
rest_qty == 0—Filled
- если
order.avg_fill_priceпересчитывается по всем fills (взвешенная средняя)
И параллельно обновляется position:
net_qtyменяется по знакуsideavg_entry_priceиrealized_pnlсчитаются по правилам закрытия/переворота (long→flat, long→short и т.д.)unrealized_pnlсчитается отlast_price(которую вы обновляете из котировки/last)
Минимальные EF Core сущности (структура полей)
Если хотите — я дам готовые классы C# под EF Core (с enum’ами, конфигурацией OnModelCreating, индексами и ValueConverter под decimal/long), но чтобы не гадать, уточню 2 момента:
- Количество контрактов — это
int(целые) илиdecimal(дробные, как в крипте)? LimitOrder: исполняем по лучшей доступной цене (обычноmin(ask, limit)для buy иmax(bid, limit)для sell), или строго поlimit_price?
Скажете эти 2 вещи — и я в следующем сообщении выдам:
- полные C# entity-классы,
- EF Core fluent config,
- и простую реализацию “matching/исполнения” для Market/Limit с partial fills (на уровне симулятора).
2. LimitOrder: Исполнение по лучшей доступной цене
Ниже — рабочая схема сущностей и логики под EF Core для симулятора, где количество — decimal, а Limit исполняется по лучшей доступной цене (price improvement).
1) Сущности (Orders, Trades/Fills, Position, MarketPrice)
Ключевая идея: Order хранит агрегированное состояние, а Fill (Trade) — неизменяемые события исполнения. Позиция — текущий агрегат по инструменту.
public enum OrderSide { Buy = 1, Sell = 2 }
public enum OrderType { Market = 1, Limit = 2 }
public enum OrderStatus
{
New = 1,
PartiallyFilled = 2,
Filled = 3,
Canceled = 4,
Rejected = 5
}
public class Order
{
public long Id { get; set; }
public string Symbol { get; set; } = default!;
public OrderSide Side { get; set; }
public OrderType Type { get; set; }
public OrderStatus Status { get; set; } = OrderStatus.New;
// Submitted
public decimal Qty { get; set; } // initial
public decimal? LimitPrice { get; set; } // null for Market
// Aggregates
public decimal FilledQty { get; set; } // sum of fills
public decimal AvgFillPrice { get; set; } // weighted avg (0 if no fills)
public decimal RestQty { get; set; } // Qty - FilledQty
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset? LastUpdatedAt { get; set; }
public List<Fill> Fills { get; set; } = new();
}
public class Fill
{
public long Id { get; set; }
public long OrderId { get; set; }
public Order Order { get; set; } = default!;
public string Symbol { get; set; } = default!;
public OrderSide Side { get; set; }
public decimal Qty { get; set; }
public decimal Price { get; set; }
public DateTimeOffset ExecutedAt { get; set; }
// удобно для идемпотентности/аудита (можно убрать)
public Guid ExecutionId { get; set; } = Guid.NewGuid();
}
public class Position
{
public long Id { get; set; }
public string Symbol { get; set; } = default!;
// Net: long > 0, short < 0
public decimal NetQty { get; set; }
// Средняя цена входа по ОТКРЫТОЙ части позиции (0 если flat)
public decimal AvgEntryPrice { get; set; }
public decimal RealizedPnl { get; set; }
public DateTimeOffset UpdatedAt { get; set; }
}
public class MarketPrice
{
public long Id { get; set; }
public string Symbol { get; set; } = default!;
public decimal Bid { get; set; }
public decimal Ask { get; set; }
public decimal Last { get; set; }
public DateTimeOffset Timestamp { get; set; }
}Почему так
Order.FilledQty/AvgFillPrice/RestQty— денормализация для быстрого UI/отчетов.- Истина о сделках — в
Fill(журнал событий). Positionхранит только текущий результат по символу.
2) EF Core Fluent Config (индексы, precision для decimal)
Для decimal обязательно задать HasPrecision, иначе провайдер может выбрать неудобный тип.
using Microsoft.EntityFrameworkCore;
public class SimDbContext : DbContext
{
public DbSet<Order> Orders => Set<Order>();
public DbSet<Fill> Fills => Set<Fill>();
public DbSet<Position> Positions => Set<Position>();
public DbSet<MarketPrice> MarketPrices => Set<MarketPrice>();
protected override void OnModelCreating(ModelBuilder b)
{
// Order
b.Entity<Order>(e =>
{
e.HasKey(x => x.Id);
e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();
e.Property(x => x.Qty).HasPrecision(28, 10);
e.Property(x => x.FilledQty).HasPrecision(28, 10);
e.Property(x => x.RestQty).HasPrecision(28, 10);
e.Property(x => x.LimitPrice).HasPrecision(28, 10);
e.Property(x => x.AvgFillPrice).HasPrecision(28, 10);
e.HasMany(x => x.Fills)
.WithOne(x => x.Order)
.HasForeignKey(x => x.OrderId)
.OnDelete(DeleteBehavior.Cascade);
e.HasIndex(x => new { x.Symbol, x.CreatedAt });
e.HasIndex(x => x.Status);
});
// Fill
b.Entity<Fill>(e =>
{
e.HasKey(x => x.Id);
e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();
e.Property(x => x.Qty).HasPrecision(28, 10);
e.Property(x => x.Price).HasPrecision(28, 10);
e.HasIndex(x => new { x.OrderId, x.ExecutedAt });
e.HasIndex(x => x.ExecutionId).IsUnique();
});
// Position
b.Entity<Position>(e =>
{
e.HasKey(x => x.Id);
e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();
e.Property(x => x.NetQty).HasPrecision(28, 10);
e.Property(x => x.AvgEntryPrice).HasPrecision(28, 10);
e.Property(x => x.RealizedPnl).HasPrecision(28, 10);
e.HasIndex(x => x.Symbol).IsUnique();
});
// MarketPrice
b.Entity<MarketPrice>(e =>
{
e.HasKey(x => x.Id);
e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();
e.Property(x => x.Bid).HasPrecision(28, 10);
e.Property(x => x.Ask).HasPrecision(28, 10);
e.Property(x => x.Last).HasPrecision(28, 10);
e.HasIndex(x => new { x.Symbol, x.Timestamp });
});
}
}Precision 28,10 — типичный безопасный выбор. Если у вас ограниченные масштабы, можно меньше.
3) Ценообразование (Market / Limit с лучшей ценой)
Пусть на момент исполнения есть bid/ask.
- Market Buy →
price = ask - Market Sell →
price = bid - Limit Buy:
- если
ask <= limit→ исполняется по ask (улучшение) - иначе не исполняется
- если
- Limit Sell:
- если
bid >= limit→ исполняется по bid - иначе не исполняется
- если
Частичное исполнение в простом симуляторе обычно задают через “доступную ликвидность” на тике (availableQty). Тогда fillQty = min(order.RestQty, availableQty).
4) Пересчет Order по fill (денормализация)
static void ApplyFillToOrder(Order o, decimal fillQty, decimal fillPrice, DateTimeOffset ts)
{
if (fillQty <= 0) throw new ArgumentOutOfRangeException(nameof(fillQty));
if (fillPrice <= 0) throw new ArgumentOutOfRangeException(nameof(fillPrice));
if (o.RestQty <= 0) throw new InvalidOperationException("Order already completed.");
if (fillQty > o.RestQty) fillQty = o.RestQty;
var prevFilledQty = o.FilledQty;
var prevNotional = o.AvgFillPrice * prevFilledQty;
var newFilledQty = prevFilledQty + fillQty;
var newNotional = prevNotional + (fillQty * fillPrice);
o.FilledQty = newFilledQty;
o.RestQty = o.Qty - o.FilledQty;
o.AvgFillPrice = (o.FilledQty == 0m) ? 0m : (newNotional / o.FilledQty);
o.LastUpdatedAt = ts;
if (o.RestQty == 0m) o.Status = OrderStatus.Filled;
else if (o.FilledQty > 0m) o.Status = OrderStatus.PartiallyFilled;
else o.Status = OrderStatus.New;
}5) Обновление позиции (long/short, частичное закрытие, переворот)
Ниже — надежный вариант “netting” с учетом закрытия и переворота.
Соглашение:
- Buy увеличивает
NetQty - Sell уменьшает
NetQty AvgEntryPrice— цена открытой части позицииRealizedPnlобновляется при закрытии части позиции- Комиссии/слиппедж не учитываем (можно добавить позже)
static void ApplyFillToPosition(Position p, OrderSide side, decimal qty, decimal price, DateTimeOffset ts)
{
if (qty <= 0) throw new ArgumentOutOfRangeException(nameof(qty));
if (price <= 0) throw new ArgumentOutOfRangeException(nameof(price));
// signedQty: buy +, sell -
var signedQty = side == OrderSide.Buy ? qty : -qty;
var prevQty = p.NetQty;
var newQty = prevQty + signedQty;
// If flat before
if (prevQty == 0m)
{
p.NetQty = newQty;
p.AvgEntryPrice = price; // вход по цене сделки
p.UpdatedAt = ts;
return;
}
// Same direction (increase existing position)
if ((prevQty > 0m && signedQty > 0m) || (prevQty < 0m && signedQty < 0m))
{
var prevAbs = Math.Abs(prevQty);
var addAbs = Math.Abs(signedQty);
var newAbs = prevAbs + addAbs;
// weighted avg entry
p.AvgEntryPrice = (p.AvgEntryPrice * prevAbs + price * addAbs) / newAbs;
p.NetQty = newQty;
p.UpdatedAt = ts;
return;
}
// Opposite direction: closing and maybe flipping
var closeAbs = Math.Min(Math.Abs(prevQty), Math.Abs(signedQty));
var openAbs = Math.Abs(signedQty) - closeAbs;
// Realized PnL from closing part:
// If prev was long (>0) and we sell: pnl = (sellPrice - entry) * closeQty
// If prev was short (<0) and we buy: pnl = (entry - buyPrice) * closeQty
decimal realized;
if (prevQty > 0m)
realized = (price - p.AvgEntryPrice) * closeAbs;
else
realized = (p.AvgEntryPrice - price) * closeAbs;
p.RealizedPnl += realized;
// After closing:
if (openAbs == 0m)
{
// fully closed to flat
p.NetQty = 0m;
p.AvgEntryPrice = 0m;
p.UpdatedAt = ts;
return;
}
// flipped: remaining opens new position in direction of signedQty
p.NetQty = newQty; // will have sign of signedQty
p.AvgEntryPrice = price; // new position entry at execution price (for remaining)
p.UpdatedAt = ts;
}6) Связать всё в “ExecuteOrderOnTick”
Примерно так: берете MarketPrice, определяете возможно ли исполнение, находите fillQty, создаете Fill, применяете к Order и Position, сохраняете.
Если хотите, я напишу полностью метод сервиса (с транзакцией EF Core, FirstOrDefaultAsync, обработкой идемпотентности через ExecutionId и т.п.). Для этого уточните:
- У вас одна позиция на символ (netting), как выше, или “по сделкам” (FIFO lots)?
- Нужно ли учитывать частичную ликвидность на тике (например,
availableQty), или можно считать что всегда исполняется весьRestQtyпри выполнении условия?
2. Учитывать частичную ликвидность на тике (например, availableQty)
Понял: делаем netting (одна позиция на символ) и на тике учитываем частичную ликвидность. Ниже — цельный, рабоче-ориентированный скелет: decimal для количества контрактов, LimitOrder исполняется по лучшей доступной цене (price improvement), поддержка partial fills и корректный PnL для netting.
Ключевые правила
- Qty и все расчёты количества:
decimal. - На каждом тике есть “стаканная” доступность:
- для покупки: лучшая цена продажи
askи объёмaskQty - для продажи: лучшая цена покупки
bidи объёмbidQty
- для покупки: лучшая цена продажи
- Лимитка исполняется если:
- Buy:
ask <= limitPrice - Sell:
bid >= limitPrice
- Buy:
- Цена исполнения: всегда лучшая доступная (то есть
askдля Buy,bidдля Sell), а не лимитная. - Объём исполнения:
min(orderRemaining, availableQty).
Модели (минимум)
public enum Side { Buy, Sell }
public enum OrderType { Limit }
public enum OrderStatus { New, PartiallyFilled, Filled, Cancelled }
public sealed class Order
{
public Guid Id { get; set; }
public string Symbol { get; set; } = default!;
public Side Side { get; set; }
public OrderType Type { get; set; } = OrderType.Limit;
public decimal Qty { get; set; } // исходное
public decimal FilledQty { get; set; } // накопленное
public decimal LimitPrice { get; set; }
public OrderStatus Status { get; set; } = OrderStatus.New;
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset UpdatedAt { get; set; }
public decimal RemainingQty => Qty - FilledQty;
}
public sealed class Position
{
public Guid Id { get; set; }
public string Symbol { get; set; } = default!;
public decimal NetQty { get; set; } // >0 long, <0 short
public decimal AvgEntryPrice { get; set; } // средняя входа текущей позиции (для netting)
public decimal RealizedPnl { get; set; } // накопленная реализованная
public DateTimeOffset UpdatedAt { get; set; }
}
public sealed class Fill
{
public Guid Id { get; set; }
public Guid OrderId { get; set; }
public string Symbol { get; set; } = default!;
public Side Side { get; set; }
public decimal Qty { get; set; } // decimal
public decimal Price { get; set; } // цена исполнения (bid/ask)
public DateTimeOffset Ts { get; set; }
// На случай идемпотентности:
public string ExecutionKey { get; set; } = default!; // напр. tickId+orderId+seq
}
public sealed class TopOfBookTick
{
public string Symbol { get; init; } = default!;
public DateTimeOffset Ts { get; init; }
public decimal Bid { get; init; }
public decimal BidQty { get; init; }
public decimal Ask { get; init; }
public decimal AskQty { get; init; }
public string TickId { get; init; } = default!; // уникальный идентификатор события/тика
}Проверка исполнения лимитки (best available price)
private static bool CanFillLimit(Order o, TopOfBookTick t)
{
if (o.Type != OrderType.Limit) throw new NotSupportedException();
return o.Side switch
{
Side.Buy => t.AskQty > 0m && t.Ask <= o.LimitPrice,
Side.Sell => t.BidQty > 0m && t.Bid >= o.LimitPrice,
_ => false
};
}
private static decimal BestPrice(Order o, TopOfBookTick t) =>
o.Side == Side.Buy ? t.Ask : t.Bid;
private static decimal AvailableQty(Order o, TopOfBookTick t) =>
o.Side == Side.Buy ? t.AskQty : t.BidQty;Применение fill к ордеру
private static void ApplyFillToOrder(Order o, decimal fillQty, DateTimeOffset ts)
{
if (fillQty <= 0m) return;
o.FilledQty += fillQty;
if (o.FilledQty >= o.Qty)
{
o.FilledQty = o.Qty;
o.Status = OrderStatus.Filled;
}
else
{
o.Status = OrderStatus.PartiallyFilled;
}
o.UpdatedAt = ts;
}Netting-позиция + Realized PnL (корректно при частичных закрытиях и переворотах)
private static void ApplyFillToPosition(Position p, Side side, decimal qty, decimal price, DateTimeOffset ts)
{
if (qty <= 0m) return;
// signedQty: buy => +, sell => -
var signedQty = side == Side.Buy ? qty : -qty;
var prevQty = p.NetQty;
var newQty = prevQty + signedQty;
// If position was flat => open new
if (prevQty == 0m)
{
p.NetQty = newQty;
p.AvgEntryPrice = price;
p.UpdatedAt = ts;
return;
}
// Same direction => increase position, update avg
if (Math.Sign((double)prevQty) == Math.Sign((double)newQty) && Math.Sign((double)prevQty) == Math.Sign((double)signedQty))
{
var prevAbs = Math.Abs(prevQty);
var addAbs = Math.Abs(signedQty);
var newAbs = prevAbs + addAbs;
p.AvgEntryPrice = (p.AvgEntryPrice * prevAbs + price * addAbs) / newAbs;
p.NetQty = newQty;
p.UpdatedAt = ts;
return;
}
// Otherwise: reducing or flipping
var closeAbs = Math.Min(Math.Abs(prevQty), Math.Abs(signedQty));
var openAbs = Math.Abs(signedQty) - closeAbs;
decimal realized = 0m;
if (closeAbs > 0m)
{
// If we were long and we sell => (sell - entry) * close
// If we were short and we buy => (entry - buy) * close
realized = prevQty > 0m
? (price - p.AvgEntryPrice) * closeAbs
: (p.AvgEntryPrice - price) * closeAbs;
p.RealizedPnl += realized;
}
if (openAbs == 0m)
{
// fully closed to flat
p.NetQty = 0m;
p.AvgEntryPrice = 0m;
p.UpdatedAt = ts;
return;
}
// flipped: leftover opens new position at fill price
p.NetQty = newQty; // sign flipped
p.AvgEntryPrice = price; // new entry
p.UpdatedAt = ts;
}Примечание: Math.Sign(double) — не идеально для decimal. Лучше сделать свою функцию знака для decimal:
private static int Sign(decimal x) => x > 0m ? 1 : x < 0m ? -1 : 0;и заменить Math.Sign((double)...).
Полный сервис: “исполнить ордер на тике” с учётом liquidity и идемпотентности
Ниже пример “один ордер — один тик”. В реальности вы вероятно будете матчить пачку ордеров по символу на каждый тик.
public sealed class MatchingService
{
private readonly AppDbContext _db;
public MatchingService(AppDbContext db) => _db = db;
public async Task<Fill?> TryExecuteLimitOnTickAsync(Guid orderId, TopOfBookTick tick, CancellationToken ct)
{
// 1) Load order
var order = await _db.Orders.SingleOrDefaultAsync(o => o.Id == orderId, ct);
if (order is null) return null;
if (order.Symbol != tick.Symbol) return null;
if (order.Status is OrderStatus.Filled or OrderStatus.Cancelled) return null;
if (order.RemainingQty <= 0m) return null;
// 2) Price/liq check
if (!CanFillLimit(order, tick)) return null;
var px = BestPrice(order, tick);
var avail = AvailableQty(order, tick);
if (avail <= 0m) return null;
// 3) Determine fill qty (partial liquidity)
var fillQty = Math.Min(order.RemainingQty, avail);
if (fillQty <= 0m) return null;
// 4) Idempotency key: prevent duplicate fill on reprocessing same tick
// Choose a scheme that is unique per (order, tick) if you allow at most one fill per tick.
var execKey = $"{tick.TickId}:{order.Id}";
var already = await _db.Fills.AnyAsync(f => f.ExecutionKey == execKey, ct);
if (already) return null;
// 5) Load/Create position (netting: one per symbol/account)
// Если есть AccountId — добавьте в ключ.
var pos = await _db.Positions.SingleOrDefaultAsync(p => p.Symbol == order.Symbol, ct);
if (pos is null)
{
pos = new Position
{
Id = Guid.NewGuid(),
Symbol = order.Symbol,
NetQty = 0m,
AvgEntryPrice = 0m,
RealizedPnl = 0m,
UpdatedAt = tick.Ts
};
_db.Positions.Add(pos);
}
// 6) Apply changes atomically
// Важно: в проде оберните в транзакцию + правильный уровень изоляции/row locks.
var fill = new Fill
{
Id = Guid.NewGuid(),
OrderId = order.Id,
Symbol = order.Symbol,
Side = order.Side,
Qty = fillQty,
Price = px,
Ts = tick.Ts,
ExecutionKey = execKey
};
_db.Fills.Add(fill);
ApplyFillToOrder(order, fillQty, tick.Ts);
ApplyFillToPosition(pos, order.Side, fillQty, px, tick.Ts);
await _db.SaveChangesAsync(ct);
return fill;
}
}Про важное в многопоточности/конкурентности
Если у вас несколько воркеров могут одновременно исполнять один и тот же ордер/символ, вам нужно одно из:
- транзакция + блокировка строки ордера/позиции (в SQL Server:
UPDLOCK, в Postgres:SELECT ... FOR UPDATE); - оптимистичная конкуренция через
RowVersion/xmin+ retry; - уникальный индекс на
Fills.ExecutionKey(желательно) и обработкаunique violation.
Минимально рекомендую: уникальный индекс на ExecutionKey и retry на конфликт.
Уточнения, которые влияют на финальную реализацию
availableQtyна тике — это “объём на лучшей цене” (top-of-book) или суммарная глубина? Сейчас я заложил top-of-book.- Разрешаете ли несколько исполнений одного ордера в рамках одного tickId (например, если вы прогоняете ордера циклом и обновляете доступность)? Если да —
ExecutionKeyнадо сделать сseq/fillNo. - Есть ли комиссии/проскальзывание/маржа — добавим в
Fillи в расчёт PnL.
Скажите 1–2–3, и я под это подправлю код (включая транзакции и индексы).
2. Возьмите самое простое решение
3. Да, комиссия учитываться должна,
проскальзование можно опустить,
маржы должно хватать на исполнение ордера
Ок: top-of-book, самое простое, комиссии да, проскальзывание нет, маржи всегда хватает.
Ниже — простая схема:
- берём цену и доступное кол-во на лучшей стороне;
- исполняем
min(remaining, available); - создаём один
Fillна тик; - учитываем комиссию в
Fillи в позиции (через накопленную комиссию), не усложняя PnL; - защищаемся от дублей самым простым способом:
ExecutionKey = (orderId, tickId)+ уникальный индекс.
Модели (минимально нужное)
public enum Side { Buy, Sell }
public class Order
{
public Guid Id { get; set; }
public string Symbol { get; set; } = "";
public Side Side { get; set; }
public decimal Qty { get; set; } // желаемый объём
public decimal FilledQty { get; set; } // сколько уже исполнено
public DateTimeOffset CreatedTs { get; set; }
public DateTimeOffset? ClosedTs { get; set; }
}
public class Position
{
public Guid Id { get; set; }
public string Symbol { get; set; } = "";
public decimal Qty { get; set; } // +long, -short
public decimal AvgPx { get; set; } // средняя цена входа для открытой позиции
public decimal RealizedPnl { get; set; } // можно считать позже, но пусть будет
public decimal FeesPaid { get; set; } // накопленные комиссии
public DateTimeOffset UpdatedTs { get; set; }
}
public class Fill
{
public Guid Id { get; set; }
public Guid OrderId { get; set; }
public string Symbol { get; set; } = "";
public Side Side { get; set; }
public decimal Qty { get; set; }
public decimal Price { get; set; }
public decimal Fee { get; set; }
public DateTimeOffset Ts { get; set; }
// Самое простое для “один fill на тик на ордер”
public string ExecutionKey { get; set; } = ""; // $"{orderId:N}:{tickId}"
}Уникальный индекс на ExecutionKey (защита от дублей)
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Fill>()
.HasIndex(x => x.ExecutionKey)
.IsUnique();
}Tick (top-of-book)
public sealed record MarketTick(
string Symbol,
long TickId,
DateTimeOffset Ts,
decimal BestBidPx,
decimal BestBidQty,
decimal BestAskPx,
decimal BestAskQty
);Комиссия (простая модель)
Самый простой вариант — фиксированная ставка в базисных пунктах от notional:
public interface IFeeModel
{
decimal CalcFee(decimal qty, decimal price, Side side);
}
public sealed class ProportionalFeeModel : IFeeModel
{
private readonly decimal _rate; // например 0.0004m = 4 bps
public ProportionalFeeModel(decimal rate) => _rate = rate;
public decimal CalcFee(decimal qty, decimal price, Side side)
=> Math.Abs(qty) * price * _rate;
}Исполнение: один fill на тик, top-of-book, с комиссией
public sealed class ExecutionService
{
private readonly AppDbContext _db;
private readonly IFeeModel _fees;
public ExecutionService(AppDbContext db, IFeeModel fees)
{
_db = db;
_fees = fees;
}
public async Task<Fill?> TryExecuteAtTopOfBookAsync(Guid orderId, MarketTick tick, CancellationToken ct)
{
// 1) Загружаем ордер + позицию (простое: отдельные запросы)
var order = await _db.Orders.SingleAsync(o => o.Id == orderId, ct);
if (order.Symbol != tick.Symbol) return null;
var remaining = order.Qty - order.FilledQty;
if (remaining <= 0) return null;
// 2) Определяем цену и доступность по top-of-book
decimal px, avail;
if (order.Side == Side.Buy)
{
px = tick.BestAskPx;
avail = tick.BestAskQty;
}
else
{
px = tick.BestBidPx;
avail = tick.BestBidQty;
}
if (px <= 0 || avail <= 0) return null;
// 3) Один fill на тик: берём минимум из remaining и avail
var fillQty = Math.Min(remaining, avail);
if (fillQty <= 0) return null;
// 4) ExecutionKey = orderId+tickId
var execKey = $"{order.Id:N}:{tick.TickId}";
// 5) Проверка “уже исполняли на этом тике?”
// (самое простое: сначала быстрый запрос, затем уникальный индекс подстрахует)
var already = await _db.Fills.AnyAsync(f => f.ExecutionKey == execKey, ct);
if (already) return null;
// 6) Создаём fill + комиссия
var fee = _fees.CalcFee(fillQty, px, order.Side);
var fill = new Fill
{
Id = Guid.NewGuid(),
OrderId = order.Id,
Symbol = order.Symbol,
Side = order.Side,
Qty = fillQty,
Price = px,
Fee = fee,
Ts = tick.Ts,
ExecutionKey = execKey
};
_db.Fills.Add(fill);
// 7) Обновляем ордер
order.FilledQty += fillQty;
if (order.FilledQty >= order.Qty)
order.ClosedTs = tick.Ts;
// 8) Обновляем позицию (простая логика: средняя цена для наращивания, кросс — уменьшает/разворачивает)
var pos = await _db.Positions.SingleOrDefaultAsync(p => p.Symbol == order.Symbol, ct)
?? new Position { Id = Guid.NewGuid(), Symbol = order.Symbol, Qty = 0, AvgPx = 0 };
ApplyFillToPositionSimple(pos, order.Side, fillQty, px, fee, tick.Ts);
if (_db.Entry(pos).State == EntityState.Detached)
_db.Positions.Add(pos);
// 9) Сохраняем
// Если два потока всё же попытаются вставить один и тот же ExecutionKey,
// БД отобьёт (unique violation). Можно поймать и вернуть null.
try
{
await _db.SaveChangesAsync(ct);
}
catch (DbUpdateException)
{
// максимально просто: считаем, что это дубль (по индексу) и "не исполнили"
return null;
}
return fill;
}
private static void ApplyFillToPositionSimple(Position pos, Side side, decimal qty, decimal px, decimal fee, DateTimeOffset ts)
{
// Комиссия всегда увеличивает FeesPaid
pos.FeesPaid += fee;
var signed = (side == Side.Buy) ? qty : -qty;
// Если позиция нулевая — просто открываем
if (pos.Qty == 0)
{
pos.Qty = signed;
pos.AvgPx = px;
pos.UpdatedTs = ts;
return;
}
// Если сделка в ту же сторону — пересчёт средней
if ((pos.Qty > 0 && signed > 0) || (pos.Qty < 0 && signed < 0))
{
var newQty = pos.Qty + signed;
var wavg = (Math.Abs(pos.Qty) * pos.AvgPx + Math.Abs(signed) * px) / Math.Abs(newQty);
pos.Qty = newQty;
pos.AvgPx = wavg;
pos.UpdatedTs = ts;
return;
}
// Иначе это уменьшение/переворот. Для “самого простого”:
// - RealizedPnl считаем при закрытии части по разнице цен
// - Если переворот — новая AvgPx = px на оставшийся объём
var closingQty = Math.Min(Math.Abs(pos.Qty), Math.Abs(signed));
// PnL: для long закрытие по sell: (px - avg) * closeQty
// для short закрытие по buy: (avg - px) * closeQty
if (pos.Qty > 0) // long, сделка sell
pos.RealizedPnl += (px - pos.AvgPx) * closingQty;
else // short, сделка buy
pos.RealizedPnl += (pos.AvgPx - px) * closingQty;
var newPosQty = pos.Qty + signed; // signed противоположен знаку pos.Qty
if (newPosQty == 0)
{
pos.Qty = 0;
pos.AvgPx = 0;
}
else
{
// переворот: остаток открывается по цене текущего fill
pos.Qty = newPosQty;
pos.AvgPx = px;
}
pos.UpdatedTs = ts;
}
}Что именно “простое решение” гарантирует/не гарантирует
Гарантирует:
- один
FillнаorderId + tickId; - комиссия учитывается и копится в
Position.FeesPaid; - PnL по закрываемой части считается без проскальзывания.
Не гарантирует (если будет параллелизм):
- идеальную согласованность
order.FilledQtyпри одновременном исполнении одного ордера разными потоками (для этого уже нужен lock/rowversion+retry). Но при “самом простом” часто достаточно однопоточного исполнителя на символ/ордер.
Если хотите остаться “простыми”, но сделать надёжнее в concurrency, следующий минимальный шаг — добавить RowVersion в Order и Position и retry при DbUpdateConcurrencyException.
Все что сделано меня уже устраивает.
Надо с начала это все переварить.
Понял. Если захотите позже, можно будет:
- прогнать это на небольшом наборе тест-кейсов (частичное исполнение, переворот, повторный тик, комиссия);
- или накидать минимальные unit-тесты, чтобы зафиксировать логику и не бояться рефакторинга.
Когда переварите и появятся вопросы — приносите конкретный кейс (набор Fill-ов и ожидаемый итог по Qty/AvgPx/RealizedPnl/FeesPaid), разберём.