1 ;;; Copyright (C) 2020 Ahmet Artu Yildirim
3 ;;; orca is free software: you can redistribute it and/or modify
4 ;;; it under the terms of the GNU Lesser General Public License as
5 ;;; published by the Free Software Foundation, either version 3 of
6 ;;; the License, or (at your option) any later version.
8 ;;; orca is distributed in the hope that it will be useful,
9 ;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
10 ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 ;;; GNU Lesser General Public License for more details.
13 ;;; You should have received a copy of the GNU Lesser General Public License
14 ;;; along with orca. If not, see <https://www.gnu.org/licenses/>.
17 #:use-module (system foreign)
18 #:use-module (rnrs bytevectors)
19 #:use-module (ice-9 q)
20 #:use-module (ice-9 format)
21 #:use-module (srfi srfi-9)
22 #:use-module (srfi srfi-1)
23 #:use-module (srfi srfi-11)
24 #:use-module (srfi srfi-13)
25 #:use-module (srfi srfi-4 gnu)
26 #:use-module (ice-9 eval-string)
27 #:use-module (ice-9 streams)
28 #:use-module (orca config)
29 #:use-module (orca internal))
31 ;;------------------------------------------------------------------------------
33 ;;; Internal data structures
35 ;;------------------------------------------------------------------------------
;; Tag value used as the default for the #:tag keyword argument of
;; %accept-rpc-call, make-rpc-call and make-rpc-async-call.
37 (define DEFAULT_TAG 0)
;; Process id that rpc-is-master-process compares the calling process's id
;; against; it is also the source the non-master processes receive from.
38 (define MASTER_P_ID 0)
;; Tag under which RPC command strings are sent by broadcast-rpc-message and
;; received by the non-master processes.
40 (define MESSAGE_TAG 1)
;; Hash table of stream lists, looked up and updated by namespace key
;; (see set-streams! and collect-streams).
42 (define stream-table (make-hash-table))
44 ;;------------------------------------------------------------------------------
46 ;;; Internal Messaging Functions
48 ;;------------------------------------------------------------------------------
50 (define (%finalize-and-exit)
54 (export %finalize-and-exit)
56 ;;------------------------------------------------------------------------------
(define* (%accept-rpc-call source #:key (tag DEFAULT_TAG))
  "Serve one RPC request coming from SOURCE.

A string is received from SOURCE under TAG (DEFAULT_TAG unless overridden),
handed to eval-message, and whatever eval-message returns is sent back to
SOURCE under the same TAG."
  (let* ((request (mpi-recv-string source tag))
         ;; The reply is computed before the send starts, as in a nested call.
         (reply (eval-message request)))
    (mpi-send-string source reply tag)))
62 (export %accept-rpc-call)
64 ;;------------------------------------------------------------------------------
68 ;;------------------------------------------------------------------------------
70 (define (any-process-completed? s-result)
73 ((car s-result) (any-process-completed? (cdr s-result)))
76 ;;------------------------------------------------------------------------------
78 (define (all-process-completed? s-result)
82 (else (all-process-completed? (cdr s-result)))))
84 ;;------------------------------------------------------------------------------
86 (define (initialize-mpi)
87 (unless (mpi-initialized)
90 ;;------------------------------------------------------------------------------
92 (define (quote-params params)
95 ((null? param) '(quote ()))
98 (not (eq? 'list (car param))))
99 (append '(quote) (list param)))
100 (else param))) params))
(define (quote-params-for-exp exp)
  "Return a copy of the call expression EXP in which the operator (the first
element) is kept as is and the remaining elements are replaced by the result
of passing them to quote-params."
  (let ((operator (car exp))
        (operands (cdr exp)))
    ;; Prepend the untouched operator to the processed operand list.
    (cons operator (quote-params operands))))
105 (define (quote-params-for-multiple-exp exps)
107 (quote-params-for-exp exp))
110 ;;------------------------------------------------------------------------------
112 (define (broadcast-rpc-message datum)
115 (mpi-isend-string p-id (exp->string datum) MESSAGE_TAG))
116 (iota (1- (%process-size)) 1)))
118 ;;------------------------------------------------------------------------------
120 (define (rpc-barrier)
123 ;;------------------------------------------------------------------------------
(define* (make-rpc-call dest s-exp #:key (tag DEFAULT_TAG))
  "Perform a request/reply exchange with process DEST.

S-EXP is converted to a string with exp->string and sent to DEST under TAG
(DEFAULT_TAG unless overridden).  The string then received from DEST under
the same TAG is converted back with string->exp and returned."
  ;; Request: serialize first, then send.
  (mpi-send-string dest (exp->string s-exp) tag)
  ;; Reply: receive from the same peer and tag, then deserialize.
  (string->exp (mpi-recv-string dest tag)))
130 ;;------------------------------------------------------------------------------
(define* (make-rpc-async-call dest s-exp #:key (tag DEFAULT_TAG))
  "Send S-EXP to process DEST and start receiving the reply.

S-EXP is converted to a string with exp->string and sent to DEST under TAG
(DEFAULT_TAG unless overridden).  The return value is whatever
mpi-irecv-string returns for DEST and TAG -- presumably a request object
suitable for wait-rpc-async; confirm against the MPI bindings."
  (mpi-send-string dest (exp->string s-exp) tag)
  (mpi-irecv-string dest tag))
137 ;;------------------------------------------------------------------------------
(define (wait-rpc-async request)
  "Pass REQUEST to mpi-wait and return its result converted with string->exp."
  (string->exp (mpi-wait request)))
143 ;;------------------------------------------------------------------------------
145 (define (set-streams! ns streams)
146 (let ((s-lst (hash-ref stream-table ns)))
149 (hash-set! stream-table ns streams))))
151 (define (has-empty-stream? streams)
152 (fold (lambda (current prev)
155 (stream-null? current))) #f streams))
157 (define (collect-streams ns streams)
158 (let ((result (map (lambda (s) (stream-car s)) streams)))
159 (hash-set! stream-table ns (map (lambda (s) (stream-cdr s)) streams))
163 (define (%set-and-collect-streams ns streams)
164 (let ((reg-stream (set-streams! ns streams)))
165 (if (has-empty-stream? reg-stream)
167 (collect-streams ns reg-stream))))
169 (export %set-and-collect-streams)
171 (define (%process-id)
172 "Return the id of the calling process."
175 (define (%process-size)
176 "Return the size of the process group."
179 ;;------------------------------------------------------------------------------
183 ;;------------------------------------------------------------------------------
185 (define (rpc-worker-process-id)
186 "Return the id of the worker process."
189 (export rpc-worker-process-id)
191 ;;------------------------------------------------------------------------------
193 (define (rpc-worker-process-size)
194 "Return the number of the worker processes."
197 (export rpc-worker-process-size)
199 ;;------------------------------------------------------------------------------
(define (rpc-is-master-process)
  "Return #t when the id of the calling process is numerically equal to
MASTER_P_ID, and #f otherwise."
  (let ((my-id (%process-id)))
    (= MASTER_P_ID my-id)))
205 (export rpc-is-master-process)
207 ;;------------------------------------------------------------------------------
209 (define (rpc-make datum)
210 (broadcast-rpc-message `(%accept-rpc-call ,MASTER_P_ID))
212 (let ((my-message (exp->string datum)))
213 (for-each (lambda (request) (mpi-wait request))
216 (mpi-isend-string p-id my-message DATA_TAG))
217 (iota (1- (%process-size)) 1)))
218 (cons (string->exp (eval-message my-message))
220 (string->exp (mpi-recv-string p-id DATA_TAG)))
221 (iota (1- (%process-size)) 1)))))
225 ;;------------------------------------------------------------------------------
227 (define (rpc-scatter lst-datum)
228 (set! lst-datum (quote-params-for-multiple-exp lst-datum))
230 (broadcast-rpc-message `(%accept-rpc-call ,MASTER_P_ID))
232 (let lp ((cdr-datum (cdr lst-datum)) (p-id 1))
233 (unless (nil? cdr-datum)
234 (mpi-isend-string p-id (exp->string (car cdr-datum)) DATA_TAG)
235 (lp (cdr cdr-datum) (1+ p-id))))
236 (cons (string->exp (eval-message (exp->string (car lst-datum))))
238 (string->exp (mpi-recv-string p-id DATA_TAG)))
239 (iota (1- (%process-size)) 1))))
243 ;;------------------------------------------------------------------------------
245 (define-syntax rpc-apply-bcast
247 ((rpc-apply-bcast proc exp ...)
248 (rpc-make `(proc ,@(quote-params (list exp ...)))))))
250 (export rpc-apply-bcast)
252 ;;------------------------------------------------------------------------------
254 (define-syntax rpc-apply-scatter
256 ((rpc-apply-scatter proc exp ...)
258 (map (lambda (params) `(proc ,@params)) (zip exp ...))))))
260 (export rpc-apply-scatter)
262 ;;------------------------------------------------------------------------------
264 (define-syntax rpc-stream-map
266 ((rpc-stream-map proc stream1 ...)
269 (let* ((ns (symbol->string (gensym "orca")))
270 (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
274 (set! s-result (rpc-make datum))
275 (if (any-process-completed? s-result)
280 (cons (lambda* (#:optional (ret #f))
283 (k (reverse result))))
284 (apply zip s-result))) result))))
285 (reverse result)))))))
287 (export rpc-stream-map)
289 ;;------------------------------------------------------------------------------
291 (define-syntax rpc-stream-for-each
293 ((rpc-stream-for-each proc stream1 ...)
296 (let* ((ns (symbol->string (gensym "orca")))
297 (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
300 (set! s-result (rpc-make datum))
301 (if (any-process-completed? s-result)
303 (apply proc (cons k (apply zip s-result)))))
306 (export rpc-stream-for-each)
308 ;;------------------------------------------------------------------------------
310 (define-syntax rpc-stream-fold
312 ((rpc-stream-fold proc init stream1 ...)
315 (let* ((ns (symbol->string (gensym "orca")))
316 (datum `(%set-and-collect-streams ,ns (list stream1 ...)))
321 (set! s-result (rpc-make datum))
322 (if (any-process-completed? s-result)
327 (cons (lambda* (#:optional (ret #f))
332 (apply zip s-result)))))
333 (set! prev result))))
336 (export rpc-stream-fold)
338 ;;------------------------------------------------------------------------------
340 (define (rpc-finalize)
341 (broadcast-rpc-message '(%finalize-and-exit))
344 (export rpc-finalize)
346 ;;------------------------------------------------------------------------------
349 (unless (rpc-is-master-process)
351 (let ((message (mpi-recv-string MASTER_P_ID MESSAGE_TAG)))
352 (eval (string->exp message) (interaction-environment))))))
356 ;;------------------------------------------------------------------------------