当计算机运行程序时,就会创建包含代码和状态的进程。这些进程会通过计算机的一个或多个CPU执行。不过,同一时刻每个CPU只会执行一个进程,然后在不同进程间快速切换,给我们一种错觉,感觉好像多个程序在同时进行。例如:有一个大型工厂,该工厂负责生产电脑,工厂有很多的车间用来生产不同的电脑部件。每个车间又有很多工人互相合作、共享资源来生产某个电脑部件。这里的工厂相当于一个爬虫工程,每个车间相当于一个进程,每个工人就相当于线程。线程是CPU调度的基本单元。
需要注意的是,在单核CPU系统中,真正的并行(多个任务在同一时刻同时执行)是不可能的。
1. 顺序执行
2. 多进程并发:注意除了时间上的加速以外,也要看看函数返回值的写法——多进程的 map 返回的是一个列表。
- import requests
- import re
- import time
- from multiprocessing import Pool
- from multiprocessing.dummy import Pool as ThreadPool
def spyder(url, delay=1):
    """Simulate fetching one page and return a small result dict.

    No HTTP request is made: the function prints the URL, sleeps to stand in
    for the time a request would take, and returns a dict built from the URL.

    Args:
        url: Page URL as a string; it is concatenated into the result key.
        delay: Seconds to sleep. Defaults to 1, the value that used to be
            hard-coded here.

    Returns:
        A dict holding the fixed entry ``'init:': 'hello'`` plus one entry
        mapping ``'entr:' + url`` to ``url``.
    """
    print('hahah:{}'.format(url))
    time.sleep(delay)
    return {'init:': 'hello', 'entr:' + url: url}
def _timed_pool_map(processes, urls):
    """Map spyder over urls on a pool of `processes` workers; return (results, seconds)."""
    start = time.time()
    # The with-block releases the worker processes even if map() raises.
    with Pool(processes=processes) as pool:
        results = pool.map(spyder, urls)
    return results, time.time() - start


def use_process():
    """Compare a sequential run of ``spyder`` with 2- and 4-process pools.

    Builds four page URLs, runs ``spyder`` over them one by one, then again
    through ``Pool.map`` with 2 and with 4 worker processes. For each run the
    collected return values and the elapsed wall-clock time are printed.
    ``Pool.map`` returns the per-URL results as a list, in input order.
    """
    urls = ["https://www.qiushibaike.com/text/page/{}/".format(i) for i in range(4)]

    # Baseline: call the worker in this process, one URL after another.
    start = time.time()
    sequential = [spyder(url) for url in urls]
    elapsed = time.time() - start
    print("单进程:", elapsed)
    print('res1:', sequential)

    # Same work on process pools. Each run gets its own result label; the
    # 4-process results used to be printed under the 2-process label.
    for label, processes in (('res2:', 2), ('res3:', 4)):
        results, elapsed = _timed_pool_map(processes, urls)
        print(label, results)
        print("{}进程:".format(processes), elapsed)


if __name__ == "__main__":
    use_process()
2. 多线程
2.1 thread多线程
- import time
- import _thread
- from threading import Thread
# Lock serializing access to the list that the worker threads share, so a
# mutation in one thread cannot interleave with a read in another.
mutex = _thread.allocate_lock()


def test(d_num):
    """Append 89 to the shared list ``d_num`` and print the list.

    Args:
        d_num: List shared with the other thread; it is modified in place.
    """
    with mutex:
        d_num.append(89)
        print("test: %s" % str(d_num))


def test1(d_num):
    """Print the shared list ``d_num`` without modifying it.

    Args:
        d_num: List shared with the other thread.
    """
    with mutex:
        print("test1: %s" % str(d_num))


def main():
    """Show that two threads see the same list object.

    The first thread appends to the list; the second thread then prints it
    and sees the appended value, because threads share the process memory.
    """
    d_num = [100, 58]
    t1 = Thread(target=test, args=(d_num,))
    t2 = Thread(target=test1, args=(d_num,))
    t1.start()
    # Wait for the writer to finish instead of sleeping for a guessed time,
    # so the reader is guaranteed to run after the append.
    t1.join()
    t2.start()
    t2.join()


