PacketProcessor.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "Qt.h"
00024 #include "PacketProcessor.h"
00025 #include "PongCache.h"
00026 #include "PacketRouter.h"
00027
00028 #include "Gnutella/LocalPeer.h"
00029 #include "Protocols/Transports/Connection.h"
00030 #include "Gnutella/PacketProcessing/DynamicSearching/DynamicSearcher.h"
00031 #include "Gnutella/PacketProcessing/PacketSession.h"
00032 #include "Gnutella/PacketProcessing/QueryRouting/QrtExchanger.h"
00033 #include "Gnutella/Packets/Ping.h"
00034 #include "Gnutella/Packets/Pong.h"
00035 #include "Gnutella/Packets/Query.h"
00036 #include "Gnutella/Packets/QueryHits.h"
00037 #include "Gnutella/Packets/Push.h"
00038 #include "Gnutella/Packets/Bye.h"
00039 #include "Gnutella/Packets/VendorMessages/SupportedMessages.h"
00040 #include "Gnutella/Packets/VendorMessages/QueryStatusResponse.h"
00041 #include "Gnutella/Packets/VendorMessages/QueryStatusRequest.h"
00042 #include "Gnutella/NodeType.h"
00043 #include "Gnutella/Handshaking/SlotAllocator.h"
00044
00045 using namespace Gnutella::Packets;
00046 using Gnutella::Packets::VendorMessages::SupportedMessages;
00047 using Gnutella::Packets::VendorMessages::QueryStatusResponse;
00048 using Gnutella::Packets::VendorMessages::QueryStatusRequest;
00049 using Gnutella::PacketProcessing::DynamicSearching::DynamicSearcher;
00050 using Gnutella::PacketProcessing::PacketProcessor;
00051 using Gnutella::PacketProcessing::PacketSession;
00052 using Gnutella::PacketProcessing::PongCache;
00053 using Gnutella::PacketProcessing::PacketRouter;
00054 using Gnutella::PacketProcessing::QueryRouting::QrtExchanger;
00055 using Gnutella::Bootstrapping::NodeCache;
00056 using Gnutella::Handshaking::SlotAllocator;
00057 using Gnutella::NodeInfo;
00058 using Gnutella::LocalPeer;
00059 using Protocols::Transports::Connection;
00060
00061 typedef PacketRouter GuidRouter;
00062
00063
00064 class Gnutella::PacketProcessing::PacketProcessorPrivate
00065 {
00066 REFERENCE_OBJECT (PacketProcessorPrivate)
00067
00068 public:
00069 LocalPeer *localPeer;
00070 SlotAllocator *slotAllocator;
00071 PacketProcessor::Sessions sessions;
00072 PongCache *pongCache;
00073 GuidRouter *guidRouter;
00074
00075 DynamicSearcher *dynamicSearcher;
00076 QrtExchanger *qrtExchanger;
00077
00079 PacketProcessor::Sessions leafSessions() const
00080 {
00081 PacketProcessor::Sessions leafSessions;
00082 foreach (PacketSession *session, sessions)
00083 if (session->nodeInfo().type == TypeLeaf)
00084 leafSessions += session;
00085 return leafSessions;
00086 }
00087
00088 PacketProcessorPrivate() : localPeer (0), slotAllocator (0), sessions(),
00089 pongCache(0), guidRouter (0),
00090 dynamicSearcher (0), qrtExchanger (0)
00091 {}
00092 };
00093
00094 PacketProcessor::PacketProcessor (LocalPeer *lp)
00095 : d (new PacketProcessorPrivate)
00096 {
00097 d->pongCache = new PongCache (this, lp);
00098 d->guidRouter = new GuidRouter();
00099 d->dynamicSearcher = new DynamicSearcher (this);
00100 d->qrtExchanger = new QrtExchanger();
00101 d->localPeer = lp;
00102 d->slotAllocator = lp->slotAllocator();
00103 }
00104
00105 PacketProcessor::~PacketProcessor()
00106 {
00107 delete d->pongCache;
00108 delete d->guidRouter;
00109 delete d->dynamicSearcher;
00110 delete d->qrtExchanger;
00111 delete d;
00112 }
00113
00120 void PacketProcessor::addConnection (Connection *connection, NodeInfo nodeInfo)
00121 {
00122 Q_ASSERT (connection != 0);
00123
00124 PacketSession *session = new PacketSession (connection, nodeInfo);
00125
00126 QObject::connect (session, SIGNAL (packetRead (Packet &, PacketSession *)),
00127 this, SLOT (packetReceived (Packet &, PacketSession *)));
00128 QObject::connect (session, SIGNAL (sessionClosed (PacketSession *)),
00129 this, SLOT (sessionClosed (PacketSession *)));
00130 QObject::connect (session, SIGNAL (qrtRead (PacketSession *)),
00131 d->qrtExchanger, SLOT (qrtRead (PacketSession *)));
00132
00133 bool gotSlot = d->slotAllocator->allocateSlot (session);
00134 d->sessions.insert (session);
00135 d->pongCache->addSession (session);
00136 d->dynamicSearcher->addSession (session);
00137 d->qrtExchanger->addSession (session);
00138
00139 if (!gotSlot) {
00140 connection->disconnectFromNode();
00141 } else {
00142 Ping ping;
00143 ping.setTtl (1);
00144 session->sendPacket (ping);
00145
00146
00147 SupportedMessages supported;
00148 supported.addMessage (VendorCode ("BEAR"), 11, 1);
00149 supported.addMessage (VendorCode ("BEAR"), 12, 1);
00150 session->sendPacket (supported);
00151
00152
00153 }
00154 }
00155
00156 void PacketProcessor::sessionClosed (PacketSession *session)
00157 {
00158 Q_ASSERT (d->sessions.contains (session));
00159
00160 d->qrtExchanger->removeSession (session);
00161 d->dynamicSearcher->removeSession (session);
00162 d->pongCache->removeSession (session);
00163 d->sessions.remove (session);
00164 session->deleteLater();
00165 }
00166
00167
00168
00169 void PacketProcessor::packetReceived (Packet &packet,
00170 PacketSession *session)
00171 {
00172
00173 if (packet.ttl() > 0)
00174 packet.doHop();
00175
00176 if (d->guidRouter->isDuplicate (packet))
00177 return;
00178
00179
00180 switch (packet.payloadDescriptor())
00181 {
00182 case PingDescriptor:
00183 d->pongCache->sendCachedPongs (Ping::castFrom (packet), session);
00184 break;
00185 case PongDescriptor:
00186 d->pongCache->cachePong (Pong::castFrom (packet), session);
00187 break;
00188 case QueryDescriptor:
00189 d->guidRouter->addQueryPath (Query::castFrom (packet), session);
00190 if (session->nodeInfo().type == TypeLeaf) {
00191 d->dynamicSearcher->startSearch (Query::castFrom (packet));
00192
00193 return;
00194 }
00195 break;
00196 case QueryHitsDescriptor:
00197 d->guidRouter->addQueryHitsPath (QueryHits::castFrom (packet), session);
00198 d->dynamicSearcher->countHits (QueryHits::castFrom (packet));
00199 break;
00200 case PushDescriptor:
00201 break;
00202 case ByeDescriptor:
00203 qDebug() << "Bye packet saying:" << Bye::castFrom (packet).code() << Bye::castFrom (packet).message();
00204 session->closeSession (PacketSession::CloseNoBye);
00205 break;
00206 case OpenVendorDescriptor:
00207 case StandardVendorDescriptor:
00208 if (typeid (packet) == typeid (QueryStatusResponse)) {
00209 d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00210 } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00211
00212
00213
00214
00215
00216 d->guidRouter->addQueryStatusRequestPath (packet, session);
00217 }
00218 break;
00219 case IbmcDescriptor:
00220 case QueryRoutingDescriptor:
00221 default:
00222 return;
00223 }
00224 notifyHandlers (packet);
00225
00226
00227 if (d->localPeer->nodeInfo().type == Gnutella::TypeLeaf)
00228 return;
00229 routePacket (packet);
00230 }
00231
00232 void PacketProcessor::notifyHandlers (Packet &packet)
00233 {
00234 switch (packet.payloadDescriptor())
00235 {
00236 case PingDescriptor:
00237
00238
00239
00240
00241
00242
00243
00244
00245 break;
00246 case PongDescriptor:
00247 emit receivedPong (Pong::castFrom (packet));
00248 break;
00249 case QueryDescriptor:
00250 emit receivedQuery (Query::castFrom (packet));
00251 break;
00252 case QueryHitsDescriptor:
00253 emit receivedQueryHits (QueryHits::castFrom (packet));
00254 break;
00255 case PushDescriptor:
00256 emit receivedPush (Push::castFrom (packet));
00257 break;
00258 case ByeDescriptor:
00259 break;
00260 case OpenVendorDescriptor:
00261 case StandardVendorDescriptor:
00262 if (typeid (packet) == typeid (QueryStatusResponse)) {
00263
00264 d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00265 } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00266
00267 emit processQueryStatusRequest (QueryStatusRequest::castFrom (packet));
00268 }
00269 break;
00270 case IbmcDescriptor:
00271 case QueryRoutingDescriptor:
00272 default:
00273 Q_ASSERT (false);
00274 }
00275 }
00276
00277 void PacketProcessor::routePacket (const Packet &packet)
00278 {
00279
00280
00281 Sessions sessions;
00282 switch (packet.payloadDescriptor())
00283 {
00284 case PingDescriptor:
00285 sessions = d->pongCache->pingRoutePaths (Ping::castFrom (packet));
00286 break;
00287 case PongDescriptor:
00288
00289
00290 break;
00291 case QueryDescriptor:
00292
00293 if (packet.ttl() == 0)
00294 sessions = d->leafSessions();
00295 else
00296 sessions = d->sessions;
00297
00298 break;
00299 case QueryHitsDescriptor:
00300 sessions = d->guidRouter->queryHitsRoutePaths
00301 (QueryHits::castFrom (packet));
00302 break;
00303 case PushDescriptor:
00304 sessions = d->guidRouter->pushRoutePaths (Push::castFrom (packet));
00305 break;
00306 case ByeDescriptor:
00307 break;
00308 case OpenVendorDescriptor:
00309 case StandardVendorDescriptor:
00310
00311 if (packet.hops() == 0 && packet.ttl() == 1) {
00312 if (typeid (packet) == typeid (QueryStatusRequest)){
00313
00314 sessions = d->guidRouter->queryHitsRoutePaths (packet);
00315 } else if (typeid (packet) == typeid (QueryStatusResponse)) {
00316
00317
00318 sessions = d->guidRouter->queryStatusResponsePaths (packet);
00319 }
00320 }
00321 break;
00322 case IbmcDescriptor:
00323 case QueryRoutingDescriptor:
00324 default:
00325 Q_ASSERT (false);
00326 }
00327
00328 foreach (PacketSession *routePath, sessions) {
00329
00330
00331 if (d->sessions.contains (routePath))
00332 routePath->sendPacket (packet);
00333 }
00334 }
00335
00336 void PacketProcessor::sendPacket (const Packet &packet)
00337 {
00338
00339 Q_ASSERT (packet.hops() == 0);
00340
00341
00342 switch (packet.payloadDescriptor())
00343 {
00344 case PongDescriptor:
00345 d->pongCache->cachePong (Pong::castFrom (packet), 0);
00346 break;
00347 case QueryDescriptor:
00348 if (d->localPeer->nodeInfo().type == TypeLeaf)
00349 routePacket (packet);
00350 else
00351 d->dynamicSearcher->startSearch (Query::castFrom (packet));
00352 break;
00353 case OpenVendorDescriptor:
00354 if (typeid (packet) == typeid (QueryStatusResponse)) {
00355
00356
00357 d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00358 } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00359
00360
00361 emit processQueryStatusRequest (QueryStatusRequest::castFrom (packet));
00362 }
00363 routePacket (packet);
00364 break;
00365 case PingDescriptor:
00366 case QueryHitsDescriptor:
00367 case PushDescriptor:
00368 case ByeDescriptor:
00369 case IbmcDescriptor:
00370 case QueryRoutingDescriptor:
00371 case StandardVendorDescriptor:
00372 default:
00373 routePacket (packet);
00374 break;
00375 }
00376 }
00377