Commit 6fc29d63 authored by Genaro Juan Sánchez Gallegos's avatar Genaro Juan Sánchez Gallegos Committed by Javier Garcia Blas
Browse files

Add improvements for Snapshot.

Showing with 313 additions and 306 deletions
+313 -306
......@@ -537,7 +537,7 @@ RETURNS: 0 - The requested block was successfully stored.
int32_t set_data_server(const char *data_uri, int32_t data_id, const void *buffer, size_t size, off_t offset, int next_server);
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size);
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size, const char* key);
int32_t SendBroadcastMessage(int from_data_server_id, uint32_t num_of_servers, const char *request);
......
......@@ -64,7 +64,7 @@ public:
int32_t get_broadcast(std::string key, void **add_, uint64_t *size_);
char *GetDataFromFile(std::string file_name, uint64_t *file_size_occupied);
char *MergeData(uint32_t num_of_data_servers, std::string file_name, __off_t file_size, uint64_t block_size);
char *MergeData(__off_t *size_of_data, uint32_t num_of_data_servers, std::string file_name, __off_t file_size, uint64_t block_size);
// char* getDataFromFile(std::string file_name);
......@@ -86,6 +86,7 @@ public:
int32_t cleaning_specific(std::string new_key);
int32_t freeAllMemory();
int32_t erase_broadcast_element(std::string key);
int32_t erase_snapshot_element(std::string key);
int32_t get_broadcast_size();
......
......@@ -1298,7 +1298,6 @@ int32_t open_dataset(char *dataset_uri, int opened)
if ((associated_imss_indx = imss_check(dataset_uri)) == -1)
{
slog_fatal("HERCULES_ERR_OPEN_DATASET_NOT_FOUND");
// return -1;
}
slog_live("[IMSS] associated_imss_indx=%d", associated_imss_indx);
......@@ -2130,7 +2129,6 @@ int32_t get_data_location(int32_t dataset_id, int32_t data_id, int32_t op_type)
int32_t old_num_storages = curr_imss.info.num_active_storages;
if (curr_num_data_nodes_env != curr_imss.info.num_active_storages)
{
// curr_imss.info.num_storages = atoi(curr_num_data_nodes);
stat_imss_info(curr_imss.info.uri_, &curr_imss.info);
fprintf(stderr, "HERCULES_CURR_ACTIVE_DATA_NODES=%d, old_num_storages=%d, num_active_storages=%d, uri=%s\n", curr_num_data_nodes_env, old_num_storages, curr_imss.info.num_active_storages, curr_imss.info.uri_);
// unsetenv("HERCULES_CURR_ACTIVE_DATA_NODES");
......@@ -2170,10 +2168,8 @@ int32_t get_data_location(int32_t dataset_id, int32_t data_id, int32_t op_type)
}
slog_warn("Server %d is not avaiable, number of active nodes was %d\n", server, curr_imss.info.arr_num_active_storages[server]);
// fprintf(stderr, "Server %d is not avaiable, recalculating\n", server);
// fprintf(stderr, "Server %d is not avaiable for data id %d, number of active nodes was %d\n", server, data_id, curr_imss.info.arr_num_active_storages[server]);
num_storages = curr_imss.info.arr_num_active_storages[server]; // curr_imss.info.num_active_storages;
num_storages = curr_imss.info.arr_num_active_storages[server];
it++;
// if (it >= curr_imss.info.num_storages)
if (it >= 10)
{
fprintf(stderr, "[ERROR] Not find server for data id %d after %d iterations, %s\n", data_id, it, curr_dataset.uri_);
......@@ -2233,7 +2229,6 @@ int32_t rename_dataset_srv_worker_dir_dir(char *old_dir, char *rdir_dest,
ucp_ep_h ep = curr_imss.conns.eps[i];
sprintf(key_, "GET 6 0 %s %s", old_dir, rdir_dest);
// if (comm_send(curr_imss.conns.eps_[repl_servers[i]], key, key_length, 0) != key_length)
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, key_) == 0)
{
pthread_mutex_unlock(&lock_network);
......@@ -2688,7 +2683,6 @@ void *split_readv(void *th_argv)
{
ucp_ep_h ep;
// Send read request message specifying the block URI.
// if (comm_send(curr_imss.conns.eps_[repl_servers[i]], key, KEY, 0) < 0)
// printf("[SPLIT READV] 1-send_data");
sprintf(key_, "GET 9 0 %s %ld %ld %d %d",
arguments->path, arguments->BLKSIZE, arguments->start_offset,
......@@ -2755,88 +2749,6 @@ void *split_readv(void *th_argv)
pthread_mutex_unlock(&lock_network);
pthread_exit(NULL);
}
// Method retrieving multiple data from a specific server
/*int32_t
split_readv(int32_t n_server,
char * path,
char * msg,
unsigned char * buffer,
int32_t size,
uint64_t BLKSIZE,
int64_t start_offset,
int stats_size)
{
printf("n_server=%d msg=%s size=%d", n_server, msg, size);
//Servers that the data block is going to be requested to.
int32_t repl_servers[curr_dataset.repl_factor];
int32_t curr_imss_storages = curr_imss.info.num_storages;
//Retrieve the corresponding connections to the previous servers.
for (int32_t i = 0; i < curr_dataset.repl_factor; i++)
{
//Server storing the current data block.
uint32_t n_server_ = (n_server + i*(curr_imss_storages/curr_dataset.repl_factor)) % curr_imss_storages;
repl_servers[i] = n_server_;
//Check if the current connection is the local one (if there is).
if (repl_servers[i] == curr_dataset.local_conn)
{
//Move the local connection to the first one to be requested.
int32_t aux_conn = repl_servers[0];
repl_servers[0] = repl_servers[i];
repl_servers[i] = aux_conn;
}
}
char key_[KEY];
//Key related to the requested data element.
sprintf(key_, "9 %s %ld %ld %d %s",path, BLKSIZE, start_offset, stats_size, msg);
int key_length = strlen(key_)+1;
char key[key_length];
memcpy((void *) key, (void *) key_, key_length);
key[key_length-1] = '\0';
//Request the concerned block to the involved servers.
for (int32_t i = 0; i < curr_dataset.repl_factor; i++)
{
//printf("BLOCK %d ASKED TO %d SERVER with key: %s (%d)", curr_block, repl_servers[i], key, key_length);
//Send read request message specifying the block URI.
//if (comm_send(curr_imss.conns.eps_[repl_servers[i]], key, KEY, 0) < 0)
if (comm_send(curr_imss.conns.eps_[repl_servers[i]], key, key_length, 0) != key_length)
{
perror("ERRIMSS_GETDATA_REQ");
return -1;
}
//Receive data related to the previous read request directly into the buffer.
if (comm_recv(curr_imss.conns.eps_[repl_servers[i]], buffer, size*BLKSIZE*KB, 0) == -1)
{
if (errno != EAGAIN)
{
perror("ERRIMSS_GETDATA_RECV");
return -1;
}
else
break;
}
//Check if the requested key was correctly retrieved.
if (strncmp((const char *) buffer, "$ERRIMSS_NO_KEY_AVAIL$", 22)){
return 0;
}
}
//slog_fatal( "ERRIMSS_GETDATA_UNAVAIL");
return -1;
}
*/
int32_t flush_data()
{
......@@ -2888,7 +2800,7 @@ ssize_t get_ndata(int32_t dataset_id, int32_t data_id, void *buffer, ssize_t to_
return -2;
}
replication_factor = curr_dataset.repl_factor;
curr_imss_storages = curr_imss.info.num_active_storages; // curr_imss.info.num_storages;
curr_imss_storages = curr_imss.info.num_active_storages;
// }
// else
// {
......@@ -3088,7 +3000,7 @@ size_t get_data_mall(int32_t dataset_id, int32_t data_id, void *buffer, ssize_t
// Servers that the data block is going to be requested to.
int32_t repl_servers[curr_dataset.repl_factor];
int32_t curr_imss_storages = curr_imss.info.num_active_storages; // curr_imss.info.num_storages;
int32_t curr_imss_storages = curr_imss.info.num_active_storages;
// Retrieve the corresponding connections to the previous servers.
// slog_debug("curr_dataset.repl_factor=%d", curr_dataset.repl_factor);
......@@ -3201,7 +3113,7 @@ int32_t set_data(int32_t dataset_id, int32_t data_id, const void *buffer, size_t
pthread_mutex_lock(&lock_network);
char key_[REQUEST_SIZE];
int32_t curr_imss_storages = curr_imss.info.num_active_storages; // curr_imss.info.num_storages;
int32_t curr_imss_storages = curr_imss.info.num_active_storages;
// Send the data block to every server implementing redundancy.
for (int32_t i = 0; i < curr_dataset.repl_factor; i++)
......@@ -3362,8 +3274,6 @@ int32_t set_data_server(const char *data_uri, int32_t data_id, const void *buffe
pthread_mutex_lock(&lock_network);
char key_[REQUEST_SIZE];
// int32_t curr_imss_storages = curr_imss.info.num_storages;
// curr_imss = g_array_index(imssd, imss, curr_dataset.imss_d);
// Send the data block to every server implementing redundancy.
// for (int32_t i = 0; i < curr_dataset.repl_factor; i++)
......@@ -3414,20 +3324,18 @@ int32_t set_data_server(const char *data_uri, int32_t data_id, const void *buffe
return 1;
}
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size)
int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, const void *buffer, size_t size, const char *key)
{
pthread_mutex_lock(&lock_network);
char key_[REQUEST_SIZE];
// int32_t curr_imss_storages = curr_imss.info.num_storages;
// curr_imss = g_array_index(imssd, imss, curr_dataset.imss_d);
// Send the data block to every server implementing redundancy.
// for (int32_t i = 0; i < curr_dataset.repl_factor; i++)
curr_imss = g_array_index(imssd, imss, curr_dataset.imss_d);
char request[REQUEST_SIZE];
{
ucp_ep_h ep;
// Server receiving the current data block.
uint32_t n_server_ = to_data_server_id; // (n_server + i * (curr_imss_storages / curr_dataset.repl_factor)) % curr_imss_storages;
uint32_t n_server_ = to_data_server_id;
// gettimeofday(&start, NULL);
......@@ -3437,14 +3345,14 @@ int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, c
// size = curr_dataset.data_entity_size;
// sprintf(key_, "SET %lu %ld %s$%d", size, offset, data_uri, data_id);
sprintf(key_, "SNAPSET %lu %d %s$%d", size, 0, curr_dataset.uri_, from_data_server_id);
sprintf(request, "SNAPSET %lu %d %s$%d", size, 0, key, from_data_server_id);
slog_info("[IMSS] Request to Server %d: %s (%lu)", n_server_, request, size);
slog_info("[IMSS] Request to Server %d: %s (%lu)", n_server_, key_, size);
ep = curr_imss.conns.eps[n_server_];
// send the request to the data server, indicating we will perform a write operation (SET) to certain data block (data_id)
// in a dataset (curr_dataset.uri).
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, key_) == 0)
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, request) == 0)
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_SET_REQ_SEND_REQ");
......@@ -3461,7 +3369,7 @@ int32_t set_data_server_reduce(int from_data_server_id, int to_data_server_id, c
slog_error("HERCULES_ERR_SEND_DATA_SEND_DATA");
return -1;
}
slog_info("[IMSS][completed] Request sent to server %d: %s (%d)", n_server_, key_, size);
slog_info("[IMSS][completed] Request sent to server %d: %s (%d)", n_server_, request, size);
/* gettimeofday(&end, NULL);
delta_us = (long) (end.tv_usec - start.tv_usec);
printf("[CLIENT] [SWRITE SEND_DATA] delta_us=%6.3f",(delta_us/1000.0F));*/
......@@ -3490,8 +3398,7 @@ int32_t SendBroadcastMessage(int from_data_server_id, uint32_t num_of_servers, c
// Skip current server.
continue;
}
ucp_ep_h ep;
// Server receiving the current data block.
uint32_t n_server_ = i;
......@@ -3502,7 +3409,7 @@ int32_t SendBroadcastMessage(int from_data_server_id, uint32_t num_of_servers, c
slog_info("[IMSS] num_active_storages from curr_imss=%d", curr_imss.info.num_active_storages);
ep = curr_imss.conns.eps[n_server_];
slog_info("[IMSS] Request to Server %d: %s", n_server_, request);
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, (char *)request) == 0)
{
pthread_mutex_unlock(&lock_network);
......@@ -3521,7 +3428,6 @@ int32_t SendBroadcastMessage(int from_data_server_id, uint32_t num_of_servers, c
// return -1;
// }
slog_info("[IMSS][completed] Request sent to server %d: %s", n_server_, request);
}
pthread_mutex_unlock(&lock_network);
......@@ -3949,7 +3855,7 @@ int get_number_of_active_nodes(char *hercules_path)
{
int number_active_storage_servers = 0;
char buf[10], absolute_path[PATH_MAX];
sprintf(absolute_path,"%s/tmp/hercules_num_act_nodes", hercules_path);
sprintf(absolute_path, "%s/tmp/hercules_num_act_nodes", hercules_path);
// Open the "hercules_num_act_nodes" file. This file should be created by
// the user application or the malleability manager.
......@@ -3957,7 +3863,7 @@ int get_number_of_active_nodes(char *hercules_path)
if (fd == -1)
{
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg,"ERR_HERCULES_OPEN_NUM_ACTVIES_NODES:%s", absolute_path);
sprintf(err_msg, "ERR_HERCULES_OPEN_NUM_ACTVIES_NODES:%s", absolute_path);
perror(err_msg);
return -1;
}
......@@ -4003,7 +3909,7 @@ int32_t Open_file(const char *checkpoint_dir, const char *filename)
if (fd < 0)
{
char err_msg[MAX_ERR_MSG_LEN];
sprintf(err_msg,"HERCULES_ERR_OPEN_DISK: path=%s", disk_path);
sprintf(err_msg, "HERCULES_ERR_OPEN_DISK: path=%s", disk_path);
perror(err_msg);
slog_error("%s", err_msg);
return -1;
......@@ -4053,7 +3959,7 @@ int32_t Make_directory(const char *dirname)
const char *disk_path = dirname;
int ret = 1;
struct stat sb;
//sprintf(disk_path, "/beegfs/home/javier.garciablas/hercules/bash/tests/disk/output/%s", dirname);
// sprintf(disk_path, "/beegfs/home/javier.garciablas/hercules/bash/tests/disk/output/%s", dirname);
if (stat(disk_path, &sb) == 0 && S_ISDIR(sb.st_mode))
{
fprintf(stderr, "directory %s exists\n", disk_path);
......@@ -4071,4 +3977,3 @@ int32_t Make_directory(const char *dirname)
}
return ret;
}
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment