PongCache.cpp

Go to the documentation of this file.
00001 /*
00002 
00003 Copyright (C) 2005-2007 by Peter Dimov.
00004 
00005 This file is part of Calitko (http://www.calitko.org).
00006 
00007 Calitko is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU General Public License as published by
00009 the Free Software Foundation; either version 2 of the License, or
00010 (at your option) any later version.
00011 
00012 Calitko is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU General Public License for more details.
00016 
00017 You should have received a copy of the GNU General Public License
00018 along with Calitko; if not, write to the Free Software
00019 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00020 
00021 */
00022 
00023 #include "Qt.h"
00024 #include "PongCache.h"
00025 #include "PacketProcessor.h"
00026 #include "Gnutella/LocalPeer.h"
00027 #include "Gnutella/Packets/Extensions/Ggeps/DailyUptime.h"
00028 #include "Gnutella/Packets/Extensions/Ggeps/Ultrapeer.h"
00029 #include "Gnutella/Packets/Extensions/Ggeps/VendorCode.h"
00030 #include "Gnutella/Packets/Extensions/Ggeps/UdpHostCache.h"
00031 #include "Gnutella/Packets/Extensions/Ggeps/PackedHostCaches.h"
00032 #include "Gnutella/Packets/Extensions/Ggeps/SupportsCachedPongs.h"
00033 #include "Gnutella/Handshaking/SlotAllocator.h"
00034 #include "Utils/Version.h"
00035 #include <algorithm>
00036 
00037 using Gnutella::PacketProcessing::PongCache;
00038 using Gnutella::PacketProcessing::PacketProcessor;
00039 using Gnutella::PacketProcessing::PacketSession;
00040 using Gnutella::Packets::Packet;
00041 using Gnutella::Packets::Ping;
00042 using Gnutella::Packets::Pong;
00043 using Gnutella::Packets::PingDescriptor;
00044 using Gnutella::Packets::PongDescriptor;
00045 using Gnutella::Bootstrapping::NodeCache;
00046 using Gnutella::LocalPeer;
00047 
00048 PongCache::SessionData::SessionData()
00049  :  directPong(), neighborPong(),
00050     lastInPingTime (QTime::currentTime().addMSecs (-MinimalPingInterval)),
00051     lastOutPingTime (QTime::currentTime().addMSecs (-MinimalPingInterval))
00052 {
00053 }
00054 
00055 PongCache::PongCache(PacketProcessor *pp, LocalPeer *lp)
00056  :  localPeer (lp), packetProcessor (pp), slotAllocator (lp->slotAllocator()),
00057     directPing(), directPong(), hostPong(), pingTimer(), sessions()
00058 {
00059     // \todo Now we have the class PingHnadler, which creates a response Pong!
00060     // use it instead this local implementation. Connect the pingReceived signal
00061     // with processPing. Let only one ping every say 10 sec flow to the handler,
00062     // cache the resulting pongs and answer with them!
00063     hostPong.setTtl (3);
00064     hostPong.setSharedKilobytes (1 << 21); // Square of two means we are an ultrapeer.
00065     hostPong.setSharedFiles (1<<10);
00066 
00067     using Gnutella::Packets::Extensions::GgepBlock;
00068     using Gnutella::Packets::Extensions::Ggeps::VendorCode;
00069     using Gnutella::Packets::Extensions::Ggeps::Ultrapeer;
00070     using Gnutella::Packets::Extensions::Ggeps::DailyUptime;
00071     using Utils::Version;
00072 
00073     GgepBlock ggepBlock;
00074     Ultrapeer ultrapeer (true, 0, 0); // Don't advertise free slots now (do so after 0.6.0)
00075 
00076     Packets::VendorCode myVendorCode ("LIME");
00077     Version myVersion (4, 6);
00078     VendorCode vendorCode (myVendorCode, myVersion);
00079 
00080     DailyUptime dailyUptime (34345);
00081 
00082     ggepBlock.addExtension (ultrapeer);
00083     ggepBlock.addExtension (vendorCode);
00084     ggepBlock.addExtension (dailyUptime);
00085     hostPong.setGgepBlock (ggepBlock);
00086 
00087     directPing.setTtl (1);
00088     directPing.setHops (0);
00089 
00090     // \todo Just to test the UHC, remove later:
00091     /*{
00092     using Gnutella::Packets::Extensions::Ggeps::SupportsCachedPongs;
00093     using Gnutella::Packets::Extensions::GgepBlock;
00094     SupportsCachedPongs scp (SupportsCachedPongs::UltrapeerPreference);
00095 
00096     GgepBlock ggepBlock;
00097     ggepBlock.addExtension (scp);
00098     directPing.setGgepBlock (ggepBlock);
00099     }*/
00100     // \todo remove the above
00101 
00102     directPong.setTtl (1);
00103     directPong.setHops (0);
00104 
00105     //hostHasFreeSlots = true;
00106 
00107     connect (&pingTimer, SIGNAL (timeout()), this, SLOT (refreshCache()));
00108     pingTimer.setInterval (PongExpirationTime);
00109     pingTimer.start();
00110 }
00111 
00115 PongCache::~PongCache()
00116 {
00117 }
00118 
00119 void PongCache::addSession (PacketSession *session)
00120 {
00121     Q_ASSERT (!sessions.contains (session));
00122     SessionData *sessionData = new SessionData();
00123     sessions.insert (session, sessionData);
00124 }
00125 
00126 void PongCache::removeSession (PacketSession *session)
00127 {
00128     Q_ASSERT (sessions.contains (session));
00129     SessionData *sessionData = sessions.take (session);
00130     delete sessionData;
00131 }
00132 
00136 void PongCache::sendCachedPongs (const Ping &ping, PacketSession *session)
00137 {
00138     // Direct Ping
00139     if (ping.ttl() == 1 && ping.hops() == 0) {
00140         directPong.setDescriptorId (ping.descriptorId());
00141         directPong.setIpAddress (localPeer->nodeInfo().address.hostAddress());
00142         session->sendPacket (directPong);
00143     } else if (ping.ttl() == 2 && ping.hops() == 0) { // Browse Ping
00144         foreach (PacketSession *otherSession, sessions.keys()) {
00145             if (session == otherSession)
00146                 continue;
00147             SessionData *sessionData = sessions [otherSession];
00148             sessionData->neighborPong.setDescriptorId (ping.descriptorId());
00149             otherSession->sendPacket (sessionData->neighborPong);
00150         }
00151     } else {
00152         SessionData *data = sessions [session];
00153         // Ignore ping if sent less than 1 sec after the previous one.
00154         if (data->lastInPingTime.elapsed() >= MinimalPingInterval) {
00155             data->lastInPingTime.start();
00156             for (int i = 0; i < MaximalPongHops; i++) {
00157                 // Forward the first PongsPerHops number of pongs, which did not
00158                 // come over \a session.
00159                 int pongsSent = 0;
00161                 // to the back. Stop iteration after count() times.
00162                 foreach (PongInfo *pongInfo, cachedPongForwardQueue[i]) {
00163                     if (pongInfo->second != session) {
00164                         pongInfo->first.setDescriptorId (ping.descriptorId());
00165                         session->sendPacket (pongInfo->first);
00166                         if (++pongsSent == PongsPerHops)
00167                             break;
00168                     }
00169                 }
00170                 // Now move the forwarded pongs to the back of the queue.
00171                 if (pongsSent > 0) {
00172                     foreach (PongInfo *pongInfo, cachedPongForwardQueue[i]) {
00173                         if (pongInfo->second != session) {
00174                             cachedPongForwardQueue[i].removeAll (pongInfo);
00175                             cachedPongForwardQueue[i].enqueue (pongInfo);
00176                             if (--pongsSent == 0)
00177                                 break;
00178                         }
00179                     }
00180                 }
00181             }
00182             // \todo Send also pongs for ultrapeer hosts whose connections were rejected
00183             // during handshaking. These hosts DO have free slots. Send a few for them.
00184 
00185             // \todo Use localPeer->nodeInfo to check the slots and create the
00186             // pong with all ggep extensions.
00187             if (slotAllocator->hasFreePeerSlots() || slotAllocator->hasFreeLeafSlots()) {
00188                 hostPong.setDescriptorId (ping.descriptorId());
00189                 hostPong.setIpAddress (localPeer->nodeInfo().address.hostAddress());
00190                 hostPong.setPort (localPeer->nodeInfo().address.hostPort());
00191                 session->sendPacket (hostPong);
00192             }
00193         }
00194     }
00195 }
00196 
00197 PongCache::Paths PongCache::pingRoutePaths (const Ping &)
00198 {
00199     Paths paths;
00200     foreach (PacketSession *session, sessions.keys()) {
00201         SessionData *data = this->sessions [session];
00202         // Ignore ping if sent less than 1 sec after the previous one.
00203         if (data->lastOutPingTime.elapsed() >= MinimalPingInterval) {
00204             data->lastOutPingTime.start();
00205             paths += session;
00206         }
00207     }
00208     return paths;
00209 }
00210 
00215 void PongCache::cachePong (const Pong &pong, PacketSession *session)
00216 {
00217     if (pong.ttl() == 1 && pong.hops() == 0) {
00218         SessionData *data = sessions [session];
00219         data->directPong = pong;
00220         data->neighborPong = pong;
00221         data->neighborPong.setHops (1);
00222     } else {
00223         if (pong.hops() < MaximalPongHops) {
00224             PongInfo *pongInfo = new PongInfo (pong, session);
00225             uchar hops = pongInfo->first.hops();
00226             pongForwardQueue[hops].append (pongInfo);
00227             pongInfo->first.doHop();
00228             if (pongInfo->first.ttl() > Gnutella::Packets::MaximalTtl)
00229                 pongInfo->first.setTtl (0);
00230         }
00231     }
00232     // \todo Verify that pongs with hops 0 have the same address as session->remoteNodeAddress()
00233 }
00234 
00237 void PongCache::refreshCache()
00238 {
00239     for (int i = 0; i < MaximalPongHops; i++) {
00240         // Make sure we did get new pongs before we erase the old cached ones.
00241         if (pongForwardQueue[i].size() > 0) {
00242             foreach (PongInfo *pongInfo, cachedPongForwardQueue[i])
00243                 delete pongInfo;
00244             cachedPongForwardQueue[i] = pongForwardQueue[i];
00245             std::random_shuffle (cachedPongForwardQueue[i].begin(), cachedPongForwardQueue[i].end());
00246             pongForwardQueue[i] = PongForwardQueue();
00247         }
00248     }
00249 
00250     // Send a PingPacket over all connections.
00251     Ping ping;
00252 
00253     using Gnutella::Packets::Extensions::Ggeps::SupportsCachedPongs;
00254     using Gnutella::Packets::Extensions::GgepBlock;
00255     SupportsCachedPongs scp (SupportsCachedPongs::UltrapeerPreference);
00256 
00257     GgepBlock ggepBlock;
00258     ggepBlock.addExtension (scp);
00259     ping.setGgepBlock (ggepBlock);
00260 
00261     /*  Make the PacketProcessor try to route the ping, causing our
00262         pingRoutePaths() getting called. Thus, the ping is forwarded only over
00263         sessions that were not recently pinged. */
00264     packetProcessor->sendPacket (ping);
00265 }