Class: Tins::Limited
Overview
Tins::Limited provides a thread pool implementation that caps the number of threads running at the same time.
It follows a producer-consumer pattern: tasks you submit are queued, and an executor thread starts each one in its own thread as long as fewer than maximum are running. This prevents resource exhaustion from too many concurrent operations.
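A minimal usage sketch of the submit-then-stop pattern described above. The helper name `square_all` is hypothetical, and the commented usage lines assume the tins gem is installed; only `process`, `execute`, and `stop` come from this page.

```ruby
# Hypothetical helper: squares each number in its own thread, with the
# concurrency cap enforced by the limiter object passed in.
def square_all(limiter, numbers)
  results = Queue.new # thread-safe collector shared by all tasks
  limiter.process do |l|
    numbers.each do |n|
      l.execute { results << n * n }
    end
    l.stop # leave the submission loop; process then waits for the tasks
  end
  # Drain the queue into a sorted array once process has returned.
  Array.new(results.size) { results.pop }.sort
end

# With the tins gem installed (assumption), usage would look like:
#   require 'tins'
#   square_all(Tins::Limited.new(3), 1..5)
```

Collecting results in a `Queue` avoids sharing a plain Array between threads. Calling `stop` inside the block matters because `process` keeps invoking the block until `stop` is called.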
Instance Attribute Summary
-
#maximum ⇒ Integer
readonly
The maximum number of worker threads that can run concurrently.
Instance Method Summary
-
#create_executor ⇒ Thread
private
Create and start the executor thread that manages the worker pool.
-
#done? ⇒ Boolean
private
Check if all tasks and threads have completed.
-
#execute {|Thread| ... } ⇒ Object
Submit a task to be executed by the thread pool.
-
#initialize(maximum, name: nil) ⇒ Limited
constructor
Create a Limited instance that runs at most maximum threads simultaneously.
-
#process {|Limited| ... } ⇒ void
Start processing tasks with the configured thread pool.
-
#stop ⇒ void
Stop accepting new tasks by leaving the submission loop in #process, which then waits for already submitted tasks to complete.
-
#wait ⇒ void
private
Wait for all threads in the thread group to complete.
Constructor Details
#initialize(maximum, name: nil) ⇒ Limited
Create a Limited instance that runs at most maximum threads simultaneously.
    # File 'lib/tins/limited.rb', line 32
    def initialize(maximum, name: nil)
      @maximum = Integer(maximum)
      raise ArgumentError, "maximum < 1" if @maximum < 1
      @mutex = Mutex.new
      @continue = ConditionVariable.new
      @name = name
      @count = 0
      @tg = ThreadGroup.new
    end
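The constructor's argument check can be tried in isolation. The sketch below reproduces only the `Integer(...)` conversion and the range check from the listing; `checked_maximum` is a hypothetical stand-in, not part of the library.

```ruby
# Hypothetical stand-in for the constructor's validation step.
def checked_maximum(maximum)
  value = Integer(maximum) # raises for nil or non-numeric strings
  raise ArgumentError, "maximum < 1" if value < 1
  value
end

checked_maximum("4") # => 4
checked_maximum(2.9) # => 2 (Integer() truncates floats)

begin
  checked_maximum(0)
rescue ArgumentError => e
  e.message # => "maximum < 1"
end
```

Because `Integer()` is used rather than `to_i`, a value such as `"abc"` raises an ArgumentError and `nil` raises a TypeError, instead of silently becoming 0.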
Instance Attribute Details
#maximum ⇒ Integer (readonly)
The maximum number of worker threads that can run concurrently.
45 46 47 |
# File 'lib/tins/limited.rb', line 45 def maximum @maximum end |
Instance Method Details
#create_executor ⇒ Thread (private)
Create and start the executor thread that manages the worker pool.
    # File 'lib/tins/limited.rb', line 104
    def create_executor
      Thread.new do
        @mutex.synchronize do
          loop do
            if @count < @maximum
              task = @tasks.pop
              @count += 1
              Thread.new do
                @tg.add Thread.current
                task.(Thread.current)
              ensure
                @count -= 1
                @continue.signal
              end
            else
              @continue.wait(@mutex)
            end
          end
        end
      end
    end
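The counter-plus-condition-variable idea behind the executor can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: `run_limited` and its local names are hypothetical, and here the submitting thread itself waits for a free slot instead of a separate executor thread.

```ruby
# Sketch: cap the number of threads running at once using a counter,
# a Mutex and a ConditionVariable. Returns the peak concurrency observed.
def run_limited(maximum, jobs)
  mutex = Mutex.new
  slot_free = ConditionVariable.new
  running = 0
  peak = 0

  threads = jobs.map do |job|
    mutex.synchronize do
      # Block until fewer than `maximum` jobs are running.
      slot_free.wait(mutex) while running >= maximum
      running += 1
      peak = running if running > peak
    end
    Thread.new do
      job.call
    ensure
      # Release the slot and wake one waiter, even if the job raised.
      mutex.synchronize do
        running -= 1
        slot_free.signal
      end
    end
  end

  threads.each(&:join)
  peak
end

jobs = Array.new(6) { -> { sleep 0.01 } }
run_limited(2, jobs) # never exceeds 2
```

Unlike the listing above, the sketch also takes the mutex around the decrement. That is a choice made here, not a description of the library.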
#done? ⇒ Boolean (private)
Check if all tasks and threads have completed.
    # File 'lib/tins/limited.rb', line 90
    def done?
      @tasks.empty? && @tg.list.empty?
    end
#execute {|Thread| ... } ⇒ Object
Submit a task to be executed by the thread pool.
    # File 'lib/tins/limited.rb', line 51
    def execute(&block)
      @tasks or raise ArgumentError, "start processing first"
      @tasks << block
    end
#process {|Limited| ... } ⇒ void
This method returns an undefined value.
Start processing tasks with the configured thread pool.
This method blocks until stop is called and all submitted tasks have completed. The provided block is called repeatedly to submit tasks via execute, so it must eventually call stop to end the loop.
    # File 'lib/tins/limited.rb', line 64
    def process
      @tasks = Queue.new
      @executor = create_executor
      @executor.name = @name if @name
      catch :stop do
        loop do
          yield self
        end
      ensure
        wait until done?
        @executor.kill
      end
    end
#stop ⇒ void
This method returns an undefined value.
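Stop accepting new tasks by throwing :stop, which ends the submission loop in #process. The waiting for already submitted tasks happens in #process, not in this method.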
    # File 'lib/tins/limited.rb', line 81
    def stop
      throw :stop
    end
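How `throw :stop` interacts with the `catch`/`loop`/`ensure` structure in #process can be seen in a stripped-down sketch. `submit_until_stopped` is a hypothetical name, and the `:cleanup` marker merely stands in for the waiting and executor shutdown done by the real method.

```ruby
# Sketch of the control flow only: yield repeatedly until the block
# throws :stop, then run the cleanup step in the ensure clause.
def submit_until_stopped
  log = []
  catch :stop do
    loop do
      yield log
    end
  ensure
    log << :cleanup # runs while the throw unwinds out of the loop
  end
  log
end

calls = 0
log = submit_until_stopped do |l|
  calls += 1
  l << calls
  throw :stop if calls == 3
end
log # => [1, 2, 3, :cleanup]
```

The `ensure` clause sits inside the `catch` block, so the cleanup runs before `catch` returns, whether the loop ends by `throw` or by an exception.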
#wait ⇒ void (private)
This method returns an undefined value.
Wait for all threads in the thread group to complete.
    # File 'lib/tins/limited.rb', line 97
    def wait
      @tg.list.each(&:join)
    end