注册

领导说我工作 3 年了只会 CRUD

在老东家工作 3 年了,公司的业务和技术栈相对熟练得差不多了。


领导觉得我能够委以重任,便把一个新项目交给我负责,另外指派一名同事协助我。


项目的重点在于数据的交互比较多,以及每天大量的数据同步和批量操作,不能出错。


队友建议以短、平、快为主,能够使用已有现成的技术就用现成的技术。直接面向过程开发是人们最为舒适,是人为本能的习惯。由于他有这一种能够处理好的决心,便把数据批量操作这块委托于他。


查看了以往公司现成一些写法,一部分是直接面向 SQL 写法批量插入,面对增量同步则先查出,存在的更新,不存在的插入。一部分是通过 Kafka 和后台任务原子操作。


理论上这么操作结果也能成,但是看到修改记录,我就知道面临的需求变了很多变化很快,导致大量的更改。私底下询问负责人也了解出了太多问题,原本一劳永逸赶紧写完结果反而投入了更多的精力和时间。


出于预防心理,也对那位同事进行了提醒并且加以思考再下手。


不到一个月,我们就把项目上线了,并且没有出现数据上的错误,得到了领导的表扬。


我们也提前收场,做一些小的优化,其余时间在摸鱼。


一段时间之后,麻烦便接踵而至,其一就是开始数据量暴增,那位同事在做增量同步时进行了锁表操作,批量操作需要一些时间,在前台读取时出现响应超时。


其二就是增量同步要调整,以主库或第三方来源库为主,出现数据更新和删除的需要同步操作。


同事目前的主力放在了新项目上,把一些零散的时间用来调整需求和 bug,结果越处理,bug 出现的越多,不是数量过多卡死就是变量不对导致数据处理不对。


于是到了某一时刻终于爆发,领导找到我俩,被痛批一顿,工作这么久就只会 CRUD 操作,来的实习生都会干的活,还养你们干什么。


当然,要复盘的话当然有迹可循。我想碰见这种情况还真不少,首次开发项目时一鼓作气,以“短、平、快” 战术面向过程开发,短时间内上线。


但是,一个软件的生命周期可不止步于上线,还要过程运维以及面对变化。


导致在二次开发的时候就脱节了,要么当时写法不符合现有业务,要么改动太多动不动就割到了大动脉大出血,要么人跑了...


所以我们会采用面向对象,抽象化编程,就是用来保稳定,预留一部分来应付变化,避免牵一发而动全身。


挨完骂,也要开始收拾烂摊子。


于是我打算重新组装一个通用的方法,打算一劳永逸。


首先我们定义一个接口通用思维 IDbAsyncBulk。由于源码已经发布到了github,所以一些注释写成了英文,大致也能看出蹩脚英文的注释。


public interface IDbAsyncBulk
    {
        /// <summary>
        /// default init.
        /// use reflect to auto init all type, to lower case database fileds,and  default basic type.
        /// if ignore some fileds,please use DbBulk,Ignore property to remarkable fileds.
        /// if other operating,need user-defined to init operate.
        /// </summary>
        /// <typeparam name="T">Corresponding type</typeparam>
        Task InitDefaultMappings<T>();

        /// <summary>
        /// batch operating
        /// </summary>
        /// <typeparam name="T">will operate object entity type.</typeparam>
        /// <param name="connection_string">database connecting string.</param>
        /// <param name="targetTable">target table name. </param>
        /// <param name="list">will operate data list.</param>
        Task CopyToServer<T>(string connection_string, string targetTable, List<T> list);

        /// <summary>
        /// batch operating
        /// </summary>
        /// <typeparam name="T">will operate object entity type.</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.
        /// if nothing other follow-up operate, shouldn't cover this connecting.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        Task CopyToServer<T>(DbConnection connection, string targetTable, List<T> list);

        /// <summary>
        /// renew as it exists,insert as it not exists.
        /// follow up : 
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// renew as it exists,insert as it not exists.
        /// follow up : 
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        ///  batch update operating。
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="where_name">matching 'where' compare fileds.</param>
        /// <param name="update_name">need to update fileds.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        Task UpdateToServer<T>(string connection_string, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = null);

        /// <summary>
        ///  batch update operating。
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="where_name">matching 'where' compare fileds.</param>
        /// <param name="update_name">need to update fileds.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="createtemp"> create temporary table or not </param>
        Task UpdateToServer<T>(DbConnection connection, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = nullbool createtemp = true);

        /// <summary>
        /// renew as it exists,insert as it not exists.original table not exist and  target table exist will remove.
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// 4.will remove data that temporary data not exist and target table exist.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeAndDeleteToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// renew as it exists,insert as it not exists.original table not exist and  target table exist will remove.
        ///  1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// 4.will remove data that temporary data not exist and target table exist.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeAndDeleteToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// create temporary table
        /// </summary>
        /// <param name="tempTable">create temporary table name</param>
        /// <param name="targetTable">rarget table name</param>
        /// <param name="connection">database connecting</param>
        Task CreateTempTable(string tempTable, string targetTable, DbConnection connection);
    }

