返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >C#多线程系列之工作流实现
  • 351
分享到

C#多线程系列之工作流实现

2024-04-02 19:04:59 351人浏览 安东尼
摘要

目录前言节点ThenParallelScheduleDelay试用一下顺序节点并行任务编写工作流接口构建器工作流构建器依赖注入实现工作流解析前言 前面学习了很多多线程和任务的基础知识

前言

前面学习了很多多线程和任务的基础知识,这里要来实践一下啦。通过本篇教程,你可以写出一个简单的工作流引擎。

本篇教程内容完成是基于任务的,只需要看过笔者的三篇关于异步的文章,掌握 C# 基础,即可轻松完成。

  • C#多线程系列之任务基础(一)
  • C#多线程系列之任务基础(二)
  • C#多线程系列之任务基础(三)

由于本篇文章编写的工作流程序,主要使用任务,有些逻辑过程会比较难理解,多测试一下就好。代码主要还是 C# 基础,为什么说简单?

  • 不包含 async 、await
  • 几乎不含包含多线程(有个读写)
  • 不包含表达式树
  • 几乎不含反射(有个小地方需要反射一下,但是非常简单)
  • 没有复杂的算法

因为是基于任务(Task)的,所以可以轻松设计组合流程,组成复杂的工作流。

由于只是讲述基础,所以不会包含很多种流程控制,这里只实现一些简单的。

先说明,别用到业务上。。。这个工作流非常简单,就几个功能,这个工作流是基于笔者的多线程系列文章的知识点。写这个东西是为了讲解任务操作,让读者更加深入理解任务。

代码地址:https://GitHub.com/whuanle/CZGL.FLow

节点

在开始前,我们来设计几种流程控制的东西。

将一个 步骤/流程/节点 称为 step。

Then

一个普通的节点,包含一个任务。

多个 Then 节点,可以组成一条连续的工作流。

Parallel

并行节点,可以设置多个并行节点放到 Parallel 中,以及在里面为任一个节点创建新的分支。

Schedule

定时节点,创建后会在一定时间后执行节点中的任务。

Delay

让当前任务阻塞一段时间。

试用一下

顺序节点

打开你的 VS ,创建项目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

创建一个类 MyFlow1,继承 IDoFlow

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "随便起个名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你可以创建多个工作流任务,每个工作流的 Id 必须唯一。Name 和 Version 随便填,因为这里笔者没有对这几个字段做逻辑。

IDoFlowBuilder 是构建工作流的一个接口。

我们来写一个工作流测试一下。

/// <summary>
/// 普通节点 Then 使用方法
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("工作流开始");
        }).Then(() =>
        {
            Console.WriteLine("下一个节点");
        }).Then(() =>
         {
             Console.WriteLine("最后一个节点");
         });
        return builder;
    }
} 

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.ReGISterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方法开始一个工作流;

FlowCore.RegisterWorkflow<T>() 注册一个工作流;

FlowCore.Start();执行一个工作流;

并行任务

其代码如下:

    /// <summary>
    /// 并行节点 Parallel 使用方法
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每个并行任务也可以设计后面继续执行其它任务
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("并行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行3");
                    });

                    // 并行任务设计完成后,必须调用此方法
                    // 此方法必须放在所有并行任务 .Do() 的最后
                    steps.EndParallel();

                    // 如果 .Do() 在 EndParallel() 后,那么不会等待此任务
                    steps.Do(() => { Console.WriteLine("并行异步"); });

                    // 开启新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方法中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

通过以上示例,可以大概了解本篇文章中我们要写的程序。

编写工作流

建立一个类库项目,名为 DoFlow

建立 ExtensionsInterfacesServices 三个目录。

接口构建器

新建 IStepBuilder 接口文件到 Interfaces 目录,其内容如下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 普通节点
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多个节点
        /// <para>默认下,需要等待所有的任务完成,这个step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">任意一个任务完成即可跳转到下一个step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 节点将在某个时间间隔后执行
        /// <para>异步,不会阻塞当前工作流的运行,计划任务将在一段时间后触发</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 阻塞一段时间
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 文件到 Interfaces 目录。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 并行任务
    ///  <para>默认情况下,只有这个节点的所有并行任务都完成后,这个节点才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一个并行任务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 开始一个分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必须使用此方法结束一个并行任务
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 并行任务
    /// <para>任意一个任务完成后,就可以跳转到下一个 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

工作流构建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目录。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 构建工作流任务
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 开始一个 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 接口文件到 Interfaces 目录。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 工作流
    /// <para>无参数传递</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局唯一标识
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 标识此工作流的名称
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 标识此工作流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依赖注入

新建 DependencyInjectionService 文件到 Services 目录。

用于实现依赖注入和解耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依赖注入服务
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服务
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 添加一个注入到容器服务
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 获取需要的服务
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

添加一个 InitExtension 文件到 Extensions 目录。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

实现工作流解析

以下文件均在 Services 目录建立。

新建 StepBuilder 文件,用于解析节点,构建任务。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 节点工作引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延迟执行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 并行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 计划任务
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 普通 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 文件,里面有两个类,用于实现同步任务。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一层所有任务结束后才能跳转下一个 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成任意一个任务即可跳转到下一个 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 文件,用于构建工作流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 文件,用于执行工作流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 工作流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 开始一个工作流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 文件,用于存储和索引工作流。使用读写锁解决并发字典问题。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 读写锁
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 注册工作流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要启动的工作流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 读写锁
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就这样程序写完了。

到此这篇关于C#多线程系列之工作流实现的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: C#多线程系列之工作流实现

