返回顶部
首页 > 资讯 > 后端开发 > Python >用python3的多进程和协程处理MyS
  • 472
分享到

用python3的多进程和协程处理MyS

进程和协MyS 2023-01-31 08:01:45 472人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

本文介绍用python3的多进程 + 协程处理Mysql的数据,主要逻辑是拉取mysql的数据,然后使用flashtext匹配关键字,在存回Mysql,代码如下(async_mysql.py): import time import asy

本文介绍用python3的多进程 + 协程处理Mysql的数据,主要逻辑是拉取mysql的数据,然后使用flashtext匹配关键字,在存回Mysql,代码如下(async_mysql.py):

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor as Pool

import aiomysql
from flashtext import KeyWordProcessor
import click

class AttrDict(dict):
    """可以用"."获取属性,没有该属性时返回None的字典"""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            return None

    def __setattr__(self, name, value):
        self[name] = value

class AttrDictCursor(aiomysql.DictCursor):
    """继承aiomysql的字典cursor"""
    dict_type = AttrDict

class MultiProceSSMysql(object):
    """用多进程和协程处理MySQL数据"""

    def __init__(self, workers=2, pool=10, start=0, end=2000):
        """第一段的参数需要跟随需求变动"""
        self.host = "192.168.0.34"
        self.port = 3306
        self.user = "root"
        self.password = "root"
        self.db = "mydb"
        self.origin_table = "judgment_main_etl"  # main
        self.dest_table = "laws_finance1"
        self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;"
        self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)"

        self.pool = pool    # 协程数和MySQL连接数
        self.aionum = self.pool
        self.step = 2000  # 一次性从MySQL拉取的行数
        self.workers = workers  # 进程数
        self.start = start  # MySQL开始的行数
        self.end = end  # MySQL结束的行数

        self.keyword = ['非法经营支付业务', '网络洗钱', '资金池', '支付牌照', '清洁算', '网络支付', '网上支付', '移动支付', '聚合支付', '保本保息', '担保交易', '供应链金融', '网贷', '网络借贷', '网络投资', '虚假标的', '自融', '资金池', '关联交易', '庞氏骗局', '网络金融理财', '线上投资理财', '互联网私募', '互联网股权', '非法集资', '合同欺诈', '众筹投资', '股权转让', '互联网债权转让', '资本自融', '投资骗局', '洗钱', '非法集资', '网络传销', '虚拟币泡沫', '网络互助金融', '金融欺诈', '网上银行', '信用卡盗刷', '网络钓鱼', '信用卡信息窃取', '网上洗钱', '洗钱诈骗', '数字签名更改', '支付命令窃取', '金融诈骗', '引诱投资', '隐瞒项目信息', '风险披露', '夸大收益', '诈骗保险金', '非法经营保险业务', '侵占客户资金', '征信报告窃取', '金融诈骗', '破坏金融管理']
        self.kp = KeywordProcessor()    # flashtext是一个文本匹配包,在关键词数量大时速度远大于re
        self.kp.add_keywords_from_list(self.keyword)

    async def createMysqlPool(self, loop):
        """每个进程要有独立的pool,所以不绑定self"""
        pool = await aiomysql.create_pool(
            loop=loop, host=self.host, port=self.port, user=self.user,
            password=self.password, db=self.db, maxsize=self.pool,
            charset='utf8', cursorclass=AttrDictCursor
        )
        return pool

    def cutRange(self, start, end, times):
        """将数据区间分段"""
        partition = (end - start) // times
        ranges = []
        tmp_end = start
        while tmp_end < end:
            tmp_end += partition
            # 剩下的不足以再分
            if (end - tmp_end) < partition:
                tmp_end = end
            ranges.append((start, tmp_end))
            start = tmp_end
        return ranges

    async def findKeyword(self, db, start, end):
        """从MySQL数据中匹配出关键字"""
        # 随机休息一定时间,防止数据同时到达,同时处理, 应该是一部分等待,一部分处理
        await asyncio.sleep(random.random() * self.workers * 2)
        print("coroutine start")
        async with db.acquire() as conn:
            async with conn.cursor() as cur:
                while start < end:
                    tmp_end = start + self.step
                    if tmp_end > end:
                        tmp_end = end
                    print("aio start: %s, end: %s" % (start, tmp_end))
                    # <=id 和 id<
                    await cur.execute(self.s_sql, (start, tmp_end))
                    datas = await cur.fetchall()
                    uuids = []
                    for data in datas:
                        if data:
                            for key in list(data.keys()):
                                if not data[key]:
                                    data.pop(key)
                            keyword = self.kp.extract_keywords(
                                " ".join(data.values()))
                            if keyword:
                                keyword = ' '.join(set(keyword))   # 对关键字去重
                                # print(keyword)
                                uuids.append(
                                    (data.uuid, data.title, data.reason, keyword))
                    await cur.executemany(self.i_sql, uuids)
                    await conn.commit()
                    start = tmp_end

    def singleProcess(self, start, end):
        """单个进程的任务"""
        loop = asyncio.get_event_loop()
        # 为每个进程创建一个pool
        db = loop.run_until_complete(asyncio.ensure_future(
            self.createMysqlPool(loop)))

        tasks = []
        ranges = self.cutRange(start, end, self.aionum)
        print(ranges)
        for start, end in ranges:
            tasks.append(self.findKeyword(db, start, end))
        loop.run_until_complete(asyncio.gather(*tasks))

    def run(self):
        """多进程跑"""
        tasks = []
        ranges = self.cutRange(self.start, self.end, self.workers)
        start_time = time.time()
        with Pool(max_workers=self.workers) as executor:
            for start, end in ranges:
                print("processor start: %s, end: %s" % (start, end))
                tasks.append(executor.submit(self.singleProcess, start, end))
            for task in tasks:
                task.result()
        print("total time: %s" % (time.time() - start_time))

