PyQt5

PyQt5实现文件传输程序(五):PyQt多线程

在制作GUI程序时有一个比较重要的地方便是要将UI线程与工作线程分离。如果在主线程中同时放置GUI任务与处理任务的话,若处理任务运行的时间过长会阻塞事件循环。这样一来,GUI 所有的绘制和交互都被阻塞在事件队列中,整个程序就失去响应了。对于这样的阻塞一般有两种解决办法:一是在计算任务中不停地调用静态成员函数 QCoreApplication.processEvents() 来手动运行事件循环,它会在处理完队列中所有事件后返回。不过这样做毕竟没有从根本上解决问题,另外如果两次函数调用之间间隔的时间不够短,用户仍能明显感觉到程序卡顿。第二种解决办法就是为任务新开一个线程,这样就能在不干扰 GUI 线程的情况下完成计算了。
下面我们就来讲讲如何实现PyQt的多线程。

一、如何实现PyQt的多线程

Qt 提供了三种控制线程的方式:QThread、QRunnable / QThreadPool、QtConcurrent,其中最通用、也是最常见的是 QThread。但第二种方法并不为官方所推荐,Qt的开发人员Bradley T. Hughes专门为此写了一篇博客you are-doing-it-wrong来指出Qthread为大多数人错误地使用,因为只有定义在Qthread类的run函数中的代码是在子线程中执行的,而在run函数中发出信号调用的槽函数却是在主线程中执行的!。为了解决这一问题,官方提供了一种替代方式。

二、如何正确地使用PyQt多线程

Bradley T. Hughes 给出的上述问题的解决方法是: QThread 应该被看做是操作系统线程的接口或控制点,而不应该包含需要在新线程中运行的代码。需要运行的代码应该放到一个QObject的子类中,然后将该子类的对象moveToThread到新线程中。下面,我们就通过创建一个服务器处理子线程并与主线程进行通信的例子来说明这一用法。

三、一个栗子

首先定义一个继承QObject类的子类,并定义处理函数。

import time
import json
import socket
import struct
import hashlib
import sqlite3
from key import *
from queue import Queue
from threading import Thread
from os import listdir, remove
from os.path import getsize, isfile, join
from PyQt5.QtCore import QObject, pyqtSignal

class Server(QObject):
    msgSignal = pyqtSignal(str, str)    # 更新消息栏的信号
    lgSignal = pyqtSignal(str)  # 登录的信号
    statSignal = pyqtSignal(str)    # 更新日志的信号
    quitSignal = pyqtSignal(str)    # 退出的信号

    def __init__(self, host, port, num, path):
        super().__init__()
        self.s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.path = path    # 工作文件夹路径
        self.cnum = 0   # 已连接用户数
        self.num = num  # 最大连接数
        if host == 'localhost':
            host = socket.gethostname()
        self.s.bind((host, port))   # 绑定端口

        self.s.listen(num + 1)
        self.users = {} # 已连接用户字典

    # 主线程
    def run(self):
        # 打印启动信息
        now = time.strftime('%H:%M:%S')
        self.statSignal.emit('[' + now + ']: 服务器已启动!')


        while True:
            c, addr = self.s.accept()   # 每接收到一个连接请求

            self.cnum += 1  # 连接数加1
            cq = Queue()    # 实例化一个队列作为发送线程与接收线程间传递数据的通道

            # 开启接收线程
            crthread = Thread(target=self.crec, args=(c, addr, cq))
            crthread.daemon = True  # 设置随主线程退出
            crthread.start()    # 开始运行

            # 开启发送线程
            csthread = Thread(target=self.csend, args=(c, addr, cq))
            csthread.daemon = True  # 设置随主线程退出
            csthread.start()    # 开始运行

    # 接收线程
    def crec(self, c, addr, q):

        while True:
            head_dic = self.deread(c)   # 接收报头
            tp = head_dic['type']   # 提起报头的命令

            if tp == 'end': # 如果要求结束
                q.put(head_dic) # 将其传递给发送线程并跳出
                break
            else:   
                cnt = head_dic['cnt']   # 提取命令内容

            if tp == 'lg':  # 要求登录则调用登录处理方法
                self.lg(cnt, q)
            elif tp == 'rgs':
                self.rgs(cnt, q)    # 要求注册则调用注册处理方法
            elif tp == 'msg':
                # 对消息的处理
                now = time.strftime('%H:%M:%S')
                self.msgSignal.emit(cnt['ur'], cnt['ur'] + '(' + now + '):' + cnt['msg'])   # 更新消息栏
            elif tp == 'dwnf':
                q.put(head_dic) # 要求下载文件将命令传递给发送线程
            elif tp == 'sendf':
                self.recf(c, cnt, q)    # 要求发送文件则调用接收文件处理方法

    # 发送线程    
    def csend(self, c, addr, q):
        while True:
            data = q.get()  # 获取接收线程的信息
            if data['type'] == 'dwnf':    # 用户请求下载文件
                self.sendf(c, data) # 调用发送文件的处理方法

            elif data['type'] == 'end': # 要求退出

                # 打印退出日志信息并跳出循环
                if 'ur' in data:
                    ur = data['ur']
                    self.quitSignal.emit(ur)
                    self.users.pop(ur)
                else:
                    ur = str(addr)
                now = time.strftime('%H:%M:%S')
                self.statSignal.emit('[' + now + ']【' + ur + '】:退出')
                self.ensend(c, data)
                break

            else:
                self.ensend(c, data)    # 否则,就将接收的命令发送给客户端

    # 接收用户上传的文件
    def recf(self, c, cnt, q):
        ur = cnt['ur']  # 获取用户名
        fsize = cnt['fsize']    # 获取文件大小
        fname = cnt['fname']    # 获取文件名
        fmd5 = cnt['fmd5']  # 获取MD5值
        data = {'type': 'sendf', 'cnt': {}} # 构建命令

        dsize = 0   # 已接收文件大小
        dmd5 = hashlib.md5()

        with open(self.path + '/' + fname, 'wb') as f:
            # 打开文件持续接收
            while dsize < fsize:
                block = c.recv(1024)
                f.write(block)
                dmd5.update(block)
                dsize += len(block) # 更新已接收文件大小

        # 校验文件
        if fmd5 == dmd5.hexdigest():
            # 发送成功接收信息
            data['cnt']['result'] = True
            data['cnt']['flist'] = [ f for f in listdir(self.path) if isfile(join(self.path, f)) ]
            msg = fname + '发送成功!'

        else:
            remove(fname)    # 删除文件
            data['cnt']['result'] = False
            msg = fname + '发送失败!'

        data['cnt']['msg'] = msg    # 发送失败信息
        now = time.strftime('%H:%M:%S')
        self.statSignal.emit('[' + now + ']【' + ur + '】:' + msg)    # 更新服务器日志

        q.put(data) # 将命令送给发送线程

    # 发送用户下载的文件
    def sendf(self, c, data):
        ur = data['cnt']['ur']  # 获取用户名
        fname = data['cnt']['fname']    # 获取文件名
        fsize = getsize(self.path + '/' + fname)  # 文件大小
        data['cnt']['fsize'] = fsize    # 设置命令中的文件大小字段
        data['cnt']['fmd5'] = self.getMD5(data['cnt'])  # 设置命令中的MD5字段

        self.ensend(c, data)    # 向客户端发送控制命令

        dsize = 0

        # 打开文件进行发送
        with open(self.path + '/' + fname, 'rb') as f:
            while dsize < fsize:
                block = f.read(1024)
                c.send(block)
                dsize += len(block) # 更新发送文件大小

        msg = fname + '发送完毕!'
        now = time.strftime('%H:%M:%S')
        self.statSignal.emit('[' + now + ']【' + ur + '】:' + msg)    # 更新日志信息

    # 用户验证
    def lg(self, cnt, q):
        data = {'type': 'lg', 'cnt': {}}    # 构造命令

        # 如果已达最大连接数量
        if self.cnum > self.num:
            msg = '连接已达最大数量!'
            data['cnt']['result'] = False   # 返回失败信息

        else:
            db = sqlite3.connect('net.db')  # 连接数据库
            cursor = db.cursor()

            ur = cnt['ur']
            pw = cnt['pw']

            # 将用户名、密码进行加密
            dur = encrypt(13, ur)
            dpw = encrypt(13, pw)

            # SQL查询该用户的密码
            sql = "SELECT pw FROM usr WHERE ur = '" + dur + "'"

            cursor.execute(sql)
            rpw = cursor.fetchone()[0]

            # 只有密码正确且该用户没有登录时
            if dpw == rpw and ur not in self.users:
                msg = '登录成功!'
                data['cnt']['result'] = True    # 返回正确信息
                data['cnt']['flist'] = [ f for f in listdir(self.path) if isfile(join(self.path, f)) ]  # 发送服务器文件列表
                self.users[ur] = q  # 为该用户建立消息传输队列
                self.lgSignal.emit(ur)  # 更新日志已消息框可选用户列表

            # 若用户已登录
            elif ur in self.users:
                msg = '此用户已登录!'
                data['cnt']['result'] = False   # 返回失败信息
            # 若查询结果为空或密码错误
            else:
                msg = '用户名或密码错误!'
                data['cnt']['result'] = False   # 返回错误信息

        data['cnt']['msg'] = msg    # 设置返回信息
        now = time.strftime('%H:%M:%S')
        self.statSignal.emit('['+ now +']【' + ur + '】:' + msg)  # 更新日志

        db.close()  # 关闭数据库

        q.put(data) # 发送登录结果

    # 用户注册
    def rgs(self, cnt, q):

        # 连接数据库
        db = sqlite3.connect('net.db')
        cursor = db.cursor()

        ur = cnt['ur']
        pw = cnt['pw']
        data = {'type': 'rgs', 'cnt': {}}   # 构造命令

        # 对用户名、密码加密
        dur = encrypt(13, ur)
        dpw = encrypt(13, pw)

        # 使用SQL语句来查询该用户
        sql = "SELECT * FROM usr WHERE ur = '" + dur + "'"

        cursor.execute(sql)

        result = cursor.fetchone()

        # 若用户已存在
        if result:
            msg = '用户已存在!'
            data['cnt']['result'] = False   # 返回错误信息
        else:
            msg = '注册成功!'
            csql = "INSERT INTO usr VALUES ('%s', '%s')" %(dur, dpw)    # 加入该用户
            cursor.execute(csql)
            db.commit()
            data['cnt']['result'] = True    # 返回成功信息

        data['cnt']['msg'] = msg    # 设置命令中的返回信息
        now = time.strftime('%H:%M:%S')
        self.statSignal.emit('[' + now + ']【' + ur + '】:' + msg)    # 更新日志

        db.close()  # 关闭数据库

        q.put(data) # 返回消息

    # 将收到的信息转化为报头
    def deread(self, c):
        # 接收报头长度
        head_struct = c.recv(4)
        head_len = struct.unpack('i', head_struct)[0]

        # 接收报头
        head_bytes = c.recv(head_len)
        head_json = head_bytes.decode('utf-8')
        head_dic = json.loads(head_json)

        return head_dic

    # 将要发送的信息转化为报头发送
    def ensend(self, c, data):
        # 制作报头
        head_json = json.dumps(data)  # json 序列化
        head_bytes = head_json.encode('utf-8')  # 要发送需要转换成字节数据

        # 发送报头的长度
        head_len = len(head_bytes)
        c.send(struct.pack('i', head_len))

        # 发送报头
        c.send(head_bytes)

    # 获取文件MD5值
    def getMD5(self, cnt):
        fname = cnt['fname']    # 发送文件名
        fsize = cnt['fsize']    # 发送文件大小

        fmd5 = hashlib.md5()

        # 逐块计算文件的MD5值
        with open(self.path + '/' + fname, 'rb') as f:
            dsize = 0
            while dsize < fsize:
                block = f.read(1024)
                fmd5.update(block)
                dsize += len(block)

        return fmd5.hexdigest()

