博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
gj11 多线程、多进程和线程池编程
阅读量:6306 次
发布时间:2019-06-22

本文共 15974 字,大约阅读时间需要 53 分钟。

11.1 python中的GIL

 

# coding=utf-8# gil global interpreter lock (cpython)# python中一个线程对应于c语言中的一个线程# gil使得同一个时刻只有一个线程在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行# gil会根据执行的字节码行数以及时间片释放gil,# gil在遇到io的操作时候主动释放import dis
def add(a):    a = a + 1    return aprint(dis.dis(add))total = 0def add():    # 1. dosomething1    # 2. io操作    # 1. dosomething3    global total    for i in range(1000000):        total += 1def desc():    global total    for i in range(1000000):        total -= 1import threadingthread1 = threading.Thread(target=add)thread2 = threading.Thread(target=desc)thread1.start()thread2.start()thread1.join()thread2.join()print(total)# 在IO频繁的时候是很适合的

 

执行多少行后字节码会释放

11.2 python多线程编程

操作系统最小的执行单元

# coding=utf-8# __auther__ = 'lewen'import timeimport threadingdef get_detail_html(url):    print("get detail html started")    time.sleep(2)    print("get detail html end")def get_detail_url(url):    print("get detail url started")    time.sleep(4)    print("get detail url end")if __name__ == "__main__":    # 在主线程起两个线程    thread1 = threading.Thread(target=get_detail_html, args=("",))    thread2 = threading.Thread(target=get_detail_url, args=("",))    #     thread1.setDaemon(True)    # thread2.setDaemon(True)  # 守护线程,当主线程退出的时候, 子线程kill掉    start_time = time.time()    thread1.start()    thread2.start()    thread1.join()  # 等待线程的执行完成,才会执行下面    thread2.join()    print("last time: {}".format(time.time() - start_time))

 

通过集成Thread来实现多线程

class GetDetailHtml(threading.Thread):    def __init__(self, name):        # py2 必须在括号写类名        # 继承父类的name        super().__init__(name=name)    def run(self):        print("get detail html started")        time.sleep(2)        print("get detail html end")class GetDetailUrl(threading.Thread):    def __init__(self, name):        super().__init__(name=name)    def run(self):        print("get detail url started")        time.sleep(4)        print("get detail url end")if __name__ == "__main__":    thread1 = GetDetailHtml("get_detail_html")    thread2 = GetDetailUrl("get_detail_url")    start_time = time.time()    thread1.start()    thread2.start()    thread1.join()    thread2.join()    print("last time: {}".format(time.time() - start_time))

 

11.3 线程间通信-Queue

 

共用变量

# 线程间通信import timeimport threadingfrom chapter11 import variablesfrom threading import Conditiondetail_url_list = []# 1. 生产者当生产10个url以后就就等待,保证detail_url_list中最多只有十个url# 2. 当url_list为空的时候,消费者就暂停def get_detail_html(lock):    # 爬取文章详情页    detail_url_list = variables.detail_url_list  # 将共享变量存放到文件中去    while True:        if len(variables.detail_url_list):               if len(detail_url_list):                url = detail_url_list.pop()                # for url in detail_url_list:                print("get detail html started")                time.sleep(2)                print("get detail html end")            else:                time.sleep(1)def get_detail_url(lock):    # 爬取文章列表页    detail_url_list = variables.detail_url_list    while True:        print("get detail url started")        time.sleep(4)        for i in range(20):            if len(detail_url_list) >= 10:                time.sleep(1)            else:                detail_url_list.append("http://projectsedu.com/{id}".format(id=i))        print("get detail url end")# 1. 线程通信方式- 共享变量if __name__ == "__main__":    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))    for i in range(10):        html_thread = threading.Thread(target=get_detail_html, args=(lock,))        html_thread.start()    # # thread2 = GetDetailUrl("get_detail_url")    start_time = time.time()    # 当主线程退出的时候, 子线程kill掉    print("last time: {}".format(time.time() - start_time))
# 通过共用变量

 

from queue import Queueimport timeimport threadingdef get_detail_html(queue):    # 爬取文章详情页    while True:        url = queue.get()  # 阻塞,没有会停在这        print(url)        # 内部基于deque        print("get detail html started")        time.sleep(1)        print("get detail html end")def get_detail_url(queue):    # 爬取文章列表页    while True:        print("get detail url started")        time.sleep(2)        for i in range(9):            queue.put("http://www.baidu.com/s?wd=".format(id=i))        print(queue.qsize())        print("get detail url end")# 1. 线程通信方式- 共享变量if __name__ == "__main__":    detail_url_queue = Queue(maxsize=1000)    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))    html_thread_list = []    for i in range(10):        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))        html_thread.start()        html_thread_list.append(html_thread)    start_time = time.time()    for h in html_thread_list:        h.join()    # detail_url_queue.join()  # 这里想退出,必须等到 detail_url_queue.task_done()调用,才会退出    print("last time: {}".format(time.time() - start_time))
# 通过queue的方式进行线程间同步

 

