пятница, 6 февраля 2026 г.

Chad, EventHub, TradingEntities, Part2

Chad, EventHub, TradingEntities, Part2

https://ask.chadgpt.ru/share/b7d47f07-c985-48df-bbbc-70622e419623

Текущая версия хороша как “минимально рабочая”, но у неё есть несколько типичных ограничений: нельзя удалить обработчик, нет “одноразовых” подписок, нет асинхронных обработчиков/ожидания, нет обработки исключений, нет фильтрации/шаблонов ключей, нет гарантии порядка, ConcurrentBag неудобен для отписки, нет диагностики и контроля нагрузки.

Ниже — направления, что можно “логично” добавить, и один из удачных вариантов дизайна (при этом не превращая всё в полноценный брокер).

1) Типизировать ключ и/или событие

Вариант A: ключ типом TKey

Если вы хотите оставить концепцию “тикер → строковое сообщение”, но типизировать ключ:

C#

Копировать код

public sealed class EventHub<TKey>

    where TKey : notnull

{

    // ...

    public void AddHandler(TKey key, Action<string> handler) { /* ... */ }

    public void Publish(TKey key, string message) { /* ... */ }

}

Это уже убираетмагические строкии позволяет использовать, например, Symbol, Guid, int, record struct.

Вариант B: типизировать payload: TMessage

Чаще полезнее типизировать именно сообщение:

C#

Копировать код

public sealed class EventHub<TKey, TMessage>

    where TKey : notnull

{

    public void Subscribe(TKey key, Action<TMessage> handler) { /* ... */ }

    public void Publish(TKey key, TMessage message) { /* ... */ }

}

Вариант C: интерфейс IKey

Если хочется общий контракт:

C#

Копировать код

public interface IKey

{

    string Value { get; }

}

Но на практике TKey : notnull обычно удобнее и быстрее (без виртуальных вызовов и без навязывания строки).

2) Отписка (must-have)

С ConcurrentBag отписка неудобна. Лучше хранить обработчики так, чтобы можно было удалять:

  • либо copy-on-write через ImmutableArray/ImmutableList
  • либо хранить ConcurrentDictionary<SubscriptionId, handler>

Самый простой UX — возвращать IDisposable из Subscribe, чтобы можно было сделать:

C#

Копировать код

using var sub = hub.Subscribe(key, msg => ...);

3) “Одноразовые” подписки (Once)

Удобно для сценариев “жду первое событие”:

C#

Копировать код

hub.SubscribeOnce(key, handler);

Реализация: обёртка, которая сначала отписывается, потом вызывает обработчик (или наоборот, если важна reentrancy).

4) Асинхронные обработчики и управление параллелизмом

Синхронный Action<T> блокирует Publish. Часто нужно:

  • Func<T, ValueTask> и PublishAsync
  • выбор стратегии: вызывать последовательно или параллельно
  • ограничивать параллелизм (например, SemaphoreSlim)

Минимально:

C#

Копировать код

public ValueTask PublishAsync(TKey key, TMessage message, CancellationToken ct = default);

И опция:

  • Parallel = false (по умолчанию) — предсказуемее
  • Parallel = true — быстрее, но требует внимания к потокобезопасности хендлеров

5) Обработка исключений в обработчиках

Сейчас одно исключение “роняет” весь InvokeHandlers и остальные обработчики не вызовутся.

Варианты:

  • “проглатывать” и продолжать, а ошибки отдавать в OnError
  • агрегировать и бросать AggregateException
  • иметь стратегию: StopOnFirstError/Continue

6) Порядок вызова

ConcurrentBag не гарантирует порядок. Иногда важно “как подписались — так и вызываем”.
Если порядок важен, лучше:

  • ImmutableArray (копируется при добавлении/удалении)
  • или обычный List под lock (если нагрузка небольшая, это часто самый практичный путь)

7) Диагностика и удобства

Полезные мелочи:

  • int GetSubscriberCount(TKey key)
  • IReadOnlyCollection<TKey> Keys (или GetKeysSnapshot)
  • bool RemoveKey(TKey key) (очистить всех подписчиков)
  • метрики: сколько событий опубликовано, сколько ошибок, среднее время обработчиков (минимально — через коллбеки)

