PacketSession.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "Qt.h"
00024 #include "PacketSession.h"
00025 #include "PacketProcessor.h"
00026 #include "PacketReader.h"
00027 #include "PacketWriter.h"
00028 #include "Gnutella/PacketProcessing/QueryRouting/QrtReader.h"
00029 #include "Gnutella/PacketProcessing/QueryRouting/QrtWriter.h"
00030 #include "Gnutella/Packets/Packet.h"
00031 #include "Gnutella/Packets/Bye.h"
00032
00033 using Gnutella::Packets::Packet;
00034 using Gnutella::Packets::Bye;
00035 using Gnutella::PacketProcessing::PacketSession;
00036 using Gnutella::PacketProcessing::PacketSessionPrivate;
00037 using Gnutella::PacketProcessing::PacketReader;
00038 using Gnutella::PacketProcessing::PacketWriter;
00039 using Gnutella::PacketProcessing::QueryRouting::QrtReader;
00040 using Gnutella::PacketProcessing::QueryRouting::QrtWriter;
00041 using Gnutella::PacketProcessing::QueryRouting::QueryRoutingTable;
00042 using Gnutella::NodeInfo;
00043 using Protocols::Transports::Connection;
00044
/// File-local numeric constants used by the packet session.
enum Constants
{
    /// Interval handed to the session's read timer when a session is created
    /// (QTimer intervals are expressed in milliseconds).
    ReadTimeout = 60 * 1000
};
00049
00050
00051 static const QString SoftwareIdentification = "Calitko/0.5.3";
00052
00053 class PacketSessionPrivate
00054 {
00055 REFERENCE_OBJECT (PacketSessionPrivate)
00056
00057 public:
00058 typedef QList <Packet *> PendingPackets;
00059
00060 QrtReader *qrtReader;
00061 QrtWriter *qrtWriter;
00062 PacketWriter *writer;
00063 PacketReader *reader;
00064 Connection *connection;
00065 NodeInfo nodeInfo;
00066 PendingPackets pendingPackets;
00067 QTimer readTimer;
00068 bool isPacketRead;
00069 bool isClosing;
00070
00071 PacketSessionPrivate() : qrtReader (0), qrtWriter (0), writer (0),
00072 reader (0), connection (0), nodeInfo(),
00073 pendingPackets(), readTimer(),
00074 isPacketRead (false), isClosing (false)
00075 {}
00076 };
00077
00078 NodeInfo PacketSession::nodeInfo() const
00079 { return p->nodeInfo; }
00080
00081 const QueryRoutingTable & PacketSession::queryRoutingTable() const
00082 { return p->qrtReader->currentTable(); }
00083
00084 int PacketSession::readTimeout() const
00085 { return p->readTimer.interval(); }
00086
00087 void PacketSession::setReadTimeout (int readTimeout)
00088 { p->readTimer.setInterval (readTimeout); }
00089
00090 PacketSession::PacketSession (Connection *c, const NodeInfo &ni)
00091 : p (new PacketSessionPrivate)
00092 {
00093 Q_ASSERT (c->state() == Connection::ConnectedState);
00094
00095 quint8 qrtEntryBits = 4;
00096 if (ni.type == Gnutella::TypeUltrapeer)
00097 qrtEntryBits = 1;
00098
00099 p->qrtReader = new QrtReader;
00100 p->qrtWriter = new QrtWriter (this, qrtEntryBits);
00101 p->reader = new PacketReader (c);
00102 p->writer = new PacketWriter (c);
00103 p->connection = c;
00104 p->nodeInfo = ni;
00105 p->isPacketRead = false;
00106 p->isClosing = false;
00107
00108 connect (p->connection, SIGNAL (connectionClosed()),
00109 this, SLOT (connectionClosed()));
00110 connect (p->reader, SIGNAL (packetRead()), this, SLOT (packetRead()));
00111 connect (p->writer, SIGNAL (packetWritten()), this, SLOT (packetWritten()));
00112 connect (p->qrtReader, SIGNAL (qrtRead()), this, SLOT (qrtRead()));
00113 QObject::connect (p->reader, SIGNAL (readError()),
00114 this, SLOT (readError()));
00115
00116
00117
00118
00119
00120
00121 connect (&p->readTimer, SIGNAL (timeout()), this, SLOT (readTimeout()));
00122
00123 p->readTimer.setInterval (ReadTimeout);
00124 p->readTimer.start();
00125
00126 p->reader->startReading();
00127 }
00128
00129 void PacketSession::deleteLater()
00130 {
00131 p->connection->disconnect (this);
00132 p->reader->disconnect (p->connection);
00133 p->reader->disconnect (this);
00134 p->writer->disconnect (p->connection);
00135 p->writer->disconnect (this);
00136 p->readTimer.disconnect (this);
00137 p->readTimer.stop();
00138
00139 QObject::deleteLater();
00140 }
00141
00142 PacketSession::~PacketSession()
00143 {
00144 foreach (Packet *packet, p->pendingPackets)
00145 delete packet;
00146 p->pendingPackets.clear();
00147
00148 delete p->qrtReader;
00149 delete p->qrtWriter;
00150 delete p->reader;
00151 delete p->writer;
00152 delete p->connection;
00153 delete p;
00154 }
00155
00156 void PacketSession::readTimeout()
00157 {
00158
00159 if (!p->isPacketRead)
00160 p->connection->disconnectFromNode();
00161 p->isPacketRead = false;
00162 }
00163
00164 void PacketSession::connectionClosed()
00165 {
00166 emit sessionClosed (this);
00167 }
00168
00169 void PacketSession::packetRead()
00170 {
00171 p->isPacketRead = true;
00172 bool isDropped = false;
00173 Packet *packet = p->reader->packet()->copy();
00174
00175
00176
00177 isDropped = !packet->isValid();
00178
00179
00180
00181
00182
00183 if (!isDropped)
00184 isDropped = p->qrtReader->handlePacket (*packet);
00185
00186
00187
00188 if (!isDropped)
00189 emit packetRead (*packet, this);
00190
00191 delete packet;
00192 p->reader->startReading();
00193 }
00194
00195 void PacketSession::packetWritten()
00196 {
00197 bool isDropped = true;
00198 Packet *packet = 0;
00199
00200 do {
00201 if (p->pendingPackets.isEmpty()) {
00202 packet = 0;
00203
00204 if (p->isClosing &&
00205 p->connection->state() == Connection::ConnectedState) {
00206 p->connection->disconnectFromNode();
00207 }
00208 break;
00209 } else {
00210 packet = p->pendingPackets.takeFirst();
00211
00212
00213 isDropped = false;
00214 }
00215
00216
00217
00218 if (isDropped)
00219 delete packet;
00220 } while (isDropped);
00221
00222 if (packet) {
00223 p->writer->write (*packet);
00224 delete packet;
00225 }
00226 }
00227
00228 void PacketSession::closeSession (CloseCode closeCode)
00229 {
00230 Q_ASSERT (!p->isClosing);
00231
00232 quint16 code = 0;
00233 QString message;
00234
00235 switch (closeCode)
00236 {
00237 case CloseNoBye:
00238 break;
00239 case ExittingNormally:
00240 message = "Application is terminating, closing all connections.";
00241 break;
00242 case ExplicitClose:
00243 message = "The user requested explicit close of this connection.";
00244 break;
00245 case TooBigPackets:
00246 message = "You send too big packets!";
00247 break;
00248 case TooManyDuplicates:
00249 message = "You are sending too many duplicate packets!";
00250 break;
00251 case BadQueries:
00252 message = "You are relaying bad queries!";
00253 break;
00254 case TooHighTtlHops:
00255 message = "You are relaying packets with excessive ttl + hops value!";
00256 break;
00257 case ManyUnknownPackets:
00258 message = "You are sending too many unknown packets!";
00259 break;
00260 case InactivityTimeout:
00261 message = "Your connection reached my inactivity timeout!";
00262 break;
00263 case NoTtl1PingReply:
00264 message = "You failed to reply to a ping with ttl=1!";
00265 break;
00266 case NotSharingEnough:
00267 message = "You are not sharing enough!";
00268 break;
00269 case InternalError:
00270 message = "Closing due to internal error! Sorry!";
00271 break;
00272 case ProtocolDesync:
00273 message = "Closing due to protocol desynchronization!";
00274 break;
00275 case SendQueueFull:
00276 message = "Send queue for your connection became full!";
00277 break;
00278 default:
00279 Q_ASSERT (false);
00280 }
00281
00282 foreach (Packet *packet, p->pendingPackets)
00283 delete packet;
00284 p->pendingPackets.clear();
00285
00286 if (code != 0) {
00287
00288 message += "\r\n";
00289 message += QString ("Server: %1\r\n\r\n").arg (SoftwareIdentification);
00290 Bye bye (code, message.toLatin1());
00291 sendPacket (bye);
00292 } else {
00293
00294 p->connection->disconnectFromNode();
00295 }
00296
00297 p->isClosing = true;
00298 }
00299
00300 void PacketSession::sendPacket (const Packet &packet)
00301 {
00302 if (p->isClosing)
00303 return;
00304
00305 p->pendingPackets.append (packet.copy());
00306 if (p->writer->canWrite())
00307 packetWritten();
00308 }
00309
00310 void PacketSession::qrtRead()
00311 {
00312 emit qrtRead (this);
00313 }
00314
00315 void PacketSession::sendQrt (const QueryRoutingTable &table)
00316 {
00317 p->qrtWriter->write (table);
00318 }
00319
00320 void PacketSession::readError()
00321 {
00322 if (!p->isClosing)
00323 closeSession (ProtocolDesync);
00324 }