PacketProcessor.cpp

Go to the documentation of this file.
00001 /*
00002 
00003 Copyright (C) 2005-2007 by Peter Dimov.
00004 
00005 This file is part of Calitko (http://www.calitko.org).
00006 
00007 Calitko is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU General Public License as published by
00009 the Free Software Foundation; either version 2 of the License, or
00010 (at your option) any later version.
00011 
00012 Calitko is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU General Public License for more details.
00016 
00017 You should have received a copy of the GNU General Public License
00018 along with Calitko; if not, write to the Free Software
00019 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00020 
00021 */
00022 
00023 #include "Qt.h"
00024 #include "PacketProcessor.h"
00025 #include "PongCache.h"
00026 #include "PacketRouter.h"
00027 //#include "QueryRouter.h"
00028 #include "Gnutella/LocalPeer.h"
00029 #include "Protocols/Transports/Connection.h"
00030 #include "Gnutella/PacketProcessing/DynamicSearching/DynamicSearcher.h"
00031 #include "Gnutella/PacketProcessing/PacketSession.h"
00032 #include "Gnutella/PacketProcessing/QueryRouting/QrtExchanger.h"
00033 #include "Gnutella/Packets/Ping.h"
00034 #include "Gnutella/Packets/Pong.h"
00035 #include "Gnutella/Packets/Query.h"
00036 #include "Gnutella/Packets/QueryHits.h"
00037 #include "Gnutella/Packets/Push.h"
00038 #include "Gnutella/Packets/Bye.h"
00039 #include "Gnutella/Packets/VendorMessages/SupportedMessages.h"
00040 #include "Gnutella/Packets/VendorMessages/QueryStatusResponse.h"
00041 #include "Gnutella/Packets/VendorMessages/QueryStatusRequest.h"
00042 #include "Gnutella/NodeType.h"
00043 #include "Gnutella/Handshaking/SlotAllocator.h"
00044 
00045 using namespace Gnutella::Packets;
00046 using Gnutella::Packets::VendorMessages::SupportedMessages;
00047 using Gnutella::Packets::VendorMessages::QueryStatusResponse;
00048 using Gnutella::Packets::VendorMessages::QueryStatusRequest;
00049 using Gnutella::PacketProcessing::DynamicSearching::DynamicSearcher;
00050 using Gnutella::PacketProcessing::PacketProcessor;
00051 using Gnutella::PacketProcessing::PacketSession;
00052 using Gnutella::PacketProcessing::PongCache;
00053 using Gnutella::PacketProcessing::PacketRouter;
00054 using Gnutella::PacketProcessing::QueryRouting::QrtExchanger;
00055 using Gnutella::Bootstrapping::NodeCache;
00056 using Gnutella::Handshaking::SlotAllocator;
00057 using Gnutella::NodeInfo;
00058 using Gnutella::LocalPeer;
00059 using Protocols::Transports::Connection;
00060 
00061 typedef PacketRouter GuidRouter;
00062 //using Gnutella::PacketProcessing::QueryRouter;
00063 
00064 class Gnutella::PacketProcessing::PacketProcessorPrivate
00065 {
00066     REFERENCE_OBJECT (PacketProcessorPrivate)
00067 
00068 public:
00069     LocalPeer           *localPeer;
00070     SlotAllocator       *slotAllocator;
00071     PacketProcessor::Sessions       sessions;
00072     PongCache           *pongCache;
00073     GuidRouter          *guidRouter;
00074     //QueryRouter       queryRouter;
00075     DynamicSearcher     *dynamicSearcher;
00076     QrtExchanger        *qrtExchanger;
00077 
00079     PacketProcessor::Sessions leafSessions() const
00080     {
00081         PacketProcessor::Sessions leafSessions;
00082         foreach (PacketSession *session, sessions)
00083             if (session->nodeInfo().type == TypeLeaf)
00084                 leafSessions += session;
00085         return leafSessions;
00086     }
00087 
00088     PacketProcessorPrivate() : localPeer (0), slotAllocator (0), sessions(),
00089                                 pongCache(0), guidRouter (0),
00090                                 dynamicSearcher (0), qrtExchanger (0)
00091     {}
00092 };
00093 
00094 PacketProcessor::PacketProcessor (LocalPeer *lp)
00095  :  d (new PacketProcessorPrivate)
00096 {
00097     d->pongCache = new PongCache (this, lp);
00098     d->guidRouter = new GuidRouter();
00099     d->dynamicSearcher = new DynamicSearcher (this);
00100     d->qrtExchanger = new QrtExchanger();
00101     d->localPeer = lp;
00102     d->slotAllocator = lp->slotAllocator();
00103 }
00104 
00105 PacketProcessor::~PacketProcessor()
00106 {
00107     delete d->pongCache;
00108     delete d->guidRouter;
00109     delete d->dynamicSearcher;
00110     delete d->qrtExchanger;
00111     delete d;
00112 }
00113 
00120 void PacketProcessor::addConnection (Connection *connection, NodeInfo nodeInfo)
00121 {
00122     Q_ASSERT (connection != 0);
00123 
00124     PacketSession *session = new PacketSession (connection, nodeInfo);
00125 
00126     QObject::connect (session, SIGNAL (packetRead (Packet &, PacketSession *)),
00127                       this, SLOT (packetReceived (Packet &, PacketSession *)));
00128     QObject::connect (session, SIGNAL (sessionClosed (PacketSession *)),
00129                       this, SLOT (sessionClosed (PacketSession *)));
00130     QObject::connect (session, SIGNAL (qrtRead (PacketSession *)),
00131                       d->qrtExchanger, SLOT (qrtRead (PacketSession *)));
00132 
00133     bool gotSlot = d->slotAllocator->allocateSlot (session);
00134     d->sessions.insert (session);
00135     d->pongCache->addSession (session);
00136     d->dynamicSearcher->addSession (session);
00137     d->qrtExchanger->addSession (session);
00138     // \todo Is this the right was to drop the connection?
00139     if (!gotSlot) {
00140         connection->disconnectFromNode();
00141     } else {
00142         Ping ping;
00143         ping.setTtl (1);
00144         session->sendPacket (ping);
00145         // \todo Maybe have the VendorMessageFactory return the packet
00146         // it will know anyway about all types of messages it supports.
00147         SupportedMessages supported;
00148         supported.addMessage (VendorCode ("BEAR"), 11, 1); // QueryStatusRequest
00149         supported.addMessage (VendorCode ("BEAR"), 12, 1); // QueryStatusResponse
00150         session->sendPacket (supported);
00151         // \todo We should read the MessageseSupported message sent from the
00152         // remote peer and only send out vendor messages it they are supported.
00153     }
00154 }
00155 
00156 void PacketProcessor::sessionClosed (PacketSession *session)
00157 {
00158     Q_ASSERT (d->sessions.contains (session));
00159 
00160     d->qrtExchanger->removeSession (session);
00161     d->dynamicSearcher->removeSession (session);
00162     d->pongCache->removeSession (session);
00163     d->sessions.remove (session);
00164     session->deleteLater();
00165 }
00166 
00167 /*
00168 */
00169 void PacketProcessor::packetReceived (Packet &packet,
00170                                       PacketSession *session)
00171 {
00172     // (Cached) Pong packets may have ttl of 0 => no doHop().
00173     if (packet.ttl() > 0)
00174         packet.doHop();
00175 
00176     if (d->guidRouter->isDuplicate (packet))
00177         return;
00178 
00179     // Store route paths.
00180     switch (packet.payloadDescriptor())
00181     {
00182     case PingDescriptor:
00183         d->pongCache->sendCachedPongs (Ping::castFrom (packet), session);
00184         break;
00185     case PongDescriptor:
00186         d->pongCache->cachePong (Pong::castFrom (packet), session);
00187         break;
00188     case QueryDescriptor:
00189         d->guidRouter->addQueryPath (Query::castFrom (packet), session);
00190         if (session->nodeInfo().type == TypeLeaf) {
00191             d->dynamicSearcher->startSearch (Query::castFrom (packet));
00192             // \todo We need to query our own leaves just the normal way!
00193             return; // \todo notifyHandlers() is skipped and we only want to skip routePacket()
00194         }
00195         break;
00196     case QueryHitsDescriptor:
00197         d->guidRouter->addQueryHitsPath (QueryHits::castFrom (packet), session);
00198         d->dynamicSearcher->countHits (QueryHits::castFrom (packet));
00199         break;
00200     case PushDescriptor:
00201         break;
00202     case ByeDescriptor:
00203 qDebug() << "Bye packet saying:" << Bye::castFrom (packet).code() << Bye::castFrom (packet).message();
00204         session->closeSession (PacketSession::CloseNoBye);
00205         break;
00206     case OpenVendorDescriptor:
00207     case StandardVendorDescriptor:
00208         if (typeid (packet) == typeid (QueryStatusResponse)) {
00209             d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00210         } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00211             // Since we used the same descriptorId for queries sent over
00212             // different connections we would not be able to determine to
00213             // exactly which node to route the QueryStatusResponse back.
00214             // That would not be a problem though because such unsolicited
00215             // packet will be handled gracefully according the specs.
00216             d->guidRouter->addQueryStatusRequestPath (packet, session);
00217         }
00218         break;
00219     case IbmcDescriptor:
00220     case QueryRoutingDescriptor:
00221     default:
00222         return;
00223     }
00224     notifyHandlers (packet);
00225 
00226     // If we are a leaf, do not forward packets.
00227     if (d->localPeer->nodeInfo().type == Gnutella::TypeLeaf)
00228         return;
00229     routePacket (packet);
00230 }
00231 
00232 void PacketProcessor::notifyHandlers (Packet &packet)
00233 {
00234     switch (packet.payloadDescriptor())
00235     {
00236     case PingDescriptor:
00237         // With pong cacheing, the handler does not need to respond to each
00238         // ping. It also lacks the knowledge where exactly pongs came from, so
00239         // it is impossible to guarantee pongs are not routed back from where
00240         // they came. The PongCache does all this.
00241         // \todo Actually, the handler never needs to get or answer a ping.
00242         // It just handles the pongs.
00243         //if (p.pongCache->canNotifyHandler())
00244         //  emit receivedPing (Ping::castFrom (packet));
00245         break;
00246     case PongDescriptor:
00247         emit receivedPong (Pong::castFrom (packet));
00248         break;
00249     case QueryDescriptor:
00250         emit receivedQuery (Query::castFrom (packet));
00251         break;
00252     case QueryHitsDescriptor:
00253         emit receivedQueryHits (QueryHits::castFrom (packet));
00254         break;
00255     case PushDescriptor:
00256         emit receivedPush (Push::castFrom (packet));
00257         break;
00258     case ByeDescriptor:
00259         break; // \todo Do we really need the assert below?
00260     case OpenVendorDescriptor:
00261     case StandardVendorDescriptor:
00262         if (typeid (packet) == typeid (QueryStatusResponse)) {
00263             // A leaf might be sinding us a QueryStatusResponse.
00264             d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00265         } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00266             // An ultrapeer might be sending us a QueryStatusRequest.
00267             emit processQueryStatusRequest (QueryStatusRequest::castFrom (packet));
00268         }
00269         break;
00270     case IbmcDescriptor:
00271     case QueryRoutingDescriptor:
00272     default:
00273         Q_ASSERT (false);
00274     }
00275 }
00276 
00277 void PacketProcessor::routePacket (const Packet &packet)
00278 {
00279     // \todo Query packets with ttl of 1 (0 after doHop()) should be forwarded
00280     // to leaves only. Should be in QueryRouter.
00281     Sessions sessions;
00282     switch (packet.payloadDescriptor())
00283     {
00284     case PingDescriptor:
00285         sessions = d->pongCache->pingRoutePaths (Ping::castFrom (packet));
00286         break;
00287     case PongDescriptor:
00288         // The PongCache responds immediately to a ping, no need to forward
00289         // the pongs, they were already cached.
00290         break;
00291     case QueryDescriptor:
00292         //sessions = p.queryRouter->getQueryPath (Query::castFrom (packet));
00293         if (packet.ttl() == 0)
00294             sessions = d->leafSessions();
00295         else
00296             sessions = d->sessions;
00297 
00298         break;
00299     case QueryHitsDescriptor:
00300         sessions = d->guidRouter->queryHitsRoutePaths
00301                                     (QueryHits::castFrom (packet));
00302         break;
00303     case PushDescriptor:
00304         sessions = d->guidRouter->pushRoutePaths (Push::castFrom (packet));
00305         break;
00306     case ByeDescriptor:
00307         break; // \todo rethink the assert below!
00308     case OpenVendorDescriptor:
00309     case StandardVendorDescriptor:
00310         // Only route the messages originating from us:
00311         if (packet.hops() == 0 && packet.ttl() == 1) {
00312             if (typeid (packet) == typeid (QueryStatusRequest)){
00313                 // Send back to leaf, i.e. where the Query came from:
00314                 sessions = d->guidRouter->queryHitsRoutePaths (packet);
00315             } else if (typeid (packet) == typeid (QueryStatusResponse)) {
00316                 // Send forward to ultrapeers, i.e. where the query was sent to:
00317                 //Q_ASSERT (d->localPeer->nodeInfo().type == TypeLeaf);
00318                 sessions = d->guidRouter->queryStatusResponsePaths (packet);
00319             }
00320         }
00321         break;
00322     case IbmcDescriptor:
00323     case QueryRoutingDescriptor:
00324     default:
00325         Q_ASSERT (false);
00326     }
00327 
00328     foreach (PacketSession *routePath, sessions) {
00329         // The routePath may have been closed, so check that:
00330         // \todo Should we make the routers take care for that?
00331         if (d->sessions.contains (routePath))
00332             routePath->sendPacket (packet);
00333     }
00334 }
00335 
00336 void PacketProcessor::sendPacket (const Packet &packet)
00337 {
00338     // Only packets our node sends should be sent using this function.
00339     Q_ASSERT (packet.hops() == 0);
00340 
00341     // The pongs returned from the HostCache need to be cached. not routed.
00342     switch (packet.payloadDescriptor())
00343     {
00344         case PongDescriptor:
00345             d->pongCache->cachePong (Pong::castFrom (packet), 0);
00346             break;
00347         case QueryDescriptor:
00348             if (d->localPeer->nodeInfo().type == TypeLeaf)
00349                 routePacket (packet);
00350             else
00351                 d->dynamicSearcher->startSearch (Query::castFrom (packet));
00352             break;
00353         case OpenVendorDescriptor:
00354             if (typeid (packet) == typeid (QueryStatusResponse)) {
00355                 // We send QueryStatusResponse only to the DynamicSearcher or
00356                 // over the network to our ultrapeers!
00357                 d->dynamicSearcher->countHits (QueryStatusResponse::castFrom (packet));
00358             } else if (typeid (packet) == typeid (QueryStatusRequest)) {
00359                 // The DynmaciSearcher may send a QueryStatusRequest to a leaf
00360                 // originating a query.
00361                 emit processQueryStatusRequest (QueryStatusRequest::castFrom (packet)); // \todo don't like that!
00362             }
00363             routePacket (packet);
00364             break;
00365         case PingDescriptor:
00366         case QueryHitsDescriptor:
00367         case PushDescriptor:
00368         case ByeDescriptor:
00369         case IbmcDescriptor:
00370         case QueryRoutingDescriptor:
00371         case StandardVendorDescriptor:
00372         default:
00373             routePacket (packet);
00374             break;
00375     }
00376 }
00377