8) Фильтры / подписка “на всё” / шаблоны

Иногда хочется:

  • подписка на все ключи: SubscribeAll(...)
  • фильтр по payload: Subscribe(key, handler, predicate)
  • wildcard по строкам (если ключ строковый), например AAPL.*

Это можно сделать отдельным уровнем: ключевой матчинг — отдельная стратегия/интерфейс.


Предложение конкретной “хорошей” реализации (без лишнего)

Ниже вариант: TKey, TMessage, отписка через IDisposable, хранение подписчиков через ImmutableArray (copy-on-write). Это даёт:

  • быстрый Publish без локов (только чтение массива)
  • нормальную отписку
  • предсказуемый порядок (по порядку подписки)
  • потокобезопасность

C#

Копировать код

using System.Collections.Concurrent;

using System.Collections.Immutable;

using System.Threading;

 

public sealed class EventHub<TKey, TMessage> where TKey : notnull

{

    private sealed class Subscription : IDisposable

    {

        private readonly EventHub<TKey, TMessage> _hub;

        private readonly TKey _key;

        internal readonly Action<TMessage> Handler;

        private int _disposed;

 

        public Subscription(EventHub<TKey, TMessage> hub, TKey key, Action<TMessage> handler)

        {

            _hub = hub;

            _key = key;

            Handler = handler;

        }

 

        public void Dispose()

        {

            if (Interlocked.Exchange(ref _disposed, 1) == 1) return;

            _hub.Unsubscribe(_key, this);

        }

    }

 

    private readonly ConcurrentDictionary<TKey, ImmutableArray<Subscription>> _map = new();

 

    public IDisposable Subscribe(TKey key, Action<TMessage> handler)

    {

        if (handler is null) throw new ArgumentNullException(nameof(handler));

 

        var sub = new Subscription(this, key, handler);

 

        _map.AddOrUpdate(

            key,

            _ => ImmutableArray.Create(sub),

            (_, arr) => arr.Add(sub));

 

        return sub;

    }

 

    public IDisposable SubscribeOnce(TKey key, Action<TMessage> handler)

    {

        IDisposable? sub = null;

        sub = Subscribe(key, msg =>

        {

            sub!.Dispose();

            handler(msg);

        });

        return sub;

    }

 

    public void Publish(TKey key, TMessage message)

    {

        if (!_map.TryGetValue(key, out var subs)) return;

 

        // ImmutableArray безопасно перечислять без локов

        foreach (var sub in subs)

        {

            sub.Handler(message);

        }

    }

 

    public int GetSubscriberCount(TKey key)

        => _map.TryGetValue(key, out var subs) ? subs.Length : 0;

 

    public bool ClearKey(TKey key) => _map.TryRemove(key, out _);

 

    private void Unsubscribe(TKey key, Subscription sub)

    {

        while (true)

        {

            if (!_map.TryGetValue(key, out var current)) return;

 

            var next = current.Remove(sub);

            if (next.Length == 0)

            {

                // никто не подписан — убираем ключ целиком

                _map.TryRemove(new KeyValuePair<TKey, ImmutableArray<Subscription>>(key, current));

                return;

            }

 

            // попытка атомарно заменить массив

            if (_map.TryUpdate(key, next, current))

                return;

        }

    }

}

Что можно добавить поверх этого (точечно и без “переписывания всего”):

  • PublishSafe(...) с перехватом исключений и Action<Exception, TKey>/Action<Exception, TKey, TMessage>
  • асинхронный EventHub<TKey, TMessage> на Func<TMessage, ValueTask> + PublishAsync
  • настройку “параллелить обработчики или нет”

Вопросы, чтобы выбрать лучший дизайн под ваш кейс

  1. Важен ли порядок вызова обработчиков (FIFO по подписке)?
  2. Нужна ли отписка и как часто она будет происходить?
  3. Publish должен быть синхронным или нужно await-ить обработчики?
  4. В случае исключения в одном обработчике: останавливаемся или продолжаем?
  5. Насколько много ключей и подписчиков (примерно порядок: десятки/тысячи/сотни тысяч)?

Если ответите на эти 5 пунктов, я предложу оптимальную конфигурацию (простую, но “правильную”) и можно будет добавить ровно те фичи, которые реально нужны.

