#include "witmqttclient.h"

// NOTE(review): the targets of the original #include lines (three active, one
// commented out) were lost before this file reached review. The headers below
// are the Qt classes this translation unit uses directly; confirm against the
// project history.
#include <QDir>
#include <QFile>
#include <QTimer>
#include <QUuid>

// Separator placed between the serialized file-message header and the raw
// chunk bytes in a state-3 (content) payload.
static const char CRLF_SEP[] = "\r\n\r\n";
static const int SEP_LEN = sizeof(CRLF_SEP) - 1;

/// Private data of WitMqttRpcEventLoop.
class WitMqttRpcEventLoopPrivate
{
    Q_DECLARE_PUBLIC(WitMqttRpcEventLoop)
public:
    explicit WitMqttRpcEventLoopPrivate(WitMqttRpcEventLoop *qptr) : q_ptr(qptr) {}
    ~WitMqttRpcEventLoopPrivate() {}

    QString uid;      ///< uid this loop is waiting for
    QByteArray data;  ///< payload handed to quit(), or the timeout reply
    QTimer timer;     ///< single-shot timeout timer, started by exec()

protected:
    WitMqttRpcEventLoop * const q_ptr;
};

/// Creates an event loop that waits for a reply carrying @p iUid and gives up
/// after @p iTimeout (passed to QTimer::setInterval).
WitMqttRpcEventLoop::WitMqttRpcEventLoop(const QString &iUid, int iTimeout, QObject *parent)
    : QEventLoop(parent),
      d_ptr(new WitMqttRpcEventLoopPrivate(this))
{
    Q_D(WitMqttRpcEventLoop);
    d->uid = iUid;
    d->timer.setSingleShot(true);
    d->timer.setInterval(iTimeout);
    connect(&d->timer, &QTimer::timeout, this, &WitMqttRpcEventLoop::onTimeout);
}

WitMqttRpcEventLoop::~WitMqttRpcEventLoop()
{
}

/// Stores @p iData and leaves the loop, but only when @p iUid matches the uid
/// this loop waits for; replies for other uids are ignored.
void WitMqttRpcEventLoop::quit(const QString &iUid, const QByteArray &iData)
{
    Q_D(WitMqttRpcEventLoop);
    if (d->uid == iUid) {
        d->data = iData;
        QEventLoop::quit();
    }
}

void WitMqttRpcEventLoop::setUid(const QString &iUid)
{
    Q_D(WitMqttRpcEventLoop);
    d->uid = iUid;
}

QString WitMqttRpcEventLoop::uid() const
{
    Q_D(const WitMqttRpcEventLoop);
    return d->uid;
}

/// Returns the payload stored by quit() or by the timeout handler.
QByteArray WitMqttRpcEventLoop::data() const
{
    Q_D(const WitMqttRpcEventLoop);
    return d->data;
}

/// Starts the timeout timer and then runs the event loop.
void WitMqttRpcEventLoop::exec(QEventLoop::ProcessEventsFlags flags)
{
    Q_D(WitMqttRpcEventLoop);
    d->timer.start();
    QEventLoop::exec(flags);
}

/// On timeout, stores a serialized reply whose data carries
/// errCode "TIMEOUT" / errText "Timeout" and leaves the loop.
void WitMqttRpcEventLoop::onTimeout()
{
    Q_D(WitMqttRpcEventLoop);
    if (this->isRunning()) {
        WitMqttRpcRepMessage rep;
        rep.data = QVariantMap {
            {"errCode", "TIMEOUT"},
            {"errText", "Timeout"},
        };
        d->data = rep.toByteArray();
        QEventLoop::quit();
    }
}

/// Private data of WitMqttClient: file-transfer settings and the state of the
/// transfers currently being received.
class WitMqttClientPrivate
{
    Q_DECLARE_PUBLIC(WitMqttClient)
public:
    explicit WitMqttClientPrivate(WitMqttClient *qptr) : q_ptr(qptr) {}
    ~WitMqttClientPrivate()
    {
        qDeleteAll(fileMap);
        fileMap.clear();
    }

