Commit 2af06a5d authored by Javier Garcia Blas's avatar Javier Garcia Blas
Browse files

Add timers to measure Snapshot operations

1 merge request!5Merge from Debug into master
Showing with 59 additions and 16 deletions
+59 -16
......@@ -63,7 +63,7 @@ public:
int32_t get_snapshot(std::string key, int *to_copy);
int32_t get_broadcast(std::string key, void **add_, uint64_t *size_);
char *GetDataFromFile(std::string file_name, uint64_t *file_size_occupied);
char *GetDataOfFile(std::string file_name, uint64_t *file_size_occupied);
char *MergeData(__off_t *size_of_data, uint32_t num_of_data_servers, std::string file_name, __off_t file_size, uint64_t block_size);
// char* getDataFromFile(std::string file_name);
......
......@@ -3939,7 +3939,7 @@ ssize_t Write_2_disk(int fd, void *buffer, off_t size, size_t offset)
bytes_to_write = size - bytes_written;
}
bytes = write(fd, buffer, bytes_to_write);
bytes = write(fd, (char *)buffer+bytes_written, bytes_to_write);
if (bytes < 0)
{
......
......@@ -698,7 +698,7 @@ int extractNumber(const std::string &key)
/**
* @brief Fetch all data related to a file name.
*/
char *map_records::GetDataFromFile(string file_name, uint64_t *file_size_occupied)
char *map_records::GetDataOfFile(string file_name, uint64_t *file_size_occupied)
{
std::vector<string> vec;
// uint64_t file_size_occupied = 0;
......@@ -891,8 +891,8 @@ char *map_records::MergeData(off_t *size_of_data, uint32_t num_of_data_servers,
int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, int finish, int server_id, char *data_hostname, struct arguments args)
{
clock_t t;
double parcial_time_taken = 0.0, total_time_taken = 0.0;
int pos = 0, ret = 0, fd = -1, block_number = 0, skip = 0, continue_exe = 0;
double parcial_time_taken = 0.0, time_taken_for_writting = 0.0, time_taken_for_merge = 0.0, time_taken_for_collecting = 0.0;
int pos = 0, ret = 0, fd = -1, block_number = 0, continue_exe = 0;
u_int32_t number_active_storage_servers = 0;
size_t offset = 0;
string key, inner_key, block, file_name, inner_file_name, data_uri;
......@@ -902,7 +902,7 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
char old_expected_uri[PATH_MAX];
void *address_ = NULL;
void *address_block_0 = NULL;
uint64_t block_size_rtvd = 0, block_0_size = 0, total_written = 0;
uint64_t block_size_rtvd = 0, block_0_size = 0;
int copy_to_disk = 0;
int origin_server_id = 0;
struct stat *stats = NULL;
......@@ -911,7 +911,6 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
size_t iteration = 0;
int to_free = 0, is_shared_memory = 0;
__off_t file_size = 0;
// char *data_buffer = NULL;
char *POLICY = args.policy;
// TODO: it is better to use "number_active_storage_servers"
......@@ -923,7 +922,7 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
{
is_shared_memory = 1;
}
char *reconstructed_data_file = NULL;
off_t block_offset = 0;
......@@ -937,8 +936,6 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
for (const auto &it : buffer_snapshot)
{
// TODO: use the function getBlockInformation().
key = it.first;
if (key.empty())
{
......@@ -1047,20 +1044,36 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
uint64_t file_size_occupied = 0;
// This server add their data.
fprintf(stderr,"Performing Snapshopt from file %s in data server %d\n", expected_uri, args.id);
char *data_ = GetDataFromFile(expected_uri, &file_size_occupied);
fprintf(stderr, "Performing Snapshopt from file %s in data server %d\n", expected_uri, args.id);
t = clock();
char *data_ = GetDataOfFile(expected_uri, &file_size_occupied);
if (data_ != NULL)
{
// TODO: add the error condition.
sprintf(expected_key_format, "%s$%d", expected_uri, args.id);
put_broadcast((string)expected_key_format, data_, file_size_occupied);
}
t = clock() - t;
time_taken_for_collecting = ((double)t) / (CLOCKS_PER_SEC);
off_t size_of_merge_data = 0;
t = clock();
char *full_data_from_file = MergeData(&size_of_merge_data, number_active_storage_servers, file_name, file_size, block_size);
if (full_data_from_file == NULL)
{
fprintf(stderr, "Data from file %s has not been merge in server %d\n", file_name.c_str(), args.id);
slog_error("HERCULES_ERR_MERGE_DATA_SNAPSHOT");
perror("HERCULES_ERR_MERGE_DATA_SNAPSHOT");
if (data_ != NULL)
{
free(data_);
}
continue;
}
t = clock() - t;
time_taken_for_merge = ((double)t) / (CLOCKS_PER_SEC);
t = clock();
int fd = Open_file(checkpoint_dir, data_hostname);
slog_debug("writting %lu bytes to disk with the name %s", size_of_merge_data, data_hostname);
ssize_t written_bytes_in_disk = Write_2_disk(fd, full_data_from_file, size_of_merge_data, 0);
......@@ -1073,8 +1086,26 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
Close_file(fd);
t = clock() - t;
time_taken_for_writting = ((double)t) / (CLOCKS_PER_SEC);
continue_exe = 1;
slog_time("ServerID,%d,Hostname,%s,Total-written,%lu,B,%f,MB,%f,GB,Time-Writting,%f,s,Troughput,%f,B/s,%f,MB/s,%f,GB/s,Blocksize,%lu,KB,Time-Collecting,%f,Time-Merging,%f s", args.id,
server_id,
data_hostname,
written_bytes_in_disk,
(double)written_bytes_in_disk / 1024 / 1024,
(double)written_bytes_in_disk / 1024 / 1024 / 1024,
time_taken_for_writting,
(double)written_bytes_in_disk / time_taken_for_writting,
(double)written_bytes_in_disk / time_taken_for_writting / 1024 / 1024,
(double)written_bytes_in_disk / time_taken_for_writting / 1024 / 1024 / 1024,
block_size,
time_taken_for_collecting,
time_taken_for_merge
);
// int find = erase_broadcast_element(key);
int find = erase_snapshot_element(key);
if (find)
......@@ -1094,7 +1125,18 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
// means that there are a server waiting for
// the data.
uint64_t file_size_occupied = 0;
char *data_ = GetDataFromFile(key, &file_size_occupied);
t = clock();
char *data_ = GetDataOfFile(key, &file_size_occupied);
if (data_ == NULL)
{
fprintf(stderr, "Data from file %s has not been get in server %d\n", key.c_str(), args.id);
slog_error("HERCULES_ERR_GET_DATA_OF_SNAPSHOT");
perror("HERCULES_ERR_GET_DATA_OF_SNAPSHOT");
continue;
}
t = clock() - t;
time_taken_for_collecting = ((double)t) / (CLOCKS_PER_SEC);
// char key_[REQUEST_SIZE];
int n_server_ = origin_server_id;
......@@ -1294,6 +1336,7 @@ int32_t map_records::Snapshot(uint64_t block_size, const char *checkpoint_dir, i
// (double)total_written / total_time_taken / 1024 / 1024 / 1024,
// block_size);
// }
return continue_exe;
// return total_time_taken;
}
......@@ -232,7 +232,7 @@ extern "C"
// static int (*real_fcntl)(int fd, int cmd, ... /* arg */) = NULL;
static int (*real_syncfs)(int fd) = NULL;
static int (*real_posix_fadvise)(int fd, off_t offset, off_t len, int advice) = NULL;
static int (*real_posix_fadvise64)(int fd, off64_t offset, off64_t len, int advice) = NULL;
// static int (*real_posix_fadvise64)(int fd, off64_t offset, off64_t len, int advice) = NULL;
static int (*real_faccessat)(int dir_fd, const char *pathname, int mode, int flags) = NULL;
static int (*real_unlinkat)(int fd, const char *name, int flag) = NULL;
static int (*real_renameat2)(int olddirfd, const char *oldpath, int newdirfd, const char *newpath, unsigned int flags) = NULL;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment