11.1 python中的GIL
# coding=utf-8# gil global interpreter lock (cpython)# python中一个线程对应于c语言中的一个线程# gil使得同一个时刻只有一个线程在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行# gil会根据执行的字节码行数以及时间片释放gil,# gil在遇到io的操作时候主动释放import dis
def add(a): a = a + 1 return aprint(dis.dis(add))total = 0def add(): # 1. dosomething1 # 2. io操作 # 1. dosomething3 global total for i in range(1000000): total += 1def desc(): global total for i in range(1000000): total -= 1import threadingthread1 = threading.Thread(target=add)thread2 = threading.Thread(target=desc)thread1.start()thread2.start()thread1.join()thread2.join()print(total)# 在IO频繁的时候是很适合的
执行多少行后字节码会释放
11.2 python多线程编程
操作系统最小的执行单元
# coding=utf-8# __auther__ = 'lewen'import timeimport threadingdef get_detail_html(url): print("get detail html started") time.sleep(2) print("get detail html end")def get_detail_url(url): print("get detail url started") time.sleep(4) print("get detail url end")if __name__ == "__main__": # 在主线程起两个线程 thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) # thread1.setDaemon(True) # thread2.setDaemon(True) # 守护线程,当主线程退出的时候, 子线程kill掉 start_time = time.time() thread1.start() thread2.start() thread1.join() # 等待线程的执行完成,才会执行下面 thread2.join() print("last time: {}".format(time.time() - start_time))
通过集成Thread来实现多线程
class GetDetailHtml(threading.Thread): def __init__(self, name): # py2 必须在括号写类名 # 继承父类的name super().__init__(name=name) def run(self): print("get detail html started") time.sleep(2) print("get detail html end")class GetDetailUrl(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): print("get detail url started") time.sleep(4) print("get detail url end")if __name__ == "__main__": thread1 = GetDetailHtml("get_detail_html") thread2 = GetDetailUrl("get_detail_url") start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() print("last time: {}".format(time.time() - start_time))
11.3 线程间通信-Queue
共用变量
# 线程间通信import timeimport threadingfrom chapter11 import variablesfrom threading import Conditiondetail_url_list = []# 1. 生产者当生产10个url以后就就等待,保证detail_url_list中最多只有十个url# 2. 当url_list为空的时候,消费者就暂停def get_detail_html(lock): # 爬取文章详情页 detail_url_list = variables.detail_url_list # 将共享变量存放到文件中去 while True: if len(variables.detail_url_list): if len(detail_url_list): url = detail_url_list.pop() # for url in detail_url_list: print("get detail html started") time.sleep(2) print("get detail html end") else: time.sleep(1)def get_detail_url(lock): # 爬取文章列表页 detail_url_list = variables.detail_url_list while True: print("get detail url started") time.sleep(4) for i in range(20): if len(detail_url_list) >= 10: time.sleep(1) else: detail_url_list.append("http://projectsedu.com/{id}".format(id=i)) print("get detail url end")# 1. 线程通信方式- 共享变量if __name__ == "__main__": thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,)) for i in range(10): html_thread = threading.Thread(target=get_detail_html, args=(lock,)) html_thread.start() # # thread2 = GetDetailUrl("get_detail_url") start_time = time.time() # 当主线程退出的时候, 子线程kill掉 print("last time: {}".format(time.time() - start_time))
from queue import Queueimport timeimport threadingdef get_detail_html(queue): # 爬取文章详情页 while True: url = queue.get() # 阻塞,没有会停在这 print(url) # 内部基于deque print("get detail html started") time.sleep(1) print("get detail html end")def get_detail_url(queue): # 爬取文章列表页 while True: print("get detail url started") time.sleep(2) for i in range(9): queue.put("http://www.baidu.com/s?wd=".format(id=i)) print(queue.qsize()) print("get detail url end")# 1. 线程通信方式- 共享变量if __name__ == "__main__": detail_url_queue = Queue(maxsize=1000) thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) html_thread_list = [] for i in range(10): html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) html_thread.start() html_thread_list.append(html_thread) start_time = time.time() for h in html_thread_list: h.join() # detail_url_queue.join() # 这里想退出,必须等到 detail_url_queue.task_done()调用,才会退出 print("last time: {}".format(time.time() - start_time))
11.4 线程同步(Lock、RLock、Semaphores、Condition)
from threading import Lock, RLock, Condition # 可重入的锁# Lock 不能重复调用total = 0lock = RLock() # 在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等 # 多个线程之间仍会竞争def add(): global lock global total for i in range(1000000): lock.acquire() lock.acquire() # 一个线程里面重入的锁 total += 1 lock.release() lock.release()def desc(): global total global lock for i in range(1000000): lock.acquire() total -= 1 lock.release()import threadingthread1 = threading.Thread(target=add)thread2 = threading.Thread(target=desc)thread1.start()thread2.start()thread1.join()thread2.join()print(total)# 1. 用锁会影响性能# 2. 锁会引起死锁# 死锁的情况 A(a,b)"""A(a、b)acquire (a)acquire (b) # 阻塞住,死在这B(b、a)acquire (b) # 交互死锁,资源竞争acquire (a)"""
condition 使用以及源码分析
import threadingclass XiaoAi(threading.Thread): def __init__(self, lock): super().__init__(name="小爱") self.lock = lock def run(self): self.lock.acquire() print("{} : 在 ".format(self.name)) self.lock.release() self.lock.acquire() print("{} : 好啊 ".format(self.name)) self.lock.release()class TianMao(threading.Thread): def __init__(self, lock): super().__init__(name="天猫精灵") self.lock = lock def run(self): self.lock.acquire() print("{} : 小爱同学 ".format(self.name)) self.lock.release() self.lock.acquire() print("{} : 我们来对古诗吧 ".format(self.name)) self.lock.release()if __name__ == "__main__": lock = threading.Lock() xiaoai = XiaoAi(lock) tianmao = TianMao(lock) tianmao.start() xiaoai.start()# ---天猫精灵 : 小爱同学天猫精灵 : 我们来对古诗吧小爱 : 在小爱 : 好啊
class XiaoAi(threading.Thread): def __init__(self, cond): super().__init__(name="小爱") self.cond = cond def run(self): with self.cond:#第一把锁 self.cond.wait() print("{} : 在 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 好啊 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 君住长江尾 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 共饮长江水 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 此恨何时已 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 定不负相思意 ".format(self.name)) self.cond.notify()class TianMao(threading.Thread): def __init__(self, cond): super().__init__(name="天猫精灵") self.cond = cond def run(self): with self.cond: #第一把锁 print("{} : 小爱同学 ".format(self.name)) self.cond.notify() # 提醒 self.cond.wait() # 等待条件提醒 print("{} : 我们来对古诗吧 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 我住长江头 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 日日思君不见君 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 此水几时休 ".format(self.name)) self.cond.notify() self.cond.wait() print("{} : 只愿君心似我心 ".format(self.name)) self.cond.notify() self.cond.wait()if __name__ == "__main__": cond = threading.Condition() xiaoai = XiaoAi(cond) tianmao = TianMao(cond) # 在调用with cond之后才能调用wait或者notify方法 # condition有两层锁, 一把底层锁(with condition)会在线程调用了wait方法的时候释放, # 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒 xiaoai.start() tianmao.start() # 启动顺序很重要 # 天猫start 后 notify ,然后小爱 start 进入wait ,一直接受不到 notify 就阻塞住 # start 后 wait 的线程应该先启动去等着,以免接受不到notify# ---天猫精灵 : 小爱同学小爱 : 在天猫精灵 : 我们来对古诗吧小爱 : 好啊天猫精灵 : 我住长江头小爱 : 君住长江尾天猫精灵 : 日日思君不见君小爱 : 共饮长江水天猫精灵 : 此水几时休小爱 : 此恨何时已天猫精灵 : 只愿君心似我心小爱 : 定不负相思意
在调用with cond之后才能调用wait或者notify方法condition有两层锁, 一把底层锁(with condition)会在线程调用了wait方法的时候释放, 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒
Semaphore 使用
# Semaphore 是用于控制进入数量的锁# 文件, 读、写, 写一般只是用于一个线程写,读可以允许有多个# 做爬虫import threadingimport timeclass HtmlSpider(threading.Thread): def __init__(self, url, sem): super().__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print("got html text success") self.sem.release()class UrlProducer(threading.Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): for i in range(20): self.sem.acquire() html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem) html_thread.start()if __name__ == "__main__": sem = threading.Semaphore(3) url_producer = UrlProducer(sem) url_producer.start()
11.5 concurrent线程池编码
# 线程池, 为什么要线程池# 主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值# 当一个线程完成的时候我们主线程能立即知道# futures可以让多线程和多进程编码接口一致from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETEDimport timedef get_html(times): time.sleep(times) print("get page {} success".format(times)) return timesexecutor = ThreadPoolExecutor(max_workers=2)# 通过submit函数提交执行的函数到线程池中, submit 是立即返回task1 = executor.submit(get_html, (3))task2 = executor.submit(get_html, (2))# done方法用于判定某个任务是否完成print(task1.done())# print(task2.cancel()) # 取消任务(成功返回True),在执行中或开始执行的时候是不能取消的# time.sleep(3)# print(task1.done())# result 是阻塞的方法可以获取task的执行结果print(task1.result())# ---------urls = [3,2,4]all_task = [executor.submit(get_html, (url)) for url in urls] # 批量提交wait(all_task, return_when=FIRST_COMPLETED)print("main")# 要获取已经成功的task的返回# for future in as_completed(all_task):# data = future.result()# print("get {} page".format(data))# 通过executor的map获取已经完成的task的值# for data in executor.map(get_html, urls):# print("get {} page".format(data)) # 跟提交值顺序相同# ----Falseget page 2 successget page 3 success3get page 2 successmainget page 3 successget page 4 success
from concurrent.futures import Future #未来对象,task的返回容器
11.6 多进程编程-multiprocessing
# 多进程编程# 耗cpu的操作,用多进程编程, 对于io操作来说, 使用多线程编程,进程切换代价要高于线程# 1. 对于耗费cpu的操作,多进程由于多线程import timefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom concurrent.futures import ProcessPoolExecutordef fib(n): if n<=2: return 1 return fib(n-1)+fib(n-2)if __name__ == "__main__": with ProcessPoolExecutor(3) as executor: #last time is: 14.505059242248535# with ThreadPoolExecutor(3) as executor: # last time is: 30.066641330718994 all_task = [executor.submit(fib, (num)) for num in range(25,40)] start_time = time.time() for future in as_completed(all_task): data = future.result() print("exe result: {}".format(data)) print("last time is: {}".format(time.time()-start_time))
#2. 对于io操作来说,多线程优于多进程def random_sleep(n): time.sleep(n) return nif __name__ == "__main__": # with ThreadPoolExecutor(3) as executor: with ProcessPoolExecutor(3) as executor: all_task = [executor.submit(random_sleep, (num)) for num in [1]*30] start_time = time.time() for future in as_completed(all_task): data = future.result() print("exe result: {}".format(data)) print("last time is: {}".format(time.time()-start_time))
import osimport time# fork只能用Linux/unix中pid = os.fork()print("lewen",pid)if pid ==0: #子进程拷贝 print("子进程 %s,父进程 %s"%(os.getpid(),os.getppid()))else: print("我是父进程:%s"%(pid))time.sleep(2)[root@doit ~]# python fork_test.py ('lewen', 16077)我是父进程:16077('lewen', 0)子进程 16077,父进程 16076import osimport time# fork只能用Linux/unix中print("lewen",pid)pid = os.fork()if pid ==0: #子进程拷贝 print("子进程 %s,父进程 %s"%(os.getpid(),os.getppid()))else: print("我是父进程:%s"%(pid))time.sleep(2)[root@doit ~]# python fork_test.py lewen我是父进程:16096子进程 16096,父进程 16095
from concurrent.futures import ProcessPoolExecutor # 进程池,基于multiprocessing,推荐import multiprocessing# 多进程编程import timedef get_html(n): time.sleep(n) print("sub_progress success") return nclass MyProcess(multiprocessing.Process): def run(self): passif __name__ == "__main__": # progress = multiprocessing.Process(target=get_html, args=(2,)) # print(progress.pid) # progress.start() # print(progress.pid) # progress.join() # print("main progress end") """ None 10796 sub_progress success main progress end """ # 使用线程池 pool = multiprocessing.Pool(multiprocessing.cpu_count()) # result = pool.apply_async(get_html, args=(3,)) # 异步提交任务 # # # 等待所有任务完成 # pool.close() # 关闭,不再接受新的任务进来,才不会出错 # pool.join() # # print(result.get()) """ sub_progress success 3 """ # imap # for result in pool.imap(get_html, [1, 5, 3]): # print("{} sleep success".format(result)) """ sub_progress success 1 sleep success sub_progress success sub_progress success 5 sleep success 3 sleep success """ for result in pool.imap_unordered(get_html, [1, 5, 3]): # 谁先完成就打出来 print("{} sleep success".format(result)) """ sub_progress success 1 sleep success sub_progress success 3 sleep success sub_progress success 5 sleep success """
11.7 进程间通信
1 multiprocessing.Queue
# 共享全局变量通信# 共享全局变量不能适用于多进程编程,可以适用于多线程def producer(a): a += 100 time.sleep(2)def consumer(a): time.sleep(2) print(a)if __name__ == "__main__": a = 1 my_producer = Process(target=producer, args=(a,)) my_consumer = Process(target=consumer, args=(a,)) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join()---1
# multiprocessing中的queue不能用于pool进程池# pool中的进程间通信需要使用manager中的queue import timefrom multiprocessing import Process, Queue, Pool, Manager, Pipedef producer(queue): queue.put("a") time.sleep(2)def consumer(queue): time.sleep(2) data = queue.get() print(data)if __name__ == "__main__": queue = Manager().Queue(10) pool = Pool(2) pool.apply_async(producer, args=(queue,)) pool.apply_async(consumer, args=(queue,)) pool.close() pool.join()--a
#通过pipe(管道)实现进程间通信#pipe的性能高于queuedef producer(pipe): pipe.send("lewen")def consumer(pipe): print(pipe.recv())if __name__ == "__main__": recevie_pipe, send_pipe = Pipe() # pipe只能适用于两个进程 my_producer= Process(target=producer, args=(send_pipe, )) my_consumer = Process(target=consumer, args=(recevie_pipe,)) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join()
内存共享
def add_data(p_dict, key, value): p_dict[key] = valueif __name__ == "__main__": progress_dict = Manager().dict() from queue import PriorityQueue # 优先级队列,后插入的数据尽快被获取到 first_progress = Process(target=add_data, args=(progress_dict, "lewen1", 22)) second_progress = Process(target=add_data, args=(progress_dict, "lewen2", 23)) first_progress.start() second_progress.start() first_progress.join() second_progress.join() print(progress_dict) --- {'lewen1': 22, 'lewen2': 23}