пятница, 6 февраля 2026 г.

Chad, EventHub, TradingEntities, Part3

Chad, EventHub, TradingEntities, Part3

26.02.07 EventHub, TradingEntities

https://ask.chadgpt.ru/share/b7d47f07-c985-48df-bbbc-70622e419623

Отличный сценарий для BackgroundService + Singleton Bus: по сути это in-process event bus/mediator между конвейерными стадиями.

Ниже — практичные рекомендации, чтобы это работало предсказуемо в ASP.NET Core и не “стреляло в ногу” по потокам, порядку, остановке и памяти.

1) Разделите “команды” и “события”

У вас есть два типа сообщений:

  • События (fan-out): QuoteReceived, TradeExecuted — их могут слушать несколько сервисов (например, InMemoryDB и EFDB одновременно).
  • Команды (point-to-point): ExecuteOrder — обычно должен обработать один исполнитель.

Если оставить всё как “топики” — тоже можно, но договоритесь:

  • quotes — событие (много подписчиков допустимо)
  • orders.execute — команда (ожидается один подписчик)
  • trades — событие
  • positions.updated — событие

Тогда вы избежите случайных “двойных исполнений”.

2) Самое важное: не делайте тяжёлую работу внутри обработчика Bus

Если у вас Publish вызывает подписчиков напрямую, то:

  • синхронный publish будет блокировать генератора котировок,
  • асинхронный publish начнёт выполнять обработчики параллельно и может породить лавину задач,
  • ошибки/зависания обработчика будут влиять на остальных (в зависимости от реализации).

Практический паттерн для BackgroundService:

  • подписчик минимально делает channel.Writer.TryWrite(...)/WriteAsync(...) в свой входной канал,
  • а вся тяжёлая работа делается в ExecuteAsync сервиса, читая из канала.

То есть Bus становится “маршрутизатором”, а BackgroundService — “воркером”.

3) Сделайте входные очереди bounded + стратегия backpressure

Для котировок особенно важно: поток может быть выше, чем скорость стратегии/EF.

Рекомендация:

  • Quotes -> Strategy: bounded канал, при переполнении либо DropOldest (если вам важны свежие котировки), либо блокировать (если важна каждая).
  • Orders -> Executor: bounded, обычно без дропа (каждый ордер важен).
  • Trades -> Positions: bounded, без дропа.
  • Events -> EF: bounded, можно блокировать или буферизовать, но лучше не терять.

В ASP.NET Core это обычно System.Threading.Channels с BoundedChannelOptions.

4) Про порядок сообщений

События по разным топикам/подписчикам почти неизбежно придут “в разное время”.
Если порядок критичен:

  • внутри одного потока (например, QuotesService) добавляйте Sequence/Timestamp,
  • Strategy/Executor/Positions учитывают sequence и могут отбрасывать “устаревшее”.

Особенно если вы решите сделать обработчики параллельными.

5) Обработка ошибок: локально и глобально

У вас уже есть onTopicFault — это хорошо как “предохранитель”.
Но ещё нужно:

  • чтобы каждый BackgroundService логировал и решал: продолжать или останавливать pipeline.
  • для EF-сохранения — retry/backoff (например, 5 попыток с задержкой), и отдельный dead-letter (хотя бы лог + метрика).

Важно: если EF упал, не обязательно останавливать Strategy/Executor. Часто лучше продолжить торговать, а сохранение догонит позже (или деградировать).

6) DI и Scoped зависимости в Singleton Bus

Bus — singleton, а DbContext — scoped. Значит:

  • не инжектите DbContext в Bus/подписки напрямую (подписка живёт долго),
  • в BackgroundServiceEntityFrameworkDataBase используйте IServiceScopeFactory и создавайте scope на обработку батча/сообщения.

Иначе получите утечки/ошибки lifetime.

7) Остановка приложения (graceful shutdown)

Нужно, чтобы при остановке:

  • Quotes перестал генерить,
  • Strategy/Executor/Positions дочитали свои каналы (или остановились по токену),
  • InMemory/EF успели сбросить очереди (по возможности).

Практика:

  • StopAsync: завершать writer’ы каналов (TryComplete) и ждать drain с таймаутом.
  • ExecuteAsync: await foreach по ReadAllAsync(stoppingToken) + try/catch.

8) Дублирование доставки (at-least-once) и идемпотентность

В in-process bus обычно “at-most-once”, но при ошибках/перезапусках сервисов вы всё равно столкнётесь с:

  • повторной генерацией котировок,
  • повторной обработкой трейда при ретраях EF.

Поэтому полезно:

  • иметь Id у Order/Trade,
  • в EF хранить уникальные ключи и делать idempotent insert/update.

9) Не смешивайте “данные для визуализации” и “источник истины”

Ваш InMemoryDataBase — отлично как read-model/кэш для UI.
Но решите заранее:

  • является ли EF “истиной” и InMemory — производным,
  • или InMemory — первичен, а EF — реплика.

Это влияет на то, что делать при падении EF/задержках.


Если хотите, я могу предложить конкретный скелет кода под ваш сценарий: типы сообщений (Quote, Order, Trade), имена топиков, и шаблон каждого BackgroundService с входным Channel<T> + подпиской на Bus. Скажите, котировки должны идти “каждая” или достаточно последней (drop-oldest/latest)? И нужна ли параллельная обработка ордеров исполнителем (N-воркеров) или строго последовательно?

"Если хотите, я могу предложить конкретный скелет кода под ваш сценарий: типы сообщений (Quote, Order, Trade), имена топиков, и шаблон каждого BackgroundService с входным Channel<T> + подпиской на Bus." - это было бы хорошо.
----------
Уточнение.
Мы создаем полный цикл котировка->Strategy->order->executor->trade->position->(open_position, fix_profit, open_profit)->расчет_equity-> sharp-коефициент.
То есть мы сами создаем все текущие сущности.
В этом проекте мы не работаем с реальными рыночными данными.
Все что нам необходимо (рыночные сущности, excutors,quotes) мы генерируем сами.
Наша задача проверить как работает вся цепочка без реального рынка.
- online trade entities в InMemoryDataBase для текущей визуализации без истории
- offline trade entities в EntityFrameworkMemoryDataBase для текущей визуализации offline с историией - персистентное хранилище.
Пока цель только тестирование всего полного цикла торговли.
Когда этот этап будет работать правильно мы подключим реальные котировки и рыночные executors
Примерная схема БД
Tickers
Strategies - каждая стратегия имеет Ticker, Position, Orders, Trades
Orders
Trades
Position (CurrentPosition, RealizedProfit, UnrealizedProfit)
-------
Примерно так

Ниже — рабочий “скелет” под ваш полный цикл Quote → Strategy → Order → Executor → Trade → Position → PnL → Equity → Sharpe в виде нескольких BackgroundService + простая in-process BusLibrary на Channel<T>. Он заточен под ваш текущий режим: всё генерим сами, без внешнего рынка/брокера, с online (InMemory) для живой визуализации и offline (EF persistent) для истории/оффлайн просмотра.

Дам минимально-целостный набор файлов/типов; дальше вы сможете наращивать доменную модель.


1) Контракты сообщений (Bus messages)

C#
Копировать код
public interface IBusMessage
{
    DateTimeOffset Ts { get; }
}

public sealed record Quote(
    string Ticker,
    decimal Bid,
    decimal Ask,
    DateTimeOffset Ts
) : IBusMessage
{
    public decimal Mid => (Bid + Ask) / 2m;
}

public enum OrderSide { Buy, Sell }

public sealed record Order(
    Guid OrderId,
    string StrategyId,
    string Ticker,
    OrderSide Side,
    decimal Quantity,
    decimal? LimitPrice,
    DateTimeOffset Ts
) : IBusMessage;

public sealed record Trade(
    Guid TradeId,
    Guid OrderId,
    string StrategyId,
    string Ticker,
    OrderSide Side,
    decimal Quantity,
    decimal Price,
    DateTimeOffset Ts
) : IBusMessage;

public sealed record PositionSnapshot(
    string StrategyId,
    string Ticker,
    decimal NetQuantity,
    decimal AvgPrice,
    decimal RealizedPnL,
    decimal UnrealizedPnL,
    decimal LastPrice,
    DateTimeOffset Ts
) : IBusMessage;

public sealed record EquityPoint(
    string StrategyId,
    decimal Equity,
    DateTimeOffset Ts
) : IBusMessage;

public sealed record SharpePoint(
    string StrategyId,
    double Sharpe,
    DateTimeOffset Ts
) : IBusMessage;

2) “BusLibrary”: топики на Channel<T>

Идея: Publish<T> пушит во все подписки на тип T. Подписчик получает ChannelReader<T>.

Важно: для котировок можно включить режим “последнее значение” (drop old), но ниже — базовый вариант. Потом добавим оптимизацию.

C#
Копировать код
using System.Collections.Concurrent;
using System.Threading.Channels;

