# 1 "src/owl/working/owl_parallel.ml"
(*
 * OWL - OCaml Scientific and Engineering Computing
 * Copyright (c) 2016-2020 Liang Wang <liang.wang@cl.cam.ac.uk>
 *)

(** Parallel & distributed computing: engine interface *)

(* Experimental module, do not use now *)

(** Interface of the map-reduce engine the distributed arrays are built on.
    Datasets are referred to by string identifiers. *)
module type Mapre_Engine = sig
  val map : ('a -> 'b) -> string -> string

  val map_partition : ('a list -> 'b list) -> string -> string

  val union : string -> string -> string

  val reduce : ('a -> 'a -> 'a) -> string -> 'a option

  val collect : string -> 'a list

  val workers : unit -> string list

  val myself : unit -> string

  val load : string -> string

  val save : string -> string -> int
end

(** Interface of the local ndarray implementation that holds each chunk. *)
module type Ndarray = sig
  type arr

  type elt

  val shape : arr -> int array

  val empty : int array -> arr

  val create : int array -> elt -> arr

  val zeros : int array -> arr

  val ones : int array -> arr

  val uniform : ?a:float -> ?b:float -> int array -> arr

  val numel : arr -> int

  val get : arr -> int array -> elt

  val set : arr -> int array -> elt -> unit

  val fill : arr -> elt -> unit

  val concatenate : ?axis:int -> arr array -> arr

  val reshape : arr -> int array -> arr

  val map : (elt -> elt) -> arr -> arr

  val map2 : (elt -> elt -> elt) -> arr -> arr -> arr

  val sin : arr -> arr

  val cos : arr -> arr

  val add : arr -> arr -> arr

  val sub : arr -> arr -> arr

  val mul : arr -> arr -> arr

  val div : arr -> arr -> arr

  val sum' : arr -> elt
end

module Make_Distributed (M : Ndarray) (E : Mapre_Engine) = struct
  (** Descriptor of a distributed ndarray.
      [id] is the engine identifier of the dataset, [shape] the logical shape,
      [c_start]/[c_len] the 1d start offset and length of each chunk, and
      [workers] the worker identifiers, one per chunk. *)
  type distr_arr =
    { mutable id : string
    ; mutable shape : int array
    ; mutable c_start : int array
    ; mutable c_len : int array
    ; mutable workers : string array
    }

  let make_distr_arr id shape c_start c_len workers = { id; shape; c_start; c_len; workers }

  let shape x = x.shape

  let num_dims x = Array.length x.shape

  let numel x = Array.fold_left (fun a b -> a * b) 1 x.shape

  (** [divide_to_chunks shape n] splits the flattened index range of [shape]
      into exactly [n] contiguous [(start, length)] chunks. Every chunk has
      length [total / n] except the last one, which also takes the remainder.

      The result always has [n] entries so that it stays aligned with the
      worker list: when there are fewer elements than workers, the leading
      chunks are empty and the last one holds everything.

      @raise Invalid_argument if [n <= 0]. *)
  let divide_to_chunks shape n =
    if n <= 0 then invalid_arg "divide_to_chunks: number of chunks must be positive";
    let total_sz = Array.fold_left (fun a b -> a * b) 1 shape in
    let chunk_sz = total_sz / n in
    Array.init n (fun i ->
        let c_start = i * chunk_sz in
        let c_len = if i = n - 1 then total_sz - c_start else chunk_sz in
        c_start, c_len)


  (* make a distributed version of [create_fun d], the elements will be
     distributed among the working nodes.
     [create_fun] receives three parameters: shape, starting pos (1d), and
     length of the chunk (1d).
  *)
  let distributed_create_basic create_fun d =
    let workers = E.workers () in
    let chunks = divide_to_chunks d (List.length workers) in
    let c_start = Array.map fst chunks in
    let c_len = Array.map snd chunks in
    let id =
      E.map
        (fun _ ->
          (* each worker looks up its own position to find its chunk *)
          let me = E.myself () in
          let pos = Owl_utils.list_search me workers in
          me, create_fun d c_start.(pos) c_len.(pos))
        ""
    in
    make_distr_arr id d c_start c_len (Array.of_list workers)


  (** Like [distributed_create_basic], but [create_fun] only receives the 1d
      shape [[|len|]] of the chunk it must build. *)
  let distributed_create create_fun d =
    let f _ _ len = create_fun [| len |] in
    distributed_create_basic f d


  (* init function [f] receives 1d index *)
  let init d f =
    let create_fun _ c_start c_len =
      let x = M.empty [| c_len |] in
      for i = 0 to c_len - 1 do
        (* [i] is local to the chunk, [j] is the global 1d index *)
        let j = c_start + i in
        M.set x [| i |] (f j)
      done;
      x
    in
    distributed_create_basic create_fun d


  let create d a =
    let create_fun d = M.create d a in
    distributed_create create_fun d


  let zeros d =
    let create_fun d = M.zeros d in
    distributed_create create_fun d


  let ones d =
    let create_fun d = M.ones d in
    distributed_create create_fun d


  (* not implemented yet *)
  let sequential ?_a ?_step _d = None

  let uniform ?a ?b d =
    let create_fun d = M.uniform ?a ?b d in
    distributed_create create_fun d


  (* not implemented yet *)
  let gaussian _d = None

  (* given 1d index, calculate its owner *)
  (* Returns the position of the first chunk whose range
     [c_start.(j), c_start.(j) + c_len.(j)) contains [i_1d]. If no chunk
     matches, the last position is returned (0 when there are no chunks). *)
  let calc_index_owner i_1d c_start c_len =
    let n = Array.length c_start in
    let rec search j =
      if j >= n
      then max 0 (n - 1)
      else if c_start.(j) <= i_1d && i_1d < c_start.(j) + c_len.(j)
      then j
      else search (j + 1)
    in
    search 0


  (** [get x i_nd] fetches the element at n-dimensional index [i_nd] from the
      worker that owns it. *)
  let get x i_nd =
    let stride = Owl_utils.calc_stride x.shape in
    let i_1d = Owl_utils.index_nd_1d i_nd stride in
    assert (numel x > i_1d);
    let pos = calc_index_owner i_1d x.c_start x.c_len in
    let owner_id = x.workers.(pos) in
    (* the offset in the owner's chunk *)
    let j_1d = i_1d - x.c_start.(pos) in
    let y_id =
      E.map_partition
        (fun l ->
          (* only the owner contributes a value, other workers return [] *)
          match E.myself () = owner_id with
          | true  ->
            let arr = List.nth l 0 |> snd in
            [ M.get arr [| j_1d |] ]
          | false -> [])
        x.id
    in
    let l = E.collect y_id |> List.filter (fun l -> List.length l > 0) in
    List.(nth (nth l 0) 0)


  (** [set x i_nd a] stores [a] at n-dimensional index [i_nd] on the worker
      that owns it. The result is collected and discarded. *)
  let set x i_nd a =
    let stride = Owl_utils.calc_stride x.shape in
    let i_1d = Owl_utils.index_nd_1d i_nd stride in
    assert (numel x > i_1d);
    let pos = calc_index_owner i_1d x.c_start x.c_len in
    let owner_id = x.workers.(pos) in
    (* the offset in the owner's chunk *)
    let j_1d = i_1d - x.c_start.(pos) in
    let y_id =
      E.map_partition
        (fun l ->
          match E.myself () = owner_id with
          | true  ->
            let arr = List.nth l 0 |> snd in
            [ M.set arr [| j_1d |] a ]
          | false -> [])
        x.id
    in
    E.collect y_id |> ignore


  (** [map_chunk f x] applies [f] to every local chunk of [x] and returns a
      descriptor for the new dataset, with copies of [x]'s layout arrays. *)
  let map_chunk f x =
    let y_id = E.map (fun (node_id, arr) -> node_id, f arr) x.id in
    let shape = Array.copy x.shape in
    let c_start = Array.copy x.c_start in
    let c_len = Array.copy x.c_len in
    let workers = Array.copy x.workers in
    make_distr_arr y_id shape c_start c_len workers


  let map f x = map_chunk (M.map f) x

  (** [map2_chunk f x y] combines the chunks of [x] and [y] pairwise with [f].
      Both arrays must have the same shape; the layout is taken from [x]. *)
  let map2_chunk f x y =
    assert (x.shape = y.shape);
    let z_id = E.union x.id y.id in
    let z_id =
      E.map_partition
        (fun l ->
          (* after the union, entry 0 is read as [x]'s chunk, entry 1 as [y]'s *)
          let x_node_id, x_arr = List.nth l 0 in
          let _y_node_id, y_arr = List.nth l 1 in
          [ x_node_id, f x_arr y_arr ])
        z_id
    in
    let shape = Array.copy x.shape in
    let c_start = Array.copy x.c_start in
    let c_len = Array.copy x.c_len in
    let workers = Array.copy x.workers in
    make_distr_arr z_id shape c_start c_len workers


  let map2 f x y = map2_chunk (M.map2 f) x y

  (** [fold f x a] reduces each chunk with [f], seeded with the chunk's first
      element, then folds the per-chunk results with [f] starting from [a].
      NOTE(review): reading element 0 presumably fails on an empty chunk -
      confirm against the ndarray implementation. *)
  let fold f x a =
    let y_id =
      E.map
        (fun (_node_id, arr) ->
          let b = ref M.(get arr [| 0 |]) in
          for i = 1 to M.numel arr - 1 do
            b := f !b M.(get arr [| i |])
          done;
          !b)
        x.id
    in
    E.collect y_id |> List.fold_left (fun b c -> f b (List.nth c 0)) a


  (** [fill x a] calls [M.fill] with [a] on every chunk of [x].
      The previous definition never applied the mapping to its array argument
      and therefore did nothing. The result is collected and discarded, the
      same way [set] does.
      NOTE(review): like [set], this assumes the engine keeps chunks in worker
      memory so that in-place updates persist - confirm against the engine. *)
  let fill x a =
    let y_id = E.map (fun (_node_id, arr) -> M.fill arr a) x.id in
    E.collect y_id |> ignore


  (* of_ndarray and to_ndarray convert between distributed ndarray and local
     ndarray. They are equivalent to [distribute] and [collect] in some other
     distributed data processing frameworks. *)

  (* not implemented yet *)
  let of_ndarray _x = None

  (** [to_ndarray x] collects all chunks, concatenates them along axis 0 and
      reshapes the result to [x.shape]. *)
  let to_ndarray x =
    let l =
      E.collect x.id |> List.map (fun l' -> List.nth l' 0 |> snd) |> Array.of_list
    in
    let y = M.concatenate ~axis:0 l in
    M.reshape y x.shape


  let sin x = map_chunk M.sin x

  let cos x = map_chunk M.cos x

  (* TODO: need to implement add_elt to support complex number *)
  (** [sum x] adds up the per-chunk sums, left to right, using float addition.
      @raise Failure if no partial sum was collected. *)
  let sum x =
    let y = map_chunk M.sum' x in
    let partials = E.collect y.id |> List.map (fun l' -> List.nth l' 0 |> snd) in
    match partials with
    | []         -> failwith "sum: no chunk was collected"
    | hd :: rest -> List.fold_left ( +. ) hd rest


  (* not implemented yet *)
  let min _x = None

  (* not implemented yet *)
  let max _x = None

  let add x y = map2_chunk M.add x y

  let sub x y = map2_chunk M.sub x y

  let mul x y = map2_chunk M.mul x y

  let div x y = map2_chunk M.div x y

  (* TODO: distributed garbage collection is not implemented yet. *)
end

(* placeholder, nothing implemented yet *)
module Make_Shared (M : Ndarray) (E : Mapre_Engine) = struct end

(** Interface of a local ndarray that is polymorphic in its element type. *)
module type Ndarray_Any = sig
  type 'a arr

  val shape : 'a arr -> int array

  val numel : 'a arr -> int

  val create : int array -> 'a -> 'a arr
end

module Make_Distributed_Any (M : Ndarray_Any) (E : Mapre_Engine) = struct
  (** Descriptor of a distributed ndarray: engine identifier, logical shape,
      per-chunk 1d start offsets and lengths, and worker identifiers. *)
  type distr_arr =
    { mutable id : string
    ; mutable shape : int array
    ; mutable c_start : int array
    ; mutable c_len : int array
    ; mutable workers : string array
    }

  let make_distr_arr id shape c_start c_len workers = { id; shape; c_start; c_len; workers }

  (** [divide_to_chunks shape n] splits the flattened index range of [shape]
      into exactly [n] contiguous [(start, length)] chunks. Every chunk has
      length [total / n] except the last one, which also takes the remainder.
      The result always has [n] entries so it stays aligned with the workers.

      @raise Invalid_argument if [n <= 0]. *)
  let divide_to_chunks shape n =
    if n <= 0 then invalid_arg "divide_to_chunks: number of chunks must be positive";
    let total_sz = Array.fold_left (fun a b -> a * b) 1 shape in
    let chunk_sz = total_sz / n in
    Array.init n (fun i ->
        let c_start = i * chunk_sz in
        let c_len = if i = n - 1 then total_sz - c_start else chunk_sz in
        c_start, c_len)


  (** [distributed_create create_fun d] asks every worker to build its chunk
      by calling [create_fun] with the 1d shape [[|len|]] of that chunk. *)
  let distributed_create create_fun d =
    let workers = E.workers () in
    let chunks = divide_to_chunks d (List.length workers) in
    let c_start = Array.map fst chunks in
    let c_len = Array.map snd chunks in
    let id =
      E.map
        (fun _ ->
          let me = E.myself () in
          let pos = Owl_utils.list_search me workers in
          me, create_fun [| c_len.(pos) |])
        ""
    in
    make_distr_arr id d c_start c_len (Array.of_list workers)
end