net: Create CConnman to encapsulate p2p connections

This commit is contained in:
Cory Fields
2016-04-16 14:47:18 -04:00
parent d93b14dc5d
commit cd16f48028
6 changed files with 112 additions and 54 deletions

View File

@@ -65,13 +65,6 @@
namespace {
const int MAX_OUTBOUND_CONNECTIONS = 8;
const int MAX_FEELER_CONNECTIONS = 1;
struct ListenSocket {
SOCKET socket;
bool whitelisted;
ListenSocket(SOCKET _socket, bool _whitelisted) : socket(_socket), whitelisted(_whitelisted) {}
};
}
const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*";
@@ -1015,7 +1008,7 @@ static bool AttemptToEvictConnection() {
return false;
}
static void AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
struct sockaddr_storage sockaddr;
socklen_t len = sizeof(sockaddr);
SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);
@@ -1089,7 +1082,7 @@ static void AcceptConnection(const ListenSocket& hListenSocket) {
}
}
void ThreadSocketHandler()
void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
while (true)
@@ -1497,7 +1490,7 @@ static std::string GetDNSHost(const CDNSSeedData& data, ServiceFlags* requiredSe
}
void ThreadDNSAddressSeed()
void CConnman::ThreadDNSAddressSeed()
{
// goal: only query DNS seeds if address need is acute
if ((addrman.size() > 0) &&
@@ -1577,7 +1570,7 @@ void DumpData()
DumpBanlist();
}
void static ProcessOneShot()
void CConnman::ProcessOneShot()
{
std::string strDest;
{
@@ -1595,7 +1588,7 @@ void static ProcessOneShot()
}
}
void ThreadOpenConnections()
void CConnman::ThreadOpenConnections()
{
// Connect to specific addresses
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
@@ -1791,7 +1784,7 @@ std::vector<AddedNodeInfo> GetAddedNodeInfo()
return ret;
}
void ThreadOpenAddedConnections()
void CConnman::ThreadOpenAddedConnections()
{
{
LOCK(cs_vAddedNodes);
@@ -1848,7 +1841,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSem
}
void ThreadMessageHandler()
void CConnman::ThreadMessageHandler()
{
boost::mutex condition_mutex;
boost::unique_lock<boost::mutex> lock(condition_mutex);
@@ -2064,7 +2057,11 @@ void static Discover(boost::thread_group& threadGroup)
#endif
}
void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
CConnman::CConnman()
{
}
bool StartNode(CConnman& connman, boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError)
{
uiInterface.InitMessage(_("Loading addresses..."));
// Load addresses from peers.dat
@@ -2102,6 +2099,17 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
fAddressesInitialized = true;
Discover(threadGroup);
bool ret = connman.Start(threadGroup, strNodeError);
// Dump network addresses
scheduler.scheduleEvery(DumpData, DUMP_ADDRESSES_INTERVAL);
return ret;
}
bool CConnman::Start(boost::thread_group& threadGroup, std::string& strNodeError)
{
if (semOutbound == NULL) {
// initialize semaphore
int nMaxOutbound = std::min((MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS), nMaxConnections);
@@ -2114,8 +2122,6 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
pnodeLocalHost = new CNode(INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices));
}
Discover(threadGroup);
//
// Start threads
//
@@ -2123,34 +2129,30 @@ void StartNode(boost::thread_group& threadGroup, CScheduler& scheduler)
if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "dnsseed", &ThreadDNSAddressSeed));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));
// Map ports with UPnP
MapPort(GetBoolArg("-upnp", DEFAULT_UPNP));
// Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
// Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
// Initiate outbound connections
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
// Process messages
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
// Dump network addresses
scheduler.scheduleEvery(&DumpData, DUMP_ADDRESSES_INTERVAL);
return true;
}
bool StopNode()
bool StopNode(CConnman& connman)
{
LogPrintf("StopNode()\n");
MapPort(false);
if (semOutbound)
for (int i=0; i<(MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS); i++)
semOutbound->post();
if (fAddressesInitialized)
{
@@ -2158,6 +2160,7 @@ bool StopNode()
fAddressesInitialized = false;
}
connman.Stop();
return true;
}
@@ -2168,28 +2171,6 @@ public:
~CNetCleanup()
{
// Close sockets
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->hSocket != INVALID_SOCKET)
CloseSocket(pnode->hSocket);
BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET)
if (!CloseSocket(hListenSocket.socket))
LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError()));
// clean up some globals (to help leak detection)
BOOST_FOREACH(CNode *pnode, vNodes)
delete pnode;
BOOST_FOREACH(CNode *pnode, vNodesDisconnected)
delete pnode;
vNodes.clear();
vNodesDisconnected.clear();
vhListenSocket.clear();
delete semOutbound;
semOutbound = NULL;
delete pnodeLocalHost;
pnodeLocalHost = NULL;
#ifdef WIN32
// Shutdown Windows Sockets
WSACleanup();
@@ -2198,6 +2179,38 @@ public:
}
instance_of_cnetcleanup;
void CConnman::Stop()
{
if (semOutbound)
for (int i=0; i<(MAX_OUTBOUND_CONNECTIONS + MAX_FEELER_CONNECTIONS); i++)
semOutbound->post();
// Close sockets
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->hSocket != INVALID_SOCKET)
CloseSocket(pnode->hSocket);
BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET)
if (!CloseSocket(hListenSocket.socket))
LogPrintf("CloseSocket(hListenSocket) failed with error %s\n", NetworkErrorString(WSAGetLastError()));
// clean up some globals (to help leak detection)
BOOST_FOREACH(CNode *pnode, vNodes)
delete pnode;
BOOST_FOREACH(CNode *pnode, vNodesDisconnected)
delete pnode;
vNodes.clear();
vNodesDisconnected.clear();
vhListenSocket.clear();
delete semOutbound;
semOutbound = NULL;
delete pnodeLocalHost;
pnodeLocalHost = NULL;
}
CConnman::~CConnman()
{
}
void RelayTransaction(const CTransaction& tx)
{