public interface IBus
{
    ValueTask Publish<T>(T msg, CancellationToken ct = default);
    ChannelReader<T> Subscribe<T>(string name, int capacity = 1024, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait);
}

public sealed class InProcessBus : IBus
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<object>> _subscribers = new();

    public ChannelReader<T> Subscribe<T>(string name, int capacity = 1024, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
    {
        var ch = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            SingleReader = true,
            SingleWriter = false,
            FullMode = fullMode
        });

        var bag = _subscribers.GetOrAdd(typeof(T), _ => new ConcurrentBag<object>());
        bag.Add(ch);

        return ch.Reader;
    }

    public async ValueTask Publish<T>(T msg, CancellationToken ct = default)
    {
        if (!_subscribers.TryGetValue(typeof(T), out var bag)) return;

        foreach (var obj in bag)
        {
            if (obj is Channel<T> ch)
            {
                // Wait-режим: backpressure. Для котировок часто лучше DropOldest.
                await ch.Writer.WriteAsync(msg, ct);
            }
        }
    }

    // Дополнительно для graceful shutdown можно сделать CompleteAll()
}

3) Online / Offline хранилища (проекции)

3.1 InMemoryDataBase для “живой” визуализации без истории

Храним только текущие состояния/последние значения.

C#
Копировать код
using System.Collections.Concurrent;

public sealed class InMemoryDataBase
{
    public ConcurrentDictionary<string, Quote> LastQuotes { get; } = new();
    public ConcurrentDictionary<(string StrategyId, string Ticker), PositionSnapshot> Positions { get; } = new();
    public ConcurrentDictionary<string, EquityPoint> LastEquity { get; } = new();
    public ConcurrentDictionary<string, SharpePoint> LastSharpe { get; } = new();
}

3.2 EF “персистентное” хранилище для истории

Вы сказали EntityFrameworkMemoryDataBase, но вам нужна персистентность — значит обычно SQLite (файл) или Postgres. Для теста можно SQLite.

Скелет сущностей (упрощённо):

C#
Копировать код
using Microsoft.EntityFrameworkCore;

public sealed class TradingDbContext : DbContext
{
    public TradingDbContext(DbContextOptions<TradingDbContext> options) : base(options) { }

    public DbSet<QuoteEntity> Quotes => Set<QuoteEntity>();
    public DbSet<OrderEntity> Orders => Set<OrderEntity>();
    public DbSet<TradeEntity> Trades => Set<TradeEntity>();
    public DbSet<PositionPointEntity> PositionPoints => Set<PositionPointEntity>();
    public DbSet<EquityEntity> Equities => Set<EquityEntity>();
    public DbSet<SharpeEntity> Sharpes => Set<SharpeEntity>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        b.Entity<OrderEntity>().HasKey(x => x.OrderId);
        b.Entity<TradeEntity>().HasKey(x => x.TradeId);

        b.Entity<TradeEntity>()
            .HasIndex(x => new { x.OrderId, x.TradeId })
            .IsUnique();

        // индексы по StrategyId/Ticker/Ts для быстрых графиков
        b.Entity<EquityEntity>().HasIndex(x => new { x.StrategyId, x.Ts });
        b.Entity<SharpeEntity>().HasIndex(x => new { x.StrategyId, x.Ts });
        b.Entity<PositionPointEntity>().HasIndex(x => new { x.StrategyId, x.Ticker, x.Ts });
    }
}

public sealed class QuoteEntity
{
    public long Id { get; set; }
    public string Ticker { get; set; } = default!;
    public decimal Bid { get; set; }
    public decimal Ask { get; set; }
    public DateTimeOffset Ts { get; set; }
}

public sealed class OrderEntity
{
    public Guid OrderId { get; set; }
    public string StrategyId { get; set; } = default!;
    public string Ticker { get; set; } = default!;
    public OrderSide Side { get; set; }
    public decimal Quantity { get; set; }
    public decimal? LimitPrice { get; set; }
    public DateTimeOffset Ts { get; set; }
}

public sealed class TradeEntity
{
    public Guid TradeId { get; set; }
    public Guid OrderId { get; set; }
    public string StrategyId { get; set; } = default!;
    public string Ticker { get; set; } = default!;
    public OrderSide Side { get; set; }
    public decimal Quantity { get; set; }
    public decimal Price { get; set; }
    public DateTimeOffset Ts { get; set; }
}

public sealed class PositionPointEntity
{
    public long Id { get; set; }
    public string StrategyId { get; set; } = default!;
    public string Ticker { get; set; } = default!;
    public decimal NetQuantity { get; set; }
    public decimal AvgPrice { get; set; }
    public decimal RealizedPnL { get; set; }
    public decimal UnrealizedPnL { get; set; }
    public decimal LastPrice { get; set; }
    public DateTimeOffset Ts { get; set; }
}

public sealed class EquityEntity
{
    public long Id { get; set; }
    public string StrategyId { get; set; } = default!;
    public decimal Equity { get; set; }
    public DateTimeOffset Ts { get; set; }
}

public sealed class SharpeEntity
{
    public long Id { get; set; }
    public string StrategyId { get; set; } = default!;
    public double Sharpe { get; set; }
    public DateTimeOffset Ts { get; set; }
}

4) BackgroundService: генератор котировок (QuoteGeneratorService)

Генерация “рынка”: рандом-волк (упрощённо). Публикуем Quote, а online db обновляем последним.

C#
Копировать код
using Microsoft.Extensions.Hosting;

public sealed class QuoteGeneratorService : BackgroundService
{
    private readonly IBus _bus;
    private readonly InMemoryDataBase _mem;
    private readonly string[] _tickers;
    private readonly Random _rng = new(1);

    private readonly Dictionary<string, decimal> _mid = new();

    public QuoteGeneratorService(IBus bus, InMemoryDataBase mem, IEnumerable<string> tickers)
    {
        _bus = bus;
        _mem = mem;
        _tickers = tickers.ToArray();
        foreach (var t in _tickers) _mid[t] = 100m;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var now = DateTimeOffset.UtcNow;

            foreach (var t in _tickers)
            {
                var step = (decimal)(_rng.NextDouble() - 0.5) * 0.2m;   // шум
                _mid[t] = Math.Max(1m, _mid[t] + step);

                var spread = 0.02m;
                var q = new Quote(t, _mid[t] - spread / 2m, _mid[t] + spread / 2m, now);

                _mem.LastQuotes[t] = q;
                await _bus.Publish(q, stoppingToken);
            }

            await Task.Delay(50, stoppingToken); // 20 Hz на тикер (пример)
        }
    }
}

5) StrategyService: Quote → Order

Стратегия читает quotes по подписке, принимает решение, генерит Order.

Для примера: простая mean-reversion/threshold.

C#
Копировать код
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class StrategyService : BackgroundService
{
    private readonly IBus _bus;
    private readonly ChannelReader<Quote> _quotes;

    private readonly string _strategyId;
    private readonly string _ticker;

    private decimal _lastMid;
    private bool _hasLast;

    public StrategyService(IBus bus, string strategyId, string ticker)
    {
        _bus = bus;
        _strategyId = strategyId;
        _ticker = ticker;

        _quotes = bus.Subscribe<Quote>($"strategy:{strategyId}", capacity: 2048, fullMode: BoundedChannelFullMode.DropOldest);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var q in _quotes.ReadAllAsync(stoppingToken))
        {
            if (q.Ticker != _ticker) continue;

            var mid = q.Mid;
            if (!_hasLast)
            {
                _lastMid = mid;
                _hasLast = true;
                continue;
            }

            var delta = mid - _lastMid;
            _lastMid = mid;

            // Игрушечное правило: если цена упала сильно — покупаем, выросла — продаём
            const decimal threshold = 0.05m;
            if (delta <= -threshold)
            {
                var order = new Order(
                    OrderId: Guid.NewGuid(),
                    StrategyId: _strategyId,
                    Ticker: _ticker,
                    Side: OrderSide.Buy,
                    Quantity: 1m,
                    LimitPrice: null,
                    Ts: q.Ts
                );
                await _bus.Publish(order, stoppingToken);
            }
            else if (delta >= threshold)
            {
                var order = new Order(
                    Guid.NewGuid(), _strategyId, _ticker, OrderSide.Sell, 1m, null, q.Ts
                );
                await _bus.Publish(order, stoppingToken);
            }
        }
    }
}

6) ExecutorService: Order → Trade

Executor — это ваш “симулятор биржи”: исполняет по Mid/Ask/Bid из последней котировки.

