class EL_WORK_DISTRIBUTER
Client examples: DISTRIBUTED_INTEGRATION_COMMAND
Object to distribute work of applying routine-agents over a fixed number of CPU processors (threads)
EL_WORK_DISTRIBUTER* [G, R -> ROUTINE] EL_PROCEDURE_DISTRIBUTER [G] EL_LOGGED_PROCEDURE_DISTRIBUTER [G] EL_FUNCTION_DISTRIBUTER [G] EL_DISTRIBUTED_PROCEDURE_CALLBACK EIFFEL_CLASS_PARSER EIFFEL_CLASS_UPDATE_CHECKER EL_LOGGED_FUNCTION_DISTRIBUTER [G]
USAGE:
1. Declare an instance of a descendant EL_PROCEDURE_DISTRIBUTER or EL_FUNCTION_DISTRIBUTER.
2. Repeatedly call wait_apply with the routines you want to execute in parallel.
distributer.wait_apply (agent my_routine)
3. Call the collect routine at any time with a list to receive routines that have already been applied (executed)
4. Call the do_final routine to wait for any remaining routines to finish executing and then wipe out all the threads.
5. Collect any remaining results with a call to collect
Alternatively steps 3 and 5 can be replaced with a call to do_with_completed.
note
description: "[
Object to distribute work of applying routine-agents over a fixed number of CPU processors (threads)
]"
descendants: "See end of class"
instructions: "See end of class"
author: "Finnian Reilly"
copyright: "Copyright (c) 2001-2022 Finnian Reilly"
contact: "finnian at eiffel hyphen loop dot com"
license: "MIT license (See: en.wikipedia.org/wiki/MIT_License)"
date: "2024-11-10 17:18:23 GMT (Sunday 10th November 2024)"
revision: "15"
deferred class
EL_WORK_DISTRIBUTER [G, R -> ROUTINE]
inherit
EL_SINGLE_THREAD_ACCESS
EL_MODULE_EXECUTION_ENVIRONMENT
EL_MODULE_SYSTEM
EL_LAZY_ATTRIBUTE
rename
new_item as new_collection_list,
cached_item as actual_collection_list
end
feature {NONE} -- Initialization
make (maximum_cpu_percentage: INTEGER)
-- make with maximum percentage of available CPU processors to use
-- 0 % indicate a single threaded implementation of `wait_apply'
require
valid_percentage: 0 <= maximum_cpu_percentage and maximum_cpu_percentage <= 100
do
make_threads (System.scaled_processor_count (maximum_cpu_percentage))
end
make_threads (maximum_thread_count: INTEGER)
do
make_default
create available.make (maximum_thread_count)
create thread_available.make
create pool.make (maximum_thread_count)
create applied.make (20)
create final_applied.make (0)
create thread_attributes.make
end
feature -- Access
launched_count: INTEGER
-- number of threads launched
do
Result := pool.count
end
feature -- Status query
is_finalized: BOOLEAN
-- `True' if `do_final' has been called
feature -- Contract Support
valid_routine (routine: R): BOOLEAN
deferred
end
feature -- Status change
set_max_priority
-- set thread priority to maximum
do
thread_attributes.set_priority (thread_attributes.max_priority)
end
set_normal_priority
-- set thread priority to maximum
do
thread_attributes.set_priority (thread_attributes.default_priority)
end
feature -- Basic operations
collect (completed_list: LIST [G])
-- collect the list of completed function results of type G from `applied' function list
do
if is_finalized then
move (final_applied, completed_list)
is_finalized := False
else
restrict_access
move (applied, completed_list)
end_restriction
end
end
do_final
-- wait until all threads are available before stopping and joining all threads.
-- Wipeout the thread pool and make the applied routines available in `final_applied'
do
restrict_access
from until available.count = pool.count loop
wait_until_signaled (thread_available)
end
applied.do_all (agent final_applied.extend)
end_restriction
pool.do_all (agent {like pool.item}.wait_to_stop)
is_finalized := True
pool.wipe_out
available.wipe_out
end
do_with_completed (action: PROCEDURE [G])
do
if attached collection_list as list then
collect (list)
list.do_all (action); list.wipe_out
end
end
wait_apply (routine: R)
-- SYNCHRONOUS execution if `threads.capacity' = 0
-- call apply on `routine' and add it to `applied' list
-- ASYNCHRONOUS execution if `pool.capacity' >= 1
-- assign `routine' to an available thread for execution, waiting if necessary for one
-- to become available. If there is no suspended thread available and the `threads' pool is not yet full,
-- then add a new thread and launch it.
require
valid_routine: valid_routine (routine)
routine_has_no_open_arguments: routine.open_count = 0
not_finalized: not is_finalized
local
thread: like pool.item; index: INTEGER
do
if pool.capacity = 0 then
-- SYNCHRONOUS execution
routine.apply
applied.extend (routine)
else
restrict_access
if not available.is_empty then
index := available.item
available.remove
elseif pool.full then
wait_until_signaled (thread_available)
index := available.item
available.remove
end
end_restriction
if index = 0 then
-- launch a new worker thread
create thread.make (Current, routine, pool.count + 1)
thread.launch_with_attributes (thread_attributes)
pool.extend (thread)
else
thread := pool [index]
thread.set_routine (routine)
thread.resume
end
end
end
feature {EL_WORK_DISTRIBUTION_THREAD} -- Event handling
on_applied (thread: like pool.item)
do
restrict_access
if attached {R} thread.routine as r then
applied.extend (r)
end
available.put (thread.index)
end_restriction
thread_available.signal
end
feature {NONE} -- Implementation
collection_list: like new_collection_list
do
Result := lazy_item
end
move (routines: like applied; completed_list: LIST [G])
do
from routines.start until routines.after loop
completed_list.extend (new_completed (routines.item))
routines.remove
end
end
new_collection_list: ARRAYED_LIST [G]
do
create Result.make (10)
end
new_completed (routine: R): G
deferred
end
feature {NONE} -- Thread shared attributes
applied: ARRAYED_LIST [R]
-- list of routines that have been applied since last call to `fill'
available: ARRAYED_STACK [INTEGER]
-- indices of available suspended threads
thread_available: CONDITION_VARIABLE
-- `true' if at least one thread is in a suspended state
feature {NONE} -- Internal attributes
final_applied: like applied
-- contains applied routines after a call to `do_final'
thread_attributes: THREAD_ATTRIBUTES
pool: ARRAYED_LIST [EL_WORK_DISTRIBUTION_THREAD];
-- pool of worker threads
note
descendants: "[
EL_WORK_DISTRIBUTER* [G, R -> ${ROUTINE}]
${EL_PROCEDURE_DISTRIBUTER [G]}
${EL_LOGGED_PROCEDURE_DISTRIBUTER [G]}
${EL_FUNCTION_DISTRIBUTER [G]}
${EL_DISTRIBUTED_PROCEDURE_CALLBACK}
${EIFFEL_CLASS_PARSER}
${EIFFEL_CLASS_UPDATE_CHECKER}
${EL_LOGGED_FUNCTION_DISTRIBUTER [G]}
]"
instructions: "[
**USAGE:**
**1.** Declare an instance of a descendant ${EL_PROCEDURE_DISTRIBUTER} or ${EL_FUNCTION_DISTRIBUTER}.
**2.** Repeatedly call `wait_apply' with the routines you want to execute in parallel.
distributer.wait_apply (agent my_routine)
**3.** Call the `collect' routine at any time with a list to receive routines that have
already been applied (executed)
**4.** Call the `do_final' routine to wait for any remaining routines to finish executing and
then wipe out all the threads.
**5.** Collect any remaining results with a call to `collect'
Alternatively steps 3 and 5 can be replaced with a call to `do_with_completed'.
]"
end