Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Jaiswal committed Sep 23, 2024
1 parent 1887918 commit ef819a6
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 10 deletions.
3 changes: 3 additions & 0 deletions projects/hydra/scripts/run-entities/create-cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

# Thin wrapper that runs create-cluster.py with python3, passing no arguments.
# The script path is relative, so invoke this wrapper from the directory that
# contains create-cluster.py.
python3 create-cluster.py
8 changes: 8 additions & 0 deletions projects/hydra/scripts/run-entities/kill-all-entities.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

# Terminate every running process whose command line contains the
# CLUSTER_ENTITY marker (run-entity.sh passes that marker as the last argument
# to run-entity.py).
#
# pgrep -f matches against the full command line and never reports itself, so
# no "grep -v grep" filtering is needed and processes that merely have "grep"
# somewhere in their arguments are no longer skipped by accident.

# Trace each command so the operator can see exactly which pids were signalled.
set -x

for pid in $(pgrep -f 'CLUSTER_ENTITY'); do
    echo "Killing $pid"
    # Default signal (SIGTERM) gives the entity a chance to shut down cleanly.
    kill "$pid"
done
13 changes: 13 additions & 0 deletions projects/hydra/scripts/run-entities/run-all-entities.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

# Start one background process per cluster entity type via run-entity.sh and
# capture each entity's output in logs/<entity>.log.
# Paths are relative, so run this from the directory containing run-entity.sh.

# Trace each launch command on this script's own stderr.
set -x

mkdir -p logs

# Launch order is preserved from the original script: master first.
for entity in master resource_manager job_manager task_manager swf_manager wf_manager worker agent; do
    # Redirect stdout to the log file FIRST, then point stderr at stdout.
    # The reverse order ("2>&1 > file") duplicates stderr onto the terminal
    # before stdout is redirected, so errors never reach the log.
    ./run-entity.sh "$entity" > "logs/$entity.log" 2>&1 &
done
7 changes: 3 additions & 4 deletions projects/hydra/scripts/run-entities/run-entity.sh
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#!/bin/bash

# command line parameters
if [ $# != 2 ]; then
echo "Usage: $0 <entity-type> <entity-marker>"
if [ $# != 1 ]; then
echo "Usage: $0 <entity-type>"
exit 0
fi

# read parameters
ENTITY_TYPE="$1"
ENTITY_MARKER="$2"

# run
python3 run-entity.py "$ENTITY_TYPE" "$ENTITY_MARKER"
python3 run-entity.py "$ENTITY_TYPE" "CLUSTER_ENTITY"
5 changes: 5 additions & 0 deletions python-packages/hydra/src/omigo_hydra/cluster_protocol_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,12 +827,15 @@ def monitor_incoming_for_supervisor(self):
for xentity_type in EntityType.get_all():
# get all incoming entity ids
xincoming_ids = self.cluster_handler.list_dirs(ClusterPaths.get_entities_incoming(xentity_type))
# utils.info("ClusterMasterProtocol: {}, xentity_type: {}, xincoming_ids: {}".format(self.get_entity_id(), xentity_type, xincoming_ids))

# get all assigned supervisor entity ids
xassigned_ids = self.cluster_handler.list_dirs(ClusterPaths.get_entities_assigned_supervisor(xentity_type))
# utils.info("ClusterMasterProtocol: {}, xentity_type: {}, xassigned_ids: {}".format(self.get_entity_id(), xentity_type, xassigned_ids))

# find which ones are not assigned yet
xnon_assigned_ids = list(set(xincoming_ids).difference(set(xassigned_ids)))
# utils.info("ClusterMasterProtocol: {}, xentity_type: {}, xnon_assigned_ids: {}".format(self.get_entity_id(), xentity_type, xnon_assigned_ids))

# sort the entities
xnon_assigned_ids = sorted(xnon_assigned_ids)
Expand Down Expand Up @@ -1660,6 +1663,8 @@ def run_election(self):
utils.info("ClusterMasterElectionProtocol: {}: run_election: already the current master".format(self.entity_id))
else:
utils.info("ClusterMasterElectionProtocol: {}: run_election: another master exists: {}".format(self.entity_id, current_master_id))

# return
return False

# get the current master id
Expand Down
13 changes: 7 additions & 6 deletions python-packages/hydra/src/omigo_hydra/cluster_services_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def read_workflow_file_path(file_path, max_duration = 3*86400, sleep_sec = 3):
utils.info("Waiting for the file to be present: {} ...".format(file_path))
while (duration < max_duration):
if (get_cluster_handler().file_exists(file_path) == True):
time.sleep(5)
time.sleep(sleep_sec)
found = True
break
else:
Expand All @@ -280,7 +280,8 @@ def read_workflow_file_path(file_path, max_duration = 3*86400, sleep_sec = 3):
try:
xtsv = read_tsv(file_path)
except Exception as e:
time.sleep(60)
utils.warn("Caught exception in reading file. Sleeping for {} seconds".format(sleep_sec))
time.sleep(sleep_sec)
xtsv = read_tsv(file_path)

utils.info("Read file: {}, num_rows: {}".format(file_path, xtsv.num_rows()))
Expand Down Expand Up @@ -447,7 +448,7 @@ def run(self):
self.run_step()

# sleep
utils.info("Sleeping for {} seconds".format(self.wait_sec))
utils.info("{}: sleeping for {} seconds".format(self.protocol.entity.entity_type, self.wait_sec))
time.sleep(self.wait_sec)

class EntityMasterRunner(EntityRunner):
Expand All @@ -470,14 +471,14 @@ def run(self):
if (self.election_protocol.run_election() == True):
self.protocol.refresh_master_cache()

# monitor incoming
self.protocol.monitor_incoming_for_supervisor()
# monitor incoming
self.protocol.monitor_incoming_for_supervisor()

# run base class
self.run_step()

# sleep
utils.info("Sleeping for {} seconds".format(self.wait_sec))
utils.info("{}: Sleeping for {} seconds".format(self.protocol.entity.entity_type, self.wait_sec))
time.sleep(self.wait_sec)

class EntityResourceManagerRunner(EntityRunner):
Expand Down

0 comments on commit ef819a6

Please sign in to comment.