Chad, EventHub, TradingEntities, Part1
26.02.07 EventHub, TradingEntities
https://ask.chadgpt.ru/share/b7d47f07-c985-48df-bbbc-70622e419623
В одном из проектов мне понравилась реализация очень простого mini
EventHub
Давайте его немного расширим и доработаем.
public class EventHub
{
private readonly ConcurrentDictionary<string,
ConcurrentBag<Action<string>>> _handlers = new();
public void AddHandler(string ticker, Action<string> handler)
{
var bag = _handlers.GetOrAdd(ticker, _ => new
ConcurrentBag<Action<string>>());
bag.Add(handler);
}
public void InvokeHandlers(string ticker, string message)
{
if (_handlers.TryGetValue(ticker, out var handlers))
{
foreach (var handler in handlers)
{
handler(message);
}
}
}
}
------
Например
public void AddHandler(string ticker, Action<string> handler)
можно вместо string сделать T или какой либо интерфейс IKey
Вообщем посмотри, что можно добавить
Текущая версия хороша как “минимально рабочая”, но у
неё есть несколько типичных ограничений: нельзя удалить обработчик, нет
“одноразовых” подписок, нет асинхронных обработчиков/ожидания, нет обработки
исключений, нет фильтрации/шаблонов ключей, нет гарантии порядка, ConcurrentBag неудобен для
отписки, нет диагностики и контроля нагрузки.
Ниже — направления, что можно “логично” добавить, и
один из удачных вариантов дизайна (при этом не превращая всё в полноценный
брокер).
1) Типизировать ключ и/или событие
Вариант A: ключ типом TKey
Если вы хотите оставить концепцию “тикер → строковое
сообщение”, но типизировать ключ:
public sealed class EventHub<TKey>
where TKey :
notnull
{
// ...
public void AddHandler(TKey
key, Action<string> handler) { /* ... */ }
public void Publish(TKey
key, string message) { /* ... */ }
}
Это уже убирает “магические строки” и позволяет использовать, например, Symbol, Guid, int, record struct.
Вариант B: типизировать payload: TMessage
Чаще полезнее типизировать именно сообщение:
public sealed class EventHub<TKey, TMessage>
where TKey :
notnull
{
public void Subscribe(TKey
key, Action<TMessage> handler) { /* ... */ }
public void Publish(TKey
key, TMessage message) { /* ... */ }
}
Вариант C: интерфейс IKey
Если хочется общий контракт:
public interface IKey
{
string Value { get; }
}
Но на практике TKey : notnull обычно удобнее
и быстрее (без виртуальных вызовов и без навязывания строки).
2) Отписка (must-have)
С ConcurrentBag отписка неудобна. Лучше хранить обработчики так,
чтобы можно было удалять:
- либо copy-on-write через ImmutableArray/ImmutableList
- либо хранить ConcurrentDictionary<SubscriptionId, handler>
Самый простой UX — возвращать IDisposable из Subscribe, чтобы можно
было сделать:
using var sub = hub.Subscribe(key, msg => ...);
3) “Одноразовые” подписки (Once)
Удобно для сценариев “жду первое событие”:
hub.SubscribeOnce(key,
handler);
Реализация: обёртка, которая сначала отписывается,
потом вызывает обработчик (или наоборот, если важна reentrancy).
4) Асинхронные обработчики и управление параллелизмом
Синхронный Action<T> блокирует Publish. Часто нужно:
- Func<T,
ValueTask> и PublishAsync
- выбор
стратегии: вызывать последовательно или параллельно
- ограничивать
параллелизм (например, SemaphoreSlim)
Минимально:
public ValueTask PublishAsync(TKey key, TMessage
message, CancellationToken ct = default);
И опция:
- Parallel =
false (по умолчанию) — предсказуемее
- Parallel =
true — быстрее, но требует внимания к потокобезопасности хендлеров
5) Обработка исключений в обработчиках
Сейчас одно исключение “роняет” весь InvokeHandlers и остальные
обработчики не вызовутся.
Варианты:
- “проглатывать”
и продолжать, а ошибки отдавать в OnError
- агрегировать
и бросать AggregateException
- иметь
стратегию: StopOnFirstError/Continue
6) Порядок вызова
ConcurrentBag не гарантирует порядок. Иногда важно “как подписались
— так и вызываем”.
Если порядок важен, лучше:
- ImmutableArray
(копируется при добавлении/удалении)
- или
обычный List под lock (если нагрузка небольшая, это часто
самый практичный путь)
7) Диагностика и удобства
Полезные мелочи:
- int
GetSubscriberCount(TKey key)
- IReadOnlyCollection<TKey> Keys (или GetKeysSnapshot)
- bool
RemoveKey(TKey key) (очистить всех подписчиков)
- метрики:
сколько событий опубликовано, сколько ошибок, среднее время обработчиков
(минимально — через коллбеки)
8) Фильтры / подписка “на всё” / шаблоны
Иногда хочется:
- подписка
на все ключи: SubscribeAll(...)
- фильтр по payload: Subscribe(key, handler, predicate)
- wildcard
по строкам (если ключ строковый), например AAPL.*
Это можно сделать отдельным уровнем: ключевой матчинг
— отдельная стратегия/интерфейс.
Предложение конкретной
“хорошей” реализации (без лишнего)
Ниже вариант: TKey, TMessage, отписка через IDisposable, хранение подписчиков через ImmutableArray
(copy-on-write). Это даёт:
- быстрый Publish без локов
(только чтение массива)
- нормальную
отписку
- предсказуемый
порядок (по порядку подписки)
- потокобезопасность
using
System.Collections.Concurrent;
using
System.Collections.Immutable;
using
System.Threading;
public sealed class EventHub<TKey, TMessage> where
TKey : notnull
{
private sealed
class Subscription : IDisposable
{
private readonly
EventHub<TKey, TMessage> _hub;
private readonly
TKey _key;
internal
readonly Action<TMessage> Handler;
private int
_disposed;
public Subscription(EventHub<TKey,
TMessage> hub, TKey key, Action<TMessage> handler)
{
_hub = hub;
_key
= key;
Handler = handler;
}
public void
Dispose()
{
if
(Interlocked.Exchange(ref _disposed, 1) == 1) return;
_hub.Unsubscribe(_key, this);
}
}
private readonly
ConcurrentDictionary<TKey, ImmutableArray<Subscription>> _map = new();
public
IDisposable Subscribe(TKey key, Action<TMessage> handler)
{
if
(handler is null) throw new ArgumentNullException(nameof(handler));
var sub
= new Subscription(this, key, handler);
_map.AddOrUpdate(
key,
_
=> ImmutableArray.Create(sub),
(_,
arr) => arr.Add(sub));
return
sub;
}
public
IDisposable SubscribeOnce(TKey key, Action<TMessage> handler)
{
IDisposable? sub = null;
sub =
Subscribe(key, msg =>
{
sub!.Dispose();
handler(msg);
});
return
sub;
}
public void Publish(TKey
key, TMessage message)
{
if
(!_map.TryGetValue(key, out var subs)) return;
//
ImmutableArray безопасно перечислять без локов
foreach
(var sub in subs)
{
sub.Handler(message);
}
}
public int GetSubscriberCount(TKey
key)
=>
_map.TryGetValue(key, out var subs) ? subs.Length : 0;
public bool ClearKey(TKey
key) => _map.TryRemove(key, out _);
private void
Unsubscribe(TKey key, Subscription sub)
{
while (true)
{
if
(!_map.TryGetValue(key, out var current)) return;
var
next = current.Remove(sub);
if (next.Length == 0)
{
// никто не подписан — убираем
ключ целиком
_map.TryRemove(new KeyValuePair<TKey,
ImmutableArray<Subscription>>(key, current));
return;
}
// попытка атомарно заменить массив
if
(_map.TryUpdate(key, next, current))
return;
}
}
}
Что можно добавить поверх этого (точечно и без
“переписывания всего”):
- PublishSafe(...) с перехватом исключений и Action<Exception, TKey>/Action<Exception,
TKey, TMessage>
- асинхронный EventHub<TKey,
TMessage> на Func<TMessage,
ValueTask> + PublishAsync
- настройку
“параллелить обработчики или нет”
Вопросы, чтобы выбрать лучший дизайн под ваш кейс
- Важен ли порядок
вызова обработчиков (FIFO по подписке)?
- Нужна ли отписка
и как часто она будет происходить?
- Publish должен
быть синхронным или нужно await-ить обработчики?
- В случае
исключения в одном обработчике: останавливаемся или продолжаем?
- Насколько
много ключей и подписчиков (примерно порядок: десятки/тысячи/сотни тысяч)?
Если ответите на эти 5 пунктов, я предложу оптимальную
конфигурацию (простую, но “правильную”) и можно будет добавить ровно те фичи,
которые реально нужны.
GPT-5.2
1. В данном
варианте порядок очень важен FIFO.
2. Отписка, подписка - нужна всегда в любое время без остановки работы EventHub
3. Publish должен быть синхронным про "await - ить" я не понял.
Сделай и то и другое. По логической переменной: либо sync либо await
4. В случае возникновении исключение работаем дальше, но блокируем вызывать
хэндлер на этом топике, пишем сообщение в лог. Все остальное должно работать.
Мы запрещаем работу на этом канале с определенным топиком.
5. Ключей может быть десятки - как Тикеры на бирже. Сколько этих Тикеров.
Наверное, больше сотни.
Ниже — вариант EventHub<TKey, TMessage> с:
- FIFO: порядок
вызова обработчиков = порядок подписки.
- Subscribe/Unsubscribe
в любое время без остановки хаба.
- Два режима
публикации: sync и async (выбирается логической переменной).
- Fault
isolation по топику: если любой хэндлер на топике кидает исключение,
мы:
- продолжаем
работу хаба в целом;
- помечаем
топик как “заблокированный” и больше не вызываем обработчики для
этого топика;
- логируем
причину (исключение + топик).
- Ключей
“десятки/сотни” — подходит.
Почему FIFO при конкурентных подписках/отписках
работает
Мы храним для каждого ключа неизменяемый массив
подписок (ImmutableArray).
Подписка/отписка создают новый массив и атомарно подменяют ссылку в ConcurrentDictionary через TryUpdate. Publish берёт снимок
текущего массива и вызывает по порядку. Это даёт:
- стабильный
FIFO для текущего Publish;
- отсутствие
блокировок на чтении;
- возможность
подписываться/отписываться параллельно.
Код
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
public sealed class EventHub<TKey, TMessage> where
TKey : notnull
{
public enum
PublishMode { Sync, Async }
private sealed
class Subscription
{
public readonly Action<TMessage>?
Sync;
public readonly
Func<TMessage, ValueTask>? Async;
public Subscription(Action<TMessage>
sync) => Sync = sync;
public Subscription(Func<TMessage,
ValueTask> async) => Async = async;
}
public readonly
struct SubscriptionToken : IDisposable
{
private readonly
EventHub<TKey, TMessage>? _hub;
private readonly
TKey _key;
private readonly
Subscription? _sub;
internal
SubscriptionToken(EventHub<TKey, TMessage> hub, TKey key, Subscription
sub)
{
_hub
= hub;
_key
= key;
_sub
= sub;
}
public void
Dispose()
{
if
(_hub is null || _sub is null) return;
_hub.Unsubscribe(_key,
_sub);
}
}
private sealed
class TopicState
{
public
ImmutableArray<Subscription> Subs =
ImmutableArray<Subscription>.Empty;
public
Exception? Fault;
public bool
IsFaulted => Fault is not null;
}
private readonly
ConcurrentDictionary<TKey, TopicState> _topics = new();
/// <summary>Логгер
вызывается при первом падении топика (когда он блокируется).</summary>
private readonly
Action<TKey, Exception>? _onTopicFault;
public EventHub(Action<TKey,
Exception>? onTopicFault = null)
{
_onTopicFault = onTopicFault;
}
public bool IsTopicBlocked(TKey
key)
=>
_topics.TryGetValue(key, out var s) && s.IsFaulted;
public
Exception? GetTopicFault(TKey key)
=>
_topics.TryGetValue(key, out var s) ? s.Fault : null;
/// <summary>Ручная
разблокировка топика (например после алерта/разбора).</summary>
public bool TryUnblockTopic(TKey
key)
{
if (!_topics.TryGetValue(key,
out var s)) return false;
// Если хотите
строго атомарно — можно через lock(s). Но для вашего масштаба и модели —
достаточно Exchange.
Interlocked.Exchange(ref s.Fault, null);
return true;
}
public
SubscriptionToken Subscribe(TKey key, Action<TMessage> handler)
=>
AddSubscription(key, new Subscription(handler));
public
SubscriptionToken SubscribeAsync(TKey key, Func<TMessage, ValueTask>
handler)
=>
AddSubscription(key, new Subscription(handler));
private
SubscriptionToken AddSubscription(TKey key, Subscription sub)
{
var
state = _topics.GetOrAdd(key, _ => new TopicState());
while (true)
{
var
current = state.Subs;
//
FIFO: добавляем в конец
var
next = current.Add(sub);
// атомарная замена
ссылочного поля через CAS
if (ReferenceEquals(
Interlocked.CompareExchange(ref state.Subs, next, current),
current))
{
return
new SubscriptionToken(this, key, sub);
}
}
}
private void
Unsubscribe(TKey key, Subscription sub)
{
if
(!_topics.TryGetValue(key, out var state)) return;
while (true)
{
var
current = state.Subs;
var
next = current.Remove(sub);
if
(ReferenceEquals(
Interlocked.CompareExchange(ref state.Subs, next, current),
current))
{
// опционально: если пусто и не
faulted — можно чистить ключ, но это не обязательно при вашем масштабе.
// if (next.IsEmpty && !state.IsFaulted) _topics.TryRemove(key, out
_);
return;
}
}
}
/// <summary>
/// Синхронная публикация. В режиме Async вызывает async-хэндлеры, но блокирует поток на каждом (GetAwaiter().GetResult()).
/// </summary>
public void Publish(TKey
key, TMessage message, PublishMode mode)
{
if (!_topics.TryGetValue(key, out var
state)) return;
if
(state.IsFaulted) return;
var
snapshot = state.Subs; // FIFO snapshot
for (int
i = 0; i < snapshot.Length; i++)
{
var sub
= snapshot[i];
try
{
if
(mode == PublishMode.Sync)
{
// Sync режим: вызываем только sync-хэндлеры (async игнорируем)
sub.Sync?.Invoke(message);
}
else
{
//
Async режим: предпочитаем async, иначе sync
if (sub.Async is not null)
sub.Async(message).GetAwaiter().GetResult();
else
sub.Sync?.Invoke(message);
}
}
catch
(Exception ex)
{
BlockTopic(state, key, ex);
return; // после
блокировки топика прекращаем дальнейшие вызовы на этом топике
}
}
}
/// <summary>
/// Асинхронная публикация. В режиме Sync
вызывает sync-хэндлеры, async игнорирует.
/// В режиме Async await-ит async-хэндлеры
последовательно (FIFO).
/// </summary>
public async
ValueTask PublishAsync(TKey key, TMessage message, PublishMode mode)
{
if
(!_topics.TryGetValue(key, out var state)) return;
if
(state.IsFaulted) return;
var
snapshot = state.Subs;
for (int
i = 0; i < snapshot.Length; i++)
{
var
sub = snapshot[i];
try
{
if
(mode == PublishMode.Sync)
{
sub.Sync?.Invoke(message);
}
else
{
if (sub.Async is not null)
await sub.Async(message).ConfigureAwait(false);
else
sub.Sync?.Invoke(message);
}
}
catch
(Exception ex)
{
BlockTopic(state, key, ex);
return;
}
}
}
private void
BlockTopic(TopicState state, TKey key, Exception ex)
{
// Блокируем топик ровно один раз
(первое исключение выигрывает)
var prev = Interlocked.CompareExchange(ref state.Fault, ex, null);
if (prev
is null)
_onTopicFault?.Invoke(key, ex);
}
}
Важные уточнения по вашим требованиям
- FIFO
соблюдается внутри одного Publish и соответствует порядку подписки.
При конкурентной подписке “во время публикации” новый подписчик попадёт в следующий Publish (что обычно и нужно). - “Сделай и
sync и await”
Я сделал оба API: - Publish(key, msg, mode) — синхронный вызов.
- PublishAsync(key, msg, mode) — асинхронный вызов (можно await).
При этом mode решает, вызываем ли async-хэндлеры.
- mode == Sync:
вызываются только Action<TMessage>
(async-хэндлеры игнорируются).
- mode == Async: Func<TMessage,
ValueTask> await-ится (или блокируется в Publish), а если
подписан sync — он тоже отработает.
Если вам нужно другое поведение (например, в Sync режиме
вызывать async-хэндлеры “огрублённо” через .GetResult()), скажите —
поменяю.
- При
исключении блокируем топик
Сейчас: при первом исключении топик блокируется и дальнейшие публикации по ключу сразу return. Остальные топики работают. - Логирование
Передайте в конструктор onTopicFault: (key, ex) => logger.LogError(ex, "...").
Два вопроса, чтобы “доточить” поведение без сюрпризов
- При Publish(...,
mode: Sync) если подписчик был добавлен через SubscribeAsync: его
нужно игнорировать (как сейчас) или всё равно вызвать, блокируя
поток?
- Нужно ли
после блокировки топика автоматически отписывать всех (очищать
список), или достаточно просто держать флаг блокировки? (сейчас — просто
блокировка, список остаётся).
GPT-5.2
8. При
Publish(..., mode: Sync) если подписчик был добавлен через SubscribeAsync: его
нужно игнорировать (как сейчас) или всё равно вызвать, блокируя поток?
Здесь лучше из одного канала сделать два канала с разными Topics (Sync, Async)
9. Блокируем канал и очищаем весь список.
Понял. Тогда делаем два независимых канала (по
сути, два MessageBus/Topic-пространства):
один строго sync, второй строго async. Это убирает неоднозначность Publish(mode: Sync) vs SubscribeAsync.
Плюс: при любой ошибке блокируем топик и очищаем
список подписчиков.
Ниже вариант с одним “фасадом” ChannelsBus, который
содержит SyncBus и AsyncBus, с разными
типами подписок и публикаций.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Messaging
{
public sealed
class ChannelsBus<TKey>
where TKey
: notnull
{
public
SyncChannel<TKey> Sync { get; }
public
AsyncChannel<TKey> Async { get; }
public ChannelsBus(
Action<TKey, Exception>? onSyncTopicFault = null,
Action<TKey, Exception>? onAsyncTopicFault = null)
{
Sync
= new SyncChannel<TKey>(onSyncTopicFault);
Async = new AsyncChannel<TKey>(onAsyncTopicFault);
}
}
//
---------------- SYNC ----------------
public sealed
class SyncChannel<TKey>
where TKey
: notnull
{
private readonly
ConcurrentDictionary<TKey, SyncTopic> _topics = new();
private readonly
Action<TKey, Exception>? _onTopicFault;
public SyncChannel(Action<TKey,
Exception>? onTopicFault = null)
=> _onTopicFault = onTopicFault;
public
IDisposable Subscribe<TMessage>(TKey key, Action<TMessage> handler)
{
if
(handler is null) throw new ArgumentNullException(nameof(handler));
var
topic = _topics.GetOrAdd(key, k => new SyncTopic(k, _onTopicFault));
return
topic.Add(handler);
}
public void
Publish<TMessage>(TKey key, TMessage message)
{
if
(!_topics.TryGetValue(key, out var topic)) return;
topic.Publish(message);
}
private sealed
class SyncTopic
{
private
readonly TKey _key;
private
readonly Action<TKey, Exception>? _onFault;
private
readonly object _gate = new();
private List<ISyncHandler> _handlers
= new();
private
bool _blocked;
public
SyncTopic(TKey key, Action<TKey, Exception>? onFault)
{
_key = key;
_onFault = onFault;
}
public IDisposable Add<TMessage>(Action<TMessage>
handler)
{
lock
(_gate)
{
if (_blocked) return NoopDisposable.Instance;
var entry = new SyncHandler<TMessage>(handler);
var next = new List<ISyncHandler>(_handlers.Count + 1);
next.AddRange(_handlers);
next.Add(entry); // FIFO by subscription order
_handlers = next;
return new Subscription(this,
entry);
}
}
public
void Publish<TMessage>(TMessage message)
{
ISyncHandler[] snapshot;
lock
(_gate)
{
if (_blocked) return;
snapshot = _handlers.ToArray();
}
foreach
(var h in snapshot)
{
try
{
h.Invoke(message);
}
catch (Exception ex)
{
BlockAndClear(ex);
return;
}
}
}
private
void BlockAndClear(Exception ex)
{
lock
(_gate)
{
if (_blocked) return;
_blocked = true;
_handlers = new List<ISyncHandler>(0); // очищаем
}
_onFault?.Invoke(_key, ex);
}
private
void Remove(ISyncHandler entry)
{
lock
(_gate)
{
if (_blocked) return;
var idx = _handlers.IndexOf(entry);
if (idx < 0) return;
var next = new List<ISyncHandler>(_handlers.Count - 1);
for (int i = 0; i < _handlers.Count; i++)
if (i != idx) next.Add(_handlers[i]);
_handlers = next;
}
}
private
sealed class Subscription : IDisposable
{
private
SyncTopic? _topic;
private
readonly ISyncHandler _entry;
public
Subscription(SyncTopic topic, ISyncHandler entry)
{
_topic = topic;
_entry = entry;
}
public
void Dispose()
{
var t = Interlocked.Exchange(ref _topic, null);
t?.Remove(_entry);
}
}
private
interface ISyncHandler
{
void
Invoke<TMessage>(TMessage msg);
}
private
sealed class SyncHandler<TMessage> : ISyncHandler
{
private
readonly Action<TMessage> _handler;
public
SyncHandler(Action<TMessage> handler) => _handler = handler;
public
void Invoke<TAny>(TAny msg)
{
if (msg is TMessage typed)
_handler(typed);
}
}
}
}
//
---------------- ASYNC ----------------
public sealed
class AsyncChannel<TKey>
where TKey
: notnull
{
private readonly
ConcurrentDictionary<TKey, AsyncTopic> _topics = new();
private readonly
Action<TKey, Exception>? _onTopicFault;
public AsyncChannel(Action<TKey,
Exception>? onTopicFault = null)
=> _onTopicFault = onTopicFault;
public
IDisposable Subscribe<TMessage>(TKey key, Func<TMessage, ValueTask>
handler)
{
if
(handler is null) throw new ArgumentNullException(nameof(handler));
var
topic = _topics.GetOrAdd(key, k => new AsyncTopic(k, _onTopicFault));
return
topic.Add(handler);
}
public
ValueTask PublishAsync<TMessage>(TKey key, TMessage message,
CancellationToken ct = default)
{
if
(!_topics.TryGetValue(key, out var topic)) return ValueTask.CompletedTask;
return
topic.PublishAsync(message, ct);
}
private sealed
class AsyncTopic
{
private
readonly TKey _key;
private
readonly Action<TKey, Exception>? _onFault;
private
readonly object _gate = new();
private
List<IAsyncHandler> _handlers = new();
private
bool _blocked;
public
AsyncTopic(TKey key, Action<TKey, Exception>? onFault)
{
_key = key;
_onFault = onFault;
}
public
IDisposable Add<TMessage>(Func<TMessage, ValueTask> handler)
{
lock
(_gate)
{
if (_blocked) return NoopDisposable.Instance;
var entry = new AsyncHandler<TMessage>(handler);
var next = new List<IAsyncHandler>(_handlers.Count + 1);
next.AddRange(_handlers);
next.Add(entry); // FIFO by subscription order
_handlers = next;
return new Subscription(this, entry);
}
}
public
async ValueTask PublishAsync<TMessage>(TMessage message,
CancellationToken ct)
{
IAsyncHandler[] snapshot;
lock
(_gate)
{
if (_blocked) return;
snapshot = _handlers.ToArray();
}
foreach
(var h in snapshot)
{
ct.ThrowIfCancellationRequested();
try
{
await h.InvokeAsync(message,
ct).ConfigureAwait(false);
}
catch (Exception ex)
{
BlockAndClear(ex);
return;
}
}
}
private
void BlockAndClear(Exception ex)
{
lock
(_gate)
{
if (_blocked) return;
_blocked = true;
_handlers = new List<IAsyncHandler>(0); // очищаем
}
_onFault?.Invoke(_key, ex);
}
private
void Remove(IAsyncHandler entry)
{
lock
(_gate)
{
if (_blocked) return;
var idx = _handlers.IndexOf(entry);
if (idx < 0) return;
var next = new List<IAsyncHandler>(_handlers.Count - 1);
for (int i = 0; i < _handlers.Count; i++)
if (i != idx) next.Add(_handlers[i]);
_handlers = next;
}
}
private
sealed class Subscription : IDisposable
{
private
AsyncTopic? _topic;
private
readonly IAsyncHandler _entry;
public
Subscription(AsyncTopic topic, IAsyncHandler entry)
{
_topic = topic;
_entry = entry;
}
public
void Dispose()
{
var t = Interlocked.Exchange(ref _topic, null);
t?.Remove(_entry);
}
}
private
interface IAsyncHandler
{
ValueTask
InvokeAsync<TMessage>(TMessage msg, CancellationToken ct);
}
private
sealed class AsyncHandler<TMessage> : IAsyncHandler
{
private
readonly Func<TMessage, ValueTask> _handler;
public
AsyncHandler(Func<TMessage, ValueTask> handler) => _handler = handler;
public
ValueTask InvokeAsync<TAny>(TAny msg, CancellationToken ct)
{
if (msg is TMessage typed)
return _handler(typed);
return ValueTask.CompletedTask;
}
}
}
}
internal sealed
class NoopDisposable : IDisposable
{
public static
NoopDisposable Instance { get; } = new();
public void Dispose()
{ }
}
}
Как использовать:
var bus = new Messaging.ChannelsBus<string>(
onSyncTopicFault: (k, ex) => Console.WriteLine($"SYNC topic '{k}'
fault: {ex.Message}"),
onAsyncTopicFault:
(k, ex) => Console.WriteLine($"ASYNC topic '{k}' fault: {ex.Message}")
);
bus.Sync.Subscribe<int>("SyncTopic", x
=> Console.WriteLine($"sync {x}"));
bus.Async.Subscribe<int>("AsyncTopic",
async x => { await Task.Delay(10); Console.WriteLine($"async {x}");
});
bus.Sync.Publish("SyncTopic", 123);
await bus.Async.PublishAsync("AsyncTopic", 456);
Если хотите, добавлю:
- TryPublish/IsBlocked(key) для диагностики.
- Авто-удаление
топика из словаря после блокировки (сейчас он остаётся, но пустой и
заблокированный).
GPT-5.2
Спасибо.
Все работает.
Я предполагаю это использовать для работы в Asp.Core c BackGroundServices
Bus будет SingleTone
BAckGroundServices - будут взаимодействовать между собой через Bus
1. BAckGroundServiceQuotes генерит биржевую котировку
2. Котировка попадает в BAckGroundServiceStrategy. Strategy генерит Order
3. Order попадает В BAckGroundServiceExecutor.
BAckGroundServiceExecutor генерит Trade
4. Trade попадает в BAckGroundServicePositions. Расчитывается
Позиция итд
5. Все новые сущности попадают в BAckGroundServiceInMemoryDataBase - здесь
происходит их сохранение для визуализации
6. Все новые сущности также попадают в BAckGroundServiceEntityFrameworkDataBase
- здесь происходит их сохранение.
--------
Вот такой очень примерный сценарий.
Несколько BAckgroundServices и BusLibrary.
Комментариев нет:
Отправить комментарий