解释几个方法的作用:



InitDefaultMappings:初始化映射,将目标表的字段映射到实体,在批量操作时候会根据反射进行一一匹配表字段;


CopyToServer:批量新增,在符合数据表结构时批量复制到目标表,采用官方 SqlBulkCopy 类结合实体简化操作。


MergeToServer:增量同步,需指定唯一键,存在即更新,不存在则插入。支持指定更新字段,指定插入字段。


UpdateToServer:批量更新,需指定 where 条件,以及更新的字段。


MergeAndDeleteToServer:增量同步,以数据源和目标表进行匹配,目标表存在的则更新,不存在的则插入,目标表存在,数据源不存在则目标表移除。


CreateTempTable:创建临时表。



增加实体属性标记,用来标记列名是否忽略同步数据,以及消除数据库别名,大小写的差异。


 /// <summary>
    /// 数据库批量操作标记,用于标记对象属性。
    /// </summary>
    public class DbBulkAttribute : Attribute
    {
        /// <summary>
        /// 是否忽略。忽略则其余属性不需要设置,不忽略则必须设置Type。
        /// </summary>
        public bool Ignore { getset; }

        /// <summary>
        /// 列名,不设置则默认为实体字段名小写
        /// </summary>
        public string ColumnName { getset; }

    }

实现类,目前仅支持 SqlServer 数据库,正在更新 MySql 和 PGSql 中。然后需要定义BatchSize(default 10000)、BulkCopyTimeout (default 300)、ColumnMappings,分别是每批次大小,允许超时时间和映射的字段。


/// <summary>
    /// sql server batch
    /// </summary>
    public class SqlServerAsyncBulk : IDbAsyncBulk
    {
        /// <summary>
        /// log recoding
        /// </summary>
        private ILogger _log;
        /// <summary>
        ///batch insert size(handle a batch every time )。default 10000。
        /// </summary>
        public int BatchSize { getset; }
        /// <summary>
        /// overtime,default 300
        /// </summary>
        public int BulkCopyTimeout { getset; }
        /// <summary>
        /// columns mapping
        /// </summary>
        public Dictionary<stringstring> ColumnMappings { getset; }
        /// <summary>
        /// structure function
        /// </summary>
        /// <param name="log"></param>
        public SqlServerAsyncBulk(ILogger<SqlServerAsyncBulk> log)
        {
            _log = log;
            BatchSize = 10000;
            BulkCopyTimeout = 300;
        }
        
        //...to do

使用上也非常的简便,直接在服务里注册单例模式,使用的时候直接依赖注入。


 //if you use SqlServer database, config SqlServerAsyncBulk service.
services.AddSingleton<IDbAsyncBulk, SqlServerAsyncBulk>();

public class BatchOperate
{
  private readonly IDbAsyncBulk _bulk;
  public BatchOperate(IDbAsyncBulk bulk)
  {
    _bulk = bulk;
  }
}

以 user_base 表举两个实例,目前测试几十万数据也才零点几秒。


 public async Task CopyToServerTest()
        {
            var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=xxx;Password=xxx";
            await _bulk.InitDefaultMappings<UserBaseModel>();
            var mock_list = new List<UserBaseModel>();
            for (var i = 0; i < 1000; i++) {
                mock_list.Add(new UserBaseModel
                {
                    age = i,
                    birthday = DateTime.Now.AddMonths(-i).Date,
                    education = "本科",
                    email = "xiaoyu@163.com",
                    name = $"小榆{i}",
                    nation = "
",
                    nationality="
中国"
                });
            }
            await _bulk.CopyToServer(connectStr, "
user_base", mock_list);
        }

public async Task MergeToServerTest()
        {
            var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=sa;Password=root";
            await _bulk.InitDefaultMappings<UserBaseModel>();
            var mock_list = new List<UserBaseModel>();
            for (var i = 0; i < 1000; i++)
            {
                mock_list.Add(new UserBaseModel
                {
                    age = i,
                    birthday = DateTime.Now.AddMonths(-i).Date,
                    education = "本科",
                    email = "mock@163.com",
                    name = $"小榆{i}",
                    nation = "汉",
                    nationality = "中国"
                });
            }
            var insertMapping = new List<string> { "birthday""education""age""email""name""nation""nationality" };
            var updateMapping = new List<string> { "birthday""education""age""email"};
            await _bulk.MergeToServer(connectStr,new List<string> {"id"}, "user_base", mock_list,null, insertMapping, updateMapping);
        

到这里,也已经完成了批量数据操作啦,不用再面对大量的sql操作啦。面向 sql 开发一时确实爽,但是面临变化或者别人接手的时候,是很痛苦的。


具体实现细节内容过多,篇幅有限暂时不全部展示,有兴趣或者尝试的伙伴可以进 github 进行参考。



github👉:github.com/sangxiaoyu/… 💖



作者:桑小榆呀
来源:juejin.cn/post/7290361767141376057

0 个评论

要回复文章请先登录注册