allow listen socket reuse plus log client IP

This commit is contained in:
Jethro Grassie 2020-09-02 18:11:27 -04:00
parent 9184050df2
commit 70f865798f
No known key found for this signature in database
GPG Key ID: DE8ED755616565BB
2 changed files with 76 additions and 25 deletions

View File

@ -211,6 +211,8 @@ typedef struct job_t
typedef struct client_t
{
int fd;
char host[MAX_HOST];
uint16_t port;
int json_id;
struct bufferevent *bev;
char address[ADDRESS_MAX];
@ -1271,7 +1273,8 @@ send_validation_error(const client_t *client, const char *message)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, message);
evbuffer_add(output, body, strlen(body));
log_debug("Validation error: %s", message);
log_debug("[%s:%d] Validation error: %s",
client->host, client->port, message);
}
static void
@ -2491,12 +2494,14 @@ trusted_on_account_connect(client_t *client)
evbuffer_remove(input, &count, sizeof(uint32_t));
pool_stats.connected_accounts += count;
client->downstream_accounts += count;
log_trace("Downstream account connected. "
"Accounts: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_accounts, pool_stats.pool_hashrate);
log_trace("Downstream account connected");
trusted_send_stats(client);
if (upstream_event)
upstream_send_account_connect(count);
log_trace("Pool accounts: %d, workers: %d, hashrate: %"PRIu64,
pool_stats.connected_accounts,
gbag_used(bag_clients),
pool_stats.pool_hashrate);
}
static void
@ -2505,12 +2510,14 @@ trusted_on_account_disconnect(client_t *client)
pool_stats.connected_accounts--;
if (client->downstream_accounts)
client->downstream_accounts--;
log_trace("Downstream account disconnected. "
"Miner count: %d, Pool hashrate: %"PRIu64,
pool_stats.connected_accounts, pool_stats.pool_hashrate);
log_trace("Downstream account disconnected");
trusted_send_stats(client);
if (upstream_event)
upstream_send_account_disconnect();
log_trace("Pool accounts: %d, workers: %d, hashrate: %"PRIu64,
pool_stats.connected_accounts,
gbag_used(bag_clients),
pool_stats.pool_hashrate);
}
static void
@ -2753,8 +2760,9 @@ timer_on_10m(int fd, short kind, void *ctx)
evtimer_add(timer_10m, &timeout);
}
static void
client_add(int fd, struct bufferevent *bev, bool downstream)
static const client_t *
client_add(int fd, struct sockaddr_storage *ss,
struct bufferevent *bev, bool downstream)
{
client_t *c;
bool resize = gbag_used(bag_clients) == gbag_max(bag_clients);
@ -2774,10 +2782,23 @@ client_add(int fd, struct bufferevent *bev, bool downstream)
c->bev = bev;
c->connected_since = time(NULL);
c->downstream = downstream;
int rc = 0;
if ((rc = getnameinfo((struct sockaddr*)ss, sizeof(ss),
c->host, MAX_HOST, NULL, 0, NI_NUMERICHOST)))
{
log_error("Error getting client address: %s",
gai_strerror(rc));
}
else
{
struct sockaddr_in *sin = (struct sockaddr_in*) ss;
c->port = htons(sin->sin_port);
}
bstack_new(&c->active_jobs, CLIENT_JOBS_MAX, sizeof(job_t), job_recycle);
pthread_rwlock_wrlock(&rwlock_cfd);
HASH_ADD_INT(clients_by_fd, fd, c);
pthread_rwlock_unlock(&rwlock_cfd);
return c;
}
static void
@ -3146,7 +3167,7 @@ miner_on_submit(json_object *message, client_t *client)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, "Duplicate share");
evbuffer_add(output, body, strlen(body));
log_debug("Duplicate share");
log_debug("[%s:%d] Duplicate share", client->host, client->port);
free(block);
return;
}
@ -3351,7 +3372,7 @@ miner_on_read(struct bufferevent *bev, void *ctx)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, too_long);
evbuffer_add(output, body, strlen(body));
log_info(too_long);
log_warn("[%s:%d] %s", client->host, client->port, too_long);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3366,7 +3387,7 @@ miner_on_read(struct bufferevent *bev, void *ctx)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, invalid_json);
evbuffer_add(output, body, strlen(body));
log_info(invalid_json);
log_warn("[%s:%d] %s", client->host, client->port, invalid_json);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3417,7 +3438,7 @@ miner_on_read(struct bufferevent *bev, void *ctx)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, unknown_method);
evbuffer_add(output, body, strlen(body));
log_info(unknown_method);
log_warn("[%s:%d] %s", client->host, client->port, unknown_method);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3427,7 +3448,7 @@ miner_on_read(struct bufferevent *bev, void *ctx)
char body[ERROR_BODY_MAX] = {0};
stratum_get_error_body(body, client->json_id, too_bad);
evbuffer_add(output, body, strlen(body));
log_info(too_bad);
log_warn("[%s:%d] %s", client->host, client->port, too_bad);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3470,7 +3491,8 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
tag = evbuffer_search(input, (const char*) msgbin, 8, NULL);
if (tag.pos < 0)
{
log_error("Bad message from downstream");
log_warn("[%s:%d] Bad message from downstream",
client->host, client->port);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3508,7 +3530,8 @@ trusted_on_read(struct bufferevent *bev, void *ctx)
trusted_on_client_block(client);
break;
default:
log_error("Unknown message: %d", tnt[8]);
log_warn("[%s:%d] Unknown message: %d",
client->host, client->port, tnt[8]);
evbuffer_drain(input, len);
client_clear(bev);
goto unlock;
@ -3525,18 +3548,23 @@ static void
listener_on_error(struct bufferevent *bev, short error, void *ctx)
{
struct event_base *base = (struct event_base*)ctx;
client_t *client = NULL;
client_find(bev, &client);
char *type = base != trusted_base ? "Miner" : "Downstream";
if (error & BEV_EVENT_EOF)
{
log_debug("%s disconnected. Removing.", type);
log_debug("[%s:%d] %s disconnected. Removing.",
client->host, client->port, type);
}
else if (error & BEV_EVENT_ERROR)
{
log_debug("%s error: %d. Removing.", type, errno);
log_debug("[%s:%d] %s error: %d. Removing.",
client->host, client->port, type, errno);
}
else if (error & BEV_EVENT_TIMEOUT)
{
log_debug("%s timeout. Removing.", type);
log_debug("[%s:%d] %s timeout. Removing.",
client->host, client->port, type);
}
client_clear(bev);
}
@ -3594,9 +3622,12 @@ listener_on_accept(evutil_socket_t listener, short event, void *arg)
base == trusted_base ? trusted_on_read : miner_on_read,
NULL, listener_on_error, arg);
bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
client_add(fd, bev, base == trusted_base);
log_info("New %s connected. Miner count: %d, Pool hashrate: %"PRIu64,
type, pool_stats.connected_accounts, pool_stats.pool_hashrate);
const client_t *c = client_add(fd, &ss, bev, base == trusted_base);
log_info("New %s [%s:%d] connected", type, c->host, c->port);
log_info("Pool accounts: %d, workers: %d, hashrate: %"PRIu64,
pool_stats.connected_accounts,
gbag_used(bag_clients),
pool_stats.pool_hashrate);
bufferevent_enable(bev, EV_READ|EV_WRITE);
}
@ -4019,6 +4050,7 @@ trusted_run(void *ctx)
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
}
#endif
@ -4083,6 +4115,7 @@ run(void)
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
}
#endif
@ -4315,7 +4348,7 @@ int main(int argc, char **argv)
{
fd_log = fopen(config.log_file, "a");
if (!fd_log)
log_info("Failed to open log file: %s", config.log_file);
log_warn("Failed to open log file: %s", config.log_file);
else
log_set_fp(fd_log);
}

View File

@ -45,6 +45,7 @@ developers.
#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/http.h>
#include <event2/listener.h>
#include "log.h"
#include "pool.h"
@ -148,8 +149,25 @@ static void *
thread_main(void *ctx)
{
wui_context_t *context = (wui_context_t*) ctx;
webui_listener = evhttp_bind_socket_with_handle(
webui_httpd, context->pool_listen, context->port);
struct evconnlistener *lev = NULL;
struct addrinfo *info = NULL;
int rc;
char port[6] = {0};
sprintf(port, "%d", context->port);
if ((rc = getaddrinfo(context->pool_listen, port, 0, &info)))
{
log_error("Error parsing listen address: %s", gai_strerror(rc));
return 0;
}
lev = evconnlistener_new_bind(webui_base, 0, NULL,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE | LEV_OPT_REUSEABLE_PORT,
-1, (struct sockaddr*)info->ai_addr, info->ai_addrlen);
if (!lev)
{
log_error("%s", strerror(errno));
return 0;
}
webui_listener = evhttp_bind_listener(webui_httpd, lev);
if(!webui_listener)
{
log_error("Failed to bind for port: %u", context->port);