11.4 线程同步(Lock、RLock、Semaphores、Condition)

 

from threading import Lock, RLock, Condition  # 可重入的锁# Lock 不能重复调用total = 0lock = RLock()  # 在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等                # 多个线程之间仍会竞争def add():    global lock    global total    for i in range(1000000):        lock.acquire()        lock.acquire()   # 一个线程里面重入的锁        total += 1        lock.release()        lock.release()def desc():    global total    global lock    for i in range(1000000):        lock.acquire()        total -= 1        lock.release()import threadingthread1 = threading.Thread(target=add)thread2 = threading.Thread(target=desc)thread1.start()thread2.start()thread1.join()thread2.join()print(total)# 1. 用锁会影响性能# 2. 锁会引起死锁# 死锁的情况 A(a,b)"""A(a、b)acquire (a)acquire (b)   # 阻塞住,死在这B(b、a)acquire (b)   # 交互死锁,资源竞争acquire (a)"""
Lock、RLock

 

condition 使用以及源码分析

import threadingclass XiaoAi(threading.Thread):    def __init__(self, lock):        super().__init__(name="小爱")        self.lock = lock    def run(self):        self.lock.acquire()        print("{} : 在 ".format(self.name))        self.lock.release()        self.lock.acquire()        print("{} : 好啊 ".format(self.name))        self.lock.release()class TianMao(threading.Thread):    def __init__(self, lock):        super().__init__(name="天猫精灵")        self.lock = lock    def run(self):        self.lock.acquire()        print("{} : 小爱同学 ".format(self.name))        self.lock.release()        self.lock.acquire()        print("{} : 我们来对古诗吧 ".format(self.name))        self.lock.release()if __name__ == "__main__":    lock = threading.Lock()    xiaoai = XiaoAi(lock)    tianmao = TianMao(lock)    tianmao.start()    xiaoai.start()# ---天猫精灵 : 小爱同学天猫精灵 : 我们来对古诗吧小爱 : 在小爱 : 好啊
没有使用condition

 

class XiaoAi(threading.Thread):    def __init__(self, cond):        super().__init__(name="小爱")        self.cond = cond    def run(self):        with self.cond:#第一把锁            self.cond.wait()            print("{} : 在 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 好啊 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 君住长江尾 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 共饮长江水 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 此恨何时已 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 定不负相思意 ".format(self.name))            self.cond.notify()class TianMao(threading.Thread):    def __init__(self, cond):        super().__init__(name="天猫精灵")        self.cond = cond    def run(self):        with self.cond:  #第一把锁            print("{} : 小爱同学 ".format(self.name))            self.cond.notify()  # 提醒            self.cond.wait()    # 等待条件提醒            print("{} : 我们来对古诗吧 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 我住长江头 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 日日思君不见君 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 此水几时休 ".format(self.name))            self.cond.notify()            self.cond.wait()            print("{} : 只愿君心似我心 ".format(self.name))            self.cond.notify()            self.cond.wait()if __name__ == "__main__":    cond = threading.Condition()    xiaoai = XiaoAi(cond)    tianmao = TianMao(cond)    # 在调用with cond之后才能调用wait或者notify方法    # condition有两层锁, 一把底层锁(with condition)会在线程调用了wait方法的时候释放,     # 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒    xiaoai.start()    tianmao.start()    # 启动顺序很重要    # 天猫start 后 notify ,然后小爱 start 进入wait ,一直接受不到 notify 就阻塞住    # start 后 wait 的线程应该先启动去等着,以免接受不到notify# ---天猫精灵 : 小爱同学小爱 : 在天猫精灵 : 我们来对古诗吧小爱 : 好啊天猫精灵 : 我住长江头小爱 : 君住长江尾天猫精灵 : 日日思君不见君小爱 : 共饮长江水天猫精灵 : 此水几时休小爱 : 此恨何时已天猫精灵 : 只愿君心似我心小爱 : 定不负相思意

在调用with cond之后才能调用wait或者notify方法condition有两层锁, 一把底层锁(with condition)会在线程调用了wait方法的时候释放, 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒

 

Semaphore 使用

