server.c 36.69 KiB
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/signal.h>
#include "metadata_stat.h"
#include "memalloc.h"
#include "directory.h"
#include "records.hpp"
#include "map_ep.hpp"
#include <inttypes.h>
#include <unistd.h>
// Pointer to the tree's root node.
extern GNode *tree_root;
extern pthread_mutex_t tree_mut;
extern int32_t __thread current_dataset;   // Dataset whose policy has been set last.
extern dataset_info __thread curr_dataset; // Currently managed dataset.
extern imss __thread curr_imss;
// Initial buffer address.
extern char *buffer_address;
// Set of locks dealing with the memory buffer access.
extern pthread_mutex_t *region_locks;
// Segment size (amount of memory assigned to each thread).
extern uint64_t buffer_segment;
char POLICY[MAX_POLICY_LEN];
extern ucp_worker_h *ucp_worker_threads;
extern ucp_address_t **local_addr;
extern size_t *local_addr_len;
extern StsHeader *mem_pool;
struct arguments args;
std::shared_ptr<map_records> g_map;
/* UCP objects */
ucp_context_h ucp_context;
ucp_worker_h ucp_worker;
// ucp_ep_h pub_ep;
ucp_address_t *req_addr;
ucp_ep_h **metadata_endpoints;
size_t req_addr_len;
unsigned long number_active_storage_servers = 0; // stores the current number of active storage servers.
pthread_t *threads;
// global variables usted to finish threads.
extern int global_finish_threads;
extern int global_finish_checkpoint;
extern int global_finish_snapshot;
extern int global_server_fd_thread;
extern pthread_cond_t global_finish_cond;
extern pthread_cond_t global_run_snapshot_cond;
extern pthread_cond_t global_run_checkpoint_cond;
extern pthread_mutex_t global_finish_mut;
#define RAM_STORAGE_USE_PCT 0.75f // percentage of free system RAM to be used for storage
/**
 * @brief Re-distribute the blocks of this server to another servers
 * following the distribution policy choose by the user.
 * @return 0 on success, on error -1 is returned.
int move_blocks_2_server(uint64_t stat_port, uint32_t server_id, char *imss_uri, std::shared_ptr<map_records> map)
	// Creates endpoints to all data servers. It is use in case of
	// malleability to move blocks between data servers.
7172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
slog_debug("Connecting to data servers\n"); open_imss(imss_uri); // TODO: Check if this is still necessary due we called it on the main function. if (number_active_storage_servers < 0) { slog_fatal("Error creating HERCULES's resources, the process cannot be started"); return -1; } // Here data server should to move the datablocks. // print all key/value elements. double time_taken; time_t t = clock(); void *address_; uint64_t block_size; int curr_map_size = 0; const char *uri_; size_t size; char key_[REQUEST_SIZE]; // Get the number of blocks stored by this data server. int number_of_blocks_2_move = map->size(); slog_info("Server %d, has %d blocks, active storage servers=%lu", args.id, map->size(), number_active_storage_servers); while ((curr_map_size = map->size()) > 0 && number_active_storage_servers > 0) { std::string key; // get next key (block identifier) with the format <block_name>$<block_number> // for example "myfile$199", where block_name = myfile, and block_number = 199. key = map->get_head_element(); // get the element data and store it in "address_". map->get(key, &address_, &block_size); // fprintf(stderr, "**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size); slog_debug("**** curr_map_size=%d, head element=%s, block_size=%ld\n", curr_map_size, key.c_str(), block_size); int pos = key.find('$') + 1; // +1 to skip '$' on the block number. std::string block = key.substr(pos, key.length() + 1); // substract the block number from the key. int block_number = stoi(block, 0, 10); // string to number. pos -= 1; // -1 to skip '$' on the data uri. std::string data_uri = key.substr(0, pos); // substract the data uri from the key. slog_debug("key='%s',\turi='%s',\tblock='%s'\n", key.c_str(), data_uri.c_str(), block.c_str()); int next_server = find_server(number_active_storage_servers, block_number, data_uri.c_str(), 0); slog_info("key='%s',\turi='%s%s',\tfrom server %d to server %d,\tactive servers=%lu\n", key.c_str(), data_uri.c_str(), block.c_str(), server_id, next_server, number_active_storage_servers); slog_debug("new server=%d, curr_server=%d\n", next_server, server_id); // here we can send key.c_str() directly to reduce the number of operations. if (set_data_server(data_uri.c_str(), block_number, address_, block_size, 0, next_server) < 0) { slog_error("ERR_HERCULES_SET_DATA_IN_SERVER\n"); perror("ERR_HERCULES_SET_DATA_IN_SERVER"); return -1; } // delete the element from the map. map->erase_head_element(); // get new map size to print it. curr_map_size = map->size(); // fprintf(stderr, "**** curr_map_size=%d\n", curr_map_size); slog_debug("**** curr_map_size=%d\n", curr_map_size); } t = clock() - t; time_taken = ((double)t) / (CLOCKS_PER_SEC); if (number_active_storage_servers > 0) { // fprintf(stderr, "[HS] Data movement %d blocks %lu %f sec.\n", number_of_blocks_2_move, number_active_storage_servers, time_taken); fprintf(stderr, "\033[0;34m [HS] Server %d has moved %d blocks to %lu servers in %f sec. \033[0m\n", args.id, number_of_blocks_2_move, number_active_storage_servers, time_taken); }