C#
Копировать код
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class ExecutorService : BackgroundService
{
    private readonly IBus _bus;
    private readonly InMemoryDataBase _mem;
    private readonly ChannelReader<Order> _orders;

    public ExecutorService(IBus bus, InMemoryDataBase mem)
    {
        _bus = bus;
        _mem = mem;
        _orders = bus.Subscribe<Order>("executor", capacity: 4096, fullMode: BoundedChannelFullMode.Wait);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var o in _orders.ReadAllAsync(stoppingToken))
        {
            if (!_mem.LastQuotes.TryGetValue(o.Ticker, out var q))
                continue; // или ждать появления котировки

            var price = o.Side == OrderSide.Buy ? q.Ask : q.Bid;

            var trade = new Trade(
                TradeId: Guid.NewGuid(),
                OrderId: o.OrderId,
                StrategyId: o.StrategyId,
                Ticker: o.Ticker,
                Side: o.Side,
                Quantity: o.Quantity,
                Price: price,
                Ts: DateTimeOffset.UtcNow
            );

            await _bus.Publish(trade, stoppingToken);
        }
    }
}

7) PositionService: Trade → PositionSnapshot (+ PnL)

Сводим сделки в позицию (net qty, avg price), считаем realized/unrealized по last price.

C#
Копировать код
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class PositionService : BackgroundService
{
    private readonly IBus _bus;
    private readonly InMemoryDataBase _mem;
    private readonly ChannelReader<Trade> _trades;
    private readonly ChannelReader<Quote> _quotes;

    private sealed class State
    {
        public decimal NetQty;
        public decimal AvgPrice;      // по открытой части
        public decimal RealizedPnL;
        public decimal LastPrice;
    }

    private readonly Dictionary<(string StrategyId, string Ticker), State> _state = new();

    public PositionService(IBus bus, InMemoryDataBase mem)
    {
        _bus = bus;
        _mem = mem;

        _trades = bus.Subscribe<Trade>("positions:trades", capacity: 4096);
        _quotes = bus.Subscribe<Quote>("positions:quotes", capacity: 4096, fullMode: BoundedChannelFullMode.DropOldest);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Два входа: trades и quotes. Проще всего — объединить через отдельные задачи.
        var t1 = Task.Run(() => ConsumeTrades(stoppingToken), stoppingToken);
        var t2 = Task.Run(() => ConsumeQuotes(stoppingToken), stoppingToken);
        await Task.WhenAll(t1, t2);
    }

    private async Task ConsumeQuotes(CancellationToken ct)
    {
        await foreach (var q in _quotes.ReadAllAsync(ct))
        {
            foreach (var key in _state.Keys.Where(k => k.Ticker == q.Ticker).ToArray())
            {
                _state[key].LastPrice = q.Mid;
                PublishSnapshot(key, q.Ts, ct);
            }
        }
    }

    private async Task ConsumeTrades(CancellationToken ct)
    {
        await foreach (var tr in _trades.ReadAllAsync(ct))
        {
            var key = (tr.StrategyId, tr.Ticker);
            if (!_state.TryGetValue(key, out var s))
            {
                s = new State { LastPrice = tr.Price };
                _state[key] = s;
            }

            var signedQty = tr.Side == OrderSide.Buy ? tr.Quantity : -tr.Quantity;

            // Реализация: если меняем знак/уменьшаем позицию — часть PnL реализуется
            if (s.NetQty == 0 || Math.Sign(s.NetQty) == Math.Sign(signedQty))
            {
                // наращиваем в ту же сторону
                var newQty = s.NetQty + signedQty;
                var totalCost = s.AvgPrice * Math.Abs(s.NetQty) + tr.Price * Math.Abs(signedQty);
                s.AvgPrice = Math.Abs(newQty) == 0 ? 0 : totalCost / Math.Abs(newQty);
                s.NetQty = newQty;
            }
            else
            {
                // закрываем часть/всё
                var closingQty = Math.Min(Math.Abs(s.NetQty), Math.Abs(signedQty));
                var pnlPerUnit = (tr.Price - s.AvgPrice) * Math.Sign(s.NetQty); // знак позиции
                s.RealizedPnL += pnlPerUnit * closingQty;

                s.NetQty += signedQty;

                if (s.NetQty == 0)
                {
                    s.AvgPrice = 0;
                }
                else if (Math.Sign(s.NetQty) == Math.Sign(signedQty))
                {
                    // переворот: оставшаяся часть открыта по цене сделки
                    s.AvgPrice = tr.Price;
                }
            }

            s.LastPrice = tr.Price;
            await PublishSnapshotAsync(key, tr.Ts, ct);
        }
    }

    private void PublishSnapshot((string StrategyId, string Ticker) key, DateTimeOffset ts, CancellationToken ct)
        => PublishSnapshotAsync(key, ts, ct).GetAwaiter().GetResult();

    private async Task PublishSnapshotAsync((string StrategyId, string Ticker) key, DateTimeOffset ts, CancellationToken ct)
    {
        var s = _state[key];
        var unreal = s.NetQty == 0 ? 0m : (s.LastPrice - s.AvgPrice) * s.NetQty;

        var snap = new PositionSnapshot(
            StrategyId: key.StrategyId,
            Ticker: key.Ticker,
            NetQuantity: s.NetQty,
            AvgPrice: s.AvgPrice,
            RealizedPnL: s.RealizedPnL,
            UnrealizedPnL: unreal,
            LastPrice: s.LastPrice,
            Ts: ts
        );

        _mem.Positions[(key.StrategyId, key.Ticker)] = snap;
        await _bus.Publish(snap, ct);
    }
}

8) EquityService: PositionSnapshot → EquityPoint

У вас может быть несколько тикеров на стратегию — суммируйте PnL + base capital.

C#
Копировать код
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class EquityService : BackgroundService
{
    private readonly IBus _bus;
    private readonly InMemoryDataBase _mem;
    private readonly ChannelReader<PositionSnapshot> _pos;

    private readonly decimal _initialCapital;

    private readonly Dictionary<string, decimal> _lastEquity = new();
    private readonly Dictionary<string, Dictionary<string, PositionSnapshot>> _byStrat = new();

    public EquityService(IBus bus, InMemoryDataBase mem, decimal initialCapital = 10_000m)
    {
        _bus = bus;
        _mem = mem;
        _initialCapital = initialCapital;
        _pos = bus.Subscribe<PositionSnapshot>("equity", capacity: 4096);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var p in _pos.ReadAllAsync(stoppingToken))
        {
            if (!_byStrat.TryGetValue(p.StrategyId, out var dict))
            {
                dict = new Dictionary<string, PositionSnapshot>();
                _byStrat[p.StrategyId] = dict;
            }

            dict[p.Ticker] = p;

            var totalPnL = dict.Values.Sum(x => x.RealizedPnL + x.UnrealizedPnL);
            var equity = _initialCapital + totalPnL;

            var point = new EquityPoint(p.StrategyId, equity, p.Ts);
            _mem.LastEquity[p.StrategyId] = point;
            await _bus.Publish(point, stoppingToken);
        }
    }
}

9) SharpeService: EquityPoint → SharpePoint

Для теста: считаем по доходностям за окно (например, 200 точек). Это “поточный” Sharpe без годовых коэффициентов.

C#
Копировать код
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class SharpeService : BackgroundService
{
    private readonly IBus _bus;
    private readonly InMemoryDataBase _mem;
    private readonly ChannelReader<EquityPoint> _eq;

    private readonly int _window;
    private readonly Dictionary<string, Queue<double>> _rets = new();
    private readonly Dictionary<string, decimal> _lastEq = new();

    public SharpeService(IBus bus, InMemoryDataBase mem, int window = 200)
    {
        _bus = bus;
        _mem = mem;
        _window = window;
        _eq = bus.Subscribe<EquityPoint>("sharpe", capacity: 4096);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var e in _eq.ReadAllAsync(stoppingToken))
        {
            if (_lastEq.TryGetValue(e.StrategyId, out var prev) && prev > 0)
            {
                var r = (double)((e.Equity - prev) / prev);

                if (!_rets.TryGetValue(e.StrategyId, out var q))
                {
                    q = new Queue<double>(_window + 1);
                    _rets[e.StrategyId] = q;
                }

                q.Enqueue(r);
                while (q.Count > _window) q.Dequeue();

                var arr = q.ToArray();
                var mean = arr.Average();
                var var = arr.Length > 1 ? arr.Select(x => (x - mean) * (x - mean)).Sum() / (arr.Length - 1) : 0.0;
                var std = Math.Sqrt(var);

                var sharpe = std == 0 ? 0.0 : mean / std;

                var sp = new SharpePoint(e.StrategyId, sharpe, e.Ts);
                _mem.LastSharpe[e.StrategyId] = sp;
                await _bus.Publish(sp, stoppingToken);
            }

            _lastEq[e.StrategyId] = e.Equity;
        }
    }
}

10) Offline persistence: подписчики-проекции в EF (Quotes/Orders/Trades/Positions/Equity/Sharpe)

Лучше делать отдельные writer-сервисы (или один батчер), чтобы не тормозить торговый контур. Скелет батчера (один сервис на всё):

C#
Копировать код
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Channels;