1. В данном варианте порядок очень важен FIFO.
2. Отписка, подписка - нужна всегда в любое время без остановки работы EventHub
3. Publish должен быть синхронным про "await - ить" я не понял.
Сделай и то и другое. По логической переменной: либо sync либо await
4. В случае возникновении исключение работаем дальше, но блокируем вызывать хэндлер на этом топике, пишем сообщение в лог. Все остальное должно работать. Мы запрещаем работу на этом канале с определенным топиком.
5. Ключей может быть десятки - как Тикеры на бирже. Сколько этих Тикеров. Наверное, больше сотни.

Ниже — вариант EventHub<TKey, TMessage> с:

  • FIFO: порядок вызова обработчиков = порядок подписки.
  • Subscribe/Unsubscribe в любое время без остановки хаба.
  • Два режима публикации: sync и async (выбирается логической переменной).
  • Fault isolation по топику: если любой хэндлер на топике кидает исключение, мы:
    1. продолжаем работу хаба в целом;
    2. помечаем топик как “заблокированный” и больше не вызываем обработчики для этого топика;
    3. логируем причину (исключение + топик).
  • Ключей “десятки/сотни” — подходит.

Почему FIFO при конкурентных подписках/отписках работает

Мы храним для каждого ключа неизменяемый массив подписок (ImmutableArray). Подписка/отписка создают новый массив и атомарно подменяют ссылку в ConcurrentDictionary через TryUpdate. Publish берёт снимок текущего массива и вызывает по порядку. Это даёт:

  • стабильный FIFO для текущего Publish;
  • отсутствие блокировок на чтении;
  • возможность подписываться/отписываться параллельно.

Код

C#

Копировать код

using System;

using System.Collections.Concurrent;

using System.Collections.Immutable;

using System.Threading;

using System.Threading.Tasks;

 

public sealed class EventHub<TKey, TMessage> where TKey : notnull

{

    public enum PublishMode { Sync, Async }

 

    private sealed class Subscription

    {

        public readonly Action<TMessage>? Sync;

        public readonly Func<TMessage, ValueTask>? Async;

 

        public Subscription(Action<TMessage> sync) => Sync = sync;

        public Subscription(Func<TMessage, ValueTask> async) => Async = async;

    }

 

    public readonly struct SubscriptionToken : IDisposable

    {

        private readonly EventHub<TKey, TMessage>? _hub;

        private readonly TKey _key;

        private readonly Subscription? _sub;

 

        internal SubscriptionToken(EventHub<TKey, TMessage> hub, TKey key, Subscription sub)

        {

            _hub = hub;

            _key = key;

            _sub = sub;

        }

 

        public void Dispose()

        {

            if (_hub is null || _sub is null) return;

            _hub.Unsubscribe(_key, _sub);

        }

    }

 

    private sealed class TopicState

    {

        public ImmutableArray<Subscription> Subs = ImmutableArray<Subscription>.Empty;

        public Exception? Fault;

        public bool IsFaulted => Fault is not null;

    }

 

    private readonly ConcurrentDictionary<TKey, TopicState> _topics = new();

 

    /// <summary>Логгер вызывается при первом падении топика (когда он блокируется).</summary>

    private readonly Action<TKey, Exception>? _onTopicFault;

 

    public EventHub(Action<TKey, Exception>? onTopicFault = null)

    {

        _onTopicFault = onTopicFault;

    }

 

    public bool IsTopicBlocked(TKey key)

        => _topics.TryGetValue(key, out var s) && s.IsFaulted;

 

    public Exception? GetTopicFault(TKey key)

        => _topics.TryGetValue(key, out var s) ? s.Fault : null;

 

    /// <summary>Ручная разблокировка топика (например после алерта/разбора).</summary>

    public bool TryUnblockTopic(TKey key)

    {

        if (!_topics.TryGetValue(key, out var s)) return false;

        // Если хотите строго атомарно — можно через lock(s). Но для вашего масштаба и модели — достаточно Exchange.

        Interlocked.Exchange(ref s.Fault, null);

        return true;

    }

 

    public SubscriptionToken Subscribe(TKey key, Action<TMessage> handler)

        => AddSubscription(key, new Subscription(handler));

 

