Skip to content

Commit 332e8c6

Browse files
committed
support for multithreaded processing
1 parent c65f708 commit 332e8c6

File tree

5 files changed

+148
-8
lines changed

5 files changed

+148
-8
lines changed

gp.cpp

+54-3
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,35 @@
1313
#include <QTcpSocket>
1414
#include <QSslSocket>
1515
#include <QDataStream>
16+
#include "thread.h"
1617
#include "gp_exception.h"
1718
#include "gp.h"
1819

1920
using namespace libgp;
2021

21-
GP::GP(QTcpSocket *tcp_socket)
22+
GP::GP(QTcpSocket *tcp_socket, bool mt)
2223
{
2324
this->socket = tcp_socket;
2425
this->sentBytes = 0;
2526
this->recvBytes = 0;
2627
// We don't want to receive single packet bigger than 800kb
2728
this->MaxIncomingCacheSize = 800 * 1024;
2829
this->incomingPacketSize = 0;
30+
if (!mt)
31+
{
32+
this->thread = NULL;
33+
}
34+
else
35+
{
36+
this->thread = new Thread(this);
37+
this->thread->start();
38+
}
39+
this->isMultithreaded = mt;
2940
}
3041

3142
GP::~GP()
3243
{
44+
delete this->thread;
3345
delete this->socket;
3446
}
3547

@@ -75,6 +87,23 @@ void GP::OnIncomingCommand(QString text, QHash<QString, QVariant> parameters)
7587
emit this->Event_IncomingCommand(text, parameters);
7688
}
7789

90+
void GP::processPacket()
91+
{
92+
if (this->isMultithreaded)
93+
{
94+
// Store this byte array into fifo for later processing by processor thread
95+
this->mtLock.lock();
96+
this->mtBuffer.append(this->incomingCache);
97+
this->mtLock.unlock();
98+
this->incomingPacketSize = 0;
99+
this->incomingCache.clear();
100+
return;
101+
}
102+
103+
QHash<QString, QVariant> pack = this->packetFromIncomingCache();
104+
this->processPacket(pack);
105+
}
106+
78107
void GP::processPacket(QHash<QString, QVariant> pack)
79108
{
80109
if (!pack.contains("type"))
@@ -112,7 +141,7 @@ void GP::processIncoming(QByteArray data)
112141
// this is extremely rare
113142
// the packet we received is exactly the remaining part of a block of data we are receiving
114143
this->incomingCache.append(data);
115-
this->processPacket(this->packetFromIncomingCache());
144+
this->processPacket();
116145
} else if (data.size() < remaining_packet_data)
117146
{
118147
// just append and skip
@@ -123,7 +152,7 @@ void GP::processIncoming(QByteArray data)
123152
// packet, so we need to cut the remaining part and process
124153
QByteArray remaining_part = data.mid(0, remaining_packet_data);
125154
this->incomingCache.append(remaining_part);
126-
this->processPacket(this->packetFromIncomingCache());
155+
this->processPacket();
127156
data = data.mid(remaining_packet_data);
128157
this->processIncoming(data);
129158
}
@@ -185,6 +214,15 @@ QHash<QString, QVariant> GP::packetFromIncomingCache()
185214
return data;
186215
}
187216

217+
QHash<QString, QVariant> GP::packetFromRawBytes(QByteArray packet)
218+
{
219+
QDataStream stream(&packet, QIODevice::ReadWrite);
220+
GP_INIT_DS(stream);
221+
QHash<QString, QVariant> data;
222+
stream >> data;
223+
return data;
224+
}
225+
188226
void GP::processHeader(QByteArray data)
189227
{
190228
qint64 header;
@@ -197,6 +235,19 @@ void GP::processHeader(QByteArray data)
197235
this->incomingPacketSize = header;
198236
}
199237

