Commit 5f42c79f authored by Genaro Juan Sánchez Gallegos's avatar Genaro Juan Sánchez Gallegos Committed by Javier Garcia Blas
Browse files

Add improvements getting the offset from the server.

No related merge requests found
Showing with 54 additions and 48 deletions
+54 -48
......@@ -67,7 +67,7 @@ pthread_t *threads;
extern int global_finish_threads;
extern int global_server_fd_thread;
#define SHM_SIZE 1 * 1024 * 1024 * 1024
#define SHM_SIZE 20L * 1024L * 1024L * 1024L
#define RAM_STORAGE_USE_PCT 0.75f // percentage of free system RAM to be used for storage
......@@ -448,7 +448,6 @@ int32_t main(int32_t argc, char **argv)
return 0;
}
sprintf(tmp_file_path, "/tmp/%c-hercules-%d-start", args.type, args.id);
cfg = cfg_init();
......@@ -639,6 +638,12 @@ int32_t main(int32_t argc, char **argv)
sprintf(log_path, "./%c-server-%d.%02d-%02d-%02d", args.type, args.id, tm.tm_hour, tm.tm_min, tm.tm_sec);
// sprintf(log_path, "./%c-server", args.type);
slog_init(log_path, IMSS_DEBUG_LEVEL, IMSS_DEBUG_FILE, IMSS_DEBUG_SCREEN, 1, 1, 1, args.id);
if (IMSS_DEBUG_FILE > 0)
{
printf("Log path = %s\n", log_path);
fflush(stdout);
}
// fprintf(stderr, "IMSS DEBUG FILE AT %s\n", log_path);
slog_info(",Time(msec), Comment, RetCode");
......@@ -1141,6 +1146,7 @@ int32_t main(int32_t argc, char **argv)
time_taken = ((double)t) / (CLOCKS_PER_SEC);
ready(tmp_file_path, "OK");
printf("Server %d is ready\n", args.id);
if (pthread_join(threads[i], NULL) != 0)
{
perror("ERR_HERCULES_SERVER_THREAD_JOIN");
......
#!/bin/bash
SCRIPT_NAME="ior_hercules_slurm.sh"
FILE_SIZE=$((1024*1024*1))
#FILE_SIZE=$((1024*1024*100))
#FILE_SIZE=$((1024*1024*1))
FILE_SIZE=$((1024*1024*100))
ATTACHED=1
#TEST_TYPE="weak"
TEST_TYPE="strong"
......@@ -11,9 +11,9 @@ TEMPLATE_CONFIG_PATH="../conf/hercules-template.conf"
POLICY="LOCAL"
NUM_SERVERS_RANGE=( 1 )
NUM_SERVERS_RANGE=( 8 )
#NUM_SERVERS_RANGE=( 1 4 8 16 32 )
NODES_FOR_CLIENTS_RANGE=( 1 )
NODES_FOR_CLIENTS_RANGE=( 8 )
#NODES_FOR_CLIENTS_RANGE=( 1 4 8 16 32 )
#CLIENTS_PER_NODE_RANGE=( 1 2 4 8 16 32 )
CLIENTS_PER_NODE_RANGE=( 1 )
......
......@@ -81,7 +81,6 @@ int32_t MALLEABILITY_TYPE;
int32_t UPPER_BOUND_SERVERS;
int32_t LOWER_BOUND_SERVERS;
// const char *TESTX = "imss://lorem_text.txt$1";
// const char *TESTX = "imss://wfc1.dat$1";
// const char *TESTX = "p4x2.save/wfc1.dat";
......@@ -346,7 +345,10 @@ int imss_getattr(const char *path, struct stat *stbuf)
pthread_mutex_lock(&lock_file);
map_erase(map, imss_path);
pthread_mutex_unlock(&lock_file);
map_release_prefetch(map_prefetch, path);
// map_release_prefetch(map_prefetch, path);
slog_debug("[imss_posix_api] Calling release prefetch path = %s", imss_path);
map_release_prefetch(map_prefetch, imss_path);
slog_debug("[imss_posix_api] Ending release prefetch path = %s", imss_path);
return -ENOENT;
case 1: // Directory case?
if ((n_ent = get_dir((char *)imss_path, &buffer, &refs)) != -1)
......@@ -392,7 +394,7 @@ int imss_getattr(const char *path, struct stat *stbuf)
{
int ret = 0;
// slog_live("[imss_getattr] IMSS_BLKSIZE=%lu KBytes, IMSS_DATA_BSIZE=%lu Bytes", IMSS_BLKSIZE, IMSS_DATA_BSIZE);
void *data = (void *)malloc(IMSS_DATA_BSIZE);
void *data = (void *)malloc(IMSS_DATA_BSIZE*sizeof(char));
// void *data = NULL;
// data is allocated in "get data".
ret = get_data(ds, 0, data);
......@@ -655,7 +657,6 @@ int imss_open(char *path, uint64_t *fh)
return 0;
}
int performance = 0;
ssize_t total_amount_read = 0;
......@@ -695,8 +696,7 @@ ssize_t imss_sread(const char *path, void *buf, size_t size, off_t offset)
end_blk = ceil((double)(offset + stats.st_size) / IMSS_DATA_BSIZE);
}
slog_debug("[imss_read] TotalSizeToRead=%ld (%ld kb), start_offset=%ld, curr_blk=%ld, end_blk=%ld, num_of_blks=%ld, offset=%ld, end_offset=%ld, IMSS_DATA_BSIZE=%ld, stats.st_size=%ld", size, size/1024, start_offset, curr_blk, end_blk, num_of_blk, offset, end_offset, IMSS_DATA_BSIZE, stats.st_size);
slog_debug("[imss_read] TotalSizeToRead=%ld (%ld kb), start_offset=%ld, curr_blk=%ld, end_blk=%ld, num_of_blks=%ld, offset=%ld, end_offset=%ld, IMSS_DATA_BSIZE=%ld, stats.st_size=%ld", size, size / 1024, start_offset, curr_blk, end_blk, num_of_blk, offset, end_offset, IMSS_DATA_BSIZE, stats.st_size);
// Check if offset is bigger than filled, return 0 because is EOF case
if (offset >= stats.st_size)
......@@ -821,7 +821,7 @@ ssize_t imss_sread(const char *path, void *buf, size_t size, off_t offset)
{
return to_read;
}
block_offset = 0;
// memcpy(buf + byte_count, aux, to_read);
......@@ -830,10 +830,9 @@ ssize_t imss_sread(const char *path, void *buf, size_t size, off_t offset)
}
total_amount_read += byte_count;
slog_read("TotalSizeToRead=%lu B (%lu kB, %lu mB), offset=%lu, total(to_read+offset)=%lu B (%lu mB), file size=%ld B (%ld mB), readed=%lu B, total_amount_read=", size, size/1024, size/1024/1024, offset, size+offset, (size+offset)/1024/10240, stats.st_size, stats.st_size/1024/1024, byte_count, total_amount_read);
// performance =
slog_read("TotalSizeToRead=%lu B (%lu kB, %lu mB), offset=%lu, total(to_read+offset)=%lu B (%lu mB), file size=%ld B (%ld mB), readed=%lu B, total_amount_read=", size, size / 1024, size / 1024 / 1024, offset, size + offset, (size + offset) / 1024 / 10240, stats.st_size, stats.st_size / 1024 / 1024, byte_count, total_amount_read);
// performance =
// free(rpath);
return byte_count;
......@@ -1492,7 +1491,7 @@ ssize_t imss_write(const char *path, const void *buf, size_t size, off_t off)
char *aux;
// char *data_pointer = (char *)buf; // points to the buffer containing all bytes to be stored
const void *data_pointer = buf; // points to the buffer containing all bytes to be stored
const char *rpath = path; // this pointer should not be free. //(char *)calloc(MAX_PATH, sizeof(char));
const char *rpath = path; // this pointer should not be free. //(char *)calloc(MAX_PATH, sizeof(char));
// get_iuri(path, rpath);
int middle = 0;
......@@ -2139,7 +2138,7 @@ int imss_create(const char *path, mode_t mode, uint64_t *fh, int opened)
// Assing file handler and create dataset
int res = 0;
res = create_dataset((char *)rpath, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, N_SERVERS, NO_LINK, opened);
slog_live("[imss_create] create_dataset((char*)rpath:%s, POLICY:%s, N_BLKS:%ld, IMSS_BLKSIZE:%d, REPL_FACTOR:%ld, N_SERVERS:%d), res:%d", (char *)rpath, POLICY, N_BLKS, IMSS_BLKSIZE, REPL_FACTOR, N_SERVERS, res);
if (res < 0)
......@@ -2310,7 +2309,7 @@ int imss_unlink(const char *path)
// Write initial block (0).
memcpy(buff, &header, sizeof(struct stat));
slog_debug("[FUSE][imss_posix_api] header.st_nlink=%lu", header.st_nlink);
slog_debug("[imss_posix_api] header.st_nlink=%lu", header.st_nlink);
set_data(ds, 0, (char *)buff, 0, 0);
pthread_mutex_unlock(&lock);
......@@ -2336,7 +2335,10 @@ int imss_unlink(const char *path)
map_erase(map, imss_path);
pthread_mutex_unlock(&lock_file);
map_release_prefetch(map_prefetch, path);
slog_debug("[imss_posix_api] Calling map_release_prefetch %s", path);
// map_release_prefetch(map_prefetch, path);
map_release_prefetch(map_prefetch, imss_path);
slog_debug("[imss_posix_api] Finish map_release_prefetch %s", path);
// *******************************
ret = release_dataset(ds);
slog_debug("[imss_posix_api] relese_dataset ret=%d", ret);
......@@ -2345,7 +2347,7 @@ int imss_unlink(const char *path)
slog_error("ERR_HERCULES_RELEASE_DATASET");
}
ret = 3;
ret = 3;
break;
}
......
......@@ -94,7 +94,7 @@ pthread_mutex_t lock_gtree = PTHREAD_MUTEX_INITIALIZER;
// To synchronize network operations.
pthread_mutex_t lock_network = PTHREAD_MUTEX_INITIALIZER;
#define SHM_SIZE 1 * 1024 * 1024 * 1024
#define SHM_SIZE 20L * 1024L * 1024L * 1024L
int32_t imss_comm_cleanup()
{
......@@ -2676,7 +2676,7 @@ int32_t delete_dataset_srv_worker(const char *dataset_uri, int32_t dataset_id, i
// // char result[msg_length];
// char *result = (char *)malloc(msg_length * sizeof(char));
void *result = malloc(msg_length);
void *result = (void *)malloc(msg_length);
msg_length = recv_data(ucp_worker_data, ep, result, msg_length, local_data_uid, 0);
// msg_length = recv_data_opt(ucp_worker_data, ep, &result, msg_length, local_data_uid, 0);
free(result);
......@@ -3209,7 +3209,7 @@ int32_t get_data(int32_t dataset_id, int32_t data_id, void *buffer)
return -EINVAL;
}
void *response_bufer = malloc(msg_length);
void *response_bufer = malloc(msg_length*sizeof(char));
// msg_length = recv_data(ucp_worker_data, ep, buffer, msg_length, local_data_uid, 0);
msg_length = recv_data(ucp_worker_data, ep, response_bufer, msg_length, local_data_uid, 0);
......@@ -3245,9 +3245,9 @@ int32_t get_data(int32_t dataset_id, int32_t data_id, void *buffer)
// msg_length = atoi((const char *)response_bufer);
// server_offset = atol((const char *)response_bufer);
sscanf((const char *)response_bufer, "%lu %u", &server_offset, &size);
sscanf((const char *)response_bufer, "%lu %d", &server_offset, &size);
slog_info("Opening shared memory, name=%s, id=%ld, key=%d, response_bufer=%s, server_offset=%lu, size=%lu", curr_dataset.uri_, data_id, key, response_bufer, server_offset, size);
slog_info("Opening shared memory, name=%s, id=%ld, key=%d, response_bufer=%s, server_offset=%lu, size=%d", curr_dataset.uri_, data_id, key, response_bufer, server_offset, size);
slog_debug("msg_length=%ld", msg_length);
// TODO: EL PUNTERO A MEMORIA COMPARTIDA SE PUEDE MANTENER ABIERTO!
......@@ -3271,6 +3271,7 @@ int32_t get_data(int32_t dataset_id, int32_t data_id, void *buffer)
}
pthread_mutex_unlock(&lock_network);
free(response_bufer);
return (int32_t)msg_length;
}
else
......@@ -3617,9 +3618,9 @@ int32_t set_data(int32_t dataset_id, int32_t data_id, const void *buffer, size_t
// when "LOCAL" is in use, we copy the data to shared memory.
if (!strcmp(curr_dataset.policy, "LOCAL"))
{
ep = curr_imss.conns.eps[n_server_];
sprintf(key_, "LOCALSET %lu %ld %s$%d", size, offset, curr_dataset.uri_, data_id);
slog_info("[IMSS] BLOCK %d SENT TO SERVER %d with Request: %s (%d)", data_id, n_server_, key_, size);
ep = curr_imss.conns.eps[n_server_];
// send the request to the data server, indicating we will perform a local write operation (LOCALSET) to certain data block (data_id)
// in a dataset (curr_dataset.uri).
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, key_) == 0)
......@@ -3642,7 +3643,7 @@ int32_t set_data(int32_t dataset_id, int32_t data_id, const void *buffer, size_t
return -EINVAL;
}
void *msg_received = malloc(msg_length);
void *msg_received = (void *)malloc(msg_length*sizeof(char));
msg_length = recv_data(ucp_worker_data, ep, msg_received, msg_length, local_data_uid, 0);
slog_info("[IMSS] After recv_data, msg_length=%lu", msg_length);
......@@ -3687,9 +3688,10 @@ int32_t set_data(int32_t dataset_id, int32_t data_id, const void *buffer, size_t
{ // Updates the shared memory segment.
char response[strlen("TOUPDATE") + 1];
size_t server_offset = 0;
sscanf((const char *)msg_received, "%s %lu", response, &server_offset);
uint32_t block_size = 0;
sscanf((const char *)msg_received, "%s %lu %d", response, &server_offset, &block_size);
slog_info("Updating shared memory, name=%s, id=%ld, key=%d, response=%s, server_offset=%ld, buffer_size=%ld", curr_dataset.uri_, data_id, key, response, server_offset, size);
slog_info("Updating shared memory, name=%s, id=%ld, key=%d, response=%s, server_offset=%ld, block_size=%d, size=%ld", curr_dataset.uri_, data_id, key, response, server_offset, block_size, size);
// const void* old_buffer;
shared_memory = getContentSM(key, SHM_SIZE);
......@@ -3705,11 +3707,12 @@ int32_t set_data(int32_t dataset_id, int32_t data_id, const void *buffer, size_t
unlinkSM(shared_memory->content);
free(shared_memory);
}
free(msg_received);
}
else
{
sprintf(key_, "SET %lu %ld %s$%d", size, offset, curr_dataset.uri_, data_id);
slog_info("[IMSS][set_data] BLOCK %d SENT TO SERVER %d with Request: %s (%d)", data_id, n_server_, key_, size);
slog_info("[IMSS] BLOCK %d SENT TO SERVER %d with Request: %s (%d)", data_id, n_server_, key_, size);
ep = curr_imss.conns.eps[n_server_];
// send the request to the data server, indicating we will perform a write operation (SET) to certain data block (data_id)
......@@ -3800,8 +3803,6 @@ int32_t set_data_mall(int32_t dataset_id, int32_t data_id, const void *buffer, s
int32_t n_server;
clock_t t;
// size_t (*const send_choose_stream)(ucp_worker_h ucp_worker, ucp_ep_h ep, const char *msg, size_t msg_length) = (IMSS_WRITE_ASYNC == 1) ? send_istream : send_data;
// slog_debug("[IMSS][set_data]");
t = clock();
curr_imss.info.num_storages = num_storages;
......@@ -3835,7 +3836,7 @@ int32_t set_data_mall(int32_t dataset_id, int32_t data_id, const void *buffer, s
size = curr_dataset.data_entity_size;
sprintf(key_, "SET %lu %ld %s$%d", size, offset, curr_dataset.uri_, data_id);
slog_info("[IMSS][set_data] Request - '%s'", key_);
slog_info("[IMSS] Request - '%s'", key_);
ep = curr_imss.conns.eps[n_server_];
if (send_req(ucp_worker_data, ep, local_addr_data, local_addr_len_data, key_) == 0)
......@@ -3846,8 +3847,7 @@ int32_t set_data_mall(int32_t dataset_id, int32_t data_id, const void *buffer, s
return -1;
}
// slog_debug("[IMSS][set_data] send_data(curr_imss.conns.id[%ld]:%ld, key_:%s, REQUEST_SIZE:%d)", n_server_, curr_imss.conns.id[n_server_], key_, REQUEST_SIZE);
// slog_debug("[IMSS] send_data(curr_imss.conns.id[%ld]:%ld, key_:%s, REQUEST_SIZE:%d)", n_server_, curr_imss.conns.id[n_server_], key_, REQUEST_SIZE);
if (send_data(ucp_worker_data, ep, buffer, size, local_data_uid) == 0)
{
pthread_mutex_unlock(&lock_network);
......@@ -3859,8 +3859,8 @@ int32_t set_data_mall(int32_t dataset_id, int32_t data_id, const void *buffer, s
delta_us = (long) (end.tv_usec - start.tv_usec);
printf("[CLIENT] [SWRITE SEND_DATA] delta_us=%6.3f",(delta_us/1000.0F));*/
// slog_debug("[IMSS] Request set_data: client_id '%" PRIu32 "', mode 'SET', key '%s'", curr_imss.conns.id[n_server_], key_);
// slog_debug("[IMSS][set_data] send_data(curr_imss.conns.id[%ld]:%ld, curr_dataset.data_entity_size:%ld)", n_server_, curr_imss.conns.id[n_server_], curr_dataset.data_entity_size);
// slog_debug("[IMSS] Request: client_id '%" PRIu32 "', mode 'SET', key '%s'", curr_imss.conns.id[n_server_], key_);
// slog_debug("[IMSS] send_data(curr_imss.conns.id[%ld]:%ld, curr_dataset.data_entity_size:%ld)", n_server_, curr_imss.conns.id[n_server_], curr_dataset.data_entity_size);
}
t = clock() - t;
double time_taken = ((double)t) / CLOCKS_PER_SEC; // in seconds
......
......@@ -453,9 +453,10 @@ int srv_worker_helper(p_argv *arguments, const char *req)
{
// Send the requested block.
struct stat *stats;
stats = (struct stat *)address_;
slog_debug("[READ_OP][READ_OP] Send the requested block with key=%s, block_offset=%ld, block_size_rtvd=%ld kb, to_read=%ld kb, stat->st_nlink=%lu, is_shared_memory=%d", key.c_str(), block_offset, block_size_rtvd / 1024, to_read / 1024, stats->st_nlink, is_shared_memory);
// struct stat *stats;
// stats = (struct stat *)address_;
// slog_debug("[READ_OP][READ_OP] Send the requested block with key=%s, block_offset=%ld, block_size_rtvd=%ld kb, to_read=%ld kb, stat->st_nlink=%lu, is_shared_memory=%d", key.c_str(), block_offset, block_size_rtvd / 1024, to_read / 1024, stats->st_nlink, is_shared_memory);
slog_debug("[READ_OP][READ_OP] Send the requested block with key=%s, block_offset=%ld, block_size_rtvd=%ld kb, to_read=%ld kb, is_shared_memory=%d", key.c_str(), block_offset, block_size_rtvd / 1024, to_read / 1024, is_shared_memory);
size_t ret_send_data = 0;
if (is_shared_memory)
{
......@@ -870,7 +871,6 @@ int srv_worker_helper(p_argv *arguments, const char *req)
return -1;
}
// printf("WRITEV-buffer=%s",buf);
int pos = path.find('$');
std::string first_element = path.substr(0, pos + 1);
first_element = first_element + "0";
......@@ -1139,7 +1139,7 @@ int srv_worker_helper(p_argv *arguments, const char *req)
}
// Find the length of the string required to store the number, including the null terminator
int length_number = snprintf(NULL, 0, "%lu %u", global_offset, block_size_recv) + 1;
int length_number = snprintf(NULL, 0, "%lu %d", global_offset, block_size_recv) + 1;
buffer = (void *)calloc(length_number, sizeof(char));
if (buffer == NULL)
{
......@@ -1149,7 +1149,7 @@ int srv_worker_helper(p_argv *arguments, const char *req)
}
// When using shared memory, buffer will store the offset.
ret = snprintf((char *)buffer, length_number, "%lu %u", global_offset, block_size_recv);
ret = snprintf((char *)buffer, length_number, "%lu %d", global_offset, block_size_recv);
if (ret < 0)
{
perror("ERR_HERCULES_ENCODING");
......@@ -1243,7 +1243,6 @@ int srv_worker_helper(p_argv *arguments, const char *req)
else
{ // Data is in shared memory.
// Tell the client to update the shared memory.
// char answer[] = "TOUPDATE\0";
char answer[RESPONSE_SIZE];
// "address_" is the shared memory offset.
sprintf(answer, "TOUPDATE %s", (char *)address_);
......@@ -1302,7 +1301,6 @@ int srv_worker_helper(p_argv *arguments, const char *req)
else
{ // Data is in shared memory.
// Tell the client to update the shared memory.
// char answer[] = "TOUPDATE\0";
char answer[RESPONSE_SIZE];
// "address_" is the shared memory offset.
sprintf(answer, "TOUPDATE %s", (char *)address_);
......@@ -2356,7 +2354,6 @@ void *dispatcher(void *th_argv)
char *tmp_file_path = arguments->tmp_file_path;
int client = 0;
// snprintf(service, sizeof(service), "%ld", arguments->port);
// Get a socket file descriptor.
global_server_fd_thread = socket(AF_INET, SOCK_STREAM, 0);
if (global_server_fd_thread < 0)
......
......@@ -522,6 +522,7 @@ __attribute__((constructor)) void imss_posix_init(void)
if (IMSS_DEBUG_FILE > 0)
{
printf("Log path = %s\n", log_path);
fflush(stdout);
}
slog_info(",Time(msec), Comment, RetCode");
......@@ -1205,7 +1206,7 @@ pid_t fork(void)
// sprintf(log_path, "%s/client-child.%02d-%02d.%d", HERCULES_PATH, tm.tm_hour, tm.tm_min, pid); // original.
// fprintf(stderr, "[POSIX]. Fork child created, hostname=%s, pid=%d, new rank = %d, log_path=%s, old_log_path=%s\n", hostname, pid, new_rank, log_path, old_log_path);
// slog_init(log_path, IMSS_DEBUG_LEVEL, IMSS_DEBUG_FILE, IMSS_DEBUG_SCREEN, 1, 1, 1, new_rank);
// slog_xinit(log_path, IMSS_DEBUG_LEVEL, IMSS_DEBUG_FILE, IMSS_DEBUG_SCREEN, 1, 1, 1, new_rank);
// slog_info("[POSIX]. Fork child created, hostname=%s, new rank=%d, log_path=%s, old_log_path=%s, init=%d", hostname, new_rank, log_path, old_log_path, init);
// slog_info("[POSIX]. Fork child created, hostname=%s, pid=%d, log_path=%s, old_log_path=%s, init=%d", hostname, pid, log_path, old_log_path, init);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment