비동기 BlockingCollection과 같은 것이 있습니까??
나는 싶습니다 await
의 결과에 BlockingCollection<T>.Take()
비동기, 그래서 스레드를 차단하지 않습니다. 다음과 같은 것을 찾고 있습니다.
var item = await blockingCollection.TakeAsync();
나는 이것을 할 수 있다는 것을 알고있다
var item = await Task.Run(() => blockingCollection.Take());
그러나 그것은 다른 스레드 (의 ThreadPool
)가 대신 차단 되기 때문에 전체 아이디어를 죽 입니다.
대안이 있습니까?
내가 아는 네 가지 대안이 있습니다.
첫 번째는 비동기 및 작업 을 지원하는 스레드 세이프 대기열을 제공하는 Channels 입니다. 채널은 고도로 최적화되어 있으며 임계 값에 도달하면 일부 항목을 삭제할 수 있습니다.Read
Write
다음은 TPL DataflowBufferBlock<T>
에서 가져온 것 입니다. 당신은 단지 하나의 소비자가있는 경우 사용 하거나 , 또는 단지는 링크 . 자세한 내용 은 내 블로그를 참조하십시오 .OutputAvailableAsync
ReceiveAsync
ActionBlock<T>
마지막 두 가지는 내가 만든 유형이며 AsyncEx 라이브러리 에서 사용할 수 있습니다 .
AsyncCollection<T>
과 async
거의 동일 BlockingCollection<T>
하며 ConcurrentQueue<T>
또는 같은 동시 생산자 / 소비자 컬렉션을 래핑 할 수 ConcurrentBag<T>
있습니다. TakeAsync
컬렉션의 항목을 비동기 적으로 소비 하는 데 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .
AsyncProducerConsumerQueue<T>
더 이식 가능하고 async
호환되는 생산자 / 소비자 대기열입니다. DequeueAsync
큐에서 항목을 비동기 적으로 소비 하는 데 사용할 수 있습니다 . 자세한 내용 은 내 블로그를 참조하십시오 .
마지막 세 가지 대안은 동기 및 비동기 풋 앤 테이크를 허용합니다.
... 또는 다음과 같이 할 수 있습니다.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
간단하고 완벽하게 작동하는 비동기식 FIFO 대기열.
참고 :
SemaphoreSlim.WaitAsync
그 이전에 .NET 4.5에 추가되었는데, 이것이 그렇게 간단하지는 않았습니다.
약간의 해킹이 괜찮다면이 확장 프로그램을 사용해 볼 수 있습니다.
public static async Task AddAsync<TEntity>(
this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
while (true)
{
try
{
if (Bc.TryAdd(item, 0, abortCt))
return;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}
public static async Task<TEntity> TakeAsync<TEntity>(
this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
while (true)
{
try
{
TEntity item;
if (Bc.TryTake(out item, 0, abortCt))
return item;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}
다음은 BlockingCollection
많은 누락 된 기능과 함께 대기를 지원 하는의 매우 기본적인 구현입니다 . AsyncEnumerable
8.0 이전의 C # 버전에 대해 비동기 열거를 가능하게 하는 라이브러리를 사용합니다 .
public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _consumersCount = 0;
private bool _isAddingCompleted;
public void Add(T item)
{
lock (_queue)
{
if (_isAddingCompleted) throw new InvalidOperationException();
_queue.Enqueue(item);
}
_semaphore.Release();
}
public void CompleteAdding()
{
lock (_queue)
{
if (_isAddingCompleted) return;
_isAddingCompleted = true;
if (_consumersCount > 0) _semaphore.Release(_consumersCount);
}
}
public IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
return new AsyncEnumerable<T>(async yield =>
{
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) await yield.ReturnAsync(item);
}
});
}
}
사용 예 :
var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
abc.Add(i);
}
abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
await abc.GetConsumingEnumerable().ForEachAsync(async item =>
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
});
});
await Task.WhenAll(producer, consumer);
산출:
12 34 5678 9 10
Update: With the release of C# 8, asynchronous enumeration has become a build-in language feature. The required classes (IAsyncEnumerable
, IAsyncEnumerator
) are embedded in .NET Core 3.0, and are offered as a package for .NET Framework 4.6.1+ (Microsoft.Bcl.AsyncInterfaces).
Here is an alternative GetConsumingEnumerable
implementation, featuring the new C# 8 syntax:
public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) yield return item;
}
}
Note the coexistence of await
and yield
in the same method.
Usage example (C# 8):
var consumer = Task.Run(async () =>
{
await foreach (var item in abc.GetConsumingEnumerable())
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
}
});
Note the await
before the foreach
.
참고URL : https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont
'developer tip' 카테고리의 다른 글
LL과 재귀 하강 파서의 차이점은 무엇입니까? (0) | 2020.11.02 |
---|---|
표준 UIButton에 배지를 추가하려면 어떻게해야합니까? (0) | 2020.11.02 |
phpMyAdmin과 유사한 PostgreSQL 시각적 인터페이스? (0) | 2020.11.02 |
C ++ 0x에서 nullptr을 삭제하는 것이 여전히 안전합니까? (0) | 2020.11.02 |
What is web.xml file and what are all things can I do with it? (0) | 2020.11.01 |