238+
QByteArray GP::mtPop()
239+
{
240+
QByteArray result;
241+
this->mtLock.lock();
242+
if (this->mtBuffer.size() > 0)
243+
{
244+
result = this->mtBuffer.at(0);
245+
this->mtBuffer.removeAt(0);
246+
}
247+
this->mtLock.unlock();
248+
return result;
249+
}
250+
200251
bool GP::SendPacket(QHash<QString, QVariant> packet)
201252
{
202253
if (!this->socket)

gp.h

+13-3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class QTcpSocket;
6868

6969
namespace libgp
7070
{
71+
class Thread;
72+
7173
//! Grumpy protocol
7274

7375
//! This class is able to handle server or client connection using
@@ -76,7 +78,7 @@ namespace libgp
7678
{
7779
Q_OBJECT
7880
public:
79-
GP(QTcpSocket *tcp_socket = 0);
81+
GP(QTcpSocket *tcp_socket = 0, bool mt = false);
8082
virtual ~GP();
8183
virtual bool IsConnected() const;
8284
virtual bool SendPacket(QHash<QString, QVariant> packet);
@@ -90,6 +92,7 @@ namespace libgp
9092
unsigned long long GetBytesRcvd();
9193
virtual int GetVersion();
9294
unsigned long MaxIncomingCacheSize;
95+
friend class libgp::Thread;
9396

9497
signals:
9598
void Event_Connected();
@@ -108,10 +111,15 @@ namespace libgp
108111

109112
protected:
110113
virtual void OnIncomingCommand(QString text, QHash<QString, QVariant> parameters);
111-
void processPacket(QHash<QString, QVariant> pack);
112-
void processIncoming(QByteArray data);
114+
virtual void processPacket();
115+
virtual void processPacket(QHash<QString, QVariant> pack);
116+
virtual void processIncoming(QByteArray data);
113117
QHash<QString, QVariant> packetFromIncomingCache();
118+
QHash<QString, QVariant> packetFromRawBytes(QByteArray packet);
114119
void processHeader(QByteArray data);
120+
QByteArray mtPop();
121+
QMutex mtLock;
122+
QList<QByteArray> mtBuffer;
115123
QMutex mutex;
116124
qint64 incomingPacketSize;
117125
QByteArray incomingCache;
@@ -125,6 +133,8 @@ namespace libgp
125133
#endif
126134
unsigned long long sentBytes;
127135
unsigned long long recvBytes;
136+
Thread *thread;
137+
bool isMultithreaded;
128138
};
129139
}
130140

gp.pro

+4-2
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ TEMPLATE = lib
1414
DEFINES += GP_LIBRARY
1515

1616
SOURCES += gp.cpp \
17-
gp_exception.cpp
17+
gp_exception.cpp \
18+
thread.cpp
1819

1920
HEADERS += gp.h\
2021
gp_global.h \
21-
gp_exception.h
22+
gp_exception.h \
23+
thread.h
2224

2325
unix {
2426
target.path = /usr/lib

thread.cpp

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//This program is free software: you can redistribute it and/or modify
2+
//it under the terms of the GNU Lesser General Public License as published by
3+
//the Free Software Foundation, either version 3 of the License, or
4+
//(at your option) any later version.
5+
6+
//This program is distributed in the hope that it will be useful,
7+
//but WITHOUT ANY WARRANTY; without even the implied warranty of
8+
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9+
//GNU Lesser General Public License for more details.
10+
11+
// Copyright (c) Petr Bena 2015
12+
13+
#include "thread.h"
14+
#include "gp.h"
15+
16+
using namespace libgp;
17+
18+
Thread::Thread(GP *gp)
19+
{
20+
this->owner = gp;
21+
}
22+
23+
Thread::~Thread()
24+
{
25+
26+
}
27+
28+
void Thread::run()
29+
{
30+
while (this->isRunning())
31+
{
32+
QByteArray incoming = this->owner->mtPop();
33+
if (incoming.isEmpty())
34+
{
35+
this->msleep(100);
36+
continue;
37+
}
38+
// These 2 calls are probably CPU intensive
39+
QHash<QString, QVariant> packet = this->owner->packetFromRawBytes(incoming);
40+
this->owner->processPacket(packet);
41+
}
42+
}
43+

thread.h

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//This program is free software: you can redistribute it and/or modify
2+
//it under the terms of the GNU Lesser General Public License as published by
3+
//the Free Software Foundation, either version 3 of the License, or
4+
//(at your option) any later version.
5+
6+
//This program is distributed in the hope that it will be useful,
7+
//but WITHOUT ANY WARRANTY; without even the implied warranty of
8+
//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9+
//GNU Lesser General Public License for more details.
10+
11+
// Copyright (c) Petr Bena 2015
12+
13+
#ifndef THREAD_H
14+
#define THREAD_H
15+
16+
#include <QThread>
17+
18+
namespace libgp
19+
{
20+
class GP;
21+
22+
class Thread : public QThread
23+
{
24+
public:
25+
Thread(GP *gp);
26+
~Thread();
27+
28+
private:
29+
GP *owner;
30+
void run();
31+
};
32+
}
33+
34+
#endif // THREAD_H

0 commit comments

Comments
 (0)