    /// Drops all receive-side state kept for transfer @p iUid: stops the
    /// watchdog timer and schedules its deletion, closes and deletes the
    /// QFile object, and (when @p iRemoveFile is true) deletes the temp file
    /// from disk. Does nothing for an unknown uid.
    void discardTransfer(const QString &iUid, bool iRemoveFile)
    {
        if (QTimer *timer = fileTimer.take(iUid)) {
            timer->stop();
            // deleteLater() because this may run from the timer's own
            // timeout handler.
            timer->deleteLater();
        }
        if (QFile *file = fileMap.take(iUid)) {
            file->close();
            if (iRemoveFile)
                file->remove();
            delete file;
        }
    }

    quint64 fileTransPackageSize {1024 * 512};  ///< max bytes read per chunk in sendFile()
    int fileTransTimeout {1000 * 60};           ///< ack / receive timeout handed to the timers
    QString fileTempDir {QDir::tempPath()};     ///< directory for files being received
    // NOTE(review): the template arguments were lost with the include targets;
    // they are restored here from how the maps are used below.
    QMap<QString, QFile *> fileMap;     ///< uid -> temp file being received
    QMap<QString, QTimer *> fileTimer;  ///< uid -> receive watchdog timer
    bool canceled {false};              ///< set by cancelFileTrans(), polled by sendFile()

protected:
    WitMqttClient * const q_ptr;
};

/// Creates a client for the broker at @p iHost : @p iPort and wires the
/// connected / subscribed / received signals to the handlers in this class.
WitMqttClient::WitMqttClient(const QString &iHost, qint16 iPort, QObject *parent)
    : QMQTT::Client(QHostAddress(iHost), iPort, parent),
      d_ptr(new WitMqttClientPrivate(this))
{
    connect(this, &WitMqttClient::connected, this, &WitMqttClient::onConnected);
    connect(this, &WitMqttClient::subscribed, this, &WitMqttClient::onSubscribed);
    connect(this, &WitMqttClient::received, this, &WitMqttClient::onReceived);
}

WitMqttClient::~WitMqttClient()
{
}

/// Publishes an RPC request for @p iFunc to @p iTarget.
///
/// @param iTimeout when > 0, blocks in a nested event loop until a reply with
///                 the request's uid arrives or the timeout fires; otherwise
///                 returns immediately.
/// @return the reply's data map (the TIMEOUT error map on timeout), or an
///         empty map when @p iTimeout <= 0.
QVariantMap WitMqttClient::rpcRequest(const QString &iTarget, const QString &iFunc, const QVariantMap &iParam, int iTimeout)
{
    WitMqttRpcReqMessage req;
    req.from = this->clientId();
    req.to = iTarget;
    req.func = iFunc;
    req.param = iParam;

    QMQTT::Message qmsg;
    qmsg.setTopic(QString("%1/%2").arg(req.to).arg(WIT_MQTT_TOPIC_REQ));
    qmsg.setQos(1);
    qmsg.setPayload(req.toByteArray());
    this->publish(qmsg);

    if (iTimeout <= 0)
        return QVariantMap();

    WitMqttRpcEventLoop loop(req.uid, iTimeout);
    connect(this, &WitMqttClient::rpcResponseReceived, &loop, &WitMqttRpcEventLoop::quit);
    loop.exec();
    WitMqttRpcRepMessage rep(loop.data());
    return rep.data;
}

/// Publishes @p iMsg on the reply topic of its addressee (iMsg.to).
void WitMqttClient::sendRpcReply(const WitMqttRpcRepMessage &iMsg)
{
    QMQTT::Message repmsg;
    repmsg.setTopic(QString("%1/%2").arg(iMsg.to).arg(WIT_MQTT_TOPIC_REP));
    repmsg.setQos(1);
    repmsg.setPayload(iMsg.toByteArray());
    this->publish(repmsg);
}