public sealed class EfPersistenceService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ChannelReader<Quote> _quotes;
    private readonly ChannelReader<Order> _orders;
    private readonly ChannelReader<Trade> _trades;
    private readonly ChannelReader<PositionSnapshot> _pos;
    private readonly ChannelReader<EquityPoint> _eq;
    private readonly ChannelReader<SharpePoint> _sh;

    public EfPersistenceService(IBus bus, IServiceScopeFactory scopeFactory)
    {
        _scopeFactory = scopeFactory;
        _quotes = bus.Subscribe<Quote>("ef:quotes", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
        _orders = bus.Subscribe<Order>("ef:orders", capacity: 10_000);
        _trades = bus.Subscribe<Trade>("ef:trades", capacity: 10_000);
        _pos = bus.Subscribe<PositionSnapshot>("ef:pos", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
        _eq = bus.Subscribe<EquityPoint>("ef:eq", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
        _sh = bus.Subscribe<SharpePoint>("ef:sh", capacity: 10_000, fullMode: BoundedChannelFullMode.DropOldest);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var batchQuotes = new List<QuoteEntity>(200);
        var batchPos = new List<PositionPointEntity>(200);
        var batchEq = new List<EquityEntity>(200);
        var batchSh = new List<SharpeEntity>(200);
        var batchOrders = new List<OrderEntity>(200);
        var batchTrades = new List<TradeEntity>(200);

        while (!stoppingToken.IsCancellationRequested)
        {
            // Неблокирующая выборка "сколько успели" + flush по таймеру
            Drain(_quotes, batchQuotes, q => new QuoteEntity { Ticker = q.Ticker, Bid = q.Bid, Ask = q.Ask, Ts = q.Ts }, 200);
            Drain(_pos, batchPos, p => new PositionPointEntity
            {
                StrategyId = p.StrategyId, Ticker = p.Ticker,
                NetQuantity = p.NetQuantity, AvgPrice = p.AvgPrice,
                RealizedPnL = p.RealizedPnL, UnrealizedPnL = p.UnrealizedPnL,
                LastPrice = p.LastPrice, Ts = p.Ts
            }, 200);
            Drain(_eq, batchEq, e => new EquityEntity { StrategyId = e.StrategyId, Equity = e.Equity, Ts = e.Ts }, 200);
            Drain(_sh, batchSh, s => new SharpeEntity { StrategyId = s.StrategyId, Sharpe = s.Sharpe, Ts = s.Ts }, 200);
            Drain(_orders, batchOrders, o => new OrderEntity
            {
                OrderId = o.OrderId, StrategyId = o.StrategyId, Ticker = o.Ticker,
                Side = o.Side, Quantity = o.Quantity, LimitPrice = o.LimitPrice, Ts = o.Ts
            }, 200);
            Drain(_trades, batchTrades, t => new TradeEntity
            {
                TradeId = t.TradeId, OrderId = t.OrderId, StrategyId = t.StrategyId, Ticker = t.Ticker,
                Side = t.Side, Quantity = t.Quantity, Price = t.Price, Ts = t.Ts
            }, 200);

            if (batchQuotes.Count + batchPos.Count + batchEq.Count + batchSh.Count + batchOrders.Count + batchTrades.Count > 0)
            {
                using var scope = _scopeFactory.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<TradingDbContext>();

                db.AddRange(batchQuotes); batchQuotes.Clear();
                db.AddRange(batchPos); batchPos.Clear();
                db.AddRange(batchEq); batchEq.Clear();
                db.AddRange(batchSh); batchSh.Clear();
                db.AddRange(batchOrders); batchOrders.Clear();
                db.AddRange(batchTrades); batchTrades.Clear();

                await db.SaveChangesAsync(stoppingToken);
            }

            await Task.Delay(200, stoppingToken);
        }
    }

    private static void Drain<TMsg, TEntity>(
        ChannelReader<TMsg> reader,
        List<TEntity> batch,
        Func<TMsg, TEntity> map,
        int max)
    {
        while (batch.Count < max && reader.TryRead(out var msg))
            batch.Add(map(msg));
    }
}

11) Регистрация в DI (Program.cs)

Примерно так (SQLite файл как persistent). Если вы реально хотите EF InMemory provider — замените на UseInMemoryDatabase, но это не персистентно.

C#
Копировать код
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.AddSingleton<IBus, InProcessBus>();
        services.AddSingleton<InMemoryDataBase>();

        services.AddDbContext<TradingDbContext>(opt =>
            opt.UseSqlite("Data Source=trading.db"));

        // миграции/ensure created (для прототипа)
        services.AddHostedService<DbInitService>();

        // Market sim
        services.AddSingleton<IEnumerable<string>>(_ => new[] { "AAPL", "MSFT" });
        services.AddHostedService<QuoteGeneratorService>();

        // Strategy(ии)
        services.AddHostedService(sp => new StrategyService(
            sp.GetRequiredService<IBus>(), strategyId: "S1", ticker: "AAPL"));

        services.AddHostedService<ExecutorService>();
        services.AddHostedService<PositionService>();
        services.AddHostedService(sp => new EquityService(sp.GetRequiredService<IBus>(), sp.GetRequiredService<InMemoryDataBase>(), 10_000m));
        services.AddHostedService<SharpeService>();

        // Offline persistence projection
        services.AddHostedService<EfPersistenceService>();
    })
    .Build();

await host.RunAsync();

public sealed class DbInitService : IHostedService
{
    private readonly IServiceScopeFactory _scopeFactory;
    public DbInitService(IServiceScopeFactory scopeFactory) => _scopeFactory = scopeFactory;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<TradingDbContext>();
        await db.Database.EnsureCreatedAsync(cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

12) Важные настройки именно для вашего кейса (быстро и по делу)

Для “полного цикла” симуляции почти всегда нужно развести режимы очередей:

  • Quote и производные “визуализационные” (PositionSnapshot, EquityPoint, SharpePoint) — DropOldest (нужна последняя картинка, а не каждая точка).
  • Order и TradeWait (нельзя терять).

Сейчас в примере это учтено частично.


Уточняющие вопросы (чтобы подогнать скелет под вашу схему БД и требования)

  1. Одна стратегия = один тикер, или стратегия может торговать несколько тикеров?
  2. Orders/Trades вам нужны в online InMemory тоже (для UI “лента”), или достаточно только позиции/эквити?
  3. Исполнение: всегда market по Bid/Ask, или хотите поддержать LimitPrice и частичное исполнение/задержку?
  4. Для offline истории: хотите хранить все quotes, или достаточно downsample (например, 1/sec), чтобы база не разрасталась?

Ответьте на эти 4 пункта — и я подстрою:

  • модель Strategies/Tickers ближе к вашей таблице,
  • routing (топики) и ключи,
  • и сделаю более “правильный” Position (в т.ч. short/flip/fees).
1. Одна стратегия = один тикер
2. InMemoryDataBase24 - все торговые сущности хранятся 24 часа
3. Поддержка MarketOrder, LimitOrder. Поддержка частичного исполнения. В Order должно быть поля Начальное кол-во контрактов и Rest (остаток контрактов).
4. Offline Quotes History Будет отдельный специальный проект и отдельная база данных для истории котировок для тестирования. Там будет совсем другая База Данных с подневным хранением нескольких торгуемых Тикеров в формате csv cо сжатием и с хранением в БазеДанных как Image . Но это уже совсем другой проект, поэтому в данном проекте мы это не делаем. Это уже не торговля, а сопутствующие проекты, которых на самом деле тоже много. Но пока мы их не трогаем.

Ок, тогда схема и “скелет” доменной модели можно зафиксировать так, чтобы:

  • 1 стратегия = 1 тикер (жёсткая связь)
  • все торговые сущности (orders/trades/positions) живут в InMemory “окне” 24 часа
  • есть Market/Limit, partial fills, у Order есть InitialQty и RestQty
  • offline quotes/history — вообще не трогаем в этом проекте

Ниже даю компактную, но “правильную” БД-схему (под EF Core), и как это ложится на вашу структуру.


Рекомендуемая схема БД (операционная, 24 часа)

tickers

  • id (PK, GUID/INT)
  • symbol (unique, string)
  • price_step (decimal) — опционально
  • qty_step (decimal) — опционально
  • created_utc

Индекс: UX_tickers_symbol


strategies

  • id (PK)
  • name (string)
  • ticker_id (FK → tickers.id) NOT NULL
  • status (enum: Running/Stopped/Archived)
  • created_utc

Ограничение: если строго “1 стратегия = 1 тикер” и тикер не может быть у двух стратегий, тогда:

  • UX_strategies_ticker_id (unique)

Если же допустимо несколько стратегий на один тикер — убираете unique.


orders

  • id (PK)
  • strategy_id (FK → strategies.id)
  • ticker_id (FK → tickers.id) — можно держать для быстрых выборок, но тогда следить за консистентностью (обычно ок)
  • client_order_id (string, unique per strategy) — удобно для идемпотентности
  • side (enum: Buy/Sell)
  • type (enum: Market/Limit)
  • time_in_force (enum: GTC/IOC/FOK) — можно позже
  • limit_price (decimal, nullable) — для Market = null
  • initial_qty (decimal/int) — “Начальное кол-во контрактов”
  • rest_qty (decimal/int) — “остаток”
  • filled_qty (computed или поле) — можно хранить и поддерживать: filled = initial - rest
  • avg_fill_price (decimal, nullable) — средняя цена исполнения (по трейдам)
  • status (enum: New/PartiallyFilled/Filled/Canceled/Rejected/Expired)
  • created_utc
  • updated_utc

Индексы:

