...

понедельник, 27 июля 2015 г.

Многопоточный SOCKS 4 сервер на Qt

Время от времени на форумах рунета в ветке Qt появляются вопросы связанные с программированием сетевых приложение. Одна из проблем которая терзает этих людей, это подход к организации сервера. Обычно перечисляют три подхода:
  • однопоточный асинхронный;
  • многопоточный, создавать по потоку на соединение;
  • многопоточный, с пулом потоков на QThreadPool и QRunnable.

Когда говоришь про фиксированное количество рабочих потоков со своим циклом обработки событий, то просят привести пример. Дальше будет приведен пример сервера с пулом потоков, каждый из которых имеет свой цикл обработки событий.
Действующие лица:
  • класс Server, приминающий соединения и раздающий задачи рабочим;
  • класс Worker, экземпляры которого будут создавать в рабочих потоках экземпляры класса Client;
  • класс Client инкапсулирующий запросы клиента и реализующий SOCKS 4

Самый простой из этой троицы Worker, он наследуется от QObject и реализует всего одну функцию создания клиента и чисто для «правильного» употребления потоков:
class Worker: public QObject
{
Q_OBJECT

public:

    Q_INVOKABLE void addClient(qintptr socketDescriptor);
};

void Worker::addClient(qintptr socketDescriptor)
{
    new Client(socketDescriptor, this);
}


Сервер так же прост:
class Server: public QTcpServer
{
Q_OBJECT

public:

    Server(size_t threads = 4, QObject * parent = nullptr);
    ~Server();

protected:

    virtual void incomingConnection(qintptr socketDescriptor);

private:

    void initThreads();

private:

    size_t m_threadCount;

    QVector<QThread*> m_threads;
    QVector<Worker*> m_workers;
    size_t m_rrcounter;
};

Server::Server(size_t threads, QObject * parent) :
        QTcpServer(parent),
        m_threadCount(threads),
        m_rrcounter(0)
{
    initThreads();
}

Server::~Server()
{
    for(QThread* thread: m_threads)
    {
        thread->quit();
        thread->wait();
    }
}

void Server::initThreads()
{
    for (size_t i = 0; i < m_threadCount; ++i)
    {
        QThread* thread = new QThread(this);

        Worker* worker = new Worker();
        worker->moveToThread(thread);
        connect(thread, &QThread::finished,
                worker, &QObject::deleteLater);

        m_threads.push_back(thread);
        m_workers.push_back(worker);

        thread->start();
    }
}

void Server::incomingConnection(qintptr socketDescriptor)
{
    Worker* worker = m_workers[m_rrcounter % m_threadCount];
    ++m_rrcounter;

    QMetaObject::invokeMethod(worker, "addClient",
            Qt::QueuedConnection,
            Q_ARG(qintptr, socketDescriptor));
}


Он в конструкторе создает потоки, рабочих и перемещает рабочих в потоки. Каждое новое соединение он передает рабочему. Рабочего он выбирает «почестноку», т. е. по Round-robin.

SOCKS 4 очень простой протокол, нужно лишь:

  1. прочитать IP-адрес, номер порта;
  2. установить соединение с «миром»;
  3. отправить клиенту сообщение, что запрос подтвержден;
  4. пересылать данные из одного сокета в другой, пока кто-нибудь не закроет соединение.
class Client: public QObject
{
Q_OBJECT

public:

    Client(qintptr socketDescriptor, QObject* parent = 0);

public slots:

    void onRequest();

    void client2world();
    void world2client();

    void sendSocksAnsver();

    void onClientDisconnected();
    void onWorldDisconnected();

private:

    void done();

private:

    QTcpSocket m_client;
    QTcpSocket m_world;
};

namespace
{
#pragma pack(push, 1)
    struct socks4request
    {
        uint8_t version;
        uint8_t command;
        uint16_t port;
        uint32_t address;
        uint8_t end;
    };

    struct socks4ansver
    {
        uint8_t empty = 0;
        uint8_t status;
        uint16_t field1 = 0;
        uint32_t field2 = 0;
    };
#pragma pack(pop)