    public SubscriptionToken SubscribeAsync(TKey key, Func<TMessage, ValueTask> handler)

        => AddSubscription(key, new Subscription(handler));

 

    private SubscriptionToken AddSubscription(TKey key, Subscription sub)

    {

        var state = _topics.GetOrAdd(key, _ => new TopicState());

 

        while (true)

        {

            var current = state.Subs;

 

            // FIFO: добавляем в конец

            var next = current.Add(sub);

 

            // атомарная замена ссылочного поля через CAS

            if (ReferenceEquals(

                    Interlocked.CompareExchange(ref state.Subs, next, current),

                    current))

            {

                return new SubscriptionToken(this, key, sub);

            }

        }

    }

 

    private void Unsubscribe(TKey key, Subscription sub)

    {

        if (!_topics.TryGetValue(key, out var state)) return;

 

        while (true)

        {

            var current = state.Subs;

            var next = current.Remove(sub);

 

            if (ReferenceEquals(

                    Interlocked.CompareExchange(ref state.Subs, next, current),

                    current))

            {

                // опционально: если пусто и не faulted — можно чистить ключ, но это не обязательно при вашем масштабе.

                // if (next.IsEmpty && !state.IsFaulted) _topics.TryRemove(key, out _);

                return;

            }

        }

    }

 

    /// <summary>

    /// Синхронная публикация. В режиме Async вызывает async-хэндлеры, но блокирует поток на каждом (GetAwaiter().GetResult()).

    /// </summary>

    public void Publish(TKey key, TMessage message, PublishMode mode)

    {

        if (!_topics.TryGetValue(key, out var state)) return;

        if (state.IsFaulted) return;

 

        var snapshot = state.Subs; // FIFO snapshot

 

        for (int i = 0; i < snapshot.Length; i++)

        {

            var sub = snapshot[i];

            try

            {

                if (mode == PublishMode.Sync)

                {

                    // Sync режим: вызываем только sync-хэндлеры (async игнорируем)

                    sub.Sync?.Invoke(message);

                }

                else

                {

                    // Async режим: предпочитаем async, иначе sync

                    if (sub.Async is not null)

                        sub.Async(message).GetAwaiter().GetResult();

                    else

                        sub.Sync?.Invoke(message);

                }

            }

            catch (Exception ex)

            {

                BlockTopic(state, key, ex);

                return; // после блокировки топика прекращаем дальнейшие вызовы на этом топике

            }

        }

    }

 

    /// <summary>

    /// Асинхронная публикация. В режиме Sync вызывает sync-хэндлеры, async игнорирует.

    /// В режиме Async await-ит async-хэндлеры последовательно (FIFO).

    /// </summary>

    public async ValueTask PublishAsync(TKey key, TMessage message, PublishMode mode)

    {

        if (!_topics.TryGetValue(key, out var state)) return;

        if (state.IsFaulted) return;

 

        var snapshot = state.Subs;

 

        for (int i = 0; i < snapshot.Length; i++)

        {

            var sub = snapshot[i];

            try

            {

                if (mode == PublishMode.Sync)

                {

                    sub.Sync?.Invoke(message);

                }

                else

                {

                    if (sub.Async is not null)

                        await sub.Async(message).ConfigureAwait(false);

                    else

                        sub.Sync?.Invoke(message);

                }

            }

            catch (Exception ex)

            {

                BlockTopic(state, key, ex);

                return;

            }

        }

    }

 

    private void BlockTopic(TopicState state, TKey key, Exception ex)

    {

        // Блокируем топик ровно один раз (первое исключение выигрывает)

        var prev = Interlocked.CompareExchange(ref state.Fault, ex, null);

        if (prev is null)

            _onTopicFault?.Invoke(key, ex);

    }

}


Важные уточнения по вашим требованиям

  1. FIFO соблюдается внутри одного Publish и соответствует порядку подписки.
    При конкурентной подписке “во время публикации” новый подписчик попадёт в следующий
    Publish (что обычно и нужно).
  2. “Сделай и sync и await”
    Я сделал оба API:
    • Publish(key, msg, mode)синхронный вызов.
    • PublishAsync(key, msg, mode)асинхронный вызов (можно await).