在主线程中创建处理子线程。

import sys
import time
import qdarkstyle
from server import Server
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import (QWidget, QApplication, QGroupBox, QPushButton, QLabel, QHBoxLayout,
                             QVBoxLayout, QGridLayout, QFormLayout, QLineEdit, QTextBrowser,
                             QFileDialog, QComboBox, QMessageBox, QStackedWidget)

info = '''
    欢迎使用本软件!本程序为桑运鑫计算机网络大实验作品。
    =======================================================
    当前版本:V1.1
    更新日志:
    * 收发线程分离,修复粘包问题
    * 修复客户端未选择文件时禁用按钮的问题
    * 修复用户登录验证码大小写问题
    * 修复客户端文件显示问题
    * 增加文件传输MD5校验功能
    * 增加用户名密码传输加密功能
    * 用户数据库从Mysql迁移至sqlite
    =======================================================
    相关信息请咨询:sangyunxin@gmail.com
    本项目Github地址:https://github.com/sangyunxin/Net
    Copyright 2017 - 2018 Sangyunxin. All Rights Reserved. 
    =======================================================
        '''

class SForm(QWidget):

    finishSignal = pyqtSignal() # 结束线程的信号

    def __init__(self):
        super().__init__()
        self.userBox = {}   # 每个用户对话框的字典

        self.initUI()

    # 绘制界面
    def initUI(self):
        self.createGridGroupBox()
        self.creatVboxGroupBox()
        self.creatFormGroupBox()
        mainLayout = QVBoxLayout()
        hboxLayout = QHBoxLayout()
        hboxLayout.addStretch()  

        # 调用方法绘制界面
        self.setWindowTitle('服务器软件')
        hboxLayout.addWidget(self.gridGroupBox)
        hboxLayout.addWidget(self.vboxGroupBox)
        mainLayout.addLayout(hboxLayout)
        mainLayout.addWidget(self.formGroupBox)
        self.setLayout(mainLayout)

    # 绘制服务器配置部分
    def createGridGroupBox(self):
        self.gridGroupBox = QGroupBox('服务器配置')
        layout = QGridLayout()

        # 设置标签、输入框
        iplb = QLabel('服务器地址')
        self.ip = QLineEdit('localhost')
        self.ip.setEnabled(False)
        portlb = QLabel('开放端口')
        self.port = QLineEdit('1234')
        self.port.setEnabled(False)
        maxlb = QLabel('最大连接数')
        self.maxnum = QLineEdit('5')
        self.flb = QLabel('工作文件夹')
        self.fpath = QLineEdit('./sfile')
        selpath = QPushButton('选择')

        # 为按钮绑定点击事件
        selpath.clicked.connect(self.showDialog)

        self.runbt = QPushButton('启动')
        self.runbt.clicked.connect(self.startServer)    # 为启动按钮绑定服务器启动方法

        layout.setSpacing(10) 
        layout.addWidget(iplb, 1, 0)
        layout.addWidget(self.ip, 1, 1)
        layout.addWidget(portlb, 2, 0)
        layout.addWidget(self.port, 2, 1)
        layout.addWidget(maxlb, 3, 0)
        layout.addWidget(self.maxnum, 3, 1)
        layout.addWidget(self.flb, 4, 0)
        layout.addWidget(self.fpath, 4, 1)
        layout.addWidget(selpath, 5, 0)
        layout.addWidget(self.runbt, 5, 1)

        layout.setColumnStretch(1, 10)
        self.gridGroupBox.setLayout(layout)

    # 绘制服务日志部分
    def creatVboxGroupBox(self):
        self.vboxGroupBox = QGroupBox('服务日志')
        layout = QVBoxLayout()
        self.log = QTextBrowser()
        layout.addWidget(self.log)
        self.vboxGroupBox.setLayout(layout)

    # 绘制消息对话框
    def creatFormGroupBox(self):
        self.formGroupBox = QGroupBox('消息')
        layout = QFormLayout()

        msgbox = QTextBrowser()

        self.stack = QStackedWidget(self)   # 设置一个堆栈以切换不同用户的对话界面
        self.stack.addWidget(msgbox)    # 每个用户有一个文本框展示信息

        self.userBox['无'] = msgbox

        self.showMsg('无', info)

        self.selur = QComboBox()    # 使用下拉列表选择用户对话框
        self.selur.addItem('无')
        self.selur.currentTextChanged.connect(self.changeBox)   # 绑定处理方法
        self.selur.setDisabled(True)

        # 绘制输入框和发送按钮
        childgrid = QGridLayout()
        self.umsg = QLineEdit()
        self.sendbt = QPushButton('发送')
        childgrid.addWidget(self.umsg, 0, 0)
        childgrid.addWidget(self.sendbt, 0, 1)
        layout.addRow(self.stack)
        layout.addRow(self.selur, childgrid)
        self.sendbt.clicked.connect(self.sendMsg)

        # 一开始禁用输入框和发送按钮
        self.umsg.setEnabled(False)
        self.sendbt.setEnabled(False)

        self.formGroupBox.setLayout(layout)

    # 展示选择文件夹对话框
    def showDialog(self):
        upath = QFileDialog.getExistingDirectory(self, '选择文件夹', '.')
        self.fpath.setText(upath)

    # 开始服务器线程
    def startServer(self):
        host = self.ip.text()
        port = int(self.port.text())
        num = int(self.maxnum.text())
        path = self.fpath.text()

        # 检测要求输入的字段是否为空
        if host and port and num and path:
            self.selur.setEnabled(True)
            self.runbt.setEnabled(False)

            # 实例化服务器线程
            self.sthread = QThread()    # 创建一个Qthread对象
            self.server = Server(host, port, num, path) # 创建一个Server对象

            # 绑定信号与槽
            self.server.statSignal.connect(self.addLog)
            self.server.lgSignal.connect(self.addUser)
            self.server.msgSignal.connect(self.showMsg)
            self.server.quitSignal.connect(self.removeUser)

            # 启动线程运行
            self.server.moveToThread(self.sthread)  # 使用moveToThread方法将server放到sthread上运行
            self.sthread.started.connect(self.server.run)   # 绑定开始事件

            self.sthread.start()    # 开始线程执行
        else:
            QMessageBox.information(self, '警告', '配置项不能为空!') # 发出警告

    # 更新服务器日志
    def addLog(self, logmsg):
        self.log.append(logmsg)

    # 为选择用户下拉列表中添加用户
    def addUser(self, ur):
        self.selur.addItem(ur)
        umsgBox = QTextBrowser()
        self.userBox[ur] = umsgBox
        self.stack.addWidget(umsgBox)

    # 显示信息
    def showMsg(self, ur, msg):
        self.userBox[ur].append(msg)

    # 移除用户
    def removeUser(self, ur):
        i = self.selur.findText(ur)
        self.selur.removeItem(i)
        self.stack.removeWidget(self.userBox[ur])

    # 根据选择的用户改变当前对话框
    def changeBox(self, ur):
        if ur != '无':
            self.umsg.setEnabled(True)
            self.sendbt.setEnabled(True)
        self.stack.setCurrentWidget(self.userBox[ur])

    # 发送消息
    def sendMsg(self):
        msg = self.umsg.text()

        now = time.strftime('%H:%M:%S')
        umsg = '本机(' + now + '): ' + msg
        self.stack.currentWidget().append(umsg) # 在对话框中展示消息

        self.umsg.clear()   # 清空输入框

        data = {'type': 'msg', 'cnt': {'msg': msg}} # 构造命令
        ur = self.selur.currentText()

        self.server.users[ur].put(data) # 发送数据

    # 自定义关闭事件
    def closeEvent(self, event):
        users = self.server.users
        for ur in users:
            data = {'type': 'end'}
            data['ur'] = ur
            users[ur].put(data)
        self.close()

# 服务器程序启动       
if __name__ == '__main__':
    app = QApplication(sys.argv)
    app.setStyleSheet(qdarkstyle.load_stylesheet_pyqt5())   # 美化界面
    sf = SForm()
    sf.show()
    sys.exit(app.exec_())

参考资料

纠正你的QThread 的使用方法