@click.command(help="运行")
@click.option("-w", "--workers", default=2, help="进程数")
@click.option('-p', "--pool", default=10, help="协程数")
@click.option('-s', '--start', default=0, help='MySQL开始的id')
@click.option('-e', "--end", default=2640000, help="MySQL结束的id")
def main(workers, pool, start, end):
    mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end)
    if workers * pool > 100:
        if not click.confirm('MySQL连接数超过100(%s),确认吗?' % (workers * pool)):
            return
    mp.run()

if __name__ == "__main__":
    main()

运行如下:
$ python3 async_mysql.py -w 2 # 可以指定其他参数,也可使用默认值


个人博客

--结束END--

本文标题: 用python3的多进程和协程处理MyS

本文链接: https://lsjlt.com/news/192581.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 用python3的多进程和协程处理MyS
    本文介绍用python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下(async_mysql.py): import time import asy...
    99+
    2023-01-31
    进程 和协 MyS
  • python 多进程和协程配合使用
    有一批key已经写入到3个txt文件中,每一个txt文件有30万行记录。现在需要读取这些txt文件,判断key是否在数据仓库中。(redis或者mysql)为空的记录,需要写入到日志文件中! 任务分工1. 使用多进程技术,每一个进...
    99+
    2023-01-31
    进程 和协 python
  • 异步 PHP — 多进程、多线程和协程
    让我们看一下这段典型的 PHP 代码: function names(){ $data = Http::get('data.location/products')->json(); $names = []; foreach...
    99+
    2023-09-09
    servlet json java
  • Python3多线程处理爬虫的实战
    多线程 到底什么是多线程?说起多线程我们首先从单线程来说。例如,我在这里看书,等这件事情干完,我就再去听音乐。对于这两件事情来说都是属于单线程,是一个完成了再接着完成下一个。但是我一...
    99+
    2023-03-02
    Python3多线程爬虫 Python 多线程爬虫
  • python中的多进程处理
      众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的;但在python 3.0中吸收了开源模块,开始支...
    99+
    2023-01-31
    进程 python
  • 怎么使用Python3多线程处理爬虫
    本文小编为大家详细介绍“怎么使用Python3多线程处理爬虫”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么使用Python3多线程处理爬虫”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。多线程到底什么是多线程...
    99+
    2023-07-05
  • python并发编程之多进程、多线程、异步和协程详解
    最近学习python并发,于是对多进程、多线程、异步和协程做了个总结。 一、多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行。即使是单CPU的计...
    99+
    2022-06-04
    之多 多线程 详解
  • 深入浅析python中的多进程、多线程、协程
    进程与线程的历史 我们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的所有任务。 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序...
    99+
    2022-06-04
    多线程 进程 python
  • Python中多线程、多进程、协程的区别是什么
    今天就跟大家聊聊有关Python中多线程、多进程、协程的区别是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。首先我们写一个简化的爬虫,对各个功能细分,有意识进行函数式编程。下面代...
    99+
    2023-06-16
  • 如何使用 Golang 协程进行错误处理?
    在 go 协程中处理错误可使用 panic 和 recover 机制。panic 可触发异常,而 recover 用于在协程中捕获异常,可返回指向 panic 值的指针。通过使用 pan...
    99+
    2024-05-21
    golang 协程
  • Golang函数的协程模型和多进程模型的比较
    Go语言是一种由Google开发的静态编译型语言,被广泛用于网络应用开发、系统编程、云计算等领域。Golang在并发编程方面做的非常出色,它通过引入协程和通道的概念,使得并发编程变得更加简单和高效。在不同的并发模型中,Golang的协程模型...
    99+
    2023-05-17
    Golang 协程模型 多进程模型
  • Python3的进程和线程你了解吗
    目录1.概述2.多进程3.子进程4.进程间通信5.多线程6.Lock7.ThreadLocal8.进程VS线程9.分布式进程总结1.概述 """ 基础知识: 1.多任务:操作系统可以...
    99+
    2024-04-02
  • golang协程数量太多怎么处理
    当使用goroutine数量过多时,可能会引发以下问题: 内存消耗:每个goroutine都需要一定的内存空间,如果gorout...
    99+
    2023-10-22
    golang
  • python多进程multiprocessing的原理和应用
    这篇文章主要介绍“python多进程multiprocessing的原理和应用”,在日常操作中,相信很多人在python多进程multiprocessing的原理和应用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家...
    99+
    2023-06-20
  • 如何在PHP开发中处理多线程和进程管理?
    如何在PHP开发中处理多线程和进程管理?简介:在PHP开发中,多线程和进程管理是一个重要的话题。随着应用程序变得越来越复杂,处理并发和高并发访问请求的能力变得至关重要。本文将介绍如何在PHP开发中处理多线程和进程管理的技术和工具。一、多线程...
    99+
    2023-11-03
    PHP并发编程 PHP多线程处理 PHP进程管理
  • PHP中的多线程和协程开发
    随着网络应用的不断发展和整个计算机应用的崛起,针对并发问题的解决方案也不断地发展和壮大。在使用PHP进行应用程序开发时,PHP中也提供了多线程和协程的开发方式来帮助程序员解决并发处理问题,本文将介绍PHP中的多线程和协程开发。一、PHP多线...
    99+
    2023-05-24
    PHP 多线程 协程。
  • Python的进程,线程和协程实例详解
    目录相关介绍实验环境进程多进程用进程池对多进程进行操作线程使用_thread模块实现使用 threading 模块实现协程使用asyncio模块实现总结相关介绍 Python是一种跨...
    99+
    2024-04-02
  • 实例详解Python的进程,线程和协程
    目录前言前提条件相关介绍实验环境进程多进程用进程池对多进程进行操作线程使用_thread模块实现使用 threading 模块实现协程使用asyncio模块实现总结前言 本文用Pyt...
    99+
    2024-04-02
  • Python的进程,线程和协程实例分析
    这篇“Python的进程,线程和协程实例分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Python的进程,线程和协程实例...
    99+
    2023-06-29
  • python多进程和多线程的实际用法
    这篇文章主要讲解了“python多进程和多线程的实际用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“python多进程和多线程的实际用法”吧!  写在前面  总所周知,unix/linux...
    99+
    2023-06-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作