本文链接: https://lsjlt.com/news/139084.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • C#多线程系列之工作流实现
    目录前言节点ThenParallelScheduleDelay试用一下顺序节点并行任务编写工作流接口构建器工作流构建器依赖注入实现工作流解析前言 前面学习了很多多线程和任务的基础知识...
    99+
    2024-04-02
  • C#多线程系列之原子操作
    目录知识点竞争条件线程同步CPU时间片和上下文切换阻塞内核模式和用户模式Interlocked类1,出现问题2,Interlocked.Increment()3,Interlocke...
    99+
    2024-04-02
  • C#多线程系列之线程池
    目录线程池ThreadPool 常用属性和方法线程池说明和示例线程池线程数线程池线程数说明不支持的线程池异步委托任务取消功能计时器线程池 线程池全称为托管线程池,线程池受 .NET ...
    99+
    2024-04-02
  • C#多线程系列之线程通知
    AutoRestEvent 类用于从一个线程向另一个线程发送通知。 微软文档是这样介绍的:表示线程同步事件在一个等待线程释放后收到信号时自动重置。 其构造函数只有一个: 构造函数里面...
    99+
    2024-04-02
  • C#多线程系列之线程等待
    目录前言volatile 关键字三种常用等待再说自旋和阻塞SpinWait 结构属性和方法自旋示例新的实现SpinLock 结构属性和方法示例等待性能对比前言 volatile 关键...
    99+
    2024-04-02
  • C#多线程系列之线程完成数
    解决一个问题 假如,程序需要向一个 Web 发送 5 次请求,受网路波动影响,有一定几率请求失败。如果失败了,就需要重试。 示例代码如下: class Program ...
    99+
    2024-04-02
  • C#多线程系列之多线程锁lock和Monitor
    目录1,Locklock 原型lock 编写实例2,Monitor怎么用呢解释一下示例设置获取锁的时效1,Lock lock 用于读一个引用类型进行加锁,同一时刻内只有一个线程能够访...
    99+
    2024-04-02
  • C#多线程系列之多阶段并行线程
    前言 这一篇,我们将学习用于实现并行任务、使得多个线程有序同步完成多个阶段的任务。 应用场景主要是控制 N 个线程(可随时增加或减少执行的线程),使得多线程在能够在 M 个阶段中保持...
    99+
    2024-04-02
  • C#多线程系列之读写锁
    本篇的内容主要是介绍 ReaderWriterLockSlim 类,来实现多线程下的读写分离。 ReaderWriterLockSlim ReaderWriterLock 类:定义支...
    99+
    2024-04-02
  • C#多线程系列之手动线程通知
    区别与示例 AutoResetEvent 和 ManualResetEvent 十分相似。两者之间的区别,在于前者是自动(Auto),后者是手动(Manua)。 你可以先运行下面的示...
    99+
    2024-04-02
  • C#多线程系列之进程同步Mutex类
    Mutex 中文为互斥,Mutex 类叫做互斥锁。它还可用于进程间同步的同步基元。 Mutex 跟 lock 相似,但是 Mutex 支持多个进程。Mutex 大约比 lock 慢 ...
    99+
    2024-04-02
  • C#多线程系列之任务基础(一)
    目录多线程编程多线程编程模式探究优点任务操作两种创建任务的方式Task.Run() 创建任务取消任务父子任务任务返回结果以及异步获取返回结果捕获任务异常全局捕获任务异常多线程编程 多...
    99+
    2024-04-02
  • C#多线程系列之任务基础(二)
    目录判断任务状态再说父子任务组合任务/延续任务复杂的延续任务并行(异步)处理任务并行(同步)处理任务并行任务的 Task.WhenAny并行任务状态循环中值变化问题定时任务 Task...
    99+
    2024-04-02
  • C#多线程系列之任务基础(三)
    目录TaskAwaiter延续的另一种方法另一种创建任务的方法实现一个支持同步和异步任务的类型Task.FromCanceled()如何在内部取消任务Yield 关键字补充知识点Ta...
    99+
    2024-04-02
  • C#多线程系列之资源池限制
    Semaphore、SemaphoreSlim 类 两者都可以限制同时访问某一资源或资源池的线程数。 这里先不扯理论,我们从案例入手,通过示例代码,慢慢深入了解。 Semaphore...
    99+
    2024-04-02
  • C#多线程系列之线程的创建和生命周期
    目录1,获取当前线程信息2,管理线程状态2.1启动与参数传递2.1.1ParameterizedThreadStart2.1.2使用静态变量或类成员变量2.1.3委托与Lambda2...
    99+
    2024-04-02
  • C#多线程系列之async和await用法详解
    目录async和awaitasyncawait从以往知识推导创建异步任务创建异步任务并返回Task异步改同步说说 await Task说说 async Task<TR...
    99+
    2024-04-02
  • C#多线程之线程锁实例分析
    这篇文章主要介绍了C#多线程之线程锁实例分析的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇C#多线程之线程锁实例分析文章都会有所收获,下面我们一起来看看吧。一、Mutex类“mutex”是术语“互相排斥(mut...
    99+
    2023-06-30
  • Java多线程揭秘之synchronized工作原理
    目录一.特性二.加锁过程(锁升级/锁膨胀)1.无锁状态2.偏向锁3.轻量级锁4.重量级锁5.总结三.锁优化1.锁消除2.锁粗化在学习本篇文章时,如果有不太懂的地方,大家也可以先看看博...
    99+
    2024-04-02
  • c#多线程之间的排他锁的实现
    我们很多时候会碰到这样的问题,使用多线程刷一个表的数据时需要多个线程不能重复提取数据,那么这个时候就需要使用到线程的排他锁了。 在c#里面其实很简单,下面先来看一个简单的小例子 ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作