  • (strategy_id, created_utc desc)
  • (ticker_id, created_utc desc)
  • UX_orders_strategy_clientOrderId (если используете)

trades (fills)

Каждый partial fill — отдельная запись.

  • id (PK)
  • order_id (FK → orders.id)
  • strategy_id (FK → strategies.id) — денормализация для скорости
  • ticker_id (FK → tickers.id)
  • side (Buy/Sell) — обычно дублируют с order.side, но полезно
  • qty (decimal/int)
  • price (decimal)
  • fee (decimal, default 0) — опционально
  • executed_utc

Индексы:

  • (order_id, executed_utc)
  • (strategy_id, executed_utc desc)

positions

Позиция по стратегии (так как 1 стратегия = 1 тикер, позиция фактически 1:1 со strategy).

  • strategy_id (PK, FK → strategies.id)
  • ticker_id (FK → tickers.id)
  • net_qty (decimal/int) — текущая позиция (long > 0, short < 0)
  • avg_entry_price (decimal, nullable) — средняя цена входа по открытой позиции
  • realized_pnl (decimal) — реализованная прибыль
  • unrealized_pnl (decimal) — нереализованная прибыль (зависит от last price)
  • last_price (decimal, nullable) — последняя цена для расчёта unrealized
  • updated_utc

Индекс: (ticker_id) (если нужно быстро по тикеру)

> Важно: unrealized_pnl лучше либо не хранить, либо хранить как “кэш” и пересчитывать при обновлении last_price. Для InMemory/24h можно хранить.


Связи (в терминах “как вы написали”)

  • Ticker 1 — * Strategies (или Ticker 1 — 1 Strategy если unique по ticker_id)
  • Strategy 1 — 1 Position
  • Strategy 1 — * Orders
  • Order 1 — * Trades
  • Strategy 1 — * Trades (денормализовано, удобно)
  • В каждом Order/Trade есть TickerId (можно и без него, но с ним проще фильтровать и чистить по тикеру)

24 часа в InMemoryDataBase24: как реализовать правильно

Есть два подхода:

Вариант A (самый простой): скользящее окно через периодическую очистку

  • Все сущности пишутся в обычные таблицы (InMemory provider EF или SQLite in-memory, или даже реальная БД).
  • Раз в N минут фоновой задачей удаляем всё, что старше 24 часов:
    • trades.executed_utc < now-24h
    • orders.created_utc < now-24h (но осторожно, если order ещё “живой”)
    • positions обычно не удаляем (позиция — текущее состояние), но если вы хотите “полностью 24h” — тогда позицию тоже пересоздавать/обнулять по правилам.

Для консистентности обычно:

  • trades чистим по executed_utc
  • orders чистим по updated_utc (если ордер давно завершён)
  • positions не чистим автоматически, потому что это не “история”, а состояние.

Вариант B: двухслойная модель

  • “Состояние” (positions, активные orders) хранится как есть (не вычищается по TTL).
  • “Лента/история 24h” (trades, завершенные orders) — TTL 24h.

С учётом ваших целей (симуляция + UI + последняя активность) обычно Вариант B удобнее.


Частичное исполнение: что обязательно в доменной логике

Для каждого fill (trade) вы делаете:

  • order.rest_qty -= trade.qty
  • order.status:
    • если rest_qty == initial_qty и был reject/cancel — Canceled/Rejected
    • если 0 < rest_qty < initial_qtyPartiallyFilled
    • если rest_qty == 0Filled
  • order.avg_fill_price пересчитывается по всем fills (взвешенная средняя)

И параллельно обновляется position:

  • net_qty меняется по знаку side
  • avg_entry_price и realized_pnl считаются по правилам закрытия/переворота (long→flat, long→short и т.д.)
  • unrealized_pnl считается от last_price (которую вы обновляете из котировки/last)

Минимальные EF Core сущности (структура полей)

Если хотите — я дам готовые классы C# под EF Core (с enum’ами, конфигурацией OnModelCreating, индексами и ValueConverter под decimal/long), но чтобы не гадать, уточню 2 момента:

  1. Количество контрактов — это int (целые) или decimal (дробные, как в крипте)?
  2. LimitOrder: исполняем по лучшей доступной цене (обычно min(ask, limit) для buy и max(bid, limit) для sell), или строго по limit_price?

Скажете эти 2 вещи — и я в следующем сообщении выдам:

  • полные C# entity-классы,
  • EF Core fluent config,
  • и простую реализацию “matching/исполнения” для Market/Limit с partial fills (на уровне симулятора).
1. Kол-во контрактов: decimal (дробные)
2. LimitOrder: Исполнение по лучшей доступной цене

Ниже — рабочая схема сущностей и логики под EF Core для симулятора, где количество — decimal, а Limit исполняется по лучшей доступной цене (price improvement).

1) Сущности (Orders, Trades/Fills, Position, MarketPrice)

Ключевая идея: Order хранит агрегированное состояние, а Fill (Trade) — неизменяемые события исполнения. Позиция — текущий агрегат по инструменту.

C#
Копировать код
public enum OrderSide { Buy = 1, Sell = 2 }
public enum OrderType { Market = 1, Limit = 2 }
public enum OrderStatus
{
    New = 1,
    PartiallyFilled = 2,
    Filled = 3,
    Canceled = 4,
    Rejected = 5
}

public class Order
{
    public long Id { get; set; }

    public string Symbol { get; set; } = default!;

    public OrderSide Side { get; set; }
    public OrderType Type { get; set; }
    public OrderStatus Status { get; set; } = OrderStatus.New;

    // Submitted
    public decimal Qty { get; set; }                 // initial
    public decimal? LimitPrice { get; set; }         // null for Market

    // Aggregates
    public decimal FilledQty { get; set; }           // sum of fills
    public decimal AvgFillPrice { get; set; }        // weighted avg (0 if no fills)
    public decimal RestQty { get; set; }             // Qty - FilledQty

    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset? LastUpdatedAt { get; set; }

    public List<Fill> Fills { get; set; } = new();
}

public class Fill
{
    public long Id { get; set; }

    public long OrderId { get; set; }
    public Order Order { get; set; } = default!;

    public string Symbol { get; set; } = default!;
    public OrderSide Side { get; set; }

    public decimal Qty { get; set; }
    public decimal Price { get; set; }

    public DateTimeOffset ExecutedAt { get; set; }

    // удобно для идемпотентности/аудита (можно убрать)
    public Guid ExecutionId { get; set; } = Guid.NewGuid();
}

public class Position
{
    public long Id { get; set; }

    public string Symbol { get; set; } = default!;

    // Net: long > 0, short < 0
    public decimal NetQty { get; set; }

    // Средняя цена входа по ОТКРЫТОЙ части позиции (0 если flat)
    public decimal AvgEntryPrice { get; set; }

    public decimal RealizedPnl { get; set; }

    public DateTimeOffset UpdatedAt { get; set; }
}

public class MarketPrice
{
    public long Id { get; set; }
    public string Symbol { get; set; } = default!;

    public decimal Bid { get; set; }
    public decimal Ask { get; set; }
    public decimal Last { get; set; }

    public DateTimeOffset Timestamp { get; set; }
}

Почему так

  • Order.FilledQty/AvgFillPrice/RestQty — денормализация для быстрого UI/отчетов.
  • Истина о сделках — в Fill (журнал событий).
  • Position хранит только текущий результат по символу.

2) EF Core Fluent Config (индексы, precision для decimal)

Для decimal обязательно задать HasPrecision, иначе провайдер может выбрать неудобный тип.

C#
Копировать код
using Microsoft.EntityFrameworkCore;

public class SimDbContext : DbContext
{
    public DbSet<Order> Orders => Set<Order>();
    public DbSet<Fill> Fills => Set<Fill>();
    public DbSet<Position> Positions => Set<Position>();
    public DbSet<MarketPrice> MarketPrices => Set<MarketPrice>();

