PacketSession.cpp

Go to the documentation of this file.
00001 /*
00002 
00003 Copyright (C) 2005-2007 by Peter Dimov.
00004 
00005 This file is part of Calitko (http://www.calitko.org).
00006 
00007 Calitko is free software; you can redistribute it and/or modify
00008 it under the terms of the GNU General Public License as published by
00009 the Free Software Foundation; either version 2 of the License, or
00010 (at your option) any later version.
00011 
00012 Calitko is distributed in the hope that it will be useful,
00013 but WITHOUT ANY WARRANTY; without even the implied warranty of
00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015 GNU General Public License for more details.
00016 
00017 You should have received a copy of the GNU General Public License
00018 along with Calitko; if not, write to the Free Software
00019 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00020 
00021 */
00022 
00023 #include "Qt.h"
00024 #include "PacketSession.h"
00025 #include "PacketProcessor.h"
00026 #include "PacketReader.h"
00027 #include "PacketWriter.h"
00028 #include "Gnutella/PacketProcessing/QueryRouting/QrtReader.h"
00029 #include "Gnutella/PacketProcessing/QueryRouting/QrtWriter.h"
00030 #include "Gnutella/Packets/Packet.h"
00031 #include "Gnutella/Packets/Bye.h"
00032 
00033 using Gnutella::Packets::Packet;
00034 using Gnutella::Packets::Bye;
00035 using Gnutella::PacketProcessing::PacketSession;
00036 using Gnutella::PacketProcessing::PacketSessionPrivate;
00037 using Gnutella::PacketProcessing::PacketReader;
00038 using Gnutella::PacketProcessing::PacketWriter;
00039 using Gnutella::PacketProcessing::QueryRouting::QrtReader;
00040 using Gnutella::PacketProcessing::QueryRouting::QrtWriter;
00041 using Gnutella::PacketProcessing::QueryRouting::QueryRoutingTable;
00042 using Gnutella::NodeInfo;
00043 using Protocols::Transports::Connection;
00044 
// File-local numeric constants.
enum Constants
{
    // Interval handed to the session's read timer on construction
    // (QTimer intervals are expressed in milliseconds).
    ReadTimeout = 60 * 1000
};
00049 
00050 // \todo take it from some global configuration object:
00051 static const QString SoftwareIdentification = "Calitko/0.5.3";
00052 
00053 class PacketSessionPrivate
00054 {
00055     REFERENCE_OBJECT (PacketSessionPrivate)
00056 
00057 public:
00058     typedef QList <Packet *> PendingPackets;
00059 
00060     QrtReader           *qrtReader;
00061     QrtWriter           *qrtWriter;
00062     PacketWriter        *writer;
00063     PacketReader        *reader;
00064     Connection          *connection;
00065     NodeInfo            nodeInfo;
00066     PendingPackets      pendingPackets;
00067     QTimer              readTimer;
00068     bool                isPacketRead;
00069     bool                isClosing;
00070 
00071     PacketSessionPrivate() :    qrtReader (0), qrtWriter (0), writer (0),
00072                                 reader (0), connection (0), nodeInfo(),
00073                                 pendingPackets(), readTimer(),
00074                                 isPacketRead (false), isClosing (false)
00075     {}
00076 };
00077 
00078 NodeInfo PacketSession::nodeInfo() const
00079 { return p->nodeInfo; }
00080 
00081 const QueryRoutingTable & PacketSession::queryRoutingTable() const
00082 { return p->qrtReader->currentTable(); }
00083 
00084 int PacketSession::readTimeout() const
00085 { return p->readTimer.interval(); }
00086 
00087 void PacketSession::setReadTimeout (int readTimeout)
00088 { p->readTimer.setInterval (readTimeout); }
00089 
00090 PacketSession::PacketSession (Connection *c, const NodeInfo &ni)
00091  :  p (new PacketSessionPrivate)
00092 {
00093     Q_ASSERT (c->state() == Connection::ConnectedState);
00094 
00095     quint8 qrtEntryBits = 4;
00096     if (ni.type == Gnutella::TypeUltrapeer)
00097         qrtEntryBits = 1;
00098 
00099     p->qrtReader    = new QrtReader;
00100     p->qrtWriter    = new QrtWriter (this, qrtEntryBits);
00101     p->reader       = new PacketReader (c);
00102     p->writer       = new PacketWriter (c);
00103     p->connection   = c;
00104     p->nodeInfo     = ni;
00105     p->isPacketRead = false;
00106     p->isClosing    = false;
00107 
00108     connect (p->connection, SIGNAL (connectionClosed()),
00109              this, SLOT (connectionClosed()));
00110     connect (p->reader, SIGNAL (packetRead()), this, SLOT (packetRead()));
00111     connect (p->writer, SIGNAL (packetWritten()), this, SLOT (packetWritten()));
00112     connect (p->qrtReader, SIGNAL (qrtRead()), this, SLOT (qrtRead()));
00113     QObject::connect (p->reader,                SIGNAL  (readError()),
00114                       this,                     SLOT    (readError()));
00115     //QObject::connect (p->writer,  SIGNAL  (writeError()),
00116     //                this,                     SLOT    (writeError()));
00117     //QObject::connect (p->qrtReader,
00118     //                SIGNAL (qrtReadError (QrtReadError)),
00119     //                this,
00120     //                SLOT (qrtReadError (QrtReadError)));
00121     connect (&p->readTimer, SIGNAL (timeout()), this, SLOT (readTimeout()));
00122 
00123     p->readTimer.setInterval (ReadTimeout);
00124     p->readTimer.start();
00125 
00126     p->reader->startReading(); // Enable packet reading.
00127 }
00128 
00129 void PacketSession::deleteLater()
00130 {
00131     p->connection->disconnect (this);
00132     p->reader->disconnect (p->connection);
00133     p->reader->disconnect (this);
00134     p->writer->disconnect (p->connection);
00135     p->writer->disconnect (this);
00136     p->readTimer.disconnect (this);
00137     p->readTimer.stop();
00138 
00139     QObject::deleteLater();
00140 }
00141 
00142 PacketSession::~PacketSession()
00143 {
00144     foreach (Packet *packet, p->pendingPackets)
00145         delete packet;
00146     p->pendingPackets.clear();
00147 
00148     delete p->qrtReader;
00149     delete p->qrtWriter;
00150     delete p->reader;
00151     delete p->writer;
00152     delete p->connection;
00153     delete p;
00154 }
00155 
00156 void PacketSession::readTimeout()
00157 {
00158     // \todo Set some error, send Bye packet?
00159     if (!p->isPacketRead)
00160         p->connection->disconnectFromNode();
00161     p->isPacketRead = false;
00162 }
00163 
00164 void PacketSession::connectionClosed()
00165 {
00166     emit sessionClosed (this);
00167 }
00168 
00169 void PacketSession::packetRead()
00170 {
00171     p->isPacketRead = true;
00172     bool isDropped  = false;
00173     Packet *packet  = p->reader->packet()->copy();
00174 
00175     // \todo Validate packet (ttl, hops, size)
00176     //isDropped = validatePacket (packet);
00177     isDropped = !packet->isValid();
00178 
00179     // \todo Keep packet traffic stats.
00180     //p->inStats->countPacket (packet, isDropped);
00181 
00182     // Special handling for QRP/QRT packets:
00183     if (!isDropped)
00184         isDropped = p->qrtReader->handlePacket (*packet);
00185 
00186     // \todo What about ttl=1 pings? Should we answer them hiere directly and
00187     // not forward them?
00188     if (!isDropped)
00189         emit packetRead (*packet, this);
00190 
00191     delete packet;
00192     p->reader->startReading();
00193 }
00194 
00195 void PacketSession::packetWritten()
00196 {
00197     bool isDropped = true;
00198     Packet *packet = 0;
00199 
00200     do {
00201         if (p->pendingPackets.isEmpty()) {
00202             packet = 0;
00203             // We delayed actual disconnecting until all packets were written:
00204             if (p->isClosing &&
00205                 p->connection->state() == Connection::ConnectedState) {
00206                 p->connection->disconnectFromNode();
00207             }
00208             break;
00209         } else {
00210             packet = p->pendingPackets.takeFirst();
00211             /*  \todo When flow control is implemented, some  packets may be
00212                 dropped. Until then: */
00213             isDropped = false;
00214         }
00215         // \todo Keep packet traffic stats.
00216         //p->outStats->countPacket (packet, isDropped);
00217 
00218         if (isDropped)
00219             delete packet;
00220     } while (isDropped);
00221 
00222     if (packet) {
00223         p->writer->write (*packet);
00224         delete packet;
00225     }
00226 }
00227 
00228 void PacketSession::closeSession (CloseCode closeCode)
00229 {
00230     Q_ASSERT (!p->isClosing);
00231 
00232     quint16 code = 0;
00233     QString message;
00234 
00235     switch (closeCode)
00236     {
00237     case CloseNoBye:
00238         break;
00239     case ExittingNormally:
00240         message = "Application is terminating, closing all connections.";
00241         break;
00242     case ExplicitClose:
00243         message = "The user requested explicit close of this connection.";
00244         break;
00245     case TooBigPackets:
00246         message = "You send too big packets!";
00247         break;
00248     case TooManyDuplicates:
00249         message = "You are sending too many duplicate packets!";
00250         break;
00251     case BadQueries:
00252         message = "You are relaying bad queries!";
00253         break;
00254     case TooHighTtlHops:
00255         message = "You are relaying packets with excessive ttl + hops value!";
00256         break;
00257     case ManyUnknownPackets:
00258         message = "You are sending too many unknown packets!";
00259         break;
00260     case InactivityTimeout:
00261         message = "Your connection reached my inactivity timeout!";
00262         break;
00263     case NoTtl1PingReply:
00264         message = "You failed to reply to a ping with ttl=1!";
00265         break;
00266     case NotSharingEnough:
00267         message = "You are not sharing enough!";
00268         break;
00269     case InternalError:
00270         message = "Closing due to internal error! Sorry!";
00271         break;
00272     case ProtocolDesync:
00273         message = "Closing due to protocol desynchronization!";
00274         break;
00275     case SendQueueFull:
00276         message = "Send queue for your connection became full!";
00277         break;
00278     default:
00279         Q_ASSERT (false); // Bad CloseCode
00280     }
00281     // Remove all pending packets from send queue:
00282     foreach (Packet *packet, p->pendingPackets)
00283         delete packet;
00284     p->pendingPackets.clear();
00285 
00286     if (code != 0) {
00287         // Send a bye packet with HTTP style message:
00288         message += "\r\n";
00289         message += QString ("Server: %1\r\n\r\n").arg (SoftwareIdentification);
00290         Bye bye (code, message.toLatin1());
00291         sendPacket (bye);
00292     } else {
00293         // Directly close the connection:
00294         p->connection->disconnectFromNode();
00295     }
00296     // Disable sending any more packets, yet allow bye to be sent.
00297     p->isClosing = true;
00298 }
00299 
00300 void PacketSession::sendPacket (const Packet &packet)
00301 {
00302     if (p->isClosing)
00303         return;
00304     // \todo Add flow control => use a kind of priority queue for output packets
00305     p->pendingPackets.append (packet.copy());
00306     if (p->writer->canWrite())
00307         packetWritten();
00308 }
00309 
00310 void PacketSession::qrtRead()
00311 {
00312     emit qrtRead (this);
00313 }
00314 
00315 void PacketSession::sendQrt (const QueryRoutingTable &table)
00316 {
00317     p->qrtWriter->write (table);
00318 }
00319 
00320 void PacketSession::readError()
00321 {
00322     if (!p->isClosing)
00323         closeSession (ProtocolDesync);
00324 }