/// Sends the file at @p iSrcFilePath to client @p iTargetClientId in chunks,
/// waiting for an acknowledgement after the start request, after every chunk
/// and after the finish message. Blocks in nested event loops while waiting.
///
/// Progress, timeouts and cancellation are reported through fileSendProgress().
///
/// @param iTargetFileName sent to the receiver as the "target" of the transfer.
/// @return true when the receiver acknowledged the finish message; false when
///         the file cannot be opened, the package size is not positive, an
///         acknowledgement timed out, or the transfer was canceled.
bool WitMqttClient::sendFile(const QString &iTargetClientId, const QString &iSrcFilePath, const QString &iTargetFileName)
{
    Q_D(WitMqttClient);

    // A chunk size of zero would make the read loop below spin forever.
    const qint64 maxsize = static_cast<qint64>(d->fileTransPackageSize);
    if (maxsize <= 0)
        return false;

    // Open before contacting the receiver, so an unreadable or missing file
    // does not leave the receiver waiting for content until its timeout.
    QFile file(iSrcFilePath);
    if (!file.open(QFile::ReadOnly))
        return false;

    // A cancel request belongs to one transfer; do not let a stale flag from
    // an earlier transfer abort this one.
    d->canceled = false;

    const int timeout = d->fileTransTimeout;
    const QString fileuid = QUuid::createUuid().toString();
    const QString topic = QString("%1/%2").arg(iTargetClientId).arg(WIT_MQTT_TOPIC_FILE);

    QVariantMap progressInfo;
    progressInfo.insert("uid", fileuid);
    progressInfo.insert("to", iTargetClientId);
    progressInfo.insert("distFile", iTargetFileName);
    progressInfo.insert("srcFile", iSrcFilePath);

    // Builds a file message of the given state addressed to the receiver.
    auto makeFileMsg = [&](int state, const QVariantMap &data) -> WitMqttFileMessage {
        WitMqttFileMessage fmsg;
        fmsg.uid = fileuid;
        fmsg.state = state;
        fmsg.from = this->clientId();
        fmsg.to = iTargetClientId;
        fmsg.data = data;
        return fmsg;
    };
    // Publishes a payload on the receiver's file topic with QoS 1.
    auto publishPayload = [&](const QByteArray &payload) {
        QMQTT::Message qmsg;
        qmsg.setTopic(topic);
        qmsg.setQos(1);
        qmsg.setPayload(payload);
        this->publish(qmsg);
    };
    // Waits for the ack of this transfer. The ack arrives with empty data and
    // the timeout handler stores a non-empty reply, so empty data means acked.
    auto waitForAck = [&]() -> bool {
        WitMqttRpcEventLoop loop(fileuid, timeout);
        connect(this, &WitMqttClient::fileAckReplied, &loop, &WitMqttRpcEventLoop::quit);
        loop.exec();
        return loop.data().isEmpty();
    };
    auto reportTimeout = [&]() {
        progressInfo.insert("errCode", "TIMEOUT");
        progressInfo.insert("errText", "Timeout");
        emit fileSendProgress(progressInfo);
    };

    // Request to start the transfer.
    publishPayload(makeFileMsg(2, QVariantMap{{"target", iTargetFileName}}).toByteArray());
    if (!waitForAck()) {
        reportTimeout();
        return false;
    }

    const qint64 fileSize = file.size();
    progressInfo.insert("size", fileSize);

    // Transfer the file content chunk by chunk.
    while (!file.atEnd() && !d->canceled) {
        const QByteArray chunk = file.read(maxsize);

        // Payload layout: serialized header, separator, raw chunk bytes.
        // "pos" is the file position after this chunk was read.
        QByteArray payload = makeFileMsg(3, QVariantMap{{"size", fileSize}, {"pos", file.pos()}}).toByteArray();
        payload.append(CRLF_SEP);
        payload.append(chunk);
        publishPayload(payload);

        progressInfo.insert("pos", file.pos());
        emit fileSendProgress(progressInfo);

        if (!waitForAck()) {
            reportTimeout();
            file.close();
            return false;
        }
    }
    file.close();

    // Transfer canceled: tell the receiver so it can discard its temp file.
    if (d->canceled) {
        d->canceled = false;
        publishPayload(makeFileMsg(5, QVariantMap{}).toByteArray());
        progressInfo.insert("errText", "canceled");
        emit fileSendProgress(progressInfo);
        return false;
    }

    // Transfer finished.
    publishPayload(makeFileMsg(4, QVariantMap{}).toByteArray());
    if (!waitForAck()) {
        reportTimeout();
        return false;
    }
    return true;
}

void WitMqttClient::setFileTempDir(const QString &iPath)
{
    Q_D(WitMqttClient);
    d->fileTempDir = iPath;
}

QString WitMqttClient::fileTempDir() const
{
    Q_D(const WitMqttClient);
    return d->fileTempDir;
}

void WitMqttClient::setFileTransPackageSize(quint64 iSize)
{
    Q_D(WitMqttClient);
    d->fileTransPackageSize = iSize;
}