    protected override void OnModelCreating(ModelBuilder b)
    {
        // Order
        b.Entity<Order>(e =>
        {
            e.HasKey(x => x.Id);
            e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();

            e.Property(x => x.Qty).HasPrecision(28, 10);
            e.Property(x => x.FilledQty).HasPrecision(28, 10);
            e.Property(x => x.RestQty).HasPrecision(28, 10);

            e.Property(x => x.LimitPrice).HasPrecision(28, 10);
            e.Property(x => x.AvgFillPrice).HasPrecision(28, 10);

            e.HasMany(x => x.Fills)
             .WithOne(x => x.Order)
             .HasForeignKey(x => x.OrderId)
             .OnDelete(DeleteBehavior.Cascade);

            e.HasIndex(x => new { x.Symbol, x.CreatedAt });
            e.HasIndex(x => x.Status);
        });

        // Fill
        b.Entity<Fill>(e =>
        {
            e.HasKey(x => x.Id);
            e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();

            e.Property(x => x.Qty).HasPrecision(28, 10);
            e.Property(x => x.Price).HasPrecision(28, 10);

            e.HasIndex(x => new { x.OrderId, x.ExecutedAt });
            e.HasIndex(x => x.ExecutionId).IsUnique();
        });

        // Position
        b.Entity<Position>(e =>
        {
            e.HasKey(x => x.Id);
            e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();

            e.Property(x => x.NetQty).HasPrecision(28, 10);
            e.Property(x => x.AvgEntryPrice).HasPrecision(28, 10);
            e.Property(x => x.RealizedPnl).HasPrecision(28, 10);

            e.HasIndex(x => x.Symbol).IsUnique();
        });

        // MarketPrice
        b.Entity<MarketPrice>(e =>
        {
            e.HasKey(x => x.Id);
            e.Property(x => x.Symbol).HasMaxLength(32).IsRequired();

            e.Property(x => x.Bid).HasPrecision(28, 10);
            e.Property(x => x.Ask).HasPrecision(28, 10);
            e.Property(x => x.Last).HasPrecision(28, 10);

            e.HasIndex(x => new { x.Symbol, x.Timestamp });
        });
    }
}

Precision 28,10 — типичный безопасный выбор. Если у вас ограниченные масштабы, можно меньше.


3) Ценообразование (Market / Limit с лучшей ценой)

Пусть на момент исполнения есть bid/ask.

  • Market Buy → price = ask
  • Market Sell → price = bid
  • Limit Buy:
    • если ask <= limit → исполняется по ask (улучшение)
    • иначе не исполняется
  • Limit Sell:
    • если bid >= limit → исполняется по bid
    • иначе не исполняется

Частичное исполнение в простом симуляторе обычно задают через “доступную ликвидность” на тике (availableQty). Тогда fillQty = min(order.RestQty, availableQty).


4) Пересчет Order по fill (денормализация)

C#
Копировать код
static void ApplyFillToOrder(Order o, decimal fillQty, decimal fillPrice, DateTimeOffset ts)
{
    if (fillQty <= 0) throw new ArgumentOutOfRangeException(nameof(fillQty));
    if (fillPrice <= 0) throw new ArgumentOutOfRangeException(nameof(fillPrice));
    if (o.RestQty <= 0) throw new InvalidOperationException("Order already completed.");
    if (fillQty > o.RestQty) fillQty = o.RestQty;

    var prevFilledQty = o.FilledQty;
    var prevNotional = o.AvgFillPrice * prevFilledQty;

    var newFilledQty = prevFilledQty + fillQty;
    var newNotional = prevNotional + (fillQty * fillPrice);

    o.FilledQty = newFilledQty;
    o.RestQty = o.Qty - o.FilledQty;
    o.AvgFillPrice = (o.FilledQty == 0m) ? 0m : (newNotional / o.FilledQty);

    o.LastUpdatedAt = ts;

    if (o.RestQty == 0m) o.Status = OrderStatus.Filled;
    else if (o.FilledQty > 0m) o.Status = OrderStatus.PartiallyFilled;
    else o.Status = OrderStatus.New;
}

5) Обновление позиции (long/short, частичное закрытие, переворот)

Ниже — надежный вариант “netting” с учетом закрытия и переворота.

Соглашение:

  • Buy увеличивает NetQty
  • Sell уменьшает NetQty
  • AvgEntryPrice — цена открытой части позиции
  • RealizedPnl обновляется при закрытии части позиции
  • Комиссии/слиппедж не учитываем (можно добавить позже)
C#
Копировать код
static void ApplyFillToPosition(Position p, OrderSide side, decimal qty, decimal price, DateTimeOffset ts)
{
    if (qty <= 0) throw new ArgumentOutOfRangeException(nameof(qty));
    if (price <= 0) throw new ArgumentOutOfRangeException(nameof(price));

    // signedQty: buy +, sell -
    var signedQty = side == OrderSide.Buy ? qty : -qty;

    var prevQty = p.NetQty;
    var newQty = prevQty + signedQty;

    // If flat before
    if (prevQty == 0m)
    {
        p.NetQty = newQty;
        p.AvgEntryPrice = price;     // вход по цене сделки
        p.UpdatedAt = ts;
        return;
    }

    // Same direction (increase existing position)
    if ((prevQty > 0m && signedQty > 0m) || (prevQty < 0m && signedQty < 0m))
    {
        var prevAbs = Math.Abs(prevQty);
        var addAbs = Math.Abs(signedQty);
        var newAbs = prevAbs + addAbs;

        // weighted avg entry
        p.AvgEntryPrice = (p.AvgEntryPrice * prevAbs + price * addAbs) / newAbs;
        p.NetQty = newQty;
        p.UpdatedAt = ts;
        return;
    }

    // Opposite direction: closing and maybe flipping
    var closeAbs = Math.Min(Math.Abs(prevQty), Math.Abs(signedQty));
    var openAbs = Math.Abs(signedQty) - closeAbs;

    // Realized PnL from closing part:
    // If prev was long (>0) and we sell: pnl = (sellPrice - entry) * closeQty
    // If prev was short (<0) and we buy: pnl = (entry - buyPrice) * closeQty
    decimal realized;
    if (prevQty > 0m)
        realized = (price - p.AvgEntryPrice) * closeAbs;
    else
        realized = (p.AvgEntryPrice - price) * closeAbs;

    p.RealizedPnl += realized;

    // After closing:
    if (openAbs == 0m)
    {
        // fully closed to flat
        p.NetQty = 0m;
        p.AvgEntryPrice = 0m;
        p.UpdatedAt = ts;
        return;
    }

    // flipped: remaining opens new position in direction of signedQty
    p.NetQty = newQty;          // will have sign of signedQty
    p.AvgEntryPrice = price;    // new position entry at execution price (for remaining)
    p.UpdatedAt = ts;
}

6) Связать всё в “ExecuteOrderOnTick”

Примерно так: берете MarketPrice, определяете возможно ли исполнение, находите fillQty, создаете Fill, применяете к Order и Position, сохраняете.

Если хотите, я напишу полностью метод сервиса (с транзакцией EF Core, FirstOrDefaultAsync, обработкой идемпотентности через ExecutionId и т.п.). Для этого уточните:

  1. У вас одна позиция на символ (netting), как выше, или “по сделкам” (FIFO lots)?
  2. Нужно ли учитывать частичную ликвидность на тике (например, availableQty), или можно считать что всегда исполняется весь RestQty при выполнении условия?
1. У нас одна позиция на символ (netting), как выше, или “по сделкам” (FIFO lots).
2. Учитывать частичную ликвидность на тике (например, availableQty)

Понял: делаем netting (одна позиция на символ) и на тике учитываем частичную ликвидность. Ниже — цельный, рабоче-ориентированный скелет: decimal для количества контрактов, LimitOrder исполняется по лучшей доступной цене (price improvement), поддержка partial fills и корректный PnL для netting.

Ключевые правила

  • Qty и все расчёты количества: decimal.
  • На каждом тике есть “стаканная” доступность:
    • для покупки: лучшая цена продажи ask и объём askQty
    • для продажи: лучшая цена покупки bid и объём bidQty
  • Лимитка исполняется если:
    • Buy: ask <= limitPrice
    • Sell: bid >= limitPrice
  • Цена исполнения: всегда лучшая доступная (то есть ask для Buy, bid для Sell), а не лимитная.
  • Объём исполнения: min(orderRemaining, availableQty).

Модели (минимум)

C#
Копировать код
public enum Side { Buy, Sell }
public enum OrderType { Limit }
public enum OrderStatus { New, PartiallyFilled, Filled, Cancelled }

public sealed class Order
{
    public Guid Id { get; set; }
    public string Symbol { get; set; } = default!;
    public Side Side { get; set; }
    public OrderType Type { get; set; } = OrderType.Limit;

    public decimal Qty { get; set; }              // исходное
    public decimal FilledQty { get; set; }        // накопленное
    public decimal LimitPrice { get; set; }

    public OrderStatus Status { get; set; } = OrderStatus.New;
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset UpdatedAt { get; set; }

    public decimal RemainingQty => Qty - FilledQty;
}

public sealed class Position
{
    public Guid Id { get; set; }
    public string Symbol { get; set; } = default!;

    public decimal NetQty { get; set; }          // >0 long, <0 short
    public decimal AvgEntryPrice { get; set; }   // средняя входа текущей позиции (для netting)
    public decimal RealizedPnl { get; set; }     // накопленная реализованная
    public DateTimeOffset UpdatedAt { get; set; }
}

public sealed class Fill
{
    public Guid Id { get; set; }
    public Guid OrderId { get; set; }
    public string Symbol { get; set; } = default!;
    public Side Side { get; set; }
    public decimal Qty { get; set; }            // decimal
    public decimal Price { get; set; }          // цена исполнения (bid/ask)
    public DateTimeOffset Ts { get; set; }

    // На случай идемпотентности:
    public string ExecutionKey { get; set; } = default!; // напр. tickId+orderId+seq
}

public sealed class TopOfBookTick
{
    public string Symbol { get; init; } = default!;
    public DateTimeOffset Ts { get; init; }

    public decimal Bid { get; init; }
    public decimal BidQty { get; init; }

    public decimal Ask { get; init; }
    public decimal AskQty { get; init; }

    public string TickId { get; init; } = default!; // уникальный идентификатор события/тика
}

Проверка исполнения лимитки (best available price)

C#
Копировать код
private static bool CanFillLimit(Order o, TopOfBookTick t)
{
    if (o.Type != OrderType.Limit) throw new NotSupportedException();

    return o.Side switch
    {
        Side.Buy  => t.AskQty > 0m && t.Ask <= o.LimitPrice,
        Side.Sell => t.BidQty > 0m && t.Bid >= o.LimitPrice,
        _ => false
    };
}

private static decimal BestPrice(Order o, TopOfBookTick t) =>
    o.Side == Side.Buy ? t.Ask : t.Bid;

private static decimal AvailableQty(Order o, TopOfBookTick t) =>
    o.Side == Side.Buy ? t.AskQty : t.BidQty;

Применение fill к ордеру

C#
Копировать код
private static void ApplyFillToOrder(Order o, decimal fillQty, DateTimeOffset ts)
{
    if (fillQty <= 0m) return;

    o.FilledQty += fillQty;

    if (o.FilledQty >= o.Qty)
    {
        o.FilledQty = o.Qty;
        o.Status = OrderStatus.Filled;
    }
    else
    {
        o.Status = OrderStatus.PartiallyFilled;
    }

    o.UpdatedAt = ts;
}

Netting-позиция + Realized PnL (корректно при частичных закрытиях и переворотах)

C#
Копировать код
private static void ApplyFillToPosition(Position p, Side side, decimal qty, decimal price, DateTimeOffset ts)
{
    if (qty <= 0m) return;

    // signedQty: buy => +, sell => -
    var signedQty = side == Side.Buy ? qty : -qty;

    var prevQty = p.NetQty;
    var newQty  = prevQty + signedQty;

    // If position was flat => open new
    if (prevQty == 0m)
    {
        p.NetQty = newQty;
        p.AvgEntryPrice = price;
        p.UpdatedAt = ts;
        return;
    }

    // Same direction => increase position, update avg
    if (Math.Sign((double)prevQty) == Math.Sign((double)newQty) && Math.Sign((double)prevQty) == Math.Sign((double)signedQty))
    {
        var prevAbs = Math.Abs(prevQty);
        var addAbs  = Math.Abs(signedQty);
        var newAbs  = prevAbs + addAbs;

        p.AvgEntryPrice = (p.AvgEntryPrice * prevAbs + price * addAbs) / newAbs;
        p.NetQty = newQty;
        p.UpdatedAt = ts;
        return;
    }

    // Otherwise: reducing or flipping
    var closeAbs = Math.Min(Math.Abs(prevQty), Math.Abs(signedQty));
    var openAbs  = Math.Abs(signedQty) - closeAbs;

    decimal realized = 0m;
    if (closeAbs > 0m)
    {
        // If we were long and we sell => (sell - entry) * close
        // If we were short and we buy => (entry - buy) * close
        realized = prevQty > 0m
            ? (price - p.AvgEntryPrice) * closeAbs
            : (p.AvgEntryPrice - price) * closeAbs;

        p.RealizedPnl += realized;
    }

    if (openAbs == 0m)
    {
        // fully closed to flat
        p.NetQty = 0m;
        p.AvgEntryPrice = 0m;
        p.UpdatedAt = ts;
        return;
    }

    // flipped: leftover opens new position at fill price
    p.NetQty = newQty;         // sign flipped
    p.AvgEntryPrice = price;   // new entry
    p.UpdatedAt = ts;
}

Примечание: Math.Sign(double) — не идеально для decimal. Лучше сделать свою функцию знака для decimal:

C#
Копировать код
private static int Sign(decimal x) => x > 0m ? 1 : x < 0m ? -1 : 0;

и заменить Math.Sign((double)...).


Полный сервис: “исполнить ордер на тике” с учётом liquidity и идемпотентности

Ниже пример “один ордер — один тик”. В реальности вы вероятно будете матчить пачку ордеров по символу на каждый тик.

C#
Копировать код
public sealed class MatchingService
{
    private readonly AppDbContext _db;

    public MatchingService(AppDbContext db) => _db = db;

    public async Task<Fill?> TryExecuteLimitOnTickAsync(Guid orderId, TopOfBookTick tick, CancellationToken ct)
    {
        // 1) Load order
        var order = await _db.Orders.SingleOrDefaultAsync(o => o.Id == orderId, ct);
        if (order is null) return null;

        if (order.Symbol != tick.Symbol) return null;
        if (order.Status is OrderStatus.Filled or OrderStatus.Cancelled) return null;
        if (order.RemainingQty <= 0m) return null;

        // 2) Price/liq check
        if (!CanFillLimit(order, tick)) return null;

        var px = BestPrice(order, tick);
        var avail = AvailableQty(order, tick);
        if (avail <= 0m) return null;

        // 3) Determine fill qty (partial liquidity)
        var fillQty = Math.Min(order.RemainingQty, avail);
        if (fillQty <= 0m) return null;

        // 4) Idempotency key: prevent duplicate fill on reprocessing same tick
        // Choose a scheme that is unique per (order, tick) if you allow at most one fill per tick.
        var execKey = $"{tick.TickId}:{order.Id}";

        var already = await _db.Fills.AnyAsync(f => f.ExecutionKey == execKey, ct);
        if (already) return null;

        // 5) Load/Create position (netting: one per symbol/account)
        // Если есть AccountId — добавьте в ключ.
        var pos = await _db.Positions.SingleOrDefaultAsync(p => p.Symbol == order.Symbol, ct);
        if (pos is null)
        {
            pos = new Position
            {
                Id = Guid.NewGuid(),
                Symbol = order.Symbol,
                NetQty = 0m,
                AvgEntryPrice = 0m,
                RealizedPnl = 0m,
                UpdatedAt = tick.Ts
            };
            _db.Positions.Add(pos);
        }

        // 6) Apply changes atomically
        // Важно: в проде оберните в транзакцию + правильный уровень изоляции/row locks.
        var fill = new Fill
        {
            Id = Guid.NewGuid(),
            OrderId = order.Id,
            Symbol = order.Symbol,
            Side = order.Side,
            Qty = fillQty,
            Price = px,
            Ts = tick.Ts,
            ExecutionKey = execKey
        };

        _db.Fills.Add(fill);

        ApplyFillToOrder(order, fillQty, tick.Ts);
        ApplyFillToPosition(pos, order.Side, fillQty, px, tick.Ts);

        await _db.SaveChangesAsync(ct);

        return fill;
    }
}

Про важное в многопоточности/конкурентности

Если у вас несколько воркеров могут одновременно исполнять один и тот же ордер/символ, вам нужно одно из:

  • транзакция + блокировка строки ордера/позиции (в SQL Server: UPDLOCK, в Postgres: SELECT ... FOR UPDATE);
  • оптимистичная конкуренция через RowVersion/xmin + retry;
  • уникальный индекс на Fills.ExecutionKey (желательно) и обработка unique violation.

Минимально рекомендую: уникальный индекс на ExecutionKey и retry на конфликт.


Уточнения, которые влияют на финальную реализацию

  1. availableQty на тике — это “объём на лучшей цене” (top-of-book) или суммарная глубина? Сейчас я заложил top-of-book.
  2. Разрешаете ли несколько исполнений одного ордера в рамках одного tickId (например, если вы прогоняете ордера циклом и обновляете доступность)? Если да — ExecutionKey надо сделать с seq/fillNo.
  3. Есть ли комиссии/проскальзывание/маржа — добавим в Fill и в расчёт PnL.

Скажите 1–2–3, и я под это подправлю код (включая транзакции и индексы).

1. top-of-book
2. Возьмите самое простое решение
3. Да, комиссия учитываться должна,
проскальзование можно опустить,
маржы должно хватать на исполнение ордера

Ок: top-of-book, самое простое, комиссии да, проскальзывание нет, маржи всегда хватает.

Ниже — простая схема:

  • берём цену и доступное кол-во на лучшей стороне;
  • исполняем min(remaining, available);
  • создаём один Fill на тик;
  • учитываем комиссию в Fill и в позиции (через накопленную комиссию), не усложняя PnL;
  • защищаемся от дублей самым простым способом: ExecutionKey = (orderId, tickId) + уникальный индекс.

Модели (минимально нужное)