if __name__ == '__main__':
    main()
2.2 多线程队列版
- import time
- import _thread
- from threading import Thread
- import queue
# queue.Queue does its own locking, so the functions below never take this
# lock; the name is kept so that existing references to it keep working.
mutex = _thread.allocate_lock()
# FIFO queue shared by the producer and the consumer thread.
frame_queue = queue.Queue()


def test(d_num):
    """Producer: print ``d_num``, then put 0 .. d_num - 1 on ``frame_queue``.

    Args:
        d_num: Number of integers to enqueue.
    """
    print("test: %s" % str(d_num))
    for i in range(d_num):
        frame_queue.put(i)


def test1():
    """Consumer: drain ``frame_queue``, printing each value.

    Sleeps one second after each item and returns as soon as the queue is
    found empty.
    """
    while True:
        # Take the item and detect emptiness in one step; a separate
        # empty() check followed by get() can race with other consumers.
        try:
            value = frame_queue.get_nowait()
        except queue.Empty:
            break
        print('==value:', value)
        time.sleep(1)


def main():
    """Fill the queue in one thread, then drain it in another."""
    d_num = 10
    producer = Thread(target=test, args=(d_num,))
    consumer = Thread(target=test1)
    producer.start()
    # The consumer stops at the first empty queue it sees, so the producer
    # must be done before it starts or items could be left behind.
    producer.join()
    consumer.start()
    consumer.join()


if __name__ == '__main__':
    main()
2.3 注意传参与多进程的区别,线程池
- from functools import partial
- from itertools import repeat
- from multiprocessing import Pool, freeze_support
def func(a, b):
    """Return ``a + b``; the two-argument worker used by the pool examples."""
    return a + b
def main():
    """Show three ways to pass a second argument to a pool worker.

    Each call computes ``func(a, 1)`` for ``a`` in 1, 2, 3 and prints the
    resulting list:

    * ``starmap`` over explicit argument tuples,
    * ``starmap`` over ``zip(a_args, repeat(second_arg))``,
    * ``map`` with the second argument bound through ``functools.partial``.
    """
    a_args = [1, 2, 3]
    second_arg = 1
    with Pool() as pool:
        explicit = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        print('L:', explicit)
        zipped = pool.starmap(func, zip(a_args, repeat(second_arg)))
        print('M:', zipped)
        bound = pool.map(partial(func, b=second_arg), a_args)
        print('N:', bound)


# Worker processes may re-import this module; without the guard that import
# would call main() again and try to create pools from inside the workers.
if __name__ == "__main__":
    freeze_support()
    main()
- import requests
- import re
- import time
- from multiprocessing import Pool
- from multiprocessing.dummy import Pool as ThreadPool
def spyder(url, delay=1):
    """Simulate fetching one page and return a small result dict.

    No HTTP request is made: the function prints the URL, sleeps to stand in
    for the time a request would take, and returns a dict built from the URL.

    Args:
        url: Page URL as a string; it is concatenated into the result key.
        delay: Seconds to sleep. Defaults to 1, the value that used to be
            hard-coded here.

    Returns:
        A dict holding the fixed entry ``'init:': 'hello'`` plus one entry
        mapping ``'entr:' + url`` to ``url``.
    """
    print('hahah:{}'.format(url))
    time.sleep(delay)
    return {'init:': 'hello', 'entr:' + url: url}
def _timed_pool_map(processes, urls):
    """Map spyder over urls on a pool of `processes` workers; return (results, seconds)."""
    start = time.time()
    # The with-block releases the worker processes even if map() raises.
    with Pool(processes=processes) as pool:
        results = pool.map(spyder, urls)
    return results, time.time() - start


