其他分享
首页 > 其他分享> > 互操作

互操作

作者:互联网

>>返回《C# 并发编程》

异步封装

1. 用 async 代码封装异步方法与 Completed 事件

public static void MyDownloadStringTaskAsyncRun()
{
    WebClient client = new WebClient();
    string res = client.MyDownloadStringTaskAsync(new Uri("http://www.baidu.com")).Result;
    System.Console.WriteLine(res);
}

public static Task<string> MyDownloadStringTaskAsync(this WebClient client, Uri address)
{
    var tcs = new TaskCompletionSource<string>();
    // 这个事件处理程序会完成 Task 对象,并自行注销。
    DownloadStringCompletedEventHandler handler = null;
    handler = (_, e) =>
    {
        client.DownloadStringCompleted -= handler;
        if (e.Cancelled)
            tcs.TrySetCanceled();
        else if (e.Error != null)
            tcs.TrySetException(e.Error);
        else
            tcs.TrySetResult(e.Result);
    };
    // 登记事件,然后开始操作。
    client.DownloadStringCompleted += handler;
    client.DownloadStringAsync(address);
    return tcs.Task;
}

输出:

<!DOCTYPE html><!--STATUS OK-->
<html>
... ...
</html>

2. 用 async 代码封装 Begin/End 方法

public static void GetResponseAsyncRun()
{
    WebRequest request = WebRequest.Create("http://www.baidu.com");
    var response = request.MyGetResponseAsync().Result;

    System.Console.WriteLine($"WebResponse.ContentLength:{response.ContentLength}");
}
public static Task<WebResponse> MyGetResponseAsync(this WebRequest client)
{
    return Task<WebResponse>.Factory.FromAsync(client.BeginGetResponse, client.EndGetResponse, null);
}

输出:

WebResponse.ContentLength:14615

3. 用 async 代码封装并行代码

await Task.Run(() => Parallel.ForEach(...));

通过使用 Task.Run ,所有的并行处理过程都推给了线程池

Task.Run 返回一个代表并行任务的 Task 对象

4. 用 async 代码封装 Rx Observable 对象

事件流中几种可能关注的情况:

public delegate void HelloEventHandler(object sender, HelloEventArgs e);
public class HelloEventArgs : EventArgs
{
    public string Name { get; set; }
    public HelloEventArgs(string name)
    {
        Name = name;
    }
    public int SayHello()
    {
        System.Console.WriteLine(Name + " Hello.");
        return DateTime.Now.Millisecond;
    }
}

public static event HelloEventHandler HelloHandlerEvent;
public static void FirstLastRun()
{
    var task = Task.Run(() =>
    {
        Thread.Sleep(500);
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("lilei"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("HanMeimei"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Tom"));
        HelloHandlerEvent?.Invoke(new object(), new HelloEventArgs("Jerry"));

    });

    var observable = Observable.FromEventPattern<HelloEventHandler, HelloEventArgs>(
     handler => (s, a) => handler.Invoke(s, a), handler => HelloHandlerEvent += handler, handler => HelloHandlerEvent -= handler)
     .Select(evt => evt.EventArgs.SayHello()).ObserveOn(Scheduler.Default)
     .Select(s =>
     {
        // 复杂的计算过程。
        Thread.Sleep(100);
        var result = s;
        Console.WriteLine("Now Millisecond result " + result + " on thread " + Environment.CurrentManagedThreadId);
        return result;
     })
     .Take(3)//这个标识3个就结束了
     ;

    var res =
        Task.Run(async () => await observable
    // //4个hello,3个result,res为最后一个的结果
    //.FirstAsync()//4个hello,1个result,res为第一个的结果
    //.LastAsync()//4个hello,3个result,res为最后一个的结果
    //.ToList()//4个hello,3个result,res为3个的结果
    ).Result;
    System.Console.WriteLine($"Res:{string.Join(',', res)},ResType:{res.GetType().Name}");

    task.Wait();
}

输出:

lilei Hello.
HanMeimei Hello.
Tom Hello.
Jerry Hello.
Now Millisecond result 534 on thread 7
Now Millisecond result 544 on thread 7
Now Millisecond result 544 on thread 7
Res:544,ResType:Int32

await 调用 Observable 对象或 LastAsync 时,代码(异步地)等待事件流完成,然后返 回最后一个元素。

使用 FirstAsync 可捕获事件流中, FirstAsync 方法执行后的下一个事件。

使用 ToList 可捕获事件流中的所有事件:

IObservable<int> observable = ...;     
IList<int> allElements = await observable.ToList();

5. 用 Rx Observable 对象封装 async 代码

任何异步操作都可看作一个满足以下条件之一的可观察流:

ToObservableStartAsync 都会立即启动异步操作,而不会等待订阅

如果要让 observable 对象在接受订阅后才启动操作,可使用 FromAsync

public static void AsyncObservableRun()
{
    var client = new HttpClient();

    IObservable<int> response1 = Task.Run(() => { System.Console.WriteLine("Run 1."); return 1; }).ToObservable();//直接执行

    IObservable<int> response2 = Observable.StartAsync(token => Task.Run(() => { System.Console.WriteLine("Run 2."); return 2; }, token));//直接执行

    IObservable<int> response3 = Observable.FromAsync(token => Task.Run(() => { System.Console.WriteLine("Run 3."); return 3; }, token));//订阅后执行

    var res = Task.Run(async () =>
        await response1
        //await response2
        //await response3
    ).Result;
    System.Console.WriteLine($"Res:{res}");
}

输出(response1):

Run 1.
Run 2.
Res:1

输出(response2):

Run 1.
Run 2.
Res:2

输出(response1):

Run 1.
Run 2.
Run 3.
Res:3

下面的例子使用一个已有的 URL 事件流,在每个 URL 到达时发出一个请求:

public static void SelectManyRun()
{
    IObservable<int> nums = new int[] { 1, 2, 3 }.ToObservable();
    IObservable<int> observable = nums.SelectMany((n, token) => Task.Run<int>(() => { System.Console.WriteLine($"Run {n}."); return n + 1; }, token));

    var res = Task.Run(async () => await observable.LastAsync()).Result;

    System.Console.WriteLine($"Res:{res}");
}

输出:

Run 1.
Run 2.
Run 3.
Res:3

6. Rx Observable 对象和数据流网格

同一个项目中

现在需要它们能互相沟通。

网格转可观察流

public static void BlockToObservableRun()
{
    var buffer = new BufferBlock<int>();
    IObservable<int> integers = buffer.AsObservable();
    integers.Subscribe(
        data => Console.WriteLine(data),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Done"));

    buffer.Post(1);
    buffer.Post(2);
    buffer.Complete();

    buffer.Completion.Wait();
}

输出:

1
2

AsObservable 方法会把数据流块的完成信息(或出错信息)转化可观察流的完成信息。

可观察流转网格

public static void ObservableToBlockRun()
{
    IObservable<DateTimeOffset> ticks = Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Select(x => x.Timestamp)
        .Take(5);

    var display = new ActionBlock<DateTimeOffset>(x => Console.WriteLine(x));
    ticks.Subscribe(display.AsObserver());

    try
    {
        display.Completion.Wait();
        Console.WriteLine("Done.");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

输出:

2020/2/1 上午1:42:24 +00:00
2020/2/1 上午1:42:25 +00:00
2020/2/1 上午1:42:26 +00:00
2020/2/1 上午1:42:27 +00:00
2020/2/1 上午1:42:28 +00:00
Done.

标签:Task,Console,互操作,WriteLine,new,Run,public
来源: https://www.cnblogs.com/BigBrotherStone/p/12247598.html