DynamicSearch.cpp

Go to the documentation of this file.
00001 /*
00002 
00003 Copyright (C) 2005-2007 by Peter Dimov.
00004 
00005 This file is part of Calitko (http://www.calitko.org).
00006 
00007 Calitko is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU General Public License as published by
00009 the Free Software Foundation; either version 2 of the License, or
00010 (at your option) any later version.
00011 
00012 Calitko is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU General Public License for more details.
00016 
00017 You should have received a copy of the GNU General Public License
00018 along with Calitko; if not, write to the Free Software
00019 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00020 
00021 */
00022 
00023 #include "Qt.h"
00024 #include "DynamicSearch.h"
00025 #include "DynamicSearcher.h"
00026 #include "Imports.cpp"
00027 
00028 using namespace Gnutella::PacketProcessing::DynamicSearching;
00029 
00030 // Open containing namespaces:
00031 namespace Gnutella {
00032 namespace PacketProcessing {
00033 namespace DynamicSearching {
00034 
/*!
 * Tuning parameters of the dynamic search implemented in this file.
 */
enum Constants
{
    //! Every additional hop is assumed to multiply the reached hosts by (Degree - 1).
    Degree = 32,

    //! Wait per TTL hop before the next step; fed to QTimer::start(), i.e. milliseconds.
    TimerHopInterval = 2400,

    //! Upper bound on the number of sessions the probe query is sent to.
    ProbesCount = 3,

    //! TTL stamped on the probe query.
    ProbeTtl = 2,

    //! The search counts as finished once this many hosts are estimated to be queried.
    MaxHosts = 300000,

    //! Upper bound on the number of sessions used by one follow-up query round.
    SearchConnections = 1,

    //! The search counts as finished once this many results were received.
    DesiredResults = 150,

    //! Largest TTL a follow-up query may carry; also used while no results are known.
    MaxTtl = 3,

    //! How long to wait for a QueryStatusResponse; fed to QTimer::start().
    StatusResponseTimeout = 10000
};
00047 
/*!
 * Phase a DynamicSearch is currently in; selects what the timer slot does.
 */
enum State
{
    StateStopped               = 0, //!< Search not running, timer not expected to fire.
    StateSearching             = 1, //!< A query was sent, waiting for the hop timer.
    StateWaitingStatusResponse = 2  //!< A status request was emitted, waiting for the answer.
};
00054 
00055 class DynamicSearchPrivate {
00056     REFERENCE_OBJECT (DynamicSearchPrivate)
00057 
00058 public:
00059     typedef QList <PacketSession *> Sessions;
00060 
00061     quint32         hostsQueried;
00062     quint32         receivedResults;
00063     Query           query;
00064     Sessions        remainingSessions;
00065     QTimer          timer;
00066     State           state;
00067 
00068     DynamicSearchPrivate() :    hostsQueried (0), receivedResults (0),
00069                                 query(), remainingSessions(), timer(),
00070                                 state(StateStopped)
00071     {}
00072 };
00073 
00074 } // namespace DynamicSearching;
00075 } // namespace PacketProcessing;
00076 } // namespace Gnutella;
00077 
00078 DynamicSearch::DynamicSearch (const Query &query, Sessions sessions)
00079  :  p (new DynamicSearchPrivate)
00080 {
00081     p->query                = query;
00082     p->remainingSessions    = sessions;
00083 
00084     QObject::connect (&p->timer, SIGNAL (timeout()), this, SLOT (timeout()));
00085     p->timer.setSingleShot (true);
00086 }
00087 
00088 DynamicSearch::~DynamicSearch()
00089 {
00090     delete p;
00091 }
00092 
00093 void DynamicSearch::startSearch()
00094 {
00095     doProbeQuery();
00096     p->state = StateSearching;
00097 }
00098 
00099 void DynamicSearch::stopSearch()
00100 {
00101     p->timer.stop();
00102     p->state = StateStopped;
00103 }
00104 
00105 bool DynamicSearch::hasEnoughHits()
00106 {
00107     return (p->receivedResults >= DesiredResults)
00108             || (p->hostsQueried >= MaxHosts)
00109             || (p->remainingSessions.isEmpty());
00110 }
00111 
00113 
00120 void DynamicSearch::doProbeQuery()
00121 {
00122     p->query.setTtl (ProbeTtl);
00123 
00124     qDebug() << p->remainingSessions.count();
00125     p->hostsQueried = ProbesCount * hostsPerConnection (ProbeTtl);
00126 
00127     quint16 sessionCount = std::min<quint16> (p->remainingSessions.count(), ProbesCount);
00128     // Don't drop the probe sessions, we may try them at the end again:
00129     sendPacket (sessionCount, false);
00130     p->timer.start(ProbeTtl * TimerHopInterval + sessionCount * 1000);
00131     p->state = StateSearching;
00132 }
00133 
00135 
00142 void DynamicSearch::doQuery()
00143 {
00144     quint16 ttl;
00145     quint16 remainingConns  = p->remainingSessions.count();
00146     quint16 connections     = std::min <quint16> (remainingConns, SearchConnections);
00147 
00148     if (p->receivedResults != 0) {
00149         float   averageHostResults = static_cast <float> (p->receivedResults)
00150                                      / static_cast <float> (p->hostsQueried);
00151         quint32 hostsToQuery = (DesiredResults - p->receivedResults)
00152                                 / static_cast <int> (averageHostResults);
00153         qDebug() << remainingConns;
00154         ttl = computeTtl (hostsToQuery, remainingConns);
00155     } else {
00156         ttl = MaxTtl;
00157     }
00158     p->hostsQueried += connections * hostsPerConnection (ttl);
00159     p->query.setTtl (ttl);
00160 
00161     sendPacket (connections);
00162     p->timer.start (ttl * TimerHopInterval);
00163     p->state = StateSearching;
00164 }
00165 
00167 
00173 void DynamicSearch::sendStatusRequest()
00174 {
00175     QueryStatusRequest request;
00176     request.setDescriptorId (p->query.descriptorId());
00177 
00178     p->state = StateWaitingStatusResponse;
00179     p->timer.start (StatusResponseTimeout);
00180     emit statusRequest (request);
00181 }
00182 
00184 void DynamicSearch::timeout()
00185 {
00186     switch (p->state)
00187     {
00188     case StateSearching:
00189         if (hasEnoughHits()) {
00190             // If the leaf sends unsolicated QueryStatusResponse messages or
00191             // we don't have leaf guidance, and we already got enough hits:
00192             stopSearch();
00193         } else if (p->query.minimumSpeed() && 0x9000) {
00194             // Are bits 15 and 12 of MinSpeed were set, so leaf-guidance:
00195             sendStatusRequest();
00196         } else {
00197             doQuery();
00198         }
00199         break;
00200     case StateWaitingStatusResponse:
00201         stopSearch();
00202         break;
00203     case StateStopped:
00204     default:
00205         Q_ASSERT (false);
00206     }
00207 }
00208 
00210 void DynamicSearch::countHits (const QueryHits & queryHits)
00211 {
00212     // Only count hits if not doing leaf-guidance.
00213     if (!(p->query.minimumSpeed() && 0x9000))
00214         p->receivedResults += queryHits.numberOfHits();
00215 }
00216 
00218 
00223 void DynamicSearch::countHits (const QueryStatusResponse &response)
00224 {
00225     if (response.hitsCount() == 0xFFFF)
00226         stopSearch();
00227     else
00228         p->receivedResults = response.hitsCount();
00229 
00230     if (hasEnoughHits()) {
00231         stopSearch();
00232         p->state = StateStopped;
00233     } else {
00234         if (p->state == StateWaitingStatusResponse) {
00235             p->state = StateSearching;
00236             doQuery();
00237         }
00238     }
00239 }
00240 
00242 void DynamicSearch::addSession (PacketSession *session)
00243 {
00244     // \todo if searching stopped due to insufficient PacketSessions, resume it!
00245     p->remainingSessions.append (session);
00246 }
00247 
00249 void DynamicSearch::removeSession (PacketSession *session)
00250 {
00251     p->remainingSessions.removeAll (session);
00252 }
00253 
00255 quint32 DynamicSearch::hostsPerConnection(quint16 ttl)
00256 {
00257     quint32 hosts = 0;
00258     quint32 temp = 1;
00259 
00260     for (int i=0; i < ttl; i++) {
00261         hosts += temp;
00262         temp  *= Degree - 1;
00263     }
00264 
00265     return hosts;
00266 }
00267 
00269 qint16 DynamicSearch::computeTtl(quint32 hostsToQuery, quint16 numSessions)
00270 {
00271     int     ttl = 0;
00272     quint32 hosts;
00273     quint32 hostsPerConn = 1;
00274 
00275     do {
00276         ttl++;
00277         hostsPerConn = hostsPerConnection (ttl);
00278         hosts = numSessions * hostsPerConn;
00279     } while (hosts < hostsToQuery);
00280 
00281     return std::min <quint16> (ttl, MaxTtl);
00282 }
00283 
00285 
00293 void DynamicSearch::sendPacket (quint16 sessionsCount, bool dropSessions)
00294 {
00295     for (quint16 i = 0; i < sessionsCount; i++) {
00296         PacketSession *session = p->remainingSessions.takeFirst();
00297         session->sendPacket (p->query);
00298         if (!dropSessions)
00299             p->remainingSessions.append (session);
00300     }
00301 }