    enum SocksStatus
    {
        Granted = 0x5a,
        Failed = 0x5b,
        Failed_no_identd = 0x5c,
        Failed_bad_user_id = 0x5d
    };
}
Client::Client(qintptr socketDescriptor, QObject* parent) :
        QObject(parent)
{
    m_client.setSocketDescriptor(socketDescriptor);

    connect(&m_client, &QTcpSocket::readyRead,
            this, &Client::onRequest);

    connect(&m_client,&QTcpSocket::disconnected,
            this, &Client::onClientDisconnected);

    connect(&m_world, &QTcpSocket::connected,
            this, &Client::sendSocksAnsver);

    connect(&m_world, &QTcpSocket::readyRead,
            this, &Client::world2client);

    connect(&m_world,&QTcpSocket::disconnected,
            this, &Client::onWorldDisconnected);
}

void Client::onRequest()
{
    QByteArray request = m_client.readAll();

    socks4request* header = reinterpret_cast<socks4request*>(request.data());

#if Q_BYTE_ORDER == Q_LITTLE_ENDIAN
    const QHostAddress address(qFromBigEndian(header->address));
#else
    const QHostAddress address(header->address);
#endif

#if Q_BYTE_ORDER == Q_LITTLE_ENDIAN
    const uint16_t port = qFromBigEndian(header->port);
#else
    const uint16_t port = header->port;
#endif
    //qDebug()<<"connection:"<<address<<"port:"<<port;

    m_world.connectToHost(address, port);

    disconnect(&m_client, &QTcpSocket::readyRead, this,
            &Client::onRequest);

    connect(&m_client, &QTcpSocket::readyRead, this,
            &Client::client2world);
}

void Client::sendSocksAnsver()
{
    socks4ansver ans;
    ans.status = Granted;
    m_client.write(reinterpret_cast<char*>(&ans), sizeof(ans));
    m_client.flush();
}

void Client::client2world()
{
    m_world.write(m_client.readAll());
}

void Client::world2client()
{
    m_client.write(m_world.readAll());
}

void Client::onClientDisconnected()
{
    m_world.flush();

    done();
}

void Client::onWorldDisconnected()
{
    m_client.flush();

    done();
}

void Client::done()
{
    m_client.close();
    m_world.close();

    deleteLater();
}


Epoll и Qt

Крик подобен грому:
— Дайте людям рому
Нужно по любому
Людям выпить рому!

Если мы скомпилим предыдущий код и прогоним его через strace -f, то увидим вызовы poll. Обязательно найдется кто-то, кто скажет своё веское «фи», мол с epoll будет «ну ваще ракета».

В Qt есть класс QAbstractEventDispatcher, позволяющий определять свой диспетчер событий. Естественно нашлись добрые люди которые сделали и выложили диспетчеры с разными бекенд. Вот небольшой их список:


При использовании своего диспетчера в main.cpp прописываем
QCoreApplication::setEventDispatcher(new QEventDispatcherEpoll);
QCoreApplication app(argc, argv)


а метод initThreads у сервера становится таким:
void Server::initThreads()
{
    for (size_t i = 0; i < m_threadCount; ++i)
    {
        QThread* thread = new QThread(this);
        thread->setEventDispatcher(new QEventDispatcherEpoll);
        Worker* worker = new Worker();
        worker->moveToThread(thread);
        connect(thread, &QThread::finished,
                worker, &QObject::deleteLater);

        m_threads.push_back(thread);
        m_workers.push_back(worker);

        thread->start();
    }
}


И если мы снова запустим strace, то увидим заветные вызовы функций с префиксом epoll_.
Выводы

Выводы сугубо прагматические.

Если вы прикладной программист и у вас нет задач из разряда «больших» данных или highload по Бунину, то пишите на чем хотите и как можете. Задача прикладного программиста выдать продукт определенного качества, затратив определенное количество ресурсов. В противном случае одними лишь сокетами с epoll не обойдешься.

P.S.

Исходные коды доступны на GitHub.

This entry passed through the Full-Text RSS service - if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.

Комментариев нет:

Отправить комментарий