Commit e38397f6 authored by Genaro Juan Sánchez Gallegos's avatar Genaro Juan Sánchez Gallegos
Browse files

Fixed deployment's scripts and configuration file

1 merge request!1added block size and storage size as data server CLI options
Showing with 71 additions and 69 deletions
+71 -69
......@@ -138,7 +138,6 @@ int32_t main(int32_t argc, char **argv)
args.type = argv[1][0];
args.id = atoi(argv[2]);
args.stat_host = argv[3];
if (cfg_get(cfg, "URI"))
{
......@@ -207,11 +206,15 @@ int32_t main(int32_t argc, char **argv)
else if (strstr(getenv("IMSS_DEBUG"), "all"))
{
IMSS_DEBUG_FILE = 1;
IMSS_DEBUG_SCREEN = 1;
IMSS_DEBUG_LEVEL = SLOG_PANIC;
}
else if (strstr(getenv("IMSS_DEBUG"), "none"))
else if (strstr(getenv("IMSS_DEBUG"), "none")) {
IMSS_DEBUG_FILE = 0;
IMSS_DEBUG_SCREEN = 0;
IMSS_DEBUG_LEVEL = SLOG_NONE;
unsetenv("IMSS_DEBUG");
else
} else
{
IMSS_DEBUG_FILE = 1;
IMSS_DEBUG_LEVEL = getLevel(getenv("IMSS_DEBUG"));
......@@ -231,13 +234,14 @@ int32_t main(int32_t argc, char **argv)
// sprintf(log_path, "./%c-server.%02d-%02d-%02d", args.type, tm.tm_hour, tm.tm_min, tm.tm_sec);
fprintf(stderr, "Server type=%c\n", args.type);
sprintf(log_path, "./%c-server", args.type);
slog_init(log_path, SLOG_DEBUG, 1, 1, 1, 1, 1, args.id);
slog_info("IMSS DEBUG FILE AT %s", log_path);
slog_init(log_path, IMSS_DEBUG_LEVEL, IMSS_DEBUG_FILE, IMSS_DEBUG_SCREEN, 1, 1, 1, args.id);
fprintf(stderr, "IMSS DEBUG FILE AT %s\n", log_path);
slog_info(",Time(msec), Comment, RetCode");
slog_debug("[SERVER] Starting server.");
if (args.type == TYPE_DATA_SERVER)
{
args.stat_host = argv[3];
slog_debug("imss_uri = %s stat-host = %s stat-port = %" PRId64 " num-servers = %" PRId64 " deploy-hostfile = %s block-size = %" PRIu64 " storage-size = %" PRIu64 "",
args.imss_uri, args.stat_host, args.stat_port, args.num_servers, args.deploy_hostfile, args.block_size, args.storage_size);
// bind port number.
......@@ -283,6 +287,7 @@ int32_t main(int32_t argc, char **argv)
mem_pool = StsQueue.create();
// figure out how many blocks we need and allocate them
num_blocks = max_storage_size / (args.block_size * KB);
slog_info("[main] num_blocks=%lu", num_blocks);
for (int i = 0; i < num_blocks; ++i)
{
char *buffer = (char *)calloc(args.block_size * KB, sizeof(char));
......
......@@ -7,50 +7,38 @@
#SBATCH --oversubscribe
## Uncomment when working in Tucan.
#IOR_PATH=/home/software/io500/bin
#module unload mpi
#module load mpi/openmpi
#USE_OPEN_MPI=1
## Uncomment when working in Italia cluster.
IOR_PATH=/home/software/io500/bin
module unload mpi
module load mpi/openmpi
USE_OPEN_MPI=1
## Uncomment when working in Italia cluster.
#USE_OPEN_MPI=1
./hercules start
#H_NUM_DATA=$(cat ".hercules.env" | grep "H_H_NUM_DATA " | awk '{print $2}')
readarray -t data_hosts < data_hostfile
readarray -t meta_hosts < meta_hostfile
#exit 0
readarray -t client_hosts < client_hostfile
H_NUM_DATA=${#data_hosts[@]}
H_NUM_METADATA=${#meta_hosts[@]}
H_NUM_CLIENT=${#client_hosts[@]}
H_PATH=$(cat .hercules.conf | grep "H_PATH" | awk '{print $2}')
H_BUILD_PATH=$(cat .hercules.conf | grep "H_BUILD_PATH" | awk '{print $2}')
H_BASH_PATH=$(cat .hercules.conf | grep "H_BASH_PATH" | awk '{print $2}')
H_META_PORT=$(cat .hercules.conf | grep "H_META_PORT" | awk '{print $2}')
H_DATA_PORT=$(cat .hercules.conf | grep "H_DATA_PORT" | awk '{print $2}')
H_BLOCK_SIZE=$(cat .hercules.conf | grep "H_BLOCK_SIZE" | awk '{print $2}')
H_STORAGE_SIZE=$(cat .hercules.conf | grep "H_STORAGE_SIZE" | awk '{print $2}')
H_PATH=$(dirname `pwd`)
echo "Running client on $client_hosts"
# COMMAND="$IOR_PATH/ior -t 1M -b 100M -s 1 -i 5 -o /mnt/imss/data.out"
COMMAND="hostname"
#COMMAND="echo 'hello' > /tmp/hello"
echo "Running client"
IOR_PATH=/beegfs/home/javier.garciablas/io500/bin
COMMAND="$IOR_PATH/ior -t 1M -b 1M -s 1 -i 1 -o /mnt/imss/data.out"
#COMMAND="free -h"
spack load ucx@1.14.0%gcc@9.4.0 arch=linux-ubuntu20.04-zen glib@2.74.1%gcc@9.4.0 arch=linux-ubuntu20.04-zen pcre@8.45%gcc@9.4.0 arch=linux-ubuntu20.04-zen
set -x
whereis mpiexec
mpiexec --hostfile ./client_hostfile -npernode $H_NUM_CLIENT \
-x LD_PRELOAD=$H_PATH/build/tools/libhercules_posix.so \
-x IMSS_DEBUG=all \
$COMMAND
mpiexec --hostfile ./client_hostfile \
$COMMAND
# mpiexec --hostfile ./client_hostfile -npernode $H_NUM_CLIENT \
# -x LD_PRELOAD=$H_PATH/build/tools/libhercules_posix.so \
# -x IMSS_DEBUG=none \
# $COMMAND
sleep 10
......
......@@ -19,6 +19,12 @@ NUM_DATA_SERVERS = 1
# Total number of metadadata nodes
NUM_META_SERVERS = 1
# Total number of client nodes
NUM_NODES_FOR_CLIENTS = 3
# Total number of clients per node
NUM_CLIENTS_PER_NODE = 3
# 1: enables malleability functions
MALLEABILITY = 0
UPPER_BOUND_MALLEABILITY = 0
......@@ -28,7 +34,7 @@ LOWER_BOUND_MALLEABILITY = 0
DATA_HOSTFILE = data_hostfile
# File path of the persistence metadata
METADA_PERSISTENCE_FILE = /beegfs/home/javier.garciablas/imss/bash/metadata
METADA_PERSISTENCE_FILE = /home/gsanchez/IMSS/imss/bash/metadata
# Number of threads attending data requests
THREAD_POOL = 1
......
......@@ -155,7 +155,7 @@ int32_t send_dynamic_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, void * data_st
//Method retrieving a serialized dynamic data structure.
int32_t recv_dynamic_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, void * data_struct, int32_t data_type, uint64_t dest);
int connect_common(const char *server, uint16_t server_port, sa_family_t af);
int connect_common(const char *server, uint64_t server_port, sa_family_t af);
ucs_status_t ucx_wait(ucp_worker_h ucp_worker, struct ucx_context *request, const char *op_str, const char *data_str);
......
......@@ -174,7 +174,7 @@ rank - Application process identifier used as communications ID in the
RETURNS: 0 - Communication channel and initializations performed successfully.
-1 - In case of error.
*/
int32_t stat_init(char *stat_hostfile, uint16_t port, int32_t num_stat_servers, uint32_t rank);
int32_t stat_init(char *stat_hostfile, uint64_t port, int32_t num_stat_servers, uint32_t rank);
/* Method disabling the communication channel with the metadata server. Besides, the current method releases session-related elements previously initialized.
......
......@@ -601,6 +601,9 @@ int32_t recv_dynamic_stream(ucp_worker_h ucp_worker, ucp_ep_h ep, void *data_str
// Copy the actual structure into the one provided through reference.
memcpy(struct_, msg_data, sizeof(imss_info));
slog_debug(" \t\t msg_data=%s", msg_data);
if (!strncmp("$ERRIMSS_NO_KEY_AVAIL$", struct_->uri_, 22))
{
slog_debug("[COMM] recv_dynamic_stream end %ld", length);
......@@ -752,7 +755,7 @@ ucs_status_t flush_ep(ucp_worker_h worker, ucp_ep_h ep)
}
}
int connect_common(const char *server, uint16_t server_port, sa_family_t af)
int connect_common(const char *server, uint64_t server_port, sa_family_t af)
{
int sockfd = -1;
int listenfd = -1;
......@@ -761,7 +764,7 @@ int connect_common(const char *server, uint16_t server_port, sa_family_t af)
struct addrinfo hints, *res, *t;
int ret;
snprintf(service, sizeof(service), "%u", server_port);
snprintf(service, sizeof(service), "%lu", server_port);
memset(&hints, 0, sizeof(hints));
hints.ai_flags = (server == NULL) ? AI_PASSIVE : 0;
hints.ai_family = af;
......
......@@ -223,7 +223,7 @@ delete_imss(char *imss_uri,
// Method creating a communication channel with the IMSS metadata server. Besides, the stat_imss method initializes a set of elements that will be used through the session.
int32_t stat_init(char *stat_hostfile,
uint16_t port,
uint64_t port,
int32_t num_stat_servers,
uint32_t rank)
{
......@@ -356,13 +356,14 @@ int32_t stat_init(char *stat_hostfile,
n_chars = getline(&stat_node, &l_size, stat_nodes);
// Erase the new line character ('') from the string.
stat_node[n_chars - 1] = '\0';
slog_debug("[IMSS] stat_client=%s", stat_node);
slog_debug("[IMSS] i=%d, stat_node=%s, port=%d, rank=%" PRIu32 "", i, stat_node, port, rank);
slog_debug("[IMSS] stat_int: Contacting stat dispatcher at %s:%d", stat_node, port);
// slog_debug("[IMSS] stat_client=%s", stat_node);
slog_debug("[IMSS][stat_int] i=%d, stat_node=%s, port=%ld, rank=%" PRIu32 "", i, stat_node, port, rank);
slog_debug("[IMSS][stat_int] Contacting stat dispatcher at %s:%ld", stat_node, port);
oob_sock = connect_common(stat_node, port, AF_INET);
sprintf(request, "%" PRIu32 " GET HELLO!", rank);
slog_debug("[IMSS][stat_int] request=%s", request);
if (send(oob_sock, request, REQUEST_SIZE, 0) < 0)
{
perror("ERRIMSS_STAT_HELLO");
......@@ -375,7 +376,7 @@ int32_t stat_init(char *stat_hostfile,
close(oob_sock);
client_create_ep(ucp_worker_meta, &stat_eps[i], stat_addr[i]);
slog_debug("[IMSS] stat_int: created ep with %s", stat_node);
slog_debug("[IMSS] stat_int: created ep with %s:%ld", stat_node, port);
}
// Close the file.
if (fclose(stat_nodes) != 0)
......@@ -728,7 +729,7 @@ int32_t open_imss(char *imss_uri)
int32_t not_initialized = 0;
slog_debug("[IMSS] open_imss: starting function");
slog_debug("[IMSS][open_imss] starting function, imss_uri=%s", imss_uri);
// Retrieve the actual information from the metadata server.
int32_t imss_existance = stat_imss(imss_uri, &new_imss.info);
// Check if the requested IMSS did not exist or was already stored in the local vector.
......@@ -883,7 +884,7 @@ int32_t stat_imss(char *imss_uri, imss_info *imss_info_)
int ret = 0;
ucp_ep_h ep;
slog_debug("[IMSS][stat_imss]");
slog_debug("[IMSS][stat_imss] imss_uri=%s", imss_uri);
if ((imss_found_in = find_imss(imss_uri, &searched_imss)) != -1)
{
memcpy(imss_info_, &searched_imss.info, sizeof(imss_info));
......
......@@ -304,7 +304,7 @@ void slog(int flag, const char *msg, ...)
}
/* Print output. */
if (slg.exclusive && flag >= slg.level)
if (slg.exclusive && flag <= slg.level)
if (slg.to_console != 0)
if (flag <= slg.level || slg.pretty)
{
......
......@@ -1006,7 +1006,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
int client_id = 0;
char mode[MODE_SIZE];
slog_debug("[STAT WORKER] Waiting for new request.");
// slog_debug("[STAT WORKER] Waiting for new request.");
// Save the request to be served.
// recv_data(arguments->ucp_worker, arguments->server_ep, req);
// slog_info("[STAT WORKER] Request - '%s'", req);
......@@ -1028,6 +1028,7 @@ int stat_worker_helper(p_argv *arguments, char *req)
raw_msg[req_size] = '\0';
// printf("*********worker_metadata raw_msg %s",raw_msg);
slog_info("[workers][stat_worker_helper] request received=%s", req);
// Reference to the client request.
char number[16];
......@@ -1037,6 +1038,8 @@ int stat_worker_helper(p_argv *arguments, char *req)
char *uri_ = raw_msg + number_length + 1;
uint64_t block_size_recv = (uint64_t)atoi(number);
slog_info("[workers][stat_worker_helper] number=%s, number_length=%d uri=%s, block_size_recv=%ld", number, number_length, uri_, block_size_recv);
// Create an std::string in order to be managed by the map structure.
std::string key;
key.assign((const char *)uri_);
......@@ -1487,7 +1490,7 @@ void *dispatcher(void *th_argv)
char mode[MODE_SIZE];
slog_debug("[DISPATCHER] Waiting for connection requests.");
fprintf(stderr, "[DISPATCHER] Waiting for connection requests.");
fprintf(stderr, "[DISPATCHER] Waiting for connection requests.\n");
sockfd = accept(listenfd, NULL, NULL);
ret = recv(sockfd, req, REQUEST_SIZE, MSG_WAITALL);
......
......@@ -85,7 +85,7 @@ static struct err_handling
} err_handling_opt;
static ucs_status_t ep_status = UCS_OK;
static uint16_t server_port = 13337;
static uint64_t server_port = 13337;
static sa_family_t ai_family = AF_INET;
static long test_string_length = 16;
static const ucp_tag_t tag = 0x1337a880u;
......
......@@ -222,6 +222,8 @@ __attribute__((constructor)) void imss_posix_init(void)
// fill global variables with the enviroment variables value.
getConfiguration();
fprintf(stderr, "IMSS_DEBUG_LEVEL=%d\n", IMSS_DEBUG_LEVEL);
IMSS_DATA_BSIZE = IMSS_BLKSIZE * KB;
// aux_refresh = (char *)malloc(IMSS_DATA_BSIZE); // global buffer to refresh metadata.
// imss_path_refresh = calloc(MAX_PATH, sizeof(char));
......@@ -246,22 +248,6 @@ __attribute__((constructor)) void imss_posix_init(void)
}
sprintf(hostname, "%s:%d", hostname, getpid());
if (getenv("IMSS_DEBUG") != NULL)
{
if (strstr(getenv("IMSS_DEBUG"), "file"))
IMSS_DEBUG_FILE = 1;
if (strstr(getenv("IMSS_DEBUG"), "stdout"))
IMSS_DEBUG_SCREEN = 1;
if (strstr(getenv("IMSS_DEBUG"), "debug"))
IMSS_DEBUG_LEVEL = SLOG_DEBUG;
if (strstr(getenv("IMSS_DEBUG"), "all"))
IMSS_DEBUG_LEVEL = SLOG_LIVE;
if (strstr(getenv("IMSS_DEBUG"), "none"))
unsetenv("IMSS_DEBUG");
}
IMSS_DEBUG_LEVEL = SLOG_DEBUG;
rank = MurmurOAAT32(hostname);
// log init.
......@@ -272,6 +258,7 @@ __attribute__((constructor)) void imss_posix_init(void)
slog_info(",Time(msec), Comment, RetCode");
slog_debug(" -- IMSS_MOUNT_POINT: %s", MOUNT_POINT);
slog_debug(" -- IMSS_ROOT: %s", IMSS_ROOT);
slog_debug(" -- IMSS_HOSTFILE: %s", IMSS_HOSTFILE);
slog_debug(" -- IMSS_N_SERVERS: %d", N_SERVERS);
slog_debug(" -- IMSS_SRV_PORT: %d", IMSS_SRV_PORT);
......@@ -296,7 +283,11 @@ __attribute__((constructor)) void imss_posix_init(void)
if (deployment == 2)
{
open_imss(IMSS_ROOT);
ret = open_imss(IMSS_ROOT);
if(ret < 0) {
slog_fatal("Error creating IMSS's resources, the process cannot be started");
return 1;
}
}
if (deployment != 2)
......@@ -345,10 +336,10 @@ void getConfiguration()
// readlink("/proc/self/exe", abs_exe_path, PATH_MAX);
getcwd(abs_exe_path, sizeof(abs_exe_path));
strcpy(abs_exe_path2, abs_exe_path);
strcat(abs_exe_path, "/../conf/hercules.conf");
strcpy(abs_exe_path2, abs_exe_path);
strcat(abs_exe_path2, "hercules.conf");
strcat(abs_exe_path2, "./hercules.conf");
cfg = cfg_init();
......@@ -529,7 +520,12 @@ void getConfiguration()
IMSS_DEBUG_LEVEL = SLOG_PANIC;
}
else if (strstr(getenv("IMSS_DEBUG"), "none"))
{
IMSS_DEBUG_FILE = 0;
IMSS_DEBUG_SCREEN = 0;
IMSS_DEBUG_LEVEL = SLOG_NONE;
unsetenv("IMSS_DEBUG");
}
else
{
IMSS_DEBUG_FILE = 1;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment