// cpp11chat/chat_networking.hpp
// (C++ header; originally listed as 183 lines, 6.8 KiB)

#pragma once
#include <algorithm>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/asio.hpp>

#include "chat_messages.hpp"
#include "check_policy.hpp"
namespace chat
{
using namespace chat;
using namespace boost;
using boost::asio::ip::tcp;
/* receive_policy and send_policy must provide the static methods
* that decide what happens with the incoming / outgoing messages.
* you can read about the exact requirements in check_policy.hpp */
template <class receive_policy, class send_policy>
class client_network_manager
{
/* compile-time checks for whether policies are valid or not */
static_assert(is_valid_receive_policy<receive_policy>::value,
"Receive policy does not supply neccessary methods!");
static_assert(is_valid_send_policy<send_policy>::value,
"Send policy does not supply neccessary methods!");
/* members */
private:
asio::io_service& _ios;
tcp::socket _socket;
asio::streambuf _isb;//<-------. input stream
asio::streambuf _osb;//<---. | output stream
2016-12-03 12:22:51 +00:00
std::istream _is;//--------|---'
std::ostream _os;//--------'
std::string _login;
std::vector<std::function<void(chat_message)>> _subscriptions;
/* interface */
public:
client_network_manager(asio::io_service& ioservice,
tcp::resolver::iterator epit,
std::string login)
: _ios(ioservice), _socket(_ios), _isb(), _osb(), _is(&_isb), _os(&_osb), _login(login)
{ connect(epit); }
void send(chat_message message)
{
if (!send_policy::check_msg_length(message))
return;
_ios.post([this, message]
{
asio::async_write(_socket, asio::buffer(message.get()),
[](boost::system::error_code, size_t)
{});
});
}
void close_connection()
{
_ios.post([this]
{
_os << chat_message(message::BYE);
asio::write(_socket, _osb);
// TODO: decide what to do with last server message
_socket.close();
_ios.stop();
});
}
void subscribe(std::function<void(chat_message)> callback)
{ _subscriptions.push_back(std::move(callback)); }
/* internal methods */
private:
/* throws if something went wrong on the network */
static void throw_if_error(const boost::system::error_code& ec)
{
if (ec)
throw std::runtime_error("Networking error: " + ec.message());
}
/* asynchronously connect to the host epit points to */
void connect(tcp::resolver::iterator epit)
{
asio::async_connect
(
_socket, epit,
std::bind(&client_network_manager::handshake,
this, std::placeholders::_1,
std::placeholders::_2)
);
}
/* handles the handshake as defined in the common protocol */
void handshake(boost::system::error_code ec, tcp::resolver::iterator)
{
throw_if_error(ec);
_os << chat_message(message::HELLO);
_os << chat_message(message::NEPTUN, _login);
std::reverse(_login.begin(), _login.end()); // passw is neptun reversed
_os << chat_message(message::PASSW, _login);
asio::write(_socket, _osb); // handshake is handled synchronously
std::string data;
for (int i = 0; i < 4; ++i) // hello + 3 responses for messages above
data += receive_message_sync() += '\n';
receive_policy::handshake_do_what(chat_message(message::SERVER_DIRECTION, data));
receive(); // then flow goes to receive-loop
}
/* helper method used by handshake() */
std::string receive_message_sync()
{
boost::system::error_code ec;
std::string data;
asio::read_until(_socket, _isb, byte(message::TERM), ec);
std::getline(_is, data, byte(message::TERM));
throw_if_error(ec);
return data.substr(1);
}
/* asynchronously receive a message, and decide what to do with it */
void receive()
{
asio::async_read_until
(_socket, _isb, byte(message::TERM),
[this](boost::system::error_code ec, size_t)
{
throw_if_error(ec);
std::string data;
std::getline(_is, data, byte(message::TERM));
char header = data[0]; // get command byte
auto content = data.substr(1, data.size()-1); // minus command & term bytes
chat_message msg(static_cast<message>(header), content);
switch (header) // decide what to do
{
case byte(message::PING):
send(chat_message(message::PONG));
break;
case byte(message::SERVER_DIRECTION):
receive_policy::serverdirection_do_what
(chat_message(message::SERVER_DIRECTION, content));
break;
case byte(message::MESSAGE):
receive_policy::message_do_what
(chat_message(message::MESSAGE, content));
break;
case byte(message::LOGIN):
receive_policy::login_do_what
(chat_message(message::LOGIN, content));
break;
case byte(message::LOGOUT):
receive_policy::logout_do_what
(chat_message(message::LOGOUT, content));
break;
}
for (auto& fun : _subscriptions) // notify subscribers
fun(msg);
receive(); // receive the next message
});
}
};
}