При этом mode решает, вызываем ли async-хэндлеры.

    • mode == Sync: вызываются только Action<TMessage> (async-хэндлеры игнорируются).
    • mode == Async: Func<TMessage, ValueTask> await-ится (или блокируется в Publish), а если подписан sync — он тоже отработает.

Если вам нужно другое поведение (например, в Sync режиме вызывать async-хэндлеры “огрублённо” через .GetResult()), скажите — поменяю.

  1. При исключении блокируем топик
    Сейчас: при первом исключении топик блокируется и дальнейшие публикации по ключу сразу
    return. Остальные топики работают.
  2. Логирование
    Передайте в конструктор
    onTopicFault: (key, ex) => logger.LogError(ex, "...").

Два вопроса, чтобы “доточить” поведение без сюрпризов

  1. При Publish(..., mode: Sync) если подписчик был добавлен через SubscribeAsync: его нужно игнорировать (как сейчас) или всё равно вызвать, блокируя поток?
  2. Нужно ли после блокировки топика автоматически отписывать всех (очищать список), или достаточно просто держать флаг блокировки? (сейчас — просто блокировка, список остаётся).

8. При Publish(..., mode: Sync) если подписчик был добавлен через SubscribeAsync: его нужно игнорировать (как сейчас) или всё равно вызвать, блокируя поток?
Здесь лучше из одного канала сделать два канала с разными Topics (Sync, Async)
9. Блокируем канал и очищаем весь список.

Понял. Тогда делаем два независимых канала (по сути, два MessageBus/Topic-пространства): один строго sync, второй строго async. Это убирает неоднозначность Publish(mode: Sync) vs SubscribeAsync.

Плюс: при любой ошибке блокируем топик и очищаем список подписчиков.

Ниже вариант с одним “фасадом” ChannelsBus, который содержит SyncBus и AsyncBus, с разными типами подписок и публикаций.

C#

Копировать код

using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Threading;

using System.Threading.Tasks;

 

namespace Messaging

{

    public sealed class ChannelsBus<TKey>

        where TKey : notnull

    {

        public SyncChannel<TKey> Sync { get; }

        public AsyncChannel<TKey> Async { get; }

 

        public ChannelsBus(

            Action<TKey, Exception>? onSyncTopicFault = null,

            Action<TKey, Exception>? onAsyncTopicFault = null)

        {

            Sync = new SyncChannel<TKey>(onSyncTopicFault);

            Async = new AsyncChannel<TKey>(onAsyncTopicFault);

        }

    }

 

    // ---------------- SYNC ----------------

 

    public sealed class SyncChannel<TKey>

        where TKey : notnull

    {

        private readonly ConcurrentDictionary<TKey, SyncTopic> _topics = new();

        private readonly Action<TKey, Exception>? _onTopicFault;

 

        public SyncChannel(Action<TKey, Exception>? onTopicFault = null)

            => _onTopicFault = onTopicFault;

 

        public IDisposable Subscribe<TMessage>(TKey key, Action<TMessage> handler)

        {

            if (handler is null) throw new ArgumentNullException(nameof(handler));

            var topic = _topics.GetOrAdd(key, k => new SyncTopic(k, _onTopicFault));

            return topic.Add(handler);

        }

 

        public void Publish<TMessage>(TKey key, TMessage message)

        {

            if (!_topics.TryGetValue(key, out var topic)) return;

            topic.Publish(message);

        }

 

        private sealed class SyncTopic

        {

            private readonly TKey _key;

            private readonly Action<TKey, Exception>? _onFault;

 

            private readonly object _gate = new();

            private List<ISyncHandler> _handlers = new();

            private bool _blocked;

 

            public SyncTopic(TKey key, Action<TKey, Exception>? onFault)

            {

                _key = key;

                _onFault = onFault;

            }

 

            public IDisposable Add<TMessage>(Action<TMessage> handler)

            {

                lock (_gate)

                {

                    if (_blocked) return NoopDisposable.Instance;

 

                    var entry = new SyncHandler<TMessage>(handler);

                    var next = new List<ISyncHandler>(_handlers.Count + 1);

                    next.AddRange(_handlers);

                    next.Add(entry); // FIFO by subscription order

                    _handlers = next;

 

                    return new Subscription(this, entry);

                }

            }

 

            public void Publish<TMessage>(TMessage message)

            {

                ISyncHandler[] snapshot;

                lock (_gate)

                {

                    if (_blocked) return;

                    snapshot = _handlers.ToArray();

                }

 

                foreach (var h in snapshot)

                {

                    try

                    {

                        h.Invoke(message);

                    }

                    catch (Exception ex)

                    {

                        BlockAndClear(ex);

                        return;

                    }

                }

            }

 

            private void BlockAndClear(Exception ex)

            {

                lock (_gate)

                {

                    if (_blocked) return;

                    _blocked = true;

                    _handlers = new List<ISyncHandler>(0); // очищаем

                }

                _onFault?.Invoke(_key, ex);

            }

 

            private void Remove(ISyncHandler entry)

            {

                lock (_gate)

                {

                    if (_blocked) return;

 

                    var idx = _handlers.IndexOf(entry);

                    if (idx < 0) return;

 

                    var next = new List<ISyncHandler>(_handlers.Count - 1);

                    for (int i = 0; i < _handlers.Count; i++)

                        if (i != idx) next.Add(_handlers[i]);

 

                    _handlers = next;

                }

            }

 

            private sealed class Subscription : IDisposable

            {

                private SyncTopic? _topic;

                private readonly ISyncHandler _entry;

 

                public Subscription(SyncTopic topic, ISyncHandler entry)

                {

                    _topic = topic;

                    _entry = entry;

                }

 

                public void Dispose()

                {

                    var t = Interlocked.Exchange(ref _topic, null);

                    t?.Remove(_entry);

                }

            }

 

            private interface ISyncHandler

            {

                void Invoke<TMessage>(TMessage msg);

            }

 

            private sealed class SyncHandler<TMessage> : ISyncHandler

            {

                private readonly Action<TMessage> _handler;

                public SyncHandler(Action<TMessage> handler) => _handler = handler;

 

                public void Invoke<TAny>(TAny msg)

                {

                    if (msg is TMessage typed)

                        _handler(typed);

                }

            }

        }

    }

 

    // ---------------- ASYNC ----------------

 

    public sealed class AsyncChannel<TKey>

        where TKey : notnull

