Commit 8e895b18 authored by Javier Garcia Blas's avatar Javier Garcia Blas
Browse files

Merge branch 'Debug' into 'master'

Merge from Debug into master

See merge request !5
1 merge request!5Merge from Debug into master
Showing with 745 additions and 566 deletions
+745 -566
......@@ -65,7 +65,7 @@ SET(IMSS_LIB "-L${CMAKE_SOURCE_DIR}/build/app")
target_link_directories(hercules_server PUBLIC ${UCX_LIBRARIES})
TARGET_LINK_LIBRARIES(imss_static ${CMAKE_THREAD_LIBS_INIT} ${MPI_LIBRARIES} ${GLIB_LIBRARIES} ucx::ucp ucx::ucs ${JEMALLOC_LIBRARIES})
TARGET_LINK_LIBRARIES(imss_static ${CMAKE_THREAD_LIBS_INIT} ${MPI_LIBRARIES} ${GLIB_LIBRARIES} ucx::ucp ucx::ucs ${JEMALLOC_LIBRARIES} )
TARGET_LINK_LIBRARIES(imss_shared ${CMAKE_THREAD_LIBS_INIT} ${MPI_LIBRARIES} ${GLIB_LIBRARIES} dl ucx::ucp ucx::ucs ${JEMALLOC_LIBRARIES} )
TARGET_LINK_LIBRARIES(hercules_server ${CMAKE_THREAD_LIBS_INIT} ${MPI_LIBRARIES} ${GLIB_LIBRARIES} ${PCRE_LIBRARY} ucx::ucp ucx::ucs ${JEMALLOC_LIBRARIES} )
......
......@@ -4,13 +4,14 @@
#include <stdlib.h>
#include <pthread.h>
#include <sys/signal.h>
#include "metadata_stat.h"
#include "memalloc.h"
#include "directory.h"
#include "records.hpp"
#include "map_ep.hpp"
#include <inttypes.h>
#include <unistd.h>
#include "records.hpp"
#include "map_ep.hpp"
#include "memalloc.h"
#include "metadata_stat.h"
#include "directory.h"
#include "policies.h"
// Pointer to the tree's root node.
extern GNode *tree_root;
......@@ -41,7 +42,7 @@ ucp_worker_h ucp_worker;
// ucp_ep_h pub_ep;
ucp_address_t *req_addr;
ucp_ep_h client_ep;
ucp_ep_h *metadata_endpoints;
size_t req_addr_len;
unsigned long number_active_storage_servers = 0; // stores the current number of active storage servers.
......@@ -50,7 +51,12 @@ pthread_t *threads;
// global variables usted to finish threads.
extern int global_finish_threads;
extern int global_finish_checkpoint;
extern int global_finish_snapshot;
extern int global_server_fd_thread;
extern pthread_cond_t global_finish_cond;
extern pthread_cond_t global_run_snapshot_cond;
extern pthread_cond_t global_run_checkpoint_cond;
extern pthread_mutex_t global_finish_mut;
#define RAM_STORAGE_USE_PCT 0.75f // percentage of free system RAM to be used for storage
......@@ -59,82 +65,7 @@ extern int global_server_fd_thread;
* following the distribution policy choose by the user.
* @return 0 on success, on error -1 is returned.
*/
int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri, std::shared_ptr<map_records> map)
{
// Creates endpoints to all data servers. It is use in case of
// malleability to move blocks between data servers.
slog_debug("Connecting to data servers\n");
open_imss(imss_uri); // TODO: Check if this is still necessary due we called it on the main function.
if (number_active_storage_servers < 0)
{
slog_fatal("Error creating HERCULES's resources, the process cannot be started");
return -1;
}
// Here data server should to move the datablocks.
// print all key/value elements.
double time_taken;
time_t t = clock();
void *address_;
uint64_t block_size;
int curr_map_size = 0;
const char *uri_;
size_t size;
char key_[REQUEST_SIZE];
// Get the number of blocks stored by this data server.
int number_of_blocks_2_move = map->size();
slog_info("Server %d, has %d blocks, active storage servers=%lu", args.id, map->size(), number_active_storage_servers);
while ((curr_map_size = map->size()) > 0 && number_active_storage_servers > 0)
{
std::string key;
// get next key (block identifier) with the format <block_name>$<block_number>
// for example "myfile$199", where block_name = myfile, and block_number = 199.
key = map->get_head_element();
// get the element data and store it in "address_".
map->get(key, &address_, &block_size);
// fprintf(stderr, "**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
slog_debug("**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
int pos = key.find('$') + 1; // +1 to skip '$' on the block number.
std::string block = key.substr(pos, key.length() + 1); // substract the block number from the key.
int block_number = stoi(block, 0, 10); // string to number.
pos -= 1; // -1 to skip '$' on the data uri.
std::string data_uri = key.substr(0, pos); // substract the data uri from the key.
slog_debug("key='%s',\turi='%s',\tblock='%s'\n", key.c_str(), data_uri.c_str(), block.c_str());
int next_server = find_server(number_active_storage_servers, block_number, data_uri.c_str(), 0);
slog_info("key='%s',\turi='%s%s',\tfrom server %d to server %d,\tactive servers=%lu\n", key.c_str(), data_uri.c_str(), block.c_str(), server_id, next_server, number_active_storage_servers);
slog_debug("new server=%d, curr_server=%d\n", next_server, server_id);
// here we can send key.c_str() directly to reduce the number of operations.
if (set_data_server(data_uri.c_str(), block_number, address_, block_size, 0, next_server) < 0)
{
slog_error("ERR_HERCULES_SET_DATA_IN_SERVER\n");
perror("ERR_HERCULES_SET_DATA_IN_SERVER");
return -1;
}
// delete the element from the map.
map->erase_head_element();
// get new map size to print it.
curr_map_size = map->size();
// fprintf(stderr, "**** curr_map_size=%d\n", curr_map_size);
slog_debug("**** curr_map_size=%d\n", curr_map_size);
}
t = clock() - t;
time_taken = ((double)t) / (CLOCKS_PER_SEC);
if (number_active_storage_servers > 0)
{
// fprintf(stderr, "[HS] Data movement %d blocks %lu %f sec.\n", number_of_blocks_2_move, number_active_storage_servers, time_taken);
fprintf(stderr, "\033[0;34m [HS] Server %d has moved %d blocks to %lu servers in %f sec. \033[0m\n", args.id, number_of_blocks_2_move, number_active_storage_servers, time_taken);
}
return 0;
}
int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri, std::shared_ptr<map_records> map);
/**
* @brief Comunicates data servers to metadata servers to
......@@ -142,31 +73,7 @@ int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri,
* status of this data server (non active).
* @return 0 on success, on error -1 is returned.
*/
int stop_server()
{
// Get the current number of active nodes.
number_active_storage_servers = get_number_of_active_nodes();
if (number_active_storage_servers < 0)
{
return -1;
}
// Tell metadata server to reduce number of servers.
char key_plus_size[REQUEST_SIZE];
// Send the created structure to the metadata server.
// last "0" is the server status to be set.
sprintf(key_plus_size, "%d SET %lu %s %d", args.id, number_active_storage_servers, args.imss_uri, 0);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
{
perror("HERCULES_ERR_STOP_SERVER_SEND_REQ");
return -1;
}
return 0;
}
int stop_server();
/**
* @brief Comunicates data servers to metadata servers to
* update the number of active servers and to update the
......@@ -176,7 +83,7 @@ int stop_server()
int wakeup_server()
{
// Get the current number of active nodes.
number_active_storage_servers = get_number_of_active_nodes();
number_active_storage_servers = get_number_of_active_nodes(args.hercules_path);
if (number_active_storage_servers < 0)
{
......@@ -190,7 +97,7 @@ int wakeup_server()
sprintf(key_plus_size, "%d SET %lu %s %d", args.id, number_active_storage_servers, args.imss_uri, 1);
// fprintf(stderr, "Request - %s\n", key_plus_size);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
if (send_req(ucp_worker, metadata_endpoints[0], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("ERR_HERCULES_RLS_SERVER_SEND_REQ");
return -1;
......@@ -205,131 +112,24 @@ int wakeup_server()
* SIGUSR1 is used to srink and SIGUSR2 is used to
* increase the number of servers.
*/
void handle_signal_server(int signal)
{
if (signal == SIGUSR1) // suspend or shutdown this server.
{
slog_info("SIGUSR1 received");
int pkill_operation = 0, ret = 0;
char buf[10], action[20], temporal_path[PATH_MAX];
sprintf(temporal_path,"%s/tmp/hercules_pkill_operation", args.hercules_path);
// fprintf(stderr,"Temporal path: %s\n", temporal_path);
// Get the operation number.
int fd = open(temporal_path, O_RDONLY);
if (fd == -1)
{
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg, "ERR_HERCULES_OPEN_PKILL_OPERATION:%s", temporal_path);
perror(err_msg);
return;
}
ret = read(fd, buf, sizeof(buf) - 1);
buf[ret] = '\0';
// In case of read error, pkill_operation must be 0
// to suspend the server but not shutdown it.
if (ret == -1)
{
pkill_operation = 0;
perror("HERCULES_ERR_READ_PKILL_OPERATION");
}
else
{
pkill_operation = atoi(buf);
}
void handle_signal_server(int signal);
ret = close(fd);
if (fd == -1)
{
perror("ERR_HERCULES_CLOSE_PKILL_OPERATION");
}
slog_info("pkill_operation = %d", pkill_operation);
// fprintf(stderr, "pkill_operation = %d\n", pkill_operation);
switch (pkill_operation)
{
case 1: // finish data server processes (shutdown).
// "global_finish_threads" is a gloabl variable readed by the
// dispatcher and workers threads. 1 indicates those threads
// must finish their execution.
if (args.type == TYPE_METADATA_SERVER || global_finish_checkpoint == 1)
global_finish_threads = 1;
else
global_finish_checkpoint = 1;
sprintf(action, "stop");
// Shutdown or close the socket used by the dispatcher pointed
// by the file descriptor "global_server_fd_thread".
if (shutdown(global_server_fd_thread, SHUT_RD) == -1)
{
perror("ERR_HERCULES_SHUTDOWN_SERVER_FD\n");
}
break;
default: // suspend the data server.
sprintf(action, "remove");
// Data servers processes will still running to be reused on
// the future. On shrink process, this server won't be used,
// but backend processes will be still running.
break;
}
// Data servers performs malleability operations if it is enabled.
if (args.type == TYPE_DATA_SERVER && args.malleability == 1)
{
ret = stop_server();
if (ret == 0) // success.
{
ret = move_blocks_2_server(args.stat_port, args.id, args.imss_uri, g_map);
if (ret < 0) // error.
{
// TODO: if "move_blocks_2_server" fails, try again?
}
}
}
// This file is readed by the hercules script to know if this server
// was correctly shutting down.
char tmp_file_path[PATH_MAX];
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-%s", args.hercules_path, args.type, args.id, action);
ready(tmp_file_path, "OK");
}
if (signal == SIGUSR2) // wake up this server.
{
slog_info("SIGUSR2 received");
if (args.type == TYPE_DATA_SERVER) // only data servers.
{
fprintf(stderr, " \033[0;32m Waking up server %d \033[0m\n", args.id);
// Changes the number of active servers in the metadata server
// and the status of this server.
wakeup_server();
// This file is readed by the hercules script to know if this server
// was correctly waking up.
char tmp_file_path[PATH_MAX];
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-up", args.hercules_path, args.type, args.id);
fprintf(stderr, "Writting file %s\n", tmp_file_path);
ready(tmp_file_path, "OK");
}
}
}
void print_usage(const char *msg)
{
fprintf(stderr, "%s\n usage for METADATA server: hercules_server m <server_id>\n usage for DATA server: hercules_server d <server_id> <metadata_host> <initial_number_of_data_servers> \n", msg);
}
void print_usage(const char *msg);
int32_t main(int32_t argc, char **argv)
{
signal(SIGUSR1, handle_signal_server);
signal(SIGUSR2, handle_signal_server);
pthread_cond_init(&global_run_snapshot_cond, NULL);
pthread_cond_init(&global_finish_cond, NULL);
// clock_t t;
double time_taken;
int init_number_of_server = 1;
uint64_t bind_port;
char *stat_add = NULL;
// char *metadata_file;
char *deployfile = NULL;
int64_t buffer_size, stat_port, num_servers;
ucp_ep_params_t ep_params;
......@@ -350,6 +150,7 @@ int32_t main(int32_t argc, char **argv)
uint64_t max_system_ram_allowed;
uint64_t max_storage_size; // memory pool size
uint32_t num_blocks;
u_int16_t hercules_thread_pool_size;
// shared memory.
int shm_data_id = 0;
......@@ -392,17 +193,20 @@ int32_t main(int32_t argc, char **argv)
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-start", args.hercules_path, args.type, args.id);
// args.logging.hercules_debug_level = SLOG_NONE;
IMSS_THREAD_POOL = args.thread_pool;
hercules_thread_pool_size = args.thread_pool;
fprintf(stderr, "Number of threads=%d\n", hercules_thread_pool_size);
// POLICY = args.policy;
strncpy(POLICY, args.policy, sizeof(POLICY));
char log_path[PATH_MAX];
char *workdir = getenv("PWD");
slog_debug("Server type=%c\n", args.type);
struct tm tm = *localtime(&t);
sprintf(log_path, "./%c-server-%d.%02d-%02d-%02d", args.type, args.id, tm.tm_hour, tm.tm_min, tm.tm_sec);
sprintf(log_path, "%s/%c-server-%d.%02d-%02d-%02d", workdir, args.type, args.id, tm.tm_hour, tm.tm_min, tm.tm_sec);
// Initializate logger.
slog_init(log_path, args.logging.hercules_debug_level, args.logging.hercules_debug_file, args.logging.hercules_debug_screen, 1, 1, 1, args.id);
// slog_time("Server %d started", args.id);
if (args.logging.hercules_debug_file > 0)
{
......@@ -504,170 +308,223 @@ int32_t main(int32_t argc, char **argv)
uint32_t id = args.id;
slog_debug("Establishing a connection with %s:%ld\n", stat_add, stat_port);
oob_sock = connect_common(stat_add, stat_port, AF_INET);
// %%%%%%%%%%%%%%%%%%%%%%%%%%
// Read the metadata hostfile.
// FILE entity managing the HERCULES deployfile.
FILE *metadata_nodes_fd;
char request[REQUEST_SIZE];
sprintf(request, "%" PRIu32 " GET %s", id, "MAIN!QUERRY");
slog_debug("Request - %s", request);
if (send(oob_sock, request, REQUEST_SIZE, 0) < 0)
if (args.meta_hostfile[0] == '\0')
{
perror("HERCULES_ERR_META_HOSTFILE_NOT_SET");
exit(1);
}
slog_debug("Opening file %s", args.meta_hostfile) if ((metadata_nodes_fd = fopen(args.meta_hostfile, "r+")) == NULL)
{
perror("HERCULES_ERR_STAT_HELLO");
slog_error("HERCULES_ERR_STAT_HELLO");
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg, "HERCULES_ERR_DEPLOYFILE_OPEN:%s", deployfile);
perror(err_msg);
slog_fatal("%s", err_msg);
return -1;
}
ret = recv(oob_sock, &addr_len, sizeof(addr_len), MSG_WAITALL);
peer_addr = (ucp_address *)malloc(addr_len);
ret = recv(oob_sock, peer_addr, addr_len, MSG_WAITALL);
close(oob_sock);
/* Send client UCX address to server */
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER |
UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.address = peer_addr;
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = err_cb_client;
ep_params.err_handler.arg = NULL;
ep_params.user_data = &ep_status;
status = ucp_ep_create(ucp_worker, &ep_params, &client_ep);
// status = ucp_worker_get_address(ucp_worker, &req_addr, &req_addr_len);
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker, &worker_attr);
req_addr_len = worker_attr.address_length;
req_addr = worker_attr.address;
attr.field_mask = UCP_WORKER_ADDRESS_ATTR_FIELD_UID;
ucp_worker_address_query(req_addr, &attr);
slog_debug("[srv_worker_thread] Server UID %" PRIu64 ".", attr.worker_uid);
if (!args.id)
{ // Only performs by the data server with ID = 0.
// Formated HERCULES uri to be sent to the metadata server.
char formated_uri[REQUEST_SIZE];
sprintf(formated_uri, "%" PRIu32 " GET 0 %s", id, args.imss_uri);
slog_debug("Request - %s", formated_uri);
// Send the request.
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, formated_uri) == 0)
// Number of characters successfully read from the line.
int32_t n_chars;
int init_server_status = 1;
// int num_active_data_servers = 0;
metadata_endpoints = (ucp_ep_h *)malloc(args.num_metadata_servers * sizeof(ucp_ep_h));
for (int32_t i = 0; i < args.num_metadata_servers; i++)
{
// Allocate resources in the metadata structure so as to store the current HERCULES's IP.
// (my_imss.ips)[i] = (char *)calloc(LINE_LENGTH, sizeof(char));
size_t num_bytes_for_line = 0;
stat_add = NULL;
// Save HERCULES metadata deployment.
n_chars = getline(&stat_add, &num_bytes_for_line, metadata_nodes_fd);
// Erase the new line character ('') from the string.
if (stat_add[n_chars - 1] == '\n')
{
slog_error("HERCULES_ERR__SEND_REQ");
perror("HERCULES_ERR__SEND_REQ");
return -1;
stat_add[n_chars - 1] = '\0';
}
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker, attr.worker_uid);
if (length == 0)
slog_debug("Establishing a connection with %s:%ld\n", stat_add, stat_port);
oob_sock = connect_common(stat_add, stat_port, AF_INET);
char request[REQUEST_SIZE];
sprintf(request, "%" PRIu32 " GET %s", id, "MAIN!QUERRY");
slog_debug("Request - %s", request);
if (send(oob_sock, request, REQUEST_SIZE, 0) < 0)
{
slog_error("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR_STAT_HELLO");
slog_error("HERCULES_ERR_STAT_HELLO");
return -1;
}
// Receive the associated structure.
imss_info *imss_info_ = (imss_info *)malloc(sizeof(imss_info) * length);
ret = recv_dynamic_stream(ucp_worker, client_ep, imss_info_, IMSS_INFO, attr.worker_uid, length);
// If another data server has taken the URI, this HERCULES configuration should not be deployed.
// Two HERCULES configurations cannot have the same URI.
// We check if "recv_dynamic_stream" has successed, if so, there are another HERCULES instance using
// the same URI.
// On success, we free memory and stop this instance.
int new_id = 0;
if (ret != -1)
{ // success "recv_dynamic_stream".
// fprintf(stderr, "imss_info_.num_storages=%d, length=%lu\n", imss_info_->num_storages, length);
imss_exists = 1;
for (int32_t i = 0; i < imss_info_->num_storages; i++)
ret = recv(oob_sock, &addr_len, sizeof(addr_len), MSG_WAITALL);
slog_debug("Address len=%lu", addr_len);
peer_addr = (ucp_address *)malloc(addr_len);
ret = recv(oob_sock, peer_addr, addr_len, MSG_WAITALL);
slog_debug("Peer Address=%lu", peer_addr);
close(oob_sock);
free(stat_add);
/* Send client UCX address to server */
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER |
UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.address = peer_addr;
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = err_cb_client;
ep_params.err_handler.arg = NULL;
ep_params.user_data = &ep_status;
slog_debug("Creating endpoint with the metadata server %d", i);
status = ucp_ep_create(ucp_worker, &ep_params, &metadata_endpoints[i]);
slog_debug("Endpoint with the metadata %d created", i);
// status = ucp_worker_get_address(ucp_worker, &req_addr, &req_addr_len);
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker, &worker_attr);
req_addr_len = worker_attr.address_length;
req_addr = worker_attr.address;
attr.field_mask = UCP_WORKER_ADDRESS_ATTR_FIELD_UID;
ucp_worker_address_query(req_addr, &attr);
slog_debug("[srv_worker_thread] Server UID %" PRIu64 ".", attr.worker_uid);
if (!args.id)
{ // Only performs by the data server with ID = 0.
// Formated HERCULES uri to be sent to the metadata server.
char formated_uri[REQUEST_SIZE];
sprintf(formated_uri, "%" PRIu32 " GET 0 %s", id, args.imss_uri);
slog_debug("Request - %s", formated_uri);
// Send the request.
if (send_req(ucp_worker, metadata_endpoints[i], req_addr, req_addr_len, formated_uri) == 0)
{
// fprintf(stderr,"ip[%d]=%s\n", i, imss_info_->ips[i]);
free(imss_info_->ips[i]);
new_id++;
slog_error("HERCULES_ERR__SEND_REQ");
perror("HERCULES_ERR__SEND_REQ");
return -1;
}
free(imss_info_->ips);
}
free(imss_info_);
if (args.id != new_id)
{
fprintf(stderr, "Data server with id = %d already in use, changing to %d\n", args.id, new_id);
// Set a new id to this server.
args.id = new_id;
/* code */
}
}
if (imss_exists)
{
// Here we need to stop all HERCULES data servers
// related to this configuration, or check if them
// are not running anymore to continue deploying this configuration.
perror("HERCULES_ERR_SERVER_URI_TAKEN");
slog_error("HERCULES_ERR_SERVER_URI_TAKEN, ret=%d", ret);
// ready(tmp_file_path, "ERROR");
// return 0;
}
// When LOCAL policy is used, the server creates a shared memory region.
if (!strcmp(POLICY, "LOCAL") || !strcmp(POLICY, "ZCOPY"))
{
// Get the shared memory key and tries to create the shared memory region (pool).
key_t key = getKeySM();
slog_info("Generated Key = %d\n", key);
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker, attr.worker_uid);
if (length == 0)
{
slog_error("HERCULES_ERR__GET_RECV_DATA_LENGTH");
perror("HERCULES_ERR__GET_RECV_DATA_LENGTH");
return -1;
}
// Receive the associated structure.
imss_info *imss_info_ = (imss_info *)malloc(sizeof(imss_info) * length);
ret = recv_dynamic_stream(ucp_worker, metadata_endpoints[i], imss_info_, IMSS_INFO, attr.worker_uid, length);
// If another data server has taken the URI, this HERCULES configuration should not be deployed.
// Two HERCULES configurations cannot have the same URI.
// We check if "recv_dynamic_stream" has successed, if so, there are another HERCULES instance using
// the same URI.
// On success, we free memory and stop this instance.
int new_id = 0;
if (ret != -1)
{ // success "recv_dynamic_stream".
// fprintf(stderr, "imss_info_.num_storages=%d, length=%lu\n", imss_info_->num_storages, length);
imss_exists = 1;
for (int32_t i = 0; i < imss_info_->num_storages; i++)
{
// fprintf(stderr,"ip[%d]=%s\n", i, imss_info_->ips[i]);
free(imss_info_->ips[i]);
new_id++;
}
free(imss_info_->ips);
}
free(imss_info_);
if (args.id != new_id)
{
fprintf(stderr, "Data server with id = %d already in use, changing to %d\n", args.id, new_id);
// Set a new id to this server.
args.id = new_id;
/* code */
}
}
shm_data_id = getIdentifierSM(key, SHM_SIZE);
if (shm_data_id == -1)
if (imss_exists)
{
perror("ERR_HERCULES_GET_SM_IDENTIFIER");
// Do not stop the process.
// Here we need to stop all HERCULES data servers
// related to this configuration, or check if them
// are not running anymore to continue deploying this configuration.
perror("HERCULES_ERR_SERVER_URI_TAKEN");
slog_error("HERCULES_ERR_SERVER_URI_TAKEN, ret=%d", ret);
// ready(tmp_file_path, "ERROR");
// return 0;
}
else
// When LOCAL policy is used, the server creates a shared memory region.
if (!strcmp(POLICY, "LOCAL") || !strcmp(POLICY, "ZCOPY"))
{
void *pool_memory = createSM(shm_data_id);
if (pool_memory == NULL)
{ // error creating the shared memory region.
perror("HERCULES_ERR_CREATE_SM");
slog_error("HERCULES_ERR_CREATE_SM");
ready(tmp_file_path, "ERROR");
exit(0);
// Get the shared memory key and tries to create the shared memory region (pool).
key_t key = getKeySM();
slog_info("Generated Key = %d\n", key);
shm_data_id = getIdentifierSM(key, SHM_SIZE);
if (shm_data_id == -1)
{
perror("ERR_HERCULES_GET_SM_IDENTIFIER");
// Do not stop the process.
}
else
{
// Shared memory has been created.
args.pool_memory = pool_memory;
// Becasue the shared memory was successfully created, we
// initializate a semaphore to sincronize block 0.
sem_shared_memory = sem_open("/hercules_shm_sem", O_CREAT, 0644, 1);
if (sem_shared_memory == SEM_FAILED)
void *pool_memory = createSM(shm_data_id);
if (pool_memory == NULL)
{ // error creating the shared memory region.
perror("HERCULES_ERR_CREATE_SM");
slog_error("HERCULES_ERR_CREATE_SM");
ready(tmp_file_path, "ERROR");
exit(0);
}
else
{
perror("HERCULES_ERR_SHM_SEM_OPEN");
exit(-1);
// Shared memory has been created.
args.pool_memory = pool_memory;
// Becasue the shared memory was successfully created, we
// initializate a semaphore to sincronize block 0.
sem_shared_memory = sem_open("/hercules_shm_sem", O_CREAT, 0644, 1);
if (sem_shared_memory == SEM_FAILED)
{
perror("HERCULES_ERR_SHM_SEM_OPEN");
exit(-1);
}
// Close the semaphore. The semaphore will remain and can
// be used by the front-end until unlink is called.
sem_close(sem_shared_memory);
}
// Close the semaphore. The semaphore will remain and can
// be used by the front-end until unlink is called.
sem_close(sem_shared_memory);
}
}
}
// Close the file.
if (fclose(metadata_nodes_fd) != 0)
{
perror("HERCULES_ERR_DEPLOYFILE_CLOSE");
slog_fatal("HERCULES_ERR_DEPLOYFILE_CLOSE");
return -1;
}
}
// Metadata server.
else
{
// Create the tree_root node.
char *root_data = (char *)calloc(8, sizeof(char));
strcpy(root_data, "imss://");
// strcpy(root_data, "imss://");
strcpy(root_data, args.imss_uri);
tree_root = g_node_new((void *)root_data);
if (pthread_mutex_init(&tree_mut, NULL) != 0)
{
perror("HERCULES_ERR_TREE_MUT_INIT");
pthread_exit(NULL);
exit(1);
}
}
......@@ -705,15 +562,14 @@ int32_t main(int32_t argc, char **argv)
}
// Buffer segment size assigned to each thread.
buffer_segment = data_reserved / args.thread_pool;
buffer_segment = data_reserved / hercules_thread_pool_size;
slog_info("buffer_segment=%ld", buffer_segment);
// Initialize pool of threads.
// pthread_t threads[(args.thread_pool + 1)];
int extra_threads = 0, total_threads = 0;
if (args.type == TYPE_DATA_SERVER)
{
region_locks = (pthread_mutex_t *)calloc(args.thread_pool, sizeof(pthread_mutex_t));
region_locks = (pthread_mutex_t *)calloc(hercules_thread_pool_size, sizeof(pthread_mutex_t));
extra_threads = 2;
}
else
......@@ -721,14 +577,14 @@ int32_t main(int32_t argc, char **argv)
extra_threads = 1;
}
total_threads = args.thread_pool + extra_threads;
total_threads = hercules_thread_pool_size + extra_threads;
threads = (pthread_t *)malloc(total_threads * sizeof(pthread_t));
// Thread arguments.
p_argv arguments[total_threads];
ucp_worker_threads = (ucp_worker_h *)malloc(args.thread_pool * sizeof(ucp_worker_h));
local_addr = (ucp_address_t **)malloc(args.thread_pool * sizeof(ucp_address_t *));
local_addr_len = (size_t *)malloc(args.thread_pool * sizeof(size_t));
ucp_worker_threads = (ucp_worker_h *)malloc(hercules_thread_pool_size * sizeof(ucp_worker_h));
local_addr = (ucp_address_t **)malloc(hercules_thread_pool_size * sizeof(ucp_address_t *));
local_addr_len = (size_t *)malloc(hercules_thread_pool_size * sizeof(size_t));
// Execute all threads.
int32_t aux_idx = 0;
......@@ -740,6 +596,7 @@ int32_t main(int32_t argc, char **argv)
arguments[i].storage_size = max_storage_size;
arguments[i].port = bind_port;
arguments[i].tmp_file_path = tmp_file_path;
arguments[i].hercules_thread_pool_size = hercules_thread_pool_size;
// Add the instance URI to the thread arguments.
strcpy(arguments[i].my_uri, args.imss_uri);
......@@ -765,8 +622,8 @@ int32_t main(int32_t argc, char **argv)
slog_debug("[SERVER] Creating checkpoint thread.");
// Add the reference to the map into the set of thread arguments.
arguments[i].map = map;
// if (pthread_create(&threads[i], NULL, checkpoint, (void *)g_map.get()) == -1)
if (pthread_create(&threads[i], NULL, Checkpoint, (void *)&arguments[i]) == -1)
// if (pthread_create(&threads[i], NULL, Checkpoint, (void *)&arguments[i]) == -1)
if (pthread_create(&threads[i], NULL, Snapshot, (void *)&arguments[i]) == -1)
{
// Notify thread error deployment.
ready(tmp_file_path, "ERROR");
......@@ -792,7 +649,8 @@ int32_t main(int32_t argc, char **argv)
ucp_worker_attr_t worker_attr;
worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_ADDRESS;
status = ucp_worker_query(ucp_worker_threads[aux_idx], &worker_attr);
local_addr_len[aux_idx] = worker_attr.address_length;
slog_debug("Setting address=%lu (len=%lu) to local_addr at %d", worker_attr.address, worker_attr.address_length, aux_idx)
local_addr_len[aux_idx] = worker_attr.address_length;
local_addr[aux_idx] = worker_attr.address;
// Add the reference to the map into the set of thread arguments.
......@@ -800,6 +658,7 @@ int32_t main(int32_t argc, char **argv)
// arguments[i].secondary_map = secondary_map;
// Specify the address used by each thread to write inside the buffer.
arguments[i].pt = (char *)(aux_idx * buffer_segment + buffer_address);
arguments[i].thread_id = aux_idx;
// HERCULES data server.
if (args.type == TYPE_DATA_SERVER)
......@@ -900,22 +759,27 @@ int32_t main(int32_t argc, char **argv)
// Send the created structure to the metadata server.
sprintf(key_plus_size, "%" PRIu32 " SET %lu %s", id, (sizeof(imss_info) + my_imss.num_storages * LINE_LENGTH + my_imss.num_storages * sizeof(int) + my_imss.num_storages * sizeof(int)), my_imss.uri_);
slog_debug("[main] Request - %s", key_plus_size);
if (send_req(ucp_worker, client_ep, req_addr, req_addr_len, key_plus_size) == 0)
for (size_t j = 0; j < args.num_metadata_servers; j++)
{
perror("HERCULES_ERR_SEND_REQ_SET_STR");
slog_fatal("HERCULES_ERR_SEND_REQ_SET_STR");
return -1;
}
slog_debug("[SERVER] Creating IMSS_INFO at metadata server. ");
// Send the new HERCULES metadata structure to the metadata server entity.
if (send_dynamic_stream(ucp_worker, client_ep, (char *)&my_imss, IMSS_INFO, attr.worker_uid) == -1)
return -1;
if (send_req(ucp_worker, metadata_endpoints[j], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("HERCULES_ERR_SEND_REQ_SET_STR");
slog_fatal("HERCULES_ERR_SEND_REQ_SET_STR");
return -1;
}
slog_debug("[SERVER] Creating IMSS_INFO at metadata server. ");
// Send the new HERCULES metadata structure to the metadata server entity.
if (send_dynamic_stream(ucp_worker, metadata_endpoints[j], (char *)&my_imss, IMSS_INFO, attr.worker_uid) == -1)
{
return -1;
}
}
for (int32_t i = 0; i < num_servers; i++)
free(my_imss.ips[i]);
free(my_imss.ips);
// ucp_ep_close_nb(client_ep, UCP_EP_CLOSE_MODE_FORCE);
}
if (args.type == TYPE_DATA_SERVER)
......@@ -935,7 +799,7 @@ int32_t main(int32_t argc, char **argv)
return -1;
}
sleep(10);
sleep(3);
int num_active_storages = 0;
while (true)
{
......@@ -947,11 +811,14 @@ int32_t main(int32_t argc, char **argv)
// printf("Error creating HERCULES's resources, the process cannot be started. Please, make sure servers are running and clients can establish connections.\n");
// return -1;
sleep(3);
continue;
}
break;
}
}
ret = ready(tmp_file_path, "OK");
fprintf(stderr, "%c-server %d is ready = %d\n", args.type, args.id, ret);
// Wait for threads to finish.
for (int32_t i = 0; i < total_threads; i++)
{
......@@ -959,8 +826,6 @@ int32_t main(int32_t argc, char **argv)
t = clock() - t;
time_taken = ((double)t) / (CLOCKS_PER_SEC);
ready(tmp_file_path, "OK");
fprintf(stderr, "Server %d is ready\n", args.id);
if (pthread_join(threads[i], NULL) != 0)
{
perror("HERCULES_ERR_SERVER_THREAD_JOIN");
......@@ -1002,11 +867,11 @@ int32_t main(int32_t argc, char **argv)
// Close publisher socket.
// ep_close(ucp_worker, pub_ep, UCP_EP_CLOSE_MODE_FORCE);
// ep_close(ucp_worker, client_ep, UCP_EP_CLOSE_MODE_FORCE);
// ep_close(ucp_worker, metadata_endpoints, UCP_EP_CLOSE_MODE_FORCE);
// ucp_cleanup(ucp_context);
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-stop", args.hercules_path, args.type, args.id);
ready(tmp_file_path, "OK");
// sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-stop", args.hercules_path, args.type, args.id);
// ready(tmp_file_path, "OK");
// Free the publisher release address.
fprintf(stderr, "Ending %c server\n", args.type);
......@@ -1015,3 +880,241 @@ int32_t main(int32_t argc, char **argv)
free(buffer);
return 0;
}
int stop_server()
{
// Get the current number of active nodes.
number_active_storage_servers = get_number_of_active_nodes(args.hercules_path);
if (number_active_storage_servers < 0)
{
return -1;
}
// Tell metadata server to reduce number of servers.
char key_plus_size[REQUEST_SIZE];
// Send the created structure to the metadata server.
// last "0" is the server status to be set.
sprintf(key_plus_size, "%d SET %lu %s %d", args.id, number_active_storage_servers, args.imss_uri, 0);
slog_debug("[main] Request - %s", key_plus_size);
// TODO: locate the metadata server.
if (send_req(ucp_worker, metadata_endpoints[0], req_addr, req_addr_len, key_plus_size) == 0)
{
perror("HERCULES_ERR_STOP_SERVER_SEND_REQ");
return -1;
}
return 0;
}
int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri, std::shared_ptr<map_records> map)
{
// Creates endpoints to all data servers. It is use in case of
// malleability to move blocks between data servers.
slog_debug("Connecting to data servers\n");
open_imss(imss_uri); // TODO: Check if this is still necessary due we called it on the main function.
if (number_active_storage_servers < 0)
{
slog_fatal("Error creating HERCULES's resources, the process cannot be started");
return -1;
}
// Here data server should to move the datablocks.
// print all key/value elements.
double time_taken;
time_t t = clock();
void *address_;
uint64_t block_size;
int curr_map_size = 0;
const char *uri_;
size_t size;
char key_[REQUEST_SIZE];
// Get the number of blocks stored by this data server.
int number_of_blocks_2_move = map->size();
slog_info("Server %d, has %d blocks, active storage servers=%lu", args.id, map->size(), number_active_storage_servers);
while ((curr_map_size = map->size()) > 0 && number_active_storage_servers > 0)
{
std::string key;
// get next key (block identifier) with the format <block_name>$<block_number>
// for example "myfile$199", where block_name = myfile, and block_number = 199.
key = map->get_head_element();
// get the element data and store it in "address_".
map->get(key, &address_, &block_size);
// fprintf(stderr, "**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
slog_debug("**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
int pos = key.find('$') + 1; // +1 to skip '$' on the block number.
std::string block = key.substr(pos, key.length() + 1); // substract the block number from the key.
int block_number = stoi(block, 0, 10); // string to number.
pos -= 1; // -1 to skip '$' on the data uri.
std::string data_uri = key.substr(0, pos); // substract the data uri from the key.
slog_debug("key='%s',\turi='%s',\tblock='%s'\n", key.c_str(), data_uri.c_str(), block.c_str());
int next_server = find_server(number_active_storage_servers, block_number, data_uri.c_str(), 0, args.type, curr_imss.info.session_plcy); // TODO: check for the current data policy in the dataset, not in the imss configuration.
slog_info("key='%s',\turi='%s%s',\tfrom server %d to server %d,\tactive servers=%lu\n", key.c_str(), data_uri.c_str(), block.c_str(), server_id, next_server, number_active_storage_servers);
slog_debug("new server=%d, curr_server=%d\n", next_server, server_id);
// here we can send key.c_str() directly to reduce the number of operations.
if (set_data_server(data_uri.c_str(), block_number, address_, block_size, 0, next_server) < 0)
{
slog_error("ERR_HERCULES_SET_DATA_IN_SERVER\n");
perror("ERR_HERCULES_SET_DATA_IN_SERVER");
return -1;
}
// delete the element from the map.
map->erase_head_element();
// get new map size to print it.
curr_map_size = map->size();
// fprintf(stderr, "**** curr_map_size=%d\n", curr_map_size);
slog_debug("**** curr_map_size=%d\n", curr_map_size);
}
t = clock() - t;
time_taken = ((double)t) / (CLOCKS_PER_SEC);
if (number_active_storage_servers > 0)
{
// fprintf(stderr, "[HS] Data movement %d blocks %lu %f sec.\n", number_of_blocks_2_move, number_active_storage_servers, time_taken);
fprintf(stderr, "\033[0;34m [HS] Server %d has moved %d blocks to %lu servers in %f sec. \033[0m\n", args.id, number_of_blocks_2_move, number_active_storage_servers, time_taken);
}
return 0;
}
void handle_signal_server(int signal)
{
if (signal == SIGUSR1) // suspend or shutdown this server.
{
slog_info("SIGUSR1 received");
int pkill_operation = 0, ret = 0;
char buf[10] = {0}, action[20], temporal_path[PATH_MAX];
char tmp_file_path[PATH_MAX];
sprintf(temporal_path, "%s/tmp/hercules_pkill_operation", args.hercules_path);
// fprintf(stderr,"Temporal path: %s\n", temporal_path);
// Get the operation number.
int fd = open(temporal_path, O_RDONLY);
if (fd == -1)
{
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg, "ERR_HERCULES_OPEN_PKILL_OPERATION:%s", temporal_path);
perror(err_msg);
return;
}
ret = read(fd, buf, sizeof(buf) - 1);
buf[ret] = '\0';
// In case of read error, pkill_operation must be 0
// to suspend the server but not shutdown it.
if (ret == -1)
{
pkill_operation = 0;
perror("HERCULES_ERR_READ_PKILL_OPERATION");
}
else
{
pkill_operation = atoi(buf);
}
ret = close(fd);
if (fd == -1)
{
perror("ERR_HERCULES_CLOSE_PKILL_OPERATION");
}
slog_info("pkill_operation = %d", pkill_operation);
// fprintf(stderr, "pkill_operation = %d\n", pkill_operation);
switch (pkill_operation)
{
case 1: // finish data server processes (shutdown).
// "global_finish_threads" is a gloabl variable readed by the
// dispatcher and workers threads. 1 indicates those threads
// must finish their execution.
sprintf(action, "stop");
// if (args.type == TYPE_METADATA_SERVER || global_finish_checkpoint == 1)
if (args.type == TYPE_METADATA_SERVER)
{
global_finish_threads = 1;
}
else
{
global_finish_checkpoint = 1;
global_finish_snapshot = 1;
pthread_cond_signal(&global_run_snapshot_cond);
pthread_cond_signal(&global_run_checkpoint_cond);
pthread_mutex_lock(&global_finish_mut);
pthread_cond_wait(&global_finish_cond, &global_finish_mut);
fprintf(stderr, "Waiting for snapshot and checkpointing in server %d\n", args.id);
// This file is readed by the hercules script to know if this server
// was correctly shutting down.
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-%s", args.hercules_path, args.type, args.id, action);
ready(tmp_file_path, "LOCKED");
pthread_mutex_unlock(&global_finish_mut);
fprintf(stderr, "Server %d has been unlocked\n", args.id);
}
// Shutdown or close the socket used by the dispatcher pointed
// by the file descriptor "global_server_fd_thread".
if (shutdown(global_server_fd_thread, SHUT_RD) == -1)
{
perror("ERR_HERCULES_SHUTDOWN_SERVER_FD\n");
}
break;
default: // suspend the data server.
sprintf(action, "remove");
// Data servers processes will still running to be reused on
// the future. On shrink process, this server won't be used,
// but backend processes will be still running.
break;
}
// Data servers performs malleability operations if it is enabled.
if (args.type == TYPE_DATA_SERVER && args.malleability == 1)
{
ret = stop_server();
if (ret == 0) // success.
{
ret = move_blocks_2_server(args.stat_port, args.id, args.imss_uri, g_map);
if (ret < 0) // error.
{
// TODO: if "move_blocks_2_server" fails, try again?
}
}
}
// This file is readed by the hercules script to know if this server
// was correctly shutting down.
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-%s", args.hercules_path, args.type, args.id, action);
ready(tmp_file_path, "OK");
}
if (signal == SIGUSR2) // wake up this server.
{
slog_info("SIGUSR2 received");
if (args.type == TYPE_DATA_SERVER) // only data servers.
{
fprintf(stderr, " \033[0;32m Waking up server %d \033[0m\n", args.id);
// Changes the number of active servers in the metadata server
// and the status of this server.
wakeup_server();
// This file is readed by the hercules script to know if this server
// was correctly waking up.
char tmp_file_path[PATH_MAX];
sprintf(tmp_file_path, "%s/tmp/%c-hercules-%d-up", args.hercules_path, args.type, args.id);
fprintf(stderr, "Writting file %s\n", tmp_file_path);
ready(tmp_file_path, "OK");
}
}
}
void print_usage(const char *msg)
{
fprintf(stderr, "%s\n usage for METADATA server: hercules_server m <server_id>\n usage for DATA server: hercules_server d <server_id> <metadata_host> <initial_number_of_data_servers> \n", msg);
}
#!/bin/bash
SCRIPT_NAME="ior_beegfs_slurm.sh"
#FILE_SIZE=$((1024*1024*1))
FILE_SIZE=$((1024*1024*10))
FILE_SIZE=$((1024*1024*1))
#FILE_SIZE=$((1024*1024*10))
# 0 = Shared single file, 1 = File per process.
IOR_FILE_PER_PROCESS=1
......
#!/bin/bash
SCRIPT_NAME="ior_lustre_slurm.sh"
#FILE_SIZE=$((1024*1024*1))
FILE_SIZE=$((1024*1024*100))
FILE_SIZE=$((1024*1024*1))
#FILE_SIZE=$((1024*1024*100))
# 0 = Shared single file, 1 = File per process.
IOR_FILE_PER_PROCESS=1
......@@ -10,8 +10,8 @@ IOR_FILE_PER_PROCESS=1
IOR_AVOID_CACHE=1
#NODES_FOR_CLIENTS_RANGE=( 1 2 4 8 16 32 )
NODES_FOR_CLIENTS_RANGE=( 16 )
CLIENTS_PER_NODE_RANGE=( 16 )
NODES_FOR_CLIENTS_RANGE=( 1 )
CLIENTS_PER_NODE_RANGE=( 1 )
BLOCK_SIZE_RANGE=( 512 )
TEST_TYPE="strong"
......
......@@ -30,7 +30,7 @@ TRANSFER_SIZE=$FILE_SIZE_PER_CLIENT
# COMMAND="$IOR_PATH/ior -w -r -k -e -t ${TRANSFER_SIZE}kb -b ${FILE_SIZE_PER_CLIENT}kb -s 1 -i 5 -o /beegfs/home/javier.garciablas/hercules/bash/ior_output/data.txt"
# fi
COMMAND="$IOR_PATH/ior -w -r -W -R -k -t ${TRANSFER_SIZE}kb -b ${FILE_SIZE_PER_CLIENT}kb -s 1 -i 5"
COMMAND="$IOR_PATH/ior -w -r -W -R -k -t ${TRANSFER_SIZE}kb -b ${FILE_SIZE_PER_CLIENT}kb -s 1 -i 3"
if [ "$IOR_FILE_PER_PROCESS" -eq 1 ]; then
## File-per-process with write and read verification.
......@@ -47,8 +47,8 @@ COMMAND="$COMMAND -C -e"
fi
## Add the output file path.
#COMMAND="$COMMAND -o /beegfs/home/javier.garciablas/hercules/bash/ior_output/data.txt"
COMMAND="$COMMAND -o /tmp/data.txt"
COMMAND="$COMMAND -o /beegfs/home/javier.garciablas/hercules/bash/ior_output/data.txt"
#COMMAND="$COMMAND -o /tmp/data.txt"
set -x
......@@ -56,5 +56,5 @@ mpiexec -np $NUMBER_OF_PROCESS -ppn $PROCESS_PER_NODE $COMMAND
set +x
#rm ./ior_output/*
rm /tmp/data.txt
rm ./ior_output/*
#rm /tmp/data.txt
......@@ -62,7 +62,6 @@ extern int32_t prefetch_offset;
extern pthread_cond_t cond_prefetch;
extern pthread_mutex_t mutex_prefetch;
// char fd_table[1024][MAX_PATH];
#define MAX_PATH 256
extern pthread_mutex_t lock;
......@@ -83,8 +82,6 @@ int32_t LOWER_BOUND_SERVERS;
// const char *TESTX = "imss://wfc1.dat$1";
// const char *TESTX = "p4x2.save/wfc1.dat";
// extern char *aux_refresh;
// extern char *imss_path_refresh;
/*
(*) Mapping for REPL_FACTOR values:
......@@ -174,17 +171,17 @@ void fd_lookup(const char *path, int *fd, struct stat *s, char **aux)
{
pthread_mutex_lock(&lock_file);
*fd = -1;
// TODO: check for directories ending with "/"
int found = map_search(map, path, fd, s, aux);
if (found == -1)
{
// fprintf(stderr, "File not found, %s\n", path);
slog_warn("[imss_posix_api] file not found, %s", path);
slog_debug("[imss_posix_api] path=%s, found=%d, fd=%d", path, found, *fd);
slog_warn("file not found, %s", path);
slog_debug("path=%s, found=%d, fd=%d", path, found, *fd);
*fd = -1;
}
else
{
slog_debug("[imss_posix_api] path=%s, found=%d, fd=%d, stat.st_nlink=%lu", path, found, *fd, s->st_nlink);
slog_debug("path=%s, found=%d, fd=%d, stat.st_nlink=%lu", path, found, *fd, s->st_nlink);
}
pthread_mutex_unlock(&lock_file);
......@@ -237,7 +234,7 @@ int imss_refresh(const char *path)
int fd = -1;
void *aux = NULL;
const char *imss_path = path; // this pointer should not be free.
// Lookup the current file on the local front-end map.
fd_lookup(imss_path, &fd, &old_stats, (char **)&aux);
if (fd >= 0)
{
......@@ -245,7 +242,7 @@ int imss_refresh(const char *path)
}
else
{
slog_warn("[imss_refresh] %s", strerror(ENOENT));
slog_warn("%s", strerror(ENOENT));
return -ENOENT;
}
......@@ -296,15 +293,14 @@ int imss_getattr(const char *path, struct stat *stbuf)
switch (type)
{
case 0:
// free(imss_path);
// erase dataset from the local maps.
pthread_mutex_lock(&lock_file);
map_erase(map, imss_path);
pthread_mutex_unlock(&lock_file);
// map_release_prefetch(map_prefetch, path);
slog_debug("[imss_posix_api] Calling release prefetch path = %s", imss_path);
slog_debug("Calling release prefetch path = %s", imss_path);
map_release_prefetch(map_prefetch, imss_path);
slog_debug("[imss_posix_api] Ending release prefetch path = %s", imss_path);
slog_debug("Ending release prefetch path = %s", imss_path);
return -ENOENT;
case 1: // Directory case?
if ((n_ent = get_dir((char *)imss_path, &buffer, &refs)) != -1)
......@@ -415,7 +411,6 @@ int imss_getattr(const char *path, struct stat *stbuf)
int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t offset)
{
// fprintf(stderr,"imss_readdir=%s\n",path);
// Needed variables for the call
char *buffer;
char **refs;
......@@ -432,48 +427,46 @@ int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t off
// strcat(imss_path, "/");
// }
slog_debug("[IMSS][imss_readdir] imss_path=%s", imss_path);
slog_debug("[IMSS] imss_path=%s", imss_path);
// Call IMSS to get metadata
n_ent = get_dir((char *)imss_path, &buffer, &refs);
slog_debug("[IMSS][imss_readdir] imss_path=%s, n_ent=%d", imss_path, n_ent);
slog_debug("[IMSS] imss_path=%s, n_ent=%d", imss_path, n_ent);
if (n_ent < 0)
{
strcat(imss_path, "/");
slog_debug("[IMSS][imss_readdir] imss_path=%s", imss_path);
slog_debug("[IMSS] imss_path=%s", imss_path);
// fprintf(stderr,"try again imss_path=%s\n",imss_path);
n_ent = get_dir((char *)imss_path, &buffer, &refs);
if (n_ent < 0)
{
fprintf(stderr, "[IMSS-FUSE] Error retrieving directories for URI=%s", path);
fprintf(stderr, "[IMSS-FUSE] Error retrieving directories for URI=%s\n", path);
return -ENOENT;
}
}
slog_debug("[IMSS][imss_readdir] Before flush data");
// slog_debug("[IMSS] Before flush data");
// flush_data();
// Fill buffer
// TODO: Check if subdirectory
// printf("[FUSE] imss_readdir %s has=%d\n",path, n_ent);
slog_debug("[IMSS][imss_readdir] imss_readdir %s has=%d", path, n_ent);
slog_debug("[IMSS] imss_readdir %s has=%d", path, n_ent);
for (int i = 0; i < n_ent; ++i)
{
if (i == 0)
{
// slog_debug("[IMSS][imss_readdir] . y ..");
// slog_debug("[IMSS]. y ..");
filler(buf, "..", NULL, 0);
filler(buf, ".", NULL, 0);
}
else
{
// slog_debug("[IMSS][imss_readdir] %s", refs[i]);
// slog_debug("[IMSS]%s", refs[i]);
// the stbuf is not used after here.
// struct stat stbuf;
// int error = imss_getattr(refs[i] + 6, &stbuf);
// if (!error)
{
char *last = refs[i] + strlen(refs[i]) - 1;
// slog_info("last=%s", last);
slog_info("last=%s of %s", last, refs[i]);
if (last[0] == '/')
{
last[0] = '\0';
......@@ -488,7 +481,7 @@ int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t off
}
// filler(buf, refs[i] + offset + 1, &stbuf, 0); // original
// slog_info("refs[i] + offset + 1=%s", refs[i] + offset + 1);
slog_info("refs[i] + offset + 1=%s", refs[i] + offset + 1);
filler(buf, refs[i] + offset + 1, NULL, 0);
// filler(buf, refs[i], NULL, 0);
}
......@@ -529,14 +522,14 @@ int imss_open(char *path, uint64_t *fh)
char *aux = NULL;
// Look for the 'file descriptor' of 'imss_path' in the local map.
fd_lookup(imss_path, &fd, &stats, &aux);
slog_info("[FUSE][imss_posix_api] imss_path=%s, fd looked up=%d", imss_path, fd);
// fprintf(stderr, "[FUSE][imss_posix_api] imss_path=%s, fd looked up=%d\n", imss_path, fd);
slog_info("[FUSE]imss_path=%s, fd looked up=%d", imss_path, fd);
// fprintf(stderr, "[FUSE]imss_path=%s, fd looked up=%d\n", imss_path, fd);
if (fd >= 0)
{
print_file_type(stats, imss_path);
ret = open_local_dataset(imss_path, 1);
file_desc = fd;
slog_debug("[FUSE][imss_posix_api] open_local_dataset, ret=%d, file_desc=%d, nlink=%lu", ret, file_desc, stats.st_nlink);
slog_debug("[FUSE]open_local_dataset, ret=%d, file_desc=%d, nlink=%lu", ret, file_desc, stats.st_nlink);
}
else if (fd == -2)
{
......@@ -554,7 +547,7 @@ int imss_open(char *path, uint64_t *fh)
}
case -2: // symbolic link case.
{
slog_warn("[FUSE][imss_posix_api] imss_path=%s, file_desc=%d", imss_path, file_desc);
slog_warn("[FUSE]imss_path=%s, file_desc=%d", imss_path, file_desc);
*fh = file_desc;
// errno = 2;
// path = imss_path;
......@@ -599,7 +592,7 @@ int imss_open(char *path, uint64_t *fh)
// File does not exist
if (file_desc < 0)
{
slog_error("[FUSE][imss_posix_api] file descriptor: %d", file_desc);
slog_error("[FUSE]file descriptor: %d", file_desc);
return -1;
}
......@@ -1525,7 +1518,7 @@ ssize_t imss_write(const char *path, const void *buf, size_t size, off_t off)
}
bytes_stored += bytes_to_copy;
data_pointer = (char *) data_pointer + bytes_to_copy;
data_pointer = (char *)data_pointer + bytes_to_copy;
block_offset = 0; // first block has been stored, next blocks don't have an offset
++curr_blk;
}
......@@ -1790,7 +1783,6 @@ int imss_split_readv(const char *path, char *buf, size_t size, off_t offset)
char *number = (char *)calloc(64, sizeof(char));
for (int server = 0; server < N_SERVERS; server++)
{
/*char all_blocks[1024];*/
memset(all_blocks, '\0', lenght_message);
count = 0;
for (int i = 0; i < total; i++)
......@@ -1998,7 +1990,6 @@ int imss_release(const char *path)
else
return -ENOENT;
// char head[IMSS_DATA_BSIZE];
char *head = (char *)malloc(IMSS_DATA_BSIZE);
// Get time
......@@ -2035,8 +2026,6 @@ int imss_release(const char *path)
*/
int imss_close(const char *path, int fd)
{
// clock_t t;
// t = clock();
int ret = 0;
int ds = 0;
slog_debug("Calling imss_flush_data");
......@@ -2046,23 +2035,22 @@ int imss_close(const char *path, int fd)
slog_debug("Ending imss_release, ret=%d", ds);
ret = close_dataset(path, fd);
slog_debug("Ending close_dataset, ret=%d", ret);
// imss_refresh is too slow.
// When we remove it pass from 3.45 sec to 0.008505 sec.
if (ret)
{ // if the file was not deleted by the close we update the stat.
// imss_refresh is too slow.
// When we remove it pass from 3.45 sec to 0.008505 sec.
imss_refresh(path);
slog_debug("Ending imss_refresh");
}
clear_dataset(path);
// } else {
// delete_dataset(path, ds);
// Tell data server the file is ready to be copied to disk.
else
{ // if the file was deleted by the close, we delete registers from the
// local tree and local map.
clear_dataset(path);
map_erase(map, path);
}
map_erase(map, path);
// TODO: Tell data server the file is ready to be copied to disk.
// t = clock() - t;
// double time_taken = ((double)t) / CLOCKS_PER_SEC; // in seconds
return ret;
}
......@@ -2078,11 +2066,11 @@ int imss_create(const char *path, mode_t mode, uint64_t *fh, int opened)
// Assing file handler and create dataset.
int res = 0;
res = create_dataset((char *)rpath, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, REPL_TYPE, N_SERVERS, NO_LINK, opened);
slog_debug("[imss_create] create_dataset((char*)rpath:%s, POLICY:%s, N_BLKS:%ld, IMSS_BLKSIZE:%d, REPL_FACTOR:%ld, REPL_TYPE:%ld, N_SERVERS:%d), res:%d", (char *)rpath, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, REPL_TYPE, N_SERVERS, res);
slog_debug("create_dataset((char*)rpath:%s, POLICY:%s, N_BLKS:%ld, IMSS_BLKSIZE:%d, REPL_FACTOR:%ld, REPL_TYPE:%ld, N_SERVERS:%d), res:%d", (char *)rpath, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, REPL_TYPE, N_SERVERS, res);
if (res < 0)
{
slog_error("[imss_create] Cannot create new dataset.\n");
// fprintf(stderr, "[imss_create] Cannot create new dataset %s, may already exist.\n", path);
slog_error("Cannot create new dataset.\n");
// fprintf(stderr, "Cannot create new dataset %s, may already exist.\n", path);
// free(rpath);
return res;
}
......@@ -2125,19 +2113,19 @@ int imss_create(const char *path, mode_t mode, uint64_t *fh, int opened)
pthread_mutex_unlock(&lock); // unlock.
map_erase(map, rpath);
slog_debug("[imss_create] map_erase(map, rpath:%s)", rpath);
slog_debug("map_erase(map, rpath:%s)", rpath);
// if(ret < 1){
// slog_debug("No elements erased by map_erase, ret:%d", ret);
// }
pthread_mutex_lock(&lock_file); // lock.
map_put(map, rpath, *fh, ds_stat, (char *)buff);
slog_debug("[imss_create] map_put(map, rpath:%s, fh:%ld, ds_stat.st_blksize=%ld)", rpath, *fh, ds_stat.st_blksize);
slog_debug("map_put(map, rpath:%s, fh:%ld, ds_stat.st_blksize=%ld)", rpath, *fh, ds_stat.st_blksize);
if (PREFETCH != 0)
{
char *buff = (char *)malloc(PREFETCH * IMSS_BLKSIZE * KB);
map_init_prefetch(map_prefetch, rpath, buff);
slog_debug("[imss_create] PREFETCH:%ld, map_init_prefetch(map_prefetch, rpath:%s)", PREFETCH, rpath);
slog_debug("PREFETCH:%ld, map_init_prefetch(map_prefetch, rpath:%s)", PREFETCH, rpath);
}
pthread_mutex_unlock(&lock_file); // unlock.
// free(rpath);
......@@ -2173,8 +2161,7 @@ int imss_rmdir(const char *path)
char *buffer;
char **refs;
int n_ent = 0;
const char *imss_path = path; // This pointer should not be free.
const char *imss_path = path; // This pointer should not be free.
if ((n_ent = get_dir((char *)imss_path, &buffer, &refs)) > 0)
{
......@@ -2221,8 +2208,8 @@ int imss_unlink(const char *path)
if (ret < 0)
{
pthread_mutex_unlock(&lock);
perror("ERR_HERCULES_IMSS_UNLINK_GET_DATA");
slog_error("ERR_HERCULES_IMSS_UNLINK_GET_DATA");
perror("HERCULES_ERR_IMSS_UNLINK_GET_DATA");
slog_error("HERCULES_ERR_IMSS_UNLINK_GET_DATA");
return -1;
}
// pthread_mutex_lock(&lock);
......@@ -2237,7 +2224,7 @@ int imss_unlink(const char *path)
// Write initial block (0).
memcpy(buff, &header, sizeof(struct stat));
slog_debug("[imss_posix_api] header.st_nlink=%lu", header.st_nlink);
slog_debug("header.st_nlink=%lu", header.st_nlink);
set_data(ds, 0, (char *)buff, 0, 0);
pthread_mutex_unlock(&lock);
......@@ -2247,7 +2234,7 @@ int imss_unlink(const char *path)
// Those operations must be performed by the server itself when it knows no more process are using the file.
// Erase metadata in the backend.
ret = delete_dataset(imss_path, ds);
slog_debug("[imss_posix_api] delete_dataset %s, ret=%d", imss_path, ret);
slog_debug("delete_dataset %s, ret=%d", imss_path, ret);
switch (ret)
{
......@@ -2262,16 +2249,16 @@ int imss_unlink(const char *path)
map_erase(map, imss_path);
pthread_mutex_unlock(&lock_file);
slog_debug("[imss_posix_api] Calling map_release_prefetch %s", path);
slog_debug("Calling map_release_prefetch %s", path);
// map_release_prefetch(map_prefetch, path);
map_release_prefetch(map_prefetch, imss_path);
slog_debug("[imss_posix_api] Finish map_release_prefetch %s", path);
slog_debug("Finish map_release_prefetch %s", path);
// *******************************
ret = release_dataset(ds);
slog_debug("[imss_posix_api] relese_dataset ret=%d", ret);
slog_debug("relese_dataset ret=%d", ret);
if (ret < 0)
{
slog_error("ERR_HERCULES_RELEASE_DATASET");
slog_error("HERCULES_ERR_RELEASE_DATASET");
}
ret = 3;
......@@ -2321,7 +2308,6 @@ int imss_utimens(const char *path, const struct timespec tv[2])
slog_error("[IMSS-FUSE] Cannot open dataset.");
}
// char *buff = malloc(IMSS_DATA_BSIZE);
pthread_mutex_lock(&lock);
get_ndata(file_desc, 0, buff, 0, 0);
pthread_mutex_unlock(&lock);
......@@ -2372,7 +2358,7 @@ int imss_symlinkat(char *new_path_1, char *new_path_2, int _case)
switch (_case)
{
case 0:
slog_debug("[FUSE][imss_posix_api] Entering case 0 ");
slog_debug("[FUSE]Entering case 0 ");
get_iuri(new_path_1, rpath1);
get_iuri(new_path_2, rpath2);
fd_lookup(new_path_1, &fd, &stats, &aux);
......@@ -2406,7 +2392,7 @@ int imss_symlinkat(char *new_path_1, char *new_path_2, int _case)
break;
case 1:
slog_debug("[FUSE][imss_posix_api] Entering case 1 ");
slog_debug("[FUSE]Entering case 1 ");
// rpath1 = new_path_1;
get_iuri(new_path_2, rpath2);
res = create_dataset((char *)rpath2, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, REPL_TYPE, N_SERVERS, new_path_1, 3);
......@@ -2415,29 +2401,29 @@ int imss_symlinkat(char *new_path_1, char *new_path_2, int _case)
break;
}
slog_debug("[FUSE][imss_posix_api] rpath1=%s, rpath2=%s", rpath1, rpath2);
slog_debug("[FUSE]rpath1=%s, rpath2=%s", rpath1, rpath2);
// Assing file handler and create dataset
if (res < 0)
{
// fprintf(stderr, "[imss_create] Cannot create new dataset.\n");
slog_error("[imss_create] Cannot create new dataset.\n");
slog_error("Cannot create new dataset.\n");
free(rpath1);
free(rpath2);
return res;
}
map_erase(map, rpath2);
slog_debug("[imss_create] map_erase(map, rpath:%s), ret:%d", rpath2, ret);
slog_debug("map_erase(map, rpath:%s), ret:%d", rpath2, ret);
// if(ret < 1){
// slog_debug("No elements erased by map_erase, ret:%d", ret);
// }
pthread_mutex_lock(&lock_file); // lock.
// map_put(map, rpath2, file_desc, ds_stat, aux);
// slog_debug("[imss_create] map_put(map, rpath:%s, fh:%ld, ds_stat, buff:%s)", rpath, *fh, buff);
slog_debug("[imss_create] map_put(map, rpath:%s, fh:%ld, ds_stat.st_blksize=%ld)", rpath2, file_desc, ds_stat.st_blksize);
// slog_debug("map_put(map, rpath:%s, fh:%ld, ds_stat, buff:%s)", rpath, *fh, buff);
slog_debug("map_put(map, rpath:%s, fh:%ld, ds_stat.st_blksize=%ld)", rpath2, file_desc, ds_stat.st_blksize);
pthread_mutex_unlock(&lock_file); // unlock.
free(rpath1);
......@@ -2481,8 +2467,6 @@ int imss_flush(const char *path)
return -EACCES;
}
// char *buff = malloc(IMSS_DATA_BSIZE);
stats.st_mtime = spec.tv_sec;
// Write initial block
......@@ -2642,7 +2626,7 @@ int imss_rename(const char *old_path, const char *new_path)
if (file_desc_o < 0)
{
slog_error("[IMSS-FUSE] Cannot open dataset.");
slog_error("HERCULES_ERR_IMSS_RENAME_CANNOT_OPEN_DATASET");
free(new_rpath);
return -ENOENT;
}
......@@ -2680,7 +2664,7 @@ int imss_rename(const char *old_path, const char *new_path)
// RENAME SRV_WORKER(MAP)
rename_dataset_srv_worker_dir_dir(old_rpath, new_rpath, fd, 0);
rename_dataset_srv_worker_dir_dir(old_rpath, new_rpath, file_desc_o, 0);
free(dir_dest);
free(rdir_dest);
free(old_rpath);
......@@ -2712,7 +2696,7 @@ int imss_rename(const char *old_path, const char *new_path)
{
// fprintf(stderr, "[IMSS-FUSE] Cannot open dataset.\n");
slog_error("[imss_posix_api] Cannot open dataset, old_rpath=%s", old_rpath);
slog_error("Cannot open dataset, old_rpath=%s", old_rpath);
free(old_rpath);
free(new_rpath);
return -ENOENT;
......
......@@ -76,8 +76,8 @@ extern "C"
{
slog_debug("[map] element with key %s was not find", k);
}
m->erase(std::string(k));
slog_debug("[map] finish map_erase");
int num_elements_erased = m->erase(std::string(k));
slog_debug("[map] finish map_erase, num_elements_erased=%d", num_elements_erased);
// return ret;
}
......
......@@ -25,9 +25,7 @@
/* metadata server options */
#define STAT_LOGFILE_OPT 'l'
/*** TYPE argument legal values ***/
#define TYPE_DATA_SERVER 'd'
#define TYPE_METADATA_SERVER 'm'
struct logging_opts
......
......@@ -38,11 +38,20 @@
// Max lenght for POLICY name.
#define MAX_POLICY_LEN 16
/*** TYPE argument legal values ***/
#define TYPE_DATA_SERVER 'd'
#define TYPE_METADATA_SERVER 'm'
static const ucp_tag_t tag_req = 0x1337a880u;
static const ucp_tag_t tag_data = 0x2337a880u;
static const ucp_tag_t tag_reply = 0x3337a880u;
static const ucp_tag_t tag_mask = UINT64_MAX;
// Common messages between front and back ends.
#define MAX_RESPONSE_MSG_LEN 10
static char empty_directory_msg[] = "EMPTY_DIRECTORY\0";
/**
* Macro to measure the time spend by function_to_call.
* char*::print_comment: comment to be concatenated to the elapsed time.
......
......@@ -71,4 +71,4 @@
* to be passed to an open() syscall through *optr.
* Return 0 on error.
*/
int __sflags(const char *mode, int *optr);
\ No newline at end of file
int __sflags_(const char *mode, int *optr);
\ No newline at end of file
......@@ -11,7 +11,7 @@
#include "cfg_parse.h"
// #include "hercules.hpp"
static u_int16_t IMSS_THREAD_POOL = 1;
static u_int16_t HERCULES_THREAD_POOL_SIZE = 1;
// using std::string;
......
......@@ -123,10 +123,7 @@ static uint64_t BLOCK_SIZE;
#endif
int32_t get_data_location(int32_t, int32_t, int32_t);
int32_t find_server(int32_t n_servers,
int32_t n_msg,
const char *fname,
int32_t op_type);
// typedef enum {
// CLIENT_SERVER_SEND_RECV_STREAM = UCS_BIT(0),
// CLIENT_SERVER_SEND_RECV_DEFAULT = CLIENT_SERVER_SEND_RECV_STREAM
......@@ -154,6 +151,8 @@ typedef struct
int num_active_storages;
// Number of IMSS servers.
int32_t num_storages;
// Policy that was followed for the metadata.
int32_t session_plcy;
// Server's dispatcher thread connection port.
uint16_t conn_port;
} imss_info;
......@@ -187,6 +186,11 @@ typedef struct
char type; // = 'D';
// Policy that was followed in order to write the dataset.
char policy[MAX_POLICY_LEN];
// Original name when the data was created for the first time, need it for policy CRC16_ in distributed operation rename
char original_name[URI_];
// first parent directory.
char first_parent_dir[URI_];
int32_t session_plcy;
// Number of data elements conforming the dataset entity.
int32_t num_data_elem;
// Size of each data element (in KB).
......@@ -201,8 +205,7 @@ typedef struct
int32_t local_conn;
// Actual size
int64_t size;
// Original name when the data was created for the first time, need it for policy CRC16_ in distributed operation rename
char original_name[256];
// N_servers
int32_t n_servers;
/*************** USED EXCLUSIVELY BY LOCAL DATASETS ***************/
......@@ -537,7 +540,9 @@ RETURNS: 0 - The requested block was successfully stored.
int32_t set_data_server(const char *data_uri, int32_t data_id, const void *buffer, size_t size, off_t offset, int next_server);
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size, off_t offset);
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size, const char* key);
int32_t SendBroadcastMessage(int from_data_server_id, uint32_t num_of_servers, const char *request);
/* Method retrieving the location of a specific data object.
......@@ -577,7 +582,7 @@ char ** locations = get_dataloc(datasetd, data_id, &num_storages);
* the current number of active data nodes.
* @return Current number of active data nodes, on error -1 is returned.
*/
int get_number_of_active_nodes();
int get_number_of_active_nodes(char *hercules_path);
/* Method specifying the type (DATASET or IMSS INSTANCE) of a provided URI.
......@@ -637,13 +642,20 @@ RETURNS: 0 - Resources were released successfully.
void close_ucx_endpoint(ucp_worker_h worker, ucp_ep_h ep);
int find_first_parent_dir(const char *dataset_uri, char *first_parent_dir);
/**
* Compares two paths regardless of if one of them has a slash '/' at the end of the string.
*/
int paths_equal(const char *a, const char *b);
/**
* Disk methods.
*/
int32_t Make_directory(const char *dirname);
int32_t Open_file(const char *checkpoint_dir, const char *filename);
int32_t Close_file(int fd);
int32_t Write_2_disk(int fd, void *buffer, size_t size, size_t offset);
ssize_t Write_2_disk(int fd, void *buffer, off_t size, size_t offset);
#ifdef __cplusplus
}
......
......@@ -4,10 +4,11 @@
#include <map>
#include <ucp/api/ucp.h>
#include <inttypes.h>
#include <mutex>
typedef std::map<uint64_t, ucp_ep_h> map_server_eps_t;
static std::mutex mut_eps;
void * map_server_eps_create();
void map_server_eps_put(void * map, uint64_t uuid, ucp_ep_h ep);
......
......@@ -15,18 +15,26 @@
#include "imss.h"
//Method specifying the policy.
int32_t set_policy (dataset_info * dataset);
uint32_t get_policy_number(const char *policy_string);
int32_t set_policy_dataset (dataset_info * dataset);
/**
* @brief Method retriving the policy number setted by the "set_policy" method.
* @return policy number according to the distribution policy chosen by the user.
*/
int32_t get_policy();
// int32_t get_policy();
/**
* @brief Method retrieving the server that will receive the following message attending a policy.
* @return next server number (positive integer, >= 0) to send the operation according to the policy, on error -1 is returned,
*/
int32_t find_server (int32_t n_servers, int32_t n_msg, const char * fname, int32_t op_type);
int32_t find_server (int32_t n_servers, int32_t n_msg, const char * fname, int32_t op_type, char server_type, int32_t session_plcy);
int32_t RoundRobin(int32_t n_servers, int32_t n_msg,char *fname);
// int32_t Buckets(int32_t n_servers, int32_t n_msg,char *fname);
int32_t Hashed(int32_t n_servers, int32_t n_msg,char *fname);
int32_t CRC(int32_t n_servers, char *fname, int32_t bytes_);
void FindNameForPolicy(const char *fname, char *passed_name, char server_type);
#endif
......@@ -48,18 +48,20 @@ public:
int32_t erase_head_element();
// Used in stat_worker threads
// Method deleting a record.
int32_t delete_metadata_stat_worker(std::string key)
{
return buffer.erase(key);
}
int32_t delete_metadata_stat_worker(std::string key);
// Method storing a new record.
int32_t put(std::string key, void *address, uint64_t length);
int32_t put_simple(std::string key, int value);
int32_t put_snapshot(std::string key, int value);
int32_t put_broadcast(std::string key, void *address, uint64_t length);
// Method retrieving the address associated to a certain record.
int32_t get(std::string key, void **add_, uint64_t *size_);
int32_t get_simple(std::string key, uint64_t *to_copy);
int32_t get_snapshot(std::string key, int *to_copy);
int32_t get_broadcast(std::string key, void **add_, uint64_t *size_);
char *GetDataOfFile(std::string file_name, uint64_t *file_size_occupied);
char *MergeData(__off_t *size_of_data, uint32_t num_of_data_servers, __off_t file_size, uint64_t block_size);
// Method updating a new record.
int32_t update(std::string key, void *add_, uint64_t length);
......@@ -74,22 +76,28 @@ public:
// Method renaming from stat_worker
int32_t rename_metadata_dir_stat_worker(std::string old_dir, std::string rdir_dest);
// Used in str_worker threads
// Method retrieving the address associated to a certain record.
// Method deleting the address associated to a certain record.
int32_t cleaning();
int32_t cleaning_specific(std::string new_key);
int32_t freeAllMemory();
int32_t erase_broadcast_element(std::string key);
int32_t erase_snapshot_element(std::string key);
int32_t get_broadcast_size();
int32_t get_buffer_size();
// int32_t memory2disk(uint64_t block_size, const char *checkpoint_dir, int finish, int server_id);
int32_t Checkpoint(uint64_t block_size, const char *checkpoint_dir, int finish, int, char *, struct arguments args);
// int32_t Checkpoint(uint64_t block_size, const char *checkpoint_dir, int finish, int, char *, struct arguments args);
int32_t Checkpoint(uint64_t block_size, const char *checkpoint_dir, int finish, int server_id, char *data_hostname, struct arguments args);
int32_t Snapshot(uint64_t block_size, const char *checkpoint_dir, int finish, int, char *, struct arguments args);
// Method retrieving a map::begin iterator referencing the first element in the map container.
std::map<std::string, std::pair<void *, uint64_t>>::iterator begin()
std::unordered_map<std::string, std::pair<void *, uint64_t>>::iterator begin()
{
return buffer.begin();
}
// Method retrieving a reference to the end of the map.
std::map<std::string, std::pair<void *, uint64_t>>::iterator end()
std::unordered_map<std::string, std::pair<void *, uint64_t>>::iterator end()
{
return buffer.end();
}
......@@ -103,10 +111,10 @@ public:
private:
// Map structure tracking stored records (by default sorts keys with '<' op).
// <key(file uri), <data, lenght>>
std::map<std::string, std::pair<void *, uint64_t>> buffer;
std::unordered_map<std::string, std::pair<void *, uint64_t>> buffer;
std::map<std::string, int> buffer_snapshot;
std::unordered_map<std::string, int> buffer_broadcast;
// std::map<std::string, std::pair<uint64_t, uint64_t>> buffer_snapshot;
// std::unordered_map<std::string, int> buffer_broadcast;
std::map<std::string, std::pair<void *, uint64_t>> buffer_broadcast;
std::map<std::string, std::pair<int, __off_t>> buffer_fd;
// Mutex restricting access to structure.
uint64_t total_size;
......
......@@ -56,20 +56,23 @@ typedef struct
ucp_address_t *peer_address;
uint64_t worker_uid;
char *tmp_file_path;
u_int16_t hercules_thread_pool_size;
int thread_id;
struct arguments args;
} p_argv;
// Thread method attending client data requests.
void *srv_worker(void *th_argv);
int srv_worker_helper(p_argv *arguments, const char *req);
int srv_worker_helper(p_argv *arguments, const char *req, void *map_server_eps);
void *Checkpoint(void *th_argv);
void *Snapshot(void *th_argv);
// Thread method searching and cleaning nodes with st_nlink=0
void *garbage_collector(void *th_argv);
// Thread method attending client metadata requests.
void *stat_worker(void *th_argv);
int stat_worker_helper(p_argv *arguments, char *req);
int stat_worker_helper(p_argv *arguments, char *req, void *map_server_eps);
// Dispatcher thread method distributing clients among the pool server threads.
void *srv_attached_dispatcher(void *th_argv);
......
#!/bin/bash
CheckForStatusFile() {
FILE="${HERCULES_PATH}/tmp/$SERVER_TYPE-hercules-$SERVER_NUMBER-$ACTION"
#rm ${FILE} 2> /dev/null
## Checks if the file exists.
until [ -f "$FILE" ]; do
echo "Waiting for $FILE, attemp $i" > ${HERCULES_PATH}/tmp/waiting-$SERVER_TYPE-$SERVER_NUMBER-$ACTION
i=$(($i + 1))
## Waits "attemps" times, then an error is return.
if [ $i -gt $ATTEMPS ]; then
exit 1
fi
t=$(($i % 5))
if [ $t -eq 0 ]; then
echo "[+][$HOSTNAME] Waiting for server $SERVER_NUMBER"
fi
sleep 1
done
## Checks if the server was deploy correctly.
STATUS=$(cat -- "$FILE" | grep "STATUS" | awk '{print $3}')
## Removes the file.
# set -x
# rm ${FILE}
# set +x
}
SERVER_TYPE=$1
SERVER_NUMBER=$2
ACTION=$3 # expected string action, e.g., down when servers are stopped.
......@@ -13,29 +39,17 @@ if [ ! -d "${HERCULES_PATH}/tmp" ]; then
exit 1
fi
FILE="${HERCULES_PATH}/tmp/$SERVER_TYPE-hercules-$SERVER_NUMBER-$ACTION"
## Checks if the file exists.
until [ -f "$FILE" ]; do
echo "Waiting for $FILE, attemp $i"
i=$(($i + 1))
## Waits "attemps" times, then an error is return.
if [ $i -gt $ATTEMPS ]; then
exit 1
fi
t=$(($i % 5))
if [ $t -eq 0 ]; then
echo "[+][$HOSTNAME] Waiting for server $SERVER_NUMBER"
fi
sleep 1
echo ${HERCULES_PATH}
CheckForStatusFile
#echo "STATUS=$STATUS" >> ${HERCULES_PATH}/tmp/$SERVER_TYPE-hercules-$SERVER_NUMBER-$ACTION
# If the server is locked we will wait until it is unlocked.
until [ "$STATUS" != "LOCKED" ]; do
echo "[+][$HOSTNAME] Server $SERVER_NUMBER is locked, $STATUS"
CheckForStatusFile
done
## Checks if the server was deploy correctly.
STATUS=$(cat -- "$FILE" | grep "STATUS" | awk '{print $3}')
echo "STATUS=$STATUS"
## Removes the file.
set -x
rm ${FILE}
set +x
if [ "$STATUS" != "OK" ]; then
# echo "[X] Error deploying server $SERVER_NUMBER."
exit 1
......
......@@ -23,12 +23,19 @@ StopServers() {
echo "# Operation = ${OPERATION}"
fi
echo ${OPERATION} > "${HERCULES_PATH}/tmp/hercules_pkill_operation"
i=0
# Sends the pkill operation to each server.
for node in "${hosts[@]}"
do
RM="rm ${HERCULES_PATH}/tmp/d-hercules-$i-stop"
# Set the action to be doing by the servers when they received the pkill signal.
# ( ssh $node "echo $OPERATION > ./tmp/hercules_pkill_operation" )
# Kill threads and finish the server.
echo "[+] Removing temporal files on ${node}"
( ssh $node "$RM" )
( ssh ${node} "pkill -SIGUSR1 hercules_server" )
i=$(($i+1))
done
rm "${HERCULES_PATH}/tmp/hercules_pkill_operation"
}
......@@ -95,18 +102,19 @@ WaitForServers() {
else # if slurm is available.
set -x
ret=$(srun -N 1 -n 1 -c 1 -m block:block:block --mem=1G -w ${node} -- ${COMMAND})
set +x
fi
ret=$?
if [ "$ret" -gt 0 ]; then
echo "[Error: $ret] It has not been possible to "${ACTION}" a ${SERVER_NAME} server on ${node}, please verify the configuration file and logs."
exit 1
fi
SERVER_ID=$((SERVER_ID+1))
if [[ "$VERBOSE" -eq "1" ]]; then
echo "[OK] ${SERVER_NAME} ${i} server ${ACTION} in ${node}"
echo "[OK] ${SERVER_NAME} server ${SERVER_ID} ${ACTION} in ${node}"
fi
SERVER_ID=$((SERVER_ID+1))
set +x
done
}
......@@ -118,7 +126,7 @@ if [[ $STATUS != "stop" && $STATUS != "start" && $STATUS != "remove" && $STATUS
fi
WAIT_SERVERS=1
VERBOSE=1
VERBOSE=0
## Check if user pass arguments.
while getopts :m:d:o:c:s:f:k:w:V: flag
......@@ -308,7 +316,7 @@ export HERCULES_MOUNT_POINT=$(cat $FILE | grep "\<MOUNT_POINT\>" | head -1 | awk
META_PORT=$(cat $FILE | grep "\<METADATA_PORT\>" | awk '{print $3}')
DATA_PORT=$(cat $FILE | grep "\<DATA_PORT\>" | awk '{print $3}')
MALLEABILITY=$(cat $FILE | grep "\<MALLEABILITY\>" | awk '{print $3}')
HERCULES_NUM_METADATA=$(cat $FILE | grep "\<NUM_META_SERVERS\>" | awk '{print $3}')
export HERCULES_NUM_METADATA=$(cat $FILE | grep "\<NUM_META_SERVERS\>" | awk '{print $3}')
export HERCULES_NUM_DATA=$(cat $FILE | grep "\<NUM_DATA_SERVERS\>" | awk '{print $3}')
NUM_NODES_FOR_CLIENTS=$(cat $FILE | grep "\<NUM_NODES_FOR_CLIENTS\>" | awk '{print $3}')
NUM_CLIENTS_PER_NODE=$(cat $FILE | grep "\<NUM_CLIENTS_PER_NODE\>" | awk '{print $3}')
......@@ -329,16 +337,16 @@ if [[ "$SLURM" -eq "1" ]]; then
echo "[+] Slurm is active."
## If slurm is enabled, we concat the job id to the configuration file to create a new one.
CURR_HERCULES_CONF_FILE="${FILE}_${SLURM_JOB_ID}"
cp $FILE $CURR_HERCULES_CONF_FILE
cp "${FILE}" "${CURR_HERCULES_CONF_FILE}"
else
CURR_HERCULES_CONF_FILE="${FILE}"
fi
export HERCULES_CONF=$CURR_HERCULES_CONF_FILE
export HERCULES_CONF="${CURR_HERCULES_CONF_FILE}"
# exit 0
# echo "+ + + Initial number of data nodes is $INIT_HERCULES_NUM_DATA/$HERCULES_NUM_DATA"
export "HERCULES_INIT_NUM_DATA=${INIT_HERCULES_NUM_DATA}"
export HERCULES_INIT_NUM_DATA="${INIT_HERCULES_NUM_DATA}"
## \< \> to match exact word.
......@@ -369,6 +377,8 @@ then
## Creates an array with the data servers hostnames.
readarray -t hosts < $HERCULES_DATA_HOSTFILE
## Stop the data servers.
## TODO: check if the servers are running on the remote node
## before to try stop them.
StopServers "data" 1 ${hosts[@]}
## Checks if user wants to wait until all servers are stopped.
......@@ -472,9 +482,10 @@ then
exit 0
fi
# set -x
## If not empty, the field "METADATA_HOSTFILE" and "DATA_HOSTFILE" field was set on the configuration file.
if [ ! -z "$HERCULES_METADATA_HOSTFILE" ]; then
echo "${HERCULES_METADATA_HOSTFILE} exists"
if [[ "$SLURM" -eq "1" ]]; then
# If slurm is enabled, we added the slurm job id to the meta hostfile.
HERCULES_METADATA_HOSTFILE+="_"$SLURM_JOB_ID
......@@ -482,21 +493,23 @@ if [ ! -z "$HERCULES_METADATA_HOSTFILE" ]; then
ESCAPED_HERCULES_METADATA_HOSTFILE="${HERCULES_METADATA_HOSTFILE//\//\\/}"
## Replace the old metadata hostfile name with the new one containing the job id.
sed -i "s/^METADATA_HOSTFILE = .*/METADATA_HOSTFILE = $ESCAPED_HERCULES_METADATA_HOSTFILE/g" "$CURR_HERCULES_CONF_FILE"
# echo "HERCULES_METADATA_HOSTFILE=${HERCULES_METADATA_HOSTFILE}"
fi
fi
if [ ! -z "$HERCULES_DATA_HOSTFILE" ]; then
if [ ! -z "${HERCULES_DATA_HOSTFILE}" ]; then
if [[ "$SLURM" -eq "1" ]]; then
# If slurm is enabled, we added the slurm job id to the data hostfile.
HERCULES_DATA_HOSTFILE+="_"$SLURM_JOB_ID
HERCULES_DATA_HOSTFILE+="_${SLURM_JOB_ID}"
## Replace all "/" with "\/" because "/" is an special character for the sed command.
ESCAPED_HERCULES_DATA_HOSTFILE="${HERCULES_DATA_HOSTFILE//\//\\/}"
## Replace the old data hostfile name with the new one containing the job id.
sed -i "s/^DATA_HOSTFILE = .*/DATA_HOSTFILE = $ESCAPED_HERCULES_DATA_HOSTFILE/g" "$CURR_HERCULES_CONF_FILE"
sed -i "s/^DATA_HOSTFILE = .*/DATA_HOSTFILE = ${ESCAPED_HERCULES_DATA_HOSTFILE}/g" "${CURR_HERCULES_CONF_FILE}"
fi
fi
# set +x
## Creates the initial "hercules_num_act_nodes" file.
echo ${HERCULES_INIT_NUM_DATA} > hercules_num_act_nodes
echo ${HERCULES_INIT_NUM_DATA} > ${HERCULES_PATH}/tmp/hercules_num_act_nodes
## Set the name that will be used to create the
if [[ "$SLURM" -eq "1" ]]; then
......@@ -599,10 +612,12 @@ do
if [[ "$VERBOSE" -eq "1" ]]; then
echo "[+] Removing temporal files on ${node}"
fi
srun -N 1 -n 1 -c 1 -m block:block:block --mem=1G -w ${node} ${RM} 2> /dev/null
set -x
srun -N 1 -n 1 -c 1 -m block:block:block --mem=1G -w ${node} -- ${RM} 2> /dev/null
## If slurm is being used, the service is deploy using srun.
echo "[+] Starting metadata server $i on $node..."
srun -N 1 -n 1 -c 1 -m block:block:block --mem=80G -w ${node} ${COMMAND} &
srun -N 1 -n 1 -c 1 -m block:block:block --mem=0G -w $node -- $COMMAND &
set +x
fi
i=$(($i+1))
done
......@@ -658,7 +673,7 @@ fi
start=`date +%s.%N`
for node in ${data_hosts[@]}
do
RM="rm ${HERCULES_PATH}/tmp/d-hercules-$i"
RM="rm ${HERCULES_PATH}/tmp/d-hercules-$i-start"
COMMAND="$HERCULES_BUILD_PATH/hercules_server d $i ${meta_hosts[0]} $INIT_HERCULES_NUM_DATA"
if [[ "$SLURM" -eq "0" ]]; then
# ssh $node "$RM; cd $HERCULES_BASH_PATH && $COMMAND &"
......@@ -676,12 +691,12 @@ do
if [[ "$VERBOSE" -eq "1" ]]; then
echo "[+] Removing temporal files on $node"
fi
srun -N 1 -n 1 -c 1 -m block:block:block --mem=1G -w $node $RM 2> /dev/null
srun -N 1 -n 1 -c 1 -m block:block:block --mem=1G -w $node -- $RM 2> /dev/null
## If slurm is being used, the service is deploy using srun.
if [[ "$VERBOSE" -eq "1" ]]; then
echo "[+] Starting data server $i on $node..."
fi
srun -N 1 -n 1 -c 1 -m block:block:block --mem=80G -N 1 -n 1 -w $node $COMMAND &
srun -N 1 -n 1 -c 1 -m block:block:block --mem=0G -w $node -- $COMMAND &
fi
i=$(($i+1))
done
......
......@@ -25,7 +25,6 @@ void *recv_buffer;
int ep_timeout = 0;
ucs_status_t ucp_mem_alloc(ucp_context_h ucp_context, size_t length, void **address_p)
{
ucp_mem_map_params_t params;
......@@ -175,7 +174,6 @@ size_t send_data(ucp_worker_h ucp_worker, ucp_ep_h ep, const void *msg, size_t m
ucp_request_param_t send_param;
send_req_t ctx;
// char req[2048];
ctx.buffer = (void *)msg;
// ctx.buffer = (char *)msg;
// ctx.buffer = (char *)malloc(msg_len);
......
......@@ -31,15 +31,14 @@ GTree_search_(GNode *parent_node,
// HAVE TO CHECK IF IT IS A DIRECTORY OR A FILE
// For this i check if it has at the end /
slog_debug("child->data=%s, desired_data=%s", child->data, desired_data);
if (desired_data[strlen(desired_data) - 1] == '/' && !strncmp((char *)child->data, desired_data, strlen((char *)child->data)))
{
{ // directory case.
slog_debug("directory case");
// Check if the compared node is the requested one.
int a = 1;
if (!strcmp((char *)child->data, desired_data))
{
*found_node = child;
// The desired data was found.
return 1;
}
......@@ -50,12 +49,12 @@ GTree_search_(GNode *parent_node,
}
}
else if (desired_data[strlen(desired_data) - 1] != '/' && !strncmp((char *)child->data, desired_data, strlen((char *)child->data)))
{
{ // regular file.
slog_debug("regular file");
// Check if the compared node is the requested one.
if (!strcmp((char *)child->data, desired_data))
{
*found_node = child;
// The desired data was found.
return 1;
}
......@@ -65,7 +64,7 @@ GTree_search_(GNode *parent_node,
// CHECK THE NUMBERS OF '/' IN THE PATHS TO SEE IF WE ARRIVE TO THE DIRECTORY
int amount = 0;
for (int32_t j = 0; j < strlen(desired_data) - 1; j++)
{
{ // counts the number of "/" on the path.
if (desired_data[j] == '/')
{
amount = amount + 1;
......@@ -73,14 +72,15 @@ GTree_search_(GNode *parent_node,
}
int amount_child = 0;
char *path_child = (char *)child->data;
slog_debug("path_child=%s, desired_data=%s", path_child, desired_data);
for (int32_t j = 0; j < strlen(path_child) - 1; j++)
{
{ // counts the number of "/" on the child path.
if (path_child[j] == '/')
{
amount_child = amount_child + 1;
}
}
slog_debug("amount=%d, amount_child=%d", amount, amount_child);
if (amount == amount_child)
{
// Move on to the following child.
......@@ -97,6 +97,7 @@ GTree_search_(GNode *parent_node,
child = child->next;
}
last_parent = parent_node;
slog_debug("last_parent=%s", last_parent->data);
return 0;
}
......@@ -130,7 +131,7 @@ GTree_rename(char *old_desired_data, char *new_desired_data)
if (GTree_search(tree_root, old_desired_data, &closest_node) == 1)
{
slog_debug("\t[GTree] closest_node->data=%s", (char *)closest_node->data);
// If the searched name (old data) and the data of the node in the tree are equals,
// If the searched name (old data) and the data of the node in the tree are equals,
// we remove the node from the tree, and insert the new one.
if (strcmp(old_desired_data, (char *)closest_node->data) == 0)
{
......@@ -141,7 +142,7 @@ GTree_rename(char *old_desired_data, char *new_desired_data)
}
else
{
//fprintf(stderr, "Rename Error not found:%s\n", old_desired_data);
// fprintf(stderr, "Rename Error not found:%s\n", old_desired_data);
slog_error("Rename Error not found:%s", old_desired_data);
return 0;
}
......@@ -195,15 +196,19 @@ GTree_rename_dir_dir(char *old_dir, char *rdir_dest)
}
g_node_destroy(dir_node);
}
else
{
return 1;
}
return 0;
}
// Method deleting a new path.
int32_t
GTree_delete(char *desired_data)
int32_t GTree_delete(char *desired_data)
{
// Closest node to the one requested (or even the requested one itself).
GNode *closest_node;
int32_t ret = 0;
// Check if the node has been already inserted.
if (GTree_search(tree_root, desired_data, &closest_node) == 1)
......@@ -212,13 +217,22 @@ GTree_delete(char *desired_data)
{
g_node_destroy(closest_node); // Delete Node
}
ret = 1;
}
else
{
return 0;
// add recursive search.
// const char *last = desired_data + strlen(desired_data) - 1;
size_t len = strlen(desired_data);
if (len > 0 && desired_data[len - 1] != '/')
{
strcat(desired_data,"/");
ret = GTree_delete(desired_data);
}
// return 0;
}
return 1;
return ret;
}
// Method inserting a new path.
......@@ -227,10 +241,9 @@ GTree_insert(char *desired_data)
{
// Closest node to the one requested (or even the requested one itself).
GNode *closest_node = NULL;
if (last_parent != NULL)
{
slog_debug("last_parent->data=%s, desired_data=%s", last_parent->data, desired_data);
char *data_search = (char *)calloc(256, sizeof(char));
if (desired_data[strlen(desired_data) - 1] == '/')
{
......@@ -255,12 +268,17 @@ GTree_insert(char *desired_data)
free(father);
free(data_search);
}
else
{
slog_debug("last_parent is NULL");
}
// Check if the node has been already inserted.
if (closest_node == NULL)
{
if (GTree_search(tree_root, desired_data, &closest_node))
{
slog_debug("closest_node=%s", closest_node->data);
return 0;
}
}
......@@ -276,16 +294,15 @@ GTree_insert(char *desired_data)
if (!more_chars && (closest_data_length == 2))
{
more_chars = 1;
closest_data_length--;
}
// Search for the '/' characters within the additional ones.
slog_debug("[Gtree] path=%s, more_chars=%d, closest_data_length=%d", desired_data, more_chars, closest_data_length);
slog_debug("path=%s, more_chars=%d, closest_data_length=%d", desired_data, more_chars, closest_data_length);
for (int32_t i = 0; i < more_chars; i++)
{
int32_t new_position = closest_data_length + i;
slog_debug("[Gtree] path=%s, new_position=%d, i=%d, %c", desired_data, new_position, i, desired_data[new_position]);
// slog_debug("[Gtree] path=%s, new_position=%d, i=%d, %c", desired_data, new_position, i, desired_data[new_position]);
if ((desired_data[new_position] == '/') || (i == (more_chars - 1)))
{
......@@ -307,7 +324,6 @@ GTree_insert(char *desired_data)
g_node_append(closest_node, new_node);
return 0;
// closest_node = new_node;
}
}
......@@ -322,8 +338,8 @@ serialize_dir_childrens(GNode *visited_node,
char **buffer)
{
// Add the concerned uri into the buffer.
memcpy(*buffer, (char *)visited_node->data, URI_);
*buffer += URI_;
// memcpy(*buffer, (char *)visited_node->data, URI_);
// *buffer += URI_;
GNode *child = visited_node->children;
// printf("node=%s num_children=%d\n",(char *) visited_node->data,num_children);
......@@ -399,7 +415,10 @@ GTree_getdir(char *desired_dir,
// Check if the node is inserted.
if (!GTree_search(tree_root, desired_dir, &dir_node))
{
*numdir_elems = -1;
return NULL;
}
// Number of elements contained by the concerned directory.
// uint32_t num_elements_indir = g_node_n_nodes (dir_node, G_TRAVERSE_ALL);
......@@ -408,19 +427,26 @@ GTree_getdir(char *desired_dir,
// Number of children of the directory node.
uint32_t num_children = g_node_n_children(dir_node);
*numdir_elems = num_children + 1; //+1 because of the actual directory + childrens
slog_info("[GTree_getdir] num_children=%d", num_children);
// *numdir_elems = num_children + 1; //+1 because of the actual directory + childrens
*numdir_elems = num_children; // actual directory is concat in the front-end.
slog_info("num_children=%d", *numdir_elems);
if (*numdir_elems == 0)
{
return NULL;
}
// Buffer containing the whole set of elements within a certain directory.
// char * dir_elements = (char *) malloc(sizeof(char)*num_elements_indir*URI_);
// char *dir_elements = (char *) malloc(sizeof(char)*num_elements_indir*URI_);
char *dir_elements = (char *)malloc((num_children + 1) * URI_);
char *aux_dir_elem = dir_elements;
// Call the serialization function storing all dir elements in the buffer.
// TO CHECK!
slog_info("[GTree_getdir] serialize_dir_childrens(dir_node, num_children=%d, &aux_dir_elem)", num_children);
slog_info("serialize_dir_childrens(dir_node=%s, num_children=%d, &aux_dir_elem)", dir_node->data, num_children);
serialize_dir_childrens(dir_node, num_children, &aux_dir_elem);
slog_info("[GTree_getdir] ending serialize_dir_childrens, aux_dir_elem=%s", aux_dir_elem);
slog_info("ending serialize_dir_childrens, aux_dir_elem=%s", aux_dir_elem);
return dir_elements;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment