An error occurred while loading the file. Please try again.
-
eda729bf
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/signal.h>
#include "metadata_stat.h"
#include "memalloc.h"
#include "directory.h"
#include "records.hpp"
#include "map_ep.hpp"
#include <inttypes.h>
#include <unistd.h>
// Pointer to the tree's root node.
extern GNode *tree_root;
extern pthread_mutex_t tree_mut;
extern int32_t __thread current_dataset; // Dataset whose policy has been set last.
extern dataset_info __thread curr_dataset; // Currently managed dataset.
extern imss __thread curr_imss;
// Initial buffer address.
extern char *buffer_address;
// Set of locks dealing with the memory buffer access.
extern pthread_mutex_t *region_locks;
// Segment size (amount of memory assigned to each thread).
extern uint64_t buffer_segment;
char POLICY[MAX_POLICY_LEN];
extern ucp_worker_h *ucp_worker_threads;
extern ucp_address_t **local_addr;
extern size_t *local_addr_len;
extern StsHeader *mem_pool;
struct arguments args;
std::shared_ptr<map_records> g_map;
/* UCP objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
// ucp_ep_h pub_ep;
ucp_address_t *req_addr;
ucp_ep_h **metadata_endpoints;
size_t req_addr_len;
unsigned long number_active_storage_servers = 0; // stores the current number of active storage servers.
pthread_t *threads;
// global variables usted to finish threads.
extern int global_finish_threads;
extern int global_finish_checkpoint;
extern int global_finish_snapshot;
extern int global_server_fd_thread;
extern pthread_cond_t global_finish_cond;
extern pthread_cond_t global_run_snapshot_cond;
extern pthread_cond_t global_run_checkpoint_cond;
extern pthread_mutex_t global_finish_mut;
#define RAM_STORAGE_USE_PCT 0.75f // percentage of free system RAM to be used for storage
/**
* @brief Re-distribute the blocks of this server to another servers
* following the distribution policy choose by the user.
* @return 0 on success, on error -1 is returned.
*/
int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri, std::shared_ptr<map_records> map)
{
// Creates endpoints to all data servers. It is use in case of
// malleability to move blocks between data servers.
7172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
slog_debug("Connecting to data servers\n");
open_imss(imss_uri); // TODO: Check if this is still necessary due we called it on the main function.
if (number_active_storage_servers < 0)
{
slog_fatal("Error creating HERCULES's resources, the process cannot be started");
return -1;
}
// Here data server should to move the datablocks.
// print all key/value elements.
double time_taken;
time_t t = clock();
void *address_;
uint64_t block_size;
int curr_map_size = 0;
const char *uri_;
size_t size;
char key_[REQUEST_SIZE];
// Get the number of blocks stored by this data server.
int number_of_blocks_2_move = map->size();
slog_info("Server %d, has %d blocks, active storage servers=%lu", args.id, map->size(), number_active_storage_servers);
while ((curr_map_size = map->size()) > 0 && number_active_storage_servers > 0)
{
std::string key;
// get next key (block identifier) with the format <block_name>$<block_number>
// for example "myfile$199", where block_name = myfile, and block_number = 199.
key = map->get_head_element();
// get the element data and store it in "address_".
map->get(key, &address_, &block_size);
// fprintf(stderr, "**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
slog_debug("**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size);
int pos = key.find('$') + 1; // +1 to skip '$' on the block number.
std::string block = key.substr(pos, key.length() + 1); // substract the block number from the key.
int block_number = stoi(block, 0, 10); // string to number.
pos -= 1; // -1 to skip '$' on the data uri.
std::string data_uri = key.substr(0, pos); // substract the data uri from the key.
slog_debug("key='%s',\turi='%s',\tblock='%s'\n", key.c_str(), data_uri.c_str(), block.c_str());
int next_server = find_server(number_active_storage_servers, block_number, data_uri.c_str(), 0);
slog_info("key='%s',\turi='%s%s',\tfrom server %d to server %d,\tactive servers=%lu\n", key.c_str(), data_uri.c_str(), block.c_str(), server_id, next_server, number_active_storage_servers);
slog_debug("new server=%d, curr_server=%d\n", next_server, server_id);
// here we can send key.c_str() directly to reduce the number of operations.
if (set_data_server(data_uri.c_str(), block_number, address_, block_size, 0, next_server) < 0)
{
slog_error("ERR_HERCULES_SET_DATA_IN_SERVER\n");
perror("ERR_HERCULES_SET_DATA_IN_SERVER");
return -1;
}
// delete the element from the map.
map->erase_head_element();
// get new map size to print it.
curr_map_size = map->size();
// fprintf(stderr, "**** curr_map_size=%d\n", curr_map_size);
slog_debug("**** curr_map_size=%d\n", curr_map_size);
}
t = clock() - t;
time_taken = ((double)t) / (CLOCKS_PER_SEC);
if (number_active_storage_servers > 0)
{
// fprintf(stderr, "[HS] Data movement %d blocks %lu %f sec.\n", number_of_blocks_2_move, number_active_storage_servers, time_taken);
fprintf(stderr, "\033[0;34m [HS] Server %d has moved %d blocks to %lu servers in %f sec. \033[0m\n", args.id, number_of_blocks_2_move, number_active_storage_servers, time_taken);
}