
Commit dc39693

Errors : merge back to dev-streaming
1 parent c1a25d3 commit dc39693

16 files changed: +443 -334 lines changed

.gitignore

+1 -4

@@ -34,7 +34,4 @@ init_setup.sh
 .idea/
 
 # doc built
-*/build
-
-# test file
-/tests
+*/build

run_bench.py

+1 -6

@@ -1,9 +1,4 @@
-"""
-=========
-run bench hots
-=========
-Run the bench of hots with one dataset, specifying the parameters k and tau you want.
-"""
+"""Run the bench of hots with one dataset, specifying the parameters k and tau you want."""
 
 import subprocess
 

src/hots/clustering.py

+2 -2

@@ -1,5 +1,6 @@
 """
 Provide clustering algorithms and all clustering-related methods.
+
 Here are the available clustering algorithms : k-means, hierarchical,
 spectral, custom spectral.
 """
@@ -406,8 +407,7 @@ def get_distance_cluster(instance, cluster_centers_):
 
 def get_distance_container_cluster(conso_cont, profile):
     """
-    Compute the distance between the container profile and his cluster's
-    mean profile.
+    Compute the distance between the container profile and his cluster's mean profile.
 
     :param conso_cont: _description_
     :type conso_cont: np.array

src/hots/container.py

+1 -4

@@ -1,7 +1,4 @@
-"""
-Provide actions specific to containers (plot containers data,
-build dictionnary for container IDs ...)
-"""
+"""Provide actions specific to containers."""
 
 from itertools import combinations
 

src/hots/init.py

+7 -1

@@ -192,10 +192,12 @@ def define_globals(p_path, config, kafka_var):
     global csv_reader
 
     global avro_deserializer
-    global Sentry
+    global s_entry
     global kafka_producer
     global kafka_consumer
     global kafka_topics
+    global kafka_schema
+    global kafka_schema_url
     global tick_time
     global time_at
     global memory_usage
@@ -219,11 +221,15 @@ def define_globals(p_path, config, kafka_var):
     optim_file = open(p_path / 'optim_logs.log', 'w')
     clustering_file = open(p_path / 'clustering_logs.log', 'w')
 
+    s_entry = True
+
     if kafka_var:
         kafka_topics = config['kafkaConf']['topics']
         reader.kafka_availability(config)
         kafka_producer = reader.get_producer(config)
         kafka_consumer = reader.get_consumer(config)
+        kafka_schema = config['kafkaConf']['schema']
+        kafka_schema_url = config['kafkaConf']['schema_url']
     dict_agg_metrics = {}
     for metric in metrics:
         dict_agg_metrics[metric] = 'sum'
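define_globals now pulls three Kafka-related values out of the configuration: kafkaConf.topics, kafkaConf.schema and kafkaConf.schema_url. A minimal sketch of a config dict that would satisfy those lookups; the concrete values (topic mapping, schema name, registry URL) are illustrative assumptions, not taken from the repository:

# Hypothetical shape of the kafkaConf section read by define_globals.
# Only the 'docker_topic' key is confirmed by its use elsewhere in this commit.
config = {
    'kafkaConf': {
        'topics': {'docker_topic': 'docker-usage'},  # assumed topic mapping
        'schema': 'container_usage_schema',          # assumed Avro schema name
        'schema_url': 'http://localhost:8081',       # assumed schema registry URL
    }
}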

src/hots/instance.py

+2 -6

@@ -33,22 +33,18 @@ class Instance:
     :type dict_id_c: Dict
     """
 
-    def __init__(self, path, config, use_kafka):
+    def __init__(self, path, config):
         """Instance constructor.
 
         :param path: Filesystem path to the input files
         :type path: str
         :param config: Configuration dict from config file
         :type config: Dict
-        :param use_kafka: streaming platform
-        :type use_kafka: bool
         """
+        # TODO update by empty df_indiv and df_host => how to init df_host_meta ?
         (self.df_indiv,  # container usage
          self.df_host,  # node usage
          self.df_host_meta) = it.init_dfs(path)  # node meta deta
-        # new data
-        # self.df_container = self.df_indiv[self.df_indiv["timestamp"] < 2]
-        # self.df_container.reset_index()
 
         # count of unique time values from timestamp column = 6
         self.time: int = self.df_indiv[it.tick_field].nunique()

src/hots/main.py

+145 -13

@@ -113,16 +113,143 @@ def main(path, k, tau, method, cluster_method, param, output, tolclust, tolplace
     total_method_time = time.time()
 
     # Global loop for getting data
-    reader.init_reader(path, use_kafka)
     print(it.csv_reader)
     print(it.avro_deserializer)
-    it.Sentry = True
-    # print('df_indiv1: ',my_instance.df_indiv)
+    # it.s_entry = True
+    current_time = 0
+    # Offline / online separation : TODO in parameters
+    offline_sep = 3
+    my_instance.sep_time = offline_sep
+    tick = config['loop']['tick']
+
+    total_loop_time = 0.0
+    loop_nb = 1
+    nb_clust_changes = 0
+    nb_place_changes = 0
+    total_nb_overload = 0
+
     print('Ready for new data...')
     try:
-        while it.Sentry:
-            print('Getting next data one more timestamp data')
+        while it.s_entry:
+            if current_time < offline_sep:
+                current_data = reader.get_next_data(
+                    current_time, offline_sep, offline_sep + 1, use_kafka
+                )
+                current_time += offline_sep
+                my_instance.df_indiv = current_data
+                my_instance.df_host = current_data.groupby(
+                    [current_data[it.tick_field], current_data[it.host_field]],
+                    as_index=False).agg(it.dict_agg_metrics)
+
+                # Analysis period
+                start = time.time()
+                (my_instance, df_host_evo,
+                 df_indiv_clust, labels_) = analysis_period(
+                    my_instance, config, method
+                )
+                add_time(-1, 'total_t_obs', (time.time() - start))
+
+                cluster_profiles = clt.get_cluster_mean_profile(df_indiv_clust)
+                tmin = my_instance.sep_time - (my_instance.window_duration - 1)
+                tmax = my_instance.sep_time
+
+                # it.results_file.write('Loop mode : %s\n' % mode)
+                logging.info('Beginning the loop process ...\n')
+
+                (working_df_indiv, df_clust, w, u, v) = build_matrices(
+                    my_instance, tmin, tmax, labels_
+                )
+
+                # build initial optimisation model in pre loop using offline data
+                start = time.time()
+                (clust_model, place_model,
+                 clustering_dual_values, placement_dual_values) = pre_loop(
+                    my_instance, working_df_indiv, df_clust,
+                    w, u, config['loop']['constraints_dual'], v,
+                    cluster_method, config['optimization']['solver']
+                )
+                add_time(0, 'total_loop', (time.time() - start))
+                print('ready for loop ?')
+                tmax += tick
+                tmin = tmax - (my_instance.window_duration - 1)
+
+            else:
+                last_index = my_instance.df_indiv.index.levels[0][-1]
+                current_data = reader.get_next_data(
+                    current_time, config['loop']['tick'],
+                    config['loop']['tick'] - current_time + 1, use_kafka
+                )
+                current_time += config['loop']['tick']
+                my_instance.df_indiv = pd.concat(
+                    [my_instance.df_indiv, current_data])
+                new_df_host = current_data.groupby(
+                    [current_data[it.tick_field], it.host_field], as_index=False
+                ).agg(it.dict_agg_metrics)
+                new_df_host = new_df_host.astype({
+                    it.host_field: str,
+                    it.tick_field: int}
+                )
+                previous_timestamp = last_index
+                existing_machine_ids = my_instance.df_host[
+                    my_instance.df_host[it.tick_field] == previous_timestamp
+                ][it.host_field].unique()
+                missing_machine_ids = set(existing_machine_ids) - set(
+                    new_df_host[it.host_field])
+
+                missing_rows = pd.DataFrame({
+                    'timestamp': int(current_time),
+                    'machine_id': list(missing_machine_ids),
+                    'cpu': 0.0
+                })
+                # new_df_host.sort_values(it.tick_field, inplace=True)
+                new_df_host = pd.concat([new_df_host, missing_rows])
+                new_df_host.set_index([it.tick_field, it.host_field], inplace=True, drop=False)
+                my_instance.df_host = pd.concat([
+                    my_instance.df_host, new_df_host
+                ])
+                print(current_data)
+                print(current_time)
+                print(my_instance.df_indiv)
+                print('perform loop')
+                (working_df_indiv, df_clust, w, u, v) = build_matrices(
+                    my_instance, tmin, tmax, labels_
+                )
+                nb_clust_changes_loop = 0
+                nb_place_changes_loop = 0
+                (nb_clust_changes_loop, nb_place_changes_loop,
+                 init_loop_silhouette, end_loop_silhouette,
+                 clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg,
+                 place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg,
+                 clust_model, place_model,
+                 clustering_dual_values, placement_dual_values,
+                 df_clust, cluster_profiles, labels_) = eval_sols(
+                    my_instance, working_df_indiv, cluster_method,
+                    w, u, v, clust_model, place_model,
+                    config['loop']['constraints_dual'],
+                    clustering_dual_values, placement_dual_values,
+                    config['loop']['tol_dual_clust'],
+                    config['loop']['tol_move_clust'],
+                    config['loop']['tol_open_clust'],
+                    config['loop']['tol_dual_place'],
+                    config['loop']['tol_move_place'],
+                    df_clust, cluster_profiles, labels_, loop_nb,
+                    config['optimization']['solver']
+                )
+                it.results_file.write(
+                    'Number of changes in clustering : %d\n' % nb_clust_changes_loop
+                )
+                it.results_file.write(
+                    'Number of changes in placement : %d\n' % nb_place_changes_loop
+                )
+                nb_clust_changes += nb_clust_changes_loop
+                nb_place_changes += nb_place_changes_loop
+                tmax += tick
+                tmin = tmax - (my_instance.window_duration - 1)
+                my_instance.time += 1
+                loop_nb += 1
+                print('success ?')
             input()
+
     finally:
         # Close down consumer to commit final offsets.
         reader.close_reader(use_kafka)
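The else branch above keeps the host frame dense: any machine present at the previous timestamp but missing from the new batch gets a zero-consumption row before the concat, so the (tick, host) index stays complete. A standalone sketch of that fill pattern, using the literal column names from the diff ('timestamp', 'machine_id', 'cpu'); the sample values are made up:

import pandas as pd

prev = pd.DataFrame({'timestamp': [1, 1], 'machine_id': ['m1', 'm2'], 'cpu': [0.5, 0.7]})
new = pd.DataFrame({'timestamp': [2], 'machine_id': ['m1'], 'cpu': [0.6]})

# Hosts seen at the previous tick but absent now are padded with cpu = 0.0.
missing = set(prev['machine_id']) - set(new['machine_id'])
filler = pd.DataFrame({'timestamp': 2, 'machine_id': list(missing), 'cpu': 0.0})
new = pd.concat([new, filler])
new.set_index(['timestamp', 'machine_id'], inplace=True, drop=False)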
@@ -266,7 +393,12 @@ def preprocess(
 
     # Init containers & nodes data, then Instance
     logging.info('Loading data and creating Instance (Instance information are in results file)\n')
-    instance = Instance(path, config, use_kafka)
+    if use_kafka:
+        reader.consume_all_data(config)
+        # reader.delete_kafka_topic(config)
+        reader.csv_to_stream(path, config)
+    reader.init_reader(path, use_kafka)
+    instance = Instance(path, config)
     it.results_file.write('Method used : %s\n' % method)
     instance.print_times(config['loop']['tick'])

@@ -328,8 +460,7 @@ def analysis_period(my_instance, config, method):
             df_indiv[it.tick_field] >= start_point) & (
             my_instance.df_indiv[it.tick_field] <= end_point)
     ]
-    print('Check Stats', start_point, end_point, working_df_indiv, my_instance.df_host)
-    # working_df_indiv contains info of historical data 1/3 of the data
+
     # First clustering part
     logging.info('Starting first clustering ...')
     print('Starting first clustering ...')
@@ -474,7 +605,7 @@ def run_period(
 def signal_handler_sigint(signal_number, frame):
     """Handle for exiting application via signal."""
     print('Exit application')
-    it.Sentry = False
+    it.s_entry = False
 
 
 def streaming_eval(
@@ -567,7 +698,8 @@ def streaming_eval(
 
     if use_kafka:
         use_schema = False
-        avro_deserializer = reader.connect_schema(use_schema)
+        avro_deserializer = reader.connect_schema(
+            use_schema, it.kafka_schema_url)
         # time_to_send = my_instance.df_indiv['timestamp'].iloc[-1]
        # history = True # consider historical data
        # send last historical data to kafka
@@ -585,11 +717,11 @@ def streaming_eval(
         # it.time_at.append(x)
         # it.memory_usage.append(mem_before)
         # it.total_mem_use.append(tot_mem_after)
-    it.Sentry = True
+    it.s_entry = True
     # print('df_indiv1: ',my_instance.df_indiv)
     print('Ready for new data...')
     try:
-        while it.Sentry:
+        while it.s_entry:
 
             loop_time = time.time()
             it.kafka_consumer.subscribe([it.kafka_topics['docker_topic']])
@@ -806,7 +938,7 @@ def streaming_eval(
 
             # if tmax >= my_instance.time:
 
-            # it.Sentry = True # change to False to end loop according to mock data
+            # it.s_entry = True # change to False to end loop according to mock data
             # else:
             # loop_nb += 1
             my_instance.time += 1
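Both data loops spin on the it.s_entry sentinel, and signal_handler_sigint flips it to False, so an interrupt falls out of the while loop and into the finally block that closes the reader. The registration itself is not shown in this diff; a sketch of the assumed wiring:

import signal

# Assumed setup: route Ctrl-C (SIGINT) to the handler above, which sets
# it.s_entry = False and lets the loop exit through the finally block.
signal.signal(signal.SIGINT, signal_handler_sigint)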

src/hots/model.py

+4 -3

@@ -1,7 +1,8 @@
 """
-Define the optimization models we have, with its objective, constraints,
-variables, and build it from the ``Instance``. Provide all optimization models
-related methods.
+Define the optimization models.
+
+Describing its objective, constraints, variables, and build it from the ``Instance``.
+Provide all optimization models related methods.
 The optimization model description is based on Pyomo.
 """
 
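As the reworked docstring says, the model description is based on Pyomo. A minimal, self-contained Pyomo sketch, not the hots model itself, just illustrating the variable/constraint/objective pieces the docstring names:

from pyomo.environ import (ConcreteModel, Constraint, NonNegativeReals,
                           Objective, Var, minimize)

model = ConcreteModel()
model.x = Var(domain=NonNegativeReals)               # a decision variable
model.cap = Constraint(expr=model.x >= 3)            # a constraint
model.obj = Objective(expr=model.x, sense=minimize)  # an objective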

src/hots/node.py

+18 -10

@@ -1,7 +1,4 @@
-"""
-Provide actions specific to nodes (plot nodes data, build dictionnary
-for node IDs, compute different statistic measures ...)
-"""
+"""Provide actions specific to nodes (plot nodes data, compute stat measures ...)."""
 
 import math
 
@@ -78,8 +75,7 @@ def plot_all_data_all_nodes_end(df_host, total_time):
 
 
 def plot_total_usage(df_host, title='Total conso on all nodes'):
-    """Plot the global resources consumption and return the global
-    maximum usage for each metric.
+    """Plot the global resources consumption and return the global maximum usage for each metric.
 
     :param df_host: _description_
     :type df_host: pd.DataFrame
@@ -171,8 +167,7 @@ def get_list_var(df_host, total_time):
 
 
 def get_variance_consumption(df_host):
-    """Compute the variance and standard deviation consumption for each metric
-    in each node and globally.
+    """Compute the variance and std dev consumption for each metric in each node and globally.
 
     :param df_host: _description_
     :type df_host: pd.DataFrame
@@ -255,8 +250,7 @@ def get_list_vmr(df_host, total_time):
 
 
 def get_nodes_variance(df_host, total_time, part):
-    """Compute the Variance for each metric in each node and return the results
-    in two numpy arrays.
+    """Compute the Variance for each metric in each node.
 
     :param df_host: _description_
     :type df_host: pd.DataFrame
@@ -402,3 +396,17 @@ def check_capacities(df_host, df_host_meta):
             host_overload.append(host)
 
     return host_overload
+
+
+def reassign_node(c_info):
+    """Reassign node in containers df."""
+    # c_info = value['containers']
+    new_df_container = pd.DataFrame(c_info)
+    new_df_container = new_df_container.astype({
+        it.indiv_field: str,
+        it.host_field: str,
+        it.tick_field: int})
+    new_df_container.sort_values(it.tick_field, inplace=True)
+    new_df_container.set_index([it.tick_field, it.indiv_field], inplace=True, drop=False)
+    new_df_container.sort_index(inplace=True)
+    return new_df_container
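The new reassign_node takes the raw container records (e.g. a list of dicts, one per tick/container pair) and returns a typed frame indexed on (tick_field, indiv_field). A usage sketch with invented values, assuming the it.* fields map to the 'timestamp' / 'container_id' / 'machine_id' column names used elsewhere in this commit:

# Hypothetical payload, e.g. decoded from a streaming message.
c_info = [
    {'timestamp': 4, 'container_id': 'c1', 'machine_id': 'm1', 'cpu': 0.4},
    {'timestamp': 4, 'container_id': 'c2', 'machine_id': 'm1', 'cpu': 0.2},
]
df = reassign_node(c_info)  # sorted by tick, (tick, container) MultiIndex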

src/hots/placement.py

+1 -4

@@ -1,7 +1,4 @@
-"""
-Provide placement heuristics and all placement-related methods (check capacity, change assignment
-...).
-"""
+"""Provide placement heuristics and all placement-related methods."""
 
 import math
 import random