quint64 WitMqttClient::fileTransPackageSize() const
{
    Q_D(const WitMqttClient);
    return d->fileTransPackageSize;
}

void WitMqttClient::setFileTransTimeout(int iTimeout)
{
    Q_D(WitMqttClient);
    d->fileTransTimeout = iTimeout;
}

int WitMqttClient::fileTransTimeout() const
{
    Q_D(const WitMqttClient);
    return d->fileTransTimeout;
}

/// Asks a running sendFile() to stop; the flag is checked before each chunk.
void WitMqttClient::cancelFileTrans()
{
    Q_D(WitMqttClient);
    d->canceled = true;
}

/// Subscribes to this client's request, reply and file topics (QoS 1) and to
/// the bus topic tree (QoS 0).
void WitMqttClient::onConnected()
{
    // TOP8LOG.info() << QString("client [%1] connected to mqtt broker [%2:%3].").arg(this->clientId()).arg(this->host().toString()).arg(this->port());
    this->subscribe(QString("%1/%2").arg(this->clientId()).arg(WIT_MQTT_TOPIC_REQ), 1);
    this->subscribe(QString("%1/%2").arg(this->clientId()).arg(WIT_MQTT_TOPIC_REP), 1);
    this->subscribe(QString("%1/%2").arg(this->clientId()).arg(WIT_MQTT_TOPIC_FILE), 1);
    this->subscribe(QString("%1/%2/#").arg(WIT_MQTT_TOPIC_PREFIX).arg(WIT_MQTT_TOPIC_BUS), 0);
}

void WitMqttClient::onSubscribed(const QString &iTopic, quint8 iQos)
{
    Q_UNUSED(iTopic);
    Q_UNUSED(iQos);
    // TOP8LOG.info() << QString("client [%1] subscribed topic [%2] with qos [%3].").arg(this->clientId()).arg(iTopic).arg(iQos);
}

