Commit eda729bf authored by Genaro Juan Sánchez Gallegos's avatar Genaro Juan Sánchez Gallegos Committed by Javier Garcia Blas
Browse files

Add improvements for metadata.

1 merge request!5Merge from Debug into master
Showing with 201 additions and 140 deletions
+201 -140
......@@ -41,7 +41,7 @@ ucp_worker_h ucp_worker;
// ucp_ep_h pub_ep;
ucp_address_t *req_addr;
ucp_ep_h client_ep;
ucp_ep_h **metadata_endpoints;
size_t req_addr_len;
unsigned long number_active_storage_servers = 0; // stores the current number of active storage servers.
......@@ -163,7 +163,7 @@ int stop_server()
// last "0" is the server status to be set.
sprintf(key_plus_size, "%d SET %lu %s %d", args.id, number_active_storage_servers, args.imss_uri, 0);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
if (send_req(ucp_worker, *metadata_endpoints[0], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("HERCULES_ERR_STOP_SERVER_SEND_REQ");
return -1;
......@@ -195,7 +195,7 @@ int wakeup_server()
sprintf(key_plus_size, "%d SET %lu %s %d", args.id, number_active_storage_servers, args.imss_uri, 1);
// fprintf(stderr, "Request - %s\n", key_plus_size);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
if (send_req(ucp_worker, *metadata_endpoints[0], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("ERR_HERCULES_RLS_SERVER_SEND_REQ");
return -1;
......@@ -276,7 +276,7 @@ void handle_signal_server(int signal)
pthread_mutex_lock(&global_finish_mut);
pthread_cond_wait(&global_finish_cond, &global_finish_mut);
fprintf(stderr, "Waiting for snapshot and checkpointing in server %d\n", args.id);
// This file is readed by the hercules script to know if this server
// was correctly shutting down.
......@@ -536,157 +536,211 @@ int32_t main(int32_t argc, char **argv)
uint32_t id = args.id;
slog_debug("Establishing a connection with %s:%ld\n", stat_add, stat_port);
oob_sock = connect_common(stat_add, stat_port, AF_INET);
// %%%%%%%%%%%%%%%%%%%%%%%%%%
// Read the metadata hostfile.
// FILE entity managing the HERCULES deployfile.
FILE *metadata_nodes_fd;
char request[REQUEST_SIZE];
sprintf(request, "%" PRIu32 " GET %s", id, "MAIN!QUERRY");
slog_debug("Request - %s", request);
if (send(oob_sock, request, REQUEST_SIZE, 0) < 0)
if (args.meta_hostfile[0] == '\0')
{
perror("HERCULES_ERR_META_HOSTFILE_NOT_SET");
exit(1);
}
slog_debug("Opening file %s", args.meta_hostfile)
if ((metadata_nodes_fd = fopen(args.meta_hostfile, "r+")) == NULL)
{
perror("HERCULES_ERR_STAT_HELLO");
slog_error("HERCULES_ERR_STAT_HELLO");
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg, "HERCULES_ERR_DEPLOYFILE_OPEN:%s", deployfile);
perror(err_msg);
slog_fatal("%s", err_msg);
return -1;
}
ret = recv(oob_sock, &addr_len, sizeof(addr_len), MSG_WAITALL);
peer_addr = (ucp_address *)malloc(addr_len);
ret = recv(oob_sock, peer_addr, addr_len, MSG_WAITALL);
close(oob_sock);
/* Send client UCX address to server */
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER |
UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.address = peer_addr;
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = err_cb_client;
ep_params.err_handler.arg = NULL;
ep_params.user_data = &ep_status;
status = ucp_ep_create(ucp_worker, &ep_params, &client_ep);
// status = ucp_worker_get_address(ucp_worker, &req_addr, &req_addr_len);
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker, &worker_attr);
req_addr_len = worker_attr.address_length;
req_addr = worker_attr.address;
attr.field_mask = UCP_WORKER_ADDRESS_ATTR_FIELD_UID;
ucp_worker_address_query(req_addr, &attr);
slog_debug("[srv_worker_thread] Server UID %" PRIu64 ".", attr.worker_uid);
if (!args.id)
{ // Only performs by the data server with ID = 0.
// Formated HERCULES uri to be sent to the metadata server.
char formated_uri[REQUEST_SIZE];
sprintf(formated_uri, "%" PRIu32 " GET 0 %s", id, args.imss_uri);
slog_debug("Request - %s", formated_uri);
// Send the request.
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, formated_uri) == 0)
// Number of characters successfully read from the line.
int32_t n_chars;
int init_server_status = 1;
// int num_active_data_servers = 0;
metadata_endpoints = (ucp_ep_h **)malloc(args.num_metadata_servers * sizeof(ucp_ep_h *));
for (int32_t i = 0; i < args.num_metadata_servers; i++)
{
metadata_endpoints[i] = (ucp_ep_h *)malloc(sizeof(ucp_ep_h));
// Allocate resources in the metadata structure so as to store the current HERCULES's IP.
// (my_imss.ips)[i] = (char *)calloc(LINE_LENGTH, sizeof(char));
size_t num_bytes_for_line = 0;
stat_add = NULL;
// Save HERCULES metadata deployment.
n_chars = getline(&stat_add, &num_bytes_for_line, metadata_nodes_fd);
// Erase the new line character ('') from the string.
if (stat_add[n_chars - 1] == '\n')
{
slog_error("HERCULES_ERR__SEND_REQ");
perror("HERCULES_ERR__SEND_REQ");
return -1;
stat_add[n_chars - 1] = '\0';
}
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker, attr.worker_uid);
if (length == 0)
slog_debug("Establishing a connection with %s:%ld\n", stat_add, stat_port);
oob_sock = connect_common(stat_add, stat_port, AF_INET);
char request[REQUEST_SIZE];
sprintf(request, "%" PRIu32 " GET %s", id, "MAIN!QUERRY");
slog_debug("Request - %s", request);
if (send(oob_sock, request, REQUEST_SIZE, 0) < 0)
{
slog_error("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR_STAT_HELLO");
slog_error("HERCULES_ERR_STAT_HELLO");
return -1;
}
// Receive the associated structure.
imss_info *imss_info_ = (imss_info *)malloc(sizeof(imss_info) * length);
ret = recv_dynamic_stream(ucp_worker, client_ep, imss_info_, IMSS_INFO, attr.worker_uid, length);
// If another data server has taken the URI, this HERCULES configuration should not be deployed.
// Two HERCULES configurations cannot have the same URI.
// We check if "recv_dynamic_stream" has successed, if so, there are another HERCULES instance using
// the same URI.
// On success, we free memory and stop this instance.
int new_id = 0;
if (ret != -1)
{ // success "recv_dynamic_stream".
// fprintf(stderr, "imss_info_.num_storages=%d, length=%lu\n", imss_info_->num_storages, length);
imss_exists = 1;
for (int32_t i = 0; i < imss_info_->num_storages; i++)
ret = recv(oob_sock, &addr_len, sizeof(addr_len), MSG_WAITALL);
slog_debug("Address len=%lu", addr_len);
peer_addr = (ucp_address *)malloc(addr_len);
ret = recv(oob_sock, peer_addr, addr_len, MSG_WAITALL);
slog_debug("Peer Address=%lu", peer_addr);
close(oob_sock);
free(stat_add);
/* Send client UCX address to server */
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER |
UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.address = peer_addr;
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = err_cb_client;
ep_params.err_handler.arg = NULL;
ep_params.user_data = &ep_status;
slog_debug("Creating endpoint with the metadata server %d", i);
status = ucp_ep_create(ucp_worker, &ep_params, metadata_endpoints[i]);
slog_debug("Endpoint with the metadata %d created", i);
// status = ucp_worker_get_address(ucp_worker, &req_addr, &req_addr_len);
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker, &worker_attr);
req_addr_len = worker_attr.address_length;
req_addr = worker_attr.address;
attr.field_mask = UCP_WORKER_ADDRESS_ATTR_FIELD_UID;
ucp_worker_address_query(req_addr, &attr);
slog_debug("[srv_worker_thread] Server UID %" PRIu64 ".", attr.worker_uid);
if (!args.id)
{ // Only performs by the data server with ID = 0.
// Formated HERCULES uri to be sent to the metadata server.
char formated_uri[REQUEST_SIZE];
sprintf(formated_uri, "%" PRIu32 " GET 0 %s", id, args.imss_uri);
slog_debug("Request - %s", formated_uri);
// Send the request.
if (send_req(ucp_worker, *metadata_endpoints[i], req_addr, req_addr_len, formated_uri) == 0)
{
// fprintf(stderr,"ip[%d]=%s\n", i, imss_info_->ips[i]);
free(imss_info_->ips[i]);
new_id++;
slog_error("HERCULES_ERR__SEND_REQ");
perror("HERCULES_ERR__SEND_REQ");
return -1;
}
free(imss_info_->ips);
}
free(imss_info_);
if (args.id != new_id)
{
fprintf(stderr, "Data server with id = %d already in use, changing to %d\n", args.id, new_id);
// Set a new id to this server.
args.id = new_id;
/* code */
}
}
if (imss_exists)
{
// Here we need to stop all HERCULES data servers
// related to this configuration, or check if them
// are not running anymore to continue deploying this configuration.
perror("HERCULES_ERR_SERVER_URI_TAKEN");
slog_error("HERCULES_ERR_SERVER_URI_TAKEN, ret=%d", ret);
// ready(tmp_file_path, "ERROR");
// return 0;
}
// When LOCAL policy is used, the server creates a shared memory region.
if (!strcmp(POLICY, "LOCAL") || !strcmp(POLICY, "ZCOPY"))
{
// Get the shared memory key and tries to create the shared memory region (pool).
key_t key = getKeySM();
slog_info("Generated Key = %d\n", key);
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker, attr.worker_uid);
if (length == 0)
{
slog_error("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR__GET_RECV_DATA_LENGTH");
return -1;
}
// Receive the associated structure.
imss_info *imss_info_ = (imss_info *)malloc(sizeof(imss_info) * length);
ret = recv_dynamic_stream(ucp_worker, *metadata_endpoints[i], imss_info_, IMSS_INFO, attr.worker_uid, length);
// If another data server has taken the URI, this HERCULES configuration should not be deployed.
// Two HERCULES configurations cannot have the same URI.
// We check if "recv_dynamic_stream" has successed, if so, there are another HERCULES instance using
// the same URI.
// On success, we free memory and stop this instance.
int new_id = 0;
if (ret != -1)
{ // success "recv_dynamic_stream".
// fprintf(stderr, "imss_info_.num_storages=%d, length=%lu\n", imss_info_->num_storages, length);
imss_exists = 1;
for (int32_t i = 0; i < imss_info_->num_storages; i++)
{
// fprintf(stderr,"ip[%d]=%s\n", i, imss_info_->ips[i]);
free(imss_info_->ips[i]);
new_id++;
}
free(imss_info_->ips);
}
free(imss_info_);
if (args.id != new_id)
{
fprintf(stderr, "Data server with id = %d already in use, changing to %d\n", args.id, new_id);
// Set a new id to this server.
args.id = new_id;
/* code */
}
}
shm_data_id = getIdentifierSM(key, SHM_SIZE);
if (shm_data_id == -1)
if (imss_exists)
{
perror("ERR_HERCULES_GET_SM_IDENTIFIER");
// Do not stop the process.
// Here we need to stop all HERCULES data servers
// related to this configuration, or check if them
// are not running anymore to continue deploying this configuration.
perror("HERCULES_ERR_SERVER_URI_TAKEN");
slog_error("HERCULES_ERR_SERVER_URI_TAKEN, ret=%d", ret);
// ready(tmp_file_path, "ERROR");
// return 0;
}
else
// When LOCAL policy is used, the server creates a shared memory region.
if (!strcmp(POLICY, "LOCAL") || !strcmp(POLICY, "ZCOPY"))
{
void *pool_memory = createSM(shm_data_id);
if (pool_memory == NULL)
{ // error creating the shared memory region.
perror("HERCULES_ERR_CREATE_SM");
slog_error("HERCULES_ERR_CREATE_SM");
ready(tmp_file_path, "ERROR");
exit(0);
// Get the shared memory key and tries to create the shared memory region (pool).
key_t key = getKeySM();
slog_info("Generated Key = %d\n", key);
shm_data_id = getIdentifierSM(key, SHM_SIZE);
if (shm_data_id == -1)
{
perror("ERR_HERCULES_GET_SM_IDENTIFIER");
// Do not stop the process.
}
else
{
// Shared memory has been created.
args.pool_memory = pool_memory;
// Becasue the shared memory was successfully created, we
// initializate a semaphore to sincronize block 0.
sem_shared_memory = sem_open("/hercules_shm_sem", O_CREAT, 0644, 1);
if (sem_shared_memory == SEM_FAILED)
void *pool_memory = createSM(shm_data_id);
if (pool_memory == NULL)
{ // error creating the shared memory region.
perror("HERCULES_ERR_CREATE_SM");
slog_error("HERCULES_ERR_CREATE_SM");
ready(tmp_file_path, "ERROR");
exit(0);
}
else
{
perror("HERCULES_ERR_SHM_SEM_OPEN");
exit(-1);
// Shared memory has been created.
args.pool_memory = pool_memory;
// Becasue the shared memory was successfully created, we
// initializate a semaphore to sincronize block 0.
sem_shared_memory = sem_open("/hercules_shm_sem", O_CREAT, 0644, 1);
if (sem_shared_memory == SEM_FAILED)
{
perror("HERCULES_ERR_SHM_SEM_OPEN");
exit(-1);
}
// Close the semaphore. The semaphore will remain and can
// be used by the front-end until unlink is called.
sem_close(sem_shared_memory);
}
// Close the semaphore. The semaphore will remain and can
// be used by the front-end until unlink is called.
sem_close(sem_shared_memory);
}
}
}
// Close the file.
if (fclose(metadata_nodes_fd) != 0)
{
perror("HERCULES_ERR_DEPLOYFILE_CLOSE");
slog_fatal("HERCULES_ERR_DEPLOYFILE_CLOSE");
return -1;
}
}
// Metadata server.
else
......@@ -824,6 +878,7 @@ int32_t main(int32_t argc, char **argv)
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker_threads[aux_idx], &worker_attr);
slog_debug("Setting address=%lu (len=%lu) to local_addr at %d", worker_attr.address, worker_attr.address_length, aux_idx)
local_addr_len[aux_idx] = worker_attr.address_length;
local_addr[aux_idx] = worker_attr.address;
......@@ -932,22 +987,27 @@ int32_t main(int32_t argc, char **argv)
// Send the created structure to the metadata server.
sprintf(key_plus_size, "%" PRIu32 " SET %lu %s", id, (sizeof(imss_info) + my_imss.num_storages * LINE_LENGTH + my_imss.num_storages * sizeof(int) + my_imss.num_storages * sizeof(int)), my_imss.uri_);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
for (size_t j = 0; j < args.num_metadata_servers; j++)
{
perror("HERCULES_ERR_SEND_REQ_SET_STR");
slog_fatal("HERCULES_ERR_SEND_REQ_SET_STR");
return -1;
}
slog_debug("[SERVER] Creating IMSS_INFO at metadata server. ");
// Send the new HERCULES metadata structure to the metadata server entity.
if (send_dynamic_stream(ucp_worker, client_ep, (char *)&my_imss, IMSS_INFO, attr.worker_uid) == -1)
return -1;
if (send_req(ucp_worker, *metadata_endpoints[j], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("HERCULES_ERR_SEND_REQ_SET_STR");
slog_fatal("HERCULES_ERR_SEND_REQ_SET_STR");
return -1;
}
slog_debug("[SERVER] Creating IMSS_INFO at metadata server. ");
// Send the new HERCULES metadata structure to the metadata server entity.
if (send_dynamic_stream(ucp_worker, *metadata_endpoints[j], (char *)&my_imss, IMSS_INFO, attr.worker_uid) == -1)
{
return -1;
}
}
for (int32_t i = 0; i < num_servers; i++)
free(my_imss.ips[i]);
free(my_imss.ips);
// ucp_ep_close_nb(client_ep, UCP_EP_CLOSE_MODE_FORCE);
}
if (args.type == TYPE_DATA_SERVER)
......@@ -1034,7 +1094,7 @@ int32_t main(int32_t argc, char **argv)
// Close publisher socket.
// ep_close(ucp_worker, pub_ep, UCP_EP_CLOSE_MODE_FORCE);
// ep_close(ucp_worker, client_ep, UCP_EP_CLOSE_MODE_FORCE);
// ep_close(ucp_worker, metadata_endpoints, UCP_EP_CLOSE_MODE_FORCE);
// ucp_cleanup(ucp_context);
// sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-stop", args.hercules_path, args.type, args.id);
......
......@@ -2606,6 +2606,7 @@ void *dispatcher(void *th_argv)
// metadata servers.
ret = send(new_socket, &local_addr_len[0], sizeof(local_addr_len[0]), 0);
ret = send(new_socket, local_addr[0], local_addr_len[0], 0);
slog_debug("[DISPATCHER] Sent address %lu (%lu) to the client", local_addr[0], local_addr_len[0]);
}
// Check if someone is requesting identity resources.
else if (*((int32_t *)req) == WHO)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment