Commit 2399fae4 authored by Genaro Juan Sánchez Gallegos's avatar Genaro Juan Sánchez Gallegos Committed by Javier Garcia Blas
Browse files

Add improvements when multiple metadata servers are deployed.

1 merge request!5Merge from Debug into master
Showing with 118 additions and 101 deletions
+118 -101
......@@ -748,13 +748,14 @@ int32_t main(int32_t argc, char **argv)
{
// Create the tree_root node.
char *root_data = (char *)calloc(8, sizeof(char));
strcpy(root_data, "imss://");
// strcpy(root_data, "imss://");
strcpy(root_data, args.imss_uri);
tree_root = g_node_new((void *)root_data);
if (pthread_mutex_init(&tree_mut, NULL) != 0)
{
perror("HERCULES_ERR_TREE_MUT_INIT");
pthread_exit(NULL);
exit(1);
}
}
......
......@@ -415,7 +415,6 @@ int imss_getattr(const char *path, struct stat *stbuf)
int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t offset)
{
// fprintf(stderr,"imss_readdir=%s\n",path);
// Needed variables for the call
char *buffer;
char **refs;
......@@ -432,14 +431,14 @@ int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t off
// strcat(imss_path, "/");
// }
slog_debug("[IMSS][imss_readdir] imss_path=%s", imss_path);
slog_debug("[IMSS] imss_path=%s", imss_path);
// Call IMSS to get metadata
n_ent = get_dir((char *)imss_path, &buffer, &refs);
slog_debug("[IMSS][imss_readdir] imss_path=%s, n_ent=%d", imss_path, n_ent);
slog_debug("[IMSS] imss_path=%s, n_ent=%d", imss_path, n_ent);
if (n_ent < 0)
{
strcat(imss_path, "/");
slog_debug("[IMSS][imss_readdir] imss_path=%s", imss_path);
slog_debug("[IMSS] imss_path=%s", imss_path);
// fprintf(stderr,"try again imss_path=%s\n",imss_path);
n_ent = get_dir((char *)imss_path, &buffer, &refs);
if (n_ent < 0)
......@@ -448,32 +447,32 @@ int imss_readdir(const char *path, void *buf, posix_fill_dir_t filler, off_t off
return -ENOENT;
}
}
slog_debug("[IMSS][imss_readdir] Before flush data");
slog_debug("[IMSS]Before flush data");
// flush_data();
// Fill buffer
// TODO: Check if subdirectory
// printf("[FUSE] imss_readdir %s has=%d\n",path, n_ent);
slog_debug("[IMSS][imss_readdir] imss_readdir %s has=%d", path, n_ent);
slog_debug("[IMSS] imss_readdir %s has=%d", path, n_ent);
for (int i = 0; i < n_ent; ++i)
{
if (i == 0)
{
// slog_debug("[IMSS][imss_readdir] . y ..");
// slog_debug("[IMSS]. y ..");
filler(buf, "..", NULL, 0);
filler(buf, ".", NULL, 0);
}
else
{
// slog_debug("[IMSS][imss_readdir] %s", refs[i]);
// slog_debug("[IMSS]%s", refs[i]);
// the stbuf is not used after here.
// struct stat stbuf;
// int error = imss_getattr(refs[i] + 6, &stbuf);
// if (!error)
{
char *last = refs[i] + strlen(refs[i]) - 1;
// slog_info("last=%s", last);
slog_info("last=%s of %s", last, refs);
if (last[0] == '/')
{
last[0] = '\0';
......
......@@ -31,15 +31,14 @@ GTree_search_(GNode *parent_node,
// HAVE TO CHECK IF IT IS A DIRECTORY OR A FILE
// For this i check if it has at the end /
slog_debug("child->data=%s, desired_data=%s", child->data, desired_data);
if (desired_data[strlen(desired_data) - 1] == '/' && !strncmp((char *)child->data, desired_data, strlen((char *)child->data)))
{
{ // directory case.
slog_debug("directory case");
// Check if the compared node is the requested one.
int a = 1;
if (!strcmp((char *)child->data, desired_data))
{
*found_node = child;
// The desired data was found.
return 1;
}
......@@ -50,12 +49,12 @@ GTree_search_(GNode *parent_node,
}
}
else if (desired_data[strlen(desired_data) - 1] != '/' && !strncmp((char *)child->data, desired_data, strlen((char *)child->data)))
{
{ // regular file.
slog_debug("regular file");
// Check if the compared node is the requested one.
if (!strcmp((char *)child->data, desired_data))
{
*found_node = child;
// The desired data was found.
return 1;
}
......@@ -65,7 +64,7 @@ GTree_search_(GNode *parent_node,
// CHECK THE NUMBERS OF '/' IN THE PATHS TO SEE IF WE ARRIVE TO THE DIRECTORY
int amount = 0;
for (int32_t j = 0; j < strlen(desired_data) - 1; j++)
{
{ // counts the number of "/" on the path.
if (desired_data[j] == '/')
{
amount = amount + 1;
......@@ -73,14 +72,15 @@ GTree_search_(GNode *parent_node,
}
int amount_child = 0;
char *path_child = (char *)child->data;
slog_debug("path_child=%s, desired_data=%s", path_child, desired_data);
for (int32_t j = 0; j < strlen(path_child) - 1; j++)
{
{ // counts the number of "/" on the child path.
if (path_child[j] == '/')
{
amount_child = amount_child + 1;
}
}
slog_debug("amount=%d, amount_child=%d", amount, amount_child);
if (amount == amount_child)
{
// Move on to the following child.
......@@ -97,6 +97,7 @@ GTree_search_(GNode *parent_node,
child = child->next;
}
last_parent = parent_node;
slog_debug("last_parent=%s", last_parent->data);
return 0;
}
......@@ -227,10 +228,9 @@ GTree_insert(char *desired_data)
{
// Closest node to the one requested (or even the requested one itself).
GNode *closest_node = NULL;
slog_debug("last_parent->data=%s, desired_data=%s", last_parent, desired_data);
if (last_parent != NULL)
{
char *data_search = (char *)calloc(256, sizeof(char));
if (desired_data[strlen(desired_data) - 1] == '/')
{
......@@ -261,6 +261,7 @@ GTree_insert(char *desired_data)
{
if (GTree_search(tree_root, desired_data, &closest_node))
{
slog_debug("closest_node=%s", closest_node);
return 0;
}
}
......@@ -276,7 +277,6 @@ GTree_insert(char *desired_data)
if (!more_chars && (closest_data_length == 2))
{
more_chars = 1;
closest_data_length--;
}
......@@ -285,7 +285,7 @@ GTree_insert(char *desired_data)
for (int32_t i = 0; i < more_chars; i++)
{
int32_t new_position = closest_data_length + i;
slog_debug("[Gtree] path=%s, new_position=%d, i=%d, %c", desired_data, new_position, i, desired_data[new_position]);
// slog_debug("[Gtree] path=%s, new_position=%d, i=%d, %c", desired_data, new_position, i, desired_data[new_position]);
if ((desired_data[new_position] == '/') || (i == (more_chars - 1)))
{
......@@ -307,7 +307,6 @@ GTree_insert(char *desired_data)
g_node_append(closest_node, new_node);
return 0;
// closest_node = new_node;
}
}
......@@ -409,18 +408,18 @@ GTree_getdir(char *desired_dir,
// Number of children of the directory node.
uint32_t num_children = g_node_n_children(dir_node);
*numdir_elems = num_children + 1; //+1 because of the actual directory + childrens
slog_info("[GTree_getdir] num_children=%d", num_children);
slog_info("num_children=%d", *numdir_elems);
// Buffer containing the whole set of elements within a certain directory.
// char * dir_elements = (char *) malloc(sizeof(char)*num_elements_indir*URI_);
// char *dir_elements = (char *) malloc(sizeof(char)*num_elements_indir*URI_);
char *dir_elements = (char *)malloc((num_children + 1) * URI_);
char *aux_dir_elem = dir_elements;
// Call the serialization function storing all dir elements in the buffer.
// TO CHECK!
slog_info("[GTree_getdir] serialize_dir_childrens(dir_node, num_children=%d, &aux_dir_elem)", num_children);
slog_info("serialize_dir_childrens(dir_node=%s, num_children=%d, &aux_dir_elem)", dir_node, num_children);
serialize_dir_childrens(dir_node, num_children, &aux_dir_elem);
slog_info("[GTree_getdir] ending serialize_dir_childrens, aux_dir_elem=%s", aux_dir_elem);
slog_info("ending serialize_dir_childrens, aux_dir_elem=%s", aux_dir_elem);
return dir_elements;
}
......
......@@ -509,75 +509,97 @@ uint32_t get_dir(char *requested_uri, char **buffer, char ***items)
int ret = 0;
// Discover the metadata server that shall deal with the former URI.
// uint32_t m_srv = discover_stat_srv(requested_uri);
uint32_t m_srv = find_server(n_stat_servers, 0, requested_uri, GET, TYPE_METADATA_SERVER, curr_imss.info.session_plcy);
ucp_ep_h ep = stat_eps[m_srv];
// uint32_t m_srv = find_server(n_stat_servers, 0, requested_uri, GET, TYPE_METADATA_SERVER, curr_imss.info.session_plcy);
pthread_mutex_lock(&lock_network);
// GETDIR request.
char getdir_req[REQUEST_SIZE];
sprintf(getdir_req, "%" PRIu32 " GET %d %s", stat_ids[m_srv], GETDIR, requested_uri);
slog_debug("[IMSS][get_dir] Request - %s", getdir_req);
if (send_req(ucp_worker_meta, ep, local_addr_meta, local_addr_len_meta, getdir_req) == 0)
{
pthread_mutex_unlock(&lock_network);
slog_error("HERCULES_ERR_GET_DIR_SEND_REQ");
perror("HERCULES_ERR_GET_DIR_SEND_REQ");
return -1;
}
uint32_t total_num_elements = 0;
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker_meta, local_meta_uid);
if (length == 0)
char **arr_elements = (char **)malloc(n_stat_servers * sizeof(char *));
int arr_lengths[n_stat_servers] = {0};
// Search in all servers.
for (int m_srv = 0; m_srv < n_stat_servers; m_srv++)
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_GET_DIR_GET_RECV_DATA_LENGTH");
slog_error("HERCULES_ERR_GET_DIR_GET_RECV_DATA_LENGTH");
return -1;
}
void *elements = malloc(length);
// Retrieve the set of elements within the requested uri.
ret = recv_dynamic_stream(ucp_worker_meta, ep, elements, BUFFER, local_meta_uid, length);
if (ret < 0)
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_GET_DIR_RECV_STREAM");
slog_error("HERCULES_ERR_GET_DIR_RECV_STREAM");
free(elements);
return -1;
}
ucp_ep_h ep = stat_eps[m_srv];
if (!strncmp("$ERRIMSS_NO_KEY_AVAIL$", (const char *)elements, 22))
{
pthread_mutex_unlock(&lock_network);
slog_error("HERCULES_ERR_GET_DIR_NODIR");
free(elements);
return -1;
}
// GETDIR request.
sprintf(getdir_req, "%" PRIu32 " GET %d %s", stat_ids[m_srv], GETDIR, requested_uri);
slog_debug("[IMSS] Request - %s", getdir_req);
if (send_req(ucp_worker_meta, ep, local_addr_meta, local_addr_len_meta, getdir_req) == 0)
{
pthread_mutex_unlock(&lock_network);
slog_error("HERCULES_ERR_GET_DIR_SEND_REQ");
perror("HERCULES_ERR_GET_DIR_SEND_REQ");
return -1;
}
// Get the length of the message to be received.
size_t length = 0;
length = get_recv_data_length(ucp_worker_meta, local_meta_uid);
if (length == 0)
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_GET_DIR_GET_RECV_DATA_LENGTH");
slog_error("HERCULES_ERR_GET_DIR_GET_RECV_DATA_LENGTH");
return -1;
}
char *elements = (char *)malloc(length * sizeof(char));
// Retrieve the set of elements within the requested uri.
ret = recv_dynamic_stream(ucp_worker_meta, ep, elements, BUFFER, local_meta_uid, length);
if (ret < 0)
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_GET_DIR_RECV_STREAM");
slog_error("HERCULES_ERR_GET_DIR_RECV_STREAM");
free(elements);
return -1;
}
uint32_t elements_size = ret;
uint32_t num_elements = elements_size / URI_;
*items = (char **)calloc(num_elements, sizeof(char *));
if (!strncmp("$ERRIMSS_NO_KEY_AVAIL$", (const char *)elements, 22))
{
pthread_mutex_unlock(&lock_network);
perror("HERCULES_ERR_GET_DIR_NODIR");
slog_error("HERCULES_ERR_GET_DIR_NODIR");
free(elements);
return -1;
}
uint32_t elements_size = ret;
uint32_t num_elements = elements_size / URI_;
total_num_elements += num_elements;
// *items = (char **)calloc(num_elements, sizeof(char *));
arr_elements[m_srv] = elements;
arr_lengths[m_srv] = num_elements;
}
// Identify each element within the buffer provided.
char *curr = (char *)elements;
for (int32_t i = 0; i < num_elements; i++)
*items = (char **)calloc(total_num_elements, sizeof(char *));
int pos = 0;
for (int m_srv = 0; m_srv < n_stat_servers; m_srv++)
{
// slog_debug("[IMSS][get_dir] item %d -- calloc", i);
(*items)[i] = (char *)calloc(URI_, 1);
// slog_debug("[IMSS][get_dir] item %d -- memcpy", i);
memcpy((*items)[i], curr, URI_);
//(*items)[i] = elements;
// slog_debug("[IMSS][get_dir] item %d -- curr+=URI", i);
curr += URI_;
// slog_debug("[IMSS][get_dir] item %d: %s", i, (*items)[i]);
// char *curr = (char *)elements;
slog_debug("arr_lengths[%d]=%d, total_num_elements=%d", m_srv, arr_lengths[m_srv], total_num_elements);
char *curr = (char *)arr_elements[m_srv];
for (int32_t i = 0; i < arr_lengths[m_srv]; i++)
{
// slog_debug("[IMSS] item %d -- calloc", i);
(*items)[pos] = (char *)calloc(URI_, 1);
// slog_debug("[IMSS] item %d -- memcpy", i);
memcpy((*items)[pos], curr, URI_);
pos++;
//(*items)[i] = elements;
// slog_debug("[IMSS] item %d -- curr+=URI", i);
curr += URI_;
// slog_debug("[IMSS] item %d: %s", i, (*items)[i]);
}
free(arr_elements[m_srv]);
}
free(elements);
slog_debug("[IMSS] Ending, num_elements=%d", num_elements);
// free(elements);
free(arr_elements);
slog_debug("[IMSS] Ending, total_num_elements=%d", total_num_elements);
pthread_mutex_unlock(&lock_network);
return num_elements;
return total_num_elements;
}
/**********************************************************************************/
......@@ -1239,7 +1261,6 @@ int32_t create_dataset(char *dataset_uri,
// uint32_t m_srv = discover_stat_srv(new_dataset.uri_);
uint32_t m_srv = find_server(n_stat_servers, 0, new_dataset.uri_, GET, TYPE_METADATA_SERVER, curr_imss.info.session_plcy);
char formated_uri[REQUEST_SIZE];
// sprintf(formated_uri, "%" PRIu32 " SET %lu %s", stat_ids[m_srv], msg_size, new_dataset.uri_);
sprintf(formated_uri, "%" PRIu32 " SET %lu %s", opened, msg_size, new_dataset.uri_);
......@@ -3705,7 +3726,7 @@ int32_t get_type(const char *uri)
ep = stat_eps[m_srv];
sprintf(formated_uri, "%d GET 0 %s", 0, uri);
slog_info("[IMSS][get_type] Request - '%s'", formated_uri);
slog_info("[IMSS] Request - '%s'", formated_uri);
// printf("get_type=%s",uri);
// Send the request.
if (send_req(ucp_worker_meta, ep, local_addr_meta, local_addr_len_meta, formated_uri) == 0)
......@@ -3734,7 +3755,7 @@ int32_t get_type(const char *uri)
result = malloc(length);
ret = recv_dynamic_stream(ucp_worker_meta, ep, result, BUFFER, local_meta_uid, length);
slog_debug("[IMSS][get_type] after recv_dynamic_stream ret=%d", ret);
slog_debug("[IMSS] after recv_dynamic_stream ret=%d", ret);
if (ret < 0)
{
......
......@@ -1710,7 +1710,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
// raw_msg[req_size] = '\0';
// printf("*********worker_metadata raw_msg %s",raw_msg);
slog_info("[workers][stat_worker_helper] request received=%s", req);
slog_info("[workers] request received=%s", req);
//fprintf(stderr,"Req=%s\n", req);
// Reference to the client request.
......@@ -1722,7 +1722,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
// memcpy(uri, raw_msg, number_length + 1);
uint64_t block_size_recv = (uint64_t)atoi(number);
slog_info("[workers][stat_worker_helper] operation=%d, number=%s, uri=%s, block_size_recv=%ld", operation, number, uri_, block_size_recv);
slog_info("[workers] operation=%d, number=%s, uri=%s, block_size_recv=%ld", operation, number, uri_, block_size_recv);
// Create an std::string in order to be managed by the map structure.
std::string key;
......@@ -1748,9 +1748,9 @@ int stat_worker_helper(p_argv *arguments, char *req)
int32_t numelems_indir;
// Retrieve all elements inside the requested directory.
pthread_mutex_lock(&tree_mut);
slog_info("[workers][stat_worker_helper] Calling GTree_getdir, key=%s", key.c_str());
slog_info("[workers] Calling GTree_getdir, key=%s", key.c_str());
buffer = GTree_getdir((char *)key.c_str(), &numelems_indir);
slog_info("[workers][stat_worker_helper] Ending GTree_getdir, key=%s, numelems_indir=%d", key.c_str(), numelems_indir);
slog_info("[workers] Ending GTree_getdir, key=%s, numelems_indir=%d", key.c_str(), numelems_indir);
pthread_mutex_unlock(&tree_mut);
if (buffer == NULL)
{
......@@ -1767,7 +1767,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
m.size = numelems_indir * URI_;
m.data = buffer;
slog_info("[workers][stat_worker_helper] MSG, numelems_indir=%ld, size=%ld", numelems_indir, m.size);
slog_info("[workers] MSG, numelems_indir=%ld, size=%ld", numelems_indir, m.size);
if (send_dynamic_stream(arguments->ucp_worker, arguments->server_ep, (char *)&m, MSG, arguments->worker_uid) < 0)
{
......@@ -1775,7 +1775,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
return -1;
}
slog_debug("[workers][stat_worker_helper] buffer=%s", buffer);
slog_debug("[workers] buffer=%s", buffer);
char *curr = buffer;
// char *item = (char *)malloc(URI_ * sizeof(char));
......@@ -1792,6 +1792,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
}
case READ_OP:
{
slog_debug("[READ_OP]");
// Check if there was an associated block to the key.
int err = map->get(key, &address_, &block_size_rtvd);
slog_debug("[STAT WORKER] map->get (key %s, block_size_rtvd %ld) get res %d", key.c_str(), block_size_rtvd, err);
......@@ -1908,7 +1909,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
std::string old_key = key.substr(0, found);
std::string new_key = key.substr(found + 1, key.length());
slog_debug("[stat_worker_helper][RENAME] old_key=%s, new_key=%s\n", old_key.c_str(), new_key.c_str());
slog_debug("[RENAME] old_key=%s, new_key=%s\n", old_key.c_str(), new_key.c_str());
// RENAME MAP
int32_t result = map->rename_metadata_stat_worker(old_key, new_key);
......@@ -1916,13 +1917,13 @@ int stat_worker_helper(p_argv *arguments, char *req)
if (result == 0)
{
// printf("0 elements rename from stat_worker");
slog_warn("[stat_worker_helper][RENAME] 0 elements rename from stat_worker");
slog_warn("[RENAME] 0 elements rename from stat_worker");
break;
}
// RENAME TREE
int ret = GTree_rename((char *)old_key.c_str(), (char *)new_key.c_str());
slog_debug("[stat_worker_helper][RENAME] GTree_rename=%d", ret);
slog_debug("[RENAME] GTree_rename=%d", ret);
}
char release_msg[] = "RENAME\0";
......@@ -2075,7 +2076,6 @@ int stat_worker_helper(p_argv *arguments, char *req)
if (!map->get(key, &address_, &block_size_rtvd))
{
pthread_mutex_unlock(&mp);
// slog_debug("[STAT WORKER] Adding new block %p", &address_);
slog_debug("[STAT WORKER] Recv dynamic buffer size %ld", block_size_recv);
// Get the length of the message to be received.
......@@ -2094,8 +2094,6 @@ int stat_worker_helper(p_argv *arguments, char *req)
dataset_info *struct_ = (dataset_info *)buffer;
// ret = recv_dynamic_stream(arguments->ucp_worker, arguments->server_ep, buffer, BUFFER, arguments->worker_uid, length);
// length = recv_dynamic_stream_opt(arguments->ucp_worker, arguments->server_ep, &buffer, BUFFER, arguments->worker_uid, length);
slog_debug("[STAT WORKER] END Recv dynamic, n_server_when_created=%d", struct_->n_servers_when_created);
if (ret < 0)
......@@ -2107,7 +2105,6 @@ int stat_worker_helper(p_argv *arguments, char *req)
}
int32_t insert_successful = -1;
// insert_successful = map->put(key, buffer, block_size_recv);
insert_successful = map->put(key, buffer, length);
slog_debug("[STAT WORKER] map->put (key %s) err %d", key.c_str(), insert_successful);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment