-
-
Notifications
You must be signed in to change notification settings - Fork 46k
/
bankers_algorithm.py
228 lines (210 loc) · 8.36 KB
/
bankers_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# A Python implementation of the Banker's Algorithm in Operating Systems using
# Processes and Resources
# {
# "Author: "Biney Kingsley ([email protected]), [email protected]",
# "Date": 28-10-2018
# }
"""
The Banker's algorithm is a resource allocation and deadlock avoidance algorithm
developed by Edsger Dijkstra that tests for safety by simulating the allocation of
predetermined maximum possible amounts of all resources, and then makes a "s-state"
check to test for possible deadlock conditions for all other pending activities,
before deciding whether allocation should be allowed to continue.
| [Source] Wikipedia
| [Credit] Rosetta Code C implementation helped very much.
| (https://rosettacode.org/wiki/Banker%27s_algorithm)
"""
from __future__ import annotations
import numpy as np
test_claim_vector = [8, 5, 9, 7]
test_allocated_res_table = [
[2, 0, 1, 1],
[0, 1, 2, 1],
[4, 0, 0, 3],
[0, 2, 1, 0],
[1, 0, 3, 0],
]
test_maximum_claim_table = [
[3, 2, 1, 4],
[0, 2, 5, 2],
[5, 1, 0, 5],
[1, 5, 3, 0],
[3, 0, 3, 3],
]
class BankersAlgorithm:
def __init__(
self,
claim_vector: list[int],
allocated_resources_table: list[list[int]],
maximum_claim_table: list[list[int]],
) -> None:
"""
:param claim_vector: A nxn/nxm list depicting the amount of each resources
(eg. memory, interface, semaphores, etc.) available.
:param allocated_resources_table: A nxn/nxm list depicting the amount of each
resource each process is currently holding
:param maximum_claim_table: A nxn/nxm list depicting how much of each resource
the system currently has available
"""
self.__claim_vector = claim_vector
self.__allocated_resources_table = allocated_resources_table
self.__maximum_claim_table = maximum_claim_table
def __processes_resource_summation(self) -> list[int]:
"""
Check for allocated resources in line with each resource in the claim vector
"""
return [
sum(p_item[i] for p_item in self.__allocated_resources_table)
for i in range(len(self.__allocated_resources_table[0]))
]
def __available_resources(self) -> list[int]:
"""
Check for available resources in line with each resource in the claim vector
"""
return np.array(self.__claim_vector) - np.array(
self.__processes_resource_summation()
)
def __need(self) -> list[list[int]]:
"""
Implement safety checker that calculates the needs by ensuring that
``max_claim[i][j] - alloc_table[i][j] <= avail[j]``
"""
return [
list(np.array(self.__maximum_claim_table[i]) - np.array(allocated_resource))
for i, allocated_resource in enumerate(self.__allocated_resources_table)
]
def __need_index_manager(self) -> dict[int, list[int]]:
"""
This function builds an index control dictionary to track original ids/indices
of processes when altered during execution of method "main"
:Return: {0: [a: int, b: int], 1: [c: int, d: int]}
>>> index_control = BankersAlgorithm(
... test_claim_vector, test_allocated_res_table, test_maximum_claim_table
... )._BankersAlgorithm__need_index_manager()
>>> {key: [int(x) for x in value] for key, value
... in index_control.items()} # doctest: +NORMALIZE_WHITESPACE
{0: [1, 2, 0, 3], 1: [0, 1, 3, 1], 2: [1, 1, 0, 2], 3: [1, 3, 2, 0],
4: [2, 0, 0, 3]}
"""
return {self.__need().index(i): i for i in self.__need()}
def main(self, **kwargs) -> None:
"""
Utilize various methods in this class to simulate the Banker's algorithm
:Return: None
>>> BankersAlgorithm(test_claim_vector, test_allocated_res_table,
... test_maximum_claim_table).main(describe=True)
Allocated Resource Table
P1 2 0 1 1
<BLANKLINE>
P2 0 1 2 1
<BLANKLINE>
P3 4 0 0 3
<BLANKLINE>
P4 0 2 1 0
<BLANKLINE>
P5 1 0 3 0
<BLANKLINE>
System Resource Table
P1 3 2 1 4
<BLANKLINE>
P2 0 2 5 2
<BLANKLINE>
P3 5 1 0 5
<BLANKLINE>
P4 1 5 3 0
<BLANKLINE>
P5 3 0 3 3
<BLANKLINE>
Current Usage by Active Processes: 8 5 9 7
Initial Available Resources: 1 2 2 2
__________________________________________________
<BLANKLINE>
Process 3 is executing.
Updated available resource stack for processes: 5 2 2 5
The process is in a safe state.
<BLANKLINE>
Process 1 is executing.
Updated available resource stack for processes: 7 2 3 6
The process is in a safe state.
<BLANKLINE>
Process 2 is executing.
Updated available resource stack for processes: 7 3 5 7
The process is in a safe state.
<BLANKLINE>
Process 4 is executing.
Updated available resource stack for processes: 7 5 6 7
The process is in a safe state.
<BLANKLINE>
Process 5 is executing.
Updated available resource stack for processes: 8 5 9 7
The process is in a safe state.
<BLANKLINE>
"""
need_list = self.__need()
alloc_resources_table = self.__allocated_resources_table
available_resources = self.__available_resources()
need_index_manager = self.__need_index_manager()
for kw, val in kwargs.items():
if kw and val is True:
self.__pretty_data()
print("_" * 50 + "\n")
while need_list:
safe = False
for each_need in need_list:
execution = True
for index, need in enumerate(each_need):
if need > available_resources[index]:
execution = False
break
if execution:
safe = True
# get the original index of the process from ind_ctrl db
for original_need_index, need_clone in need_index_manager.items():
if each_need == need_clone:
process_number = original_need_index
print(f"Process {process_number + 1} is executing.")
# remove the process run from stack
need_list.remove(each_need)
# update available/freed resources stack
available_resources = np.array(available_resources) + np.array(
alloc_resources_table[process_number]
)
print(
"Updated available resource stack for processes: "
+ " ".join([str(x) for x in available_resources])
)
break
if safe:
print("The process is in a safe state.\n")
else:
print("System in unsafe state. Aborting...\n")
break
def __pretty_data(self):
"""
Properly align display of the algorithm's solution
"""
print(" " * 9 + "Allocated Resource Table")
for item in self.__allocated_resources_table:
print(
f"P{self.__allocated_resources_table.index(item) + 1}"
+ " ".join(f"{it:>8}" for it in item)
+ "\n"
)
print(" " * 9 + "System Resource Table")
for item in self.__maximum_claim_table:
print(
f"P{self.__maximum_claim_table.index(item) + 1}"
+ " ".join(f"{it:>8}" for it in item)
+ "\n"
)
print(
"Current Usage by Active Processes: "
+ " ".join(str(x) for x in self.__claim_vector)
)
print(
"Initial Available Resources: "
+ " ".join(str(x) for x in self.__available_resources())
)
if __name__ == "__main__":
import doctest
doctest.testmod()