def use_process():
    """Compare a sequential run of ``spyder`` with 2- and 4-process pools.

    Builds four page URLs, runs ``spyder`` over them one by one, then again
    through ``Pool.map`` with 2 and with 4 worker processes. For each run the
    collected return values and the elapsed wall-clock time are printed.
    ``Pool.map`` returns the per-URL results as a list, in input order.
    """
    urls = ["https://www.qiushibaike.com/text/page/{}/".format(i) for i in range(4)]

    # Baseline: call the worker in this process, one URL after another.
    start = time.time()
    sequential = [spyder(url) for url in urls]
    elapsed = time.time() - start
    print("单进程:", elapsed)
    print('res1:', sequential)

    # Same work on process pools. Each run gets its own result label; the
    # 4-process results used to be printed under the 2-process label.
    for label, processes in (('res2:', 2), ('res3:', 4)):
        results, elapsed = _timed_pool_map(processes, urls)
        print(label, results)
        print("{}进程:".format(processes), elapsed)
def use_threadpool():
    """Run ``spyder`` over four URLs on a 4-thread pool and print the outcome.

    Prints the argument list, the list of return values and the elapsed
    wall-clock time.
    """
    # starmap unpacks every item into positional arguments, so each URL is
    # wrapped in its own one-element list.
    urls = [["https://www.qiushibaike.com/text/page/{}/".format(i)] for i in range(4)]
    print('urls:', urls)

    start = time.time()
    # The with-block releases the worker threads even if starmap() raises.
    with ThreadPool(processes=4) as pool:
        res = pool.starmap(spyder, urls)
    elapsed = time.time() - start

    print('res:', res)
    print("4线程:", elapsed)


if __name__ == "__main__":
    # Call use_process() here instead to run the multi-process comparison.
    use_threadpool()
实际应用:将图片路径和名字传入,用zip方式打包传参
- import os
- import cv2
- import time
- import itertools
- from multiprocessing.dummy import Pool as ThreadPool
# Target size handed to cv2.resize (OpenCV reads it as (width, height)).
SIZE = (75, 75)
# Not referenced by the code below; kept so the module-level name stays available.
SAVE_DIRECTORY = 'thumbs'


def save_img(filename, save_path):
    """Resize one image to ``SIZE`` and write it into ``save_path``.

    The output file keeps the base name of ``filename``.

    Args:
        filename: Path of the image to read.
        save_path: Directory the resized image is written to; a trailing
            separator is optional.

    Returns:
        True if the resized image was written, False if the input could not
        be read as an image or the write failed.
    """
    # Build the output path with os.path so it works with or without a
    # trailing separator on save_path and with either path separator.
    target = os.path.join(save_path, os.path.basename(filename))
    im = cv2.imread(filename)
    if im is None:
        # imread returns None for a file it cannot decode; skip it so one
        # bad file does not abort a whole pool.starmap batch.
        print('skip unreadable image:', filename)
        return False
    im = cv2.resize(im, SIZE)
    return bool(cv2.imwrite(target, im))
# Script entry: create thumbnails for every file in the input directory,
# using a pool of 8 threads, and report the elapsed time.
if __name__ == '__main__':
    path = './data/testlabel'
    print(path)
    output_path = './data/thumbs/'
    # makedirs also creates missing parent directories and does not fail
    # when the directory already exists.
    os.makedirs(output_path, exist_ok=True)
    print(output_path)
    imgs_list_path = [os.path.join(path, name) for name in os.listdir(path)]
    print(len(imgs_list_path))
    start_time = time.time()
    # Pair every input path with the same output directory; starmap unpacks
    # each pair into save_img(filename, save_path).
    jobs = list(zip(imgs_list_path, itertools.repeat(output_path)))
    print(jobs)
    # The with-block releases the worker threads even if starmap() raises.
    with ThreadPool(processes=8) as pool:
        pool.starmap(save_img, jobs)
    end_time = time.time()
    print('use time=', end_time - start_time)
联系客服