    {

        private readonly ConcurrentDictionary<TKey, AsyncTopic> _topics = new();

        private readonly Action<TKey, Exception>? _onTopicFault;

 

        public AsyncChannel(Action<TKey, Exception>? onTopicFault = null)

            => _onTopicFault = onTopicFault;

 

        public IDisposable Subscribe<TMessage>(TKey key, Func<TMessage, ValueTask> handler)

        {

            if (handler is null) throw new ArgumentNullException(nameof(handler));

            var topic = _topics.GetOrAdd(key, k => new AsyncTopic(k, _onTopicFault));

            return topic.Add(handler);

        }

 

        public ValueTask PublishAsync<TMessage>(TKey key, TMessage message, CancellationToken ct = default)

        {

            if (!_topics.TryGetValue(key, out var topic)) return ValueTask.CompletedTask;

            return topic.PublishAsync(message, ct);

        }

 

        private sealed class AsyncTopic

        {

            private readonly TKey _key;

            private readonly Action<TKey, Exception>? _onFault;

 

            private readonly object _gate = new();

            private List<IAsyncHandler> _handlers = new();

            private bool _blocked;

 

            public AsyncTopic(TKey key, Action<TKey, Exception>? onFault)

            {

                _key = key;

                _onFault = onFault;

            }

 

            public IDisposable Add<TMessage>(Func<TMessage, ValueTask> handler)

            {

                lock (_gate)

                {

                    if (_blocked) return NoopDisposable.Instance;

 

                    var entry = new AsyncHandler<TMessage>(handler);

                    var next = new List<IAsyncHandler>(_handlers.Count + 1);

                    next.AddRange(_handlers);

                    next.Add(entry); // FIFO by subscription order

                    _handlers = next;

 

                    return new Subscription(this, entry);

                }

            }

 

            public async ValueTask PublishAsync<TMessage>(TMessage message, CancellationToken ct)

            {

                IAsyncHandler[] snapshot;

                lock (_gate)

                {

                    if (_blocked) return;

                    snapshot = _handlers.ToArray();

                }

 

                foreach (var h in snapshot)

                {

                    ct.ThrowIfCancellationRequested();

                    try

                    {

                        await h.InvokeAsync(message, ct).ConfigureAwait(false);

                    }

                    catch (Exception ex)

                    {

                        BlockAndClear(ex);

                        return;

                    }

                }

            }

 

            private void BlockAndClear(Exception ex)

            {

                lock (_gate)

                {

                    if (_blocked) return;

                    _blocked = true;

                    _handlers = new List<IAsyncHandler>(0); // очищаем

                }

                _onFault?.Invoke(_key, ex);

            }

 

            private void Remove(IAsyncHandler entry)

            {

                lock (_gate)

                {

                    if (_blocked) return;

 

                    var idx = _handlers.IndexOf(entry);

                    if (idx < 0) return;

 

                    var next = new List<IAsyncHandler>(_handlers.Count - 1);

                    for (int i = 0; i < _handlers.Count; i++)

                        if (i != idx) next.Add(_handlers[i]);

 

                    _handlers = next;

                }

            }

 

            private sealed class Subscription : IDisposable

            {

                private AsyncTopic? _topic;

                private readonly IAsyncHandler _entry;

 

                public Subscription(AsyncTopic topic, IAsyncHandler entry)

                {

                    _topic = topic;

                    _entry = entry;

                }

 

                public void Dispose()

                {

                    var t = Interlocked.Exchange(ref _topic, null);

                    t?.Remove(_entry);

                }

            }

 

            private interface IAsyncHandler

            {

                ValueTask InvokeAsync<TMessage>(TMessage msg, CancellationToken ct);

            }

 

            private sealed class AsyncHandler<TMessage> : IAsyncHandler

            {

                private readonly Func<TMessage, ValueTask> _handler;

                public AsyncHandler(Func<TMessage, ValueTask> handler) => _handler = handler;

 

                public ValueTask InvokeAsync<TAny>(TAny msg, CancellationToken ct)

                {

                    if (msg is TMessage typed)

                        return _handler(typed);

 

                    return ValueTask.CompletedTask;

                }

            }

        }

    }

 

    internal sealed class NoopDisposable : IDisposable

    {

        public static NoopDisposable Instance { get; } = new();

        public void Dispose() { }

    }

}

Как использовать:

C#

Копировать код

var bus = new Messaging.ChannelsBus<string>(

    onSyncTopicFault: (k, ex) => Console.WriteLine($"SYNC topic '{k}' fault: {ex.Message}"),

    onAsyncTopicFault: (k, ex) => Console.WriteLine($"ASYNC topic '{k}' fault: {ex.Message}")

);

 

bus.Sync.Subscribe<int>("SyncTopic", x => Console.WriteLine($"sync {x}"));

bus.Async.Subscribe<int>("AsyncTopic", async x => { await Task.Delay(10); Console.WriteLine($"async {x}"); });

 

bus.Sync.Publish("SyncTopic", 123);

await bus.Async.PublishAsync("AsyncTopic", 456);

Если хотите, добавлю:

  • TryPublish/IsBlocked(key) для диагностики.
  • Авто-удаление топика из словаря после блокировки (сейчас он остаётся, но пустой и заблокированный).

Спасибо.
Все работает.
Я предполагаю это использовать для работы в Asp.Core c BackGroundServices
Bus будет SingleTone
BAckGroundServices - будут взаимодействовать между собой через Bus
1. BAckGroundServiceQuotes генерит биржевую котировку
2. Котировка попадает в BAckGroundServiceStrategy. Strategy генерит Order
3.
Order попадает В BAckGroundServiceExecutor. BAckGroundServiceExecutor генерит Trade
4. Trade
попадает в BAckGroundServicePositions. Расчитывается Позиция итд
5. Все новые сущности попадают в BAckGroundServiceInMemoryDataBase - здесь происходит их сохранение для визуализации
6. Все новые сущности также попадают в BAckGroundServiceEntityFrameworkDataBase - здесь происходит их сохранение.
--------
Вот такой очень примерный сценарий.
Несколько BAckgroundServices и BusLibrary.

 


Комментариев нет:

Отправить комментарий