/// Dispatches an incoming MQTT message by topic: RPC requests and replies are
/// re-emitted as signals, file messages drive the receive side of the file
/// transfer protocol (states: 1 ack, 2 start, 3 content, 4 finished,
/// 5 canceled), and bus messages are forwarded via busMessageReceived().
void WitMqttClient::onReceived(const QMQTT::Message &iMsg)
{
    Q_D(WitMqttClient);

    if (iMsg.topic().endsWith(WIT_MQTT_TOPIC_REQ)) {
        WitMqttRpcReqMessage req(iMsg.payload());
        // TOP8LOG.debug() << QString("[%1] receivce rpc request [%2].").arg(this->clientId()).arg(req.func);
        emit rpcRequestReceived(req);
    }
    else if (iMsg.topic().endsWith(WIT_MQTT_TOPIC_REP)) {
        WitMqttRpcRepMessage rep(iMsg.payload());
        emit rpcResponseReceived(rep.uid, iMsg.payload());
    }
    else if (iMsg.topic().endsWith(WIT_MQTT_TOPIC_FILE)) {
        const QByteArray resp = iMsg.payload();
        // Content messages carry "<header><CRLF_SEP><bytes>"; all other
        // states are a bare header with no separator.
        const int idx = resp.indexOf(CRLF_SEP);
        WitMqttFileMessage fmsg(idx < 0 ? resp : resp.left(idx));

        // Publishes a state-1 acknowledgement back to the sender of fmsg.
        auto sendAck = [this, &fmsg]() {
            WitMqttFileMessage ack;
            ack.uid = fmsg.uid;
            ack.from = fmsg.to;
            ack.to = fmsg.from;
            ack.state = 1;
            QMQTT::Message qmsg;
            qmsg.setTopic(QString("%1/%2").arg(ack.to).arg(WIT_MQTT_TOPIC_FILE));
            qmsg.setQos(1);
            qmsg.setPayload(ack.toByteArray());
            this->publish(qmsg);
        };

        // File transfer acknowledgement.
        if (fmsg.state == 1) {
            emit fileAckReplied(fmsg.uid, "");
        }
        // Request to start a file transfer.
        else if (fmsg.state == 2) {
            // The uid comes from the network and becomes a file name inside
            // the temp dir; refuse anything that could name a path outside it.
            if (fmsg.uid.isEmpty()
                    || fmsg.uid == QLatin1String(".")
                    || fmsg.uid == QLatin1String("..")
                    || fmsg.uid.contains(QLatin1Char('/'))
                    || fmsg.uid.contains(QLatin1Char('\\'))) {
                QVariantMap err;
                err.insert("uid", fmsg.uid);
                err.insert("from", fmsg.from);
                err.insert("errText", "Invalid uid");
                emit fileReceiveProgress(err);
                return;
            }

            // A restart with a known uid replaces the earlier attempt.
            d->discardTransfer(fmsg.uid, true);

            const QString uid = fmsg.uid;
            const QString from = fmsg.from;
            const QString filepath = fmsg.data.value("target").toString();

            QFile *file = new QFile(QDir(d->fileTempDir).absoluteFilePath(uid), this);
            file->setProperty("TARGET_PATH", filepath);
            if (file->exists())
                file->remove();
            // An open failure is not fatal here: the content handler retries.
            file->open(QFile::WriteOnly);
            d->fileMap.insert(uid, file);

            // Watchdog: if the sender goes quiet, drop the partial file and
            // report a timeout.
            QTimer *timer = new QTimer(this);
            timer->setSingleShot(true);
            connect(timer, &QTimer::timeout, this, [this, d, uid, from, filepath]() {
                if (!d->fileMap.contains(uid))
                    return;
                d->discardTransfer(uid, true);
                QVariantMap progressInfo;
                progressInfo.insert("uid", uid);
                progressInfo.insert("from", from);
                progressInfo.insert("distFile", filepath);
                progressInfo.insert("errCode", "TIMEOUT");
                progressInfo.insert("errText", "Timeout");
                emit fileReceiveProgress(progressInfo);
            });
            d->fileTimer.insert(uid, timer);

            QVariantMap progressInfo;
            progressInfo.insert("uid", uid);
            progressInfo.insert("from", from);
            progressInfo.insert("distFile", filepath);
            emit fileReceiveProgress(progressInfo);

            sendAck();
            timer->start(d->fileTransTimeout);
        }
        // File content chunk.
        else if (fmsg.state == 3) {
            QFile *file = d->fileMap.value(fmsg.uid);
            if (!file)
                return;

            // Without the separator there is no way to tell header from
            // content; report it instead of writing header bytes to the file.
            // No ack is sent, so the sender runs into its timeout.
            if (idx < 0) {
                QVariantMap err;
                err.insert("uid", fmsg.uid);
                err.insert("from", fmsg.from);
                err.insert("errText", "Invalid chunk");
                emit fileReceiveProgress(err);
                return;
            }

            if (!file->isOpen())
                file->open(QFile::WriteOnly);
            file->write(resp.mid(idx + SEP_LEN));

            QVariantMap progressInfo;
            progressInfo.insert("uid", fmsg.uid);
            progressInfo.insert("from", fmsg.from);
            progressInfo.insert("size", fmsg.data.value("size"));
            progressInfo.insert("pos", fmsg.data.value("pos"));
            emit fileReceiveProgress(progressInfo);

            sendAck();

            // Restart the watchdog for the next chunk.
            if (QTimer *timer = d->fileTimer.value(fmsg.uid))
                timer->start(d->fileTransTimeout);
        }
        // File transfer finished.
        else if (fmsg.state == 4) {
            QFile *file = d->fileMap.value(fmsg.uid);
            if (!file)
                return;

            file->close();
            const QString filename = file->fileName();
            const QString targetpath = file->property("TARGET_PATH").toString();
            // Keep the received file on disk; only the bookkeeping goes away.
            d->discardTransfer(fmsg.uid, false);

            sendAck();
            emit fileReceiveFinished(filename, targetpath);
        }
        // File transfer canceled by the sender.
        else if (fmsg.state == 5) {
            if (d->fileMap.contains(fmsg.uid))
                d->discardTransfer(fmsg.uid, true);
        }
        else {
            QVariantMap err {
                {"errText", "Invalid state"}
            };
            emit fileReceiveProgress(err);
        }
    }
    else if (iMsg.topic().startsWith(QString("%1/%2").arg(WIT_MQTT_TOPIC_PREFIX).arg(WIT_MQTT_TOPIC_BUS))) {
        emit busMessageReceived(iMsg);
    }
}