C#
Копировать код
public enum Side { Buy, Sell }

public class Order
{
    public Guid Id { get; set; }
    public string Symbol { get; set; } = "";
    public Side Side { get; set; }
    public decimal Qty { get; set; }            // желаемый объём
    public decimal FilledQty { get; set; }      // сколько уже исполнено
    public DateTimeOffset CreatedTs { get; set; }
    public DateTimeOffset? ClosedTs { get; set; }
}

public class Position
{
    public Guid Id { get; set; }
    public string Symbol { get; set; } = "";
    public decimal Qty { get; set; }            // +long, -short
    public decimal AvgPx { get; set; }          // средняя цена входа для открытой позиции
    public decimal RealizedPnl { get; set; }    // можно считать позже, но пусть будет
    public decimal FeesPaid { get; set; }       // накопленные комиссии
    public DateTimeOffset UpdatedTs { get; set; }
}

public class Fill
{
    public Guid Id { get; set; }
    public Guid OrderId { get; set; }
    public string Symbol { get; set; } = "";
    public Side Side { get; set; }
    public decimal Qty { get; set; }
    public decimal Price { get; set; }
    public decimal Fee { get; set; }
    public DateTimeOffset Ts { get; set; }

    // Самое простое для “один fill на тик на ордер”
    public string ExecutionKey { get; set; } = ""; // $"{orderId:N}:{tickId}"
}

Уникальный индекс на ExecutionKey (защита от дублей)

C#
Копировать код
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Fill>()
        .HasIndex(x => x.ExecutionKey)
        .IsUnique();
}

Tick (top-of-book)

C#
Копировать код
public sealed record MarketTick(
    string Symbol,
    long TickId,
    DateTimeOffset Ts,
    decimal BestBidPx,
    decimal BestBidQty,
    decimal BestAskPx,
    decimal BestAskQty
);

Комиссия (простая модель)

Самый простой вариант — фиксированная ставка в базисных пунктах от notional:

fee=qtypriceratefee = |qty| \cdot price \cdot rate
C#
Копировать код
public interface IFeeModel
{
    decimal CalcFee(decimal qty, decimal price, Side side);
}

public sealed class ProportionalFeeModel : IFeeModel
{
    private readonly decimal _rate; // например 0.0004m = 4 bps

    public ProportionalFeeModel(decimal rate) => _rate = rate;

    public decimal CalcFee(decimal qty, decimal price, Side side)
        => Math.Abs(qty) * price * _rate;
}

Исполнение: один fill на тик, top-of-book, с комиссией

C#
Копировать код
public sealed class ExecutionService
{
    private readonly AppDbContext _db;
    private readonly IFeeModel _fees;

    public ExecutionService(AppDbContext db, IFeeModel fees)
    {
        _db = db;
        _fees = fees;
    }

    public async Task<Fill?> TryExecuteAtTopOfBookAsync(Guid orderId, MarketTick tick, CancellationToken ct)
    {
        // 1) Загружаем ордер + позицию (простое: отдельные запросы)
        var order = await _db.Orders.SingleAsync(o => o.Id == orderId, ct);
        if (order.Symbol != tick.Symbol) return null;

        var remaining = order.Qty - order.FilledQty;
        if (remaining <= 0) return null;

        // 2) Определяем цену и доступность по top-of-book
        decimal px, avail;
        if (order.Side == Side.Buy)
        {
            px = tick.BestAskPx;
            avail = tick.BestAskQty;
        }
        else
        {
            px = tick.BestBidPx;
            avail = tick.BestBidQty;
        }

        if (px <= 0 || avail <= 0) return null;

        // 3) Один fill на тик: берём минимум из remaining и avail
        var fillQty = Math.Min(remaining, avail);
        if (fillQty <= 0) return null;

        // 4) ExecutionKey = orderId+tickId
        var execKey = $"{order.Id:N}:{tick.TickId}";

        // 5) Проверка “уже исполняли на этом тике?”
        //    (самое простое: сначала быстрый запрос, затем уникальный индекс подстрахует)
        var already = await _db.Fills.AnyAsync(f => f.ExecutionKey == execKey, ct);
        if (already) return null;

        // 6) Создаём fill + комиссия
        var fee = _fees.CalcFee(fillQty, px, order.Side);

        var fill = new Fill
        {
            Id = Guid.NewGuid(),
            OrderId = order.Id,
            Symbol = order.Symbol,
            Side = order.Side,
            Qty = fillQty,
            Price = px,
            Fee = fee,
            Ts = tick.Ts,
            ExecutionKey = execKey
        };

        _db.Fills.Add(fill);

        // 7) Обновляем ордер
        order.FilledQty += fillQty;
        if (order.FilledQty >= order.Qty)
            order.ClosedTs = tick.Ts;

        // 8) Обновляем позицию (простая логика: средняя цена для наращивания, кросс — уменьшает/разворачивает)
        var pos = await _db.Positions.SingleOrDefaultAsync(p => p.Symbol == order.Symbol, ct)
                  ?? new Position { Id = Guid.NewGuid(), Symbol = order.Symbol, Qty = 0, AvgPx = 0 };

        ApplyFillToPositionSimple(pos, order.Side, fillQty, px, fee, tick.Ts);

        if (_db.Entry(pos).State == EntityState.Detached)
            _db.Positions.Add(pos);

        // 9) Сохраняем
        //    Если два потока всё же попытаются вставить один и тот же ExecutionKey,
        //    БД отобьёт (unique violation). Можно поймать и вернуть null.
        try
        {
            await _db.SaveChangesAsync(ct);
        }
        catch (DbUpdateException)
        {
            // максимально просто: считаем, что это дубль (по индексу) и "не исполнили"
            return null;
        }

        return fill;
    }

    private static void ApplyFillToPositionSimple(Position pos, Side side, decimal qty, decimal px, decimal fee, DateTimeOffset ts)
    {
        // Комиссия всегда увеличивает FeesPaid
        pos.FeesPaid += fee;

        var signed = (side == Side.Buy) ? qty : -qty;

        // Если позиция нулевая — просто открываем
        if (pos.Qty == 0)
        {
            pos.Qty = signed;
            pos.AvgPx = px;
            pos.UpdatedTs = ts;
            return;
        }

        // Если сделка в ту же сторону — пересчёт средней
        if ((pos.Qty > 0 && signed > 0) || (pos.Qty < 0 && signed < 0))
        {
            var newQty = pos.Qty + signed;
            var wavg = (Math.Abs(pos.Qty) * pos.AvgPx + Math.Abs(signed) * px) / Math.Abs(newQty);
            pos.Qty = newQty;
            pos.AvgPx = wavg;
            pos.UpdatedTs = ts;
            return;
        }

        // Иначе это уменьшение/переворот. Для “самого простого”:
        // - RealizedPnl считаем при закрытии части по разнице цен
        // - Если переворот — новая AvgPx = px на оставшийся объём
        var closingQty = Math.Min(Math.Abs(pos.Qty), Math.Abs(signed));

        // PnL: для long закрытие по sell: (px - avg) * closeQty
        //      для short закрытие по buy: (avg - px) * closeQty
        if (pos.Qty > 0) // long, сделка sell
            pos.RealizedPnl += (px - pos.AvgPx) * closingQty;
        else // short, сделка buy
            pos.RealizedPnl += (pos.AvgPx - px) * closingQty;

        var newPosQty = pos.Qty + signed; // signed противоположен знаку pos.Qty

        if (newPosQty == 0)
        {
            pos.Qty = 0;
            pos.AvgPx = 0;
        }
        else
        {
            // переворот: остаток открывается по цене текущего fill
            pos.Qty = newPosQty;
            pos.AvgPx = px;
        }

        pos.UpdatedTs = ts;
    }
}

Что именно “простое решение” гарантирует/не гарантирует

Гарантирует:

  • один Fill на orderId + tickId;
  • комиссия учитывается и копится в Position.FeesPaid;
  • PnL по закрываемой части считается без проскальзывания.

Не гарантирует (если будет параллелизм):

  • идеальную согласованность order.FilledQty при одновременном исполнении одного ордера разными потоками (для этого уже нужен lock/rowversion+retry). Но при “самом простом” часто достаточно однопоточного исполнителя на символ/ордер.

Если хотите остаться “простыми”, но сделать надёжнее в concurrency, следующий минимальный шаг — добавить RowVersion в Order и Position и retry при DbUpdateConcurrencyException.

Спасибо большое.
Все что сделано меня уже устраивает.
Надо с начала это все переварить.

Понял. Если захотите позже, можно будет:

  • прогнать это на небольшом наборе тест-кейсов (частичное исполнение, переворот, повторный тик, комиссия);
  • или накидать минимальные unit-тесты, чтобы зафиксировать логику и не бояться рефакторинга.

Когда переварите и появятся вопросы — приносите конкретный кейс (набор Fill-ов и ожидаемый итог по Qty/AvgPx/RealizedPnl/FeesPaid), разберём.