Bonjour,
J'ai un code utilisant xarray pour extraire des donn�es depuis une source netCDF (ECMWF ERA5).
De mani�re simplifi�e, il y a quatre dimensions (x, y, z et t) et trois variables (r, h et g).
Toujours en simplifiant un peu, je dois extraire toutes les valeurs de r, h et g pour toutes les dimensions z et t pour chaque couple x/y.
J'ai un code s�rie qui marche, dont une repr�sentation simplifi�e est :
Je l'ai �crit de cette mani�re (cr�er des listes pour chaque param�tre puis les passer � la fonction via map) dans l'objectif de parall�liser l'ex�cution en utilisant concurrent.futures.ThreadPoolExecutor ou concurrent.futures.ProcessPoolExecutor en rempla�ant l'appel map de cette mani�re :
Code : S�lectionner tout - Visualiser dans une fen�tre � part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 import xarray as xr def extract_data(data,z,x,y,t): out_str = '' for t_val in t: for z_val in z: out_str += data.sel(x,y,t_val,z_val).data return(out_str) def data_get_write(dir,r,h,g,x,y,z,t): r_str = extract_data(r,x,y,z,t) with open(dir+'r_file', 'w') as f_r: f_r.write(r_str) h_str = extract_data(h,x,y,z,t) with open(dir+'h_file', 'w') as f_h: f_h.write(h_str) g_str = extract_data(g,x,y,z,t) with open(dir+'g_file', 'w') as f_g: f_g.write(g_str) return(<operation>) if __name__ == '__main__': ds=xr.open_dataset('data.nc') r=ds['r'] h=ds['h'] g=ds['g'] z=ds['z'] t=ds['t'] dir_list=[] x_list=[] y_list=[] for val_x in x: for val_y in y: dir=(<operation>) dir_list.append(dir) x_list.append(x) y_list.append(y) list_len=len(dir_list) r_list=[r]*list_len h_list=[h]*list_len g_list=[g]*list_len z_list=[z]*list_len t_list=[t]*list_len results = map(data_get_write, r_list, h_list, g_list, x_list, y_list, z_list, t_list)
Avec ThreadPoolExecutor le probl�me c'est que la version parall�lis�e est moins rapide que la version s�rie...
Code : S�lectionner tout - Visualiser dans une fen�tre � part
1
2
3
4 max_workers = min(32, int(arguments["-n"]) + 4) with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: results = executor.map(data_get_write, r_list, h_list, g_list, x_list, y_list, z_list, t_list)
Sur mon jeu de donn�es de test (netcdf de 600Mo, dimensions x et y avec une vingtaine de valeurs et t � 6 valeurs) et mon serveur de test, la version s�rie prend environ 2'40", la version parall�le 6'30". J'ai fait varier le nombre de c�urs allou�s de 1 � 20, il n'y a aucun impact sur le temps d'ex�cution (ou du moins juste � la marge, plus ou moins quelques secondes, moins de 10%). Il n'y a pas plus de variabilit� entre les runs en faisant changer le nombre de c�urs allou�s qu'en relan�ant plusieurs fois de suite le m�me run.
�a s'explique, si je ne m'abuse, par le verrou global qui emp�che les threads concurrents d'acc�der aux donn�es en lecture de mani�re r�ellement concurrente puisqu'elles sont pass�es par r�f�rence et pas par valeur. Et �a semble �tre confirm� en basculant sur ProcessPoolExecutor m�me si la baisse de vitesse d'ex�cution est surprenante.
Avec ProcessPoolExecutor on constate un gain, �a passe � 1'25" mais � nouveau, quel que soit le nombre de c�urs, il n'y pas de modification significative du temps d'ex�cution. On pourrait penser qu'il s'agit d'un bottleneck sur le filesystem mais il n'y a pas non plus de modification en fonction du syst�me de stockage utilis�, que ce soit le disque local au serveur ou un montage NFS.
Il doit donc y avoir quelque chose de fondamental, soit dans mon choix de biblioth�que de parall�lisation ou dans mon code m�me, qui l'emp�che de scaler avec la surface d'ex�cution. C'est peut-�tre tr�s simple mais je suis compl�tement novice, autant en lecture / extraction de donn�es depuis du netCDF qu'en parall�lisation de code, donc il ne serait pas surprenant que ce soit un cas de pebkac. Toute aide / suggestion de piste pour am�liorer mon code est la bienvenue, merci !
Partager