# Semaphore 是用于控制进入数量的锁# 文件, 读、写, 写一般只是用于一个线程写,读可以允许有多个# 做爬虫import threadingimport timeclass HtmlSpider(threading.Thread):    def __init__(self, url, sem):        super().__init__()        self.url = url        self.sem = sem    def run(self):        time.sleep(2)        print("got html text success")        self.sem.release()class UrlProducer(threading.Thread):    def __init__(self, sem):        super().__init__()        self.sem = sem    def run(self):        for i in range(20):            self.sem.acquire()            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)            html_thread.start()if __name__ == "__main__":    sem = threading.Semaphore(3)    url_producer = UrlProducer(sem)    url_producer.start()

 

11.5 concurrent线程池编码

 

# 线程池, 为什么要线程池# 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值# 当一个线程完成的时候我们主线程能立即知道# futures可以让多线程和多进程编码接口一致from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETEDimport timedef get_html(times):    time.sleep(times)    print("get page {} success".format(times))    return timesexecutor = ThreadPoolExecutor(max_workers=2)# 通过submit函数提交执行的函数到线程池中, submit 是立即返回task1 = executor.submit(get_html, (3))task2 = executor.submit(get_html, (2))# done方法用于判定某个任务是否完成print(task1.done())# print(task2.cancel())   # 取消任务(成功返回True),在执行中或开始执行的时候是不能取消的# time.sleep(3)# print(task1.done())# result 是阻塞的方法可以获取task的执行结果print(task1.result())# ---------urls = [3,2,4]all_task = [executor.submit(get_html, (url)) for url in urls]   # 批量提交wait(all_task, return_when=FIRST_COMPLETED)print("main")# 要获取已经成功的task的返回# for future in as_completed(all_task):#     data = future.result()#     print("get {} page".format(data))# 通过executor的map获取已经完成的task的值# for data in executor.map(get_html, urls):#     print("get {} page".format(data)) # 跟提交值顺序相同# ----Falseget page 2 successget page 3 success3get page 2 successmainget page 3 successget page 4 success

 

from concurrent.futures import Future #未来对象,task的返回容器

11.6 多进程编程-multiprocessing

 

# 多进程编程# 耗cpu的操作,用多进程编程, 对于io操作来说, 使用多线程编程,进程切换代价要高于线程# 1. 对于耗费cpu的操作,多进程由于多线程import timefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom concurrent.futures import ProcessPoolExecutordef fib(n):    if n<=2:        return 1    return fib(n-1)+fib(n-2)if __name__ == "__main__":    with ProcessPoolExecutor(3) as executor:  #last time is: 14.505059242248535#     with ThreadPoolExecutor(3) as executor:  # last time is: 30.066641330718994        all_task = [executor.submit(fib, (num)) for num in range(25,40)]        start_time = time.time()        for future in as_completed(all_task):            data = future.result()            print("exe result: {}".format(data))        print("last time is: {}".format(time.time()-start_time))

 

#2. 对于io操作来说,多线程优于多进程def random_sleep(n):    time.sleep(n)    return nif __name__ == "__main__":    # with ThreadPoolExecutor(3) as executor:    with ProcessPoolExecutor(3) as executor:        all_task = [executor.submit(random_sleep, (num)) for num in [1]*30]        start_time = time.time()        for future in as_completed(all_task):            data = future.result()            print("exe result: {}".format(data))        print("last time is: {}".format(time.time()-start_time))

 

import osimport time# fork只能用Linux/unix中pid = os.fork()print("lewen",pid)if pid ==0:   #子进程拷贝    print("子进程 %s,父进程 %s"%(os.getpid(),os.getppid()))else:    print("我是父进程:%s"%(pid))time.sleep(2)[root@doit ~]# python fork_test.py ('lewen', 16077)我是父进程:16077('lewen', 0)子进程 16077,父进程 16076import osimport time# fork只能用Linux/unix中print("lewen",pid)pid = os.fork()if pid ==0:   #子进程拷贝    print("子进程 %s,父进程 %s"%(os.getpid(),os.getppid()))else:    print("我是父进程:%s"%(pid))time.sleep(2)[root@doit ~]# python fork_test.py lewen我是父进程:16096子进程 16096,父进程 16095
os.fork()

 

