class EL_DELEGATING_CONSUMER_THREAD

(source code)

Client examples: COMPILED_CLASSES

Description

Consumer thread that delegates consumption to multiple consumer threads

note
	description: "Consumer thread that delegates consumption to multiple consumer threads"

	author: "Finnian Reilly"
	copyright: "Copyright (c) 2001-2022 Finnian Reilly"
	contact: "finnian at eiffel hyphen loop dot com"

	license: "MIT license (See: en.wikipedia.org/wiki/MIT_License)"
	date: "2022-11-15 19:56:06 GMT (Tuesday 15th November 2022)"
	revision: "7"

class
	EL_DELEGATING_CONSUMER_THREAD [P, CONSUMER_TYPE -> EL_MANY_TO_ONE_CONSUMER_THREAD [P] create make end]

inherit
	EL_CONSUMER_THREAD [P]
		rename
			make_default as make,
			consume_product as delegate_consumption_of_next_product,
			is_waiting as is_waiting_for_new_queue_item
		redefine
			make, product_queue, on_stopping, stop
		end

	EL_SUSPENDABLE
		rename
			make_default as make
		undefine
			is_equal, copy
		redefine
			make
		end

	EL_EVENT_LISTENER
		rename
			notify as continue_if_waiting_for_consumer
		undefine
			is_equal, copy
		end

	EL_SHARED_THREAD_MANAGER

create
	make

feature {NONE} -- Initialization

	make
		do
			Precursor {EL_SUSPENDABLE}
			Precursor {EL_CONSUMER_THREAD}
		end

feature -- Basic operations

	stop
			--
		local
			state_previous: INTEGER
		do
			state_previous := state
			set_state (State_stopping)
			if is_suspended then
				resume
				previous_call_is_thread_signal
-- THREAD SIGNAL
			elseif state_previous = State_waiting then
				prompt
				previous_call_is_thread_signal
-- THREAD SIGNAL
			end
			across product_queue.all_consumers as consumer loop
				consumer.item.stop
			end
		end

feature {NONE} -- Implementation

	continue_if_waiting_for_consumer
			--
		do
			if is_suspended then
				resume
				previous_call_is_thread_signal
-- THREAD SIGNAL
			end
		end

	new_consumer_delegate: CONSUMER_TYPE
			--
		do
			create Result.make (Current, product_queue.available_consumers)
		end

	delegate_consumption_of_next_product
			--
		local
			consumer_delegate: CONSUMER_TYPE
		do
			if not product_queue.available_consumers.is_empty then

			elseif not product_queue.all_consumers.full then
				consumer_delegate := new_consumer_delegate
				consumer_delegate.set_product_queue (product_queue)
				product_queue.all_consumers.extend (consumer_delegate)
				consumer_delegate.launch
				product_queue.available_consumers.put (consumer_delegate)

			else

				suspend
				Previous_call_is_blocking_thread
-- THREAD WAITING
				if not is_stopping then
					set_state (State_consuming)
				end
			end
			check
				consumer_now_available: not product_queue.available_consumers.is_empty
			end

			if is_consuming then
				consumer_delegate := product_queue.available_consumers.removed_item
				consumer_delegate.set_product (product)
				consumer_delegate.prompt
				Previous_call_is_thread_signal
-- THREAD SIGNAL
			end
		end

	on_stopping
			--
		local
			all_consumers_stopped: BOOLEAN
		do
			from until all_consumers_stopped loop
				if product_queue.available_consumers.count = product_queue.all_consumers.count then
					all_consumers_stopped := true
				else
					suspend
					Previous_call_is_blocking_thread
-- THREAD WAITING
					set_state (State_stopping)
				end
			end
		end

feature {NONE} -- Internal attributes

	product_queue: EL_ONE_TO_MANY_THREAD_PRODUCT_QUEUE [P, CONSUMER_TYPE]

end