from concurrent.futures import ProcessPoolExecutor  # 进程池,基于multiprocessing,推荐import multiprocessing# 多进程编程import timedef get_html(n):    time.sleep(n)    print("sub_progress success")    return nclass MyProcess(multiprocessing.Process):    def run(self):        passif __name__ == "__main__":    # progress = multiprocessing.Process(target=get_html, args=(2,))    # print(progress.pid)    # progress.start()    # print(progress.pid)    # progress.join()    # print("main progress end")    """    None    10796    sub_progress success    main progress end        """    # 使用线程池    pool = multiprocessing.Pool(multiprocessing.cpu_count())    # result = pool.apply_async(get_html, args=(3,))  # 异步提交任务    #    # # 等待所有任务完成    # pool.close()  # 关闭,不再接受新的任务进来,才不会出错    # pool.join()    #    # print(result.get())    """    sub_progress success    3    """    # imap    # for result in pool.imap(get_html, [1, 5, 3]):    #     print("{} sleep success".format(result))    """    sub_progress success    1 sleep success    sub_progress success    sub_progress success    5 sleep success    3 sleep success    """    for result in pool.imap_unordered(get_html, [1, 5, 3]):  # 谁先完成就打出来        print("{} sleep success".format(result))    """    sub_progress success    1 sleep success    sub_progress success    3 sleep success    sub_progress success    5 sleep success    """

11.7 进程间通信

1 multiprocessing.Queue

 

 

# 共享全局变量通信# 共享全局变量不能适用于多进程编程,可以适用于多线程def producer(a):    a += 100    time.sleep(2)def consumer(a):    time.sleep(2)    print(a)if __name__ == "__main__":    a = 1    my_producer = Process(target=producer, args=(a,))    my_consumer = Process(target=consumer, args=(a,))    my_producer.start()    my_consumer.start()    my_producer.join()    my_consumer.join()---1
共享全局变量不能适用于多进程编程,可以适用于多线程

  

# multiprocessing中的queue不能用于pool进程池# pool中的进程间通信需要使用manager中的queue import timefrom multiprocessing import Process, Queue, Pool, Manager, Pipedef producer(queue):    queue.put("a")    time.sleep(2)def consumer(queue):    time.sleep(2)    data = queue.get()    print(data)if __name__ == "__main__":    queue = Manager().Queue(10)    pool = Pool(2)    pool.apply_async(producer, args=(queue,))    pool.apply_async(consumer, args=(queue,))    pool.close()    pool.join()--a
2 pool中的进程间通信需要使用manager中的queue

 

 

#通过pipe(管道)实现进程间通信#pipe的性能高于queuedef producer(pipe):    pipe.send("lewen")def consumer(pipe):    print(pipe.recv())if __name__ == "__main__":    recevie_pipe, send_pipe = Pipe()    # pipe只能适用于两个进程    my_producer= Process(target=producer, args=(send_pipe, ))    my_consumer = Process(target=consumer, args=(recevie_pipe,))    my_producer.start()    my_consumer.start()    my_producer.join()    my_consumer.join()
3 通过pipe(管道)实现进程间通信

 

 

内存共享

def add_data(p_dict, key, value):    p_dict[key] = valueif __name__ == "__main__":    progress_dict = Manager().dict()    from queue import PriorityQueue  # 优先级队列,后插入的数据尽快被获取到    first_progress = Process(target=add_data, args=(progress_dict, "lewen1", 22))    second_progress = Process(target=add_data, args=(progress_dict, "lewen2", 23))    first_progress.start()    second_progress.start()    first_progress.join()    second_progress.join()    print(progress_dict) --- {'lewen1': 22, 'lewen2': 23}

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/wenyule/p/10382123.html

你可能感兴趣的文章
「镁客早报」特斯拉裁员,马斯克解释没有办法;微软推出Azure DevOps赏金计划...
查看>>
Flink入坑指南第五章 - 语法糖 view
查看>>
centos 7.4 使用 pgxc_ctl 安装与使用
查看>>
Redis 单key值过大 优化方式
查看>>
【数据库】表分区
查看>>
nutz-sqltpl 1.3.4.RELEASE 发布,在 Nutz 项目中“解决 Java 拼接 SQL”问题
查看>>
城市 | 800个地铁站数据透析的京沪白领图鉴:隐形土豪、无产中产阶级和猪猪女孩...
查看>>
前端脚本!网站图片素材中文转英文
查看>>
linux的常用易忘命令
查看>>
PHP 分割字符串
查看>>
java 基于QRCode、zxing 的二维码生成与解析
查看>>
关于职业规划的一些思考
查看>>
img垂直水平居中与div
查看>>
Fabrik – 在浏览器中协作构建,可视化,设计神经网络
查看>>
防恶意注册的思考
查看>>
http2-head compression
查看>>
C# 命名空间
查看>>
订餐系统之同步美团商家订单
查看>>
使用ArrayList时设置初始容量的重要性
查看>>
Java Web-----JSP与Servlet(一)
查看>>